In [None]:
!pip install transformers



In [None]:
import pandas as pd
import json
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, BertModel
from tqdm import tqdm
import torch.nn.functional as F
import os


In [None]:
class MCQADataset(Dataset):
    """Torch dataset that tokenizes one (question, choice) text per item.

    Each element of ``data`` must be a mapping with the keys
    ``'text_with_choice'`` (the string to tokenize) and ``'label'``
    (the integer class for that string).

    Args:
        data: Indexable collection of records as described above.
        tokenizer: Callable tokenizer; it is invoked with ``padding``,
            ``truncation``, ``max_length`` and ``return_tensors`` keyword
            arguments and must return a mapping of tensors whose first
            dimension has size 1.
        max_length: Length every sequence is padded/truncated to. Defaults to
            512, the value that was previously hard-coded; pass a smaller
            value to cut memory and compute when the texts are short.
    """

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize record ``idx`` and return its tensors plus a ``'labels'`` entry."""
        item = self.data[idx]
        encoded = self.tokenizer(
            item['text_with_choice'],
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )
        # The tokenizer returns tensors with a leading dimension of size 1;
        # drop it so that the DataLoader can stack items into a batch itself.
        inputs = {key: val.squeeze(0) for key, val in encoded.items()}
        # Explicit integer dtype: this tensor is used as the class-index target
        # of the cross-entropy loss.
        inputs['labels'] = torch.tensor(item['label'], dtype=torch.long)
        return inputs

In [None]:
def load_data(file_name, answers):
    """Read a JSON-lines MCQA file and flatten it to one record per answer choice.

    Every non-blank line of the file is parsed as a JSON object that must
    provide ``fact1``, ``question.stem``, ``question.choices[*].text`` and
    ``answerKey``.

    Args:
        file_name: Path of the ``.jsonl`` file.
        answers: Ordered list of answer keys (e.g. ``['A', 'B', 'C', 'D']``);
            the position of ``answerKey`` in this list identifies the correct
            choice.

    Returns:
        A list of dicts with the keys ``'text_with_choice'`` (fact, question
        stem and one choice joined into a single string) and ``'label'``
        (1 for the correct choice, 0 otherwise).

    Raises:
        ValueError: If a record's ``answerKey`` is not contained in ``answers``.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    # Explicit UTF-8 so the result does not depend on the platform locale.
    with open(file_name, encoding='utf-8') as json_file:
        for json_str in json_file:
            # A blank (e.g. trailing) line is not a record; json.loads would fail on it.
            if not json_str.strip():
                continue
            result = json.loads(json_str)
            # NOTE(review): the literal '[CLS]' / '[SEP]' / '[END]' markers are kept
            # exactly as before so that model inputs stay unchanged. BERT tokenizers
            # normally insert their own [CLS]/[SEP], and '[END]' does not look like a
            # BERT special token - confirm that these markers are intended.
            base = '[CLS] ' + result['fact1'] + ' [SEP] ' + result['question']['stem']
            ans = answers.index(result['answerKey'])
            # Iterate over the choices that are actually present instead of assuming four.
            for j, choice in enumerate(result['question']['choices']):
                data.append({
                    "text_with_choice": base + ' ' + choice['text'] + ' [END]',
                    "label": 1 if j == ans else 0
                })
    return data




In [None]:
def train_model(train_loader, model, linear_layer, optimizer, device, criterion=None):
    """Run one training epoch of the encoder plus classification head.

    For every batch, all entries except ``'labels'`` are forwarded to ``model``
    as keyword arguments, the hidden state at sequence position 0 is fed
    through ``linear_layer``, and the loss against ``batch['labels']`` is
    back-propagated before ``optimizer.step()``.

    Args:
        train_loader: Iterable of batches (dicts of tensors containing a
            ``'labels'`` entry).
        model: Encoder whose output exposes ``last_hidden_state``.
        linear_layer: Callable head mapping the position-0 embedding to logits.
        optimizer: Optimizer that updates the trainable parameters.
        device: Device the batch tensors are moved to.
        criterion: Loss callable ``criterion(logits, labels)``. Defaults to
            ``torch.nn.CrossEntropyLoss()`` so the function no longer depends
            on a module-level ``criterion`` having been created elsewhere.

    Returns:
        Tuple ``(avg_loss, accuracy)`` of Python floats: the mean of the
        per-batch losses and the fraction of examples whose arg-max logit
        equals the label. ``(0.0, 0.0)`` when the loader yields no batch.
    """
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    model.train()
    total_loss = 0.0
    correct_predictions = 0
    total_examples = 0
    num_batches = 0

    for batch in tqdm(train_loader, desc="Training"):
        inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(**inputs)
        # Sequence position 0 is used as the summary representation of the input.
        cls_embeddings = outputs.last_hidden_state[:, 0, :]

        logits = linear_layer(cls_embeddings)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(logits, dim=1)
        # Accumulate plain Python ints so the counters never turn into tensors.
        correct_predictions += (preds == labels).sum().item()
        total_examples += labels.size(0)
        num_batches += 1

    # An empty loader would otherwise end in a division by zero.
    if num_batches == 0 or total_examples == 0:
        return 0.0, 0.0

    avg_loss = total_loss / num_batches
    accuracy = correct_predictions / total_examples
    return avg_loss, accuracy


