# 2. Baseline Metrics

In [1]:
!git clone https://github.com/cmhobbs96/meta-semantic-research.git

fatal: destination path 'meta-semantic-research' already exists and is not an empty directory.


In [2]:
!pip install numpy==1.26.4 --force-reinstall
!pip install --upgrade nltk gensim google-colab tensorflow torch torchvision numba pandas --no-cache-dir
!pip install OpenNMT-py

Collecting numpy==1.26.4
  Using cached numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Using cached numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
nlkt 3.0.0 requires pandas==2.2.3, but you have pandas 2.2.2 which is incompatible.
tensorflow-text 2.18.1 requires tensorflow<2.19,>=2.18.0, but you have tensorflow 2.19.0 which is incompatible.
cudf-cu12 25.2.1 requires numba<0.61.0a0,>=0.59.1, but you have numba 0.61.0 which is incompatible.
tf-keras 2.18.0 requires tensorflow<2.19,>=2.18, but you have tensorflow 2.19.0 which is incompatible.
dask-cuda

In [5]:
from google.colab import drive

import os
import subprocess
import time
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.amp import autocast
from torch.cuda.amp import GradScaler
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM, AutoModelForSequenceClassification
import OpenNMT_py

import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
from nltk import word_tokenize
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from difflib import unified_diff
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Set device
if torch.cuda.is_available():
  device = torch.device("cuda")
  scaler = GradScaler()
  os.system("nvidia-smi")
else:
  device = torch.device("cpu")
  scaler = None

ModuleNotFoundError: No module named 'OpenNMT_py'

**Fine-Tuning Hyperparameters**

In [None]:
# Define Hyperparameters
NUM_EPOCHS = 5
BATCH_SIZE = 32
LEARNING_RATE = 3e-5
MAX_LENGTH = 128
ACCUMULATION_STEPS = 2

all_results = []

**Connect to Data**

In [None]:
# Mount to Google Drive
drive.mount('/content/drive')

# Define paths for datasets
train_path = "/content/drive/My Drive/Academia/MS in AI/ECE 57000/Research/data/train.tsv"
test_path = "/content/drive/My Drive/Academia/MS in AI/ECE 57000/Research/data/test.tsv"
gen_path = "/content/drive/My Drive/Academia/MS in AI/ECE 57000/Research/data/gen.tsv"

**Inspect Data**

In [None]:
# Function to check dataset format
def inspect_dataset(file_path, file_name):
    try:
        data = pd.read_csv(file_path, sep="\t", header=None)
        print(f"\n Inspecting {file_name}:")
        print(data.head(5))  # Print first 5 rows
        print(f"Columns: {data.columns}")
    except Exception as e:
        print(f"Error loading {file_name}: {e}")

# Inspect each dataset
inspect_dataset(train_path, "train.tsv")
inspect_dataset(test_path, "test.tsv")
inspect_dataset(gen_path, "gen.tsv")

**Define Dataset**

In [None]:
# Define Dataset class for COGS
class COGSDataset(Dataset):
  def __init__(self, file_path, tokenizer):
    self.data = pd.read_csv(file_path, sep="\t", header=None, names=["input", "output", "split"]).iloc[:, :2]
    self.tokenizer = tokenizer

  def __len__(self):
    return len(self.data)

  def __getitem__(self, index):
    input_text = self.data.iloc[index]["input"]
    output_text = self.data.iloc[index]["output"]

    inputs = self.tokenizer(input_text, padding="max_length", truncation=True, max_length=MAX_LENGTH, return_tensors="pt")
    targets = self.tokenizer(output_text, padding="max_length", truncation=True, max_length=MAX_LENGTH, return_tensors="pt")

    return {
        "input_ids": inputs["input_ids"].squeeze(),
        "attention_mask": inputs["attention_mask"].squeeze(),
        "labels": targets["input_ids"].squeeze()
    }

