In [1]:
#!/usr/bin/env python3
"""
CPU-optimized Logistic Regression text classifier (TF-IDF + multinomial LR)

Matches DistilBERT script conventions:
- Reads: out/alerts_pseudo.csv  (columns: Pseudo_Description, Priority_Level)
- Same stratified 70/15/15 split + deterministic label encoding
- Class imbalance handling (class_weight='balanced')
- Simple hyperparam sweep on C using the validation set (macro-F1)
- Saves: metrics/summary JSON, confusion matrix CSV/PNG, C-sweep curve PNG, label map, fitted pipeline (model.joblib), TF-IDF vocabulary JSON
- Fast on CPU; no extra deps beyond scikit-learn, numpy, pandas, matplotlib

Run:
    python train_logreg_text.py
"""

import os, json, math, warnings
from pathlib import Path

warnings.filterwarnings("ignore", category=UserWarning)

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score

# ---------------- Hard-coded settings ----------------
# Input CSV and the two columns read from it.
in_csv = "out/alerts_pseudo.csv"      # <--- change if needed
text_col, label_col = "Pseudo_Description", "Priority_Level"

# Directory that receives every artifact written by main().
out_dir = Path("artifacts") / "priority_model_lr"

# Keyword arguments forwarded verbatim to TfidfVectorizer
# (good defaults for alert text).
tfidf_params = {
    "lowercase": True,
    "strip_accents": "unicode",
    "analyzer": "word",
    "ngram_range": (1, 2),   # unigrams + bigrams
    "min_df": 2,
    "max_df": 0.95,
    "sublinear_tf": True,
}

# Logistic Regression settings.
# C is swept on the validation set; the value with the best macro-F1 wins.
C_grid = [0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
max_iter = 2000
penalty = "l2"
solver = "saga"  # supports multinomial + class_weight
class_weight = "balanced"
seed = 42
# -----------------------------------------------------


def clean_text(s: str) -> str:
    """Return *s* as a string with whitespace runs collapsed to single spaces.

    Missing values (anything ``pd.isna`` reports as NA) become the empty
    string. Leading and trailing whitespace is removed as a side effect of
    splitting on whitespace and re-joining.
    """
    if pd.isna(s):
        return ""
    tokens = str(s).split()
    return " ".join(tokens)


def split_stratified(texts, y, seed=42):
    """Split *texts*/*y* into stratified train, validation and test parts.

    The split is done in two stages with the same ``seed``: 30% of the data
    is held out first, then that hold-out is cut in half, giving roughly
    70/15/15. Both stages stratify on the labels.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    # Stage 1: keep 70% for training, hold out the remaining 30%.
    train_texts, held_texts, train_y, held_y = train_test_split(
        texts, y, test_size=0.30, random_state=seed, stratify=y
    )
    # Stage 2: divide the hold-out evenly into validation and test.
    halves = train_test_split(
        held_texts, held_y, test_size=0.50, random_state=seed, stratify=held_y
    )
    val_texts, test_texts, val_y, test_y = halves
    return train_texts, val_texts, test_texts, train_y, val_y, test_y


def accuracy(y_true, y_pred):
    """Return the fraction of positions where *y_pred* equals *y_true*.

    Both arguments may be any array-like (lists, tuples, NumPy arrays); they
    are converted with ``np.asarray`` so the comparison is always
    element-wise rather than a single list-equality bool.

    Returns:
        A Python float in [0, 1]; ``0.0`` when the inputs are empty.

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    true_arr = np.asarray(y_true)
    pred_arr = np.asarray(y_pred)
    if true_arr.shape != pred_arr.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got "
            f"{true_arr.shape} and {pred_arr.shape}"
        )
    if true_arr.size == 0:
        return 0.0
    return float(np.mean(true_arr == pred_arr))


