In [1]:
import os
import json
import random
import torch
import numpy as np
from tqdm import tqdm
from datasets import load_dataset
from sklearn.metrics import roc_auc_score, average_precision_score, confusion_matrix

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) 
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [3]:
class Model:
    def __init__(self, config):
        self.provider = config["model_info"]["provider"]
        self.name = config["model_info"]["name"]
        self.temperature = float(config["params"]["temperature"])

    def print_model_info(self):
        print(f"{'-'*len(f'| Model name: {self.name}')}\n| Provider: {self.provider}\n| Model name: {self.name}\n{'-'*len(f'| Model name: {self.name}')}")

    def set_API_key(self):
        raise NotImplementedError("ERROR: Interface doesn't have the implementation for set_API_key")
    
    def query(self):
        raise NotImplementedError("ERROR: Interface doesn't have the implementation for query")

In [4]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch.nn.functional as F

device = 'cpu'

class AttentionModel(Model):
    def __init__(self, config):
        super().__init__(config)
        self.name = config["model_info"]["name"]
        self.max_output_tokens = int(config["params"]["max_output_tokens"])
        model_id = config["model_info"]["model_id"]
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map = device,
            trust_remote_code=True,
            attn_implementation="eager",
        ).eval()

        self.top_k = 50
        self.top_p = None

        if config["params"]["important_heads"] == "all":
            attn_size = self.get_map_dim()
            self.important_heads = [[i, j] for i in range(
                attn_size[0]) for j in range(attn_size[1])]
        else:
            self.important_heads = config["params"]["important_heads"]


    def get_map_dim(self):
        _, _, attention_maps, _, _, _ = self.inference("print hi", "")
        attention_map = attention_maps[0]
        return len(attention_map), attention_map[0].shape[1]

    def inference(self, instruction, data, max_output_tokens=None):
        messages = [
            {"role": "system", "content": instruction},
            {"role": "user", "content": "Data: " + data}
        ]

        # Use tokenization with minimal overhead
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        instruction_len = len(self.tokenizer.encode(instruction))
        data_len = len(self.tokenizer.encode(data))

        model_inputs = self.tokenizer(
            [text], return_tensors="pt").to(self.model.device)
        input_tokens = self.tokenizer.convert_ids_to_tokens(
            model_inputs['input_ids'][0])

        # find the data token positions
        if "qwen" in self.name:
            data_range = ((3, 3+instruction_len), (-5-data_len, -5))
        elif "phi3" in self.name:
            data_range = ((1, 1+instruction_len), (-2-data_len, -2))
        elif "granite3-8b" in self.name:
            data_range = ((3, 3+instruction_len), (-5-data_len, -5))
        else:
            raise NotImplementedError

        generated_tokens = []
        generated_probs = []
        input_ids = model_inputs.input_ids
        attention_mask = model_inputs.attention_mask

        attention_maps = []

        if max_output_tokens != None:
            n_tokens = max_output_tokens
        else:
            n_tokens = self.max_output_tokens

        with torch.no_grad():
            for i in range(n_tokens):
                output = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    output_attentions=True
                )

                logits = output.logits[:, -1, :]
                probs = F.softmax(logits, dim=-1)
                # next_token_id = logits.argmax(dim=-1).squeeze()
                next_token_id = sample_token(
                    logits[0], top_k=self.top_k, top_p=self.top_p, temperature=1.0)[0]

                generated_probs.append(probs[0, next_token_id.item()].item())
                generated_tokens.append(next_token_id.item())

                if next_token_id.item() == self.tokenizer.eos_token_id:
                    break

                input_ids = torch.cat(
                    (input_ids, next_token_id.unsqueeze(0).unsqueeze(0)), dim=-1)
                attention_mask = torch.cat(
                    (attention_mask, torch.tensor([[1]], device=input_ids.device)), dim=-1)

                attention_map = [attention.detach().cpu().half()
                                 for attention in output['attentions']]
                attention_map = [torch.nan_to_num(
                    attention, nan=0.0) for attention in attention_map]
                attention_map = get_last_attn(attention_map)
                attention_maps.append(attention_map)

        output_tokens = [self.tokenizer.decode(
            token, skip_special_tokens=True) for token in generated_tokens]
        generated_text = self.tokenizer.decode(
            generated_tokens, skip_special_tokens=True)

        return generated_text, output_tokens, attention_maps, input_tokens, data_range, generated_probs


