In [None]:
# Install Pathway (AI Engine), Streamlit (Dashboard), and Ngrok (Tunnel)
!pip install -q pathway streamlit pyngrok pandas pydeck watchdog

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
opentelemetry-exporter-otlp-proto-http 1.38.0 requires opentelemetry-exporter-otlp-proto-common==1.38.0, but you have opentelemetry-exporter-otlp-proto-common 1.39.1 which is incompatible.
opentelemetry-exporter-otlp-proto-http 1.38.0 requires opentelemetry-proto==1.38.0, but you have opentelemetry-proto 1.39.1 which is incompatible.
opentelemetry-exporter-otlp-proto-http 1.38.0 requires opentelemetry-sdk~=1.38.0, but you have opentelemetry-sdk 1.39.1 which is incompatible.
opentelemetry-exporter-gcp-logging 1.11.0a0 requires opentelemetry-sdk<1.39.0,>=1.35.0, but you have opentelemetry-sdk 1.39.1 which is incompatible.
bigframes 2.35.0 requires google-cloud-bigquery[bqstorage,pandas]>=3.36.0, but you have google-cloud-bigquery 3.29.0 which is incompatible.[0m[31m
[0m

In [None]:
%%writefile generator.py
import time
import csv
import os
import random
import datetime

DATA_DIR = "stream_data"
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

# --- FLEET CONFIGURATION ---
# We simulate 4 trucks with different behaviors and tech levels
trucks = [
    # MODERN TRUCKS (Have J1939 Sensors)
    {"id": "DL-01-J-1001", "type": "MODERN_J1939", "lat": 28.5355, "lon": 77.3910, "last_speed": 40, "behavior": "GOOD"},
    {"id": "KA-53-J-2002", "type": "MODERN_J1939", "lat": 28.5400, "lon": 77.3800, "last_speed": 0,  "behavior": "IDLER"},

    # LEGACY TRUCKS (GPS Only - No Fuel Sensors)
    {"id": "UP-16-L-3003", "type": "LEGACY_GPS",   "lat": 28.5300, "lon": 77.4000, "last_speed": 60, "behavior": "SPEEDER"},
    {"id": "HR-26-L-4004", "type": "LEGACY_GPS",   "lat": 28.5200, "lon": 77.4100, "last_speed": 45, "behavior": "GOOD"},
]

def calculate_physics_burn(speed, acceleration, engine_on):
    """Calculates realistic fuel burn based on physics (Used for Modern Truck Sensors)"""
    if not engine_on: return 0.0
    base_idle = 1.5
    drag = speed * 0.08
    load = acceleration * 0.5 if acceleration > 0 else 0
    return round(base_idle + drag + load, 2)

def update_truck(truck):
    prev_speed = truck["last_speed"]
    traffic = random.choices(["CLEAR", "MODERATE", "JAM"], weights=[0.7, 0.2, 0.1])[0]


    if traffic == "JAM":
        speed = random.randint(0, 5)
    elif truck["behavior"] == "IDLER" and random.random() < 0.3:
        speed = 0 # Idling
    elif truck["behavior"] == "SPEEDER":
        speed = random.randint(80, 110)
    else:
        speed = random.randint(40, 60)

    if speed > 0:
        truck["lat"] += random.uniform(-0.002, 0.002)
        truck["lon"] += random.uniform(-0.002, 0.002)

    acceleration = speed - prev_speed
    truck["last_speed"] = speed

    if truck["type"] == "MODERN_J1939":
        true_burn = calculate_physics_burn(speed, acceleration, True)
        sensor_reading = true_burn + random.uniform(-0.1, 0.1) # Sensor Noise
        rpm_reading = int(speed * 25) if speed > 0 else 800
    else:
        sensor_reading = None
        rpm_reading = None

    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "vehicle_id": truck["id"],
        "truck_type": truck["type"],
        "gps_latitude": round(truck["lat"], 6),
        "gps_longitude": round(truck["lon"], 6),
        "gps_speed_kmph": speed,
        "engine_status": "ON",
        "j1939_fuel_rate_lph": sensor_reading, # Can be Float or None
        "j1939_engine_speed_rpm": rpm_reading
    }

print("üöÄ Hybrid Fleet Generator Started... Writing to ./stream_data/")
while True:
    batch = [update_truck(t) for t in trucks]
    filename = f"telemetry_{time.time_ns()}.csv"
    filepath = os.path.join(DATA_DIR, filename)

    with open(filepath, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=batch[0].keys())
        writer.writeheader()
        writer.writerows(batch)

    time.sleep(2)

Overwriting generator.py


In [None]:
%%writefile main.py
import pathway as pw

class TelemetrySchema(pw.Schema):
    timestamp: str
    vehicle_id: str
    truck_type: str
    gps_latitude: float
    gps_longitude: float
    gps_speed_kmph: float
    engine_status: str
    j1939_fuel_rate_lph: float | None
    j1939_engine_speed_rpm: float | None

input_stream = pw.io.csv.read("./stream_data/", schema=TelemetrySchema, mode="streaming")

def standardize_fuel_rate(sensor_val: float | None, speed: float, engine_status: str) -> float:
    if sensor_val is not None:
        return float(sensor_val)
    if engine_status == "OFF":
        return 0.0
    return float(1.5 + (speed * 0.08))

def calc_fuel_volume_per_tick(fuel_rate_lph: float) -> float:
    return float(fuel_rate_lph * (2.0 / 3600.0))

def calculate_waste(speed: float, fuel_volume: float) -> float:
    if speed == 0.0 and fuel_volume > 0.0:
        return float(fuel_volume)
    return 0.0

def calculate_co2(fuel_volume: float) -> float:
    return float(fuel_volume * 2.68)

intermediate_data = input_stream.select(
    *pw.this,
    final_fuel_rate = pw.apply(
        standardize_fuel_rate,
        pw.this.j1939_fuel_rate_lph,
        pw.this.gps_speed_kmph,
        pw.this.engine_status
    )
)

processed_data = intermediate_data.select(
    *pw.this,
    co2_emissions = pw.apply(calculate_co2, pw.apply(calc_fuel_volume_per_tick, pw.this.final_fuel_rate)),
    wasted_fuel_liters = pw.apply(calculate_waste, pw.this.gps_speed_kmph, pw.apply(calc_fuel_volume_per_tick, pw.this.final_fuel_rate))
)

alerts = processed_data.filter(pw.this.wasted_fuel_liters > 0.0)

pw.io.csv.write(processed_data, "./live_dashboard_data.csv")
pw.io.csv.write(alerts, "./live_alerts.csv")
pw.run()

Overwriting main.py


In [None]:
%%writefile dashboard.py
import streamlit as st
import pandas as pd
import pydeck as pdk
import time
import os

st.set_page_config(layout="wide", page_title="EcoPath AI Command Center")
st.title("üåç EcoPath AI: Real-Time Fleet Intelligence")

placeholder = st.empty()

while True:
    if os.path.exists("live_dashboard_data.csv"):
        try:
            df = pd.read_csv("live_dashboard_data.csv")

            df['timestamp'] = pd.to_datetime(df['timestamp'])
            uptime_seconds = (df['timestamp'].max() - df['timestamp'].min()).total_seconds()
            mins, secs = divmod(int(uptime_seconds), 60)

            with placeholder.container():
                st.info(f"‚è±Ô∏è **Session Uptime:** {mins}m {secs}s | üì° **Live Data Points:** {len(df)}")

                k1, k2, k3, k4 = st.columns(4)
                k1.metric("üöõ Total Fleet", len(df["vehicle_id"].unique()))
                k2.metric("‚ö° Active Trucks", len(df[df["engine_status"]=="ON"]))

                k3.metric("‚òÅÔ∏è CO2 Emissions", f"{df['co2_emissions'].sum():.4f} kg")
                k4.metric("‚ö†Ô∏è Wasted Fuel", f"{df['wasted_fuel_liters'].sum():.4f} L", delta_color="inverse")

                df["color"] = df["wasted_fuel_liters"].apply(lambda x: [255, 0, 0, 200] if x > 0 else [0, 255, 0, 150])

                st.pydeck_chart(pdk.Deck(
                    map_style='mapbox://styles/mapbox/dark-v9',
                    initial_view_state=pdk.ViewState(latitude=28.5355, longitude=77.3910, zoom=13, pitch=50),
                    layers=[
                        pdk.Layer(
                            "ScatterplotLayer",
                            data=df,
                            get_position=["gps_longitude", "gps_latitude"],
                            get_fill_color="color",
                            get_radius=150,
                            pickable=True,
                            auto_highlight=True,
                        ),
                    ],
                    tooltip={"text": "Truck: {vehicle_id}\nSpeed: {gps_speed_kmph} km/h"}
                ))

                st.subheader("üö® Real-Time Alerts (Idling Detected)")
                alerts = df[df["wasted_fuel_liters"] > 0][["timestamp", "vehicle_id", "truck_type", "wasted_fuel_liters"]]
                if not alerts.empty:
                    st.error(f"ATTENTION: {len(alerts)} trucks are currently wasting fuel!")
                    st.dataframe(alerts.tail(5), use_container_width=True)
                else:
                    st.success("‚úÖ Fleet operating efficiently. No wastage detected.")

        except Exception as e:
            st.warning("Processing stream data...")

    time.sleep(1)

Overwriting dashboard.py


In [None]:
import subprocess
from pyngrok import ngrok
import time

NGROK_TOKEN = "PASTE_YOUR_NGROK_TOKEN_HERE"
ngrok.set_auth_token(NGROK_TOKEN)

!pkill -f generator.py
!pkill -f main.py
!pkill -f streamlit

print("‚öôÔ∏è Starting Generator (Data Factory)...")
subprocess.Popen(["python", "generator.py"])
time.sleep(2)

print("üß† Starting Pathway Engine (AI Brain)...")
subprocess.Popen(["python", "main.py"])

print("üåê Starting Streamlit Dashboard (Frontend)...")
subprocess.Popen(["streamlit", "run", "dashboard.py", "--server.port", "8501"])
time.sleep(3)

public_url = ngrok.connect(8501).public_url
print("\n" + "="*50)
print(f"üöÄ YOUR PROJECT IS LIVE! CLICK HERE: {public_url}")
print("="*50)

‚öôÔ∏è Starting Generator (Data Factory)...
üß† Starting Pathway Engine (AI Brain)...
üåê Starting Streamlit Dashboard (Frontend)...

üöÄ YOUR PROJECT IS LIVE! CLICK HERE: https://unmilitaristic-castiel-overmodestly.ngrok-free.dev
