# Lab 02 — TLS 1.3 fingerprints

This notebook summarizes TLS metadata for the lab dataset and prepares `answers.sample.yml`.

**What you’ll do**
- Parse **Zeek** `ssl.log` for ALPN (field: `next_protocol`).
- (Optional) If available, also look at **Suricata** EVE `tls` events.
- Identify **anomalous ALPN** (low-share protocols) using the lab's rubric threshold.
- Write your `answers.sample.yml` for the autograder.

**Note**: The autograder for v1 grades **ALPN anomalies**. JA3 counts are optional here.


## Paths & environment

- In the notebooks container, logs are mounted at `/home/jovyan/logs` and labs at `/home/jovyan/labs`.
- If you run the notebook outside the container, the paths fall back to `./logs` and `./labs/...` in the repo.


In [None]:
from pathlib import Path
import json, yaml
from collections import Counter

def resolve_path(container_path: str, local_path: str) -> Path:
    p = Path(container_path)
    return p if p.exists() else Path(local_path)

LOGS_DIR = resolve_path("/home/jovyan/logs", "logs")
LAB_DIR = resolve_path("/home/jovyan/labs/02_tls13_fingerprints", "labs/02_tls13_fingerprints")

print("Using LOGS_DIR:", LOGS_DIR)
print("Using LAB_DIR:", LAB_DIR)
print("Exists:", LOGS_DIR.exists(), LAB_DIR.exists())

## Count ALPN via Zeek `ssl.log`
We prefer Zeek for ALPN in v1. The field name is `next_protocol` (e.g., `h2`, `http/1.1`); rows where Zeek logged no value contain `-` and are skipped.
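
To make the `#fields` mapping concrete, here is a minimal sketch that parses a tiny in-memory excerpt. The sample rows, the trimmed column set, and the names `SAMPLE` and `parse_sample` are made up for illustration; a real `ssl.log` carries many more columns.

```python
import io

# Synthetic, heavily trimmed ssl.log excerpt (illustration only).
SAMPLE = (
    "#separator \\x09\n"
    "#fields\tts\tuid\tversion\tnext_protocol\n"
    "#types\ttime\tstring\tstring\tstring\n"
    "1700000000.000000\tCabc123\tTLSv13\th2\n"
    "1700000001.000000\tCdef456\tTLSv13\t-\n"
    "1700000002.000000\tCghi789\tTLSv13\thttp/1.1\n"
)

def parse_sample(text: str):
    """Pair each data row with the column names from the `#fields` header."""
    fields = None
    rows = []
    for line in io.StringIO(text):
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            fields = line.split("\t")[1:]  # drop the "#fields" token itself
        elif line and not line.startswith("#") and fields:
            parts = line.split("\t")
            if len(parts) == len(fields):
                rows.append(dict(zip(fields, parts)))
    return rows

sample_rows = parse_sample(SAMPLE)
# "-" marks an unset field, so it is not counted as a protocol.
sample_protocols = [r["next_protocol"] for r in sample_rows if r["next_protocol"] != "-"]
print(sample_protocols)  # ['h2', 'http/1.1']
```

The second row has no ALPN, so only two of the three connections contribute to the counts.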


In [None]:
def iter_zeek_tsv(path: Path):
    """Minimal Zeek TSV reader honoring the `#fields` line."""
    if not path.exists():
        return
    fields = None
    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            if line.startswith("#fields"):
                fields = line.rstrip("\n").split("\t")[1:]
                continue
            # Skip other header/comment lines and blank lines (a blank line is "\n", not "").
            if line.startswith("#") or not line.strip():
                continue
            parts = line.rstrip("\n").split("\t")
            if fields and len(parts) == len(fields):
                yield dict(zip(fields, parts))

ssl_path = LOGS_DIR / "zeek" / "ssl.log"
alpn_counts_zeek = Counter()
for row in iter_zeek_tsv(ssl_path):
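    # Named `proto` rather than `np` to avoid confusion with the usual NumPy alias.
    proto = (row.get("next_protocol") or "").strip()
    if proto and proto != "-":  # "-" is Zeek's marker for an unset field
        alpn_counts_zeek[proto] += 1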

alpn_counts_zeek

## (Optional) Suricata EVE TLS — fallback only
If your environment emits `event_type: tls`, we can also read ALPN from there.
- Some builds/configs only emit flow classification (with `app_proto: tls`).
- The autograder prefers Zeek; Suricata is used as a fallback if present.
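
As a rough illustration of the filtering described above, the sketch below runs over a few synthetic EVE lines. The events are invented, and `alpn` and `client_alpns` are simply the keys this notebook probes; whether your Suricata build emits either depends on its version and configuration. The helper name `alpns_from_eve_line` is hypothetical.

```python
import json
from collections import Counter

# Synthetic EVE lines for illustration only; real events carry many more keys.
EVE_SAMPLE = [
    '{"event_type": "flow", "app_proto": "tls"}',
    '{"event_type": "tls", "tls": {"sni": "example.test", "client_alpns": ["h2", "http/1.1"]}}',
    '{"event_type": "tls", "tls": {"sni": "example.test"}}',
    'not json at all',
]

def alpns_from_eve_line(line: str) -> list:
    """Return ALPN strings from one EVE line, or [] if it is not a usable tls event."""
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return []
    if not isinstance(obj, dict) or obj.get("event_type") != "tls":
        return []  # flow-only events carry no ALPN detail
    tls = obj.get("tls") or {}
    found = []
    single = tls.get("alpn")
    if isinstance(single, str) and single:
        found.append(single)
    offered = tls.get("client_alpns")
    if isinstance(offered, str) and offered:
        found.append(offered)
    elif isinstance(offered, list):
        found.extend(a for a in offered if isinstance(a, str) and a)
    return found

eve_sample_counts = Counter()
for raw in EVE_SAMPLE:
    eve_sample_counts.update(alpns_from_eve_line(raw))
print(dict(eve_sample_counts))  # {'h2': 1, 'http/1.1': 1}
```

If `client_alpns` lists what the client offered rather than what was negotiated, these counts are not directly comparable with Zeek's `next_protocol`, which is one more reason to treat Suricata as a fallback only.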


In [None]:
alpn_counts_eve = Counter()
eve_path = LOGS_DIR / "suricata" / "eve.json"
if eve_path.exists():
    with eve_path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip truncated or non-JSON lines
            if not isinstance(obj, dict) or obj.get("event_type") != "tls":
                continue
            t = obj.get("tls", {}) or {}
            a_single = t.get("alpn")
            if isinstance(a_single, str) and a_single:
                alpn_counts_eve[a_single] += 1
            a_list = t.get("client_alpns")
            if isinstance(a_list, list):
                for a in a_list:
                    if isinstance(a, str) and a:
                        alpn_counts_eve[a] += 1
            elif isinstance(a_list, str) and a_list:
                alpn_counts_eve[a_list] += 1

alpn_counts_eve if alpn_counts_eve else "(no Suricata TLS EVE events found — this is OK)"

## Choose source & compute anomalies
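We use Zeek's counts if it recorded any ALPN values; otherwise we fall back to Suricata TLS EVE (if present). A protocol is anomalous when its share of all counted ALPN values is strictly below the lab rubric threshold (`checks.alpn_min_share`, defaulting to 0.15 if the rubric is unavailable).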
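
A small worked example may help with the share arithmetic. The counts below are made up and are not taken from the lab dataset; the threshold of 0.15 is the notebook's fallback value, and your `rubric.yml` may set a different one.

```python
from collections import Counter

# Made-up counts for illustration; not taken from the lab dataset.
example_counts = Counter({"h2": 70, "http/1.1": 28, "acme-tls/1": 2})
example_threshold = 0.15  # assumed here; the real value comes from rubric.yml

example_total = sum(example_counts.values())  # 100
example_shares = {proto: n / example_total for proto, n in example_counts.items()}

# A protocol is flagged when its share is strictly below the threshold.
example_anomalies = sorted(
    proto for proto, share in example_shares.items() if share < example_threshold
)

for proto, share in sorted(example_shares.items(), key=lambda kv: -kv[1]):
    flag = "anomalous" if share < example_threshold else "ok"
    print(f"{proto:12s} {share:7.2%} {flag}")
print(example_anomalies)  # ['acme-tls/1']
```

Here `h2` holds 70% and `http/1.1` 28% of the counted values, so only `acme-tls/1` at 2% falls under the 15% cut-off. A protocol sitting exactly at the threshold is not flagged.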


In [None]:
alpn_counts = dict(alpn_counts_zeek) if alpn_counts_zeek else dict(alpn_counts_eve)
source = "zeek ssl.log" if alpn_counts_zeek else ("suricata eve.json" if alpn_counts_eve else "none")
print("Using source:", source)
print("ALPN counts:", alpn_counts)

rubric_path = LAB_DIR / "rubric.yml"
DEFAULT_MIN_SHARE = 0.15  # used when rubric.yml is missing or has no checks.alpn_min_share
rubric = yaml.safe_load(rubric_path.read_text(encoding="utf-8")) if rubric_path.exists() else None
checks = (rubric or {}).get("checks") or {}
threshold = float(checks.get("alpn_min_share", DEFAULT_MIN_SHARE))
total = sum(alpn_counts.values())
# Flag protocols whose share of all counted ALPN values is strictly below the threshold.
anomalies = sorted(k for k, v in alpn_counts.items() if total and (v / total) < threshold)
print("Threshold:", threshold)
print("Anomalous ALPN:", anomalies)
anomalies

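## (Optional) JA3 top-N (only if Suricata TLS EVE exists)
Not graded in v1, but you can inspect JA3 fingerprints if your EVE output contains `tls.ja3`. The value may be a plain hash string or an object with a `hash` key; the cell below handles both.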
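
For background, a JA3 hash is the MD5 hex digest of a comma-separated string built from ClientHello fields (version, cipher suites, extensions, curves, point formats). The sketch below shows that last hashing step only. The input string is hypothetical and `ja3_hash` is an illustrative helper, not part of Suricata or this lab's tooling.

```python
import hashlib

def ja3_hash(ja3_string: str) -> str:
    """Return the MD5 hex digest of a JA3 string."""
    return hashlib.md5(ja3_string.encode("ascii")).hexdigest()

# Hypothetical JA3 string: version,ciphers,extensions,curves,point formats.
example_ja3_string = "771,4865-4866-4867,0-23-65281-10-11,29-23-24,0"
example_ja3_hash = ja3_hash(example_ja3_string)
print(example_ja3_hash)  # 32 hex characters
```

Two clients with identical ClientHello parameters produce the same hash, which is why counting hashes gives a rough view of the client software mix.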


In [None]:
ja3_counts = Counter()
if eve_path.exists():
    with eve_path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip truncated or non-JSON lines
            if not isinstance(obj, dict) or obj.get("event_type") != "tls":
                continue
            t = obj.get("tls", {}) or {}
            j = t.get("ja3")
            if isinstance(j, str) and j:
                ja3_counts[j] += 1
            elif isinstance(j, dict) and j.get("hash"):
                ja3_counts[j["hash"]] += 1
ja3_counts.most_common(5) if ja3_counts else "(no JA3 observed — this is OK for v1)"

## Write `answers.sample.yml`
This writes your computed anomalies to `answers.sample.yml` in the lab folder. You can edit the file before grading if desired.
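
If you do edit the answers by hand, a quick shape check before grading can catch simple mistakes. This is only a sketch: `check_answers_shape` is a hypothetical helper, and the rules it applies (both keys are lists, ALPN entries are strings) are assumptions based on the structure written below, not the autograder's actual validation.

```python
def check_answers_shape(answers: dict) -> list:
    """Return a list of problems found; an empty list means the shape looks fine."""
    problems = []
    for key in ("top_ja3", "anomalous_alpn"):
        if not isinstance(answers.get(key), list):
            problems.append(f"{key} should be a list")
    alpn = answers.get("anomalous_alpn")
    if isinstance(alpn, list) and not all(isinstance(item, str) for item in alpn):
        problems.append("anomalous_alpn should contain only strings")
    return problems

good_answers = {"top_ja3": [], "anomalous_alpn": ["acme-tls/1"]}
bad_answers = {"top_ja3": [], "anomalous_alpn": "acme-tls/1"}  # a string, not a list

print(check_answers_shape(good_answers))  # []
print(check_answers_shape(bad_answers))   # ['anomalous_alpn should be a list']
```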


In [None]:
answers = {
    "top_ja3": [],  # optional in v1 grading
    "anomalous_alpn": anomalies,
}
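out_path = LAB_DIR / "answers.sample.yml"
out_path.parent.mkdir(parents=True, exist_ok=True)  # avoid FileNotFoundError if the lab folder is missing
with out_path.open("w", encoding="utf-8") as f:
    yaml.safe_dump(answers, f, sort_keys=False)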
print("Wrote:", out_path)

### Next steps
1. Ensure your logs exist (rerun the stack if needed):
   ```bash
   docker compose -f compose/docker-compose.lite.yml up --build
   ```
2. Grade locally:
   ```bash
   labgrade labs/02_tls13_fingerprints labs/02_tls13_fingerprints/answers.sample.yml --logs ./logs
   ```
3. Expected for the synthetic dataset: `acme-tls/1` should be flagged anomalous.
