In [None]:
import json
import torch
import random
import re
import string
import nltk
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from bs4 import BeautifulSoup
from nltk.corpus import stopwords

# Fetch the NLTK stopword corpus that `stopwords.words('english')` reads later on.
nltk.download('stopwords')

import warnings
# NOTE(review): this hides *every* warning for the rest of the session, including
# deprecation notices from the libraries imported above. Consider narrowing the
# filter to specific categories if a warning ever needs to be seen.
warnings.filterwarnings("ignore")


In [None]:
# Locations of the two MRBench v3 JSON files: `train_path` points at the dev set
# (loaded together with its annotations), `test_path` at the test set (loaded
# without annotations).
# NOTE(review): these are absolute paths into one user's home directory; the
# notebook will not run elsewhere until they are edited.
train_path = "/home/numaan.naeem/BEA_2025/mrbench_v3_devset.json"
test_path  = "/home/numaan.naeem/BEA_2025/mrbench_v3_testset.json"

In [None]:
# -------------------------- Load and preprocess data -------------------------- #
def _load_tutor_response_records(json_path, include_label):
    """Flatten a benchmark JSON file into one row per (conversation, tutor model).

    Shared implementation behind the two public loaders below.

    Args:
        json_path: Path to a UTF-8 JSON file holding a list of conversation
            objects. Each object must provide "conversation_id",
            "conversation_history" and a "tutor_responses" dict that maps a
            model name to a dict with a "response" entry.
        include_label: When True, each tutor entry must also carry
            ["annotation"]["Mistake_Identification"]; its lower-cased value is
            stored in a "mistake_identification" column.

    Returns:
        pandas.DataFrame with the columns "conversation_id", "model",
        "conversation_history", "response" and, if requested,
        "mistake_identification". The columns are present even when the file
        contains no conversations, so downstream column access never fails on
        an empty split.

    Raises:
        KeyError: if a required key is missing from the JSON structure.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    columns = ["conversation_id", "model", "conversation_history", "response"]
    if include_label:
        columns.append("mistake_identification")

    records = []
    for item in data:
        conv_id = item["conversation_id"]
        history = item["conversation_history"]
        for model, details in item["tutor_responses"].items():
            record = {
                "conversation_id": conv_id,
                "model": model,
                "conversation_history": history,
                "response": details["response"],
            }
            if include_label:
                record["mistake_identification"] = (
                    details["annotation"]["Mistake_Identification"].lower()
                )
            records.append(record)

    # Passing `columns` pins the schema (and its order) even for zero records.
    return pd.DataFrame(records, columns=columns)


def load_and_process_train_data(json_path):
    """Load an annotated split: one row per tutor response, with its label.

    Args:
        json_path: Path to the annotated JSON file.

    Returns:
        pandas.DataFrame with columns "conversation_id", "model",
        "conversation_history", "response" and "mistake_identification"
        (the annotation string, lower-cased).
    """
    return _load_tutor_response_records(json_path, include_label=True)


def load_and_process_test_data(json_path):
    """Load an unannotated split: one row per tutor response, no label column.

    Args:
        json_path: Path to the JSON file; annotations are not read.

    Returns:
        pandas.DataFrame with columns "conversation_id", "model",
        "conversation_history" and "response".
    """
    return _load_tutor_response_records(json_path, include_label=False)

In [None]:
# Flatten both JSON files into one row per tutor response. Only the training
# frame carries the `mistake_identification` label column.
df_train = load_and_process_train_data(train_path)
df_test  = load_and_process_test_data(test_path)

In [None]:
# Class balance of the (lower-cased) annotation strings, before they are encoded as integers.
df_train["mistake_identification"].value_counts()

In [None]:
def clean_text(text):
    '''Normalise raw text: lower-case it and strip noise.

    Steps, in order: coerce to str and lower-case; drop emoji shortcodes such as
    ":thumbs_up:"; drop text in square brackets; strip HTML; drop links; turn
    newlines into spaces; drop words containing digits; finally replace every
    run of characters other than letters and ? . ! , ¿ ' with a single space.

    Args:
        text: The raw text. Non-string values are converted with str().

    Returns:
        The cleaned, lower-cased string. It may still start or end with a space.
    '''
    # Coerce first: the original converted only after the first regex call, so a
    # non-string value raised TypeError before the conversion could help.
    text = str(text).lower()
    # An emoji shortcode is a colon-delimited token with no whitespace inside.
    # The previous pattern (any characters between two colons) also deleted
    # ordinary text such as "Student: It is: 4" -> "Student 4".
    text = re.sub(r':[a-z0-9_+\-]+:', '', text)
    text = re.sub(r'\[.*?\]', '', text)
    # The next 2 lines remove html text
    text = BeautifulSoup(text, 'lxml').get_text()
    text = re.sub(r'https?://\S+|www\.\S+', '', text)
    text = re.sub(r'<.*?>+', '', text)
    # Replace (rather than delete) newlines so words on adjacent lines are not
    # glued together: "hello\nworld" must not become "helloworld".
    text = text.replace('\n', ' ')
    text = re.sub(r'\w*\d\w*', '', text)
    # replacing everything with space except (a-z, A-Z, ".", "?", "!", ",", "'")
    text = re.sub(r"[^a-zA-Z?.!,¿']+", " ", text)
    return text


def clean_contractions(text):
    '''Unify apostrophe look-alikes, then strip ASCII punctuation.

    Despite the name, no contraction mapping is applied. The function
      1. rewrites the typographic quote/accent characters to a plain apostrophe,
      2. deletes every character in string.punctuation (which includes that
         apostrophe as well as ? . ! ,),
      3. pads any remaining ? . ! , ¿ with spaces (after step 2 only "¿" can
         still be present),
      4. collapses runs of spaces / double quotes into one space.

    Args:
        text: Input string.

    Returns:
        The processed string; it is not stripped at either end.
    '''
    apostrophe_variants = ["’", "‘", "´", "`"]
    for variant in apostrophe_variants:
        text = text.replace(variant, "'")

    punctuation_class = '[%s]' % re.escape(string.punctuation)
    without_punct = re.sub(punctuation_class, '', text)

    # e.g. "¿que" => " ¿ que"
    spaced = re.sub(r"([?.!,¿])", r" \1 ", without_punct)
    return re.sub(r'[" "]+', " ", spaced)

def remove_space(text):
    '''Collapse every run of whitespace into one space and trim both ends.'''
    # split() without a separator already discards leading/trailing whitespace
    # and treats any whitespace run as a single delimiter.
    tokens = text.split()
    return " ".join(tokens)

def text_preprocessing_pipeline(text):
    '''Run the cleaning stages in order: clean_text, clean_contractions, remove_space.

    Stopword removal is intentionally not one of the stages here; it is a
    separate function (remove_stopwords).
    '''
    for stage in (clean_text, clean_contractions, remove_space):
        text = stage(text)
    return text

def remove_stopwords(text):
    '''Drop NLTK English stopwords from whitespace-separated text.

    Matching is exact and case-sensitive: a token is removed only if it is
    literally contained in stopwords.words('english').

    Args:
        text: Input string; it is split on whitespace.

    Returns:
        The remaining tokens joined by single spaces.
    '''
    english_stopwords = set(stopwords.words('english'))
    kept_tokens = filter(lambda token: token not in english_stopwords, text.split())
    return " ".join(kept_tokens)

# Clean both splits identically: run the text pipeline, then drop stopwords.
# (Previously the loop header was commented out, so `df` was undefined and this
# cell raised NameError on a fresh kernel; neither frame was being cleaned.)
for df in (df_train, df_test):
    for column in ('conversation_history', 'response'):
        df[column] = (
            df[column]
            .apply(text_preprocessing_pipeline)
            .apply(remove_stopwords)
        )

In [None]:
# Annotation string -> integer class id.
# Keys must be lower-case: the annotation is lower-cased when the training data
# is loaded and again right before this mapping is applied, so capitalised keys
# would never match and every label would become NaN.
label_map = {'no': 0, 'yes': 1, 'to some extent': 2}

In [None]:
# Encode the textual annotation as an integer class id.
# The column values are stripped and lower-cased before the lookup, so the
# lookup keys are normalised the same way; otherwise capitalised keys in
# `label_map` would silently turn every label into NaN.
_label_lookup = {str(name).strip().lower(): class_id for name, class_id in label_map.items()}
_encoded_labels = (
    df_train['mistake_identification']
    .str.strip()
    .str.lower()
    .map(_label_lookup)
)

# Fail loudly instead of carrying NaN labels into training.
if _encoded_labels.isna().any():
    _unknown_labels = sorted(
        df_train.loc[_encoded_labels.isna(), 'mistake_identification'].astype(str).unique()
    )
    raise ValueError(f"Unmapped mistake_identification labels: {_unknown_labels}")

df_train['mistake_identification'] = _encoded_labels.astype('int64')

In [None]:
# Separate the input columns from the target column (the encoded mistake_identification labels).
X_train = df_train.drop(columns=["mistake_identification"])
y_train = df_train["mistake_identification"]

In [None]:
# Hugging Face checkpoint used for both the tokenizer and the encoder.
MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
# MODEL_NAME = "google-bert/bert-base-uncased"

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device is: ", DEVICE)

BATCH_SIZE = 4   # samples per batch for both the train and test DataLoader
EPOCHS = 10      # number of passes over the training set
LR = 1e-4        # learning rate handed to AdamW

In [None]:
class HistoryDataset(Dataset):
    """Map-style dataset that serves pre-built samples unchanged.

    Each sample is expected to be a (history_str, response_str, label_int)
    tuple; turning the strings into tensors is left to the DataLoader's
    collate function.
    """

    def __init__(self, samples):
        # Indexable collection of samples; stored as-is, not copied.
        self.samples = samples

    def __getitem__(self, idx):
        sample = self.samples[idx]
        return sample

    def __len__(self):
        return len(self.samples)

In [None]:
# Build (history, response, label) tuples for each split.
# The test split has no labels, so a constant 0 is used as a placeholder to keep
# the tuple layout identical; the prediction loop ignores it.
train_samples = list(zip(X_train['conversation_history'], X_train['response'], y_train))
test_samples  = list(zip(df_test['conversation_history'], df_test['response'], [0]*len(df_test)))

In [None]:
# Wrap the sample lists so they can be handed to a DataLoader.
train_ds = HistoryDataset(train_samples)
test_ds  = HistoryDataset(test_samples)

In [None]:
# Load the pretrained tokenizer and encoder named by MODEL_NAME; the encoder is moved to DEVICE.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
encoder   = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE)
# eval() only switches the module to inference mode; it does not by itself stop
# gradients. The encoder is kept frozen (as in the paper's approach) because
# get_sequence_embeddings runs under torch.no_grad() and the optimizer is built
# from the classifier's parameters only.
encoder.eval()

In [None]:
# TOKEN LEVEL EMBEDDING
@torch.no_grad()
def get_sequence_embeddings(texts):
    """Encode a batch of strings into per-token embeddings, without pooling.

    The batch is tokenized with padding and truncation enabled, moved to
    DEVICE and passed through the encoder; gradients are disabled by the
    decorator.

    Args:
        texts: list of strings.

    Returns:
        The encoder's last hidden state, shape [batch_size, seq_len, hidden_dim].
        The full token sequence is kept so it can be used for attention.
    """
    token_batch = tokenizer(
        texts,
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    token_batch = token_batch.to(DEVICE)
    return encoder(**token_batch).last_hidden_state

def collate_fn(batch):
    """Turn a list of raw samples into embedded tensors for the model.

    Histories and responses are embedded in two separate encoder calls (one per
    field), so each field is padded to its own longest sequence in the batch.

    Args:
        batch: list of (history_str, response_str, label_int) samples.

    Returns:
        Tuple (hist_emb, resp_emb, labels):
          hist_emb -- [B, hist_len, hidden_dim], as returned by get_sequence_embeddings
          resp_emb -- [B, resp_len, hidden_dim], as returned by get_sequence_embeddings
          labels   -- LongTensor of shape [B]; created without an explicit device
    """
    histories, responses, targets = [], [], []
    for sample in batch:
        histories.append(sample[0])
        responses.append(sample[1])
        targets.append(sample[2])

    history_embeddings = get_sequence_embeddings(histories)
    response_embeddings = get_sequence_embeddings(responses)
    target_tensor = torch.tensor(targets, dtype=torch.long)

    return history_embeddings, response_embeddings, target_tensor

In [None]:
# Batches are embedded on the fly inside collate_fn.
# Training batches are reshuffled every epoch; the test loader keeps the
# original row order so predictions line up with df_test.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          collate_fn=collate_fn)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                          collate_fn=collate_fn)

In [None]:
###############################################################################
# 4) MODEL DEFINITION: SIMPLE HISTORY-BASED
###############################################################################
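# This is adapted from the paper's "Simple History-Based Model". As implemented below:
#  - Q comes from the "current sentence" embeddings (the tutor response).
#  - K and V both come from the "previous sentence" embeddings (the conversation_history).
#  - MultiHeadAttention (Q=resp, K=hist, V=hist). nn.MultiheadAttention requires key and
#    value to share the same sequence length, so the response cannot serve as V here.
#  - Then we mean-pool the output and pass it through a small feed-forward to get 3-class logits.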

class SimpleHistoryBasedModel(nn.Module):
    """Cross-attention classifier over token embeddings.

    The response tokens act as queries over the history tokens (which supply
    both keys and values). The attended sequence is mean-pooled over the
    response positions and classified by a two-layer feed-forward head.

    Args:
        hidden_dim: Embedding width of the incoming token embeddings.
        n_heads: Number of attention heads.
        num_classes: Number of output classes.
    """

    def __init__(self, hidden_dim=768, n_heads=8, num_classes=3):
        super().__init__()
        bottleneck_dim = hidden_dim // 2
        self.mha = nn.MultiheadAttention(
            embed_dim=hidden_dim,
            num_heads=n_heads,
            batch_first=True,
        )
        # hidden_dim -> hidden_dim // 2 -> num_classes
        self.ff = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.LeakyReLU(),
            nn.Linear(bottleneck_dim, num_classes),
        )

    def forward(self, hist_emb, resp_emb):
        """Compute class logits for a batch.

        Args:
            hist_emb: [batch, hist_len, hidden_dim] -> used as K and V
            resp_emb: [batch, resp_len, hidden_dim] -> used as Q

        Returns:
            logits of shape [batch, num_classes]
        """
        # Cross-attention: one output vector per response token.
        # NOTE(review): no key_padding_mask is passed and the mean below runs
        # over every position, so any padded positions in the inputs take part
        # in both the attention and the pooling.
        attended, _attn_weights = self.mha(
            query=resp_emb,
            key=hist_emb,
            value=hist_emb,
        )  # attended => [batch, resp_len, hidden_dim]

        # Mean-pool over resp_len, then classify.
        return self.ff(attended.mean(dim=1))

In [None]:
# Size the classifier from the objects it has to match instead of hard-coding
# 768 / 3: the attention width must equal the encoder's embedding width, and
# there is one output class per entry in `label_map`. This keeps the cell
# correct if MODEL_NAME or the label set is changed.
model = SimpleHistoryBasedModel(
    hidden_dim=encoder.config.hidden_size,
    num_classes=len(label_map)
).to(DEVICE)

In [None]:
###############################################################################
# 5) TRAINING LOOP
###############################################################################
# Cross-entropy over the class logits; only the classifier's parameters are
# handed to the optimizer, so the encoder's weights are never updated here.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=LR)

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0.0
    for hist_emb, resp_emb, labels in train_loader:
        # The embeddings come out of collate_fn already on DEVICE (the encoder
        # lives there); the label tensor is created without a device and has to
        # be moved explicitly.
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        logits = model(hist_emb, resp_emb)  # [batch, num_classes]
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {avg_loss:.4f}")


In [None]:
# ----------------------------- Prediction on Test Set ----------------------------- #
model.eval()
test_preds = []
with torch.no_grad():
    # The third element of each batch is the placeholder label; it is ignored.
    for hist_emb, resp_emb, _ in test_loader:
        logits = model(hist_emb, resp_emb)
        preds = torch.argmax(logits, dim=1)  # integer class id per sample
        test_preds.append(preds.cpu().numpy())

# test_loader is not shuffled, so the concatenated predictions line up with df_test's rows.
test_preds = np.concatenate(test_preds, axis=0)
df_test['predicted_mistake_identification'] = test_preds
print("Sample Predictions:")
print(df_test[['conversation_id', 'model', 'predicted_mistake_identification']].head())

# Save predictions to CSV. The path is kept in one variable so the log message
# always names the file that was actually written (it previously reported a
# different filename).
output_path = "predicted_mistake_identifications_token.csv"
df_test.to_csv(output_path, index=False)
print(f"Predictions exported to {output_path}")