In [None]:
# Training function
def train_model(model, train_loader, optimizer, epochs=NUM_EPOCHS):
  model.train()
  loss_fn = nn.CrossEntropyLoss()
  accumulation_steps = ACCUMULATION_STEPS
  scaler = torch.amp.GradScaler('cuda')

  print("Training Model on:", next(model.parameters()).device)

  for epoch in range(epochs):
    total_loss = 0

    for i, batch in enumerate(train_loader):
      optimizer.zero_grad()

      input_ids = batch['input_ids'].to(device)
      attention_mask = batch['attention_mask'].to(device)
      labels = batch['labels'].to(device)

      with autocast("cuda"):
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss / accumulation_steps

      scaler.scale(loss).backward()

      if (i + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

      total_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}")

In [None]:
# Debug Model
def debug_model(model, test_loader, dataset_name):
  model.eval()
  exact_match = 0
  total = 0
  num_samples_to_check = 10
  checked = 0
  predictions_list = []
  references_list = []

  print("Debugging Model on:", next(model.parameters()).device)

  with torch.no_grad():
    for batch in test_loader:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      labels = batch["labels"].to(device)

      print(f"input_ids.shape: {input_ids.shape}")
      print("Sample input_ids[0]:", input_ids[0])
      print("Decoded input[0]:", tokenizer.decode(input_ids[0], skip_special_tokens=True))

      # Generate predictions
      outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_length=64,
        do_sample=True,
        temperature=1.5,
        top_k=50,
        top_p=0.95,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
      )
      predictions = tokenizer.batch_decode(outputs, skip_special_tokens=True)
      references = tokenizer.batch_decode(labels, skip_special_tokens=True)

      for i in range(len(predictions)):
        print(f"--- Sample {checked + 1} ---")

        print(f"[Input Text]        : {tokenizer.decode(input_ids[i], skip_special_tokens=True)}")
        print(f"[Input Tokens]      : {input_ids[i].tolist()}")
        print(f"[Label Tokens]      : {labels[i].tolist()}")
        print(f"[Generated Tokens]  : {outputs[i].tolist()}")
        print(f"[Reference Text]    : {references[i]}")
        print(f"[Predictions]       : {predictions[i]}")

        # Quick check for empty predictions
        if len(predictions[i].strip()) == 0:
            print("EMPTY PREDICTION")

        # Check if it's an exact match
        if predictions[i].strip() == references[i].strip():
            print("Exact Match")
        else:
            print("Mismatch")

        print("\n")
        checked += 1
        if checked >= num_samples_to_check:
            break
      if checked >= num_samples_to_check:
        break

In [None]:
# Evaluate Model
def evaluate_model(model, test_loader, dataset_name):
  start_time = time.time()
  model.eval()
  exact_match = 0
  total = 0
  predictions_list = []
  references_list = []

  sample_precisions = []
  sample_recalls = []
  sample_f1s = []
  sample_bleus = []

  smoothie = SmoothingFunction().method4

  print("Evaluating Model on:", next(model.parameters()).device)

  with torch.no_grad():
    for batch_idx, batch in enumerate(test_loader):
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      labels = batch["labels"].to(device)

      # Generate predictions
      outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_length=128,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
      )

      decoded_preds = tokenizer.batch_decode(outputs, skip_special_tokens=True)
      references = [
          tokenizer.decode(label[label != -100], skip_special_tokens=True)
          for label in labels
      ]
      predictions = [
          tokenizer.decode(output, skip_special_tokens=True)
          for output in outputs
      ]

      # Append to lists
      predictions_list.extend(predictions)
      references_list.extend(references)

      # Calculate exact match
      exact_match += sum([1 for pred, ref in zip(predictions, references) if pred.strip() == ref.strip()])
      total += len(predictions)


      # Token-level precision, recall, f1 per sample
      for pred, ref in zip(predictions, references):
        pred_tokens = pred.split()
        ref_tokens = ref.split()

        # BLEU Score
        if len(pred_tokens) > 0 and len(ref_tokens) > 0:
          bleu = sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smoothie)
          sample_bleus.append(bleu)

        # Token-level metrics
        min_len = min(len(pred_tokens), len(ref_tokens))
        if min_len == 0:
            continue  # skip empty predictions or references
        pred_slice = pred_tokens[:min_len]
        ref_slice = ref_tokens[:min_len]

        sample_precisions.append(precision_score(ref_slice, pred_slice, average='macro', zero_division=0))
        sample_recalls.append(recall_score(ref_slice, pred_slice, average='macro', zero_division=0))
        sample_f1s.append(f1_score(ref_slice, pred_slice, average='macro', zero_division=0))

      if batch_idx % 10 == 0:
        print(f"Batch {batch_idx}: Processed {total} examples...")

  # Final metrics
  exact_match_score = 100 * exact_match / total
  precision = 100 * np.mean(sample_precisions)
  recall = 100 * np.mean(sample_recalls)
  f1 = 100 * np.mean(sample_f1s)
  bleu = 100 * np.mean(sample_bleus)
  elapsed_time = time.time() - start_time

  # Print 5 sample predictions for debugging
  print(f"\n** {dataset_name} Sample Predictions vs. References:**")
  for i in range(min(5, len(predictions_list))):
    print(f"{dataset_name} Prediction {i+1}: {predictions_list[i]}")
    print(f"{dataset_name} Reference {i+1}: {references_list[i]}\n")

  print(f"{dataset_name} Exact Match Score: {exact_match_score:.4f}%")
  print(f"{dataset_name} Precision: {precision:.4f}%")
  print(f"{dataset_name} Recall: {recall:.4f}%")
  print(f"{dataset_name} F1 Score: {f1:.4f}%")
  print(f"{dataset_name} BLEU Score: {bleu:.4f}%")
  print(f"{dataset_name} Elapsed Time: {elapsed_time} seconds")

  return {
      "exact_match": exact_match_score,
      "precision": precision,
      "recall": recall,
      "f1": f1,
      "bleu": bleu,
      "elapsed_time": elapsed_time
  }

