In [1]:
import pandas as pd
import numpy as np
import torch
# from transformers import RobertaTokenizer, RobertaForSequenceClassification
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
from datasets import Dataset, load_from_disk
import random
import os
import evaluate
import wandb


# Seed every random number generator used in this notebook
def set_seed(seed_value=42):
    """
    Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with one value.

    Parameters:
    - seed_value: Integer seed applied to every generator (default 42)

    Besides seeding, this exports PYTHONHASHSEED and switches cuDNN into
    deterministic mode.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_value)

    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    # this presumably only affects processes spawned later - confirm if hash
    # ordering matters here.
    os.environ['PYTHONHASHSEED'] = str(seed_value)
    torch.backends.cudnn.deterministic = True


set_seed(42)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Process-wide configuration passed through environment variables
os.environ.update({
    "CUDA_VISIBLE_DEVICES": "0",
    "TOKENIZERS_PARALLELISM": "false",
    # wandb project where this run will be logged
    "WANDB_PROJECT": "cs769_llama",
    # turn off watch to log faster
    "WANDB_WATCH": "false",
})

In [3]:
def format_mcq_input(question, option_a, option_b, option_c, option_d, correct_option=None):
    """
    Format an MCQ into the single text string that is fed to the model.

    Only the question text is included in the output. The answer options and
    the correct option are accepted so that every caller can pass a full MCQ
    record, but they are not part of the formatted text.

    Parameters:
    - question: The question text
    - option_a, option_b, option_c, option_d: The option texts (currently unused)
    - correct_option: The correct option (currently unused). Optional, so that
      callers which do not know the answer at prediction time can omit it.

    Returns:
    - The string "Question: <question>" followed by a newline
    """
    return f"Question: {question}\n"

def preprocess_function(examples):
    """
    Turn a batch of raw MCQ records into tokenized model inputs.

    Parameters:
    - examples: Batch mapping with the columns 'question', 'opa', 'opb',
      'opc', 'opd' and 'cop', each holding one value per record

    Returns:
    - The tokenizer output for the formatted records, padded/truncated to
      512 tokens

    Relies on the notebook-level `tokenizer` being defined before it is called.
    """
    mcq_columns = ('question', 'opa', 'opb', 'opc', 'opd', 'cop')

    # Walk the columns in lockstep so each tuple is one complete MCQ record
    records = zip(*(examples[column] for column in mcq_columns))
    texts = [format_mcq_input(*record) for record in records]

    # Tokenize the formatted inputs
    return tokenizer(
        texts,
        padding='max_length',
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )

def compute_metrics(eval_pred):
    """
    Compute evaluation metrics from model outputs.

    Parameters:
    - eval_pred: A (logits, labels) pair; logits has one score per class in
      its last axis and labels holds the reference class ids

    Returns:
    - A dict {"accuracy": float} with the fraction of predictions whose
      arg-max class equals the label
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    # Accuracy is computed directly with NumPy: this function runs on every
    # evaluation pass, so it should not re-load a metric object each time.
    accuracy = float(np.mean(predictions == np.asarray(labels)))

    return {"accuracy": accuracy}

def predict_difficulty(model, tokenizer, question, option_a, option_b, option_c, option_d,
                       correct_option=None):
    """
    Predict the difficulty of a single MCQ question.

    Parameters:
    - model: The trained model
    - tokenizer: The tokenizer
    - question, option_a, option_b, option_c, option_d: MCQ components
    - correct_option: The correct option; optional, forwarded unchanged to
      format_mcq_input

    Returns:
    - Predicted difficulty level ('easy', 'medium', or 'hard')
    """
    # Format the input exactly as it is formatted for training
    formatted_input = format_mcq_input(
        question, option_a, option_b, option_c, option_d, correct_option
    )

    # Tokenize
    inputs = tokenizer(
        formatted_input,
        padding='max_length',
        truncation=True,
        max_length=512,
        return_tensors="pt"
    )

    # Move inputs to the same device as the model
    device = model.device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Run inference in eval mode so layers such as dropout do not perturb the
    # prediction, then put the model back into whatever mode it was in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=-1)
    finally:
        model.train(was_training)

    # Map prediction to difficulty level
    difficulty_map = {0: 'easy', 1: 'medium', 2: 'hard'}
    predicted_difficulty = difficulty_map[predictions.item()]

    return predicted_difficulty

