# Real-Time Route Optimization System 

## Key Steps
1. **Kafka Producer**: Fetches traffic data and sends it to Kafka.
2. **Kafka Consumer**: Processes traffic data and stores it in the database.
3. **Route Optimization Engine**: Computes optimal routes and checks whether the improvement over the current route meets the threshold.
4. **Backend API**: Serves optimized routes to the frontend.

## Implementation

### Step 1: Kafka Producer
The Kafka Producer fetches traffic data from the TomTom API and sends it to the Kafka topic `traffic_updates`.

In [None]:
import json
import os
import time

import requests
from confluent_kafka import Producer

# Kafka configuration
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

# TomTom API configuration
# The key is read from the TOMTOM_API_KEY environment variable so that a real
# credential never has to be saved inside the notebook. The placeholder is
# only a fallback and must be replaced (or the variable set) before running.
TOMTOM_API_KEY = os.environ.get('TOMTOM_API_KEY', 'YOUR_API_KEY')
# NOTE(review): the trailing path segments look like style/zoom/format for the
# Flow Segment Data endpoint - confirm against the TomTom API reference.
TOMTOM_API_URL = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"

def fetch_traffic_data(lat, lon, timeout=10):
    """Fetch traffic flow data for a single point from the TomTom API.

    Args:
        lat: Latitude of the query point.
        lon: Longitude of the query point.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON body of the response, or ``None`` when the request
        fails, times out, returns a non-200 status, or the body is not JSON.
    """
    params = {
        'point': f'{lat},{lon}',
        'unit': 'KMPH',
        'key': TOMTOM_API_KEY
    }
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(TOMTOM_API_URL, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Network problems are reported and mapped to None, matching how
        # non-200 responses are already signalled to the caller.
        print(f'Traffic request failed for {lat},{lon}: {exc}')
        return None

    if response.status_code != 200:
        return None

    try:
        return response.json()
    except ValueError:
        # A 200 response whose body is not valid JSON is treated as "no data".
        return None

def delivery_report(err, msg):
    """Print the outcome of a message delivery.

    Args:
        err: Delivery error; any truthy value is reported as a failure.
        msg: The message object; its ``topic()`` is printed on success.
    """
    if not err:
        print(f'Message delivered to {msg.topic()}')
        return
    print(f'Message delivery failed: {err}')

def produce_traffic_data(locations=None, interval_seconds=300, topic='traffic_updates'):
    """Continuously fetch traffic data and publish it to Kafka.

    Runs until interrupted. Each cycle fetches data for every location,
    publishes each non-empty result as a JSON string, waits for the batch to
    be delivered, then sleeps.

    Args:
        locations: Iterable of ``(lat, lon)`` pairs to query. Defaults to the
            three example points below (update with your delivery points).
        interval_seconds: Pause between fetch cycles; defaults to 5 minutes.
        topic: Kafka topic the messages are produced to.
    """
    if locations is None:
        # Example coordinates (update with your delivery points)
        locations = [
            (34.0195, -118.4912),  # Point A
            (34.0250, -118.5000),  # Point B
            (34.0300, -118.5100)   # Point C
        ]

    try:
        while True:
            for lat, lon in locations:
                data = fetch_traffic_data(lat, lon)
                if not data:
                    continue
                producer.produce(
                    topic,
                    json.dumps(data),
                    callback=delivery_report
                )
                # Serve pending delivery callbacks without blocking; flushing
                # after every message would make the producer synchronous.
                producer.poll(0)
            # Wait once per cycle until everything queued has been delivered.
            producer.flush()
            time.sleep(interval_seconds)
    finally:
        # Do not drop queued messages when the loop is interrupted mid-cycle.
        producer.flush()

### Step 2: Kafka Consumer
The Kafka Consumer processes traffic data and stores it in the PostgreSQL database.

In [None]:
from confluent_kafka import Consumer
import psycopg2
import json

# Kafka Configuration
# Settings handed to Consumer(conf) further down in this cell.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'traffic_consumer_group',
    'auto.offset.reset': 'earliest'
}

