# Fleet Management Alerting System

This notebook uses Pathway to process real-time vehicle telemetry data from a NATS message queue, detect anomalies, and publish alerts back to NATS.

In [None]:
import pathway as pw

## 1. Define Data Schema

First, we define the schema for the incoming telemetry data. Pathway uses it to parse each JSON message into typed columns.

In [None]:
class TelemetrySchema(pw.Schema):
   vehicle_id: str
   timestamp: str
   lat: float
   lon: float
   engine_temp: int
   fuel_level: int
   brake_health: int
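
To make the schema concrete, the sketch below builds one telemetry message as a plain Python dict and checks it against the same field names and types. The values (the vehicle ID `truck-001`, the ISO 8601 timestamp, the coordinates) are invented for illustration, and `matches_schema` is a hypothetical helper, not part of Pathway; the source does not specify what a real vehicle payload looks like.

```python
import json

# Hypothetical sample message; all values are illustrative assumptions.
sample_message = {
    "vehicle_id": "truck-001",
    "timestamp": "2024-01-01T12:00:00Z",
    "lat": 52.52,
    "lon": 13.405,
    "engine_temp": 105,
    "fuel_level": 15,
    "brake_health": 80,
}

# Expected Python type per field, mirroring TelemetrySchema.
EXPECTED_TYPES = {
    "vehicle_id": str,
    "timestamp": str,
    "lat": float,
    "lon": float,
    "engine_temp": int,
    "fuel_level": int,
    "brake_health": int,
}


def matches_schema(message: dict) -> bool:
    """Return True if the message has exactly the schema's fields and types."""
    if message.keys() != EXPECTED_TYPES.keys():
        return False
    return all(
        type(message[name]) is expected
        for name, expected in EXPECTED_TYPES.items()
    )


# JSON text as it might be published to the telemetry topic.
payload = json.dumps(sample_message)
```

A message like `payload` is what the pipeline would expect to receive on the telemetry topic, assuming the producer sends one JSON object per message.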

## 2. Ingest Data from NATS

We connect to the NATS server and read the telemetry data from the `fleet.telemetry` topic.

In [None]:
telemetry_table = pw.io.nats.read(
   uri="nats://host.docker.internal:4222",
   topic="fleet.telemetry",
   format="json",
   schema=TelemetrySchema
)

## 3. Define Alerting Logic

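We create a user-defined function (UDF) that checks each reading against three thresholds: engine temperature above 100, fuel level below 20, and brake health below 60. It returns a list of alert names, which is empty when no threshold is crossed.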

In [None]:
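@pw.udf
def detect_alerts(engine_temp: int, fuel_level: int, brake_health: int) -> list[str]:
   # Type hints let Pathway infer the column type of the result.
   alerts = []
   if engine_temp > 100:
       alerts.append("High Engine Temp")
   if fuel_level < 20:
       alerts.append("Low Fuel Level")
   if brake_health < 60:
       alerts.append("Poor Brake Health")
   # An empty list means the reading triggered no alert.
   return alerts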
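
The threshold rules can be exercised without a running pipeline by keeping them in a plain Python function. The sketch below does that; `check_thresholds` is a hypothetical name introduced here, and the thresholds are the ones used in `detect_alerts`.

```python
def check_thresholds(engine_temp: int, fuel_level: int, brake_health: int) -> list[str]:
    """Plain-Python copy of the alert rules, usable in unit tests."""
    alerts = []
    if engine_temp > 100:
        alerts.append("High Engine Temp")
    if fuel_level < 20:
        alerts.append("Low Fuel Level")
    if brake_health < 60:
        alerts.append("Poor Brake Health")
    return alerts


# A reading that crosses all three thresholds.
all_alerts = check_thresholds(engine_temp=105, fuel_level=15, brake_health=50)

# Boundary values: the comparisons are strict, so these trigger nothing.
no_alerts = check_thresholds(engine_temp=100, fuel_level=20, brake_health=60)
```

Note that the comparisons are strict, so a reading exactly at a threshold (for example an engine temperature of 100) does not raise an alert.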

## 4. Process Data and Generate Alerts

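The `detect_alerts` UDF is applied to the incoming data stream. We then flatten the resulting list so that each alert becomes its own row; a reading that triggered no alert yields an empty list and therefore produces no rows.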

In [None]:
alerts = telemetry_table.select(
   vehicle_id=pw.this.vehicle_id,
   timestamp=pw.this.timestamp,
   alert_type=detect_alerts(
       pw.this.engine_temp,
       pw.this.fuel_level,
       pw.this.brake_health
   )
)

# flatten emits one row per alert, so empty lists yield no rows;
# the filter is an extra guard against null alert values.
alerts = alerts.flatten(pw.this.alert_type).filter(pw.this.alert_type.is_not_none())
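
As a rough mental model of the flatten step, the plain-Python sketch below mimics its effect on a few rows. It is not how Pathway executes the operation; the rows and the helper name `flatten_alerts` are invented for illustration.

```python
# Hypothetical rows as they might look after the select step.
rows = [
    {"vehicle_id": "truck-001", "alert_type": ["High Engine Temp", "Low Fuel Level"]},
    {"vehicle_id": "truck-002", "alert_type": []},
    {"vehicle_id": "truck-003", "alert_type": ["Poor Brake Health"]},
]


def flatten_alerts(rows: list[dict]) -> list[dict]:
    """Emit one output row per alert; rows with an empty list emit nothing."""
    return [
        {**row, "alert_type": alert}
        for row in rows
        for alert in row["alert_type"]
    ]


flat = flatten_alerts(rows)
```

In this model, `truck-001` contributes two rows, `truck-003` one, and `truck-002` none, which is why no separate step is needed to discard readings without alerts.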

## 5. Output Alerts to NATS

The generated alerts are published to the `fleet.alerts` topic on the NATS server.

In [None]:
pw.io.nats.write(
   alerts.select(
       vehicle_id=pw.this.vehicle_id,
       timestamp=pw.this.timestamp,
       alert_type=pw.this.alert_type
   ),
   uri="nats://host.docker.internal:4222",
   topic="fleet.alerts",
   format="json"
)

## 6. Run the Pipeline

Finally, we start the Pathway pipeline to begin processing data.

In [None]:
pw.run()