# In [None]:
# Final code for training and evaluating a GPT-2 model on the NPE dataset
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification, Trainer, TrainingArguments
from torch.utils.data import Dataset
from transformers import EarlyStoppingCallback, TrainerCallback

# Define paths
# Labelled patch dataset; it is loaded further down in this script with pd.read_csv.
# NOTE(review): the ".json" extension does not match the CSV reader used to
# load it — confirm the file's actual on-disk format.
DATA_PATH = "/root/workspace/npe_project/Dataset/NPEPatches.json"
# Local directory handed to from_pretrained() for both the GPT-2 tokenizer and
# the sequence-classification model.
MODEL_PATH = "/root/workspace/npe_project/GPT"

# Dataset class
class NPECommitDataset(Dataset):
    """Map-style dataset that tokenizes one text/label pair per lookup.

    Args:
        texts: Indexable collection of input strings.
        labels: Indexable collection of integer class ids, aligned with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, max_length=...,
            padding="max_length", truncation=True, return_tensors="pt")`` whose
            result exposes ``"input_ids"`` and ``"attention_mask"`` tensors with
            a leading batch dimension of size 1.
        max_len: Length every sample is padded/truncated to.
    """

    # Keys copied from the tokenizer output into each sample.
    _ENCODED_KEYS = ("input_ids", "attention_mask")

    def __init__(self, texts, labels, tokenizer, max_len=512):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Return the number of samples (one per text)."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized sample at ``idx`` as a dict of tensors."""
        raw_text, raw_label = self.texts[idx], self.labels[idx]
        encoded = self.tokenizer(
            raw_text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # The tokenizer returns a batch of one; drop that leading dimension so
        # the DataLoader can stack samples itself.
        sample = {key: encoded[key].squeeze(0) for key in self._ENCODED_KEYS}
        sample["labels"] = torch.tensor(raw_label, dtype=torch.long)
        return sample

# Load data
# NOTE(review): DATA_PATH ends in ".json" but the file is parsed as CSV here —
# confirm the on-disk format (pd.read_json may be the right reader).
data = pd.read_csv(DATA_PATH)

# Remove duplicates and handle NaN values.
# Rows without a patch are dropped as well: a missing patch is read back as a
# float NaN, which the tokenizer cannot encode.
data = data.drop_duplicates(subset=["Patch"])
data = data.dropna(subset=["Patch", "Category"])

# Convert the string categories to integer class ids.
label_mapping = {'NPE': 1, 'Non-NPE': 0}
data["Category"] = data["Category"].map(label_mapping)

# .map() turns every category that is not a key of label_mapping into NaN.
# Left in place, those NaNs make the column float and end up as undefined
# values once the dataset casts labels to torch.long, so drop them here and
# restore an integer dtype.
unmapped_rows = int(data["Category"].isna().sum())
if unmapped_rows:
    print(f"Dropping {unmapped_rows} rows whose Category is not one of {list(label_mapping)}.")
    data = data.dropna(subset=["Category"])
data["Category"] = data["Category"].astype(int)

# Extract features and labels
X = data["Patch"]
y = data["Category"]

# Split data: 80% train / 20% test, with a fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Tokenizer and Dataset Preparation
tokenizer = GPT2Tokenizer.from_pretrained(MODEL_PATH)

# Add a padding token if not defined
# NPECommitDataset tokenizes with padding="max_length", so the tokenizer must
# have a pad token before any sample is encoded.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    print("Padding token added.")

# Ensure model is aware of the new token
# The embedding matrix is resized to the tokenizer's current vocabulary size;
# this must happen after the pad token has (possibly) been added above.
model = GPT2ForSequenceClassification.from_pretrained(MODEL_PATH, num_labels=2)
model.resize_token_embeddings(len(tokenizer))

# Set the pad_token_id in the model configuration
# NOTE(review): GPT2ForSequenceClassification is documented to use
# config.pad_token_id to locate the last non-padding token of each sequence —
# confirm against the installed transformers version.
model.config.pad_token_id = tokenizer.pad_token_id

# Wrap the pandas Series as plain lists so the dataset can index them by
# position (the Series keep their original, non-contiguous index after the split).
train_dataset = NPECommitDataset(X_train.tolist(), y_train.tolist(), tokenizer)
test_dataset = NPECommitDataset(X_test.tolist(), y_test.tolist(), tokenizer)

# Training arguments with overfitting control
# Evaluation and checkpointing both run once per epoch; the checkpoint with the
# highest "accuracy" (as returned by compute_metrics) is reloaded when training ends.
# NOTE(review): recent transformers releases renamed `evaluation_strategy` to
# `eval_strategy` — confirm which spelling the installed version accepts.
# NOTE(review): fp16=True presumably requires a CUDA-capable device — confirm
# the target hardware.
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    logging_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    greater_is_better=True,
    fp16=True,
    weight_decay=0.01,
    learning_rate=5e-5,
    save_total_limit=1,
)

