In [None]:
# Install necessary libraries
!pip install transformers datasets torch scikit-learn pandas

import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from datasets import Dataset
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from sklearn.metrics import classification_report
import torch.nn.functional as F
import numpy as np

In [None]:
import pandas as pd
import random
from datasets import load_dataset

# Fetch the four source corpora from the Hugging Face Hub
hackaprompt_dataset = load_dataset('hackaprompt/hackaprompt-dataset')
squad_dataset = load_dataset('rajpurkar/squad')
deepset_dataset = load_dataset('deepset/prompt-injections')
spml_dataset = load_dataset('reshabhs/SPML_Chatbot_Prompt_Injection')


def _tag_prompts(texts, label, source):
    """Wrap each text in a record carrying its class label and originating dataset."""
    return [{'text': text, 'label': label, 'source': source} for text in texts]


def _deepset_texts(wanted_label):
    """Return the Deepset training texts whose own 'label' field equals ``wanted_label``."""
    return [row['text'] for row in deepset_dataset['train'] if row['label'] == wanted_label]


# Injection examples (label 1), grouped by source dataset.
# NOTE(review): every SPML 'User Prompt' and every Hackaprompt 'prompt' is labelled
# as an injection here, regardless of any other column those datasets may provide.
# Confirm that this matches the datasets' own annotations.
injection_prompts = {
    'SPML': _tag_prompts(spml_dataset['train']['User Prompt'], 1, 'SPML'),
    'Hackaprompt': _tag_prompts(hackaprompt_dataset['train']['prompt'], 1, 'Hackaprompt'),
    'Deepset': _tag_prompts(_deepset_texts(1), 1, 'Deepset'),
}

# Benign examples (label 0), grouped by source dataset.
# NOTE(review): SPML 'System Prompt' values and SQuAD questions are used as the
# benign class. Confirm they are representative of benign user input.
benign_prompts = {
    'SPML': _tag_prompts(spml_dataset['train']['System Prompt'], 0, 'SPML'),
    'SQuAD': _tag_prompts(squad_dataset['train']['question'], 0, 'SQuAD'),
    'Deepset': _tag_prompts(_deepset_texts(0), 0, 'Deepset'),
}

# Function to sample prompts
def sample_prompts(prompt_dict, n):
    """Draw up to ``n`` prompts without replacement, choosing a random source per draw.

    Each draw first picks one of the not-yet-exhausted sources uniformly at
    random and then picks one prompt uniformly at random from that source.

    Args:
        prompt_dict: Mapping of source name -> list of prompt records. The
            lists are MUTATED: every sampled prompt is removed from its list
            (and the order of the remaining items may change), so repeated
            calls never return the same list entry twice.
        n: Number of prompts requested.

    Returns:
        A list of at most ``n`` prompt records. Fewer than ``n`` are returned
        only when every source list has been emptied; an empty ``prompt_dict``
        or ``n <= 0`` yields an empty list.
    """
    sampled = []
    sources = list(prompt_dict.keys())
    # Checking `sources` in the loop condition also covers an empty prompt_dict,
    # where random.choice([]) would otherwise raise IndexError.
    while sources and len(sampled) < n:
        source = random.choice(sources)
        pool = prompt_dict[source]
        if not pool:
            # Source exhausted: stop considering it for later draws.
            sources.remove(source)
            continue
        # Swap-and-pop removes the chosen item in O(1); list.remove() would
        # rescan the list (comparing dicts) on every single draw.
        idx = random.randrange(len(pool))
        pool[idx], pool[-1] = pool[-1], pool[idx]
        sampled.append(pool.pop())
    return sampled

# Determine number of prompts to sample
total_prompts = 5000
injection_prompts_count = int(total_prompts * 0.5)
benign_prompts_count = total_prompts - injection_prompts_count

# Seed Python's RNG so the sampled dataset (and the CSV written below) is the
# same on every Restart & Run All.
random.seed(42)

# Sample prompts. sample_prompts removes what it draws from the source lists,
# so re-running this cell alone continues from the already-depleted pools.
sampled_injection_prompts = sample_prompts(injection_prompts, injection_prompts_count)
sampled_benign_prompts = sample_prompts(benign_prompts, benign_prompts_count)

# sample_prompts only comes back short once every source list is empty, so
# there is nothing left to "top up" from. Report the shortfall instead of
# looping forever over exhausted sources.
for kind, got, wanted in (
    ("injection", len(sampled_injection_prompts), injection_prompts_count),
    ("benign", len(sampled_benign_prompts), benign_prompts_count),
):
    if got < wanted:
        print(f"Warning: only {got} {kind} prompts available (requested {wanted})")

