# CodeBERT
## Luis Arturo Rendon Inarritu
### A01703572 

### Imports

In [None]:
import torch
import torch.nn as nn
from torch.optim import AdamW
from os.path import join as joinpath
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score, recall_score, confusion_matrix, ConfusionMatrixDisplay
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers.optimization import get_linear_schedule_with_warmup
from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.model_selection import train_test_split
import copy
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, confusion_matrix
from tqdm.notebook import tqdm, trange

### Model name and dataset read

In [3]:
# Hugging Face checkpoint id; reused below for both the tokenizer and the classification model
MODEL_NAME = "microsoft/codebert-base"
# The CSV is read from the parent of the current working directory
plag_dataset = pd.read_csv(joinpath('..', 'plag_dataset.csv'))

In [4]:
# Report the number of rows, then preview the first rows as the cell output
print(f'Dataset lenght {len(plag_dataset)}')
plag_dataset.head()

Dataset lenght 1371


Unnamed: 0.1,Unnamed: 0,original,untrusted,plagiarism
0,0,public class T1 { public static void main(Str...,import java.util.Scanner; public class Soal1 ...,0
1,1,public class T1 { public static void main(Str...,public class Kasus1L1 { public stati...,1
2,2,public class T1 { public static void main(Str...,public class Kasus1L3 { public stati...,1
3,3,public class T1 { public static void main(Str...,public class Kasus1L6 { public ...,1
4,4,public class T1 { public static void main(Str...,public class L1 { public static void main...,1


### Split the dataset into train (80%) and test (20%)

In [5]:
# Hold out 20% of the rows for testing; the fixed random_state keeps the split reproducible.
# NOTE(review): the split is not stratified on 'plagiarism' — consider stratify= if the classes are imbalanced.
train, test = train_test_split(plag_dataset, test_size=0.2, random_state=69)

In [6]:
# Sanity check: print the number of rows in each partition
print(f'Train size: {len(train)}\nTest size: {len(test)}')

Train size: 1096
Test size: 275


## Tokenization
This section illustrates the tokenization process used by the CodeBERT (RoBERTa-based) transformer.

In [7]:
# Load the tokenizer that belongs to the MODEL_NAME checkpoint.
# This module-level `tokenizer` is also the one collate_fn uses to encode batches.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [8]:
# Show the tokenizer configuration (vocabulary size, maximum length, special tokens)
print(tokenizer)

RobertaTokenizerFast(name_or_path='microsoft/codebert-base', vocab_size=50265, model_max_length=512, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'bos_token': '<s>', 'eos_token': '</s>', 'unk_token': '<unk>', 'sep_token': '</s>', 'pad_token': '<pad>', 'cls_token': '<s>', 'mask_token': '<mask>'}, clean_up_tokenization_spaces=False, added_tokens_decoder={
	0: AddedToken("<s>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	1: AddedToken("<pad>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	2: AddedToken("</s>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	3: AddedToken("<unk>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	50264: AddedToken("<mask>", rstrip=False, lstrip=True, single_word=False, normalized=False, special=True),
}
)


In [9]:
# Small Java snippet used to illustrate what the tokenizer produces
original_string = 'public static void main(String[] args){}'
print(f'Original string: {original_string}')

# encode() maps the text to vocabulary ids; the printed ids start with 0 (<s>) and end with 2 (</s>)
token_ids = tokenizer.encode(original_string)
print(f'Token IDs: {token_ids}')

# Map the ids back to their token strings to inspect how the snippet was split
tokens = tokenizer.convert_ids_to_tokens(token_ids)
print(f'Tokens: {tokens}')

Original string: public static void main(String[] args){}
Token IDs: [0, 15110, 25156, 13842, 1049, 1640, 34222, 48992, 49503, 48512, 24303, 2]
Tokens: ['<s>', 'public', 'Ġstatic', 'Ġvoid', 'Ġmain', '(', 'String', '[]', 'Ġargs', '){', '}', '</s>']


In [10]:
# First "original" program of the dataset, used as the example input
sourceCode = plag_dataset['original'].iloc[0]

# Calling the tokenizer directly returns both the token ids and an attention mask.
# When sequences are padded, the mask lets the attention scores ignore the padding
# positions (this tokenizer pads with the <pad> id, which is 1, not with zeros).
# The direct call replaces the deprecated `encode_plus` and matches the call made in collate_fn.
token_ids = tokenizer(
  sourceCode,
  max_length = 512,
  padding = 'max_length',
  truncation = True,
  return_tensors = 'pt',
)

print('Token IDs:')
print(token_ids['input_ids'])

print('Attention Mask:')
print(token_ids['attention_mask'])


Token IDs:
tensor([[    0,   285,  1380,   255,   134, 25522,   285, 25156, 13842,  1049,
          1640, 34222, 48992, 49503,    43, 25522,  5149,     4,   995,     4,
         49396, 46469, 25194,     7, 24549, 45751,  5149,     4,   995,     4,
         49396, 46469, 25194,     7, 24549, 45751,  5149,     4,   995,     4,
         49396, 46469, 25194,     7, 24549, 45751,  5149,     4,   995,     4,
         49396, 46469, 25194,     7, 24549, 45751,  5149,     4,   995,     4,
         49396, 46469, 25194,     7, 24549, 45751, 35524, 35524,  1437,     2,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     

# Model definition

In [11]:
# Load the pretrained MODEL_NAME encoder with a sequence-classification head on top.
# num_labels=2 gives one logit per class (non-plagiarized / plagiarized);
# problem_type marks the task as single-label classification for the model's built-in loss.
# The loading message below reports that the classifier head weights are newly initialized,
# so the model must be fine-tuned before its predictions are meaningful.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=2,
    problem_type="single_label_classification"
)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Data Loading
We will create an in-memory dataset that holds the raw code pairs and their labels (the token ids and attention-mask tensors are produced per batch by the collate function),
and another dataset for loading pre-generated BERT embeddings.

In [12]:
class TokenizedDataset(Dataset):
    """In-memory dataset of (original, untrusted) code pairs and their plagiarism label.

    Items are returned as raw strings plus the label; this class performs no
    tokenization itself.
    """

    def __init__(self, df: pd.DataFrame):
        """Copy the 'original', 'untrusted' and 'plagiarism' columns of ``df`` into lists."""
        super().__init__()
        # Tokenizer for the MODEL_NAME checkpoint, stored on the instance
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

        # Only the two code columns and the target column are retained
        wanted_columns = ("original", "untrusted", "plagiarism")
        self.original, self.untrusted, self.labels = (
            df[column].tolist() for column in wanted_columns
        )

    def __len__(self):
        """Number of code pairs held by the dataset."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` as a dict with keys 'code_a', 'code_b' and 'label'."""
        field_names = ("code_a", "code_b", "label")
        field_values = (self.original[idx], self.untrusted[idx], self.labels[idx])
        return dict(zip(field_names, field_values))

In [13]:
def collate_fn(batch, max_len=256):
    """Turn a list of dataset items into one tokenized, labelled batch.

    Args:
        batch: list of dicts with keys 'code_a', 'code_b' and 'label'.
        max_len: length every encoded pair is padded or truncated to.

    Returns:
        The encoding produced by the module-level ``tokenizer`` for the
        (code_a, code_b) pairs, with the labels tensor added under 'labels'.
    """
    first_snippets, second_snippets, targets = [], [], []
    for sample in batch:
        first_snippets.append(sample['code_a'])
        second_snippets.append(sample['code_b'])
        targets.append(sample['label'])
    label_tensor = torch.tensor(targets)

    # Each pair is encoded as a single sequence, padded/truncated to max_len
    encoded = tokenizer(
        first_snippets,
        second_snippets,
        max_length=max_len,
        truncation=True,
        padding='max_length',
        return_tensors='pt',
    )
    encoded['labels'] = label_tensor
    return encoded


In [14]:
class EmbeddedDataset(Dataset):
    """Dataset that serves pre-generated entries read from a single file.

    The file is expected to deserialize to a mapping with the keys
    'originals', 'untrusted' and 'labels'.
    """

    def __init__(self, path):
        """Load the stored entries from ``path``."""
        # The data is generated beforehand and only read here, which keeps this class small.
        # NOTE(review): torch.load unpickles the file — only load files from a trusted source.
        stored = torch.load(path)
        self.originals, self.untrusted, self.labels = (
            stored['originals'],
            stored['untrusted'],
            stored['labels'],
        )

    def __len__(self):
        """Number of stored entries."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return entry ``idx`` as a dict with keys 'original', 'untrusted' and 'label'."""
        field_names = ('original', 'untrusted', 'label')
        field_values = (self.originals[idx], self.untrusted[idx], self.labels[idx])
        return dict(zip(field_names, field_values))

In [15]:
# Wrap both partitions so the loaders can index the raw code pairs and labels
train_dataset, test_dataset = TokenizedDataset(train), TokenizedDataset(test)

# GPU memory is limited, so the loaders hand the data over in batches of 16;
# only the training data is shuffled
train_loader = DataLoader(
    dataset=train_dataset,
    collate_fn=collate_fn,
    shuffle=True,
    batch_size=16,
)
test_loader = DataLoader(
    dataset=test_dataset,
    collate_fn=collate_fn,
    batch_size=16,
)


# 1. Training and evaluation

In [17]:
def evaluate(model, dataloader, device, loss_fn, get_cm=False):
    """Run the model over a dataloader without gradients and compute metrics.

    Args:
        model: classifier whose forward pass returns an object exposing
            ``.loss`` and ``.logits`` when the batch contains 'labels'.
        dataloader: iterable of dict batches of tensors; every batch must
            contain a 'labels' entry.
        device: device each batch tensor is moved to before the forward pass.
        loss_fn: unused; kept so existing call sites keep working. The loss
            is read from ``outputs.loss`` computed by the model itself.
        get_cm: when True, also return the confusion matrix under 'cm'.

    Returns:
        dict with 'loss' (sample-weighted mean loss), 'accuracy', and
        optionally 'cm'.

    Raises:
        ValueError: if the dataloader yields no samples.
    """
    model.eval()
    total_loss = 0.0
    n_samples = 0
    num_classes = None
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            labels = batch["labels"]           # shape [B]
            outputs = model(**batch)

            # outputs.loss is a batch mean, so weight it by the batch size
            batch_size = labels.size(0)
            total_loss += outputs.loss.item() * batch_size
            n_samples += batch_size

            # The head emits one logit per class, so the prediction is the
            # arg-max over classes. Thresholding sigmoid(logit of class 1)
            # would ignore the class-0 logit and can disagree with the arg-max.
            num_classes = outputs.logits.size(-1)
            preds = outputs.logits.argmax(dim=1)

            y_true.extend(labels.cpu().tolist())
            y_pred.extend(preds.cpu().tolist())

    if n_samples == 0:
        raise ValueError("evaluate() received an empty dataloader")

    # Average over the samples actually seen (stays correct even if the
    # loader drops incomplete batches)
    avg_loss = total_loss / n_samples
    acc      = accuracy_score(y_true, y_pred)

    metrics = {"loss": avg_loss, "accuracy": acc}
    if get_cm:
        # Passing the full label set keeps the matrix num_classes x num_classes
        # even when a class is absent from both targets and predictions
        metrics["cm"] = confusion_matrix(
            y_true, y_pred, labels=list(range(num_classes))
        )
    return metrics

In [18]:
EPOCHS = 150

# Per-epoch metric curves; one value is appended to every list after each epoch
history = {
    "train_loss": [], "test_loss": [],
    "train_accuracy": [], "test_accuracy": []
}

# evaluate() takes a loss function argument; it is defined here so this cell
# does not rely on a variable left over in the kernel from another cell
loss_fn = nn.CrossEntropyLoss()

# Move the model to its device before building the optimizer over its parameters
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

optimizer = AdamW(model.parameters(), lr=2e-5)
# Linear decay of the learning rate over all optimization steps, without warm-up
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_loader)*EPOCHS
)

for epoch in trange(EPOCHS, desc="Epoch"):
    model.train()
    progress = tqdm(train_loader, leave=False)
    for batch in progress:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)

        # The batch carries 'labels', so the model returns its own loss
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        progress.set_postfix(loss=loss.item())

    # ▶ collect metrics once per epoch (inside the epoch loop, so the
    #   history holds one entry per epoch)
    train_metrics = evaluate(model, train_loader, device, loss_fn)
    test_metrics  = evaluate(model, test_loader,  device, loss_fn)

    for k in history:
        # Keys look like "<split>_<metric>"; the metric name indexes the
        # dict returned by evaluate()
        split, metric = k.split('_', 1)
        source = train_metrics if split == 'train' else test_metrics
        history[k].append(source[metric])

    print(f"Epoch {epoch+1:02d} | "
        f"train loss {train_metrics['loss']:.4f} – acc {train_metrics['accuracy']:.3f} | "
        f"test loss  {test_metrics['loss']:.4f} – acc {test_metrics['accuracy']:.3f}")


Epoch:   0%|          | 0/150 [00:03<?, ?it/s]


KeyboardInterrupt: 

In [26]:
# Disable dropout and gradient tracking for a plain forward pass over the test set
model.eval()
with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        # One logit per class for every pair in the batch: shape [B,2]
        logits = model(**batch).logits
        # Predicted class = index of the largest logit
        preds = torch.argmax(logits, dim=1)
        ...


# Plotting


In [None]:
# evaluate() requires a loss-function argument; a criterion is created inline so this
# cell does not depend on a `loss_fn` variable happening to exist in the kernel.
eval_metrics = evaluate(model, test_loader, device, nn.CrossEntropyLoss(), get_cm=True)

# ConfusionMatrixDisplay comes from the imports cell at the top of the notebook.
# Label order follows the class ids: 0 = non-plagiarized, 1 = plagiarized.
disp = ConfusionMatrixDisplay(
    confusion_matrix=eval_metrics["cm"],
    display_labels=["non-plagiarized", "plagiarized"]
)

# Draw on an explicit Axes instead of the implicit pyplot state
fig, ax = plt.subplots()
disp.plot(cmap="Blues", ax=ax)
ax.set_title("Confusion Matrix – Test set")
plt.show()
