<a href="https://colab.research.google.com/github/claire-fang/cs182-final-project/blob/main/array_sorting/gpt2sort_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# @title Mount your Google Drive
# Colab-only setup cell: mounts Google Drive, makes sure the project folder
# exists, changes into it, and creates a space-free symlink to it.

import os
from google.colab import drive
drive.mount('/content/gdrive')

# DRIVE_PATH keeps a backslash before the space because it is interpolated
# into the %mkdir / %cd / !ln commands below; DRIVE_PYTHON_PATH is the same
# path with the backslash removed, for use with os.path.
# NOTE(review): '\ ' is not a recognised Python escape, so the backslash is
# kept literally; newer Python versions warn about this — a raw string would
# be cleaner.
DRIVE_PATH = '/content/gdrive/My\ Drive/gpt2_sort'
DRIVE_PYTHON_PATH = DRIVE_PATH.replace('\\', '')
# Create the project folder only on the first run.
if not os.path.exists(DRIVE_PYTHON_PATH):
  %mkdir $DRIVE_PATH

# Make the project folder the working directory for the rest of the notebook.
%cd $DRIVE_PATH

## The space in `My Drive` causes some issues, so make a symlink to avoid this.
SYM_PATH = '/content/gpt2_sort'
# Skip the link if it already exists so the cell can be re-run.
if not os.path.exists(SYM_PATH):
  !ln -s $DRIVE_PATH $SYM_PATH

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
/content/gdrive/My Drive/gpt2_sort


In [22]:
import numpy as np
import os
import random
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from transformers import (
    GPT2Tokenizer,
    GPT2LMHeadModel,
    Trainer,
    TrainingArguments,
    GPT2Model
)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from collections import defaultdict
import matplotlib.pyplot as plt

# GPT-2 Sort Fine-tuning & Inference with Masked Loss
This notebook fine-tunes GPT-2 on a custom sorting dataset. The prompt tokens are masked out of the loss so that the model only learns to predict the output list. The notebook also supports inference with the fine-tuned model.

## Customized Dataset

