In [None]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel
from transformers import AutoTokenizer
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset , DataLoader
from sklearn.model_selection import train_test_split

In [None]:
# Checkpoint name shared by the tokenizer and the pretrained encoder below.
model_name = "bert-base-uncased"
token = AutoTokenizer.from_pretrained(model_name)

# The pretrained model is loaded only to read its word-embedding table;
# `embedding_matrix` is later copied into the BiLSTM model's nn.Embedding.
emb_model = AutoModel.from_pretrained( model_name )
embedding_matrix = emb_model.embeddings.word_embeddings.weight

# **Data**

In [None]:
# Absolute path on a mounted Google Drive; the file is parsed with pandas'
# default CSV settings despite its .txt extension.
data_path = '/content/drive/MyDrive/DataColab/NLP_project/Smart Tech CV Parser & Matcher/DataSets/Preprocessed_Data.txt'
data = pd.read_csv( data_path )
# Preview the first rows as the cell output.
data.head()

In [None]:
# value_counts() has one entry per distinct category, so the displayed shape
# tuple gives the number of classes.
data['Category'].value_counts().shape

In [None]:
# Fixed seed so the stratified split (and therefore the contents of train.csv /
# test.csv) is identical every time the notebook is re-run from a fresh kernel.
SPLIT_SEED = 42

# 75/25 split that preserves the per-category proportions of the full dataset.
train , test = train_test_split(data , test_size = 0.25 , shuffle = True,
                                stratify=data['Category'] , random_state=SPLIT_SEED)

train = train.reset_index(drop=True)
test = test.reset_index(drop=True)

# Map category names to integer ids; the encoder is fitted on the training
# split only and then applied unchanged to the test split.
encoder = LabelEncoder()
train['Category'] = encoder.fit_transform(train['Category'] )
test['Category'] = encoder.transform(test['Category'])

# Persist both splits (written relative to the current working directory) so the
# Dataset class below can load them from disk.
train.to_csv('train.csv' , index = False)
test.to_csv('test.csv' , index = False)



---



# **Data Loader**

`Dataset`

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset

class MyData(Dataset):
  """Dataset that reads a CSV and tokenizes one row per item on demand.

  Args:
    df_path: path (or file-like object) handed to ``pd.read_csv``.
    token: callable tokenizer; it is invoked with the text plus the keyword
      arguments ``truncation``, ``padding``, ``max_length`` and
      ``return_tensors`` and must return a mapping with an ``"input_ids"`` entry.
    max_len: value forwarded to the tokenizer as ``max_length``.
    features_col: name of the column holding the input text.
    label_col: name of the column holding the labels (converted with ``int``).
  """

  def __init__(self, df_path, token, max_len=512, features_col='Text', label_col='Category'):
    frame = pd.read_csv(df_path)
    self.df = frame
    # Plain Python lists make per-item lookups cheap.
    self.x = frame[features_col].tolist()
    self.y = frame[label_col].tolist()
    self.token = token
    self.max_len = max_len

  def __len__(self):
    """Number of rows read from the CSV."""
    return len(self.x)

  def __getitem__(self, idx):
    """Return ``(input_ids, label, length)`` for row ``idx`` as long tensors.

    ``input_ids`` is left unpadded; ``length`` is its number of elements.
    """
    raw_text = self.x[idx]
    target_id = int(self.y[idx])

    # No padding here: variable-length sequences are padded per batch later.
    encoded = self.token(
      raw_text,
      truncation=True,
      padding=False,
      max_length=self.max_len,
      return_tensors=None,
    )

    token_ids = torch.tensor(encoded["input_ids"], dtype=torch.long)
    target = torch.tensor(target_id, dtype=torch.long)
    n_tokens = torch.tensor(token_ids.size(0), dtype=torch.long)

    return token_ids, target, n_tokens

`collate_fn`

In [None]:
from torch.nn.utils.rnn import pad_sequence

# Value used to fill the tail of sequences shorter than the longest in a batch.
PAD_ID = 0