# PostgreSQL Configuration
# Module-level connection and cursor; process_traffic_data() below uses both
# for every message it stores.
# NOTE(review): credentials are hard-coded in the DSN - consider loading them
# from the environment before using this outside a local demo.
conn = psycopg2.connect("dbname=logistics user=postgres password=postgres host=localhost")
cur = conn.cursor()

def process_traffic_data(data):
    """Upsert one traffic record into the ``realtime_traffic`` table.

    Reads ``segment_id``, ``speed_kmh`` and ``coordinates`` from ``data`` and
    stores them, replacing the existing row for the same ``segment_id``.
    Problems are reported with ``print`` instead of being raised, so a single
    bad message does not stop the caller.

    Args:
        data: Decoded message payload (a mapping). ``coordinates`` is indexed
            as a sequence whose first two items are two-element points.
    """
    # NOTE(review): the producer in this notebook forwards the raw TomTom
    # response - verify that it really carries these top-level keys.
    try:
        segment_id = data.get('segment_id')
        speed_kmh = data.get('speed_kmh')
        coordinates = data.get('coordinates')

        # Test for absence explicitly: a speed of 0 is a valid reading and
        # must not be rejected as a missing field.
        if segment_id in (None, '') or speed_kmh is None or not coordinates:
            print("Error: Missing required fields in data")
            return

        start, end = coordinates[0], coordinates[1]
        # NOTE(review): ST_MakePoint takes (x, y); presumably each point is
        # stored as (lon, lat) - confirm against the message producer.
        cur.execute("""
            INSERT INTO realtime_traffic (segment_id, speed_kmh, coordinates)
            VALUES (%s, %s, ST_SetSRID(ST_MakeLine(
                ST_MakePoint(%s, %s),
                ST_MakePoint(%s, %s)
            ), 4326))
            ON CONFLICT (segment_id) DO UPDATE
            SET speed_kmh = EXCLUDED.speed_kmh,
                coordinates = EXCLUDED.coordinates,
                timestamp = CURRENT_TIMESTAMP
        """, (
            segment_id,
            speed_kmh,
            start[0], start[1],
            end[0], end[1]
        ))
        conn.commit()
        print("Data successfully processed and stored in the database.")
    except Exception as e:
        print(f"Error processing data: {e}")
        # A failed statement leaves the transaction aborted; without a
        # rollback every later insert on this shared connection fails too.
        try:
            conn.rollback()
        except Exception as rollback_error:
            print(f"Error rolling back transaction: {rollback_error}")

consumer = Consumer(conf)
consumer.subscribe(['traffic_updates'])

# Poll until the cell is interrupted. The finally clause closes the consumer
# however the loop ends, instead of abandoning it.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        try:
            data = json.loads(msg.value())
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            # TypeError covers a message without a payload (value is None);
            # UnicodeDecodeError covers bytes that are not valid text.
            print(f"Error decoding JSON: {e}")
            continue
        process_traffic_data(data)
finally:
    consumer.close()

### Step 3: Route Optimization Engine
The Route Optimization Engine computes optimal routes and checks whether the improvement over the current route meets the threshold.

For an example of threshold optimization, see: https://github.com/aurelio-labs/semantic-router/blob/main/docs/06-threshold-optimization.ipynb

In [None]:
import networkx as nx
import psycopg2