In [5]:
def open_config(config_path):
    with open(config_path, 'r') as f:
        config = json.load(f)
    return config

def create_model(config):
    provider = config["model_info"]["provider"].lower()
    if provider == 'attn-hf':
        model = AttentionModel(config)
    else:
        raise ValueError(f"ERROR: Unknown provider {provider}")
    return model

In [6]:
class AttentionDetector():
    def __init__(self, model, pos_examples=None, neg_examples=None, use_token="first", instruction="Say xxxxxx", threshold=0.5):
        self.name = "attention"
        self.attn_func = "normalize_sum"
        self.model = model
        self.important_heads = model.important_heads
        self.instruction = instruction
        self.use_token = use_token
        self.threshold = threshold
        if pos_examples and neg_examples:
            pos_scores, neg_scores = [], []
            for prompt in pos_examples:
                _, _, attention_maps, _, input_range, generated_probs = self.model.query(
                    prompt, return_type="attention")
                pos_scores.append(self.attn2score(attention_maps, input_range))

            for prompt in neg_examples:
                _, _, attention_maps, _, input_range, generated_probs = self.model.query(
                    prompt, return_type="attention")
                neg_scores.append(self.attn2score(attention_maps, input_range))

            self.threshold = np.mean(neg_scores)

        if pos_examples and not neg_examples:
            pos_scores = []
            for prompt in pos_examples:
                _, _, attention_maps, _, input_range, generated_probs = self.model.query(
                    prompt, return_type="attention")
                pos_scores.append(self.attn2score(attention_maps, input_range))

            self.threshold = np.mean(pos_scores) - 4 * np.std(pos_scores)

    def attn2score(self, attention_maps, input_range):
        if self.use_token == "first":
            attention_maps = [attention_maps[0]]

        scores = []
        for attention_map in attention_maps:
            heatmap = process_attn(
                attention_map, input_range, self.attn_func)
            score = calc_attn_score(heatmap, self.important_heads)
            scores.append(score)

        return sum(scores) if len(scores) > 0 else 0

    def detect(self, data_prompt):
        _, _, attention_maps, _, input_range, _ = self.model.inference(
            self.instruction, data_prompt, max_output_tokens=1)

        focus_score = self.attn2score(attention_maps, input_range)
        return bool(focus_score <= self.threshold), {"focus_score": focus_score}


In [7]:
def get_last_attn(attn_map):
    for i, layer in enumerate(attn_map):
        attn_map[i] = layer[:, :, -1, :].unsqueeze(2)

    return attn_map

def sample_token(logits, top_k=None, top_p=None, temperature=1.0):
    # Optionally apply temperature
    logits = logits / temperature

    # Apply top-k sampling
    if top_k is not None:
        top_k = min(top_k, logits.size(-1))  # Ensure top_k <= vocab size
        values, indices = torch.topk(logits, top_k)
        probs = F.softmax(values, dim=-1)
        next_token_id = indices[torch.multinomial(probs, 1)]

        return next_token_id

    return logits.argmax(dim=-1).squeeze()

In [8]:
def process_attn(attention, rng, attn_func):
    heatmap = np.zeros((len(attention), attention[0].shape[1]))
    for i, attn_layer in enumerate(attention):
        attn_layer = attn_layer.to(torch.float32).numpy()

        if "sum" in attn_func:
            last_token_attn_to_inst = np.sum(attn_layer[0, :, -1, rng[0][0]:rng[0][1]], axis=1)
            attn = last_token_attn_to_inst
        
        elif "max" in attn_func:
            last_token_attn_to_inst = np.max(attn_layer[0, :, -1, rng[0][0]:rng[0][1]], axis=1)
            attn = last_token_attn_to_inst

        else: raise NotImplementedError
            
        last_token_attn_to_inst_sum = np.sum(attn_layer[0, :, -1, rng[0][0]:rng[0][1]], axis=1)
        last_token_attn_to_data_sum = np.sum(attn_layer[0, :, -1, rng[1][0]:rng[1][1]], axis=1)

        if "normalize" in attn_func:
            epsilon = 1e-8
            heatmap[i, :] = attn / (last_token_attn_to_inst_sum + last_token_attn_to_data_sum + epsilon)
        else:
            heatmap[i, :] = attn

    heatmap = np.nan_to_num(heatmap, nan=0.0)

    return heatmap


def calc_attn_score(heatmap, heads):
    score = np.mean([heatmap[l, h] for l, h in heads], axis=0)
    return score



