In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from datasets import load_dataset

from src.agent import *

# Fetch the safe-guard-prompt-injection dataset (cached under DATA_DIR) and
# pass every row through clean_text.
raw_dataset = load_dataset("xTRam1/safe-guard-prompt-injection", cache_dir=DATA_DIR)
cleaned_dataset = raw_dataset.map(clean_text)

# Flat per-split views used by the later cells. Texts and labels are read from
# the cleaned rows; word counts are measured on the raw (uncleaned) text.
training_prompts, training_prompt_labels = [], []
for example in cleaned_dataset['train']:
    training_prompts.append(example['text'])
    training_prompt_labels.append(example['label'])
training_prompt_lens = [count_words(example['text']) for example in raw_dataset['train']]

test_prompts, test_prompt_labels = [], []
for example in cleaned_dataset['test']:
    test_prompts.append(example['text'])
    test_prompt_labels.append(example['label'])
test_prompt_lens = [count_words(example['text']) for example in raw_dataset['test']]

n_test_rows = len(cleaned_dataset['test'])
n_train_rows = len(cleaned_dataset['train'])
print(f"Test/Train size: {n_test_rows}/{n_train_rows}")

In [None]:
# Duplicate check on the cleaned training prompts.
# NOTE(review): print_duplicates is not defined in this notebook — presumably it
# arrives via the star import from src.agent and prints repeated prompts; confirm.
print_duplicates(training_prompts)

In [None]:
# Unique prompt texts per split. training_prompts / test_prompts were built
# from cleaned_dataset, so the overlap below is computed on *cleaned* text.
train_unique_texts = set(training_prompts)
test_unique_texts = set(test_prompts)

# Find overlaps
overlap = train_unique_texts.intersection(test_unique_texts)
print(f"Number of overlapping samples: {len(overlap)}")

# Collect the labels of every overlapping row with a single pass per split.
# The lookup must read cleaned_dataset (not raw_dataset): the overlap keys are
# cleaned texts, so comparing them against raw text can silently miss rows
# whenever clean_text changes a prompt.
overlap_labels = {text: {"train": [], "test": []} for text in overlap}
for split in ("train", "test"):
    for row in cleaned_dataset[split]:
        labels_for_text = overlap_labels.get(row['text'])
        if labels_for_text is not None:
            labels_for_text[split].append(row['label'])

# Print all prompts which exist in both the training and the test set,
# grouped per text: training occurrences first, then test occurrences.
# Observations: (TODO: none recorded yet)
for text, labels_for_text in overlap_labels.items():
    for label in labels_for_text["train"]:
        print(f"From Train - Label: {label} for Overlap Text: {text[:100]}")

    for label in labels_for_text["test"]:
        print(f"From Test  - Label: {label} for Overlap Text: {text[:100]}")

In [None]:
# Share of each label per split, in percent.
training_label_percentages = pd.Series(training_prompt_labels).value_counts(normalize=True) * 100
test_label_percentages = pd.Series(test_prompt_labels).value_counts(normalize=True) * 100

# One figure row per split: (histogram title, word counts, pie title, label shares).
panels = [
    ('Training Prompt Length Histogram', training_prompt_lens,
     'Distribution of Classification Training Labels', training_label_percentages),
    ('Test Prompt Length Histogram', test_prompt_lens,
     'Distribution of Classification Test Labels', test_label_percentages),
]

plt.figure(figsize=[10,10])
for row_idx, (hist_title, prompt_lens, pie_title, label_percentages) in enumerate(panels):
    # Left column: prompt length histogram, x-axis clipped to 2000 words.
    plt.subplot(2, 2, 2 * row_idx + 1)
    plt.hist(prompt_lens, bins=100)
    plt.xlim(0,2000)
    plt.xlabel("Words")
    plt.ylabel('Frequency')
    plt.title(hist_title)

    # Right column: label distribution pie chart.
    plt.subplot(2, 2, 2 * row_idx + 2)
    plt.pie(label_percentages.values, labels=label_percentages.index, autopct='%1.1f%%', startangle=90)
    plt.title(pie_title)
plt.show()

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# Labels are taken directly from the cleaned splits.
y_train = cleaned_dataset["train"]["label"]
y_test = cleaned_dataset["test"]["label"]