def collate_fn(batch):
  """Combine ``(input_ids, label, length)`` samples into batch tensors.

  Returns a tuple ``(x_padded, y, lengths)``: the id sequences right-padded
  with ``PAD_ID`` to the longest one in the batch (batch dimension first),
  followed by the stacked labels and the stacked lengths.
  """
  sequences, targets, seq_lens = zip(*batch)

  padded_ids = pad_sequence(sequences, batch_first=True, padding_value=PAD_ID)  # (B, T)

  return padded_ids, torch.stack(targets), torch.stack(seq_lens)

In [None]:
# Same relative file names that the split cell writes with `to_csv`, so the
# files are found whatever the notebook's working directory is (the previous
# absolute '/content/...' paths only matched when the cwd was /content).
train_path = 'train.csv'
test_path = 'test.csv'
train_data = MyData(train_path , token )
test_data = MyData(test_path , token )

In [None]:
# Each batch holds token ids right-padded with 0, e.g. [ 101, 9262, 9722,  ...,    0,    0,    0]
# The training loader reshuffles the samples every epoch so batches differ between
# epochs; the test loader keeps the file order so sample indices stay stable.
train_loader = DataLoader(train_data , batch_size = 16 , shuffle = True , collate_fn=collate_fn)
test_loader = DataLoader(test_data , batch_size = 16 , collate_fn=collate_fn)



---



# **Model**

In [None]:
class BiLstmAttention(nn.Module) :
  """Two-layer bidirectional LSTM with additive attention pooling and a linear classifier.

  Args:
    vocab_size: number of rows of the embedding table.
    embedding_dim: width of each embedding vector (LSTM input size).
    embedding_matrix: tensor of shape (vocab_size, embedding_dim) whose values
      initialise the embedding table.
    hidden_dim: hidden size per LSTM direction; sequence outputs are 2 * hidden_dim wide.
    num_class: number of output logits.
  """

  def __init__(self , vocab_size , embedding_dim , embedding_matrix , hidden_dim=128 ,num_class = 43  ) :
    super().__init__()

    # Embedding initialised from the supplied matrix. The copy covers every row,
    # including the one at padding_idx; padding_idx still keeps that row's
    # gradient at zero during training.
    self.embedding = nn.Embedding(vocab_size , embedding_dim , padding_idx=0 )
    with torch.no_grad():
      self.embedding.weight.copy_(embedding_matrix)

    # Bi Lstm
    self.lstm = nn.LSTM(embedding_dim ,
                        hidden_dim , batch_first=True ,
                        num_layers=2 ,
                        bidirectional=True , dropout = 0.3)

    self.dropout = nn.Dropout(0.5)

    # Attention: one scalar score per time step
    self.attention = nn.Linear(hidden_dim*2 , 1)

    # fc
    self.fc = nn.Linear(hidden_dim*2 , num_class)


  def forward (self ,x , length ) :
    """Classify a batch of padded id sequences.

    Args:
      x: long tensor of token ids, shape (B, T), padded on the right.
      length: long tensor of shape (B,) with the number of real tokens per row.

    Returns:
      (logits, weights): logits of shape (B, num_class) and attention weights of
      shape (B, T) that are zero on padded positions.
    """
    emb = self.embedding(x)
    emb = self.dropout(emb)

    T = x.size(1)

    # Pack the batch so the LSTM only processes real tokens. Without packing the
    # backward direction would read the pad positions before the real tokens and
    # the result would depend on how much padding the batch happens to have.
    # pack_padded_sequence needs CPU lengths that are >= 1, hence clamp + cpu.
    pack_lengths = length.clamp(min=1).cpu()
    packed = nn.utils.rnn.pack_padded_sequence(
      emb, pack_lengths, batch_first=True, enforce_sorted=False
    )
    packed_out , _ = self.lstm(packed)
    # total_length restores the original T so the mask below lines up.
    output , _ = nn.utils.rnn.pad_packed_sequence(
      packed_out, batch_first=True, total_length=T
    )  # (B, T, 2H), zeros on padded steps

    scores = self.attention(output).squeeze(-1)  # (B, T)

    # Exclude padded positions from the softmax.
    mask = torch.arange(T, device=x.device).unsqueeze(0) < length.unsqueeze(1)  # (B, T)
    scores = scores.masked_fill(~mask, -1e9)

    weights = F.softmax(scores ,dim=1)

    # Attention-weighted sum over time.
    sent_vec = (output * weights.unsqueeze(-1)).sum(dim=1)  # (B, 2H)

    sent_vec = self.dropout(sent_vec)

    logits = self.fc(sent_vec)

    return logits , weights