# Combine and shuffle the final dataset
sampled_prompts = sampled_injection_prompts + sampled_benign_prompts
random.shuffle(sampled_prompts)

# Convert to DataFrame (explicit columns keep the frame well-formed even if empty)
df = pd.DataFrame(sampled_prompts, columns=['text', 'label', 'source'])

# Verify the counts and ratio
injection_count = sum(1 for item in sampled_prompts if item['label'] == 1)
benign_count = sum(1 for item in sampled_prompts if item['label'] == 0)

# Percentage is relative to what was actually sampled, which can be smaller
# than total_prompts when the sources run out.
sampled_total = len(sampled_prompts)
injection_pct = injection_count / sampled_total * 100 if sampled_total else 0.0

print(f"Number of injection prompts: {injection_count}")
print(f"Number of benign prompts: {benign_count}")
print(f"Percentage of injection prompts: {injection_pct:.2f}%")

# Save the DataFrame as a CSV file
df[['text', 'label']].to_csv('updated_prompt_injections.csv', index=False)

print("Updated dataset saved as 'updated_prompt_injections.csv'")

# Display the first few rows of the dataset
print("\nFirst few rows of the dataset:")
print(df.head())

# Tally labels per source in a single pass: source -> {label: count}
source_label_counts = {}
for item in sampled_prompts:
    tally = source_label_counts.setdefault(item['source'], {0: 0, 1: 0})
    tally[item['label']] += 1

# Count prompts from each source (sorted for a stable print order)
for source in sorted(source_label_counts):
    count = sum(source_label_counts[source].values())
    print(f"Prompts from {source}: {count}")

# Print label distribution for each source. Separate names are used so the
# overall injection_count / benign_count computed above are not overwritten.
print("\nLabel distribution for each source:")
for source in sorted(source_label_counts):
    source_injection = source_label_counts[source][1]
    source_benign = source_label_counts[source][0]
    print(f"{source}: Injection = {source_injection}, Benign = {source_benign}")

In [None]:
import pandas as pd
from datasets import Dataset
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split

# Load custom dataset from CSV
custom_dataset = pd.read_csv('updated_prompt_injections.csv')

# Ensure the 'text' column contains strings
custom_dataset['text'] = custom_dataset['text'].astype(str)

# Convert to Hugging Face Dataset
dataset = Dataset.from_pandas(custom_dataset)

# Split 50-50 into train and test sets.
# The split is done on the pandas DataFrame, a container scikit-learn supports
# directly, instead of on the Hugging Face Dataset object. The shuffle is
# determined by the number of rows and random_state, so the same seed is kept.
train_df, test_df = train_test_split(custom_dataset, test_size=0.5, random_state=42)

# Convert the split frames to Hugging Face Datasets; preserve_index=False keeps
# the shuffled pandas index from being stored as an extra column.
train_dataset = Dataset.from_pandas(train_df, preserve_index=False)
test_dataset = Dataset.from_pandas(test_df, preserve_index=False)

# Load tokenizer
model_name = 'albert-base-v2'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize function
def tokenize_function(examples):
    """Tokenize the 'text' entries of ``examples`` with the module-level ``tokenizer``.

    Truncation is enabled and ``padding='max_length'`` is requested; the
    tokenizer's output is returned unchanged.
    """
    texts = examples['text']
    encoded = tokenizer(texts, truncation=True, padding='max_length')
    return encoded

# Run the tokenizer over both splits in batched mode
train_dataset, test_dataset = [
    split.map(tokenize_function, batched=True)
    for split in (train_dataset, test_dataset)
]

# Expose only the model inputs and the label, returned as PyTorch tensors
columns_to_return = ['input_ids', 'attention_mask', 'label']
torch_format = dict(type='torch', columns=columns_to_return)
train_dataset.set_format(**torch_format)
test_dataset.set_format(**torch_format)

# Report split sizes, then one formatted example from each split
train_size = len(train_dataset)
test_size = len(test_dataset)
print(f"Train dataset size: {train_size}")
print(f"Test dataset size: {test_size}")
print(f"Sample from train dataset: {train_dataset[0]}")
print(f"Sample from test dataset: {test_dataset[0]}")

In [None]:
import optuna
import numpy as np
from transformers import AlbertForSequenceClassification, AlbertTokenizer, AlbertConfig
import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.optim import AdamW
from torch.cuda.amp import GradScaler, autocast
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import classification_report, accuracy_score
import time