In [4]:
# Load the three difficulty subsets from disk
easy_data = load_from_disk('./json_to_hf/subset1')
medium_data = load_from_disk('./json_to_hf/subset2')
hard_data = load_from_disk('./json_to_hf/subset3')

# One DataFrame per subset
easy_df = pd.DataFrame(easy_data)
medium_df = pd.DataFrame(medium_data)
hard_df = pd.DataFrame(hard_data)

# Tag every row with the difficulty of the subset it came from
for subset_df, subset_difficulty in (
    (easy_df, 'easy'),
    (medium_df, 'medium'),
    (hard_df, 'hard'),
):
    subset_df['difficulty'] = subset_difficulty

# Stack the subsets, then shuffle with a fixed seed
combined_df = (
    pd.concat([easy_df, medium_df, hard_df], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

# Numeric class id for each difficulty name
label_map = {'easy': 0, 'medium': 1, 'hard': 2}
combined_df['label'] = combined_df['difficulty'].map(label_map)

# Stratified split: 70% train, then the 30% hold-out is divided 30/70 into
# validation and test (test_size=0.7 applies to the hold-out, not the whole set)
train_df, temp_df = train_test_split(
    combined_df,
    test_size=0.3,
    random_state=42,
    stratify=combined_df['difficulty'],
)
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.7,
    random_state=42,
    stratify=temp_df['difficulty'],
)

print(f"Training samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Testing samples: {len(test_df)}")

# Convert to HuggingFace datasets
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(split_df) for split_df in (train_df, val_df, test_df)
)

Training samples: 127975
Validation samples: 16454
Testing samples: 38393


In [5]:
# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-base')

# Every split is preprocessed with the same settings: batched tokenization in
# 4 worker processes, dropping the raw columns so only the tokenizer outputs
# and 'label' remain.
map_kwargs = dict(
    batched=True,
    remove_columns=['question', 'opa', 'opb', 'opc', 'opd', 'cop', 'difficulty', '__index_level_0__'],
    num_proc=4,
)

# Preprocess datasets
train_dataset = train_dataset.map(preprocess_function, **map_kwargs)
val_dataset = val_dataset.map(preprocess_function, **map_kwargs)
test_dataset = test_dataset.map(preprocess_function, **map_kwargs)

# Set format for PyTorch
for tokenized_split in (train_dataset, val_dataset, test_dataset):
    tokenized_split.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

Map (num_proc=4): 100%|██████████| 127975/127975 [00:19<00:00, 6435.59 examples/s] 
Map (num_proc=4): 100%|██████████| 16454/16454 [00:03<00:00, 4942.04 examples/s]
Map (num_proc=4): 100%|██████████| 38393/38393 [00:06<00:00, 5796.85 examples/s] 


In [6]:
# Directory that receives the training output and the saved model
model_dir = './med_mcqa_router_deberta'

# DeBERTa-base with a sequence-classification head for the 3 difficulty classes
model = AutoModelForSequenceClassification.from_pretrained('microsoft/deberta-base', num_labels=3)

Some weights of DebertaForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [7]:
# Freeze every transformer block of the encoder ...
model.deberta.encoder.layer.requires_grad_(False)

# ... then re-enable gradients for the last two blocks only
for encoder_block in model.deberta.encoder.layer[-2:]:
    encoder_block.requires_grad_(True)

# The classifier head is trained as well
model.classifier.requires_grad_(True)

# NOTE: parameters outside `encoder.layer` are not touched by this cell, so
# they keep their existing requires_grad setting (the trainable-parameter
# listing printed further down includes the embeddings).

In [8]:
# Collect the names of all parameters that will receive gradient updates
trainable_layers = []
for param_name, param_tensor in model.named_parameters():
    if param_tensor.requires_grad:
        trainable_layers.append(param_name)

print(f"Trainable layers:\n{trainable_layers}")

