# Lab 20 · Network to Timeline

We transform a synthetic packet capture into structured flow, DNS, and HTTP artefacts using SDK-like helpers before deriving a consolidated investigation timeline.

In [None]:
# NOTE(review): this plain bool looks redundant. The `.value` assignment below
# only works if the star import rebinds DRY_RUN to an object exposing `.value`,
# in which case this line has no lasting effect -- confirm before removing.
DRY_RUN = True
from notebooks._utils.common import *

# Lab-wide switches are set through a `.value` attribute.
DRY_RUN.value = True
# Presumably true when a `forensic-cli` executable can be found; the optional
# CLI mirror cell branches on this flag.
CLI_OK.value = shell_available('forensic-cli')
LAB_ID = '20_network'
# Base location under which this lab writes its inputs/ and outputs/ trees.
LAB_ROOT = lab_root(LAB_ID)


## Craft a deterministic pcap-like JSON sample

The flows include TLS, DNS, and HTTP interactions to exercise the downstream parsers.

In [None]:

from pathlib import Path

def _flow(
    *,
    timestamp: str,
    src_port: int,
    dst_ip: str,
    dst_port: int,
    protocol: str,
    sent: int,
    received: int,
    duration_ms: int,
) -> dict:
    """Build one flow record; every flow in this sample originates from 10.0.0.10."""
    return {
        'type': 'flow',
        'timestamp': timestamp,
        'src_ip': '10.0.0.10',
        'src_port': src_port,
        'dst_ip': dst_ip,
        'dst_port': dst_port,
        'protocol': protocol,
        'bytes_sent': sent,
        'bytes_received': received,
        'duration_ms': duration_ms,
    }


# Records stay in capture order: a port-443 flow, a DNS flow with its answer,
# then a port-80 flow with its HTTP request.
pcap_records = [
    _flow(
        timestamp='2024-01-01T09:00:00Z',
        src_port=49512,
        dst_ip='198.51.100.20',
        dst_port=443,
        protocol='tcp',
        sent=512,
        received=2048,
        duration_ms=350,
    ),
    _flow(
        timestamp='2024-01-01T09:00:02Z',
        src_port=49513,
        dst_ip='203.0.113.53',
        dst_port=53,
        protocol='udp',
        sent=128,
        received=128,
        duration_ms=40,
    ),
    {
        'type': 'dns',
        'timestamp': '2024-01-01T09:00:02Z',
        'query': 'example.internal',
        'response': '203.0.113.53',
        'rcode': 'NOERROR',
    },
    _flow(
        timestamp='2024-01-01T09:00:05Z',
        src_port=49514,
        dst_ip='93.184.216.34',
        dst_port=80,
        protocol='tcp',
        sent=1024,
        received=8192,
        duration_ms=1020,
    ),
    {
        'type': 'http',
        'timestamp': '2024-01-01T09:00:05Z',
        'method': 'GET',
        'host': 'example.com',
        'uri': '/index.html',
        'status': 200,
        'user_agent': 'LabClient/1.0',
    },
]

# Persist the sample as the lab's input file and echo the path as the cell result.
pcap_path = LAB_ROOT / 'inputs' / 'pcap.json'
json_dump_sorted(pcap_records, pcap_path)
pcap_path


## Derive network artefacts with SDK-first helpers

The helpers emulate `analysis.network --pcap-json` behaviour by emitting deterministic JSON outputs.

In [None]:

from typing import Dict, List

# Folder that receives the derived flows/dns/http JSON files for this lab.
network_dir = LAB_ROOT / 'outputs' / 'network'
# Create missing parents too, and tolerate re-running the cell.
network_dir.mkdir(parents=True, exist_ok=True)


def _select(records: list[dict], record_type: str) -> list[dict]:
    """Return the records of one type in a deterministic order.

    Records are ordered by their values, visited in alphabetical key order.
    Each value is compared together with its key name and a "is None" flag,
    so a ``None`` field sorts after real values instead of raising, and
    records with different key sets are ordered by key name rather than by
    comparing unrelated fields. Records sharing the same keys and holding no
    ``None`` values keep exactly the plain value-tuple ordering.

    Args:
        records: Parsed capture records; each must carry a ``'type'`` key.
        record_type: The ``'type'`` value to keep, e.g. ``'flow'``.

    Returns:
        A new sorted list; ``records`` itself is not modified.

    Raises:
        KeyError: If a record has no ``'type'`` key.
        TypeError: If two records hold non-``None`` values of mutually
            unorderable types under the same key.
    """

    def _sort_key(item: dict) -> tuple:
        # (key, is_none, value): the first two members settle the comparison
        # before a None or a value from a different field can be compared.
        return tuple((key, item[key] is None, item[key]) for key in sorted(item))

    matching = (record for record in records if record['type'] == record_type)
    return sorted(matching, key=_sort_key)


