In [60]:
import torch
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import re
from torch.nn.utils.rnn import pad_sequence

from transformers import GPT2Config



In [61]:
# get csv file
data_path = "data/raw/data/beetle.csv"
df = pd.read_csv(data_path)


In [62]:
def preprocess_text(text):
    # Remove extra white spaces
    text = re.sub(r'\s+', ' ', text)

    # Convert to lowercase
    text = text.lower()

    return text

# Preprocess student and reference answers in the dataset
df['student_answer'] = df['student_answer'].apply(preprocess_text)
df['reference_answer'] = df['reference_answer'].apply(preprocess_text)


In [63]:
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)


In [64]:
def encode_sentence_pair(student_answer, reference_answer, max_length=512):
    # Set the padding token to the EOS token if not defined
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    tokens = tokenizer(student_answer, reference_answer, return_tensors="pt", padding=True, truncation=True)
    return tokens


In [72]:
class ASAGDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        student_answer = row["student_answer"]
        reference_answer = row["reference_answer"]
        assigned_points = row["assigned_points"]
        max_points = row["max_points"]
        percentage_of_correctness = row["assigned_points"] / row["max_points"]

        encoded = encode_sentence_pair(student_answer, reference_answer)

        return {
            "input_ids": encoded["input_ids"].squeeze(),
            "attention_mask": encoded["attention_mask"].squeeze(),
            "assigned_points": torch.tensor(assigned_points, dtype=torch.float32),
            "max_points": torch.tensor(max_points, dtype=torch.float32),
            "percentage_of_correctness": torch.tensor(percentage_of_correctness, dtype=torch.float32),
        }


In [76]:
def collate_fn(batch):
    # Set the padding token to the EOS token if not defined
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Pad input_ids and attention_mask tensors
    input_ids = pad_sequence([item["input_ids"] for item in batch], batch_first=True, padding_value=tokenizer.pad_token_id)
    attention_mask = pad_sequence([item["attention_mask"] for item in batch], batch_first=True, padding_value=0)

    percentage_of_correctness = torch.tensor([item["percentage_of_correctness"] for item in batch], dtype=torch.float32)
    assigned_points = torch.tensor([item["assigned_points"] for item in batch], dtype=torch.float32)
    max_points = torch.tensor([item["max_points"] for item in batch], dtype=torch.float32)


    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "percentage_of_correctness": percentage_of_correctness,
        "assigned_points": assigned_points,
        "max_points": max_points,
    }


In [77]:
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

train_dataset = ASAGDataset(train_df)
val_dataset = ASAGDataset(val_df)

batch_size = 16
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)



In [68]:
device = "mps" if getattr(torch,'has_mps',False) \
    else "gpu" if torch.cuda.is_available() else "cpu"

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
gpt2_config = GPT2Config.from_pretrained("gpt2", num_labels=1)
gpt2_config.pad_token_id = tokenizer.pad_token_id
model = GPT2ForSequenceClassification(gpt2_config)
model.to(device)

num_epochs = 3
optimizer = AdamW(model.parameters(), lr=2e-5)

total_steps = len(train_dataloader) * num_epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for batch in train_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        percentage_of_correctness = batch["percentage_of_correctness"].to(device)

        model.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=percentage_of_correctness.unsqueeze(1))
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()

        train_loss += loss.item()

    print(f"Epoch {epoch + 1}/{num_epochs} - Train loss: {train_loss / len(train_dataloader)}")

# Evaluation loop
model.eval()
predictions = []
ground_truth = []

with torch.no_grad():
    for batch in val_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        percentage_of_correctness = batch["percentage_of_correctness"].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits.squeeze().detach().cpu()

        predictions.extend(logits.tolist())
        ground_truth.extend(percentage_of_correctness.cpu().tolist())

# Calculate the mean squared error
mse = mean_squared_error(ground_truth, predictions)
print(f"Mean Squared Error: {mse}")




Epoch 1/3 - Train loss: 0.4552588740144251
Epoch 2/3 - Train loss: 0.2015805348663892
Epoch 3/3 - Train loss: 0.17757784188063844
Mean Squared Error: 0.16920366439962303


In [69]:
df

Unnamed: 0,row_id,question,question_id,student_answer,reference_answer,assigned_points,max_points,domain
0,0,What role does the path play in determining wh...,0,if that switch is with the path between that b...,if a bulb and a switch are in the same path th...,1,1,
1,1,What role does the path play in determining wh...,0,"the switch, the bulb, and the battery have to ...",if a bulb and a switch are in the same path th...,1,1,
2,2,What role does the path play in determining wh...,0,the path plays an important role,if a bulb and a switch are in the same path th...,0,1,
3,3,What role does the path play in determining wh...,0,uh-huh,if a bulb and a switch are in the same path th...,0,1,
4,4,What role does the path play in determining wh...,0,switch is contained in a circuit,if a bulb and a switch are in the same path th...,0,1,
...,...,...,...,...,...,...,...,...
6613,6613,Explain your reasoning.,130,"if one is out the others will go out, they are...",a and c are in the same closed path,1,1,
6614,6614,Explain your reasoning.,130,they are all on the dame closed path,a and c are in the same closed path,1,1,
6615,6615,Explain your reasoning.,130,they are contained on the same closed path.,a and c are in the same closed path,1,1,
6616,6616,Explain your reasoning.,130,they are not parallel,a and c are in the same closed path,0,1,


In [80]:
device = "mps" if getattr(torch,'has_mps',False) \
    else "gpu" if torch.cuda.is_available() else "cpu"

# Evaluation loop
model.eval()
predictions = []
ground_truth = []
num_correct_predictions = 0
total_predictions = 0

with torch.no_grad():
    for batch in val_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        percentage_of_correctness = batch["percentage_of_correctness"].to(device)
        max_points = batch["max_points"].to(device)
        assigned_points = batch["assigned_points"].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits.squeeze().detach()

        # Round the predictions and multiply by max_points
        rounded_predictions = torch.round(logits * max_points)
        
        # Compare the rounded_predictions to the assigned_points and count the number of correct predictions
        num_correct_predictions += torch.sum(rounded_predictions == assigned_points).item()
        total_predictions += assigned_points.size(0)
        
        predictions.extend(logits.tolist())
        ground_truth.extend(percentage_of_correctness.cpu().tolist())

# Calculate the mean squared error
mse = mean_squared_error(ground_truth, predictions)
print(f"Mean Squared Error: {mse}")

# Calculate the accuracy
accuracy = num_correct_predictions / total_predictions
print(f"Accuracy: {accuracy * 100:.2f}%")

Mean Squared Error: 0.16920366439962303
Accuracy: 76.81%