In [18]:
import argparse
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) 
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def main(args):
    set_seed(args.seed)

    output_logs = f"./result/{args.dataset_name}/{args.model_name}-{args.seed}.json"
    output_result = f"./result/{args.dataset_name}/result.jsonl"
    
    model_config_path = f"configs/model_configs/qwen2-attn_config.json"
    model_config = open_config(config_path=model_config_path)

    model = create_model(config=model_config)
    model.print_model_info()

    dataset = load_dataset("deepset/prompt-injections")
    test_data = dataset['test']
    
    detector = AttentionDetector(model)
    print("===================")
    print(f"Using detector: {detector.name}")

    labels, predictions, scores = [], [], []
    logs = []

    for data in tqdm(test_data):
        result = detector.detect(data['text'])
        detect = result[0]
        score = result[1]['focus_score']

        labels.append(data['label'])
        predictions.append(detect)
        scores.append(1-score)

        result_data = {
            "text": data['text'],
            "label": data['label'],
            "result": result
        }

        logs.append(result_data)

    auc_score = roc_auc_score(labels, scores)
    auprc_score = average_precision_score(labels, scores)

    tn, fp, fn, tp = confusion_matrix(labels, predictions).ravel()
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

    auc_score = round(auc_score, 3)
    auprc_score = round(auprc_score, 3)
    fnr = round(fnr, 3)
    fpr = round(fpr, 3)

    print(f"AUC Score: {auc_score}; AUPRC Score: {auprc_score}; FNR: {fnr}; FPR: {fpr}")
    
    os.makedirs(os.path.dirname(output_logs), exist_ok=True)
    with open(output_logs, "w") as f_out:
        f_out.write(json.dumps({"result": logs}, indent=4))

    os.makedirs(os.path.dirname(output_result), exist_ok=True)
    with open(output_result, "a") as f_out:
        f_out.write(json.dumps({
            "model": args.model_name,
            "seed": args.seed,
            "auc": auc_score,
            "auprc": auprc_score,
            "fnr": fnr,
            "fpr": fpr
        }) + "\n")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Prompt Injection Detection Script")
    
    parser.add_argument("--model_name", type=str, default="qwen-attn", 
                        help="Path to the model configuration file.")
    parser.add_argument("--dataset_name", type=str, default="deepset/prompt-injections", 
                        help="Path to the dataset.")
    parser.add_argument("--seed", type=int, default=0)
    
    args = parser.parse_args(args=[])

    main(args)

-----------------------
| Provider: attn-hf
| Model name: qwen-attn
-----------------------
Using detector: attention


100%|██████████| 116/116 [04:24<00:00,  2.28s/it]


AUC Score: 0.978; AUPRC Score: 0.983; FNR: 0.0; FPR: 0.786


In [20]:
import argparse
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def main(args):
    set_seed(args.seed)

    output_logs = f"./result/{args.dataset_name}/{args.model_name}-{args.seed}.json"
    output_result = f"./result/{args.dataset_name}/result.jsonl"

    model_config_path = f"configs/model_configs/phi3-mini-attn_config.json"
    model_config = open_config(config_path=model_config_path)

    model = create_model(config=model_config)
    model.print_model_info()

    dataset = load_dataset("deepset/prompt-injections")
    test_data = dataset['test']

    detector = AttentionDetector(model)
    print("===================")
    print(f"Using detector: {detector.name}")

    labels, predictions, scores = [], [], []
    logs = []

    for data in tqdm(test_data):
        result = detector.detect(data['text'])
        detect = result[0]
        score = result[1]['focus_score']

        labels.append(data['label'])
        predictions.append(detect)
        scores.append(1-score)

        result_data = {
            "text": data['text'],
            "label": data['label'],
            "result": result
        }

        logs.append(result_data)

    auc_score = roc_auc_score(labels, scores)
    auprc_score = average_precision_score(labels, scores)

    tn, fp, fn, tp = confusion_matrix(labels, predictions).ravel()
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

    auc_score = round(auc_score, 3)
    auprc_score = round(auprc_score, 3)
    fnr = round(fnr, 3)
    fpr = round(fpr, 3)

    print(f"AUC Score: {auc_score}; AUPRC Score: {auprc_score}; FNR: {fnr}; FPR: {fpr}")

    os.makedirs(os.path.dirname(output_logs), exist_ok=True)
    with open(output_logs, "w") as f_out:
        f_out.write(json.dumps({"result": logs}, indent=4))

    os.makedirs(os.path.dirname(output_result), exist_ok=True)
    with open(output_result, "a") as f_out:
        f_out.write(json.dumps({
            "model": args.model_name,
            "seed": args.seed,
            "auc": auc_score,
            "auprc": auprc_score,
            "fnr": fnr,
            "fpr": fpr
        }) + "\n")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Prompt Injection Detection Script")

    parser.add_argument("--model_name", type=str, default="phi3-mini-attn",
                        help="Path to the model configuration file.")
    parser.add_argument("--dataset_name", type=str, default="deepset/prompt-injections",
                        help="Path to the dataset.")
    parser.add_argument("--seed", type=int, default=0)

    args = parser.parse_args(args=[])

    main(args)