def calculate_optimal_route(origin, destinations, threshold=0.1):
    """Compute shortest paths from ``origin`` and decide whether to adopt them.

    Queries travel times from ``origin`` to each destination, builds a
    weighted graph from them and runs ``networkx.shortest_path``. The result
    replaces the current route only when ``is_route_improved`` accepts it.

    Args:
        origin: ``point_id`` of the starting delivery point.
        destinations: Iterable of destination ``point_id`` values; must not
            be empty.
        threshold: Passed through to ``is_route_improved``.

    Returns:
        The newly computed result when ``is_route_improved`` returns a truthy
        value (after ``update_route_in_db`` is called with it); otherwise
        whatever ``get_current_route`` returned.

    Raises:
        ValueError: If ``destinations`` is empty.
    """
    destinations = tuple(destinations)
    if not destinations:
        # An empty tuple would render as "IN ()", which is invalid SQL.
        raise ValueError("destinations must contain at least one point id")

    conn = psycopg2.connect("dbname=logistics user=postgres password=postgres host=localhost")
    try:
        with conn.cursor() as cur:
            # NOTE(review): distance and speed units are not visible here -
            # travel_time is presumably only a relative cost; confirm.
            # NULLIF keeps a zero speed from raising a division-by-zero error;
            # such rows get a NULL travel_time instead.
            cur.execute("""
                WITH traffic_data AS (
                    SELECT 
                        d1.point_id AS origin_id,
                        d2.point_id AS destination_id,
                        ST_Distance(d1.location, d2.location) AS distance,
                        COALESCE(rt.speed_kmh, hd.traffic_factor * 50) AS speed
                    FROM delivery_points d1
                    CROSS JOIN delivery_points d2
                    LEFT JOIN realtime_traffic rt ON ST_Intersects(
                        ST_MakeLine(d1.location::geometry, d2.location::geometry),
                        rt.coordinates::geometry
                    )
                    LEFT JOIN historical_deliveries hd ON hd.point_id = d2.point_id
                    WHERE d1.point_id = %s AND d2.point_id IN %s
                )
                SELECT origin_id, destination_id, distance / NULLIF(speed, 0) AS travel_time
                FROM traffic_data
            """, (origin, destinations))
            rows = cur.fetchall()
    finally:
        # The connection is opened per call, so it must be released per call.
        conn.close()

    G = nx.Graph()
    # NOTE(review): if the joins yield several rows for one pair, the last row
    # wins here. networkx's Dijkstra appears to skip edges whose weight is
    # None (NULL travel_time) - confirm both behaviours are intended.
    for origin_id, dest_id, travel_time in rows:
        G.add_edge(origin_id, dest_id, weight=travel_time)

    # NOTE(review): called without a target, shortest_path returns a mapping
    # of reachable node -> path rather than one ordered route - confirm this
    # is the shape the helper functions and the API expect.
    optimal_route = nx.shortest_path(G, source=origin, weight='weight')
    current_route = get_current_route(origin)

    if is_route_improved(optimal_route, current_route, threshold):
        update_route_in_db(optimal_route)
        return optimal_route
    return current_route

def get_current_route(origin):
    """Placeholder for fetching the current route from the database.

    Not implemented yet: ``origin`` is ignored and ``None`` is returned.
    """
    return None

def is_route_improved(new_route, current_route, threshold):
    """Placeholder for comparing the new route with the current one.

    Intended to apply ``threshold`` to the comparison. Not implemented yet:
    all arguments are ignored and ``None`` is returned.
    """
    return None

def update_route_in_db(route):
    """Placeholder for updating the database with the new route.

    Not implemented yet: ``route`` is ignored and ``None`` is returned.
    """
    return None

### Step 4: Backend API
The Backend API serves optimized routes to the frontend.

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime

# Application object; the /optimize route below is registered on it through
# the @app.post decorator.
app = FastAPI()

# Request body accepted by the POST /optimize endpoint.
class RouteRequest(BaseModel):
    # Starting point; forwarded as `origin` to calculate_optimal_route().
    origin: int
    # Points to route to; forwarded as `destinations` to
    # calculate_optimal_route().
    destinations: list[int]

@app.post("/optimize")
async def optimize_route(request: RouteRequest):
    """Compute a route for the requested origin and destinations.

    Args:
        request: Body carrying ``origin`` and ``destinations``.

    Returns:
        A dict with the value returned by ``calculate_optimal_route`` under
        ``optimal_route`` and the current local time, ISO-formatted, under
        ``timestamp``.

    Raises:
        HTTPException: Status 500 with the error text as detail when the
            calculation fails.
    """
    # Local import keeps this cell self-contained.
    import asyncio

    try:
        # calculate_optimal_route does blocking database and graph work, so it
        # runs in a worker thread instead of stalling the event loop.
        optimal_route = await asyncio.to_thread(
            calculate_optimal_route, request.origin, request.destinations
        )
    except HTTPException:
        # Preserve a deliberate HTTP error instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "optimal_route": optimal_route,
        "timestamp": datetime.now().isoformat()
    }

## Conclusion
This notebook provides a basic implementation of the real-time route optimization system. The threshold optimization mechanism ensures that routes are only updated when significant improvements are detected, making the system more efficient.