# **Alfred Code Classifier**

## Dependency Installs and Imports

In [None]:
# Dependency Installs
!pip install --upgrade pip
!pip install transformers datasets tokenizers torch tqdm sentencepiece accelerate

In [None]:
# Imports
import os
import math
import random
from pathlib import Path
from tqdm.auto import tqdm
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

from datasets import load_dataset
from tokenizers import Tokenizer, trainers, pre_tokenizers, models, processors, normalizers
from transformers import PreTrainedTokenizerFast, AlbertConfig, AlbertForSequenceClassification, get_linear_schedule_with_warmup
from torch.optim import AdamW

# Seed the Python, NumPy and PyTorch random number generators with one value.
seed = 42
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(seed)

# Directory that receives the saved model and tokenizer files.
OUTPUT_DIR = Path("Alfred")
OUTPUT_DIR.mkdir(exist_ok=True)

## Dataset Downloading and Loading

In [None]:
# Downloading and Loading the Dataset
# No split is requested, so `ds` is indexed by split name further down
# (e.g. ds['train']).
dataset_name = "burtenshaw/PleIAs_common_corpus_code_classification"
print("Loading dataset:", dataset_name)
ds = load_dataset(path=dataset_name)
print(ds)

## Tokenizer and Batches Creation

In [None]:
# Tokenizer Training
from pathlib import Path

# Vocabulary size requested from the WordPiece trainer.
vocab_size = 10_000

# Folder in which the trained tokenizer.json is cached between runs.
tokenizer_dir = Path("tokenizer")
tokenizer_dir.mkdir(exist_ok=True)

def batch_iterator(split_name="train", batch_size=1000, dataset=None):
    """Yield the ``text`` column of one split as consecutive batches of strings.

    Args:
        split_name: Key of the split to read.
        batch_size: Maximum number of rows in each yielded batch.
        dataset: Mapping from split name to a split object that supports
            ``len()`` and row slicing (``split[a:b]["text"]``). Defaults to
            the notebook-level ``ds``.

    Yields:
        list[str]: ``str()`` of every text in the current batch.
    """
    split = (ds if dataset is None else dataset)[split_name]
    for start in range(0, len(split), batch_size):
        # Slice the rows first and pick the column afterwards, so only
        # `batch_size` rows are materialised per step. Selecting the column
        # first (`split['text'][a:b]`) can load the whole column on every
        # iteration, depending on the installed `datasets` version.
        yield [str(text) for text in split[start : start + batch_size]["text"]]

# Train the WordPiece tokenizer once and cache it on disk; when the cached
# file is already present it is loaded instead of being retrained.
tokenizer_file = tokenizer_dir / "tokenizer.json"

if tokenizer_file.exists():
    print("Tokenizer already exists, loading...")
    tokenizer = Tokenizer.from_file(str(tokenizer_file))
else:
    print("Training a WordPiece tokenizer on the dataset (vocab_size=%d)..." % vocab_size)
    wp_model = models.WordPiece(unk_token="[UNK]")
    tokenizer = Tokenizer(wp_model)
    tokenizer.normalizer = normalizers.BertNormalizer(lowercase=True)
    tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
    trainer = trainers.WordPieceTrainer(
        vocab_size=vocab_size,
        special_tokens=["[PAD]","[UNK]","[CLS]","[SEP]","[MASK]"],
    )
    tokenizer.train_from_iterator(batch_iterator("train"), trainer=trainer)
    # Wrap single sequences as "[CLS] ... [SEP]" and pairs with a second segment.
    tokenizer.post_processor = processors.TemplateProcessing(
        single="[CLS] $A [SEP]",
        pair="[CLS] $A [SEP] $B:1 [SEP]:1",
        special_tokens=[
            (marker, tokenizer.token_to_id(marker)) for marker in ("[CLS]", "[SEP]")
        ],
    )
    tokenizer.save(str(tokenizer_file))
    print("Tokenizer trained and saved to", tokenizer_dir)

# Expose the raw tokenizer through the transformers fast-tokenizer interface
# and register the special tokens on that wrapper.
tok = PreTrainedTokenizerFast(tokenizer_object=tokenizer)
tok.add_special_tokens(
    {
        "pad_token": "[PAD]",
        "unk_token": "[UNK]",
        "cls_token": "[CLS]",
        "sep_token": "[SEP]",
        "mask_token": "[MASK]",
    }
)
print("Vocab size:", tok.vocab_size)

In [None]:
# Analyzing Labels
# Gather every distinct value of the 'labels' column over all splits.
all_labels = set()
for split in ds.keys():
    all_labels.update(ds[split]['labels'])

all_labels = sorted(all_labels)
print("Unique labels across full dataset:", all_labels)

# Ids follow the sorted label order, so the two mappings are inverses.
label2id = dict(zip(all_labels, range(len(all_labels))))
id2label = dict(enumerate(all_labels))
num_labels = len(all_labels)

print("num_labels =", num_labels)
print("label2id =", label2id)

