In [None]:
import argparse
import math
import time
import sys
import json
import os
import shutil
import random
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import AutoTokenizer, BertModel, GPT2LMHeadModel, GPT2Tokenizer
from google.colab import drive
# Mount the user's Google Drive inside the Colab VM so the course folder is
# reachable through the local filesystem.
drive.mount('/content/drive')
# Drive folder holding the dataset files (the trailing slash matters: it is
# concatenated with a shell glob below).
cs461dir = '/content/drive/My Drive/Colab Notebooks/CS461/'

# The three JSONL splits read later by load_data_generative().
file_paths = ['train_complete.jsonl', 'dev_complete.jsonl', 'test_complete.jsonl']
# Copy only when at least one split is missing. The check uses relative paths,
# so it depends on the current working directory — presumably /content on
# Colab, matching the copy destination; confirm if run elsewhere.
if any(not os.path.exists(filepath) for filepath in file_paths):
  # IPython shell escape: copies everything under cs461dir into /content/.
  # The directory is quoted because the Drive path contains spaces; the glob
  # is left outside the quotes so the shell still expands it.
  !cp -r "{cs461dir}"* /content/
  print("Files imported")

# Directory intended for saved model weights; created up front. It is not
# referenced by the other cells visible in this notebook.
model_path = "./model_weights"
os.makedirs(model_path, exist_ok=True)

Mounted at /content/drive
Files imported


In [None]:
def _load_split(file_name):
  """Read one JSONL split and turn every record into a ``[text, answer]`` pair.

  Each non-blank line must be a JSON object with ``fact1``,
  ``question.stem``, ``question.choices`` (at least four entries, each with a
  ``text`` field) and ``answerKey``.

  Args:
    file_name: Path of the ``.jsonl`` file to read.

  Returns:
    A list of ``[text, answer]`` lists, where ``text`` is the full prompt
    including the gold answer after ``[ANSWER]`` and ``answer`` is the raw
    ``answerKey`` value.
  """
  examples = []
  # Explicit encoding so the result does not depend on the platform default.
  with open(file_name, encoding="utf-8") as json_file:
    for line in json_file:
      line = line.strip()
      # Tolerate blank lines (e.g. a trailing newline at end of file), which
      # would otherwise make json.loads raise.
      if not line:
        continue
      result = json.loads(line)

      base = result["fact1"] + " " + result["question"]["stem"]
      choices = result["question"]["choices"]
      ans = result["answerKey"]

      text = f'[START] {base} [A] {choices[0]["text"]} [B] {choices[1]["text"]} [C] {choices[2]["text"]} [D] {choices[3]["text"]} [ANSWER] {ans}'
      examples.append([text, ans])
  return examples


def load_data_generative():
  """Load the train / dev / test splits as generative prompts.

  Reads ``train_complete.jsonl``, ``dev_complete.jsonl`` and
  ``test_complete.jsonl`` from the current working directory.

  Returns:
    A ``(train, valid, test)`` tuple; each element is a list of
    ``[text, answer]`` pairs in file order.

  Raises:
    FileNotFoundError: if one of the split files is missing.
    json.JSONDecodeError: if a non-blank line is not valid JSON.
    KeyError / IndexError: if a record lacks an expected field or has fewer
      than four choices.
  """
  train = _load_split('train_complete.jsonl')
  valid = _load_split('dev_complete.jsonl')
  test = _load_split('test_complete.jsonl')

  return train, valid, test

