In [None]:
# 1. Import Required Libraries
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datasets import load_dataset
from tqdm import tqdm
from sklearn.calibration import calibration_curve
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

In [None]:
# 2. Configuration
# Control classifiers loaded from the HuggingFace Hub. Every entry holds a
# display name, the checkpoint id, and the torch device string to run on
# ("mps" when torch reports it as available, "cpu" otherwise).
CONTROL_MODELS = [
    dict(
        name=display_name,
        model_name=checkpoint,
        device="mps" if torch.backends.mps.is_available() else "cpu",
    )
    for display_name, checkpoint in (
        ("Control-1", "qualifire/prompt-injection-sentinel"),
        ("Control-2", "qualifire/prompt-injection-jailbreak-sentinel-v2"),
    )
]

# Benchmark data: Hub dataset id, split name, and the cap on evaluated rows.
BENCHMARK_DATASET = "qualifire/prompt-injections-benchmark"
BENCHMARK_SPLIT = "test"
MAX_SAMPLES = 500

# HTTP endpoint that receives the prompts for the PromptForest predictions.
PF_SERVER_URL = "http://localhost:1000/analyze"

In [None]:
# 3. Helper Functions
def parse_label(value):
    """Map a raw benchmark label to a binary class id.

    Args:
        value: Label as stored in the dataset. Ints are returned unchanged.
            Strings are matched case-insensitively, ignoring surrounding
            whitespace, against the known label names below. Anything else
            is handed to ``int()``.

    Returns:
        1 for "jailbreak", "malicious", "unsafe", "attack", "injection", "1";
        0 for "benign", "safe", "legit", "0"; otherwise ``int(value)`` when
        that conversion succeeds, and 0 when it does not.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        # Strip before matching so labels such as " Jailbreak\n" are not
        # silently counted as benign by the fallback at the bottom.
        v = value.strip().lower()
        if v in ("jailbreak", "malicious", "unsafe", "attack", "injection", "1"):
            return 1
        if v in ("benign", "safe", "legit", "0"):
            return 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 0; a bare ``except`` would
        # also swallow KeyboardInterrupt / SystemExit.
        return 0

def get_control_prob(text, tokenizer, model, device):
    """Score one text with a sequence-classification model.

    Args:
        text: Prompt to classify.
        tokenizer: Callable that encodes ``text`` (invoked with
            ``return_tensors="pt"``, truncation on, ``max_length=512``) and
            whose result supports ``.to(device)`` and ``**`` unpacking.
        model: Callable returning an object with a ``logits`` attribute.
        device: Device passed to the encoded batch's ``.to``.

    Returns:
        Softmax probability at class index 1 for the first row, as a float.
        Index 1 is treated as the malicious class -- assumes the checkpoint
        uses that label order; TODO confirm against the model config.
        On any exception the error is printed and 0.5 is returned.
    """
    try:
        encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        encoded = encoded.to(device)
        # Inference only: no autograd bookkeeping needed for the forward pass.
        with torch.no_grad():
            output = model(**encoded)
        class_probs = torch.softmax(output.logits, dim=-1)
        malicious_prob = class_probs[0][1]
        return malicious_prob.item()
    except Exception as e:
        print(f"Error: {e}")
        return 0.5

def get_pf_prediction(text):
    """Ask the server at PF_SERVER_URL to classify one prompt.

    The prompt is POSTed as JSON under the "prompt" key with a 5 second
    timeout. The reply's "is_malicious" field (default 0) becomes the
    predicted label and its optional "confidence" field is converted into a
    probability of the malicious class.

    Returns:
        ``(pred_label, prob_malicious)``. Without a confidence value the
        probability is 1.0 or 0.0 according to the label. If anything raises,
        the failure is printed and ``(0, 0.5)`` is returned.
    """
    try:
        response = requests.post(PF_SERVER_URL, json={"prompt": text}, timeout=5)
        response.raise_for_status()
        payload = response.json()
        pred_label = int(payload.get("is_malicious", 0))
        conf = payload.get("confidence")
        if conf is None:
            # No confidence reported: treat the hard label as certain.
            prob_malicious = float(pred_label == 1)
        elif pred_label == 1:
            prob_malicious = conf
        else:
            # Confidence refers to the predicted (benign) label, so flip it.
            prob_malicious = 1 - conf
        return pred_label, prob_malicious
    except Exception as e:
        print(f"Request failed for text: {text[:30]}... -> {e}")
        return 0, 0.5

In [None]:
# 4. Load Benchmark Dataset
# Fetch the configured split, shuffle it with a fixed seed, and keep at most
# MAX_SAMPLES rows.
ds = load_dataset(BENCHMARK_DATASET, split=BENCHMARK_SPLIT)
subset_size = min(len(ds), MAX_SAMPLES)
ds = ds.shuffle(seed=42).select(range(subset_size))

In [None]:
# 5. Run Inference for Each Model
# Collects one record per (model, example) with the true label, the predicted
# label, the malicious-class probability, the confidence in the predicted
# label, and whether the prediction was correct.
results = []

# Load and run inference for each control model
for control in CONTROL_MODELS:
    model_name = control['model_name']
    device = control['device']
    print(f"Loading control model: {model_name} on {device}...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Use half precision only off-CPU. float16 on CPU is slow and, on some
    # PyTorch builds, unsupported for some ops; since get_control_prob catches
    # every exception and returns 0.5, such a failure would silently turn all
    # scores for this model into 0.5.
    model_dtype = torch.float32 if device == "cpu" else torch.float16
    model = AutoModelForSequenceClassification.from_pretrained(model_name, torch_dtype=model_dtype)
    model.to(device)
    model.eval()

    print(f"Running inference for {control['name']} (HuggingFace model)...")
    for ex in tqdm(ds, desc=control['name']):
        # Examples may store the prompt under either key; skip empty ones.
        text = ex.get("text") or ex.get("prompt")
        if not text:
            continue
        label = parse_label(ex["label"])
        prob_malicious = get_control_prob(text, tokenizer, model, device)
        pred_label = 1 if prob_malicious > 0.5 else 0
        # Confidence is the probability assigned to the predicted label.
        confidence = prob_malicious if pred_label == 1 else 1 - prob_malicious
        results.append({
            "model": control['name'],
            "label": label,
            "prediction": pred_label,
            "prob_malicious": prob_malicious,
            "confidence": confidence,
            "is_correct": pred_label == label
        })

# Run inference for PromptForest API
print("Running inference for PromptForest (API)...")
for ex in tqdm(ds, desc="PromptForest"):
    text = ex.get("text") or ex.get("prompt")
    if not text:
        continue
    label = parse_label(ex["label"])
    # The API supplies its own hard label, so no 0.5 threshold is applied here.
    pred_label, prob_malicious = get_pf_prediction(text)
    confidence = prob_malicious if pred_label == 1 else 1 - prob_malicious
    results.append({
        "model": "PromptForest",
        "label": label,
        "prediction": pred_label,
        "prob_malicious": prob_malicious,
        "confidence": confidence,
        "is_correct": pred_label == label
    })

In [None]:
# 6. Aggregate Results
# One row per (model, example) record gathered during inference; the trailing
# expression previews the first rows as the cell output.
df = pd.DataFrame(data=results)
df.head(n=5)

In [None]:
# 7. Compute Metrics (ECE, etc.)
from collections import defaultdict

# Number of equal-width probability bins on [0, 1] used for both the
# reliability curve and the ECE.
N_CALIBRATION_BINS = 10
bin_edges = np.linspace(0.0, 1.0, N_CALIBRATION_BINS + 1)

# metrics[model] -> {'ece', 'prob_true', 'prob_pred'}
metrics = defaultdict(dict)

for model in df['model'].unique():
    sub = df[df['model'] == model]
    probs = np.array(sub['prob_malicious'])
    labels = np.array(sub['label'])

    # Reliability-curve points (one per non-empty bin), kept for plotting.
    prob_true, prob_pred = calibration_curve(labels, probs, n_bins=N_CALIBRATION_BINS)

    # Expected Calibration Error weights each bin's |accuracy - confidence|
    # gap by the share of samples in that bin. An unweighted mean over bins
    # lets a bin holding a handful of samples count as much as one holding
    # hundreds. Since
    #   (n_b / N) * |sum(labels_b)/n_b - sum(probs_b)/n_b|
    #     == |sum(labels_b) - sum(probs_b)| / N,
    # per-bin sums are enough and empty bins contribute zero.
    bin_ids = np.searchsorted(bin_edges[1:-1], probs)
    positives_per_bin = np.bincount(
        bin_ids, weights=labels.astype(float), minlength=N_CALIBRATION_BINS
    )
    prob_mass_per_bin = np.bincount(
        bin_ids, weights=probs.astype(float), minlength=N_CALIBRATION_BINS
    )
    ece = np.abs(positives_per_bin - prob_mass_per_bin).sum() / len(probs)

    metrics[model]['ece'] = ece
    metrics[model]['prob_true'] = prob_true
    metrics[model]['prob_pred'] = prob_pred

In [None]:
# 8. Compare Model Results
# One summary row per model; the trailing expression renders the table,
# indexed by model name, as the cell output.
summary = [
    {'Model': model, 'ECE': metrics[model]['ece']}
    for model in df['model'].unique()
]
pd.DataFrame(summary).set_index('Model')

In [None]:
# 9. Plot Reliability Diagrams for All Models
# Fixed colours for the known models; any other model name gets matplotlib's
# default colour cycle (color=None).
colors = {"Control-1": "blue", "Control-2": "green", "PromptForest": "orange"}

fig, ax = plt.subplots(figsize=(7, 7))
for model in df['model'].unique():
    ax.plot(
        metrics[model]['prob_pred'],
        metrics[model]['prob_true'],
        marker='x',
        label=f"{model} (ECE={metrics[model]['ece']:.3f})",
        color=colors.get(model),
    )
# Diagonal reference line: predicted probability equals observed frequency.
ax.plot([0, 1], [0, 1], linestyle='--', color='gray')
ax.set_xlabel('Mean Predicted Probability')
ax.set_ylabel('Fraction of Positives')
ax.set_title('Reliability Diagram (Calibration Curve)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()

In [None]:
# 1. Import Required Libraries
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datasets import load_dataset
from tqdm import tqdm
from sklearn.calibration import calibration_curve
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

In [None]:
# 2. Configuration
# Benchmark data: Hub dataset id, split name, and the cap on evaluated rows.
BENCHMARK_DATASET = "qualifire/prompt-injections-benchmark"
BENCHMARK_SPLIT = "test"
MAX_SAMPLES = 500

# HTTP endpoint that receives the prompts for the PromptForest predictions.
PF_SERVER_URL = "http://localhost:1000/analyze"

# Single control classifier and the torch device string to run it on.
CONTROL_MODEL_NAME = "qualifire/prompt-injection-jailbreak-sentinel-v2"
if torch.backends.mps.is_available():
    CONTROL_DEVICE = "mps"
else:
    CONTROL_DEVICE = "cpu"

In [None]:
# 3. Helper Functions
def parse_label(value):
    """Map a raw benchmark label to a binary class id.

    Args:
        value: Label as stored in the dataset. Ints are returned unchanged.
            Strings are matched case-insensitively, ignoring surrounding
            whitespace, against the known label names below. Anything else
            is handed to ``int()``.

    Returns:
        1 for "jailbreak", "malicious", "unsafe", "attack", "injection", "1";
        0 for "benign", "safe", "legit", "0"; otherwise ``int(value)`` when
        that conversion succeeds, and 0 when it does not.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        # Strip before matching so labels such as " Jailbreak\n" are not
        # silently counted as benign by the fallback at the bottom.
        v = value.strip().lower()
        if v in ("jailbreak", "malicious", "unsafe", "attack", "injection", "1"):
            return 1
        if v in ("benign", "safe", "legit", "0"):
            return 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 0; a bare ``except`` would
        # also swallow KeyboardInterrupt / SystemExit.
        return 0