In [None]:
def evaluate_model(loader, model, linear_layer, device):
    """Compute classification accuracy of encoder plus head over ``loader``.

    The model is switched to eval mode and run without gradient tracking. As in
    ``train_model``, every batch entry except ``'labels'`` is forwarded to the
    model, so training and evaluation always see the same inputs (previously
    only ``input_ids`` and ``attention_mask`` were forwarded here).

    Args:
        loader: Iterable of batches (dicts of tensors containing ``'labels'``).
        model: Encoder whose output exposes ``last_hidden_state``.
        linear_layer: Callable head mapping the position-0 embedding to logits.
        device: Device the batch tensors are moved to.

    Returns:
        Fraction of examples whose arg-max logit equals the label, as a Python
        float; ``0.0`` when the loader yields no example.
    """
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm(loader, desc="Evaluating"):
            inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
            labels = batch['labels'].to(device)

            outputs = model(**inputs)
            # Sequence position 0 is used as the summary representation of the input.
            cls_embeddings = outputs.last_hidden_state[:, 0, :]

            logits = linear_layer(cls_embeddings)
            predictions = torch.argmax(logits, dim=-1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    # Guard against a division by zero for an empty loader.
    if total == 0:
        return 0.0
    return correct / total




In [None]:
def save_model(epoch, model, optimizer, linear_layer, path):
    """Write a training checkpoint to ``path`` with ``torch.save``.

    The checkpoint is a dict holding the given ``epoch`` together with the
    ``state_dict()`` of the encoder, the optimizer and the classification head,
    stored under the keys that ``load_model`` reads back.
    """
    checkpoint = {'epoch': epoch}
    components = (
        ('model_state_dict', model),
        ('optimizer_state_dict', optimizer),
        ('linear_layer_state_dict', linear_layer),
    )
    for key, component in components:
        checkpoint[key] = component.state_dict()
    torch.save(checkpoint, path)
def load_model(path, model, optimizer, linear_layer):
    """Restore encoder, optimizer and head from a checkpoint and return its epoch.

    Args:
        path: File written by ``save_model``; must contain the keys
            ``'epoch'``, ``'model_state_dict'``, ``'optimizer_state_dict'`` and
            ``'linear_layer_state_dict'``.
        model: Encoder that receives ``'model_state_dict'``.
        optimizer: Optimizer that receives ``'optimizer_state_dict'``.
        linear_layer: Head that receives ``'linear_layer_state_dict'``.

    Returns:
        The value stored under ``'epoch'`` in the checkpoint.
    """
    # Deserialize onto the CPU first: without map_location a checkpoint written
    # on a GPU machine cannot be opened on a CPU-only runtime. load_state_dict
    # then copies the values into the already-placed parameters.
    checkpoint = torch.load(path, map_location='cpu')
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    linear_layer.load_state_dict(checkpoint['linear_layer_state_dict'])
    return checkpoint['epoch']


In [None]:
from transformers import AutoTokenizer, BertModel, GPT2LMHeadModel, GPT2Tokenizer
import torch.optim as optim

import torch
import math
import time
import sys
import json
import numpy as np


def main():
    """Fine-tune ``bert-base-uncased`` with a linear head on the MCQA data and report accuracy.

    Steps:
      1. Load the train/dev/test ``*_complete.jsonl`` files with ``load_data``
         (one record per question/choice pair, label 1 for the correct choice).
      2. Build tokenizer, encoder, linear head, optimizer and data loaders.
      3. Resume from ``./model_checkpoints/latest_checkpoint.bin`` if it exists.
      4. Train for the remaining epochs, validating and checkpointing after each.
      5. Print the accuracy on the test split.

    NOTE: the reported accuracies are per (question, choice) pair, because
    ``evaluate_model`` compares the arg-max logit with the 0/1 label of every
    pair; they are not "picked the right option out of four" accuracies.
    """
    torch.manual_seed(0)
    answers = ['A', 'B', 'C', 'D']
    num_epochs = 3
    batch_size = 8
    # Labels produced by load_data are 0 or 1, so the head needs exactly two
    # logits (it previously had four, two of which could never be correct).
    num_classes = 2

    # The hand-written per-split parsing loops that used to live here were dead
    # code: their results were overwritten by these load_data calls.
    train_data = load_data('train_complete.jsonl', answers)
    valid_data = load_data('dev_complete.jsonl', answers)
    test_data = load_data('test_complete.jsonl', answers)

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    model = BertModel.from_pretrained("bert-base-uncased")

    train_loader = DataLoader(MCQADataset(train_data, tokenizer), batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(MCQADataset(valid_data, tokenizer), batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(MCQADataset(test_data, tokenizer), batch_size=batch_size, shuffle=False)

    # train_model reads `criterion` from module scope, so both names stay global.
    global linear_layer, criterion
    linear_layer = torch.nn.Linear(model.config.hidden_size, num_classes)
    criterion = torch.nn.CrossEntropyLoss()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    linear_layer = linear_layer.to(device)

    # A single optimizer updates the encoder and the head together.
    optimizer = optim.Adam(list(model.parameters()) + list(linear_layer.parameters()), lr=3e-5)

    output_dir = './model_checkpoints'
    os.makedirs(output_dir, exist_ok=True)

    checkpoint_path = os.path.join(output_dir, "latest_checkpoint.bin")
    # The checkpoint stores the number of completed epochs, which is also the
    # zero-based index of the next epoch to run.
    start_epoch = 0
    if os.path.exists(checkpoint_path):
        start_epoch = load_model(checkpoint_path, model, optimizer, linear_layer)
        print(f"Resumed from checkpoint: epoch {start_epoch + 1}")

    for epoch in range(start_epoch, num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        train_loss, train_accuracy = train_model(train_loader, model, linear_layer, optimizer, device)
        print(f"Epoch {epoch + 1}, Train Loss: {train_loss}, Train Accuracy: {train_accuracy}")

        valid_accuracy = evaluate_model(valid_loader, model, linear_layer, device)
        print(f"Epoch {epoch + 1}, Train Loss: {train_loss}, Validation Accuracy: {valid_accuracy}")

        # Per-epoch snapshot of the encoder weights (same file names as before).
        model_save_path = os.path.join(output_dir, f"model_epoch_{epoch + 1}.bin")
        torch.save(model.state_dict(), model_save_path)
        # Full resumable state (encoder, head, optimizer); this is the file the
        # resume logic above looks for and that was never written before.
        save_model(epoch + 1, model, optimizer, linear_layer, checkpoint_path)

    test_accuracy = evaluate_model(test_loader, model, linear_layer, device)
    print(f"Test Accuracy: {test_accuracy}")

# Run the whole fine-tuning pipeline only when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Epoch 1/3


Training: 100%|██████████| 2479/2479 [30:07<00:00,  1.37it/s]


Epoch 1, Train Loss: 0.5264545622168649, Train Accuracy: 0.7570102884809361


Evaluating: 100%|██████████| 250/250 [00:56<00:00,  4.42it/s]


Epoch 1, Train Loss: 0.5264545622168649, Validation Accuracy: 0.769
Epoch 2/3


Training: 100%|██████████| 2479/2479 [30:07<00:00,  1.37it/s]


Epoch 2, Train Loss: 0.38015122536514234, Train Accuracy: 0.8359895097841437


Evaluating: 100%|██████████| 250/250 [00:56<00:00,  4.42it/s]


Epoch 2, Train Loss: 0.38015122536514234, Validation Accuracy: 0.7795
Epoch 3/3


Training: 100%|██████████| 2479/2479 [30:07<00:00,  1.37it/s]


Epoch 3, Train Loss: 0.22159426725710343, Train Accuracy: 0.915473068388138


Evaluating: 100%|██████████| 250/250 [00:56<00:00,  4.41it/s]


Epoch 3, Train Loss: 0.22159426725710343, Validation Accuracy: 0.784


Evaluating: 100%|██████████| 250/250 [00:57<00:00,  4.38it/s]

Test Accuracy: 0.788