In [None]:
def train_model(model, opt):
  """Fine-tune a causal language model on the prompts in ``opt.train``.

  Args:
    model: Causal LM whose forward accepts ``input_ids``, ``attention_mask``,
      ``position_ids`` and ``labels`` and returns an object with a ``loss``
      attribute.
    opt: Namespace providing ``device``, ``optimizer``, ``train`` (list of
      ``[text, answer]`` pairs), ``epochs``, ``batchsize``, ``tokenizer``,
      ``max_length`` and ``norm`` (gradient-clipping max norm).

  Side effects:
    Moves ``model`` to ``opt.device``, puts it in training mode, shuffles
    ``opt.train`` in place every epoch and prints the mean batch loss per
    epoch.
  """
  print("training model...")
  model.to(opt.device)
  model.train()

  optimizer = opt.optimizer
  data = opt.train

  for epoch in range(opt.epochs):
    total_loss = 0.0
    num_batches = 0
    random.shuffle(data)
    for i in range(0, len(data), opt.batchsize):
      batch = data[i: i + opt.batchsize]
      questions = [question for question, _ in batch]

      # Tokenize the whole batch at once; every row is padded/truncated to
      # opt.max_length so the result is a rectangular tensor.
      encoded = opt.tokenizer(questions, padding = 'max_length', truncation=True,
                              max_length = opt.max_length, return_tensors="pt")
      input_ids = encoded["input_ids"].to(opt.device)
      attention_mask = encoded["attention_mask"].to(opt.device)

      # Padding must not contribute to the loss: -100 is the index ignored by
      # the LM head's cross-entropy. Masking via the attention mask (not the
      # token id) matters when the pad token is the same id as EOS.
      labels = input_ids.masked_fill(attention_mask == 0, -100)

      # Number real tokens 0..n-1 regardless of padding side, so positional
      # embeddings do not depend on how much padding a row received.
      position_ids = (attention_mask.long().cumsum(-1) - 1).clamp(min=0)

      optimizer.zero_grad()
      outputs = model(input_ids = input_ids, attention_mask = attention_mask,
                      position_ids = position_ids, labels = labels)
      loss = outputs.loss
      loss.backward()

      nn.utils.clip_grad_norm_(model.parameters(), opt.norm)
      optimizer.step()

      total_loss += loss.item()
      num_batches += 1

    # Average over the batches actually run (the last one may be partial);
    # max() guards against an empty training set.
    avg_loss = total_loss / max(num_batches, 1)
    print(f"Epoch {epoch+1}/{opt.epochs}, Loss: {avg_loss:.4f}")

In [None]:
def test(model, opt, test_dataset="valid", verbose = False):
  model.eval()
  model.to(opt.device)

  data = getattr(opt, test_dataset)

  total_correct = 0
  num_printed = 0
  num_samples = len(data) - len(data) % opt.batchsize

  with torch.no_grad():
    for i in range(0, num_samples, opt.batchsize):
      batch = data[i: i + opt.batchsize]
      prompts = []
      labels = []
      # remove answer
      for question, answer in batch:
        prompt = question.split("[ANSWER]")[0] + "[ANSWER]"
        prompts.append(prompt)
        labels.append(answer)

      inputs = opt.tokenizer(prompts, padding=True, truncation=True, return_tensors="pt").to(opt.device)

      outputs = model.generate(input_ids = inputs["input_ids"], max_new_tokens = opt.max_new_tokens, pad_token_id = opt.tokenizer.eos_token_id)
      generated = opt.tokenizer.batch_decode(outputs, skip_special_tokens=True)
      generated = [gen_ans.split("[ANSWER]")[1].strip()[0] if "[ANSWER]" in gen_ans else "" for gen_ans in generated]

      for (question_input, true_answer, gen_answer) in zip(prompts, labels, generated):
        # print examples
        if verbose and num_printed < opt.num_examples:
          print("\n Example", num_printed + 1)
          print(f" Question: {question_input}")
          print(f" True Answer: {true_answer}")
          print(f" Predicted Answer: {gen_answer}")
          num_printed += 1
        if gen_answer == true_answer:
          total_correct += 1

  accuracy = total_correct / num_samples
  return accuracy

