# 03 - SIEM Alert Examples

Run a handful of test graph sequences through the Dynamic GNN, generate an
explanation for each prediction, and produce SIEM-ready alerts as
Elastic Common Schema (ECS) JSON.

In [None]:
import sys, json
from pathlib import Path
sys.path.insert(0, str(Path('..').resolve()))

import torch
from src.data.preprocess import load_config
from src.models.dynamic_gnn import DynamicGNN
from src.explain.explainer import explain_sequence
from src.siem.alert_formatter import format_ecs_alert, alert_to_json

# Load the experiment configuration and build the model it describes.
# NOTE(review): every other path in this notebook is anchored at '..', but this
# one is not - confirm that load_config resolves it from the repository root.
cfg = load_config('config/experiment.yaml')
device = torch.device('cpu')
model = DynamicGNN.from_config(cfg)

# Restore trained weights when a checkpoint is available; otherwise the demo
# continues with randomly initialised weights.
ckpt = Path('..') / 'results' / 'checkpoints' / 'dynamic_gnn_best.pt'
if ckpt.exists():
    # A state dict only contains tensors and plain containers, so restrict the
    # unpickler explicitly instead of relying on the installed torch default.
    state_dict = torch.load(ckpt, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    print('Loaded model checkpoint')
else:
    print('No checkpoint found - using untrained model for demo')

# Keep the model on the same device that is handed to explain_sequence later.
model.to(device)
model.eval()
print('Model ready')

In [None]:
# Load a few test graph sequences
MAX_SEQUENCES = 10  # the cells below consume at most the first 10 sequences


def chunk_sequences(graphs, seq_len, max_sequences=MAX_SEQUENCES):
    """Split ``graphs`` into consecutive, non-overlapping windows.

    Args:
        graphs: Sliceable, sized collection of graph snapshots.
        seq_len: Number of snapshots per window; must be at least 1.
        max_sequences: Upper bound on the number of windows returned.

    Returns:
        A list of at most ``max_sequences`` slices of ``graphs``, each holding
        exactly ``seq_len`` items. Trailing items that do not fill a complete
        window are dropped.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        raise ValueError(f'sequence_length must be >= 1, got {seq_len}')
    # Start offsets of every complete window; slicing the range caps the
    # number of windows rather than the value of the start offset.
    starts = range(0, len(graphs) - seq_len + 1, seq_len)[:max_sequences]
    return [graphs[start:start + seq_len] for start in starts]


graphs_path = Path('..') / 'data' / 'graphs' / 'test_graphs.pt'
if graphs_path.exists():
    # weights_only=False runs the full pickle machinery, which can execute
    # arbitrary code - only load graph files produced by this project.
    graphs = torch.load(graphs_path, weights_only=False)
    seq_len = cfg.get('graph', {}).get('sequence_length', 5)
    test_sequences = chunk_sequences(graphs, seq_len)
    print(f'Loaded {len(test_sequences)} test sequences')
else:
    print('No test graphs found. Run graph builder first.')
    test_sequences = []

In [None]:
# Explain the first 5 test sequences and print each one as an ECS alert.
feat_names = cfg['data']['flow_feature_columns']

for sample_no, graph_seq in enumerate(test_sequences[:5], start=1):
    bundle = explain_sequence(
        model, graph_seq, device, top_k_nodes=5, top_k_features=5,
    )
    # Latency is not measured in this cell, so a literal 0.0 is passed.
    alert = format_ecs_alert(
        bundle, inference_ms=0.0, flow_feature_names=feat_names,
    )

    verdict = 'MALICIOUS' if bundle.prediction == 1 else 'BENIGN'
    print(f'\n=== Sample {sample_no} ===')
    print(f'Prediction: {verdict} (score={bundle.score:.4f})')
    print(f'Top features: {bundle.top_features[:3]}')
    print(f'Top nodes: {bundle.top_nodes[:3]}')
    print('\nECS Alert JSON:')
    print(alert_to_json(alert))

In [None]:
# Persist alerts for the first 10 test sequences as a single JSON file.
out_dir = Path('..') / 'results' / 'alerts'
out_dir.mkdir(parents=True, exist_ok=True)
alerts_file = out_dir / 'example_alerts.json'

# Default explainer/formatter settings are used here (no top-k overrides).
alerts = [
    format_ecs_alert(
        explain_sequence(model, graph_seq, device),
        flow_feature_names=feat_names,
    )
    for graph_seq in test_sequences[:10]
]

with open(alerts_file, 'w') as f:
    # default=str stringifies any value the json module cannot encode natively.
    json.dump(alerts, f, indent=2, default=str)
print(f'Saved {len(alerts)} example alerts to {alerts_file}')