# Run each cleaned prompt through preprocess_spacy before vectorizing.
tfidf_dataset = cleaned_dataset.map(lambda example: {"text": preprocess_spacy(example["text"])})
train_corpus = tfidf_dataset["train"]["text"]
test_corpus = tfidf_dataset["test"]["text"]

# The vectorizer is fitted on the training split only; the test split is only
# transformed with the already-fitted vectorizer.
vectorizer = TfidfVectorizer(max_features=25000)
X_train = vectorizer.fit_transform(train_corpus)
X_test = vectorizer.transform(test_corpus)

# Persist the fitted vectorizer next to the models (it is passed to run_agents later).
save_with_pickle(vectorizer, f"{MODEL_DIR}/vectorizer.pkl")

print("TF-IDF shape:", X_train.shape)  # (num_samples, num_features)

# Baseline classifier: logistic regression on the TF-IDF features
# (fit returns the estimator itself, so construction and training are chained).
clf = LogisticRegression(max_iter=200).fit(X_train, y_train)

# Predict on the test split and print the per-class metrics.
y_pred = clf.predict(X_test)
baseline_report = classification_report(y_test, y_pred)
print(baseline_report)

# Look the test text column up once instead of once per misclassified row.
test_texts = cleaned_dataset["test"]["text"]

# One record per test row where the baseline prediction disagrees with the label.
misclassified = [
    {"prompt": test_texts[i], "ground_truth": gt, "prediction": pred}
    for i, (gt, pred) in enumerate(zip(y_test, y_pred))
    if gt != pred
]

# Display Mismatched Labels for baseline logistic model
print("Total Misclassified:", len(misclassified))
for i, item in enumerate(misclassified):
    # Headers are built outside the f-strings: reusing the same quote character
    # inside an f-string replacement field is a SyntaxError before Python 3.12.
    summary_header = separator(f"Prompt {i+1} Summary")
    prompt_header = separator("Prompt")
    print(
        f"\n{summary_header}\nGround Truth Label: {item['ground_truth']} - Predicted Label: {item['prediction']} - Prompt Word Length: {count_words(item['prompt'])}",
        f"\n{prompt_header}\n {item['prompt']}",
    )
print("\n", separator())

In [None]:
# Word count of every prompt the baseline model misclassified.
misclassified_prompt_len = [count_words(entry['prompt']) for entry in misclassified]

# Single-panel histogram; x-axis clipped to 2000 words.
plt.figure(figsize=[5,5])
plt.subplot(1, 1, 1)
plt.hist(misclassified_prompt_len, bins=100)
plt.xlim(0,2000)
plt.title('Misclassified Prompt Length Histogram')
plt.xlabel("Words")
plt.ylabel('Frequency')
plt.show()

In [None]:
from src.evaluate import *

# Carve a validation split out of the cleaned training data (seeded); the
# test split is used as-is.
raw_train_valid = cleaned_dataset["train"].train_test_split(test_size=VALIDATION_SPLIT, seed=SEED)
dataset_split = dict(
    train=raw_train_valid["train"],
    validation=raw_train_valid["test"],
    test=cleaned_dataset["test"],
)

# Load the DistilBERT tokenizer (cached under MODEL_DIR).
bert_tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased', cache_dir=MODEL_DIR)


def _tokenize_batch(batch):
    """Tokenize the 'text' field of a batch, truncating to at most 512 tokens."""
    return bert_tokenizer(batch['text'], truncation=True, max_length=512)


# Tokenize every split in batched mode.
bert_tokenized_dataset = {
    split_name: split_data.map(_tokenize_batch, batched=True)
    for split_name, split_data in dataset_split.items()
}

