# Sentinel Stream — Streaming Anomaly Detection (Notebook)

This notebook demonstrates an **industrial-grade streaming pipeline** for anomaly detection:

- Synthetic event stream generation (security-ish telemetry)
- Feature extraction in a rolling window
- Online-style scoring (no lookahead)
- Thresholding + alert summarization

It is designed to be reproducible and easy to port into the `sentinel_stream/` package.

In [None]:
from collections import deque
from dataclasses import dataclass, field

import numpy as np
import pandas as pd

# One fixed seed so the synthetic stream is reproducible; the cells below draw from `rng`.
SEED = 42
rng = np.random.default_rng(seed=SEED)

# Show up to 50 columns when a DataFrame is displayed.
pd.set_option("display.max_columns", 50)

## 1) Generate a telemetry stream
Each row is an aggregated event (per device, per minute). We inject sparse anomalies (about 0.15% of rows); each one combines a connection/byte burst, a rare destination port, and a payload-entropy spike.

In [None]:
# Build the synthetic telemetry table: one row per (time step, device).
# All randomness comes from the seeded `rng`, so the order of the draws below
# determines the generated data; reordering them changes the stream.
n_devices = 50
n_steps = 24 * 60  # 1 day at 1-min steps

# Zero-padded identifiers: dev-000 ... dev-049.
device_ids = [f'dev-{i:03d}' for i in range(n_devices)]

# Rows are collected as tuples and turned into a DataFrame once, after the loop.
rows = []
for t in range(n_steps):
    for dev in device_ids:
        # Baseline traffic: Poisson connection count and normally distributed
        # byte volume, with negative byte draws floored at 0.
        base_conn = rng.poisson(8)
        base_bytes = max(0, rng.normal(80_000, 25_000))
        # categorical-ish signals: destination port drawn from a fixed set with
        # the given probabilities; payload entropy clipped to [0, 8].
        port = int(rng.choice([22, 80, 443, 53, 445, 3389], p=[0.08, 0.25, 0.42, 0.18, 0.05, 0.02]))
        entropy = float(np.clip(rng.normal(3.8, 0.4), 0.0, 8.0))

        # inject anomalies (sparse): each row independently becomes anomalous
        # with probability 0.0015.
        is_anom = 0
        if rng.random() < 0.0015:
            # burst: extra connections, multiplied bytes, raised entropy
            # (re-clipped to [0, 8]) and a port outside the baseline set,
            # all applied to the same row.
            base_conn += int(rng.integers(50, 150))
            base_bytes *= float(rng.uniform(2.5, 6.0))
            entropy = float(np.clip(entropy + rng.uniform(1.0, 2.5), 0.0, 8.0))
            port = int(rng.choice([4444, 31337, 6667, 1337]))
            is_anom = 1

        rows.append((t, dev, base_conn, float(base_bytes), port, entropy, is_anom))

# `is_anom` is the ground-truth label used later for evaluation.
df = pd.DataFrame(rows, columns=['t', 'device_id', 'connections', 'bytes', 'dst_port', 'payload_entropy', 'is_anom'])
# Cell output: a tuple of the first rows and the fraction of rows labelled anomalous.
df.head(), df['is_anom'].mean()

## 2) Rolling feature extraction (no lookahead)

