In [1]:
import json
import pandas as pd
import numpy as np
import re
import nltk
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn.base import BaseEstimator, TransformerMixin

# Gensim for Dense Embeddings
from gensim.models import Word2Vec

# NLTK downloads for preprocessing
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('omw-1.4', quiet=True)
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# --- MANDATORY METHODOLOGY: REPRODUCIBILITY ---
# Single seed shared by the stochastic steps of this notebook: it is passed
# explicitly to the train/test split, the CV splitter, the classifiers and
# Word2Vec further down.
RANDOM_STATE = 42
# Seeds NumPy's legacy global RNG for any code that draws from it implicitly.
np.random.seed(RANDOM_STATE)

In [2]:
def _majority_label(labels):
    """Return the most frequent annotation in ``labels``.

    Missing votes (None / NaN) are ignored. Ties are resolved in favour of
    the label that appears first in the annotator list, so the result does
    not depend on how pandas orders equal counts.

    Raises
    ------
    ValueError
        If there is no usable annotation to vote on.
    """
    # A bare string (or any scalar) counts as a single vote rather than being
    # iterated character by character.
    if isinstance(labels, str) or not hasattr(labels, '__iter__'):
        votes = [labels]
    else:
        votes = list(labels)

    tally = {}
    for vote in votes:
        if pd.isna(vote):
            continue
        tally[vote] = tally.get(vote, 0) + 1

    if not tally:
        raise ValueError("Cannot derive a majority label: no annotations provided")

    # dicts keep insertion order and max() returns the first maximum, which
    # gives the documented "first seen wins" tie-break.
    return max(tally, key=tally.get)