**T5 Model**

In [None]:
MODEL_NAME = "T5"
MODEL_PATH = "google-t5/t5-small"
DEBUG = True

print(f"\n---- Evaluating {MODEL_NAME} ----")

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_PATH).to(device)

# Create dataset with current tokenizer
train_dataset = COGSDataset(train_path, tokenizer)
test_dataset = COGSDataset(test_path, tokenizer)
gen_dataset = COGSDataset(gen_path, tokenizer)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
gen_loader = DataLoader(gen_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define Optimizer
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Train the Model
train_model(model, train_loader, optimizer)

# Check trained model
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.grad is not None}")

# Save model
output_dir = f"models/{MODEL_NAME}"
os.makedirs(output_dir, exist_ok=True)

model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

print(f"Training complete. Model saved to: {output_dir}")

# Evaluate the Model
if DEBUG == True:
  debug_results = debug_model(model, test_loader, "Test Set")
else:
  test_results = evaluate_model(model, test_loader, "Test Set")
  test_results.update({"model": MODEL_NAME, "set": "Test"})

  gen_results = evaluate_model(model, gen_loader, "Generalization Set")
  gen_results.update({"model": MODEL_NAME, "set": "Generalization"})

  all_results.extend([test_results, gen_results])

**BART Model**

In [None]:
MODEL_NAME = "BART"
MODEL_PATH = "facebook/bart-base"
DEBUG = True

print(f"\n---- Evaluating {MODEL_NAME} ----")

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_PATH).to(device)

# Create dataset with current tokenizer
train_dataset = COGSDataset(train_path, tokenizer)
test_dataset = COGSDataset(test_path, tokenizer)
gen_dataset = COGSDataset(gen_path, tokenizer)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
gen_loader = DataLoader(gen_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define Optimizer
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Train the Model
train_model(model, train_loader, optimizer)

# Check trained model
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.grad is not None}")

# Save model
output_dir = f"models/{MODEL_NAME}"
os.makedirs(output_dir, exist_ok=True)

model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

print(f"Training complete. Model saved to: {output_dir}")

# Evaluate the Model
if DEBUG == True:
  debug_results = debug_model(model, test_loader, "Test Set")
else:
  test_results = evaluate_model(model, test_loader, "Test Set")
  test_results.update({"model": MODEL_NAME, "set": "Test"})

  gen_results = evaluate_model(model, gen_loader, "Generalization Set")
  gen_results.update({"model": MODEL_NAME, "set": "Generalization"})

  all_results.extend([test_results, gen_results])