Trainable layers:
['deberta.embeddings.word_embeddings.weight', 'deberta.embeddings.LayerNorm.weight', 'deberta.embeddings.LayerNorm.bias', 'deberta.encoder.layer.10.attention.self.q_bias', 'deberta.encoder.layer.10.attention.self.v_bias', 'deberta.encoder.layer.10.attention.self.in_proj.weight', 'deberta.encoder.layer.10.attention.self.pos_proj.weight', 'deberta.encoder.layer.10.attention.self.pos_q_proj.weight', 'deberta.encoder.layer.10.attention.self.pos_q_proj.bias', 'deberta.encoder.layer.10.attention.output.dense.weight', 'deberta.encoder.layer.10.attention.output.dense.bias', 'deberta.encoder.layer.10.attention.output.LayerNorm.weight', 'deberta.encoder.layer.10.attention.output.LayerNorm.bias', 'deberta.encoder.layer.10.intermediate.dense.weight', 'deberta.encoder.layer.10.intermediate.dense.bias', 'deberta.encoder.layer.10.output.dense.weight', 'deberta.encoder.layer.10.output.dense.bias', 'deberta.encoder.layer.10.output.LayerNorm.weight', 'deberta.encoder.layer.10.output.La

In [None]:

# Define training arguments
# Evaluation and logging both happen every 100 optimizer steps; evaluation
# uses the full validation set passed to the Trainer below.
training_args = TrainingArguments(
    output_dir=model_dir,
    eval_strategy="steps",
    save_strategy="steps",
    learning_rate=3e-4,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    logging_steps=100,
    eval_steps=100,
    num_train_epochs=4,
    weight_decay=0.01,
    # Reload the best checkpoint once training finishes.
    # NOTE(review): with metric_for_best_model left commented out, the best
    # checkpoint is presumably picked by evaluation loss - confirm.
    load_best_model_at_end=True,
    # metric_for_best_model="accuracy",
    push_to_hub=False,
    report_to='wandb',  # Log this run to Weights & Biases
    run_name='router_classifier'
)

# Initialize Trainer
# compute_metrics adds accuracy to each evaluation; early stopping is
# currently disabled (callback commented out).
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

# Train model
print("Training model...")
trainer.train()


# Save model and tokenizer
# Both are written to <model_dir>/best so the directory can be loaded on its own.
model_path = os.path.join(model_dir, 'best')
trainer.save_model(model_path)
tokenizer.save_pretrained(model_path)
print(f"Model saved to {model_path}")

Training model...


[34m[1mwandb[0m: Currently logged in as: [33msyammohan2103[0m to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


Step,Training Loss,Validation Loss,Accuracy
100,1.0652,1.068372,0.401118
200,1.0625,1.063062,0.386654


In [None]:
# Score the trained model on the held-out test split
print("Evaluating on test set...")
test_results = trainer.evaluate(test_dataset)
print(f"Test results: {test_results}")

# Run a single-example prediction on the first row of the test DataFrame
sample_idx = 0
sample = test_df.iloc[sample_idx]

# MCQ fields in the positional order predict_difficulty is called with:
# question, the four options, then the correct option
mcq_fields = ['question', 'opa', 'opb', 'opc', 'opd', 'cop']
predicted_difficulty = predict_difficulty(
    model,
    tokenizer,
    *(sample[field] for field in mcq_fields),
)

print(f"\nSample question: {sample['question']}")
print(f"Actual difficulty: {sample['difficulty']}")
print(f"Predicted difficulty: {predicted_difficulty}")

In [None]:
# Plot training results
train_history = trainer.state.log_history

# Extract each metric together with the global step at which it was logged,
# so that both panels share a real "training step" x-axis.
train_steps = []
train_losses = []
val_steps = []
val_losses = []
val_accuracies = []

for entry in train_history:
    if 'step' not in entry:
        continue
    if 'loss' in entry:
        train_steps.append(entry['step'])
        train_losses.append(entry['loss'])
    if 'eval_loss' in entry:
        # Loss and accuracy are taken from the same entry so the two series
        # always have the same length; a missing accuracy becomes NaN.
        val_steps.append(entry['step'])
        val_losses.append(entry['eval_loss'])
        val_accuracies.append(entry.get('eval_accuracy', float('nan')))

# NOTE(review): any `trainer.evaluate(...)` call made before this cell with the
# default metric prefix presumably also lands in the log history as an
# 'eval_*' entry and would then appear as the last point - confirm.

# Plot
fig, (loss_ax, val_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(train_steps, train_losses, label='Training Loss')
loss_ax.set_title('Training Loss')
loss_ax.set_xlabel('Step')
loss_ax.set_ylabel('Loss')

# Evaluation runs on a step schedule, so the x-axis is steps, not epochs
val_ax.plot(val_steps, val_losses, 'b-', label='Validation Loss')
val_ax.plot(val_steps, val_accuracies, 'r-', label='Validation Accuracy')
val_ax.set_title('Validation Metrics')
val_ax.set_xlabel('Step')
val_ax.set_ylabel('Value')
val_ax.legend()

fig.tight_layout()
plt.show()