# Core Metric Functions:

## **attack_graph_inference()**

- ### **Params tailored to specific test case:**
- ### **GT:** Attacker's interesting honeypots
- ### **INFERRED:** Agent's inferred interesting honeypots for attacker
- ### **TP:** true positives -> INFERRED ∩ GT
- ### **FP:** false positives -> INFERRED ∖ GT (inferred honeypots that are not in GT)
- ### **FN:** false negatives -> GT ∖ INFERRED (GT honeypots that were not inferred)
- ### **PRECISION:** |TP| / (|TP| + |FP|)
- ### **RECALL:** |TP| / (|TP| + |FN|)
- ### **F1-SCORE:** 2 * (Precision * Recall) / (Precision + Recall)


## **calculate_epoch_efficiency_score()**

- ### **Computes:** (Fully exploited honeypots) / (Total epochs)
- ### Tracks when each honeypot reaches 100% and calculates average time-to-exploit

## **calculate_firewall_rule_efficiency()**

- ### **Computes:** (Rules leading to exploitation progress) / (Total rules added)
- ### Determines effectiveness by checking if rules led to increased exploitation percentages

In [70]:
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from pathlib import Path
from typing import Dict, List, Any
import warnings
import matplotlib.patches as mpatches
import re
warnings.filterwarnings('ignore')

In [2]:
def load_epoch_data(results_dir: str) -> List[Dict[str, Any]]:
    """Load every ``epoch_*.json`` file found directly inside *results_dir*.

    Files that cannot be read or parsed are reported on stdout and skipped,
    so a single corrupt epoch file does not abort the whole analysis.

    Args:
        results_dir: Directory containing the per-epoch JSON result files.

    Returns:
        The decoded JSON payloads ordered by their ``epoch_number`` field
        (payloads without that field sort as epoch 0).
    """
    epoch_files = []

    # Path.glob yields entries in arbitrary, filesystem-dependent order.
    # Walking the sorted paths keeps the result reproducible even when several
    # payloads share (or lack) an epoch number, because the sort below is stable.
    for file_path in sorted(Path(results_dir).glob("epoch_*.json")):
        try:
            # Explicit encoding: do not depend on the platform's locale default.
            with open(file_path, "r", encoding="utf-8") as f:
                epoch_files.append(json.load(f))
        except (OSError, ValueError) as e:
            # OSError: unreadable file. ValueError: malformed JSON
            # (json.JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
            print(f"Error loading {file_path}: {e}")

    # Sort by epoch number
    epoch_files.sort(key=lambda x: x.get('epoch_number', 0))

    print(f"Loaded {len(epoch_files)} epoch files")
    return epoch_files

# Metrics definitions

In [71]:
def total_flags_count(configuration: List[str]) -> int:
    """
    Count the flags exposed by the deployed honeypots.

    A "gitlab" honeypot contributes two flags; every other service
    contributes exactly one.

    Args:
        configuration: List of honeypots deployed in the environment

    Returns:
        Total flags needed for this configuration
    """
    return sum(2 if service == "gitlab" else 1 for service in configuration)

def define_custom_golden_steps(configuration: List[str]) -> int:
    """
    Return the number of "golden" steps for a honeypot deployment.

    Args:
        configuration: List of honeypots deployed in the environment

    Returns:
        6 when "gitlab" is deployed; otherwise one step per deployed
        service plus two.
    """
    if "gitlab" in configuration:
        # Fixed budget: epoch 5 for root access, epoch 6 for lockdown.
        return 6

    # One step per service, +1 for initial access, +1 for lockdown.
    return len(configuration) + 2

def intersection_operator(a, b):
    """Return the distinct elements present in both ``a`` and ``b`` as a list (order unspecified)."""
    shared = set(a)
    shared.intersection_update(b)
    return [*shared]

def union_operator(a,b):
    """Return the distinct elements present in ``a`` or ``b`` as a list (order unspecified)."""
    combined = set(a).union(b)
    return [*combined]

def difference_operator(a, b):
    """Return the distinct elements of ``a`` that do not occur in ``b`` as a list (order unspecified)."""
    excluded = set(b)
    remaining = set(a).difference(excluded)
    return [*remaining]