In [None]:
@dataclass
class RollingStats:
    """Mean and standard deviation over the most recent ``window`` values.

    Running sums of ``x`` and ``x * x`` are kept next to the buffered values,
    so ``push``, ``mean`` and ``std`` never rescan the buffer.

    Attributes:
        window: Maximum number of values retained.
        q: Buffer of retained values, oldest on the left. Defaults to a fresh
            empty deque per instance. A caller-supplied deque is used as-is
            and is expected to be empty, because the running sums start at 0.
        sum_x: Running sum of the buffered values.
        sum_x2: Running sum of the squares of the buffered values.
    """

    window: int
    # default_factory gives every instance its own buffer, so callers can
    # write RollingStats(window) instead of RollingStats(window, deque()).
    q: deque = field(default_factory=deque)
    sum_x: float = 0.0
    sum_x2: float = 0.0

    def push(self, x: float) -> None:
        """Append ``x`` and evict the oldest value once the buffer exceeds ``window``."""
        self.q.append(x)
        self.sum_x += x
        self.sum_x2 += x * x
        if len(self.q) > self.window:
            # Remove the evicted value's contribution from both running sums.
            old = self.q.popleft()
            self.sum_x -= old
            self.sum_x2 -= old * old

    def mean(self) -> float:
        """Return the mean of the buffered values (0.0 when the buffer is empty)."""
        return self.sum_x / max(1, len(self.q))

    def std(self) -> float:
        """Return the population standard deviation (0.0 with fewer than 2 values)."""
        n = len(self.q)
        if n < 2:
            return 0.0
        mu = self.sum_x / n
        # E[x^2] - E[x]^2 can dip slightly below zero through rounding; clamp it.
        var = max(0.0, (self.sum_x2 / n) - (mu * mu))
        return float(np.sqrt(var))

def zscore(x, mu, sigma):
    """Return ``(x - mu) / sigma`` as a float, or 0.0 when ``sigma`` is at most 1e-9."""
    if sigma <= 1e-9:
        # Near-zero (or negative) spread: report "no deviation" instead of dividing.
        return 0.0
    return float((x - mu) / sigma)

window = 60  # 60 minutes of per-device history (one row per device per minute)
# Rows of per-device history required before z-scores are trusted. With only a
# handful of samples the rolling std can be arbitrarily close to zero, which
# turns ordinary values into enormous z-scores and floods the top of the score
# distribution with warm-up artefacts.
min_history = 10
# Destination ports produced by the baseline traffic; anything else counts as rare.
known_ports = frozenset({22, 53, 80, 443, 445, 3389})

# One independent RollingStats per signal (connections, bytes, entropy) for every device.
state = {
    dev: (RollingStats(window, deque()), RollingStats(window, deque()), RollingStats(window, deque()))
    for dev in device_ids
}

scores = []
for row in df.itertuples(index=False):
    t, dev, conns, bytes_, port, ent, is_anom = row
    st_c, st_b, st_e = state[dev]

    # compute score BEFORE update => no lookahead: the current row never
    # contributes to the baseline it is compared against.
    # The three trackers are pushed together, so st_c's length speaks for all.
    if len(st_c.q) >= min(min_history, window):
        zc = abs(zscore(conns, st_c.mean(), st_c.std()))
        zb = abs(zscore(bytes_, st_b.mean(), st_b.std()))
        ze = abs(zscore(ent, st_e.mean(), st_e.std()))
    else:
        # Warm-up: not enough history for a stable baseline, so the
        # statistical components stay neutral and only the port rule applies.
        zc = zb = ze = 0.0

    # Weighted sum of absolute z-scores plus a flat bonus for an unknown port.
    rare_port = 1.0 if port not in known_ports else 0.0
    score = 0.55 * zc + 0.35 * zb + 0.15 * ze + 2.0 * rare_port

    scores.append((t, dev, score, is_anom))

    # update window
    st_c.push(float(conns))
    st_b.push(float(bytes_))
    st_e.push(float(ent))

sc = pd.DataFrame(scores, columns=['t','device_id','score','is_anom'])
sc.head()

## 3) Alerting & evaluation

In [None]:
# Alert cut-off: the 0.999 quantile of all scores (unsupervised-ish, no labels used).
thr = sc['score'].quantile(0.999)
sc['alert'] = (sc['score'] >= thr).astype(int)

# Count outcomes with boolean masks rather than repeated queries.
flagged = sc['alert'] == 1
labelled = sc['is_anom'] == 1
n_hits = int((flagged & labelled).sum())

# max(1, ...) keeps both ratios defined (as 0.0) when nothing is flagged or labelled.
precision = n_hits / max(1, int(flagged.sum()))
recall = n_hits / max(1, int(labelled.sum()))
thr, precision, recall

In [None]:
# The 15 highest-scoring alerts, strongest first.
alerts = sc[sc['alert'] == 1].sort_values(by='score', ascending=False).iloc[:15]
alerts