<a href="https://colab.research.google.com/github/ikoojos/Algorithm-Debt-Research/blob/master/RoBERTa_DL_and_Embeddings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%cd '/content/drive/My Drive/AD Final Experiments'

In [None]:
import sys
import os
from google.colab import drive

# Folder on Google Drive that holds the project's helper modules.
PROJECT_DIR = '/content/drive/My Drive/AD Final Experiments'

# Mount Google Drive so the project folder is reachable from the runtime.
drive.mount('/content/drive')

# Make the project's helper modules importable. The membership check keeps
# the cell idempotent: re-running it no longer stacks duplicate sys.path entries.
if PROJECT_DIR not in sys.path:
    sys.path.append(PROJECT_DIR)

import importlib
import numpy as np
import pandas as pd
from itertools import product

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, MultiLabelBinarizer
from sklearn.multioutput import MultiOutputClassifier

from nltk.tokenize import word_tokenize
from gensim.models import Word2Vec

import torch
from torch.utils.data import DataLoader, Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback

# Reload the project's helper modules *before* binding names from them.
# `from module import name` copies the object into this namespace, so a
# reload performed afterwards would leave those names pointing at the stale,
# pre-reload objects. On a fresh kernel nothing is loaded yet and the loop
# is a no-op; the imports below then load each module once.
for module in ['preprocessing', 'splitting', 'utils', 'evaluate_model', 'lr_tuning']:
    if module in sys.modules:
        importlib.reload(sys.modules[module])

from preprocessing import preprocess_data
from splitting import split_data
# NOTE(review): accuracy_score, f1_score and classification_report are used
# further down without an explicit import; presumably they arrive through
# this star import -- confirm, and prefer importing them by name.
from utils import *
from evaluate_model import evaluate_best_model
from lr_tuning import hyperparameter_tuning


Mounted at /content/drive


In [None]:
# Load the dataset CSV from Google Drive and pass it through the project's
# `preprocess_data` helper (imported from the local `preprocessing` module).
# Later cells read the 'TDType' and 'Comments' columns of the result.
# NOTE(review): this path points at a different Drive folder
# ('AD Identification using SATD') than the project directory used above --
# confirm that is intentional.
file_path = '/content/drive/My Drive/AD Identification using SATD/liu_datset_processed.csv'
data = preprocess_data(file_path)

In [None]:
pip install transformers



In [None]:
import wandb

# Initialise Weights & Biases in "disabled" mode.
# NOTE(review): presumably this keeps the Hugging Face Trainer below from
# logging to / prompting for a W&B account -- confirm.
wandb.init(mode="disabled")

In [None]:
# Give every distinct TDType value an integer id, numbered in the order the
# values are returned by `.unique()`, and store the ids in a 'label' column.
class_mapping = dict(
    (td_type, class_id)
    for class_id, td_type in enumerate(data['TDType'].unique())
)
data['label'] = data['TDType'].map(class_mapping)


# Hold out 20% of the rows for testing, then 20% of the remainder for
# validation (64% / 16% / 20% overall). Both splits use random_state=42.
X_train_temp, X_test, y_train_temp, y_test = train_test_split(
    data['Comments'],
    data['label'],
    test_size=0.2,
    random_state=42,
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_temp,
    y_train_temp,
    test_size=0.2,
    random_state=42,
)


# Pretrained RoBERTa tokenizer plus a sequence-classification model with one
# output per entry in class_mapping.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaForSequenceClassification.from_pretrained(
    'roberta-base',
    num_labels=len(class_mapping),
)


class CustomDataset(Dataset):
    """Torch dataset that tokenizes one text per item and pairs it with its label.

    Args:
        texts: Positionally indexable (``.iloc``) collection of texts; each
            entry is converted with ``str()`` before tokenization.
        labels: Positionally indexable (``.iloc``) collection of integer class
            ids, aligned with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=..., return_tensors='pt')`` that
            returns a mapping with ``'input_ids'`` and ``'attention_mask'``.
        max_length: Length every sequence is padded / truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of texts in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized text and label at position ``idx``.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` (the tokenizer
            output with its leading axis removed) and ``'labels'`` (a scalar
            ``torch.long`` tensor).
        """
        text = str(self.texts.iloc[idx])
        label = self.labels.iloc[idx]

        # Tokenize the text
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        # The tokenizer output is expected to carry a leading batch axis of
        # size 1. Drop only that axis: a bare .squeeze() would also collapse
        # the sequence axis when max_length == 1 and hand the DataLoader
        # 0-d tensors.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(label, dtype=torch.long)
        }

import dataclasses

# Wrap each text/label split in the tokenizing dataset defined above.
train_dataset = CustomDataset(X_train, y_train, tokenizer)
val_dataset = CustomDataset(X_val, y_val, tokenizer)
test_dataset = CustomDataset(X_test, y_test, tokenizer)