In [None]:
# Both values size the nn.Embedding that receives a copy of `embedding_matrix`,
# so they have to agree with that matrix's (rows, columns).
vocab_size = token.vocab_size
embedding_dim = 768

In [None]:
# Build the classifier; num_class is the size of the output layer.
model = BiLstmAttention(vocab_size , embedding_dim , embedding_matrix , hidden_dim=128 ,num_class = 43)

# **Train**

In [None]:
import torch
import torch.nn as nn

# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Module-level loss and optimizer; train_one_epoch reads these globals.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def train_one_epoch(model, loader):
    """Run one optimisation pass over ``loader`` and print loss / accuracy.

    Uses the module-level ``device``, ``criterion`` and ``optimizer``. The
    loader must yield ``(x, y, lengths)`` batches. The printed loss is the mean
    of the per-batch losses; the accuracy is computed over all samples seen.
    Nothing is returned.
    """
    model.train()

    running_loss = 0
    n_correct = 0
    n_seen = 0

    for batch_ids, batch_labels, batch_lengths in loader:
        batch_ids = batch_ids.to(device)
        batch_lengths = batch_lengths.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass; attention weights are not needed while training.
        logits, _ = model(batch_ids, batch_lengths)
        loss = criterion(logits, batch_labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Bookkeeping for the epoch summary.
        running_loss += loss.item()
        hits = logits.argmax(dim=1) == batch_labels
        n_correct += hits.sum().item()
        n_seen += batch_labels.size(0)

    avg_loss = running_loss / len(loader)
    acc = n_correct / n_seen

    print(f"Train Loss: {avg_loss:.4f} | Train Acc: {acc:.4f}")

In [None]:
# Number of full passes over the training loader.
epochs = 10
for epoch_no in range(1, epochs + 1) :
  print('epoch : ' , epoch_no)
  train_one_epoch(model , train_loader)
  print('------------------|------------------')

# **Test**

In [None]:
import torch
import numpy as np

def test_model_full(
    model,
    loader,
    num_classes,
    device=None,
    class_names=None,
    plot_roc=True
):
    """
    Full evaluation for classification models.

    Inputs
    ------
    model: PyTorch model that returns (logits, attention_weights)
    loader: DataLoader yielding (x, y, lengths)
    num_classes: int, labels are expected to be the integers 0..num_classes-1
    device: torch.device or None (auto)
    class_names: list[str] or None (defaults to the stringified class ids)
    plot_roc: bool (draw ROC curves)

    Returns
    -------
    results: dict with accuracy, classification_report, confusion_matrix,
             auc (per_class / macro_ovr / micro_ovr) and roc (fpr/tpr per class)
    """

    # Lazy imports (so code doesn't crash if you only want basic metrics)
    from sklearn.metrics import (
        classification_report,
        confusion_matrix,
        accuracy_score,
        roc_auc_score,
        roc_curve
    )
    from sklearn.preprocessing import label_binarize

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()

    all_logits = []
    all_y = []

    with torch.no_grad():
        for x, y , lengths in loader:
            x = x.to(device)
            lengths = lengths.to(device)
            y = y.to(device)

            logits, _ = model(x, lengths)  # logits: (B, C)
            all_logits.append(logits.detach().cpu())
            all_y.append(y.detach().cpu())

    logits = torch.cat(all_logits, dim=0)          # (N, C)
    y_true = torch.cat(all_y, dim=0).numpy()       # (N,)

    # probabilities for ROC/AUC
    probs = torch.softmax(logits, dim=1).numpy()   # (N, C)
    y_pred = probs.argmax(axis=1)

    # ---- Basic metrics ----
    acc = accuracy_score(y_true, y_pred)

    if class_names is None:
        class_names = [str(i) for i in range(num_classes)]

    # Always report on the full label set. Without `labels=`, sklearn infers the
    # labels from the data and raises when a class is missing from both y_true
    # and y_pred (its count no longer matches len(target_names)); the confusion
    # matrix would also silently shrink below (C, C).
    label_ids = list(range(num_classes))

    report = classification_report(
        y_true, y_pred,
        labels=label_ids,
        target_names=class_names,
        digits=4,
        zero_division=0
    )

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    results = {
        "accuracy": acc,
        "classification_report": report,
        "confusion_matrix": cm,
    }

    # ---- ROC / AUC ----
    # For multiclass: compute One-vs-Rest ROC and AUC
    # y_true_bin: (N, C)
    y_true_bin = label_binarize(y_true, classes=label_ids)
    if num_classes == 2 and y_true_bin.shape[1] == 1:
        # label_binarize collapses the binary case to a single column for the
        # positive class; expand it so the per-class indexing below is valid.
        y_true_bin = np.hstack([1 - y_true_bin, y_true_bin])

    roc_data = {}
    auc_data = {}

    # per-class ROC + AUC
    per_class_auc = []
    for c in range(num_classes):
        # If a class doesn't appear in y_true, roc_curve can error.
        # We handle it safely.
        if y_true_bin[:, c].sum() == 0:
            roc_data[c] = {"fpr": None, "tpr": None}
            per_class_auc.append(np.nan)
            continue

        fpr, tpr, _ = roc_curve(y_true_bin[:, c], probs[:, c])
        roc_data[c] = {"fpr": fpr, "tpr": tpr}
        per_class_auc.append(roc_auc_score(y_true_bin[:, c], probs[:, c]))

    # macro/micro AUC (OvR)
    # micro: treat every (sample,class) decision as one binary decision
    try:
        auc_micro = roc_auc_score(y_true_bin, probs, average="micro", multi_class="ovr")
    except Exception:
        # Best effort: the micro score is reported as NaN when it cannot be computed.
        auc_micro = np.nan

    # macro: average over classes (ignoring NaNs)
    per_class_auc_np = np.array(per_class_auc, dtype=float)
    if np.isnan(per_class_auc_np).all():
        # Nothing to average; skip nanmean to avoid its empty-slice warning.
        auc_macro = np.nan
    else:
        auc_macro = np.nanmean(per_class_auc_np)

    auc_data["per_class"] = per_class_auc
    auc_data["macro_ovr"] = float(auc_macro) if not np.isnan(auc_macro) else np.nan
    auc_data["micro_ovr"] = float(auc_micro) if not np.isnan(auc_micro) else np.nan

    results["roc"] = roc_data
    results["auc"] = auc_data

    # ---- Print summary ----
    print(f"Accuracy: {acc:.4f}\n")
    print("Classification Report:\n")
    print(report)
    print("Confusion Matrix:\n", cm)
    print("\nAUC (OvR):")
    print("  macro:", auc_data["macro_ovr"])
    print("  micro:", auc_data["micro_ovr"])
    print("  per_class:", auc_data["per_class"])

    # ---- Plot ROC curves (optional) ----
    if plot_roc:
        import matplotlib.pyplot as plt

        # Plot only classes that have valid ROC; decide this before opening a
        # figure so no empty figure is left behind when nothing can be drawn.
        plottable = [
            c for c in range(num_classes)
            if roc_data[c]["fpr"] is not None and roc_data[c]["tpr"] is not None
        ]

        if plottable:
            plt.figure()
            for c in plottable:
                label = f"{class_names[c]} (AUC={per_class_auc[c]:.3f})"
                plt.plot(roc_data[c]["fpr"], roc_data[c]["tpr"], label=label)

            plt.plot([0, 1], [0, 1], linestyle="--", label="Chance")
            plt.xlabel("False Positive Rate")
            plt.ylabel("True Positive Rate")
            plt.title("ROC Curve (One-vs-Rest)")
            plt.legend()
            plt.show()
        else:
            print("\nROC plot skipped: not enough positive samples per class to draw curves.")

    return results

In [None]:
# class_names = ["class0", "class1", "..."]

# Evaluate on the held-out loader; `device` is left at its default so the
# function picks CUDA when available. num_classes matches the model's num_class.
results = test_model_full(
    model=model,
    loader=test_loader,
    num_classes=43,
    # class_names=class_names,
    plot_roc=True
)

# **Error Analysis**

In [None]:
import os
import json
import csv
import torch
import torch.nn.functional as F
import numpy as np
from collections import Counter

def error_analysis(
    model,
    loader,
    tokenizer,
    class_names=None,
    device=None,

    # what to return
    max_errors=100,
    topk_tokens=12,
    confidence_threshold=0.0,   # keep wrong preds with conf >= threshold

    # plots
    plot=True,
    n_attn_plots=10,
    top_confusions=15,          # bar chart: top confusing pairs

    # attention display
    merge_wordpieces=True,
    ignore_special=True,

    # export
    export_dir=None,            # e.g. "outputs" or None
    export_prefix="errors"
):
    """
    ERROR ANALYSIS ONLY (no ROC/AUC, no full test suite).
    - Collects misclassified samples (sorted by highest confidence first)
    - Shows top confusing true->pred pairs
    - Visualizes:
        1) Wrong-confidence histogram
        2) Confusion pairs bar chart
        3) Attention heatmaps for top wrong samples
      ...and prints an English explanation after each figure.
    - Optional export to JSON/CSV + summary JSON.

    Assumptions:
      loader yields (x, y, lengths) -- the same order used by the training and
        evaluation loops in this notebook
      model(x, lengths) returns (logits, attention_weights)
      x are BERT input_ids (so we can convert ids->tokens with tokenizer)

    Returns:
      (errors, confusions, summary)
        errors: list of dicts (at most max_errors), highest-confidence first
        confusions: list of dicts for the top_confusions most frequent
                    (true, pred) pairs
        summary: dict with counts, the threshold used and the confusions list
      Only wrong predictions with confidence >= confidence_threshold are counted.
    """

    # ---------- helpers ----------
    def _safe_label(i: int) -> str:
        # Fall back to the numeric id when no (or too few) class names are given.
        if class_names is None:
            return str(i)
        return class_names[i] if 0 <= i < len(class_names) else str(i)

    def _merge_wordpieces(tokens, weights):
        """
        Merge WordPiece tokens: ["play", "##ing"] -> ["playing"]
        Aggregate attention weights by summing, then renormalize.
        """
        merged_tokens, merged_weights = [], []
        cur_tok, cur_w = "", 0.0

        for tok, w in zip(tokens, weights):
            if tok.startswith("##") and cur_tok != "":
                cur_tok += tok[2:]
                cur_w += w
            else:
                if cur_tok != "":
                    merged_tokens.append(cur_tok)
                    merged_weights.append(cur_w)
                cur_tok = tok
                cur_w = w

        if cur_tok != "":
            merged_tokens.append(cur_tok)
            merged_weights.append(cur_w)

        s = sum(merged_weights)
        if s > 0:
            merged_weights = [mw / s for mw in merged_weights]

        return merged_tokens, merged_weights

    def _topk(tokens, weights, k):
        # k highest-weight tokens, each with its position in `tokens`.
        k = min(k, len(tokens))
        if k <= 0:
            return []
        idx = np.argsort(-np.array(weights))[:k]
        return [{"token": tokens[i], "weight": float(weights[i]), "index": int(i)} for i in idx]

    # ---------- device ----------
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()

    special = set(tokenizer.all_special_tokens) if ignore_special else set()

    # ---------- collect wrong predictions only ----------
    errors = []
    confusion_counter = Counter()
    wrong_confidences = []

    with torch.no_grad():
        global_index = 0  # index across the whole loader

        # Batch order is (x, y, lengths). Unpacking it as (x, lengths, y) would
        # feed the labels to the model as sequence lengths and compare the
        # predictions against the lengths.
        for x, y, lengths in loader:
            x = x.to(device)
            lengths = lengths.to(device)
            y = y.to(device)

            logits, attn = model(x, lengths)     # logits (B,C), attn (B,T)
            probs = F.softmax(logits, dim=1)     # (B,C)
            preds = probs.argmax(dim=1)          # (B,)
            confs = probs.max(dim=1).values      # (B,)

            B = x.size(0)
            for i in range(B):
                true_id = int(y[i].item())
                pred_id = int(preds[i].item())
                conf = float(confs[i].item())

                if pred_id != true_id and conf >= confidence_threshold:
                    # tokens + attention for this sample (padding cut off)
                    T = int(lengths[i].item())
                    ids = x[i].detach().cpu().tolist()[:T]
                    tokens = tokenizer.convert_ids_to_tokens(ids)

                    weights = attn[i].detach().cpu().tolist()[:T]
                    s = sum(weights)
                    if s > 0:
                        weights = [w / s for w in weights]

                    # remove special tokens
                    if ignore_special:
                        kept = [(t, w) for t, w in zip(tokens, weights) if t not in special]
                        tokens_kept = [t for t, _ in kept]
                        weights_kept = [w for _, w in kept]
                    else:
                        tokens_kept, weights_kept = tokens, weights

                    # merge wordpieces
                    if merge_wordpieces:
                        tokens_final, weights_final = _merge_wordpieces(tokens_kept, weights_kept)
                    else:
                        tokens_final, weights_final = tokens_kept, weights_kept

                    top_tokens = _topk(tokens_final, weights_final, topk_tokens)

                    errors.append({
                        "global_index": int(global_index + i),
                        "true_id": true_id,
                        "true_label": _safe_label(true_id),
                        "pred_id": pred_id,
                        "pred_label": _safe_label(pred_id),
                        "confidence": conf,
                        "length": int(T),
                        "top_tokens_by_attention": top_tokens,
                        "tokens": tokens_final,
                        "attention": [float(a) for a in weights_final],
                    })

                    wrong_confidences.append(conf)
                    confusion_counter[(true_id, pred_id)] += 1

            global_index += B

    # sort errors: highest confidence wrong first
    errors.sort(key=lambda e: e["confidence"], reverse=True)
    errors = errors[:max_errors]

    # top confusion pairs
    confusions = []
    for (t, p), c in confusion_counter.most_common(top_confusions):
        confusions.append({
            "true_id": int(t), "true_label": _safe_label(int(t)),
            "pred_id": int(p), "pred_label": _safe_label(int(p)),
            "count": int(c)
        })

    summary = {
        "wrong_found_total": int(sum(confusion_counter.values())),
        "returned_errors": int(len(errors)),
        "confidence_threshold": float(confidence_threshold),
        "top_confusions": confusions,
    }

    print(f"Wrong samples found: {summary['wrong_found_total']} | "
          f"Returned: {summary['returned_errors']} (conf >= {confidence_threshold})")

    # ---------- plots + English explanations ----------
    if plot:
        import matplotlib.pyplot as plt

        # 1) Wrong-confidence histogram
        if len(wrong_confidences) > 0:
            plt.figure(figsize=(7, 4))
            plt.hist(wrong_confidences, bins=25)
            plt.title("Wrong Predictions — Confidence Histogram")
            plt.xlabel("Confidence (max softmax probability)")
            plt.ylabel("Count")
            plt.tight_layout()
            plt.show()

            print("\n[Figure 1 Explanation — Wrong Confidence Histogram]")
            print("- This shows how confident the model was on the samples it MISCLASSIFIED.")
            print("- If many errors have high confidence (e.g., > 0.7), the model is overconfident and making 'dangerous' mistakes.")
            print("- If most errors have low confidence, the model is uncertain (less risky) when it makes mistakes.")
            print("How to use it:")
            print("- High-confidence errors: inspect data quality/labels, add regularization, or improve features/model.")
            print("- You can set a confidence threshold in production to reject uncertain predictions.\n")

        # 2) Top confusion pairs bar chart
        if len(confusions) > 0:
            labels = [f"{c['true_label']} → {c['pred_label']}" for c in confusions]
            counts = [c["count"] for c in confusions]

            plt.figure(figsize=(10, max(4, 0.35 * len(labels))))
            plt.barh(labels[::-1], counts[::-1])
            plt.title("Top Confusion Pairs (True → Predicted)")
            plt.xlabel("Number of Mistakes")
            plt.tight_layout()
            plt.show()

            print("[Figure 2 Explanation — Top Confusion Pairs]")
            print("- Each bar shows the most common 'true class → predicted class' mistakes.")
            print("- This tells you which classes the model confuses the most.")
            print("How to use it:")
            print("- If class A is often predicted as class B, compare their training examples: they may overlap in meaning.")
            print("- Add more data for those classes, improve labeling rules, or redesign class definitions if needed.")
            print("- You can also consider class-specific weighting or focal loss if imbalance is an issue.\n")

        # 3) Attention heatmaps for some errors
        for idx_plot, e in enumerate(errors[:min(n_attn_plots, len(errors))], start=1):
            tokens_plot = e["tokens"]
            att_plot = np.array(e["attention"], dtype=float)

            plt.figure(figsize=(max(8, len(tokens_plot) * 0.35), 2.2))
            plt.imshow(att_plot.reshape(1, -1), aspect="auto")
            plt.yticks([])
            plt.xticks(np.arange(len(tokens_plot)), tokens_plot, rotation=45, ha="right")
            plt.colorbar()
            plt.title(
                f"Attention Heatmap (WRONG #{idx_plot}) | true={e['true_label']} "
                f"pred={e['pred_label']} conf={e['confidence']:.2f}"
            )
            plt.tight_layout()
            plt.show()

            print(f"[Figure 3.{idx_plot} Explanation — Attention Heatmap for a Wrong Prediction]")
            print("- Darker/stronger colors mean the model gave more attention to those tokens.")
            print("- This helps you understand *what evidence* the model used when it made a wrong decision.")
            print("How to use it:")
            print("- If attention focuses on irrelevant words (e.g., stopwords) or on artifacts, improve preprocessing or training data.")
            print("- If attention highlights misleading keywords, you may need more diverse examples or better class separation.")
            print("- If you see special tokens or padding receiving attention, your masking/lengths logic is likely wrong.\n")

    # ---------- export ----------
    if export_dir is not None:
        os.makedirs(export_dir, exist_ok=True)

        json_path = os.path.join(export_dir, f"{export_prefix}.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump({"summary": summary, "errors": errors}, f, ensure_ascii=False, indent=2)

        csv_path = os.path.join(export_dir, f"{export_prefix}.csv")
        with open(csv_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([
                "global_index","true_id","true_label","pred_id","pred_label",
                "confidence","length","top_tokens_by_attention"
            ])
            for e in errors:
                top_str = "; ".join([f"{t['token']}:{t['weight']:.4f}" for t in e["top_tokens_by_attention"]])
                writer.writerow([
                    e["global_index"], e["true_id"], e["true_label"],
                    e["pred_id"], e["pred_label"],
                    f"{e['confidence']:.6f}", e["length"], top_str
                ])

        summary_path = os.path.join(export_dir, f"{export_prefix}_summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)

        print(f"✅ Exported:\n- {json_path}\n- {csv_path}\n- {summary_path}")

    return errors, confusions, summary

In [None]:
# Keep only wrong predictions made with confidence >= 0.7, plot the ten most
# confident ones, and write the JSON/CSV exports under ./outputs with the
# "bilstm_attn_errors" file prefix.
errors, confusions, summary = error_analysis(
    model=model,
    loader=test_loader,
    tokenizer=token,
    # class_names=class_names,     # optional
    confidence_threshold=0.7,
    max_errors=100,
    n_attn_plots=10,
    plot=True,
    export_dir="outputs",
    export_prefix="bilstm_attn_errors"
)

# **Save Model**

In [None]:
# torch.save(model.state_dict(), "bilstm_attention.pth")