def measure_time(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped function's return value is passed through unchanged. If
    ``func`` raises, the exception propagates and no timing line is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature that prints
        ``"<name> took <seconds> seconds"`` after each successful call.
    """
    import functools  # local import keeps this block self-contained

    # functools.wraps preserves func's __name__/__doc__ on the wrapper, so the
    # decorated function still identifies itself correctly in logs and tracebacks.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock intended for
        # measuring elapsed time; time.time() can jump if the system clock changes.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(f"{func.__name__} took {elapsed:.2f} seconds")
        return result
    return wrapper

@measure_time
def train_and_evaluate(params, train_dataset, test_dataset):
    """Fine-tune ``albert-base-v2`` as a 2-class classifier and score it on ``test_dataset``.

    A fresh model is loaded on every call, trained for ``params['num_epochs']``
    epochs with AdamW, a linear warmup/decay schedule, gradient accumulation and
    (on CUDA only) mixed precision, then evaluated once.

    Args:
        params: Dict with the keys ``'learning_rate'``, ``'train_batch_size'``,
            ``'num_epochs'`` and ``'accumulation_steps'``.
        train_dataset: Dataset whose batches are dicts of tensors containing a
            ``'label'`` entry; every other entry is passed to the model as a
            keyword argument.
        test_dataset: Dataset with the same batch layout, used for evaluation.

    Returns:
        Tuple ``(accuracy, report)``: the accuracy score and the text produced
        by ``classification_report`` for the test predictions.
    """
    model_name = "albert-base-v2"

    eval_batch_size = 32
    seed = 42
    betas = (0.9, 0.999)
    epsilon = 1e-08
    lr_scheduler_warmup_steps = 500

    # Seed BEFORE the model is built so that any weights from_pretrained
    # initialises randomly, as well as the shuffling order, are reproducible.
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    config = AlbertConfig.from_pretrained(model_name)
    config.num_labels = 2  # Binary classification
    model = AlbertForSequenceClassification.from_pretrained(model_name, config=config)

    use_amp = torch.cuda.is_available()
    device = torch.device("cuda" if use_amp else "cpu")
    model.to(device)

    train_dataloader = DataLoader(train_dataset, sampler=RandomSampler(train_dataset), batch_size=params['train_batch_size'])
    test_dataloader = DataLoader(test_dataset, sampler=SequentialSampler(test_dataset), batch_size=eval_batch_size)

    optimizer = AdamW(model.parameters(), lr=params['learning_rate'], betas=betas, eps=epsilon)

    num_epochs = params['num_epochs']
    accumulation_steps = params['accumulation_steps']
    num_batches = len(train_dataloader)
    # One optimizer update per full accumulation window, plus one for the
    # trailing partial window of each epoch (ceil division).
    updates_per_epoch = (num_batches + accumulation_steps - 1) // accumulation_steps
    total_steps = updates_per_epoch * num_epochs
    # NOTE(review): the warmup length is fixed at 500 updates; when total_steps is
    # smaller than that, the schedule never leaves the warmup ramp. Consider
    # scaling the warmup with total_steps.
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=lr_scheduler_warmup_steps, num_training_steps=total_steps)

    # Loss scaling is only meaningful when autocast runs on CUDA.
    scaler = GradScaler(enabled=use_amp)

    for epoch in range(num_epochs):
        total_loss = 0
        model.train()
        # Start every epoch with clean gradients. Gradients must NOT be zeroed
        # per micro-batch, otherwise the accumulated contributions are lost.
        optimizer.zero_grad()
        for step, batch in enumerate(train_dataloader):
            inputs = {k: v.to(device) for k, v in batch.items() if k != 'label'}
            labels = batch['label'].to(device)

            with autocast(enabled=use_amp):
                outputs = model(**inputs, labels=labels)
                # Scale so the summed gradients of one window average the micro-batches.
                loss = outputs.loss / accumulation_steps

            scaler.scale(loss).backward()

            window_complete = (step + 1) % accumulation_steps == 0
            last_batch = (step + 1) == num_batches
            if window_complete or last_batch:
                scaler.step(optimizer)
                scaler.update()
                scheduler.step()
                optimizer.zero_grad()

            total_loss += loss.item() * accumulation_steps  # Undo the division

        # max(..., 1) avoids a ZeroDivisionError on an empty training set
        avg_train_loss = total_loss / max(num_batches, 1)
        print(f"Epoch: {epoch + 1}, Loss: {avg_train_loss}")

    model.eval()
    all_labels = []
    all_preds = []

    for batch in test_dataloader:
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'label'}
        labels = batch['label'].to(device)

        with torch.no_grad():
            with autocast(enabled=use_amp):
                outputs = model(**inputs, labels=labels)
                logits = outputs.logits

        # Predicted class = index of the largest logit
        preds = torch.argmax(logits, dim=1).cpu().numpy()
        labels = labels.cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(labels)

    accuracy = accuracy_score(all_labels, all_preds)
    report = classification_report(all_labels, all_preds)

    return accuracy, report

def objective(trial):
    """Optuna objective: sample hyperparameters and return the best accuracy seen.

    For each epoch budget 1..3 the model is retrained from scratch via
    ``train_and_evaluate`` (which loads a fresh model on every call), the
    resulting accuracy is reported to the trial as an intermediate value, and
    the trial is pruned if the study's pruner asks for it.

    NOTE(review): accuracy is measured on the module-level ``test_dataset``,
    the same split used for the baseline and final reports, so the reported
    numbers are tuned on the evaluation data. A separate validation split
    would avoid that.

    Args:
        trial: The ``optuna`` trial used to suggest values, report
            intermediate results and query pruning.

    Returns:
        The highest accuracy obtained across the evaluated epoch budgets.

    Raises:
        optuna.TrialPruned: When the pruner decides to stop this trial.
    """
    params = {
        # suggest_float(..., log=True) is the non-deprecated replacement for
        # suggest_loguniform and samples from the same log-uniform range.
        'learning_rate': trial.suggest_float('learning_rate', 1e-6, 1e-3, log=True),
        'train_batch_size': trial.suggest_categorical('train_batch_size', [4, 8, 16, 32]),
        'accumulation_steps': trial.suggest_int('accumulation_steps', 1, 8),
    }

    max_epochs = 3
    best_accuracy = 0

    for epoch in range(max_epochs):
        # Each iteration is a full training run with one more epoch than the last.
        params['num_epochs'] = epoch + 1
        accuracy, _ = train_and_evaluate(params, train_dataset, test_dataset)

        trial.report(accuracy, epoch)

        if trial.should_prune():
            raise optuna.TrialPruned()

        best_accuracy = max(best_accuracy, accuracy)

    return best_accuracy

start_time = time.time()

print("Starting the process...")

# Fixed reference configuration that the tuned model is compared against
baseline_params = {
    'learning_rate': 1e-3,
    'train_batch_size': 2,
    'num_epochs': 3,
    'accumulation_steps': 1
}

print("Evaluating baseline model...")
baseline_accuracy, baseline_report = train_and_evaluate(baseline_params, train_dataset, test_dataset)

print(f"\nBaseline model accuracy: {baseline_accuracy:.4f}")
print("\nBaseline Classification Report:")
print(baseline_report)

study = optuna.create_study(
    study_name="Hyperband",
    direction="maximize",
    # Seeded so the sequence of suggested hyperparameters is reproducible.
    sampler=optuna.samplers.TPESampler(seed=42),
    pruner=optuna.pruners.HyperbandPruner(
        min_resource=1,
        # objective() reports one value per epoch budget for 3 budgets, and
        # max_resource should match that maximum number of steps.
        max_resource=3,
        reduction_factor=3
    )
)

n_trials = 5

print("\nRunning Hyperband optimization...")
optimization_start_time = time.time()
study.optimize(objective, n_trials=n_trials)
optimization_end_time = time.time()
print(f"Optimization took {optimization_end_time - optimization_start_time:.2f} seconds")

print("\nHyperband optimization results:")
print(f"  Best value: {study.best_value:.4f}")
print("  Best params:")
for key, value in study.best_params.items():
    print(f"    {key}: {value}")

# Summary statistics over every trial that has a value
values = [t.value for t in study.trials if t.value is not None]
print(f"  Mean: {np.mean(values):.4f}")
print(f"  Std: {np.std(values):.4f}")

# Retrain once with the best hyperparameters for the full 3 epochs
best_params = study.best_params.copy()
best_params['num_epochs'] = 3
print("\nEvaluating best model...")
best_accuracy, best_report = train_and_evaluate(best_params, train_dataset, test_dataset)

print('\nBest Model Classification Report:')
print(best_report)

print("\nComparison:")
print(f"Baseline accuracy: {baseline_accuracy:.4f}")
print(f"Best model accuracy: {best_accuracy:.4f}")
print(f"Improvement: {best_accuracy - baseline_accuracy:.4f}")

end_time = time.time()
print(f"\nTotal execution time: {end_time - start_time:.2f} seconds")