In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import re

from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    f1_score, confusion_matrix, roc_curve, roc_auc_score
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier

In [None]:
def clean_text(text: str) -> str:
    """
    Normalize raw text for classical ML feature extraction.

    The text is lowercased, stripped of leading/trailing whitespace, and every
    run of whitespace is collapsed to a single space. Punctuation is left
    untouched. Any non-string input (e.g. None or a float) yields "".
    """
    if isinstance(text, str):
        lowered_and_trimmed = text.lower().strip()
        return re.sub(r"\s+", " ", lowered_and_trimmed)
    return ""

def deduplicate_reviews(df: pd.DataFrame, text_col: str, label_col: str) -> pd.DataFrame:
    """
    Collapse duplicate reviews, discarding texts whose labels disagree.

    Rows are grouped by the result of ``clean_text`` applied to ``text_col``.
    A group whose rows carry more than one distinct ``label_col`` value is
    removed entirely; for every other group only its first row is kept.

    The input frame is left untouched. The returned frame has the same
    columns as ``df``; its rows follow the order in which ``groupby`` yields
    the groups, not the original row order.
    """
    # Work on a copy so the temporary grouping key never leaks to the caller.
    working = df.copy()
    working["_clean_text"] = working[text_col].apply(clean_text)

    # Index label of the first row of every group whose labels all agree.
    survivors = [
        rows.index[0]
        for _, rows in working.groupby("_clean_text")
        if len(rows[label_col].unique()) <= 1
    ]

    return working.loc[survivors].drop(columns=["_clean_text"])

## Loading, Cleaning and Deduplication

In [None]:
# Read the raw review spreadsheet (path is relative to the working directory)
raw_path = os.path.join("..", "data", "raw", "Allegato 1 - data_classification.xlsx")
raw_reviews = pd.read_excel(raw_path)

# Add the normalized text column that the models consume
raw_reviews["clean_text"] = raw_reviews["Review"].apply(clean_text)

# One row per distinct cleaned text; texts with conflicting labels are dropped
df = deduplicate_reviews(raw_reviews, text_col="clean_text", label_col="Promotore")

print("Final dataset size after dedupe:", df.shape)

## Train/Test Split

In [None]:
# Features are the cleaned review texts; the target is the "Promotore" column
X, y = df["clean_text"], df["Promotore"]

# 80/20 split, stratified on the target so both parts keep its class balance
split_options = dict(test_size=0.20, random_state=42, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X, y, **split_options)

## Model Candidates and Search Space

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer


def build_text_pipeline(classifier) -> Pipeline:
    """
    Return a TF-IDF -> classifier pipeline with the shared vectorizer setup.

    Every call creates a fresh ``TfidfVectorizer`` so the candidate pipelines
    never share a vectorizer instance. The step names ("tfidf", "clf") are the
    prefixes used by the hyperparameter search spaces.

    Parameters
    ----------
    classifier : an unfitted scikit-learn classifier used as the final step.
    """
    return Pipeline([
        # clean_text replaces the vectorizer's default preprocessing stage
        ("tfidf", TfidfVectorizer(preprocessor=clean_text)),
        ("clf", classifier),
    ])


# Logistic Regression
pipe_lr = build_text_pipeline(LogisticRegression(max_iter=1000, n_jobs=-1))

# Linear SVC — seeded like the SGD model so repeated runs are reproducible
pipe_svc = build_text_pipeline(LinearSVC(random_state=42))

# Stochastic Gradient Descent Classifier
pipe_sgd = build_text_pipeline(SGDClassifier(random_state=42))

# Candidate models keyed by the short names used for the search spaces
pipelines = {
    "logreg": pipe_lr,
    "svm": pipe_svc,
    "sgd": pipe_sgd
}

In [None]:
# Search space for the TF-IDF step, shared by every candidate model
tfidf_params = dict(
    tfidf__ngram_range=[(1, 1), (1, 2)],
    tfidf__min_df=[3, 5],
    tfidf__max_df=[0.90, 0.95],
    tfidf__max_features=[50_000, 100_000],
)

# Classifier-specific spaces, each layered on top of the shared TF-IDF space
lr_params = dict(
    tfidf_params,
    clf__C=[0.5, 1.0, 2.0, 3.0],
    clf__class_weight=[None, "balanced"],
)

