# AI Log Anomaly Detection – Training Notebook

This notebook trains an Isolation Forest on normal traffic logs and then scores a candidate log file for anomalies. No plaintext secrets are embedded; any thresholds can be pulled from a vault or environment variables.

In [None]:
import os, re, math, statistics
from typing import List, Dict, Any, Optional
from sklearn.ensemble import IsolationForest

# Pattern for one access-log line. The nine capture groups are, in order:
# client address, timestamp, method, path, protocol, status, bytes,
# referer and user agent (the names parse_line unpacks them into).
CLF_REGEX = re.compile(
    r'(\S+)\s+\S+\s+\S+\s+'            # client address, then two skipped fields
    r'\[([^\]]+)\]\s+'                  # bracketed timestamp
    r'"(\S+)\s+(\S+)\s+([^\"]+)"\s+'    # quoted request: method, path, protocol
    r'(\d{3})\s+(\S+)\s+'               # three-digit status, bytes field
    r'"([^\"]*)"\s+"([^\"]*)"'          # quoted referer, quoted user agent
)

# Substrings looked for (case-insensitively) in the request path by
# extract_features to set its "suspicious" flag.
SUSPICIOUS_TOKENS = [
    "union", "select", "drop", "sleep(",
    "' or '1'='1", "%27", "../", ";--",
    "xp_cmdshell", "<script", "benchmark(",
    "load_file", "outfile",
]

def shannon_entropy(s: str) -> float:
    """Return the Shannon entropy of *s* in bits per character.

    An empty (falsy) input yields 0.0.
    """
    if not s:
        return 0.0

    total = len(s)

    # Tally how often each distinct character occurs.
    counts = {}
    for symbol in s:
        counts[symbol] = counts.get(symbol, 0) + 1

    # Accumulate -p * log2(p) over the distinct characters.
    entropy = 0.0
    for occurrences in counts.values():
        prob = occurrences / total
        entropy -= prob * math.log2(prob)
    return entropy

def parse_line(line: str) -> Optional[Dict[str, Any]]:
    """Parse one access-log line into a dict of named fields.

    Args:
        line: A raw log line; leading/trailing whitespace is ignored.

    Returns:
        A dict with keys ``ip``, ``ts``, ``method``, ``path``, ``proto``,
        ``status`` (int), ``bytes`` (int, 0 when logged as ``-``), ``ref``
        and ``ua``; or ``None`` when the line does not match ``CLF_REGEX``
        or its bytes field is not an integer.
    """
    m = CLF_REGEX.match(line.strip())
    if m is None:
        return None
    ip, ts, method, path, proto, status, bts, ref, ua = m.groups()
    try:
        size = 0 if bts == '-' else int(bts)
    except ValueError:
        # The pattern accepts any non-space token for the bytes field, so a
        # corrupted value would otherwise raise here and abort the caller's
        # whole file load. Treat it like any other unparseable line.
        return None
    return {
        'ip': ip,
        'ts': ts,
        'method': method,
        'path': path,
        'proto': proto,
        'status': int(status),  # always three digits per the pattern
        'bytes': size,
        'ref': ref,
        'ua': ua,
    }

def extract_features(d: Dict[str, Any]):
    """Convert a parsed log record into a list of eight float features.

    Feature order: method id (-1 for a method not in the table), status,
    bytes, path length, query-string length, Shannon entropy of the path,
    Shannon entropy of the user agent, and a 1.0/0.0 flag that is set when
    the lower-cased path contains any entry of SUSPICIOUS_TOKENS.
    Missing keys default to '' (path, ua, method) or 0 (status, bytes).
    """
    request_path = d.get('path', '')
    user_agent = d.get('ua', '')
    verb = d.get('method', '')

    method_map = {"GET":0,"POST":1,"PUT":2,"DELETE":3,"PATCH":4,"HEAD":5,"OPTIONS":6}
    method_id = method_map.get(verb.upper(), -1)

    # Token matching is case-insensitive: compare against the lowered path.
    lowered_path = request_path.lower()
    suspicious = any(tok in lowered_path for tok in SUSPICIOUS_TOKENS)

    # Everything after the first '?' (empty when there is no '?').
    query_len = len(request_path.partition('?')[2])

    values = (
        method_id,
        d.get('status', 0),
        d.get('bytes', 0),
        len(request_path),
        query_len,
        shannon_entropy(request_path),
        shannon_entropy(user_agent),
        suspicious,  # float(True) == 1.0, float(False) == 0.0
    )
    return [float(v) for v in values]

def load_dataset(path: str):
    """Read a log file and build a feature vector for every parseable line.

    Args:
        path: Location of the log file to read.

    Returns:
        A pair ``(X, raw)``: ``X`` is the list of feature vectors produced by
        ``extract_features``; ``raw`` holds the corresponding original lines
        with the trailing newline removed. Both lists are index-aligned.
        Lines that ``parse_line`` rejects are skipped.

    Raises:
        OSError: If the file cannot be opened.
    """
    X, raw = [], []
    # errors='replace': request paths and user agents come from clients and
    # may contain bytes that are not valid UTF-8. Substituting U+FFFD keeps
    # one such line from aborting the whole load with UnicodeDecodeError.
    with open(path, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            d = parse_line(line)
            if not d:
                continue
            X.append(extract_features(d))
            raw.append(line.rstrip('\n'))
    return X, raw

## Load data
Set the paths to your normal and candidate logs within this lab's `data/` directory.

In [None]:
# Relative paths of the two log files used by this run.
NORMAL = '../data/normal_traffic.log'
CANDIDATE = '../data/anomalous_traffic.log'
# Training features come from the normal log; its raw lines are discarded.
X_train, _ = load_dataset(NORMAL)
# Keep the candidate's raw lines so they can be printed next to their scores later.
X_test, raw = load_dataset(CANDIDATE)
# Cell output: number of parsed lines in each set.
len(X_train), len(X_test)

## Train Isolation Forest and score candidates

In [None]:
# Fit on features from the normal log only; random_state is fixed for reproducible runs.
iso = IsolationForest(n_estimators=200, contamination='auto', random_state=42)
iso.fit(X_train)
scores = -iso.decision_function(X_test)  # higher = more anomalous
# Standardise the candidate scores against their own mean and population std-dev.
mu = statistics.mean(scores)
sd = statistics.pstdev(scores) or 1.0  # fall back to 1.0 so a zero spread cannot divide by zero
zs = [(s - mu)/sd for s in scores]
# Cell output: (number of lines with z >= 1.5, total number of scored lines).
sum(1 for z in zs if z >= 1.5), len(zs)

## Inspect top anomalies

In [None]:
# Rank candidate lines from highest to lowest z-score and print the first ten.
pairs = sorted(zip(zs, raw), key=lambda pair: pair[0], reverse=True)
for i, (z, line) in enumerate(pairs[:10]):
    print(f"{i+1:02d}  z={z:.3f}  {line}")