def load_and_parse_data(filepath, is_test=False):
    """Load an EXIST-style JSON file into a DataFrame.

    The file is expected to be a JSON object mapping a record id to a
    dictionary of fields; each record becomes one row.

    Parameters
    ----------
    filepath : str or path-like
        Location of the UTF-8 encoded JSON file.
    is_test : bool, default False
        When False and a ``labels_task1_1`` column is present, a
        ``final_label`` column holding the per-row majority vote is added.

    Returns
    -------
    pandas.DataFrame
        One row per record with an ``id_EXIST`` column, plus ``final_label``
        for labelled training data.

    Raises
    ------
    ValueError
        If a training row has no annotations to vote on.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Convert dictionary of dictionaries to DataFrame (outer keys -> index)
    df = pd.DataFrame.from_dict(data, orient='index')

    if 'id_EXIST' in df.columns:
        # The records already carry their own id field: keep it and drop the
        # index instead of creating a second column with the same name.
        df = df.reset_index(drop=True)
    else:
        # Promote the outer JSON key to a regular id column.
        df = df.reset_index().rename(columns={'index': 'id_EXIST'})

    # Label Processing (Only for Training Data)
    if not is_test and 'labels_task1_1' in df.columns:
        df['final_label'] = df['labels_task1_1'].apply(_majority_label)

        # Show the classes so the caller can verify the label set.
        print(f"Classes found: {df['final_label'].unique()}")

    return df

# Load Data
# Tunables for this cell are named up front so they are easy to spot.
TRAIN_JSON_PATH = 'EXIST2025_training.json'
HOLDOUT_FRACTION = 0.2

print("Loading Training Data...")
df_train_full = load_and_parse_data(TRAIN_JSON_PATH, is_test=False)

n_training_rows = len(df_train_full)
print(f"Total Training Samples: {n_training_rows}")
preview_columns = ['id_EXIST', 'tweet', 'final_label']
display(df_train_full[preview_columns].head())

# --- MANDATORY METHODOLOGY: DATA SPLITTING ---
# The provided training file is divided into an internal train set and an
# internal hold-out set; the hold-out is what the final error analysis uses.
X = df_train_full['tweet']
y = df_train_full['final_label']

# Stratifying on the labels keeps the class proportions equal in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=HOLDOUT_FRACTION,
    stratify=y,
    random_state=RANDOM_STATE,
)

print(f"Internal Train Shape: {X_train.shape}")
print(f"Internal Test Shape: {X_test.shape}")

Loading Training Data...


FileNotFoundError: [Errno 2] No such file or directory: 'EXIST2025_training.json'

In [None]:
# Shared NLP resources read by clean_text().
lemmatizer = WordNetLemmatizer()

# Combined English + Spanish stop-word vocabulary (assumes mixed EN/ES data,
# based on the snippets seen so far).
# NOTE(review): clean_text() strips punctuation before the stop-word lookup,
# so entries containing an apostrophe can never match a cleaned token.
stop_words = set(stopwords.words('english')).union(stopwords.words('spanish'))

def clean_text(text, usage='clean'):
    """Normalise a tweet for vectorisation.

    Parameters
    ----------
    text : str or None
        Tweet text. ``None`` / NaN is treated as an empty string; any other
        non-string value is converted with ``str()``.
    usage : str, default 'clean'
        * ``'raw'``   - lowercase only.
        * ``'clean'`` - lowercase, strip URLs, @mentions, '#' and punctuation,
          then drop stop words and lemmatize the remaining tokens.
        * any other value - same stripping as ``'clean'`` but without the
          stop-word removal / lemmatization step.

    Returns
    -------
    str
        The processed text; for every mode except ``'raw'`` tokens are joined
        by single spaces.
    """
    # Missing tweets would otherwise fail on .lower().
    if text is None or (isinstance(text, float) and text != text):
        text = ''
    elif not isinstance(text, str):
        text = str(text)

    # Basic Lowercase
    text = text.lower()

    if usage == 'raw':
        return text

    # Remove URLs ('http\S+' already covers https links)
    text = re.sub(r'http\S+|www\S+', '', text)
    # Remove user @ references and the '#' sign (the hashtag word is kept)
    text = re.sub(r'@\w+|#', '', text)
    # Remove punctuation (anything that is neither a word char nor whitespace)
    text = re.sub(r'[^\w\s]', '', text)

    # Whitespace tokenization
    tokens = text.split()

    if usage == 'clean':
        # Remove stop words first, then lemmatize what is left
        tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]

    return " ".join(tokens)

# Build one feature set per preprocessing strategy so the ablation study can
# compare them. Series.apply forwards the extra keyword to clean_text.
X_train_clean = X_train.apply(clean_text, usage='clean')
X_test_clean = X_test.apply(clean_text, usage='clean')

X_train_raw = X_train.apply(clean_text, usage='raw')
X_test_raw = X_test.apply(clean_text, usage='raw')

# Sanity check: the same first training tweet under both strategies.
print("Example Raw:", X_train_raw.iloc[0])
print("Example Clean:", X_train_clean.iloc[0])

In [None]:
class MeanEmbeddingVectorizer(BaseEstimator, TransformerMixin):
    """Scikit-learn transformer that embeds a document as the mean of its
    Word2Vec word vectors.

    ``fit`` trains a Word2Vec model on the whitespace-tokenized input corpus;
    ``transform`` averages the vectors of the in-vocabulary tokens of each
    document. Documents without any known token map to a zero vector.

    Parameters
    ----------
    vector_size : int, default 100
        Dimensionality of the word vectors (forwarded to Word2Vec).
    window : int, default 5
        Context window size (forwarded to Word2Vec).
    min_count : int, default 1
        Minimum token frequency (forwarded to Word2Vec).
    workers : int, default 1
        Number of Word2Vec training threads. The default is a single thread
        because gensim only guarantees a deterministic run for a given seed
        when training is single-threaded; raise it to trade reproducibility
        for speed.
    """

    def __init__(self, vector_size=100, window=5, min_count=1, workers=1):
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.word2vec = None
        self.dim = vector_size

    def fit(self, X, y=None):
        """Train Word2Vec on the documents in ``X`` (domain-specific corpus).

        ``y`` is ignored; it is accepted for pipeline compatibility.
        Returns ``self``.
        """
        sentences = [row.split() for row in X]
        # Re-sync in case vector_size was changed through set_params() after
        # construction.
        self.dim = self.vector_size
        self.word2vec = Word2Vec(sentences,
                                 vector_size=self.vector_size,
                                 window=self.window,
                                 min_count=self.min_count,
                                 workers=self.workers,
                                 seed=RANDOM_STATE)
        return self

    def transform(self, X):
        """Return one mean-embedding row per document in ``X``.

        The result has shape ``(n_documents, vector_size)``, including the
        ``(0, vector_size)`` case for an empty input.
        """
        keyed_vectors = self.word2vec.wv
        # Take the width from the fitted model so the fallback always matches
        # the real word vectors.
        width = keyed_vectors.vector_size
        fallback = np.zeros(width)

        doc_vectors = []
        for row in X:
            known = [keyed_vectors[token] for token in row.split() if token in keyed_vectors]
            doc_vectors.append(np.mean(known, axis=0) if known else fallback)

        if not doc_vectors:
            return np.empty((0, width))
        return np.array(doc_vectors)

print("Dense Vectorizer Class Ready.")

In [None]:
# Initialize Results Dictionary
results = {}

# --- SETUP CROSS VALIDATION ---
# One shuffled, stratified 5-fold splitter is shared by every search below so
# the scores are computed on the same folds.
cv_strat = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)


def _tune_on_clean_text(pipeline, grid):
    """Grid-search ``pipeline`` over ``grid`` on the cleaned training text.

    Uses the shared CV splitter and macro-F1 scoring, and returns the fitted
    GridSearchCV object.
    """
    search = GridSearchCV(pipeline, grid, cv=cv_strat, scoring='f1_macro', n_jobs=-1)
    search.fit(X_train_clean, y_train)
    return search


# --- PIPELINE 1: SPARSE (TF-IDF + Logistic Regression) ---
# Covers: N-gram Exploration, TF-IDF
pipe_sparse = Pipeline(steps=[
    ('vect', TfidfVectorizer()),
    ('clf', LogisticRegression(class_weight='balanced', max_iter=1000, random_state=RANDOM_STATE)),
])

# Covers: Hyperparameter Optimization (C, ngram_range)
param_grid_sparse = {
    'vect__ngram_range': [(1, 1), (1, 2)],  # unigrams only vs unigrams + bigrams
    'clf__C': [0.1, 1, 10],                 # regularization strength
}

print("Running Grid Search on Sparse (TF-IDF)...")
grid_sparse = _tune_on_clean_text(pipe_sparse, param_grid_sparse)

results['Sparse_Best_Score'] = grid_sparse.best_score_
results['Sparse_Best_Params'] = grid_sparse.best_params_
print(f"Best Sparse F1-Macro: {grid_sparse.best_score_:.4f}")
print(f"Best Sparse Params: {grid_sparse.best_params_}")


# --- PIPELINE 2: DENSE (Word2Vec + Logistic Regression) ---
pipe_dense = Pipeline(steps=[
    ('vect', MeanEmbeddingVectorizer(vector_size=100)),
    ('clf', LogisticRegression(class_weight='balanced', max_iter=1000, random_state=RANDOM_STATE)),
])

# Only the classifier's regularization strength is tuned for the dense model.
param_grid_dense = {
    'clf__C': [0.1, 1, 10],
}

print("\nRunning Grid Search on Dense (Word2Vec)...")
grid_dense = _tune_on_clean_text(pipe_dense, param_grid_dense)

results['Dense_Best_Score'] = grid_dense.best_score_
results['Dense_Best_Params'] = grid_dense.best_params_
print(f"Best Dense F1-Macro: {grid_dense.best_score_:.4f}")

In [None]:
from sklearn.base import clone
from sklearn.model_selection import cross_val_score

# Work on an unfitted copy with the tuned hyper-parameters. Fitting
# grid_sparse.best_estimator_ directly would overwrite the tuned, clean-text
# model with a raw-text one for every cell that reads it afterwards.
best_model_sparse = clone(grid_sparse.best_estimator_)

# CLEAN score: mean CV macro-F1 of the best grid point (already computed).
clean_score = grid_sparse.best_score_

# RAW score: same hyper-parameters and same CV splitter, raw (lowercased) text.
print("\n--- Ablation Study: Raw vs Clean ---")
# cross_val_score clones and fits the estimator per fold, so no prior fit is
# needed for the score itself.
raw_scores = cross_val_score(best_model_sparse, X_train_raw, y_train, cv=cv_strat, scoring='f1_macro')
raw_mean_score = raw_scores.mean()

# Keep a raw-text model fitted on the whole internal train set available under
# this name for inspection.
best_model_sparse.fit(X_train_raw, y_train)

print(f"F1-Macro with CLEANED text: {clean_score:.4f}")
print(f"F1-Macro with RAW text:     {raw_mean_score:.4f}")

In [None]:
# The sparse (TF-IDF) pipeline is used for the final evaluation. This is a
# fixed choice, not an automatic comparison with the dense model: the feature
# inspection below relies on TF-IDF term names.
final_model = grid_sparse.best_estimator_
# Ensure it is fitted on the full internal train set (clean version)
final_model.fit(X_train_clean, y_train)

# Predict on Internal Test Set
y_pred = final_model.predict(X_test_clean)

# 1. Quantitative Metrics
print("\n--- Classification Report ---")
print(classification_report(y_test, y_pred))

# 2. Confusion Matrix
# Pin the row/column order to the model's class order so the tick labels are
# guaranteed to line up with the matrix cells.
class_labels = final_model.classes_
conf_mat = confusion_matrix(y_test, y_pred, labels=class_labels)
fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_labels, yticklabels=class_labels, ax=ax)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

# 3. Discriminative Features (Top Weighted Words)
# Extract feature names and coefficients
vectorizer = final_model.named_steps['vect']
classifier = final_model.named_steps['clf']
feature_names = vectorizer.get_feature_names_out()
# Assumes binary classification: the single coefficient row then scores the
# second entry of classes_ (positive weight pushes towards that class).
coefs = classifier.coef_[0]

# Create dataframe of terms and weights
df_features = pd.DataFrame({'term': feature_names, 'weight': coefs})
top_positive = df_features.sort_values(by='weight', ascending=False).head(10)
top_negative = df_features.sort_values(by='weight', ascending=True).head(10)

print("\n--- Top Discriminative Features (Positive Class) ---")
print(top_positive)
print("\n--- Top Discriminative Features (Negative Class) ---")
print(top_negative)

# 4. Qualitative Failure Analysis
print("\n--- Qualitative Failure Analysis (Misclassified Examples) ---")
results_df = pd.DataFrame({
    'text': X_test,  # original (uncleaned) text for readability
    'true': y_test,
    'pred': y_pred   # positional array, same row order as X_test / y_test
})

# Filter for errors
errors = results_df[results_df['true'] != results_df['pred']]

# Display 5 examples
for _, row in errors.head(5).iterrows():
    print(f"Text: {row['text']}")
    print(f"True: {row['true']} | Pred: {row['pred']}")
    print("-" * 50)