# Define training arguments
# The per-epoch evaluation option is spelled `eval_strategy` in newer
# transformers releases and `evaluation_strategy` in older ones. Because the
# notebook installs transformers unpinned, pick whichever name the installed
# TrainingArguments actually declares instead of hard-coding one of them.
_training_arg_names = {field.name for field in dataclasses.fields(TrainingArguments)}
_eval_strategy_arg = (
    'eval_strategy' if 'eval_strategy' in _training_arg_names else 'evaluation_strategy'
)

training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=30,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    save_strategy="epoch",
    logging_dir='./logs',
    logging_steps=10,
    # Keep the checkpoint with the highest validation accuracy; evaluation
    # and saving both happen once per epoch so the two stay aligned.
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    greater_is_better=True,
    **{_eval_strategy_arg: "epoch"},
)

def compute_metrics(pred):
    """Turn a prediction object into the metrics reported at each evaluation.

    Args:
        pred: Object exposing ``label_ids`` (true class ids) and
            ``predictions`` (per-class scores; the predicted class is the
            argmax over the last axis).

    Returns:
        dict with ``'accuracy'`` and the weighted-average ``'f1'``.
    """
    true_ids = pred.label_ids
    predicted_ids = pred.predictions.argmax(-1)

    metrics = {'accuracy': accuracy_score(true_ids, predicted_ids)}
    metrics['f1'] = f1_score(true_ids, predicted_ids, average='weighted')
    return metrics

# Fine-tune with per-epoch validation; training stops early once validation
# accuracy has failed to improve for 3 consecutive evaluations.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)


trainer.train()

# Score the held-out test split.
test_results = trainer.predict(test_dataset)

y_test_pred = np.argmax(test_results.predictions, axis=1)
print("\nClassification Report on Test Set:")
# Pass the label ids explicitly, alongside the names as a real list. Without
# `labels`, the report infers the classes from the data, so a class absent
# from the test split would no longer line up with `target_names`.
print(classification_report(
    y_test,
    y_test_pred,
    labels=list(class_mapping.values()),
    target_names=list(class_mapping.keys()),
))