svc_params = dict(
    tfidf_params,
    clf__C=[0.5, 1.0, 2.0],
)

sgd_params = dict(
    tfidf_params,
    clf__loss=["log_loss", "hinge"],
    clf__alpha=[1e-4, 1e-5],
    clf__penalty=["l2", "l1", "elasticnet"],
)

# Keys mirror the pipeline names so a space can be looked up by model name
param_grids = dict(logreg=lr_params, svm=svc_params, sgd=sgd_params)

## Randomized Hyperparameter Search

In [None]:
best_models = {}
results = []

# Identical search settings for every candidate so their CV scores are comparable
search_settings = dict(
    n_iter=20,
    scoring="f1",
    cv=3,
    n_jobs=-1,
    verbose=2,
    random_state=42,
)

for name, pipeline in pipelines.items():
    print(f"\n### Running search for {name} ###")

    search = RandomizedSearchCV(
        estimator=pipeline,
        param_distributions=param_grids[name],
        **search_settings,
    )
    search.fit(X_train, y_train)

    # Keep the refit estimator plus a summary record for the comparison table
    best_models[name] = search.best_estimator_
    results.append(
        dict(
            model=name,
            best_score=search.best_score_,
            best_params=search.best_params_,
        )
    )


In [None]:
# One row per candidate model: its name, best cross-validated score and best parameters
pd.DataFrame(results)

## Evaluate Best Model

In [None]:
# The winner is the candidate with the highest cross-validated search score
best_record = max(results, key=lambda record: record["best_score"])
best_name = best_record["model"]
best_model = best_models[best_name]

print("Best model:", best_name)

In [None]:
y_pred = best_model.predict(X_val)

# Continuous scores for ROC/AUC: use decision_function when the model exposes
# it, otherwise fall back to the second column of predict_proba.
if hasattr(best_model, "decision_function"):
    y_scores = best_model.decision_function(X_val)
else:
    y_scores = best_model.predict_proba(X_val)[:, 1]

f1 = f1_score(y_val, y_pred)
auc = roc_auc_score(y_val, y_scores)
cm = confusion_matrix(y_val, y_pred)

# Report the hold-out metrics
for metric_label, metric_value in (
    ("Validation F1:", f1),
    ("Validation AUC:", auc),
    ("Confusion Matrix:\n", cm),
):
    print(metric_label, metric_value)

In [None]:
# ROC curve of the selected model on the validation split
fpr, tpr, thresh = roc_curve(y_val, y_scores)

fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f"AUC={auc:.3f}")
ax.plot([0, 1], [0, 1], "--")  # diagonal reference line
ax.set_title(f"ROC Curve — {best_name}")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.legend()
plt.show()

In [None]:
# Confusion matrix of the selected model on the validation split
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_title(f"Confusion Matrix — {best_name}")
# sklearn's confusion_matrix puts true labels on rows and predictions on
# columns; label the axes so the figure can be read on its own.
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
plt.show()

## Save Best Configuration

In [None]:
import json
import pickle

# Output dir
out_dir = os.path.join("..", "models")
os.makedirs(out_dir, exist_ok=True)

# best_model is the fitted Pipeline, not the search record, so indexing it
# with "best_params" fails. The tuned hyperparameters live in the matching
# entry of `results`.
best_params = next(
    record["best_params"] for record in results if record["model"] == best_name
)

# Save best params (JSON); default=str stringifies anything json cannot encode
params_path = os.path.join(out_dir, f"best_params_{best_name}.json")
with open(params_path, "w", encoding="utf-8") as f:
    json.dump(best_params, f, ensure_ascii=False, indent=2, default=str)

# Save best model (Pickle).
# NOTE: the pipeline's vectorizer holds a reference to clean_text, and pickle
# stores functions by name, so clean_text must be defined under the same
# module path wherever this file is loaded.
model_path = os.path.join(out_dir, f"best_model_{best_name}.pkl")
with open(model_path, "wb") as f:
    pickle.dump(best_model, f, protocol=pickle.HIGHEST_PROTOCOL)

print(f"Saved best params to: {params_path}")
print(f"Saved best model to:  {model_path}")