def attack_graph_inference(epochs: List[Dict], configuration: List[str]) -> Dict[str, Any]:
    """
    Score the agent's final inferred attack graph against the ground truth.

    Ground truth (GT) is the subset of the "interesting" honeypots
    (gitlab, struts2, docker) that appears in ``configuration``. The inferred
    set is read from the values of the ``inferred_attack_graph`` mapping of the
    last epoch: the first whitespace-separated word of each value is taken as
    the honeypot name, and any name starting with "struts" is normalised to
    "struts2".

    Args:
        epochs: List of epoch data dictionaries; only the last one is read.
        configuration: List of honeypots deployed in the environment

    Returns:
        Dictionary with "precision", "recall" and "f1". A ratio whose
        denominator is zero (nothing inferred, empty GT, or no true
        positives) is reported as 0.0 instead of raising ZeroDivisionError.
    """
    interesting_honeypots = ["gitlab", "struts2", "docker"]
    deployed = set(configuration)
    # List comprehension (rather than a set) keeps the printed order stable.
    GT = [name for name in interesting_honeypots if name in deployed]
    print(f"GT: {GT}")

    # No epochs, a missing key, or a non-mapping value all mean "nothing inferred".
    inferred_graph = epochs[-1].get("inferred_attack_graph") if epochs else None
    inferred_honeypots = []
    if isinstance(inferred_graph, dict):
        for value in inferred_graph.values():
            words = value.split() if isinstance(value, str) else []
            if not words:
                # Blank or non-string entries do not name a honeypot.
                continue
            name = words[0]
            inferred_honeypots.append("struts2" if name.startswith("struts") else name)

    print(f"Inferred Honeypots: {inferred_honeypots}")

    gt_set = set(GT)
    inferred_set = set(inferred_honeypots)
    tp = len(inferred_set & gt_set)
    fp = len(inferred_set - gt_set)
    fn = len(gt_set - inferred_set)

    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

def calculate_epoch_efficiency_score(epochs: List[Dict], configuration: List[str]) -> float:
    """
    Calculate how efficiently the agent achieved honeypot exploitation based on golden steps.

    The score is the golden-step count for ``configuration`` divided by the
    number of epochs actually recorded, expressed as a percentage.

    Args:
        epochs: List of epoch data dictionaries
        configuration: List of honeypots deployed in the environment

    Returns:
        Efficiency as a percentage; 0.0 when ``epochs`` is empty. The value is
        not clamped, so it exceeds 100 when fewer epochs than golden steps
        were recorded.
    """
    if not epochs:
        # Always return a float, matching the annotation and the non-empty path.
        return 0.0

    total_golden_steps = define_custom_golden_steps(configuration)
    return (total_golden_steps / len(epochs)) * 100
    