In [None]:
def main():
  """Evaluate pretrained GPT-2 zero-shot, fine-tune it, then evaluate again.

  Hyper-parameters come from command-line flags (``-epochs``, ``-batchsize``,
  ``-lr``, ``-norm``, ``-max_length``, ``-max_new_tokens``, ``-num_examples``,
  ``-no_cuda``). Inside a notebook kernel the defaults are used, because the
  kernel's own argv is not meant for this parser.

  Side effects:
    Downloads/loads the ``gpt2`` model and tokenizer, reads the dataset
    files via ``load_data_generative()``, trains the model and prints
    accuracies for the validation and test splits.
  """
  # Seed both RNGs in use: `random` drives the per-epoch shuffle, torch drives
  # dropout during fine-tuning.
  random.seed(10)
  torch.manual_seed(10)

  parser = argparse.ArgumentParser()
  parser.add_argument('-no_cuda', action='store_true')
  parser.add_argument('-epochs', type=int, default=20)
  parser.add_argument('-batchsize', type=int, default=8)
  parser.add_argument('-lr', type=float, default=5e-5)
  parser.add_argument('-norm', type=float, default=1.0)
  parser.add_argument('-max_length', type=int, default=128)
  parser.add_argument('-max_new_tokens', type=int, default=3)
  parser.add_argument('-num_examples', type=int, default=8)

  # In Colab / Jupyter, sys.argv holds kernel launcher arguments that argparse
  # would reject. Parse an empty argument list there instead of overwriting
  # the global sys.argv.
  in_notebook = "google.colab" in sys.modules or "ipykernel" in sys.modules
  opt = parser.parse_args([] if in_notebook else None)

  opt.device = torch.device("cuda" if torch.cuda.is_available() and not opt.no_cuda else "cpu")

  model = GPT2LMHeadModel.from_pretrained("gpt2")

  opt.optimizer = optim.AdamW(model.parameters(), lr=opt.lr)

  tokenizer = AutoTokenizer.from_pretrained("gpt2")
  if tokenizer.pad_token is None:
    # GPT-2 ships without a pad token; reuse EOS for padding.
    tokenizer.pad_token = tokenizer.eos_token
  # A decoder-only model continues from the last position, so padded batches
  # must be left-padded for generation — independent of whether a pad token
  # had to be assigned above.
  tokenizer.padding_side = "left"
  opt.tokenizer = tokenizer

  opt.train, opt.valid, opt.test = load_data_generative()

  print("testing model pre-trained model...\n")

  zeroshot_valid_acc = test(model, opt, test_dataset = "valid")
  zeroshot_test_acc = test(model, opt, test_dataset = "test")
  print(f"Zero-shot accuracy on validation dataset: {zeroshot_valid_acc:.3f}")
  print(f"Zero-shot accuracy on test dataset: {zeroshot_test_acc:.3f}")

  print("Now fine-tuning...")

  train_model(model, opt)

  print("testing model fine-tuned model...\n")

  finetuned_valid_acc = test(model, opt, test_dataset = "valid", verbose = True)
  finetuned_test_acc = test(model, opt, test_dataset = "test", verbose = True)
  print(f"Fine-tuned accuracy on validation dataset: {finetuned_valid_acc:.3f}")
  print(f"Fine-tuned accuracy on test dataset: {finetuned_test_acc:.3f}")

if __name__ == "__main__":
  main()

testing model pre-trained model...

Zero-shot accuracy on validation dataset: 0.004
Zero-shot accuracy on test dataset: 0.000
Now fine-tuning...
training model...
Epoch 1/20, Loss: 1.0673
Epoch 2/20, Loss: 0.8510
Epoch 3/20, Loss: 0.7551
Epoch 4/20, Loss: 0.6893
Epoch 5/20, Loss: 0.6330
Epoch 6/20, Loss: 0.5811
Epoch 7/20, Loss: 0.5355
Epoch 8/20, Loss: 0.4895
Epoch 9/20, Loss: 0.4483
Epoch 10/20, Loss: 0.4099
Epoch 11/20, Loss: 0.3749
Epoch 12/20, Loss: 0.3445
Epoch 13/20, Loss: 0.3162
Epoch 14/20, Loss: 0.2903
Epoch 15/20, Loss: 0.2680
Epoch 16/20, Loss: 0.2476
Epoch 17/20, Loss: 0.2295
Epoch 18/20, Loss: 0.2151
Epoch 19/20, Loss: 0.2006
Epoch 20/20, Loss: 0.1893
testing model fine-tuned model...


 Example 1
 Question: [START] deep sea animals live deep in the ocean Frilled sharks and angler fish live far beneath the surface of the ocean, which is why they are known as [A] Deep sea animals [B] fish [C] Long Sea Fish [D] Far Sea Animals [ANSWER]
 True Answer: A
 Predicted Answer: A

