# 02 — ETL Training
## Building the pipeline: ingestion → Kafka → processing → TimescaleDB

This notebook walks through the ETL flow used in the Fleet Data Pipeline. It generates telemetry, perception, and driving events, then shows conceptually how they are consumed and written to TimescaleDB.

In [None]:
import sys
from pathlib import Path
# Put the parent of the current working directory on the import path so `src` resolves.
sys.path.insert(0, str(Path().resolve().parent))

from src.ingestion.producer import build_vehicle_telemetry, build_perception_event, build_driving_event

### Simulate one vehicle state and generate records

In [None]:
state = {
    "speed": 55,
    "battery": 72,
    "lat": 37.3947,
    "lon": -122.1503,
    "city": "Palo Alto",
    "start_location": {"name": "Tesla HQ"},
    "destination": {"name": "Stanford University"},
    "autopilot_engaged": True,
    "odometer_km": 1000.0,
}

telemetry = build_vehicle_telemetry(1, state)
perception = build_perception_event(1)
driving = build_driving_event(1, state)

print("Telemetry:", telemetry)
print("Perception:", perception)
print("Driving event:", driving)
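
The producer sends records like these to Kafka as JSON. Below is a minimal sketch of that serialization step, assuming the records are plain dicts that may contain `datetime` values. The helper name `to_kafka_value` and the sample fields `vehicle_id` and `ts` are hypothetical and may not match what `producer.py` actually does.

```python
import json
from datetime import datetime, timezone


def to_kafka_value(record: dict) -> bytes:
    """Encode a record as UTF-8 JSON bytes, a common form for a Kafka message value."""
    # default=str is a fallback for values json cannot encode natively (e.g. datetime).
    return json.dumps(record, default=str).encode("utf-8")


# Hypothetical sample record; the real schema comes from build_vehicle_telemetry.
sample = {"vehicle_id": 1, "speed": 55, "ts": datetime(2024, 1, 1, tzinfo=timezone.utc)}
payload = to_kafka_value(sample)
decoded = json.loads(payload)  # round-trips back to a dict, with ts as a string
```

Falling back to `str` keeps the sketch short; a real pipeline would more likely emit an explicit ISO 8601 timestamp so consumers can parse it unambiguously.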

### ETL flow summary

1. **Ingestion**: `producer.py` sends JSON to Kafka topics: `vehicle_telemetry`, `perception_events`, `driving_events`.
2. **Processing**: `consumer.py` reads from Kafka, applies metric rules (speed violation, low battery, collision risk), and batch-inserts into TimescaleDB.
3. **Storage**: TimescaleDB holds four hypertables: `vehicle_telemetry`, `perception_events`, `driving_events`, and `alerts`.
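
The metric rules in the processing step can be sketched as a pure function over one record. This is an illustration only: the thresholds, the `collision_risk` field, and the function name `evaluate_rules` are assumptions, and the actual rules in `consumer.py` may differ.

```python
from typing import Any

# Hypothetical thresholds; the real values live in consumer.py and may differ.
SPEED_LIMIT = 120
LOW_BATTERY_PCT = 20
COLLISION_RISK_THRESHOLD = 0.8


def evaluate_rules(record: dict[str, Any]) -> list[str]:
    """Return the name of every rule the record triggers, in a fixed order."""
    alerts = []
    if record.get("speed", 0) > SPEED_LIMIT:
        alerts.append("speed_violation")
    if record.get("battery", 100) < LOW_BATTERY_PCT:
        alerts.append("low_battery")
    # Assumed field: a risk score in [0, 1] attached to perception events.
    if record.get("collision_risk", 0.0) >= COLLISION_RISK_THRESHOLD:
        alerts.append("collision_risk")
    return alerts


ok = evaluate_rules({"speed": 55, "battery": 72})
bad = evaluate_rules({"speed": 130, "battery": 10, "collision_risk": 0.9})
```

Keeping the rules free of Kafka and database calls makes them easy to unit-test; the consumer can then turn each returned name into a row for the `alerts` hypertable.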

In [None]:
from src.storage.db import get_conn, insert_vehicle_telemetry, insert_alert

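# Example: write one telemetry row (requires a running TimescaleDB instance)
try:
    with get_conn() as conn:
        insert_vehicle_telemetry(conn, [telemetry])
    print("Inserted 1 telemetry row.")
except Exception as e:
    # Any failure lands here (connection refused, missing table, bad row), so report it generically.
    print("Insert failed (is the DB up?):", e)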
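
The consumer is described as batch-inserting rather than writing one row at a time. A minimal sketch of that buffering pattern follows, assuming a flush callback that accepts a list of rows. `BatchBuffer` is a hypothetical name, not a class from this repository, and a plain list stands in for the database so the example runs without one.

```python
from typing import Callable


class BatchBuffer:
    """Collect rows and hand them to `flush_fn` once `batch_size` rows are buffered."""

    def __init__(self, flush_fn: Callable[[list], None], batch_size: int = 100) -> None:
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.rows: list = []

    def add(self, row: dict) -> None:
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Write any buffered rows; call once more at shutdown for the remainder."""
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []  # start a fresh list so the flushed batch is not mutated


written = []  # stand-in for the database; each entry is one flushed batch
buffer = BatchBuffer(written.append, batch_size=2)
for i in range(5):
    buffer.add({"vehicle_id": 1, "seq": i})
buffer.flush()  # flush the final partial batch
```

With the database up, `flush_fn` could be something like `lambda rows: insert_vehicle_telemetry(conn, rows)`, since that function is called with a list of records above. A production consumer would usually also flush on a timer so that quiet periods do not leave rows sitting in memory.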