## Libraries

In [None]:
import pandas as pd
import numpy as np
import torch
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
from sklearn.utils import resample
import seaborn as sns

from datasets import Dataset
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    TrainingArguments, Trainer, DataCollatorWithPadding,
    EarlyStoppingCallback
)
import evaluate
import pandas as pd
import torch
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments

from transformers import Trainer
from transformers import Trainer, TrainingArguments
import numpy as np
import evaluate

import re
from urllib.parse import unquote, urlparse
import string
from nltk.corpus import stopwords
from urllib.parse import urlparse
from nltk.tokenize import word_tokenize
import nltk
nltk.download('stopwords')
nltk.download('punkt_tab')




## Text Preprocessing Functions

In [None]:

def replace_url(match):
    """Turn a matched URL into a short, readable phrase.

    Intended as the replacement callback for ``re.sub``. The URL is reduced
    to its host (without a leading ``www.`` or a trailing ``.com`` / ``.org``
    / ``.net``) followed by the short alphanumeric segments of its path.

    Args:
        match: A regex match object whose group 0 is the URL text.

    Returns:
        A string of the form ``"url of <host> <word> <word> ..."``.
    """
    url = match.group(0)
    parsed_url = urlparse(url)

    # Strip "www." only as a host prefix and the TLD only as a host suffix
    # (optionally followed by a port). Removing these substrings anywhere in
    # the host would mangle names such as "foo.community.org" into
    # "foomunity".
    domain = re.sub(r'(?:^|(?<=@))www\.', '', parsed_url.netloc)
    domain = re.sub(r'\.(?:com|org|net)(?=:|$)', '', domain)

    path = unquote(parsed_url.path)

    # Keep short path segments; drop long ones and hex blobs (hashes, ids).
    segments = [
        segment
        for segment in re.split(r'[/@]', path)
        if segment
        and len(segment) < 20
        and not re.match(r'^[0-9a-f]{16,}$', segment, re.IGNORECASE)
    ]

    if '@' in url:
        email_domain = re.findall(r'@<*(.*?)>*$', url)
        if email_domain:
            segments += email_domain[0].split('.')

    # De-duplicate while preserving first-seen order.
    segments = list(dict.fromkeys(segments))

    words = []
    for segment in segments:
        # str.isalpha() implies str.isalnum(), so one check covers both.
        if not segment.isalnum():
            continue
        # Non-letters become separators; split() discards the empty pieces so
        # digit-only segments no longer leave whitespace-only tokens behind.
        words.extend(re.sub(r'[^a-z]', ' ', segment.lower()).split())

    return 'url of ' + ' '.join(part for part in [domain] + words if part)

_STOP_WORD_CACHE = {}


def _english_stop_words():
    """Return the NLTK English stop-word set, reading the corpus only once."""
    if 'english' not in _STOP_WORD_CACHE:
        _STOP_WORD_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOP_WORD_CACHE['english']


def preprocess_bug_report(text):
    """Normalize one bug-report text into a space-separated token string.

    Steps: lowercase, rewrite URLs through ``replace_url``, unwrap
    ``{{...}}`` placeholders, split dotted identifiers into words, blank out
    punctuation other than ``_ ( ) .``, tokenize with NLTK and drop English
    stop words and tokens that are not purely alphabetic.

    Args:
        text: The raw report text. Null values (``None`` / NaN) are accepted;
            other non-string values are converted with ``str``.

    Returns:
        The cleaned text, or ``""`` when ``text`` is null.
    """
    if pd.isnull(text):
        return ""
    if not isinstance(text, str):
        # Defensive: a numeric cell would otherwise fail on .lower().
        text = str(text)

    # 1. Lowercase
    text = text.lower()

    # 2. Format URLs
    text = re.sub(r'https?://[^\s]+', replace_url, text)

    # 3. Normalize config keys and code-like tokens
    text = re.sub(r'\{\{([^}]+)\}\}', r'\1', text)  # {{config.property}} → config.property
    # Dotted names (config.package.class) become a sequence of separate words.
    text = re.sub(r'\b[a-zA-Z_]+\.[a-zA-Z0-9_.]+\b', lambda m: m.group(0).replace('.', ' '), text)

    # 4. Remove punctuation except for useful symbols in code (e.g. underscore, parentheses, dot)
    allowed = set('_().')
    text = ''.join(ch if ch.isalnum() or ch.isspace() or ch in allowed else ' ' for ch in text)

    # 5. Tokenize and remove stopwords (but keep code-relevant terms).
    # The stop-word set is cached because this function runs once per row.
    stop_words = _english_stop_words()
    tokens = word_tokenize(text)
    tokens = [tok for tok in tokens if tok not in stop_words and (tok.isalpha() or re.match(r'[a-zA-Z_]+\(\)', tok))]

    return ' '.join(tokens)