**OpenNMT Debug and Evaluating**

In [None]:
def debug_opennmt(model_path, src_file, ref_file, num_samples_to_check=10):
    print(f"\n---- Debugging {MODEL_NAME} ----")

    # Create a temporary debug file
    debug_output_file = os.path.join(output_dir, "debug_predictions.txt")

    # Run OpenNMT Translation on the test set
    translate_command = [
        "python", "-m", "onmt.bin.translate",
        "-model", model_path,
        "-src", src_file,
        "-output", debug_output_file,
        "-gpu", "0" if torch.cuda.is_available() else "-1",
        "-batch_size", "1",
        "-beam_size", "5"
    ]
    subprocess.run(translate_command)

    # Load original inputs & references
    with open(src_file, "r") as src_f, open(ref_file, "r") as ref_f, open(debug_output_file, "r") as pred_f:
        inputs = src_f.readlines()
        references = ref_f.readlines()
        predictions = pred_f.readlines()

    # Ensure length matches
    num_samples = min(len(inputs), len(references), len(predictions), num_samples_to_check)

    exact_match = 0
    print("\n---- Debugging Output ----")
    for i in range(num_samples):
        input_text = inputs[i].strip()
        reference_text = references[i].strip()
        prediction_text = predictions[i].strip()

        print(f"--- Sample {i + 1} ---")
        print(f"[Input Text]        : {input_text}")
        print(f"[Reference Text]    : {reference_text}")
        print(f"[Predictions]       : {prediction_text}")

        # Quick check for empty predictions
        if len(prediction_text.strip()) == 0:
            print("EMPTY PREDICTION")

        # Check if it's an exact match
        if prediction_text.strip() == reference_text.strip():
            print("Exact Match")
            exact_match += 1
        else:
            print("Mismatch")

        print("\n")

    # Print accuracy
    match_rate = (exact_match / num_samples) * 100
    print(f"Debugging Complete! Exact Match Rate: {match_rate:.2f}%\n")

    return {"model": MODEL_NAME, "set": "Debug", "exact_match_rate": match_rate}

In [None]:
def evaluate_opennmt(model_path, src_file, ref_file, dataset_name):
    start_time = time.time()

    print(f"\n---- Evaluating {MODEL_NAME} on {dataset_name} ----")

    # Create temporary output file for predictions
    output_file = os.path.join(output_dir, f"{dataset_name}_predictions.txt")

    # Run OpenNMT translation
    translate_command = [
        "python", "-m", "onmt.bin.translate",
        "-model", model_path,
        "-src", src_file,
        "-output", output_file,
        "-gpu", "0" if torch.cuda.is_available() else "-1",
        "-batch_size", "32",
        "-beam_size", "5"
    ]
    subprocess.run(translate_command)

    # Load reference and prediction files
    with open(ref_file, "r") as ref_f, open(output_file, "r") as pred_f:
        references = [line.strip() for line in ref_f.readlines()]
        predictions = [line.strip() for line in pred_f.readlines()]

    # Ensure we only evaluate the number of samples available
    total = min(len(references), len(predictions))

    exact_match = 0
    predictions_list = []
    references_list = []
    sample_precisions = []
    sample_recalls = []
    sample_f1s = []
    sample_bleus = []

    smoothie = SmoothingFunction().method4

    for i in range(total):
        ref = references[i]
        pred = predictions[i]

        predictions_list.append(pred)
        references_list.append(ref)

        # Check exact match
        if pred == ref:
            exact_match += 1

        # Token-level precision, recall, f1 per sample
        pred_tokens = pred.split()
        ref_tokens = ref.split()

        # BLEU Score
        if len(pred_tokens) > 0 and len(ref_tokens) > 0:
            bleu = sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smoothie)
            sample_bleus.append(bleu)

        # Token-level metrics
        min_len = min(len(pred_tokens), len(ref_tokens))
        if min_len == 0:
            continue  # Skip empty predictions or references
        pred_slice = pred_tokens[:min_len]
        ref_slice = ref_tokens[:min_len]

        sample_precisions.append(precision_score(ref_slice, pred_slice, average='macro', zero_division=0))
        sample_recalls.append(recall_score(ref_slice, pred_slice, average='macro', zero_division=0))
        sample_f1s.append(f1_score(ref_slice, pred_slice, average='macro', zero_division=0))

    # Compute final metrics
    exact_match_score = 100 * exact_match / total
    precision = 100 * np.mean(sample_precisions)
    recall = 100 * np.mean(sample_recalls)
    f1 = 100 * np.mean(sample_f1s)
    bleu = 100 * np.mean(sample_bleus)
    elapsed_time = time.time() - start_time

    # Print 5 sample predictions
    print(f"\n** {dataset_name} Sample Predictions vs. References:**")
    for i in range(min(5, total)):
        print(f"{dataset_name} Prediction {i+1}: {predictions_list[i]}")
        print(f"{dataset_name} Reference {i+1}: {references_list[i]}\n")

    print(f"{dataset_name} Exact Match Score: {exact_match_score:.4f}%")
    print(f"{dataset_name} Precision: {precision:.4f}%")
    print(f"{dataset_name} Recall: {recall:.4f}%")
    print(f"{dataset_name} F1 Score: {f1:.4f}%")
    print(f"{dataset_name} BLEU Score: {bleu:.4f}%")
    print(f"{dataset_name} Elapsed Time: {elapsed_time:.2f} seconds")

    return {
        "model": MODEL_NAME,
        "set": dataset_name,
        "exact_match": exact_match_score,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "bleu": bleu,
        "elapsed_time": elapsed_time
    }