bert_train_dataset, bert_val_dataset, bert_test_dataset = (
    bert_tokenized_dataset[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
# Classification Stage
bert_model_path = f"{MODEL_DIR}/distilbert-finetuned-psa-5eps"
# Training call kept for reference; uncomment to (re)train before fetching the model.
# train_distilbert_model(bert_train_dataset, bert_val_dataset, bert_tokenizer, compute_metrics, model_path=bert_model_path, model_dir=MODEL_DIR, epoches=5)
# NOTE(review): fetch_distilbert_model presumably loads the fine-tuned model saved
# at bert_model_path — confirm against src.
bert_model = fetch_distilbert_model(bert_model_path)

# Run DistilBERT classifier prediction
sliding_y_preds, sliding_y_probs = distilbert_predict(bert_test_dataset, bert_tokenizer, bert_model)

# Show the first (up to) 10 examples. The count is capped by the split size so
# a small test split does not raise IndexError.
test_split = cleaned_dataset['test']
preview_count = min(10, len(test_split))
for i in range(preview_count):
    example = test_split[i]
    print(f"Text: {example['text'][:60]}...")
    print(f"True Label: {example['label']}, Predicted: {sliding_y_preds[i]}")
    print("-" * 50)

print(classification_report(test_split["label"], sliding_y_preds, target_names=["negative", "positive"]))

In [None]:
# Compare the DistilBERT predictions against the test labels.
# y_test is assigned in the TF-IDF baseline cell (cleaned_dataset["test"]["label"]),
# so that cell must have run before this one.
# NOTE(review): print_misclassified is not defined in this notebook — presumably
# it comes from a star import and prints the mismatching rows; confirm.
print_misclassified(y_test, sliding_y_preds, cleaned_dataset)

In [None]:
# Cursor into the test split: the evaluation loop in the next cell starts at
# this index and increments it after every processed row. Re-running this cell
# resets the cursor to 0.
ith_row = 0
# NOTE(review): setup_phi2 is not defined in this notebook — presumably it loads
# the Phi-2 model and tokenizer using MODEL_DIR; confirm against src.
phi_model, phi_tokenizer = setup_phi2(MODEL_DIR)

In [None]:
import time
# Resume from wherever the previous run of this cell stopped (ith_row is
# advanced after every completed row).
# NOTE(review): `results` is re-created on every run, so a resumed run only
# holds rows from start_row onward — confirm whether earlier rows should be kept.
start_row = ith_row
results = {"ground_truth":[], "label":[], "score":[], "confidence":[], "fallback_used":[], "explanation":[], "recommendation":[], "prompt":[]}
total_time = 0
exec_count = 0
test_split = cleaned_dataset['test']
for i in range(start_row, len(test_split)):
    print(separator(f"Test Prompt {i}", width=150))
    data_row = test_split[i]

    # Accumulate execution time of the agent pipeline only. perf_counter is
    # monotonic, so the measurement is unaffected by system clock adjustments.
    start_time = time.perf_counter()
    json_output = run_agents(data_row['text'], decision_label=sliding_y_preds[i], reasoning_agent_info=(phi_model, phi_tokenizer), vectorizer=vectorizer, debug_mode=True)
    total_time += time.perf_counter() - start_time
    exec_count += 1

    # Record fields. The label is read from the row already fetched instead of
    # re-indexing the whole label column on every iteration.
    confidence = json_output['confidence_score']
    results['ground_truth'].append(data_row['label'])
    results['label'].append(sliding_y_preds[i])
    results['score'].append(sliding_y_probs[i])
    results['confidence'].append(confidence)
    # A confidence of 0.5 or lower is recorded as "fallback used".
    results['fallback_used'].append(confidence <= 0.5)
    results['explanation'].append(json_output['reasoning'])
    results['recommendation'].append(json_output['recommendation'])
    results['prompt'].append(data_row['text'])
    ith_row += 1

# Guard the average: when no rows were left to process (e.g. the cell is re-run
# after a completed pass) exec_count is 0 and the division would raise.
if exec_count:
    print(f"average elapsed time: {total_time/exec_count}")
else:
    print("average elapsed time: n/a (no prompts processed)")

In [None]:
# Turn the collected per-prompt records into a DataFrame and write it to
# results.csv inside DATA_DIR.
result_df = pd.DataFrame(results)
results_csv_path = os.path.join(DATA_DIR, "results.csv")
result_df.to_csv(results_csv_path)

In [None]:
# Inspect the rows whose confidence is at most 0.3, showing only the columns
# needed to judge the explanation.
low_confidence_mask = result_df['confidence'] <= 0.3
result_df.loc[low_confidence_mask, ['ground_truth', 'label', "prompt", 'explanation']]
# filtered_df[filtered_df['label'] != filtered_df['ground_truth']]