# Compute metrics
def compute_metrics(eval_pred):
    """Compute binary-classification metrics for one evaluation pass.

    Args:
        eval_pred: A ``(logits, labels)`` pair. ``logits`` is arg-maxed over
            dimension 1 to obtain the predicted class id for each sample;
            ``labels`` holds the true class ids (0 or 1).

    Returns:
        dict with the keys ``accuracy``, ``precision``, ``recall``,
        ``f1_score``, ``fpr`` (false-positive rate) and ``fnr``
        (false-negative rate). A ratio whose denominator is zero (for example
        the FPR when the batch has no negative samples) is reported as 0.0.
    """
    logits, labels = eval_pred
    predictions = torch.argmax(torch.as_tensor(logits), dim=1).cpu().numpy()

    accuracy = accuracy_score(labels, predictions)
    # zero_division=0 keeps sklearn's default result (0.0) for an undefined
    # score but states it explicitly instead of emitting a warning.
    precision = precision_score(labels, predictions, average="binary", zero_division=0)
    recall = recall_score(labels, predictions, average="binary", zero_division=0)
    f1 = f1_score(labels, predictions, average="binary", zero_division=0)

    # Pin labels=[0, 1] so the matrix is always 2x2; without it a pass that
    # only contains one class yields a 1x1 matrix and the unpacking fails.
    tn, fp, fn, tp = confusion_matrix(labels, predictions, labels=[0, 1]).ravel()
    negatives = fp + tn
    positives = fn + tp
    fpr = float(fp / negatives) if negatives else 0.0
    fnr = float(fn / positives) if positives else 0.0

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "fpr": fpr,
        "fnr": fnr,
    }

# Metrics logging callback
class MetricsLoggerCallback(TrainerCallback):
    """Trainer callback that keeps a running history of evaluation metrics.

    Every ``on_evaluate`` call that receives a metrics dict appends one value
    to each of the six history lists; a metric missing from the dict is
    recorded as 0.
    """

    def __init__(self):
        super().__init__()
        self.epoch_accuracies = []
        self.epoch_precisions = []
        self.epoch_recalls = []
        self.epoch_f1_scores = []
        self.epoch_fprs = []
        self.epoch_fnrs = []

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        """Append the metrics of the evaluation that just finished."""
        if metrics is None:
            return

        # (key in the metrics dict, history list that receives its value)
        targets = (
            ("eval_accuracy", self.epoch_accuracies),
            ("eval_precision", self.epoch_precisions),
            ("eval_recall", self.epoch_recalls),
            ("eval_f1_score", self.epoch_f1_scores),
            ("eval_fpr", self.epoch_fprs),
            ("eval_fnr", self.epoch_fnrs),
        )
        for metric_key, history in targets:
            history.append(metrics.get(metric_key, 0))

# Early stopping callback: training halts once the monitored metric has gone
# 2 consecutive evaluations without improving.
early_stopping_callback = EarlyStoppingCallback(early_stopping_patience=2)

# Trainer
# NOTE(review): the held-out test split is also the evaluation set that drives
# early stopping and best-checkpoint selection, so the scores reported below
# are optimistically biased — consider carving out a separate validation split.
metrics_logger = MetricsLoggerCallback()
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping_callback, metrics_logger],
)

# Training
trainer.train()

# Average metrics across epochs.
# This is computed BEFORE the final trainer.evaluate() call below: that call
# fires on_evaluate as well, which would append the post-training evaluation
# to the per-epoch histories and count it as one extra epoch.
epoch_histories = {
    "Average Accuracy": metrics_logger.epoch_accuracies,
    "Average Precision": metrics_logger.epoch_precisions,
    "Average Recall": metrics_logger.epoch_recalls,
    "Average F1-Score": metrics_logger.epoch_f1_scores,
    "Average FPR": metrics_logger.epoch_fprs,
    "Average FNR": metrics_logger.epoch_fnrs,
}
avg_metrics = {
    # An empty history (no evaluation ran) is reported as NaN rather than
    # raising ZeroDivisionError.
    name: (sum(history) / len(history) if history else float("nan"))
    for name, history in epoch_histories.items()
}

# Evaluation of the model left in the trainer after training.
eval_results = trainer.evaluate()

# Display Average Metrics
print("Average Metrics:")
for metric, value in avg_metrics.items():
    print(f"{metric}: {value:.4f}")