**OPENNMT Model**

In [None]:
# Define output directory for OpenNMT
MODEL_NAME = "OPENNMT"
output_dir = "opennmt_data"
os.makedirs(output_dir, exist_ok=True)

# Function to preprocess COGS dataset for OpenNMT
def preprocess_cogs(file_path, prefix):
    data = pd.read_csv(file_path, sep="\t", header=None, names=["input", "output"])

    # Save source (input) and target (output) files
    input_file = os.path.join(output_dir, f"{prefix}.src")
    output_file = os.path.join(output_dir, f"{prefix}.tgt")

    data["input"].to_csv(input_file, index=False, header=False)
    data["output"].to_csv(output_file, index=False, header=False)

    print(f"Processed {file_path}: Saved {input_file} and {output_file}")

# Run preprocessing on datasets
preprocess_cogs(train_path, "train")
preprocess_cogs(test_path, "test")
preprocess_cogs(gen_path, "gen")

print("Preprocessing complete! Ready for OpenNMT training.")

train_command = [
    "python", "-m", "onmt.bin.train",
    "-data", "opennmt_data/cogs_preprocessed",
    "-save_model", "models/cogs_baseline",
    "-layers", "2",
    "-rnn_size", "512",
    "-word_vec_size", "512",
    "-train_steps", "50000",
    "-batch_size", "32",
    "-valid_steps", "1000"
]

# Run OpenNMT Training
subprocess.run(train_command)

# Save Model
output_dir = f"models/{MODEL_NAME}"
os.makedirs(output_dir, exist_ok=True)

print("\n---- Saving Model ----")

# Move trained model to the output directory
trained_model_path = "models/cogs_baseline_step_50000.pt"
new_model_path = os.path.join(output_dir, "cogs_model.pt")
os.rename(trained_model_path, new_model_path)

print(f"Training complete. Model saved to: {new_model_path}")

# Define test and generalization files
test_file = "opennmt_data/test.src"
gen_file = "opennmt_data/gen.src"
test_output = os.path.join(output_dir, "test_predictions.txt")
gen_output = os.path.join(output_dir, "gen_predictions.txt")

if DEBUG:
    print("Debug mode: Checking model outputs.")
    debug_opennmt(new_model_path, test_file, test_output)
else:
    print("Evaluating model on test and generalization sets...")
    evaluate_opennmt(new_model_path, test_file, test_output)
    evaluate_opennmt(new_model_path, gen_file, gen_output)

print("Evaluation complete!")


**GPT Model**