## DATASET, loading, balancing, and preprocessing

In [None]:

def clean_dataframe(df, text_col="text", label_col="label", allowed_classes=(0, 1)):
    """Return a cleaned copy of ``df`` holding only usable labelled rows.

    Rows are removed when the text is null or whitespace-only, when the text
    or the label is missing, or when the label is not in ``allowed_classes``.
    The input frame is left untouched and the original index is preserved.

    Args:
        df: Frame containing at least ``text_col`` and ``label_col``.
        text_col: Name of the text column.
        label_col: Name of the label column.
        allowed_classes: Iterable of label values to keep.

    Returns:
        A new, independent DataFrame with the surviving rows.
    """
    # Work on a copy so the caller's frame is not modified as a side effect.
    cleaned = df.copy()
    cleaned[text_col] = cleaned[text_col].apply(
        lambda value: value if pd.notnull(value) and str(value).strip() != "" else None
    )
    cleaned = cleaned.dropna(subset=[text_col, label_col])
    # The trailing copy detaches the result from the filtered view, so later
    # column assignments do not raise chained-assignment warnings.
    return cleaned[cleaned[label_col].isin(list(allowed_classes))].copy()

In [None]:
# CSV without a header row; its two columns are read as text and label.
file_path = 'Hbase_DE - v.01.csv'

column_names = ['text', 'label']
df = pd.read_csv(file_path, header=None, names=column_names, dtype={0: str, 1: int})

# Drop unusable rows and keep only the two target classes.
df = clean_dataframe(df, text_col='text', label_col='label', allowed_classes=[0, 1])

# Show a few raw samples plus the overall size and label balance.
print("Sample of loaded text:")
for row_position in (150, 66, 222):
    print('\n', df['text'].iloc[row_position])
print("\nTotal samples:", len(df))
print("\nClass distribution:\n", df['label'].value_counts())


In [None]:

# Apply preprocessing to text.
# preprocess_bug_report is the preprocessing function defined in this
# notebook; the previously referenced preprocess_bug_report_4 is not defined
# here and raised NameError. assign() builds a new frame, which also avoids
# writing into a filtered view of the loaded data.
df = df.assign(text=df['text'].apply(preprocess_bug_report))

# NOTE(review): these positions differ from the ones printed right after
# loading (150, 66, 222), and iloc[2222] needs at least 2223 rows — confirm.
print("\nSample of preprocessed text:")
print('\n',df['text'].iloc[15])
print('\n',df['text'].iloc[66])
print('\n',df['text'].iloc[2222])
print("\nTotal samples:", len(df))
print("\nClass distribution:\n", df['label'].value_counts())


In [None]:
print(f"Total Samples: {len(df)}")
print("Class Distribution:\n", df['label'].value_counts())

# Separate majority and minority classes.
# Assumes label 0 is the larger class — TODO confirm against the counts above.
df_majority = df[df.label == 0]
df_minority = df[df.label == 1]

# Upsample minority class
df_minority_upsampled = resample(
    df_minority,
    replace=True,               # sample with replacement
    n_samples=len(df_majority), # match majority count
    random_state=42
)

# Combine majority and upsampled minority, then shuffle. The shuffle is seeded
# like every other random step so the resulting row order is reproducible.
# NOTE(review): any train/validation split taken from df_balanced can place
# copies of the same minority row on both sides of the split.
df_balanced = (
    pd.concat([df_majority, df_minority_upsampled])
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)


print(f"\n Total Samples of balanced data: {len(df_balanced)}")
print("\n Class Distribution of balanced:\n", df_balanced['label'].value_counts())

## model loading

