In [3]:
import pandas as pd
import numpy as np
import json


In [4]:
# Load the alerts table from reasoned_symbolic_alerts.csv.
# read_csv does not decode dict/list-like cells, so columns such as
# "reasoned_decision" hold plain strings after loading.
df = pd.read_csv("reasoned_symbolic_alerts.csv")

print(f"Loaded alerts: {df.shape}")
df.head()


Loaded alerts: (13, 14)


Unnamed: 0,final_anomaly_score,event_score_norm,sequence_score_norm,traffic_rate_log,packet_count_log,flow_duration_log,protocol_tcp,protocol_udp,protocol_http,confidence,mitre_matches,ranked_mitre_techniques,explanation,reasoned_decision
0,0.59979,0.593313,0.609506,15.279559,3.401197,6.135779,1,0,0,0.983807,"[{'rule_id': 'C2_LIKE_BEHAVIOR', 'mitre_family...","[{'mitre_family': 'T1095', 'score': 1.082}]","{'final_anomaly_score': 0.6, 'confidence': 0.9...","{'decision': 'ANOMALOUS', 'top_mitre_technique..."
1,0.332068,0.138351,0.622642,1.110216,5.105945,9.021309,1,0,0,0.515709,"[{'rule_id': 'C2_LIKE_BEHAVIOR', 'mitre_family...","[{'mitre_family': 'T1095', 'score': 0.567}]","{'final_anomaly_score': 0.332, 'confidence': 0...","{'decision': 'ANOMALOUS', 'top_mitre_technique..."
2,0.594597,0.501491,0.734256,7.318563,11.711334,10.011978,1,0,0,0.767235,"[{'rule_id': 'C2_LIKE_BEHAVIOR', 'mitre_family...","[{'mitre_family': 'T1095', 'score': 0.844}]","{'final_anomaly_score': 0.595, 'confidence': 0...","{'decision': 'ANOMALOUS', 'top_mitre_technique..."
3,0.563926,0.281562,0.987473,2.762219,7.813592,10.646778,1,0,0,0.294089,"[{'rule_id': 'C2_LIKE_BEHAVIOR', 'mitre_family...","[{'mitre_family': 'T1095', 'score': 0.323}]","{'final_anomaly_score': 0.564, 'confidence': 0...","{'decision': 'ANOMALOUS', 'top_mitre_technique..."
4,0.507499,0.294241,0.827385,3.105048,7.876638,10.665518,1,0,0,0.466856,"[{'rule_id': 'C2_LIKE_BEHAVIOR', 'mitre_family...","[{'mitre_family': 'T1095', 'score': 0.514}]","{'final_anomaly_score': 0.507, 'confidence': 0...","{'decision': 'ANOMALOUS', 'top_mitre_technique..."


In [14]:
def safe_get(row, col, default=0.0):
    """Look up ``col`` in ``row``, substituting ``default`` for missing values.

    Parameters
    ----------
    row : mapping-like
        Any object exposing ``.get(key, default)`` (e.g. a ``dict`` or a
        pandas ``Series``).
    col : hashable
        Key / column label to read.
    default : object, optional
        Value returned when the key is absent or its value is missing
        (``None``, ``pd.NA`` or a floating-point NaN). Defaults to ``0.0``.

    Returns
    -------
    object
        The stored value, or ``default`` when the value is missing.
    """
    val = row.get(col, default)

    # Identity checks first: pd.NA must not reach np.isnan / boolean contexts.
    if val is None or val is pd.NA:
        return default

    # np.floating also covers NumPy float scalars (e.g. float32/float16) that
    # are not subclasses of the built-in float and would otherwise slip through.
    if isinstance(val, (float, np.floating)) and np.isnan(val):
        return default

    return val


In [15]:
def build_provenance(row):
    """Assemble the provenance section of an alert explanation.

    Parameters
    ----------
    row : mapping-like
        One alert record exposing ``.get`` (e.g. a row from
        ``DataFrame.iterrows()``).

    Returns
    -------
    dict
        ``{"neural_layer": {...}, "symbolic_layer": {...}}`` where

        * ``neural_layer`` holds the four score columns rounded to 3 decimals
          (missing values become ``0.0`` via ``safe_get``);
        * ``symbolic_layer["gate_quantile"]`` is the raw
          ``"symbolic_gate_quantile"`` value, or ``None`` when the key is
          absent;
        * ``symbolic_layer["top_mitre_techniques"]`` is taken from the
          ``"reasoned_decision"`` dict, or ``[]`` when that value is missing
          or cannot be decoded.
    """
    import ast  # local import: only needed to decode CSV-serialised dicts

    reasoned = row.get("reasoned_decision")

    # After a CSV round-trip the decision dict arrives as its string repr
    # ("{'decision': ...}"), so decode it before the dict check below;
    # otherwise the techniques list would always come back empty.
    if isinstance(reasoned, str):
        try:
            reasoned = ast.literal_eval(reasoned)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # Not a Python literal (e.g. contains a bare ``nan``): treat the
            # decision as unavailable instead of failing the whole row.
            reasoned = None

    if isinstance(reasoned, dict):
        top_techniques = reasoned.get("top_mitre_techniques", [])
    else:
        top_techniques = []

    return {
        "neural_layer": {
            "final_anomaly_score": round(safe_get(row, "final_anomaly_score"), 3),
            "event_score": round(safe_get(row, "event_score_norm"), 3),
            "sequence_score": round(safe_get(row, "sequence_score_norm"), 3),
            "confidence": round(safe_get(row, "confidence"), 3)
        },
        "symbolic_layer": {
            # NOTE(review): "symbolic_gate_quantile" is not among the columns
            # shown in the loaded preview, so this is presumably None - confirm.
            "gate_quantile": row.get("symbolic_gate_quantile"),
            "top_mitre_techniques": top_techniques
        }
    }