In [None]:
class SortDataset(Dataset):
    """
    Text dataset of prompt/answer pairs read from a blank-line-separated file.

    Each example is expected to be two parts separated by a newline, e.g.
    "Input: [13, 4]\nOutput: [4, 13]". When an item is fetched, an EOS token
    is injected after the prompt and another after the answer, and the result
    is tokenized and padded/truncated to ``max_length``.

    Args:
        file_path: path of the text file holding the examples.
        tokenizer: callable tokenizer exposing ``eos_token``; it is called with
            ``truncation``, ``max_length``, ``padding`` and ``return_tensors``.
        max_length: length every tokenized example is padded/truncated to.
    """
    def __init__(self, file_path, tokenizer, max_length=128):
        with open(file_path, 'r') as f:
            text = f.read()
        # Drop empty chunks so an empty file yields zero examples and runs of
        # extra blank lines do not create examples that cannot be parsed.
        self.examples = [
            chunk.strip() for chunk in text.split('\n\n') if chunk.strip()
        ]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of non-empty examples found in the file."""
        return len(self.examples)

    def __getitem__(self, idx):
        """
        Tokenize example ``idx``.

        Returns:
            dict with 1-D ``input_ids`` and ``attention_mask`` tensors.

        Raises:
            ValueError: if the example has no newline separating the prompt
                from the answer.
        """
        # Split into prompt and answer
        raw = self.examples[idx]             # e.g. "Input: [13, 4]\nOutput: [4, 13]"
        prompt, newline, answer = raw.partition('\n')
        if not newline:
            raise ValueError(
                f"example {idx} has no newline separating prompt and answer: {raw!r}"
            )
        eos = self.tokenizer.eos_token

        # Inject EOS between prompt and answer, and at the end.
        # NOTE(review): the newline between prompt and answer is replaced by
        # EOS here, whereas the inference prompts built elsewhere in this
        # notebook end in "\nOutput:" — confirm the two formats are meant to differ.
        full = prompt + eos + answer + eos

        tok = self.tokenizer(
            full,
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_tensors='pt'
        )
        # The tokenizer returns a batch dimension of 1; drop it.
        return {
            'input_ids':      tok['input_ids'].squeeze(0),
            'attention_mask': tok['attention_mask'].squeeze(0)
        }

In [None]:
def sort_collate(batch):
    """
    Collate dataset items into a padded batch with prompt-masked labels.

    Reads the module-level ``tokenizer`` for its pad and EOS token ids.

    Args:
        batch: list of dicts, each holding 1-D ``input_ids`` and
            ``attention_mask`` tensors.

    Returns:
        dict with ``input_ids``, ``attention_mask`` and ``labels``. Labels are
        a copy of ``input_ids`` with -100 written over (a) every position up to
        and including the first EOS of each row and (b) every position whose
        attention mask is 0.
    """
    pad_id = tokenizer.pad_token_id
    eos_id = tokenizer.eos_token_id

    # Right-pad every sequence to the longest one in this batch.
    input_ids = pad_sequence(
        [example['input_ids'] for example in batch],
        batch_first=True,
        padding_value=pad_id,
    )
    attention_mask = pad_sequence(
        [example['attention_mask'] for example in batch],
        batch_first=True,
        padding_value=0,
    )

    labels = input_ids.clone()

    # Hide the prompt: everything through the first EOS gets the ignore index.
    # Rows with no EOS at all are left untouched by this step.
    for row in range(labels.size(0)):
        eos_hits = (input_ids[row] == eos_id).nonzero().flatten()
        cut = int(eos_hits[0]) + 1 if eos_hits.numel() > 0 else 0
        labels[row, :cut] = -100

    # Hide padding as well.
    labels = labels.masked_fill(attention_mask == 0, -100)

    return {
        'input_ids':      input_ids,
        'attention_mask': attention_mask,
        'labels':         labels
    }

## Generate Training Samples

In [None]:
def generate_random_arrays(num_examples=10000, max_arr_len=5, seed=123, weight=np.array([0.05, 0.05, 0.05, 0.05, 0.8])):
  """
  Build shuffled "Input: [...]\\nOutput: [...]" sorting examples.

  ``weight[k]`` is the fraction of ``num_examples`` given to arrays of length
  ``k + 1``; each array holds distinct integers drawn from ``range(100)``.

  Args:
    num_examples: nominal total number of examples. The actual total is the
      sum of the per-length counts, each rounded down.
    max_arr_len: currently unused; the longest array length is determined by
      ``len(weight)``. Kept so existing callers keep working.
    seed: seed for the global ``random`` module, which is reseeded here.
    weight: per-length fractions (array-like of floats).

  Returns:
    A list of example strings in shuffled order.
  """
  random.seed(seed)
  # Round each count down, but with a small tolerance: a bare int() turns float
  # noise such as 0.29 * 100 == 28.999999999999996 into 28 instead of 29.
  counts = [int(w * num_examples + 1e-9) for w in np.asarray(weight, dtype=float)]
  examples = []
  for arr_len, count in enumerate(counts, start=1):
    for _ in range(count):
      arr = random.sample(range(100), arr_len)
      sorted_arr = sorted(arr)
      examples.append(f"Input: {arr}\nOutput: {sorted_arr}")
  # shuffle the ordering so lengths are interleaved
  random.shuffle(examples)
  return examples
# generate_random_arrays(100)

In [None]:
# # You only need to run this once!
# samples = generate_random_arrays()

# with open("./sort_train_with_weight.txt", "w") as f:
#     f.write("\n\n".join(samples))

## Fine-Tuning GPT2

In [None]:
def train(train_file, model_name='gpt2', output_dir='result',
          batch_size=2, epochs=3, max_length=128, save_steps=500):
    """
    Fine-tune a GPT-2 language model on the sorting dataset in ``train_file``.

    Side effects: creates ``output_dir``, rebinds the module-level
    ``tokenizer`` (which ``sort_collate`` reads), saves the tokenizer into
    ``output_dir`` before training, runs training, and finally calls
    ``trainer.save_model()``.

    Args:
        train_file: path handed to ``SortDataset``.
        model_name: pretrained checkpoint name for tokenizer and model.
        output_dir: directory for the tokenizer and training outputs.
        batch_size: per-device training batch size.
        epochs: number of training epochs.
        max_length: tokenized sequence length passed to ``SortDataset``.
        save_steps: checkpoint interval passed to ``TrainingArguments``.
    """
    global tokenizer
    os.makedirs(output_dir, exist_ok=True)

    # Tokenizer: GPT-2 ships without a pad token, so EOS doubles as padding.
    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token
    # Persist the tokenizer up front so inference can load it from output_dir.
    tokenizer.save_pretrained(output_dir)

    train_dataset = SortDataset(train_file, tokenizer, max_length)

    # Model, with its special-token ids aligned to the tokenizer's.
    model = GPT2LMHeadModel.from_pretrained(model_name)
    model.config.eos_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.pad_token_id
    model.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu'))

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=batch_size,
        num_train_epochs=epochs,
        save_steps=save_steps,
        logging_steps=100,
        report_to='none'
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=sort_collate,
        train_dataset=train_dataset
    )
    trainer.train()
    trainer.save_model()


In [None]:
# train('sort_train_with_weight.txt')

In [None]:
# Loss values recorded every 1000 steps, from step 1000 to step 15000.
# NOTE(review): presumably copied by hand from the fine-tuning log — confirm the source.
steps = np.arange(1000, 15001, 1000)
losses = [0.245800, 0.074000, 0.054100, 0.029900, 0.026200, 0.019400, 0.016600, 0.013000, 0.011100, 0.006500, 0.003600, 0.003200, 0.001400, 0.001200, 0.000900]

# Label the figure so it can be read on its own when the notebook is skimmed.
fig, ax = plt.subplots()
ax.plot(steps, losses)
ax.set_xlabel("Step")
ax.set_ylabel("Loss")
ax.set_title("GPT-2 sort fine-tuning loss")
plt.show()

## Inference
You can load our fine-tuned model if you don't want to train it yourself.

In [4]:
def infer(input_sequence: str,
          model_dir: str = 'result',
          max_new_tokens: int = 20,
          model=None,
          tokenizer=None):
    """
    Greedily generate a continuation of ``input_sequence``.

    Args:
        input_sequence: prompt text; tokenized without adding special tokens.
        model_dir: directory to load the tokenizer and/or model from when they
            are not supplied.
        max_new_tokens: maximum number of tokens to generate.
        model: optional preloaded language model. When omitted, the model is
            loaded from ``model_dir`` on every call, so pass one in when
            calling this function in a loop.
        tokenizer: optional preloaded tokenizer, loaded from ``model_dir``
            when omitted.

    Returns:
        The decoded newly generated tokens only (prompt excluded), with
        special tokens skipped and leading whitespace stripped.
    """
    # 1) Load tokenizer & model unless the caller supplied them
    if tokenizer is None:
        tokenizer = GPT2Tokenizer.from_pretrained(model_dir)
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token':'<|pad|>'})
    if model is None:
        model = GPT2LMHeadModel.from_pretrained(model_dir)
    # Keep the embedding table in step with the tokenizer (a pad token may
    # have just been added above).
    model.resize_token_embeddings(len(tokenizer))
    model.config.eos_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.pad_token_id

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device).eval()

    # 2) Tokenize with attention mask
    enc = tokenizer(
        input_sequence,
        return_tensors='pt',
        padding=False,
        truncation=True,
        add_special_tokens=False
    )
    input_ids = enc['input_ids'].to(device)
    attention_mask = enc['attention_mask'].to(device)

    # 3) Generate at most max_new_tokens, greedily (no sampling, no beam search)
    outputs = model.generate(
        input_ids,
        attention_mask=attention_mask,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        num_beams=1,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
    )

    # 4) Slice off the prompt tokens and decode only the new ones
    gen_tokens = outputs[0, input_ids.shape[-1]:]
    result = tokenizer.decode(gen_tokens, skip_special_tokens=True)
    return result.lstrip()

### Length-2 Array Sorting

In [30]:
# generate length "len" arrays
def inference_with_len(seed, len, num_samples=100):
  """
  Measure exact-match sorting accuracy on random arrays of a fixed length.

  Reseeds the global ``random`` module, draws ``num_samples`` arrays of
  ``len`` integers from 0 to 100 inclusive (repeats possible), asks ``infer``
  to complete "Input: <array>\\nOutput:", and counts a hit when the reply
  equals the string form of the sorted array. Running accuracy is printed
  every 10 samples and the final accuracy at the end; nothing is returned.

  Args:
    seed: seed for the global ``random`` module.
    len: length of each generated array (the name shadows the builtin inside
      this function).
    num_samples: number of arrays to evaluate.
  """
  random.seed(seed)
  hits = 0
  for sample_idx in range(num_samples):
    values = [random.randint(0, 100) for _ in range(len)]
    # Reference answer from Python's own sort.
    expected = f'{sorted(values)}'
    prompt = "Input: " + str(values) + "\nOutput:"
    hits += int(infer(prompt) == expected)
    if sample_idx % 10 == 0:
        print(sample_idx)
        print(f'Accuracy: {hits / (sample_idx+1)}')

  print(num_samples)
  print(f'Accuracy: {hits / num_samples}')

In [16]:
inference_with_len(12345, 2, 100)

0
Accuracy: 1.0
10
Accuracy: 1.0
20
Accuracy: 1.0
30
Accuracy: 1.0
40
Accuracy: 1.0
50
Accuracy: 1.0
60
Accuracy: 1.0
70
Accuracy: 1.0
80
Accuracy: 1.0
90
Accuracy: 1.0
100
Accuracy: 1.0


### Length-3 Array Sorting

In [31]:
inference_with_len(23456, 3, 100)

0
Accuracy: 1.0
10
Accuracy: 1.0
20
Accuracy: 1.0
30
Accuracy: 1.0
40
Accuracy: 1.0
50
Accuracy: 0.9803921568627451
60
Accuracy: 0.9836065573770492
70
Accuracy: 0.9859154929577465
80
Accuracy: 0.9876543209876543
90
Accuracy: 0.989010989010989
100
Accuracy: 0.99


### Length-4 Array Sorting

In [None]:
inference_with_len(34567, 4, 100)

### Length-5 Array Sorting

In [None]:
inference_with_len(45678, 5, 100)

### Length-6 Array Sorting

In [21]:
inference_with_len(56789, 6, 100)

0
Accuracy: 1.0
10
Accuracy: 0.7272727272727273
20
Accuracy: 0.6190476190476191
30
Accuracy: 0.5806451612903226
40
Accuracy: 0.5121951219512195
50
Accuracy: 0.47058823529411764
60
Accuracy: 0.45901639344262296
70
Accuracy: 0.4225352112676056
80
Accuracy: 0.43209876543209874
90
Accuracy: 0.4725274725274725
100
Accuracy: 0.46


In [23]:
inference_with_len(56789, 7, 100)

0
Accuracy: 0.0
10
Accuracy: 0.5454545454545454
20
Accuracy: 0.7142857142857143
30
Accuracy: 0.6451612903225806
40
Accuracy: 0.6097560975609756
50
Accuracy: 0.5882352941176471
60
Accuracy: 0.5245901639344263
70
Accuracy: 0.5070422535211268
80
Accuracy: 0.5308641975308642
90
Accuracy: 0.5164835164835165
100
Accuracy: 0.5


## Linear Probing

In [None]:
# Load the fine-tuned tokenizer and the bare GPT-2 model (no LM head) from
# "result", asking the model to return the hidden states of every layer.
tokenizer = GPT2Tokenizer.from_pretrained("result")
model = GPT2Model.from_pretrained("result", output_hidden_states=True)

# Probing only reads activations: switch to eval mode and freeze all weights.
model.eval()
model.requires_grad_(False)

# Generate synthetic data
def generate_example():
    """
    Draw one probing example.

    Returns:
        (arr, ranks): ``arr`` is a list of 5 distinct integers sampled from
        ``range(100)``; ``ranks[i]`` is the position of ``arr[i]`` in the
        sorted array.
    """
    values = random.sample(range(100), 5)
    # Values are distinct, so each one maps to exactly one sorted position.
    rank_of = {value: rank for rank, value in enumerate(sorted(values))}
    return values, [rank_of[value] for value in values]

# Get hidden states for number tokens
def extract_token_hidden_states(arr, labels):
    """
    Collect per-layer hidden states for the number tokens of one prompt.

    Builds the prompt "Input: <arr>\\nOutput:", runs the module-level ``model``
    on it (tokenized by the module-level ``tokenizer``), and keeps the hidden
    state at every token whose decoded, stripped text parses as an integer
    that occurs in ``arr``.

    Args:
        arr: list of integers shown in the prompt.
        labels: ``labels[i]`` is the probing target for ``arr[i]``.

    Returns:
        dict mapping layer index -> (features, final_labels), where
        ``features`` is a list of NumPy vectors (one per matched token) and
        ``final_labels`` the corresponding entries of ``labels``. There is one
        entry per element of ``outputs.hidden_states``.

    NOTE(review): a number split across several tokens is matched piecewise —
    a fragment that happens to equal another element of ``arr`` gets that
    element's label. Confirm every value in range is a single token.
    """
    prompt = f"Input: {arr}\nOutput:"
    inputs = tokenizer(prompt, return_tensors="pt")

    with torch.no_grad():
        outputs = model(**inputs) # perform a forward pass

    token_ids = inputs['input_ids'][0].tolist()

    # Which positions hold a number from arr does not depend on the layer,
    # so work it out once. Only the int() parse is allowed to fail quietly;
    # any other error should surface rather than silently drop samples.
    matched = []  # (token position, label) pairs
    for pos, tok_id in enumerate(token_ids):
        text = tokenizer.decode([tok_id]).strip()
        try:
            val = int(text)
        except ValueError:
            continue  # not a number token
        if val in arr:
            matched.append((pos, labels[arr.index(val)]))

    layer_dict = {}
    for layer, layer_hidden in enumerate(outputs.hidden_states):
        hidden = layer_hidden[0]  # shape: [seq_len, hidden_dim]; [0] selects the single batch item
        # detach()/cpu() keep the NumPy conversion valid for any tensor.
        features = [hidden[pos].detach().cpu().numpy() for pos, _ in matched]
        final_labels = [label for _, label in matched]
        layer_dict[layer] = (features, final_labels)

    return layer_dict

In [None]:
# Build the probing dataset: for each of the 13 hidden-state layers, pool the
# number-token features (X_dict) and their sorted-position labels (y_dict)
# over 500 randomly generated arrays.
samples = [generate_example() for _ in range(500)]

X_dict = defaultdict(list)
y_dict = defaultdict(list)
for arr, labels in samples:
  layer_dict = extract_token_hidden_states(arr, labels)
  for layer in range(13):
    feats, labs = layer_dict[layer]
    X_dict[layer] += feats
    y_dict[layer] += labs