def get_control_prob(text, tokenizer, model, device):
    """Score one text with a sequence-classification model.

    Args:
        text: Prompt to classify.
        tokenizer: Callable that encodes ``text`` (invoked with
            ``return_tensors="pt"``, truncation on, ``max_length=512``) and
            whose result supports ``.to(device)`` and ``**`` unpacking.
        model: Callable returning an object with a ``logits`` attribute.
        device: Device passed to the encoded batch's ``.to``.

    Returns:
        Softmax probability at class index 1 for the first row, as a float.
        Index 1 is treated as the malicious class -- assumes the checkpoint
        uses that label order; TODO confirm against the model config.
        On any exception the error is printed and 0.5 is returned.
    """
    try:
        encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        encoded = encoded.to(device)
        # Inference only: no autograd bookkeeping needed for the forward pass.
        with torch.no_grad():
            output = model(**encoded)
        class_probs = torch.softmax(output.logits, dim=-1)
        malicious_prob = class_probs[0][1]
        return malicious_prob.item()
    except Exception as e:
        print(f"Error: {e}")
        return 0.5

def get_pf_prediction(text):
    """Ask the server at PF_SERVER_URL to classify one prompt.

    The prompt is POSTed as JSON under the "prompt" key with a 5 second
    timeout. The reply's "is_malicious" field (default 0) becomes the
    predicted label and its optional "confidence" field is converted into a
    probability of the malicious class.

    Returns:
        ``(pred_label, prob_malicious)``. Without a confidence value the
        probability is 1.0 or 0.0 according to the label. If anything raises,
        the failure is printed and ``(0, 0.5)`` is returned.
    """
    try:
        response = requests.post(PF_SERVER_URL, json={"prompt": text}, timeout=5)
        response.raise_for_status()
        payload = response.json()
        pred_label = int(payload.get("is_malicious", 0))
        conf = payload.get("confidence")
        if conf is None:
            # No confidence reported: treat the hard label as certain.
            prob_malicious = float(pred_label == 1)
        elif pred_label == 1:
            prob_malicious = conf
        else:
            # Confidence refers to the predicted (benign) label, so flip it.
            prob_malicious = 1 - conf
        return pred_label, prob_malicious
    except Exception as e:
        print(f"Request failed for text: {text[:30]}... -> {e}")
        return 0, 0.5

In [None]:
# 4. Load Benchmark Dataset
# Fetch the configured split, shuffle it with a fixed seed, and keep at most
# MAX_SAMPLES rows.
ds = load_dataset(BENCHMARK_DATASET, split=BENCHMARK_SPLIT)
subset_size = min(len(ds), MAX_SAMPLES)
ds = ds.shuffle(seed=42).select(range(subset_size))