Downloading shards: 100%|██████████| 2/2 [02:14<00:00, 67.14s/it]
Loading checkpoint shards: 100%|██████████| 2/2 [00:00<00:00,  5.50it/s]


-----------------------
| Provider: attn-hf
| Model name: phi3-attn
-----------------------
Using detector: attention


  0%|          | 0/116 [00:00<?, ?it/s]You are not running the flash-attention implementation, expect numerical differences.
100%|██████████| 116/116 [07:20<00:00,  3.79s/it]


AUC Score: 0.915; AUPRC Score: 0.917; FNR: 0.0; FPR: 1.0


In [13]:
def generate_response(model, tokenizer, prompt, max_tokens=50):
    """Tạo câu trả lời cho prompt không phải injection."""
    messages = [
        {"role": "user", "content": prompt}
    ]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    
    with torch.no_grad():
        output = model.generate(
            model_inputs.input_ids,
            max_new_tokens=max_tokens,
            temperature=1.0,
            top_k=50,
            do_sample=True
        )
    
    response = tokenizer.decode(output[0], skip_special_tokens=True)
    return response

parser = argparse.ArgumentParser(description="Kịch bản phát hiện Prompt Injection")
parser.add_argument("--model_name", type=str, default="qwen-attn", 
                    help="Đường dẫn đến tệp cấu hình mô hình.")
parser.add_argument("--seed", type=int, default=0)

args = parser.parse_args(args=[])

# Nhập query
test_query = input("Nhập câu query test của bạn: ")

# Đặt seed
set_seed(args.seed)

# Tải mô hình
model_config_path = "configs/model_configs/qwen2-attn_config.json"
model_config = open_config(config_path=model_config_path)
model = create_model(config=model_config)
model.print_model_info()

# Tạo detector
detector = AttentionDetector(model)
print("===================")
print(f"Sử dụng detector: {detector.name}")

# Phát hiện
result = detector.detect(test_query)
is_injection = result[0]
focus_score = result[1]['focus_score']

print("===================")
print(f"Input: {test_query}")
print("Output: ")
print("Phát hiện prompt injection? ", is_injection)
print("Điểm focus: ", focus_score)

if is_injection:
    print("CẢNH BÁO: Phát hiện prompt injection tiềm năng! Input có thể là độc hại và sẽ không được xử lý.")
else:
    print("Câu trả lời: ")
    response = generate_response(model.model, model.tokenizer, test_query, max_tokens=model.max_output_tokens)
    print(response)

-----------------------
| Provider: attn-hf
| Model name: qwen-attn
-----------------------
Sử dụng detector: attention
Input: what is machine learning
Output: 
Phát hiện prompt injection?  True
Điểm focus:  0.4545251413115433
CẢNH BÁO: Phát hiện prompt injection tiềm năng! Input có thể là độc hại và sẽ không được xử lý.