def extract_embeddings(model, dataset, batch_size=16):
    """Encode every item of ``dataset`` and return its first-token embedding.

    Args:
        model: Model exposing a ``roberta`` encoder, a ``device`` attribute
            and ``eval()``. It is left in evaluation mode afterwards.
        dataset: Dataset whose items are dicts containing ``'input_ids'`` and
            ``'attention_mask'`` tensors.
        batch_size: Number of items encoded per forward pass (default 16,
            the value that used to be hard-coded).

    Returns:
        numpy array with one row per dataset item, in dataset order.

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    embeddings = []
    # No shuffling, so row i of the result corresponds to dataset item i.
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            inputs = {
                'input_ids': batch['input_ids'].to(model.device),
                'attention_mask': batch['attention_mask'].to(model.device),
            }
            outputs = model.roberta(**inputs)
            # Hidden state at sequence position 0 (RoBERTa's <s> token, the
            # counterpart of BERT's [CLS]).
            first_token_states = outputs.last_hidden_state[:, 0, :]
            embeddings.append(first_token_states.cpu().numpy())

    if not embeddings:
        # np.concatenate would raise a less helpful ValueError on an empty list.
        raise ValueError("extract_embeddings received an empty dataset")

    return np.concatenate(embeddings, axis=0)

# Encode the train, validation and test splits (in that order) with the
# fine-tuned model.
embeddings_train, embeddings_val, embeddings_test = (
    extract_embeddings(model, split_dataset)
    for split_dataset in (train_dataset, val_dataset, test_dataset)
)


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.6614,0.488906,0.861254,0.834444
2,0.518,0.455821,0.872026,0.857409
3,0.4606,0.447091,0.869936,0.862942
4,0.2652,0.45514,0.873794,0.874766
5,0.2247,0.51513,0.87508,0.87289
6,0.2243,0.504594,0.893248,0.888905
7,0.1341,0.520049,0.890032,0.887895
8,0.113,0.605652,0.893891,0.89017
9,0.1762,0.639823,0.890032,0.889778
10,0.1204,0.670379,0.885691,0.884954



Classification Report on Test Set:
                        precision    recall  f1-score   support

             ALGORITHM       0.55      0.36      0.43       200
         COMPATIBILITY       0.59      0.44      0.50        89
                DEFECT       0.58      0.47      0.52       135
                DESIGN       0.82      0.89      0.85      2206
         DOCUMENTATION       0.71      0.43      0.54        23
        IMPLEMENTATION       0.78      0.60      0.68       387
                  TEST       0.82      0.78      0.80       143
WITHOUT_CLASSIFICATION       0.96      0.97      0.97      4592

              accuracy                           0.89      7775
             macro avg       0.73      0.62      0.66      7775
          weighted avg       0.89      0.89      0.89      7775



In [None]:
def save_embeddings_to_csv(embeddings, file_path, labels=None):
    """Write an embedding matrix to ``file_path`` as CSV, without the index.

    Args:
        embeddings: 2-D data accepted by ``pd.DataFrame``; each row becomes
            one CSV row.
        file_path: Destination of the CSV file.
        labels: Optional per-row labels; when given they are stored in an
            extra trailing column named ``'label'``.
    """
    table = pd.DataFrame(embeddings)
    if labels is not None:
        table = table.assign(label=labels)
    table.to_csv(file_path, index=False)

# Persist each split's embeddings next to its integer labels so the
# classifier cell below can reload them from disk.
for _csv_name, _matrix, _targets in (
    ('rober_train_embeddings_.csv', embeddings_train, y_train),
    ('rober_val_embeddings_.csv', embeddings_val, y_val),
    ('rober_test_embeddings_.csv', embeddings_test, y_test),
):
    save_embeddings_to_csv(_matrix, _csv_name, _targets.values)

print("Embeddings saved to CSV files.")


Embeddings saved to CSV files.


## Train LR using RoBERTa Embeddings

In [None]:
from sklearn.pipeline import Pipeline

# Reload the embeddings written to CSV by the earlier cell.
train = pd.read_csv('rober_train_embeddings_.csv')
val = pd.read_csv('rober_val_embeddings_.csv')
test = pd.read_csv('rober_test_embeddings_.csv')

# Separate features from the target by column name rather than by position,
# so the split stays correct even if 'label' is not the last column.
# Note: X_val / y_val / X_test / y_test replace the text splits of the same
# names created earlier in the notebook.
X_train_final = train.drop(columns='label').to_numpy()
y_train_final = train['label']

X_val = val.drop(columns='label').to_numpy()
y_val = val['label']

X_test = test.drop(columns='label').to_numpy()
y_test = test['label']


# NOTE(review): max_iter is searched as a hyperparameter; the small values
# will likely stop lbfgs before convergence, and the recorded run selected
# max_iter=1 -- confirm this is intended.
param_grid = {
    'C': [0.01, 1, 10],
    'penalty': ['l2'],
    'max_iter': [1, 10, 100, 200]
}

best_score = -1
best_params = None
best_model = None

# The solver is the same for every configuration, so set it once.
solver = 'lbfgs'

# Exhaustive search: fit on the training embeddings, keep the configuration
# with the highest validation accuracy (the first one wins on ties).
for C, penalty, max_iter in product(param_grid['C'], param_grid['penalty'], param_grid['max_iter']):
    try:
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('clf', LogisticRegression(C=C, penalty=penalty, max_iter=max_iter, solver=solver, random_state=42, class_weight='balanced'))
        ])

        pipeline.fit(X_train_final, y_train_final)
        y_val_pred = pipeline.predict(X_val)
        score = accuracy_score(y_val, y_val_pred)


        if score > best_score:
            best_score = score
            best_params = {'C': C, 'penalty': penalty, 'max_iter': max_iter}
            best_model = pipeline

    except Exception as e:
        # Best-effort search: report the failing configuration and move on.
        print(f"Skipping configuration C={C}, penalty={penalty}, max_iter={max_iter} due to error: {e}")

# If every configuration was skipped there is nothing to evaluate; stop here
# with a clear message instead of failing later on `None.predict`.
if best_model is None:
    raise RuntimeError(
        "No logistic-regression configuration could be fitted; "
        "see the skipped configurations reported above."
    )

def evaluate_best_model(model, params, score, X_test, y_test, target_names=None):
    """Print the selected configuration and its test-set metrics.

    Note: this definition replaces the ``evaluate_best_model`` imported from
    the ``evaluate_model`` module at the top of the notebook.

    Args:
        model: Fitted estimator exposing ``predict``.
        params: Hyperparameters of the selected configuration (printed as-is).
        score: Validation score of the selected configuration (printed as-is).
        X_test: Test features passed to ``model.predict``.
        y_test: True test labels.
        target_names: Optional display names forwarded to
            ``classification_report`` so that the report can show class names
            instead of integer ids. ``None`` (default) keeps the ids.

    Raises:
        ValueError: If ``model`` is ``None`` (no configuration was selected).
    """
    if model is None:
        raise ValueError("evaluate_best_model needs a fitted model, got None")

    print(f"Best Params: {params}")
    print(f"Validation Best Score: {score}")
    y_test_pred = model.predict(X_test)
    print("\nTest Accuracy:", accuracy_score(y_test, y_test_pred))
    print("\nTest Classification Report:")
    print(classification_report(y_test, y_test_pred, target_names=target_names))

# Print the configuration chosen by the search above, its validation score,
# and its accuracy / classification report on the test embeddings.
evaluate_best_model(best_model, best_params, best_score, X_test, y_test)


Best Params: {'C': 0.01, 'penalty': 'l2', 'max_iter': 1}
Validation Best Score: 0.8831189710610933

Test Accuracy: 0.8770418006430868

Test Classification Report:
              precision    recall  f1-score   support

           0       0.49      0.39      0.44       200
           1       0.47      0.53      0.50        89
           2       0.35      0.56      0.43       135
           3       0.85      0.84      0.84      2206
           4       0.10      0.48      0.16        23
           5       0.75      0.64      0.69       387
           6       0.72      0.80      0.76       143
           7       0.97      0.96      0.96      4592

    accuracy                           0.88      7775
   macro avg       0.59      0.65      0.60      7775
weighted avg       0.89      0.88      0.88      7775

