# Classification of Bug and Enhancement Reports with RoBERTa

This notebook demonstrates how to train a RoBERTa model for bug and enhancement report classification using the Hugging Face `transformers` framework.

## Setup

In [None]:
%pip install pandas
%pip install numpy
%pip install matplotlib
%pip install torch torchvision
%pip install transformers
%pip install scikit-learn
%pip install accelerate
%pip install imbalanced-learn
%pip install sentence-transformers
%pip install alibi-detect

## Importing Libraries

Let's start by importing all the libraries needed for our project.

In [1]:
import os
import zipfile
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import torch
import yaml
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from sentence_transformers import SentenceTransformer, util
from alibi_detect.cd import KSDrift
from alibi_detect.cd.tensorflow import UAE

## Choice of Dataset and Parameters
Write the parameters in the config.yaml file.

Enabling undersampling is recommended if the dataset is imbalanced between Bugs and Enhancements.

In [None]:
# Path to the YAML file that holds the experiment parameters.
config_path = './config.yaml'

# safe_load only builds plain Python objects; the explicit encoding keeps the
# result independent of the platform's default locale.
with open(config_path, 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

# Fail early with one clear message instead of an opaque error on first use
# (an empty YAML file is loaded as None).
if not isinstance(config, dict):
    raise TypeError(f"{config_path} must contain a mapping of parameters, got {type(config).__name__}")

required_keys = ('dataset_path', 'start_year_train', 'end_year_train', 'last_year_test', 'undersampling_flag')
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
    raise KeyError(f"Missing required keys in {config_path}: {missing_keys}")

dataset_path = config['dataset_path']              # CSV file with the issue reports
start_year_train = config['start_year_train']      # first year of the training window (inclusive)
end_year_train = config['end_year_train']          # last year of the training window (inclusive)
last_year_test = config['last_year_test']          # last year used for testing
undersampling_flag = config['undersampling_flag']  # truthy -> undersample the training split

## Setup Directories

In [3]:
# Dataset name = file name of dataset_path up to its first dot.
dataset_name = os.path.basename(dataset_path).split('.')[0]

# Results and the saved model live under a per-dataset directory.
results_path = f'./RESULTS/{dataset_name}'
model_save_path = os.path.join(results_path, "model")

# exist_ok=True makes the cell safe to re-run and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(results_path, exist_ok=True)
os.makedirs(model_save_path, exist_ok=True)

## Data Preparation

We load the data from the CSV file and prepare it for training and evaluation. 
We keep only the reports dated between `start_year_train` and `end_year_train` (inclusive) and label them as Bug (0) or Enhancement (1).

In [None]:
# Load the dataset. Dates that do not match the expected format become NaT
# (errors='coerce') and therefore fall outside every year filter below.
df_all = pd.read_csv(dataset_path)
df_all['date'] = pd.to_datetime(df_all['date'], errors='coerce', format='%Y-%m-%dT%H:%M:%S.%f+0000')
df_all = df_all[df_all['label'].isin(['Bug', 'Enhancement'])]

# Filter data for training and validation (start_year_train - end_year_train).
# .copy() gives an independent frame, so the column assignments below do not
# write into a slice of df_all.
report_year = df_all['date'].dt.year
df_train_val = df_all[(report_year >= start_year_train) & (report_year <= end_year_train)].copy()

# Fill missing titles/bodies first: "title" + " " + NaN would otherwise be NaN
# and the whole report text (title included) would be lost.
df_train_val['text'] = (
    df_train_val['title'].fillna('').astype(str) + " " + df_train_val['body'].fillna('').astype(str)
)
df_train_val['labels'] = df_train_val['label'].apply(lambda x: 1 if x == 'Enhancement' else 0)

## Data Division in Train and Validation
We split the data into train (70%) and validation (30%).

In [None]:
# Hold out 30% of the training-period rows for validation, stratified on the
# label column and with a fixed seed so the split is reproducible.
train_df, validation_df = train_test_split(
    df_train_val,
    test_size=0.3,
    random_state=42,
    stratify=df_train_val['labels'],
)

## Undersampling

This code initializes a RandomUnderSampler, fits it to the training data, and replaces `train_df` with the undersampled data.
The resampled `train_df` is then used to train the model; when `undersampling_flag` is disabled, `train_df` is left unchanged.

In [1]:
if undersampling_flag:
    from imblearn.under_sampling import RandomUnderSampler

    # Fixed seed so the sampled subset is the same on every run.
    rus = RandomUnderSampler(random_state=42)

    # Separate the target column from the remaining columns, then resample both together.
    train_features = train_df.drop(columns='labels')
    train_targets = train_df['labels']
    x_train_resampled, y_train_resampled = rus.fit_resample(train_features, train_targets)

    # Put columns and labels back into one frame; train_df now refers to the resampled data.
    train_df = pd.concat([x_train_resampled, y_train_resampled], axis=1)


## Creation of the Dataset

We define a `CustomDataset` class to prepare the data for training with RoBERTa.

In [None]:
class CustomDataset(Dataset):
    """Torch dataset that tokenizes one text/label pair per item.

    Args:
        texts: Indexable sequence of raw texts. Missing values (NaN/None) are
            treated as empty strings.
        labels: Indexable sequence of integer class ids, aligned with ``texts``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a Hugging Face tokenizer).
        max_token_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_token_len=512):
        self.tokenizer = tokenizer
        self.texts = texts
        self.labels = labels
        self.max_token_len = max_token_len

    def __len__(self):
        """Return the number of examples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded example at ``idx``.

        Returns:
            dict with 1-D ``input_ids`` and ``attention_mask`` tensors and a
            scalar ``labels`` tensor of dtype ``torch.long``.
        """
        raw_text = self.texts[idx]
        # The missing-value check has to run BEFORE str(): str(float('nan'))
        # is the literal "nan", which pd.isna() no longer recognises.
        text = "" if pd.isna(raw_text) else str(raw_text)
        labels = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_token_len,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # return_tensors='pt' yields a leading batch dimension; flatten() drops it.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(labels, dtype=torch.long)
        }

## Choice of Model

In [None]:
# Checkpoint identifier passed to the tokenizer and model `from_pretrained` loaders.
model_name = 'roberta-base'

## Tokenization

We use the RoBERTa tokenizer to convert text into tokens that the model can understand.

In [None]:
# Tokenizer matching the chosen checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Wrap the train and validation splits; tokenization happens lazily per item.
train_dataset = CustomDataset(
    texts=train_df['text'].to_numpy(),
    labels=train_df['labels'].to_numpy(),
    tokenizer=tokenizer,
)
validation_dataset = CustomDataset(
    texts=validation_df['text'].to_numpy(),
    labels=validation_df['labels'].to_numpy(),
    tokenizer=tokenizer,
)

## Model Training

We configure and train the RoBERTa model for classification.
The training parameters `num_train_epochs`, `batch_size`, `weight_decay`, `learning_rate`, and `adam_epsilon` were carefully selected based on the recommendations provided in a recent study.

Additional `TrainingArguments` parameters such as `load_best_model_at_end`, `metric_for_best_model`, and `greater_is_better` provide advanced, automated control over the training process, allowing for optimal model selection and saving.

In [None]:
def compute_metrics(pred):
    """Build a classification report for a prediction output.

    Args:
        pred: Object exposing ``predictions`` (per-class scores) and
            ``label_ids`` (true labels).

    Returns:
        The ``classification_report`` result as a dictionary.
    """
    # The predicted class is the index of the highest score on the last axis.
    predicted_classes = pred.predictions.argmax(-1)
    return classification_report(pred.label_ids, predicted_classes, output_dict=True)
    
# Binary classifier head on top of the pretrained encoder (Bug vs Enhancement).
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version before upgrading.
training_args = TrainingArguments(
    output_dir=results_path,          # Directory where checkpoints are written
    num_train_epochs=4,               # Total number of training epochs
    per_device_train_batch_size=32,   # Batch size for training
    per_device_eval_batch_size=32,    # Batch size for evaluation
    warmup_steps=500,                 # Number of learning-rate warmup steps
    weight_decay=0.01,                # Weight decay
    logging_dir='./logs',             # Directory where to save logs
    evaluation_strategy="epoch",      # Evaluate at the end of every epoch ("no", "steps" or "epoch")
    save_strategy="epoch",            # Save a checkpoint at the end of every epoch
    learning_rate=2e-5,               # Learning rate specified in the paper
    adam_epsilon=1e-8,                # Epsilon term of the Adam optimizer
    # `eval_steps` was dropped: it only applies to evaluation_strategy="steps".
    load_best_model_at_end=True,      # Reload the best checkpoint once training ends
    metric_for_best_model="accuracy", # Validation metric used to rank checkpoints
    greater_is_better=True,           # Higher accuracy means a better checkpoint
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    compute_metrics=compute_metrics,
)

trainer.train()

## Save the Model

In [None]:
# Write the classifier and the tokenizer into the same directory so both can
# be reloaded from model_save_path.
for artifact in (model, tokenizer):
    artifact.save_pretrained(model_save_path)

print(f"Model saved to {model_save_path}")

## Test Year by Year
To test the model on data from subsequent years, one at a time, we load the data for each year after `end_year_train` and evaluate the model on them.

In [None]:
# Maps a period (the "start-end" training range, or a single test year) to
# the classification report computed for it.
results_by_year = {}


def _evaluate_subset(tokenizer, subset_df):
    """Tokenize ``subset_df``, run the trained model on it and return its report."""
    subset_df = subset_df.copy()
    # Fill missing titles/bodies first: "title" + " " + NaN would otherwise be
    # NaN and the whole report text would be lost.
    subset_df['text'] = (
        subset_df['title'].fillna('').astype(str) + " " + subset_df['body'].fillna('').astype(str)
    )
    subset_df['labels'] = subset_df['label'].apply(lambda x: 1 if x == 'Enhancement' else 0)

    test_dataset = CustomDataset(subset_df['text'].to_numpy(), subset_df['labels'].to_numpy(), tokenizer)
    # Prediction goes through the module-level `trainer`.
    predictions = trainer.predict(test_dataset)
    return compute_metrics(predictions)


def evaluate_model_for_year(model, tokenizer, year, df):
    """Evaluate the trained model on the rows of ``df`` dated in ``year``.

    Args:
        model: Unused; kept for interface compatibility (the module-level
            ``trainer`` performs the prediction).
        tokenizer: Tokenizer used to encode the texts.
        year: Calendar year to select.
        df: DataFrame with ``date``, ``title``, ``body`` and ``label`` columns.

    Returns:
        The classification report dictionary, or ``None`` when the year has no rows.
    """
    test_df = df[df['date'].dt.year == year]
    if test_df.empty:
        print(f"No data for year {year}")
        return None

    return _evaluate_subset(tokenizer, test_df)


def evaluate_model_for_range(model, tokenizer, start_year, end_year, df):
    """Evaluate the trained model on the rows of ``df`` dated within a year range.

    Args:
        model: Unused; kept for interface compatibility (the module-level
            ``trainer`` performs the prediction).
        tokenizer: Tokenizer used to encode the texts.
        start_year: First year of the range (inclusive).
        end_year: Last year of the range (inclusive).
        df: DataFrame with ``date``, ``title``, ``body`` and ``label`` columns.

    Returns:
        The classification report dictionary, or ``None`` when the range has no rows.
    """
    test_df = df[(df['date'].dt.year >= start_year) & (df['date'].dt.year <= end_year)]
    if test_df.empty:
        print(f"No data for range {start_year}-{end_year}")
        return None

    return _evaluate_subset(tokenizer, test_df)


# Evaluation for the range start_year_train - end_year_train
# (df_train_val contains both the training and the validation rows).
print(f"Testing the year range {start_year_train} - {end_year_train} ...")
range_metrics = evaluate_model_for_range(model, tokenizer, start_year_train, end_year_train, df_train_val)
if range_metrics is not None:
    results_by_year[f"{start_year_train}-{end_year_train}"] = range_metrics

# Evaluation for each subsequent year. The upper bound is last_year_test + 1 so
# that last_year_test itself is evaluated, matching the similarity and drift
# analyses further down.
for year in range(end_year_train + 1, last_year_test + 1):
    print(f"Testing the year {year}...")
    year_metrics = evaluate_model_for_year(model, tokenizer, year, df_all)
    if year_metrics is not None:
        results_by_year[year] = year_metrics

## Definition of methods for generating plots
We define a parametric method for generating plots with respect to the desired metrics

In [33]:
def plot_metrics(results, metrics, title, start_ylim=None, end_ylim=None):
    """Plot overall metrics per period, save the figure as PNG and show it.

    Args:
        results: Mapping of period -> classification report dictionary.
        metrics: Metric names to draw. ``'accuracy'`` is read from the top
            level of the report, every other name from its ``'macro avg'`` entry.
        title: Figure title; also used to build the output file name.
        start_ylim: Lower y-axis limit (applied only together with ``end_ylim``).
        end_ylim: Upper y-axis limit; must be greater than ``start_ylim``.
    """
    periods = list(results.keys())
    plt.figure(figsize=(10, 6))

    # Fixed limits are applied only when both are given and correctly ordered.
    limits_given = start_ylim is not None and end_ylim is not None
    if limits_given and start_ylim < end_ylim:
        plt.ylim(start_ylim, end_ylim)

    for metric in metrics:
        series = [
            results[period][metric] if metric == 'accuracy' else results[period]['macro avg'][metric]
            for period in periods
        ]
        plt.plot(periods, series, label=metric.capitalize(), marker='o')

    plt.title(title)
    plt.xlabel('Year')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Save the plot under the results directory, named after the title.
    file_name = f"{results_path}/{title.replace(' ', '_').lower()}.png"
    plt.savefig(file_name)
    print(f"Plot saved: {file_name}")

    plt.show()
    
def plot_class_metrics(results, classes, metrics, title_prefix, start_ylim=None, end_ylim=None):
    """Plot per-class metrics per period, one figure (and PNG file) per class.

    Args:
        results: Mapping of period -> classification report dictionary.
        classes: Class display names; position ``i`` corresponds to the report
            entry keyed ``str(i)``.
        metrics: Metric names to read from each class entry.
        title_prefix: Prefix of the figure title and of the output file name.
        start_ylim: Lower y-axis limit (applied only together with ``end_ylim``).
        end_ylim: Upper y-axis limit; must be greater than ``start_ylim``.
    """
    for index, class_name in enumerate(classes):
        class_key = str(index)
        # Keep only the periods whose report contains this class, so the x and
        # y series always have the same length (plt.plot raises otherwise).
        years = [year for year in results if class_key in results[year]]

        plt.figure(figsize=(10, 6))

        for metric in metrics:
            metric_values = [results[year][class_key][metric] for year in years]
            plt.plot(years, metric_values, label=f'{metric} ({class_name})', marker='o')

        plt.title(f'{title_prefix} for {class_name}')
        plt.xlabel('Year')
        plt.ylabel('Score')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        # Same guard as plot_metrics: both limits given and correctly ordered.
        if start_ylim is not None and end_ylim is not None and start_ylim < end_ylim:
            plt.ylim(start_ylim, end_ylim)
        plt.tight_layout()

        # Save the plot to file
        file_name = f"{results_path}/{title_prefix.replace(' ', '_').lower()}_{class_name.lower()}.png"
        plt.savefig(file_name)
        print(f"Plot saved: {file_name}")

        plt.show()

## Plot Printing [1]
Precision, Recall, F1-Score, Accuracy

In [None]:
# The output file name is derived from the title, so each variant needs its own
# title; otherwise the second call overwrites the first PNG.
# Print Plot: Precision and Recall by Year (ylim: 0-1)
plot_metrics(results_by_year, ['precision', 'recall'], 'Precision and Recall by Year (0-1 Scale)', 0, 1)
# Print Plot: Precision and Recall by Year (ylim: None-None)
plot_metrics(results_by_year, ['precision', 'recall'], 'Precision and Recall by Year')

In [None]:
# The output file name is derived from the title, so each variant needs its own
# title; otherwise the second call overwrites the first PNG.
# Print Plot: F1 Score and Accuracy by Year (ylim: 0-1)
plot_metrics(results_by_year, ['f1-score', 'accuracy'], 'F1 Score and Accuracy by Year (0-1 Scale)', 0, 1)
# Print Plot: F1 Score and Accuracy by Year (ylim: None-None)
plot_metrics(results_by_year, ['f1-score', 'accuracy'], 'F1 Score and Accuracy by Year')

## Plot Printing [2]
Metrics by class

In [None]:
# Display names in label order: 0 -> Bug, 1 -> Enhancement.
classes = ['Bug', 'Enhancement']
metrics = ['precision', 'recall']
# The output file names are derived from the title prefix, so each variant
# needs its own prefix; otherwise the second call overwrites the first PNGs.
# Print Plot: Class Metrics for each class (ylim: 0-1)
plot_class_metrics(results_by_year, classes, metrics, 'Class Metrics (0-1 Scale)', 0, 1)
# Print Plot: Class Metrics for each class (ylim: None-None)
plot_class_metrics(results_by_year, classes, metrics, 'Class Metrics')

## Measure of Corpus Variability
It measures how similar the embeddings (vector representations of text) of two data sets (for example, the training set and the test set for a given year) are to each other. A higher value indicates greater similarity, which may suggest that the two datasets have a similar linguistic distribution or cover related topics.

In [None]:
# Sentence encoder used only for this analysis. It gets its own name so the
# fine-tuned classifier bound to `model` is not overwritten.
similarity_model = SentenceTransformer("all-MiniLM-L6-v2")

# The training texts do not change between years, so they are embedded once
# instead of once per loop iteration. fillna() returns new Series, so neither
# train_df nor df_all is modified.
train_texts = train_df['title'].fillna('') + " " + train_df['body'].fillna('')
train_embeddings = similarity_model.encode(train_texts.tolist(), convert_to_tensor=True, show_progress_bar=True)

# Initialize a list to store the annual average similarity
yearly_similarity = []
years = range(end_year_train + 1, last_year_test + 1)

# Calculation of cosine similarity for each year of testing
for year in years:
    test_df = df_all[df_all['date'].dt.year == year]

    if test_df.empty:
        print(f"No data for year {year}")
        yearly_similarity.append(None)  # Keeps the list aligned with `years`
        continue

    # Prepare the texts (missing titles/bodies become empty strings)
    test_texts = test_df['title'].fillna('') + " " + test_df['body'].fillna('')
    test_embeddings = similarity_model.encode(test_texts.tolist(), convert_to_tensor=True, show_progress_bar=True)

    # Mean over all train/test pairs of the cosine similarity
    similarity_matrix = util.pytorch_cos_sim(train_embeddings, test_embeddings)
    yearly_similarity.append(similarity_matrix.mean().item())

# Plot the average cosine similarity by year
plt.figure(figsize=(10, 6))
plt.plot(years, yearly_similarity, marker='o')
plt.title('Average Cosine Similarity per Year')
plt.xticks(years)
plt.xlabel('Year')
plt.ylabel('Average Cosine Similarity')
plt.grid(True)
# Save the plot to a file
plot_file_name = f"{results_path}/cosine_similarity_year.png"
plt.savefig(plot_file_name)
print(f"Plot saved: {plot_file_name}")
plt.show()

## Drift Detection
We'll use the alibi-detect library to detect drift between the training texts and the texts of each test year, comparing sentence embeddings produced by the all-MiniLM-L6-v2 model.

In [None]:
# Load the all-MiniLM-L6-v2 sentence encoder. It gets its own name so the
# fine-tuned classifier bound to `model` is not overwritten.
drift_encoder = SentenceTransformer("all-MiniLM-L6-v2")


def calculate_embeddings(texts):
    """Return the sentence embeddings of ``texts`` (a list of strings)."""
    return drift_encoder.encode(texts, batch_size=32, show_progress_bar=True)


# Embeddings for the training data (Handle NaN with empty strings)
train_texts = train_df['title'].fillna('') + " " + train_df['body'].fillna('')
train_embeddings = calculate_embeddings(train_texts.tolist())

# NOTE: the untrained autoencoder (UAE) that used to be built here was never
# passed to the detector, so its construction was removed; KSDrift works
# directly on the sentence embeddings.

# Initialize KSDrift detector with the training embeddings as reference data
ks_drift = KSDrift(train_embeddings, p_val=0.05)

# Dictionary to store drift detection results, keyed by year
drift_results = {}

# Check for drift in each year
for year in range(end_year_train + 1, last_year_test + 1):
    test_df = df_all[df_all['date'].dt.year == year].copy()
    if test_df.empty:
        print(f"No data for year {year}")
        continue

    # Prepare test data
    test_texts = test_df['title'].fillna('') + " " + test_df['body'].fillna('')
    test_embeddings = calculate_embeddings(test_texts.tolist())

    # Perform drift detection
    preds = ks_drift.predict(test_embeddings)
    drift_results[year] = {'data_drift': preds['data']['is_drift'], 'p_value': preds['data']['p_val']}

# Convert the drift results dictionary to a DataFrame for plotting
df_drift_results = pd.DataFrame.from_dict(drift_results, orient='index', columns=['data_drift', 'p_value'])
df_drift_results.index.name = 'Year'
df_drift_results.reset_index(inplace=True)
# Collapse each year's p-values into their mean for plotting
df_drift_results['p_value_mean'] = df_drift_results['p_value'].apply(np.mean)

# Plotting
plt.figure(figsize=(10, 6))
plt.plot(df_drift_results['Year'], df_drift_results['p_value_mean'], marker='o')
plt.title('Data Drift Detection Results')
plt.xlabel('Year')
# The curve shows the mean p-value, not the boolean drift flag.
plt.ylabel('Mean p-value')
plt.xticks(df_drift_results['Year'], rotation=45)
plt.tight_layout()

# Save the plot
plot_file_name = f"{results_path}/drift_detection_results.png"
plt.savefig(plot_file_name)
print(f"Plot saved: {plot_file_name}")
plt.show()

## Archiving of Results

We compress and save the training results, excluding checkpoints.

In [None]:
def zip_results(results_dir=results_path, zip_name='results.zip'):
    """Archive ``results_dir`` into ``zip_name``, leaving out checkpoint folders.

    Args:
        results_dir: Directory tree to archive.
        zip_name: Path of the zip file to create (overwritten if present).
    """
    # Archive member names are relative to the parent of results_dir, so the
    # results folder itself appears as the top-level entry.
    archive_base = os.path.join(results_dir, '..')

    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as archive:
        for current_dir, subdirs, filenames in os.walk(results_dir):
            # In-place pruning stops os.walk from descending into checkpoint folders.
            subdirs[:] = [name for name in subdirs if 'checkpoint' not in name]
            if 'checkpoint' in current_dir:
                continue
            for filename in filenames:
                full_path = os.path.join(current_dir, filename)
                archive.write(full_path, os.path.relpath(full_path, start=archive_base))

    print(f"Results archived in {zip_name}")


# Create the zip archive with the default arguments
zip_results()