# Split the capture into one sorted list per record type.
flows, dns_records, http_records = (
    _select(pcap_records, kind) for kind in ('flow', 'dns', 'http')
)

# One output file per record type, all under the network artefact folder.
flows_path, dns_path, http_path = (
    network_dir / filename for filename in ('flows.json', 'dns.json', 'http.json')
)

for payload, target in (
    (flows, flows_path),
    (dns_records, dns_path),
    (http_records, http_path),
):
    json_dump_sorted(payload, target)

# Cell result: the three artefact paths.
flows_path, dns_path, http_path


## Inspect generated artefacts

Quick previews verify that the JSON outputs follow the expected schema and ordering.

In [None]:

# Print a labelled preview of each artefact. The newlines are written as `\n`
# escapes because a single-quoted string literal cannot span physical lines.
print('flows.json\n', preview(flows_path))
print('\ndns.json\n', preview(dns_path))
print('\nhttp.json\n', preview(http_path))


## Build a consolidated timeline

The timeline joins flow, DNS, and HTTP context into a single chronological view.

In [None]:

timeline_dir = LAB_ROOT / 'outputs' / 'timeline'
timeline_dir.mkdir(parents=True, exist_ok=True)

# One timeline entry per source record, built separately for each record type.
flow_events = [
    {
        'timestamp': flow['timestamp'],
        'event': 'network_flow',
        'summary': f"{flow['src_ip']}:{flow['src_port']} -> {flow['dst_ip']}:{flow['dst_port']} ({flow['protocol']})",
    }
    for flow in flows
]
dns_events = [
    {
        'timestamp': dns['timestamp'],
        'event': 'dns_query',
        'summary': f"{dns['query']} -> {dns['response']} ({dns['rcode']})",
    }
    for dns in dns_records
]
http_events = [
    {
        'timestamp': http['timestamp'],
        'event': 'http_request',
        'summary': f"{http['method']} {http['host']}{http['uri']} {http['status']}",
    }
    for http in http_records
]

# Chronological order, with the event name breaking timestamp ties; the sort is
# stable, so entries equal on both keep the flow -> DNS -> HTTP build order.
timeline_events = sorted(
    flow_events + dns_events + http_events,
    key=lambda entry: (entry['timestamp'], entry['event']),
)

# JSON export wraps the list under an 'events' key.
timeline_json_path = timeline_dir / 'timeline.json'
json_dump_sorted({'events': timeline_events}, timeline_json_path)

# CSV export: one row per event, columns in header order.
csv_columns = ('timestamp', 'event', 'summary')
timeline_csv_path = timeline_dir / 'timeline.csv'
csv_rows = [tuple(entry[column] for column in csv_columns) for entry in timeline_events]
csv_write_rows_sorted(csv_rows, timeline_csv_path, header=csv_columns)

# Cell result: the ordered events.
timeline_events


## Visualise the timeline

A simple scatter and stem plot highlights temporal ordering without extra styling.

In [None]:

from datetime import datetime
import matplotlib.pyplot as plt

# Parse the ISO-like 'Z' timestamps so matplotlib can place events on a time axis.
times = [
    datetime.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ')
    for entry in timeline_events
]
# y-position of each event is simply its index in the sorted timeline.
positions = list(range(len(times)))

fig, ax = plt.subplots(figsize=(6, 3))
# Stems from the baseline up to each marker, then the markers themselves.
ax.vlines(times, [0] * len(times), positions)
ax.scatter(times, positions)
ax.set_xlabel('Timestamp')
ax.set_ylabel('Event index')
ax.set_title('Network to Timeline progression')
fig.tight_layout()
plt.show()


## Optional CLI mirror

When `forensic-cli` is present the same flow can be replayed via the command line.

In [None]:

# Replay the analysis through the CLI only when the tool was detected at setup.
if not CLI_OK.value:
    print('CLI mirror skipped; forensic-cli not available in this environment.')
else:
    cli_command = ['forensic-cli', 'analysis', 'network', '--pcap-json', str(pcap_path)]
    rc, out, err = run_cli(cli_command, tolerate=True)
    print('return code:', rc)
    # Show stdout when there is any, otherwise fall back to stderr.
    print(out or err)


## Checkpoint

We assert that the derived artefacts exist to keep CI executions honest.

In [None]:

# Check every artefact written by the cells above, not just two of them, and
# name the missing files so a CI failure is actionable.
expected_artefacts = (
    flows_path,
    dns_path,
    http_path,
    timeline_json_path,
    timeline_csv_path,
)
missing_artefacts = [str(path) for path in expected_artefacts if not path.exists()]
assert not missing_artefacts, f'missing artefacts: {missing_artefacts}'
# Cell result: number of timeline events.
len(timeline_events)
