In [None]:
! pip install transformers datasets

In [None]:
import pandas as pd
import numpy as np
import time

import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

import transformers
from transformers import BertTokenizer, BertModel
from transformers import Trainer, TrainingArguments
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer, DataCollatorWithPadding

from datasets import load_metric

In [None]:
# Seed PyTorch's random number generator so repeated runs start from the same state.
torch.manual_seed(1)

# Use the CUDA device when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')


In [None]:
# Step 1: Load the prompt injection dataset
csv_file = 'cv-prompt-injection-dataset.csv'

# Shuffle every row with a fixed seed, then renumber the index from zero.
dataset = (
    pd.read_csv(csv_file)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

# The first 80% of the shuffled rows become the training split; the remainder is the test split.
split_at = int(0.8 * len(dataset))
dataset_train = dataset.iloc[:split_at]
dataset_test = dataset.iloc[split_at:]


In [None]:
# Step 2: Tokenise the dataset
print('Step 2: Tokenise dataset')
tokeniser = AutoTokenizer.from_pretrained('bert-base-uncased')

# Pull the raw text and label columns out of each split as arrays.
train_texts, train_labels = dataset_train['text'].values, dataset_train['label'].values
test_texts, test_labels = dataset_test['text'].values, dataset_test['label'].values

# Both splits are tokenised with the same truncation and padding options.
tokenise_options = {'truncation': True, 'padding': True}
train_encodings = tokeniser(list(train_texts), **tokenise_options)
test_encodings = tokeniser(list(test_texts), **tokenise_options)


In [None]:
# Step 3: Create dataset class and loaders
print('Step 3: Create dataloader')
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokeniser encodings with their labels.

    ``encodings`` is a mapping from field name to a per-example sequence, and
    ``labels`` is an indexable sequence with one entry per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of labels defines the dataset size.
        return len(self.labels)

    def __getitem__(self, idx):
        # Convert every encoded field of example ``idx`` to a tensor ...
        item = {}
        for field, values in self.encodings.items():
            item[field] = torch.tensor(values[idx])
        # ... and attach the matching label under the 'labels' key.
        item['labels'] = torch.tensor(self.labels[idx])
        return item


# Wrap each split's encodings and labels in a CustomDataset.
train_dataset = CustomDataset(train_encodings, train_labels)
test_dataset = CustomDataset(test_encodings, test_labels)

# Both loaders serve batches of 16; only the training loader shuffles.
loader_options = {'batch_size': 16}
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = torch.utils.data.DataLoader(test_dataset, shuffle=False, **loader_options)


In [None]:
# Step 4: Load BERT model
print('Step 4: Load BERT model')

# Load 'bert-base-uncased' configured for two labels and move it onto DEVICE.
model = AutoModelForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=2,
).to(DEVICE)

# Switch the model to training mode (left as the cell's final expression).
model.train()


In [None]:
# Step 5: train the model using the Trainer API
print('Step 5: Begin training using Trainer API')

# The same learning rate is given to the optimiser and to the training arguments.
learning_rate = 0.0001
optim = torch.optim.Adam(model.parameters(), lr=learning_rate)

training_args = TrainingArguments(
    output_dir='./results',
    logging_dir='./logs',
    logging_steps=10,
    num_train_epochs=8,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    evaluation_strategy="epoch",
    learning_rate=learning_rate,
)

# Accuracy metric object used by compute_metrics.
metric = load_metric("accuracy")

# evaluate_model appends one row of scores to this frame on every call.
results_df = pd.DataFrame(columns=["epoch", "accuracy", "precision", "recall", "f1"])

def compute_metrics(eval_pred):
    """Score an evaluation pass with the module-level ``metric`` object.

    ``eval_pred`` unpacks into ``(logits, labels)``; the logits arrive as a
    numpy array rather than a PyTorch tensor. The highest-scoring entry along
    the last axis is taken as the predicted class.
    """
    logits, labels = eval_pred
    predicted_classes = np.asarray(logits).argmax(axis=-1)
    return metric.compute(predictions=predicted_classes, references=labels)

def evaluate_model(trainer, epoch):
    """Compute accuracy, precision, recall and F1, and log them to ``results_df``.

    Args:
        trainer: Despite the name, any object exposing ``predictions`` (scores
            with one column per class) and ``label_ids`` (true labels).
        epoch: Value stored in the ``epoch`` column of the logged row.

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    global results_df

    # Predicted class = index of the largest score in each row.
    labels = trainer.label_ids
    predictions = trainer.predictions.argmax(axis=1)

    accuracy = accuracy_score(labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average="binary")

    scores = {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

    # Append the scores as a new row, in the same order as the frame's columns.
    results_df.loc[len(results_df)] = [epoch, *scores.values()]

    return scores

def _metrics_for_current_epoch(eval_prediction):
    """Score one evaluation pass, tagging it with the trainer's current epoch."""
    return evaluate_model(eval_prediction, trainer.state.epoch)


# `optimizers` is an (optimizer, learning-rate scheduler) pair; no scheduler is supplied.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=_metrics_for_current_epoch,
    optimizers=(optim, None),
)


In [None]:
# Training Loop
# Wall-clock timestamps taken around trainer.train() give the total training time.
start_time = time.time()
trainer.train()
elapsed_minutes = (time.time() - start_time) / 60
print(f'Total Training Time: {elapsed_minutes:.2f} min')

# Show the per-evaluation scores collected in results_df.
print(results_df)


In [None]:
# Evaluate performance metrics
# Keep the returned metrics in a variable and leave it as the cell's final
# expression so the notebook still displays it.
eval_metrics = trainer.evaluate()
eval_metrics


In [None]:
# Test Accuracy
def compute_accuracy(model, data_loader, device):
    """Return the classification accuracy of ``model`` over ``data_loader`` as a percentage.

    Gradient tracking is disabled for the whole pass. The model's train/eval
    mode is left untouched, so the caller decides which mode it is scored in.

    Args:
        model: Callable invoked as ``model(input_ids, attention_mask=...)`` whose
            result exposes the class scores under the ``'logits'`` key.
        data_loader: Iterable of batches; each batch is a mapping holding
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        A scalar float tensor holding the percentage of correct predictions,
        or NaN when ``data_loader`` yields no examples (accuracy is undefined).
    """
    correct_pred, num_examples = 0, 0

    with torch.no_grad():
        for batch in data_loader:
            # Move the batch onto the target device before the forward pass.
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs['logits']

            # Predicted class = index of the largest logit along dimension 1.
            predicted_labels = torch.argmax(logits, 1)
            num_examples += labels.size(0)
            correct_pred += (predicted_labels == labels).sum()

    # With no examples, correct_pred is still the plain int 0 (no .float())
    # and the ratio would divide by zero, so report NaN explicitly.
    if num_examples == 0:
        return torch.tensor(float('nan'), device=device)

    return correct_pred.float() / num_examples * 100

# Put the model in evaluation mode on DEVICE, then report accuracy over test_loader.
model.eval().to(DEVICE)
test_accuracy = compute_accuracy(model, test_loader, DEVICE)
print(f'Test accuracy: {test_accuracy:.2f}%')



In [None]:
# Save model to disk
model_output_dir = './models/llm-prompt-injection-detection-800'
trainer.save_model(model_output_dir)