def map_label_batch(batch):
    """Translate the raw labels of one batch into their ids from ``label2id``.

    Args:
        batch: Batched examples containing a ``'labels'`` list.

    Returns:
        dict: ``{'labels': [...]}`` holding one id per input label.
    """
    label_ids = []
    for raw_label in batch['labels']:
        label_ids.append(label2id[raw_label])
    return {'labels': label_ids}


# Overwrite the 'labels' column of every split with the ids.
ds = ds.map(map_label_batch, batched=True)

In [None]:
# Expanding the dataset with a 256-token sliding window and stride of 128
max_length = 256
stride = 128

def sliding_window_tokenize(examples, window_size=None, window_stride=None, hf_tokenizer=None):
    """Tokenize a batch of texts and expand each one into fixed-size windows.

    Each text is tokenized without special tokens and cut into overlapping
    windows. Every window is then wrapped as ``[CLS] ... [SEP]`` and
    right-padded to ``window_size``. Slicing an already wrapped sequence
    would leave only the first window starting with [CLS] and only the last
    one ending with [SEP].

    Args:
        examples: Batched examples with a ``"text"`` list and a ``"labels"`` list.
        window_size: Total tokens per window, including [CLS] and [SEP].
            Defaults to the notebook-level ``max_length``.
        window_stride: Number of content tokens the window start advances by.
            Defaults to the notebook-level ``stride``.
        hf_tokenizer: Callable tokenizer exposing ``cls_token_id``,
            ``sep_token_id`` and ``pad_token_id``. Defaults to the
            notebook-level ``tok``.

    Returns:
        dict: ``input_ids`` and ``attention_mask`` (lists of ``window_size``
        ints) and ``labels`` (the source text's label repeated per window).
        A text with no tokens still yields one ``[CLS] [SEP]`` window.

    Raises:
        ValueError: If ``window_size`` leaves no room for content tokens or
            ``window_stride`` is not positive (the loop would never end).
    """
    window_size = max_length if window_size is None else window_size
    window_stride = stride if window_stride is None else window_stride
    hf_tokenizer = tok if hf_tokenizer is None else hf_tokenizer

    # Two positions of every window are reserved for [CLS] and [SEP].
    content_len = window_size - 2
    if content_len < 1:
        raise ValueError("window_size must be at least 3 ([CLS] + one token + [SEP])")
    if window_stride < 1:
        raise ValueError("window_stride must be a positive integer")

    cls_id = hf_tokenizer.cls_token_id
    sep_id = hf_tokenizer.sep_token_id
    pad_id = hf_tokenizer.pad_token_id

    encoded = hf_tokenizer(
        examples["text"],
        truncation=False,
        padding=False,
        add_special_tokens=False,  # added per window below
    )

    window_input_ids = []
    window_attention = []
    window_labels = []

    for token_ids, label in zip(encoded["input_ids"], examples["labels"]):
        total_len = len(token_ids)
        start = 0

        while True:
            end = start + content_len

            ids = [cls_id] + list(token_ids[start:end]) + [sep_id]
            n_pad = window_size - len(ids)

            window_input_ids.append(ids + [pad_id] * n_pad)
            window_attention.append([1] * len(ids) + [0] * n_pad)
            window_labels.append(label)

            # Stop once this window reached the end of the text.
            if end >= total_len:
                break

            start += window_stride

    return {
        "input_ids": window_input_ids,
        "attention_mask": window_attention,
        "labels": window_labels
    }


print("Tokenizing dataset with optimized sliding windows...")

# Every original column except 'labels' is dropped by the map below; the
# windowing function returns a different number of rows than it receives.
keep_cols = [c for c in ds['train'].column_names if c not in ['labels']]

# When the dataset ships no validation split, hold out 10% of the training
# DOCUMENTS before windowing. Splitting after windowing would put overlapping
# windows of the same document into both train and validation, which leaks
# training text into the validation accuracy.
if 'validation' in ds:
    ds_splits = ds
else:
    from datasets import DatasetDict

    doc_split = ds['train'].train_test_split(test_size=0.1, seed=seed)
    ds_splits = DatasetDict(
        {**ds, 'train': doc_split['train'], 'validation': doc_split['test']}
    )

tokenized = ds_splits.map(
    sliding_window_tokenize,
    batched=True,
    remove_columns=keep_cols
)

print(tokenized)

In [None]:
# Creating the batches for training
def collate_fn(batch):
    """Turn a list of tokenized examples into one dict of ``torch.long`` tensors.

    Args:
        batch: List of examples, each with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'`` entries.

    Returns:
        dict: ``input_ids`` (padded with ``tok.pad_token_id``),
        ``attention_mask`` (padded with 0) and ``labels``. The two padded
        tensors are batch-first and as long as the longest example.
    """
    def _as_tensors(key):
        return [torch.tensor(example[key], dtype=torch.long) for example in batch]

    ids_list = _as_tensors('input_ids')
    mask_list = _as_tensors('attention_mask')
    label_tensor = torch.tensor([example['labels'] for example in batch], dtype=torch.long)

    return {
        "input_ids": pad_sequence(ids_list, batch_first=True, padding_value=tok.pad_token_id),
        "attention_mask": pad_sequence(mask_list, batch_first=True, padding_value=0),
        "labels": label_tensor,
    }