In [16]:
def build_evidence(row):
    """Collect the three ``*_log`` feature values of ``row`` as evidence.

    Each value is read through ``safe_get`` (so a missing value becomes
    ``0.0``) and rounded to 2 decimals.

    Returns
    -------
    dict
        ``{"key_features": {feature_name: rounded_value, ...}}``.
    """
    feature_names = ("traffic_rate_log", "packet_count_log", "flow_duration_log")
    key_features = {
        name: round(safe_get(row, name), 2)
        for name in feature_names
    }
    return {"key_features": key_features}


In [17]:
def build_human_readable_explanation(row):
    """Build the full explanation record for one alert row.

    Parameters
    ----------
    row : mapping-like
        One alert record exposing ``.get`` (e.g. a row from
        ``DataFrame.iterrows()``).

    Returns
    -------
    dict
        Always contains ``summary``, ``provenance``, ``evidence``,
        ``counterfactuals``, ``decision`` and ``mitre_techniques``.
        When a reasoned decision is available it also contains
        ``rule_trace``; otherwise it contains a ``note`` stating that no
        symbolic rules were triggered and ``decision`` is ``"ANOMALOUS"``.
    """
    import ast  # local import: only needed to decode CSV-serialised dicts

    reasoned = row.get("reasoned_decision")

    # After a CSV round-trip the decision dict arrives as its string repr, so
    # decode it first; without this every alert would take the "no symbolic
    # rules" branch below even when a decision is present.
    if isinstance(reasoned, str):
        try:
            reasoned = ast.literal_eval(reasoned)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # Not a Python literal (e.g. contains a bare ``nan``): fall back
            # to the "no decision" branch rather than failing the row.
            reasoned = None

    base = {
        "summary": (
            f"Anomalous behavior detected with confidence "
            f"{round(safe_get(row, 'confidence'), 2)}"
        ),
        "provenance": build_provenance(row),
        "evidence": build_evidence(row),
        # NOTE(review): generate_counterfactual is not defined in the cells
        # visible here; it must already exist in the kernel - confirm it is
        # defined before this cell on a fresh Restart & Run All.
        "counterfactuals": generate_counterfactual(row)
    }

    if not isinstance(reasoned, dict):
        base.update({
            "decision": "ANOMALOUS",
            "mitre_techniques": [],
            "note": "No symbolic rules triggered for this event"
        })
        return base

    base.update({
        "decision": reasoned.get("decision", "ANOMALOUS"),
        "mitre_techniques": reasoned.get("top_mitre_techniques", []),
        "rule_trace": reasoned.get("rule_trace", [])
    })

    return base


In [18]:
# Build one explanation per alert row. Failures are handled best-effort: the
# row gets a placeholder record instead of aborting the whole run, but the
# failing row labels are collected so the problem is visible in the output.
explanations = []
failed_rows = []

for idx, row in df.iterrows():
    try:
        explanations.append(build_human_readable_explanation(row))
    except Exception as e:
        failed_rows.append(idx)
        explanations.append({
            "fatal_error": str(e),
            "row_index": idx
        })

# Surface swallowed errors; an all-rows failure usually means a helper is
# missing from the kernel rather than a problem with the data.
if failed_rows:
    print(
        f"WARNING: {len(failed_rows)} of {len(df)} rows failed to build an "
        f"explanation: {failed_rows}"
    )

# Replaces any existing "explanation" column with the freshly built records.
df["explanation"] = explanations


In [19]:
# Preview the first rows of the rebuilt "explanation" column
# (head() defaults to 5 rows).
df["explanation"].head()


0    {'summary': 'Anomalous behavior detected with ...
1    {'summary': 'Anomalous behavior detected with ...
2    {'summary': 'Anomalous behavior detected with ...
3    {'summary': 'Anomalous behavior detected with ...
4    {'summary': 'Anomalous behavior detected with ...
Name: explanation, dtype: object

In [21]:
# Write the alerts, including the rebuilt "explanation" column, to
# explainable_alerts.csv without the index. Dict cells are written as their
# string representation, so readers of this file get them back as text.
df.to_csv("explainable_alerts.csv", index=False)

print("Explainability engine output saved")


Explainability engine output saved