In [None]:
from transformers import AutoConfig, AutoModelForSequenceClassification
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Checkpoint used for the sequence-classification model.
model_ckpt = "microsoft/codebert-base"

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Two-label classification model, placed on the selected device.
model = AutoModelForSequenceClassification.from_pretrained(model_ckpt, num_labels=2).to(device)

# Alternative: load model and tokenizer from a saved directory instead.
#loaded_model = AutoModelForSequenceClassification.from_pretrained("models/codebert-exp2-model_1")
#loaded_tokenizer = AutoTokenizer.from_pretrained("models/codebert-exp2-model_1")
#loaded_model = loaded_model.to(device)


## direct model inference

In [None]:

# Test the loaded model on a sample text
#def predict_bug_severity(text):
    # Preprocess the text
 #   text = preprocess_bug_report(text)
    
    # Tokenize
  #  inputs = loaded_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
  #  inputs = {k: v.to(device) for k, v in inputs.items()}
    
    # Get prediction
   # with torch.no_grad():
    #    outputs = loaded_model(**inputs)
     #   prediction = torch.argmax(outputs.logits, dim=1).item()
    
    #return "Critical Bug" if prediction == 1 else "Non-Critical Bug"

# Test on a sample
#sample_text = "There is a null pointer exception in the cache server causing system crash"
#print(f"Prediction for sample text: {predict_bug_severity(sample_text)}")

## tokenizer, dataloader

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split


# ---- Step 2: Train-Validation Split ----
# Split the original (not upsampled) frame first. Upsampling duplicates
# minority rows, so splitting an already balanced frame puts copies of the
# same report into both the training and the validation set and inflates the
# validation scores.
train_split, val_split = train_test_split(
    df[["text", "label"]],
    test_size=0.1,
    stratify=df["label"],
    random_state=42
)

# Balance the training portion only: each smaller class is upsampled with
# replacement to the size of the largest class. The validation portion keeps
# the natural class distribution.
class_counts = train_split["label"].value_counts()
target_size = int(class_counts.max())
balanced_parts = []
for class_label, class_size in class_counts.items():
    class_rows = train_split[train_split["label"] == class_label]
    if class_size < target_size:
        class_rows = resample(
            class_rows,
            replace=True,
            n_samples=target_size,
            random_state=42
        )
    balanced_parts.append(class_rows)