def calculate_firewall_rule_efficiency(epochs: List[Dict], configuration: List[str]) -> Dict[str, Any]:
    """
    Calculate the efficiency of firewall rule decisions based on firewall updates and flag capture progression.

    An epoch counts as a firewall update when it added or removed at least one
    rule. An update is effective when:
      * it is not the last epoch and the next epoch lists more captured flags
        than the current one; or
      * it is the last epoch, lockdown was activated, every entry of
        ``honeypots_exploitation`` reports a percentage of at least 100, and
        the distinct flags seen across all epochs reach the total expected
        for ``configuration``.

    Args:
        epochs: List of epoch data dictionaries
        configuration: List of honeypots deployed in the environment

    Returns:
        Dictionary with "rule_efficiency" (percentage), "total_updates",
        "effective_updates", "total_epochs" and the per-epoch breakdown
        "rules_by_epoch". All keys are present even for empty input.
    """
    if not epochs:
        return {
            "rule_efficiency": 0.0,
            "total_updates": 0,
            "effective_updates": 0,
            "total_epochs": 0,
            "rules_by_epoch": [],
        }

    total_epochs = len(epochs)
    total_updates = 0
    effective_updates = 0
    rules_by_epoch = []
    # Distinct flags seen so far, keyed by repr() so that flag records that are
    # not hashable (e.g. dicts) are still counted instead of raising TypeError.
    flags_seen = set()

    for epoch_idx, epoch in enumerate(epochs):
        epoch_flags = epoch.get("flags_captured", [])
        current_flags = len(epoch_flags)
        flags_seen.update(repr(flag) for flag in epoch_flags)

        # Check if there was any firewall update (rules added OR removed)
        has_update = bool(
            epoch.get("firewall_rules_added", [])
            or epoch.get("firewall_rules_removed", [])
        )

        is_effective = False
        if has_update:
            total_updates += 1

            if epoch_idx == total_epochs - 1:
                # Last epoch: there is no next epoch to compare against, so
                # judge the update by the final state of the run.
                exploitation = epoch.get("honeypots_exploitation", {}) or {}
                percentages = [data.get("percentage", 0) for data in exploitation.values()]
                fully_exploited = bool(percentages) and all(p >= 100 for p in percentages)
                # NOTE(review): the original compared the flag *set* with
                # len(configuration), which is always False. The expected total
                # is taken from total_flags_count() because gitlab yields two
                # flags - confirm this is the intended target.
                is_effective = bool(
                    epoch.get("lockdown_activated", False)
                    and fully_exploited
                    and len(flags_seen) >= total_flags_count(configuration)
                )
            else:
                # Not last epoch - check if next epoch has more flags than current
                next_flags = len(epochs[epoch_idx + 1].get("flags_captured", []))
                is_effective = next_flags > current_flags

            if is_effective:
                effective_updates += 1

        rules_by_epoch.append({
            "epoch": epoch_idx + 1,
            "has_update": has_update,
            "flags_captured": current_flags,
            "effective": is_effective,
        })

    # NOTE(review): the denominator is the number of epochs, not the number of
    # updates; kept as-is because previously reported results rely on it.
    rule_efficiency = (effective_updates / total_epochs) * 100

    return {
        "rule_efficiency": rule_efficiency,
        "total_updates": total_updates,
        "effective_updates": effective_updates,
        "total_epochs": total_epochs,
        "rules_by_epoch": rules_by_epoch,
    }

# Computation

In [66]:
# Each results directory (relative to the notebook) is paired positionally
# with the list of honeypots that were deployed for that run.
test_cases = ["Test Docker + Struts + Gitlab - Exploitation maximizing", "Test Gitlab + Struts - Assistant + Summary Fast Log"]
configuration = [["docker", "struts", "gitlab"], ["gitlab", "struts"]]
for path, config in zip(test_cases, configuration):
    print(f"Analyzing {path} with configuration: {config}")
    epochs = load_epoch_data(f"./{path}")
    ees = calculate_epoch_efficiency_score(epochs, config)
    fue = calculate_firewall_rule_efficiency(epochs, config)
    # Single quotes inside the replacement field: reusing the enclosing double
    # quote is a SyntaxError on Python versions before 3.12.
    print(f"Epoch Efficiency Score: {ees}\nFirewall Rule Efficiency: {fue['rule_efficiency']}")
    print("\n" + "=" * 128 + "\n")

Analyzing Test Docker + Struts + Gitlab - Exploitation maximizing with configuration: ['docker', 'struts', 'gitlab']
Loaded 8 epoch files
Epoch Efficiency Score: 75.0
Firewall Rule Efficiency: 37.5


Analyzing Test Gitlab + Struts - Assistant + Summary Fast Log with configuration: ['gitlab', 'struts']
Loaded 7 epoch files
Epoch Efficiency Score: 85.71428571428571
Firewall Rule Efficiency: 28.57142857142857




In [72]:
# Results directory (relative to the notebook) paired positionally with the
# honeypots deployed for that run.
test_cases = ["Test Docker + Struts + 2 decoys - Inference + Summary Fast Log"]
configuration = [["docker", "struts2", "activemq", "bash"]]
result = None
for path, config in zip(test_cases, configuration):
    epochs = load_epoch_data(f"./{path}")
    # `result` is overwritten on every iteration, so only the metrics of the
    # last test case are kept for printing below.
    result = attack_graph_inference(epochs, config)

# Print each metric returned for the last analysed test case.
for k,v in result.items():
    print(f"{k}: {v}")


Loaded 5 epoch files
GT: ['struts2', 'docker']
Inferred Honeypots: ['struts2']
precision: 1.0
recall: 0.5
f1: 0.6666666666666666