batch_size = 64

# Use the dataset's own validation split when it has one; otherwise hold out
# 10% of the tokenized training rows.
if 'validation' in tokenized:
    train_rows, val_rows = tokenized['train'], tokenized['validation']
else:
    small_val = tokenized['train'].train_test_split(test_size=0.1, seed=seed)
    train_rows, val_rows = small_val['train'], small_val['test']

# Only the training loader shuffles; validation keeps dataset order.
train_loader = DataLoader(train_rows, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_rows, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

print("Batches - train:", len(train_loader), " val:", len(val_loader))

## Defining The Model Architecture

In [None]:
# Model Architecture
# Vocabulary size, padding id and label count come from the fitted tokenizer
# and the label map built above, so the embedding table and classifier head
# always match the data. The label names are stored in the config and are
# saved and reloaded with the model.
config = AlbertConfig(
    vocab_size=len(tok),  # includes any tokens added on top of the base vocab
    embedding_size=64,
    hidden_size=128,
    num_hidden_layers=12,
    num_attention_heads=4,
    intermediate_size=512,
    hidden_act="gelu",
    type_vocab_size=2,
    layer_norm_eps=1e-12,
    classifier_dropout_prob=0.1,
    hidden_dropout_prob=0.1,
    attention_probs_dropout_prob=0.1,
    pad_token_id=tok.pad_token_id,
    num_labels=num_labels,
    id2label={idx: str(lbl) for idx, lbl in id2label.items()},
    label2id={str(lbl): idx for lbl, idx in label2id.items()},
)

model = AlbertForSequenceClassification(config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


## Training and Validation

In [None]:
# Training Loop
epochs = 4
total_steps = epochs * len(train_loader)
optimizer = AdamW(model.parameters(), lr=2e-4)
# The first 5% of all optimizer steps are warm-up steps.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(0.05*total_steps),
    num_training_steps=total_steps,
)

from sklearn.metrics import accuracy_score

model.train()
for epoch in range(1, epochs+1):
    running_loss = 0.0
    pbar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch}/{epochs}")
    for step, batch in pbar:
        # collate_fn yields exactly input_ids / attention_mask / labels.
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        loss = model(**batch).loss
        loss.backward()
        optimizer.step()
        scheduler.step()  # the schedule advances once per batch
        optimizer.zero_grad()
        running_loss += loss.item()
        seen = step + 1
        # Refresh the displayed running mean every 10 batches and on the last one.
        if seen % 10 == 0 or seen == len(train_loader):
            pbar.set_postfix({'loss': f'{running_loss/seen:.4f}'})
    avg_train_loss = running_loss / len(train_loader)

    # Validation
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Validation", leave=False):
            logits = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            ).logits
            preds.extend(logits.argmax(dim=-1).tolist())
            trues.extend(batch['labels'].tolist())
    val_acc = accuracy_score(trues, preds)
    print(f"Epoch {epoch} finished. Train loss: {avg_train_loss:.4f} | Val accuracy: {val_acc:.4f}")
    # Back to training mode for the next epoch.
    model.train()

## Saving and Loading the Model and Tokenizer

In [None]:
# Model Saving
print('Saving model and tokenizer to', OUTPUT_DIR)
# The model is written first, then the tokenizer, both into OUTPUT_DIR.
for artifact in (model, tok):
    artifact.save_pretrained(OUTPUT_DIR)
print('Done.')

In [None]:
!zip Alfred.zip CodeClassifier tokenizer

In [None]:
# Loading the Model and Tokenizer
!unzip Alfred.zip
tok = PreTrainedTokenizerFast.from_pretrained(OUTPUT_DIR)
model = AlbertForSequenceClassification.from_pretrained(OUTPUT_DIR)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print("Model and tokenizer loaded. Ready to resume training.")

## Confusion Matrix for Training and Validation

In [None]:
# Confusion Matrix
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Collect predictions over the training loader. The loader shuffles, but
# predictions and true labels are gathered batch by batch in the same order,
# so the two lists stay aligned.
model.eval()
train_preds = []
train_trues = []

with torch.no_grad():
    for batch in tqdm(train_loader, desc="Training set predictions"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
        train_preds.extend(torch.argmax(logits, dim=-1).tolist())
        train_trues.extend(batch['labels'].tolist())

# Confusion matrix
train_trues_str = [all_labels[i] for i in train_trues]
train_preds_str = [all_labels[i] for i in train_preds]
cm = confusion_matrix(train_trues_str, train_preds_str, labels=all_labels)

# Row-normalise so each row shows the distribution of predictions for one true
# label. Labels with no samples in this loader have a zero row total; their
# rows are left at 0 instead of dividing by zero (which would produce NaN).
row_totals = cm.sum(axis=1, keepdims=True)
cm_norm = np.divide(
    cm,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0,
)

# Plot
n_classes = len(all_labels)
plt.figure(figsize=(100, 100))
sns.heatmap(cm_norm, annot=False, cmap='Blues', xticklabels=all_labels, yticklabels=all_labels)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title(f'{n_classes}x{n_classes} Confusion Matrix on Training Set')
plt.show()