In [9]:
import argparse
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) 
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def main(args):
    set_seed(args.seed)

    output_logs = f"./result/{args.dataset_name}/{args.model_name}-{args.seed}.json"
    output_result = f"./result/{args.dataset_name}/result.jsonl"
    
    model_config_path = f"configs/model_configs/granite3_8b-attn_config.json"
    model_config = open_config(config_path=model_config_path)

    model = create_model(config=model_config)
    model.print_model_info()

    dataset = load_dataset("deepset/prompt-injections")
    test_data = dataset['test']
    
    detector = AttentionDetector(model)
    print("===================")
    print(f"Using detector: {detector.name}")

    labels, predictions, scores = [], [], []
    logs = []

    for data in tqdm(test_data):
        result = detector.detect(data['text'])
        detect = result[0]
        score = result[1]['focus_score']

        labels.append(data['label'])
        predictions.append(detect)
        scores.append(1-score)

        result_data = {
            "text": data['text'],
            "label": data['label'],
            "result": result
        }

        logs.append(result_data)

    auc_score = roc_auc_score(labels, scores)
    auprc_score = average_precision_score(labels, scores)

    tn, fp, fn, tp = confusion_matrix(labels, predictions).ravel()
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

    auc_score = round(auc_score, 3)
    auprc_score = round(auprc_score, 3)
    fnr = round(fnr, 3)
    fpr = round(fpr, 3)

    print(f"AUC Score: {auc_score}; AUPRC Score: {auprc_score}; FNR: {fnr}; FPR: {fpr}")
    
    os.makedirs(os.path.dirname(output_logs), exist_ok=True)
    with open(output_logs, "w") as f_out:
        f_out.write(json.dumps({"result": logs}, indent=4))

    os.makedirs(os.path.dirname(output_result), exist_ok=True)
    with open(output_result, "a") as f_out:
        f_out.write(json.dumps({
            "model": args.model_name,
            "seed": args.seed,
            "auc": auc_score,
            "auprc": auprc_score,
            "fnr": fnr,
            "fpr": fpr
        }) + "\n")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Prompt Injection Detection Script")
    
    parser.add_argument("--model_name", type=str, default="granite3",
                        help="Path to the model configuration file.")
    parser.add_argument("--dataset_name", type=str, default="deepset/prompt-injections", 
                        help="Path to the dataset.")
    parser.add_argument("--seed", type=int, default=0)
    
    args = parser.parse_args(args=[])

    main(args)

Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00, 35.35it/s]


------------------------------
| Provider: attn-hf
| Model name: granite3-8b-attn
------------------------------
Using detector: attention


100%|██████████| 116/116 [26:19<00:00, 13.61s/it]


AUC Score: 0.996; AUPRC Score: 0.996; FNR: 0.0; FPR: 0.696


In [10]:
def generate_response(model, tokenizer, prompt, max_tokens=50):
    """Tạo câu trả lời cho prompt không phải injection."""
    messages = [
        {"role": "user", "content": prompt}
    ]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    
    with torch.no_grad():
        output = model.generate(
            model_inputs.input_ids,
            max_new_tokens=max_tokens,
            temperature=1.0,
            top_k=50,
            do_sample=True
        )
    
    response = tokenizer.decode(output[0], skip_special_tokens=True)
    return response

parser = argparse.ArgumentParser(description="Kịch bản phát hiện Prompt Injection")
parser.add_argument("--model_name", type=str, default="qwen-attn", 
                    help="Đường dẫn đến tệp cấu hình mô hình.")
parser.add_argument("--seed", type=int, default=0)

args = parser.parse_args(args=[])

# Nhập query
test_query = input("Nhập câu query test của bạn: ")

# Đặt seed
set_seed(args.seed)

# Tải mô hình
model_config_path = "configs/model_configs/granite3_8b-attn_config.json"
model_config = open_config(config_path=model_config_path)
model = create_model(config=model_config)
model.print_model_info()

# Tạo detector
detector = AttentionDetector(model)
print("===================")
print(f"Sử dụng detector: {detector.name}")

# Phát hiện
result = detector.detect(test_query)
is_injection = result[0]
focus_score = result[1]['focus_score']

print("===================")
print(f"Input: {test_query}")
print("Output: ")
print("Phát hiện prompt injection? ", is_injection)
print("Điểm focus: ", focus_score)

if is_injection:
    print("CẢNH BÁO: Phát hiện prompt injection tiềm năng! Input có thể là độc hại và sẽ không được xử lý.")
else:
    print("Câu trả lời: ")
    response = generate_response(model.model, model.tokenizer, test_query, max_tokens=model.max_output_tokens)
    print(response)

Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00, 52.72it/s]


------------------------------
| Provider: attn-hf
| Model name: granite3-8b-attn
------------------------------
Sử dụng detector: attention


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Input: the ocean is blue
Output: 
Phát hiện prompt injection?  False
Điểm focus:  0.7635254074887532
Câu trả lời: 
systemKnowledge Cutoff Date: April 2024.
Today's Date: July 23, 2025.
You are Granite, developed by IBM. You are a helpful AI assistant.
userthe ocean is blue
assistantYes, that's correct. The ocean appears blue due to the absorption and scattering of light. Water absorbs longer wavelengths of light, such as red and orange, more quickly than shorter wavelengths like blue and violet.