train_split = (
    pd.concat(balanced_parts)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

train_texts = train_split["text"].tolist()
train_labels = train_split["label"].tolist()
val_texts = val_split["text"].tolist()
val_labels = val_split["label"].tolist()

# ---- Step 3: Tokenization for CodeBERT ----
model_ckpt = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# Both splits are encoded with identical settings: truncate and pad every
# text to exactly 512 tokens.
tokenize_options = {"truncation": True, "padding": "max_length", "max_length": 512}
train_encodings = tokenizer(train_texts, **tokenize_options)
val_encodings = tokenizer(val_texts, **tokenize_options)

# ---- Step 4: Custom PyTorch Dataset ----
class BugReportDataset(Dataset):
    """Dataset pairing tokenizer encodings with integer class labels.

    ``encodings`` must offer ``.items()`` yielding ``(field, values)`` pairs
    where ``values[idx]`` is the entry for sample ``idx``; ``labels`` is an
    indexable sequence with one label per sample.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The number of samples is defined by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors, including ``"labels"``."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# ---- Step 5: Dataset Objects ----
# Wrap each (encodings, labels) pair in a BugReportDataset: train first,
# validation second.
train_dataset, val_dataset = (
    BugReportDataset(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (val_encodings, val_labels),
    )
)




## evaluation metrics

In [None]:
import numpy as np
import evaluate

# Load the four metrics used by compute_metrics, in a fixed order.
accuracy, f1, precision, recall = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "f1", "precision", "recall")
)

def compute_metrics(eval_pred):
    """Compute accuracy and weighted F1 / precision / recall.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` is an
            array of scores whose last axis is reduced with ``argmax``, or a
            tuple whose first element is such an array.

    Returns:
        A dict with the keys ``accuracy``, ``f1``, ``precision`` and
        ``recall``.
    """
    scores, references = eval_pred
    # Some models return a tuple of outputs; the scores are its first element.
    if isinstance(scores, tuple):
        scores = scores[0]

    # Most likely class per sample, flattened to 1-D alongside the labels.
    predicted = np.argmax(scores, axis=-1).flatten()
    references = references.flatten()

    weighted_args = {"predictions": predicted, "references": references, "average": "weighted"}
    return {
        "accuracy": accuracy.compute(predictions=predicted, references=references)["accuracy"],
        "f1": f1.compute(**weighted_args)["f1"],
        "precision": precision.compute(**weighted_args)["precision"],
        "recall": recall.compute(**weighted_args)["recall"],
    }

## training args

In [None]:
from transformers import TrainingArguments

# All training hyper-parameters are collected in one mapping and expanded
# into TrainingArguments below.
training_config = {
    # Output and logging locations
    "output_dir": "./results",
    "overwrite_output_dir": True,      # allow overwriting existing results
    "logging_dir": "./logs",
    "logging_steps": 100,
    # Evaluation / checkpoint cadence (eval_strategy replaces evaluation_strategy)
    "eval_strategy": "steps",
    "eval_steps": 500,
    "save_steps": 500,
    # Optimisation
    "learning_rate": 2e-5,
    "per_device_train_batch_size": 16,
    "per_device_eval_batch_size": 16,
    "num_train_epochs": 50,
    "weight_decay": 0.01,
    "label_smoothing_factor": 0.1,
    "max_grad_norm": 1.0,
    "lr_scheduler_type": "cosine",
    "warmup_steps": 500,
    # Best-checkpoint selection: highest F1 wins
    "load_best_model_at_end": True,
    "metric_for_best_model": "f1",
    "greater_is_better": True,
    # Data handling
    "remove_unused_columns": True,     # ensure proper column handling
    "dataloader_drop_last": False,     # keep the last, possibly smaller batch
}

training_args = TrainingArguments(**training_config)


## initialization

In [None]:

from transformers import EarlyStoppingCallback

# EarlyStoppingCallback was imported but never registered, so training always
# ran for the full number of epochs. It is now attached: training stops once
# the metric named by training_args.metric_for_best_model has not improved
# for 3 consecutive evaluations.
# NOTE(review): the patience of 3 is a starting value — tune as needed.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
)


## training & validation

In [None]:
# Fine-tune the model using the Trainer configured above.
trainer.train()

# Evaluate on the eval_dataset the Trainer was constructed with.
trainer.evaluate()


## Model reload

from transformers import RobertaForSequenceClassification, RobertaTokenizer

model = RobertaForSequenceClassification.from_pretrained("./codebert-bug-model")
tokenizer = RobertaTokenizer.from_pretrained("./codebert-bug-model")


# CONFUSION MATRIX PLOT

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns

# Run the trained model over the validation dataset.
predictions = trainer.predict(val_dataset)
labels = predictions.label_ids
preds = predictions.predictions.argmax(-1)

# Confusion matrix of true labels against predicted labels.
cm = confusion_matrix(labels, preds)

# Annotated heatmap with the same class names on both axes.
class_names = ['Non-Critical', 'Critical']
plt.figure(figsize=(10, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

# Summary metrics returned alongside the predictions.
print("\nClassification Report:")
for metric_title, metric_key in (
    ("Accuracy", "test_accuracy"),
    ("F1 Score", "test_f1"),
    ("Precision", "test_precision"),
    ("Recall", "test_recall"),
):
    print(f"{metric_title}: {predictions.metrics[metric_key]:.4f}")

## plotting train/val loss

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# The Trainer's log history is turned into a DataFrame for plotting.
log_history = trainer.state.log_history
log_df = pd.DataFrame(log_history)

# Keep only the rows that carry a training loss / an evaluation loss.
train_df = log_df.dropna(subset=['loss'])
eval_df = log_df.dropna(subset=['eval_loss'])

plt.figure(figsize=(10, 5))
for curve_frame, loss_column, curve_options in (
    (train_df, "loss", {"label": "Train Loss"}),
    (eval_df, "eval_loss", {"label": "Eval Loss", "color": "orange"}),
):
    plt.plot(curve_frame["step"], curve_frame[loss_column], **curve_options)

plt.title("Training and Validation Loss")
plt.xlabel("Step")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()


## LUCKY? SAVE THE MODEL NOW

In [None]:
# Save model and tokenizer.
# Both are written to the same directory, "./codebert-model".
trainer.save_model("./codebert-model")
tokenizer.save_pretrained("./codebert-model")