In [None]:
MODEL_NAME = "GPT"
MODEL_PATH = "microsoft/phi-2"
DEBUG = True

print(f"\n---- Evaluating {MODEL_NAME} ----")

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_PATH).to(device)

# Create dataset with current tokenizer
train_dataset = COGSDataset(train_path, tokenizer)
test_dataset = COGSDataset(test_path, tokenizer)
gen_dataset = COGSDataset(gen_path, tokenizer)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
gen_loader = DataLoader(gen_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define Optimizer
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Train the Model
train_model(model, train_loader, optimizer)

# Check trained model
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.grad is not None}")

# Save model
output_dir = f"models/{MODEL_NAME}"
os.makedirs(output_dir, exist_ok=True)

model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

print(f"Training complete. Model saved to: {output_dir}")

# Evaluate the Model
if DEBUG == True:
  debug_results = debug_model(model, test_loader, "Test Set")
else:
  test_results = evaluate_model(model, test_loader, "Test Set")
  test_results.update({"model": MODEL_NAME, "set": "Test"})

  gen_results = evaluate_model(model, gen_loader, "Generalization Set")
  gen_results.update({"model": MODEL_NAME, "set": "Generalization"})

  all_results.extend([test_results, gen_results])

**LSTM With Attention**

In [None]:
class LSTMWithAttention(nn.Module):
    def __init__(self, vocab_size, embedding_dim=256, hidden_dim=512, output_dim=1000, num_layers=2):
        super(LSTMWithAttention, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)

        # Attention layer
        self.attn = nn.Linear(hidden_dim, hidden_dim)
        self.context = nn.Linear(hidden_dim, 1, bias=False)

        # Output layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        embedded = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embedded)

        # Attention mechanism
        attn_scores = torch.tanh(self.attn(lstm_out))  # Compute attention scores
        attn_weights = torch.softmax(self.context(attn_scores), dim=1)  # Softmax over time steps
        context_vector = (attn_weights * lstm_out).sum(dim=1)  # Weighted sum

        output = self.fc(context_vector)  # Final prediction
        return output


In [None]:
MODEL_NAME = "LSTMWithAttention"
DEBUG = True

print(f"\n---- Evaluating {MODEL_NAME} ----")

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_PATH).to(device)

# Create dataset with current tokenizer
train_dataset = COGSDataset(train_path, tokenizer)
test_dataset = COGSDataset(test_path, tokenizer)
gen_dataset = COGSDataset(gen_path, tokenizer)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
gen_loader = DataLoader(gen_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define Optimizer
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Train the Model
train_model(model, train_loader, optimizer)

# Check trained model
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.grad is not None}")

# Save model
output_dir = f"models/{MODEL_NAME}"
os.makedirs(output_dir, exist_ok=True)

model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

print(f"Training complete. Model saved to: {output_dir}")

# Evaluate the Model
if DEBUG == True:
  debug_results = debug_model(model, test_loader, "Test Set")
else:
  test_results = evaluate_model(model, test_loader, "Test Set")
  test_results.update({"model": MODEL_NAME, "set": "Test"})

  gen_results = evaluate_model(model, gen_loader, "Generalization Set")
  gen_results.update({"model": MODEL_NAME, "set": "Generalization"})

  all_results.extend([test_results, gen_results])

In [None]:
df_results = pd.DataFrame(all_results)
# df_results.to_csv("model_evaluation_results.csv", index=False)
# print("📊 All evaluation results saved to: model_evaluation_results.csv")
df_results
print(df_results.to_markdown())

In [None]:
# Drop non-metric columns for plotting
metrics = ['exact_match', 'precision', 'recall', 'f1', 'bleu']

# Plot each metric
for metric in metrics:
    plt.figure(figsize=(10, 5))
    sns.barplot(data=df_results, x='model', y=metric, hue='set')
    plt.title(f'Model Comparison: {metric.capitalize()}')
    plt.ylabel("Score")
    plt.ylim(0, 1.0)
    plt.xticks(rotation=45)
    plt.grid(axis="y", linestyle="--", alpha=0.7)
    plt.legend(title="Dataset")
    plt.tight_layout()
    plt.show()