def per_class_metrics(y_true, y_pred, n_classes):
    """Compute precision, recall, F1 and support for class ids 0..n_classes-1.

    Args:
        y_true: array-like of integer class ids (ground truth).
        y_pred: array-like of integer class ids (predictions), same shape.
        n_classes: number of classes; ids ``0 .. n_classes - 1`` are scored.

    Returns:
        ``(metrics, macro)`` where ``metrics[c]`` is a dict with keys
        ``precision``, ``recall``, ``f1`` and ``support`` for class ``c``,
        and ``macro`` holds the unweighted mean of precision, recall and F1
        over all ``n_classes`` classes. Any ratio with a zero denominator is
        reported as ``0.0``.

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    # Coerce to arrays so `== c` is element-wise even for plain lists.
    true_arr = np.asarray(y_true)
    pred_arr = np.asarray(y_pred)
    if true_arr.shape != pred_arr.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got "
            f"{true_arr.shape} and {pred_arr.shape}"
        )

    def _ratio(num, den):
        # Undefined ratios (no predictions / no true samples) count as 0.
        return num / den if den > 0 else 0.0

    metrics = {}
    pr_list, rc_list, f1_list = [], [], []
    for c in range(n_classes):
        is_true = true_arr == c
        is_pred = pred_arr == c
        tp = int(np.count_nonzero(is_true & is_pred))
        fp = int(np.count_nonzero(~is_true & is_pred))
        fn = int(np.count_nonzero(is_true & ~is_pred))
        prec = _ratio(tp, tp + fp)
        rec = _ratio(tp, tp + fn)
        f1 = _ratio(2 * prec * rec, prec + rec)
        # Support is the number of true samples of class c, i.e. tp + fn.
        metrics[c] = {"precision": prec, "recall": rec, "f1": f1, "support": tp + fn}
        pr_list.append(prec)
        rc_list.append(rec)
        f1_list.append(f1)

    macro = {
        "precision": float(np.mean(pr_list)) if pr_list else 0.0,
        "recall": float(np.mean(rc_list)) if rc_list else 0.0,
        "f1": float(np.mean(f1_list)) if f1_list else 0.0,
    }
    return metrics, macro


def confusion_matrix_counts(y_true, y_pred, n_classes):
    """Return the ``n_classes x n_classes`` confusion matrix of raw counts.

    Entry ``[t, p]`` is the number of samples whose true class id is ``t``
    and whose predicted class id is ``p``.

    Args:
        y_true: array-like of class ids (cast to int), ground truth.
        y_pred: array-like of class ids (cast to int), same shape as y_true.
        n_classes: number of classes; valid ids are ``0 .. n_classes - 1``.

    Raises:
        ValueError: if the two inputs do not have the same shape.
        IndexError: if any class id lies outside ``[0, n_classes)``.
    """
    true_idx = np.asarray(y_true, dtype=int)
    pred_idx = np.asarray(y_pred, dtype=int)
    if true_idx.shape != pred_idx.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got "
            f"{true_idx.shape} and {pred_idx.shape}"
        )
    # Negative ids would otherwise wrap around to the last rows/columns
    # and silently corrupt the counts, so reject them explicitly.
    for arg_name, idx in (("y_true", true_idx), ("y_pred", pred_idx)):
        if idx.size and (idx.min() < 0 or idx.max() >= n_classes):
            raise IndexError(
                f"{arg_name} contains class ids outside [0, {n_classes})"
            )

    cm = np.zeros((n_classes, n_classes), dtype=int)
    # Unbuffered add: repeated (true, pred) pairs each contribute one count.
    np.add.at(cm, (true_idx, pred_idx), 1)
    return cm


def _build_pipeline(C):
    """Return an unfitted TF-IDF -> LogisticRegression pipeline for the given C."""
    # `multi_class` is deliberately not passed: it is deprecated in recent
    # scikit-learn releases, and the default already fits a multinomial model
    # for the saga solver whenever there are more than two classes.
    return Pipeline([
        ("tfidf", TfidfVectorizer(**tfidf_params)),
        ("clf", LogisticRegression(
            C=C,
            penalty=penalty,
            solver=solver,
            class_weight=class_weight,
            max_iter=max_iter,
            n_jobs=os.cpu_count() or 1,
            random_state=seed,
        )),
    ])


def _save_c_sweep_plot(val_scores):
    """Plot validation macro-F1 against C (log x-axis) and save it as val_c_sweep.png."""
    Cs = [c for c, _ in val_scores]
    f1s = [s for _, s in val_scores]
    fig = plt.figure(figsize=(7, 4.5))
    plt.plot(Cs, f1s, marker="o", label="val_macro_f1")
    plt.xscale("log")
    plt.xlabel("C (inverse regularization) [log scale]")
    plt.ylabel("Macro-F1 (validation)")
    plt.title("LR model selection on validation set")
    plt.grid(True, alpha=0.3)
    plt.legend()
    fig.tight_layout()
    fig.savefig(out_dir / "val_c_sweep.png", dpi=160)
    plt.close(fig)


def _save_confusion_matrix_plot(cm, label_names):
    """Save the row-normalized confusion matrix heatmap as confusion_matrix.png."""
    n = len(label_names)
    # Rows with no samples are divided by 1 instead of 0, so they stay all-zero.
    cm_norm = cm.astype(float) / cm.sum(axis=1, keepdims=True).clip(min=1.0)
    fig = plt.figure(figsize=(6 + 0.3*n, 5 + 0.3*n))
    plt.imshow(cm_norm, aspect="auto")
    ticks = np.arange(n)
    plt.xticks(ticks, label_names, rotation=45, ha="right")
    plt.yticks(ticks, label_names)
    plt.xlabel("Predicted"); plt.ylabel("True")
    plt.title("Confusion Matrix (row-normalized) — Logistic Regression")
    for i in range(cm_norm.shape[0]):
        for j in range(cm_norm.shape[1]):
            plt.text(j, i, f"{cm_norm[i, j]:.2f}", ha="center", va="center")
    plt.tight_layout()
    fig.savefig(out_dir / "confusion_matrix.png", dpi=160)
    plt.close(fig)


def main():
    """Train, select, evaluate and persist the TF-IDF + logistic-regression model.

    Steps:
      1. Load ``in_csv``, keep the text/label columns, drop NA rows and
         duplicates, normalize whitespace and drop empty texts.
      2. Encode labels by sorted string value and write ``label_map.json``.
      3. Split into stratified train/val/test sets.
      4. Fit one pipeline per value in ``C_grid`` on train and keep the C with
         the best validation macro-F1 (first best wins on ties); save the
         sweep curve.
      5. Refit on train+val with the best C and evaluate on test.
      6. Write the report, confusion matrix (CSV and PNG), summary, the
         fitted pipeline and the TF-IDF vocabulary into ``out_dir``.

    Raises:
        SystemExit: if a required column is missing from the CSV, or if the
            C sweep trained no model (empty ``C_grid``).
    """
    np.random.seed(seed)
    out_dir.mkdir(parents=True, exist_ok=True)

    # ---- Data
    df = pd.read_csv(in_csv)
    missing = {text_col, label_col} - set(df.columns)
    if missing:
        raise SystemExit(f"Missing columns: {missing}")

    # NOTE(review): duplicates are dropped BEFORE whitespace normalization, so
    # rows differing only in whitespace survive and may land in different
    # splits. The order is kept as-is because the module docstring says the
    # split must match the DistilBERT script -- confirm before changing.
    df = df[[text_col, label_col]].dropna().drop_duplicates()
    df[text_col] = df[text_col].apply(clean_text)
    df = df[df[text_col].str.len() > 0].reset_index(drop=True)

    # Label encoding (deterministic, lexicographic order like the BERT script)
    labels_raw = df[label_col].astype(str).values
    classes_sorted = sorted(np.unique(labels_raw).tolist())
    label2id = {lbl: i for i, lbl in enumerate(classes_sorted)}
    id2label = {i: lbl for lbl, i in label2id.items()}
    y = np.array([label2id[s] for s in labels_raw], dtype=np.int64)
    num_classes = len(classes_sorted)
    # Label names ordered by class id; reused for every report below.
    label_names = [id2label[i] for i in range(num_classes)]

    # Persist label map
    (out_dir / "label_map.json").write_text(
        json.dumps({"label2id": label2id, "id2label": {int(k): v for k, v in id2label.items()}}, indent=2),
        encoding="utf-8"
    )

    # Split
    X_train, X_val, X_test, y_train, y_val, y_test = split_stratified(df[text_col].tolist(), y, seed=seed)

    # ---- Model selection on validation set (sweep C)
    val_scores = []
    best_C = None
    best_val_macro_f1 = -1.0

    for C in C_grid:
        pipe = _build_pipeline(C)
        pipe.fit(X_train, y_train)
        val_pred = pipe.predict(X_val)
        val_macro_f1 = f1_score(y_val, val_pred, average="macro")
        val_scores.append((C, val_macro_f1))

        # Strict '>' keeps the first (smallest) C on ties.
        if val_macro_f1 > best_val_macro_f1:
            best_C = C
            best_val_macro_f1 = float(val_macro_f1)

        print(f"C={C:<4}  val_macro_f1={val_macro_f1:.4f}")

    # Save the C-sweep curve
    if val_scores:
        _save_c_sweep_plot(val_scores)

    if best_C is None:
        raise SystemExit("No model trained during C sweep.")

    # ---- Refit on Train+Val using best C, evaluate on Test
    X_trval = X_train + X_val
    y_trval = np.concatenate([y_train, y_val])

    final_pipe = _build_pipeline(best_C)
    final_pipe.fit(X_trval, y_trval)

    # ---- Test evaluation
    test_pred = final_pipe.predict(X_test)
    acc = accuracy(y_test, test_pred)
    per_class, macro = per_class_metrics(y_test, test_pred, n_classes=num_classes)
    cm = confusion_matrix_counts(y_test, test_pred, n_classes=num_classes)

    # ---- Save reports
    rep = {
        "accuracy": float(acc),
        "macro": macro,
        "per_class": {id2label[i]: per_class[i] for i in range(num_classes)},
        "best_C": float(best_C),
        "val_macro_f1_at_best_C": float(best_val_macro_f1)
    }
    (out_dir / "test_classification_report.json").write_text(json.dumps(rep, indent=2), encoding="utf-8")

    pd.DataFrame(
        cm,
        index=label_names,
        columns=label_names
    ).to_csv(out_dir / "test_confusion_matrix.csv", index=True)

    summary = {
        "test_accuracy": float(acc),
        "test_macro_f1": float(macro["f1"]),
        "n_train": int(len(X_train)),
        "n_val": int(len(X_val)),
        "n_test": int(len(X_test)),
        "labels": label_names,
        "best_C": float(best_C)
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))

    # ---- Confusion matrix plot (row-normalized)
    _save_confusion_matrix_plot(cm, label_names)

    # ---- Persist vectorizer + model (sklearn pipeline)
    # Joblib is standard for sklearn persistence. Saving is best-effort:
    # a failure is reported but does not abort the run.
    try:
        import joblib
        joblib.dump(final_pipe, out_dir / "model.joblib", compress=3)
    except Exception as e:
        print(f"Warning: failed to save model.joblib due to: {e}")

    # Also persist the tf-idf vocabulary for transparency (optional).
    # Still best-effort, but a failure is now reported instead of swallowed.
    try:
        tfidf = final_pipe.named_steps["tfidf"]
        vocab = {k: int(v) for k, v in tfidf.vocabulary_.items()}
        (out_dir / "tfidf_vocab.json").write_text(json.dumps(vocab), encoding="utf-8")
    except Exception as e:
        print(f"Warning: failed to save tfidf_vocab.json due to: {e}")


# Run the full train/select/evaluate pipeline only when this file is executed
# as a script, not when it is imported as a module.
if __name__ == "__main__":
    main()




C=0.05  val_macro_f1=0.6431
C=0.1   val_macro_f1=0.6745




C=0.2   val_macro_f1=0.6892
C=0.5   val_macro_f1=0.7151




C=1.0   val_macro_f1=0.6722




C=2.0   val_macro_f1=0.6746




C=5.0   val_macro_f1=0.6693




{
  "test_accuracy": 0.7597765363128491,
  "test_macro_f1": 0.6297132751690375,
  "n_train": 833,
  "n_val": 178,
  "n_test": 179,
  "labels": [
    "1",
    "2",
    "3"
  ],
  "best_C": 0.5
}
