In [None]:
import time
import json
from kafka import KafkaProducer
from datetime import datetime, timedelta
import random

from uxsim import *

# Initialize Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')


# Function to send vehicle data to Kafka broker
def send_vehicle_data_to_kafka(vehicle_data, simulator_start_time):
    

    
    for index, row in vehicle_data.iterrows():
        # Skip sending if 'link' contains "waiting_at_origin_node"
        if "waiting_at_origin_node" in row['link']:
            continue
        
        # Calculate the timestamp based on simulator start time and t
        timestamp = (simulator_start_time + timedelta(seconds=row['t'])).strftime("%d/%m/%Y %H:%M:%S")
        
        # Construct JSON object including 't' and 'simulator_start_time'
        vehicle = {
            "name": row['name'],
            "origin": row['orig'],
            "destination": row['dest'],
            "time": timestamp,
            "t": row['t'],  # Include the simulation time 't' in the message
            "simulator_start_time": simulator_start_time.strftime("%d/%m/%Y %H:%M:%S"),  # Add simulator start time
            "link": row['link'],
            "position": row['x'],
            "spacing": row['s'],
            "speed": row['v']
        }
        
        # Send the JSON message to Kafka
        producer.send('vehicle_positions', json.dumps(vehicle).encode('utf-8'))  # Encode string to bytes and send
        time.sleep(1)
    
    # Flush the producer to ensure all messages are sent
    producer.flush()

# Function to simulate vehicles and send data to Kafka
def simulate_and_send_data():
    W = World(
        name="",
        deltan=5,
        tmax=360,  # 1 hour simulation
        print_mode=1,
        save_mode=0,
        show_mode=1,
        random_seed=None,
        duo_update_time=600
    )
    random.seed(None)

    # Network definition
    signal_time = 20
    sf_1 = 1
    sf_2 = 1

    I1 = W.addNode("I1", 1, 0, signal=[signal_time * sf_1, signal_time * sf_2])
    I2 = W.addNode("I2", 2, 0, signal=[signal_time * sf_1, signal_time * sf_2])
    I3 = W.addNode("I3", 3, 0, signal=[signal_time * sf_1, signal_time * sf_2])
    I4 = W.addNode("I4", 4, 0, signal=[signal_time * sf_1, signal_time * sf_2])
    W1 = W.addNode("W1", 0, 0)
    E1 = W.addNode("E1", 5, 0)
    N1 = W.addNode("N1", 1, 1)
    N2 = W.addNode("N2", 2, 1)
    N3 = W.addNode("N3", 3, 1)
    N4 = W.addNode("N4", 4, 1)
    S1 = W.addNode("S1", 1, -1)
    S2 = W.addNode("S2", 2, -1)
    S3 = W.addNode("S3", 3, -1)
    S4 = W.addNode("S4", 4, -1)

    # E <-> W direction: signal group 0
    for n1, n2 in [[W1, I1], [I1, I2], [I2, I3], [I3, I4], [I4, E1]]:
        W.addLink(n2.name + n1.name, n2, n1, length=500, free_flow_speed=50, jam_density=0.2, number_of_lanes=3, signal_group=0)

    # N -> S direction: signal group 1
    for n1, n2 in [[N1, I1], [I1, S1], [N3, I3], [I3, S3]]:
        W.addLink(n1.name + n2.name, n1, n2, length=500, free_flow_speed=30, jam_density=0.2, signal_group=1)

    # S -> N direction: signal group 2
    for n1, n2 in [[N2, I2], [I2, S2], [N4, I4], [I4, S4]]:
        W.addLink(n2.name + n1.name, n2, n1, length=500, free_flow_speed=30, jam_density=0.2, signal_group=1)

    # Random demand definition every 30 seconds
    dt = 60  # Increase this to reduce vehicle entry frequency
    demand = 0.5  # Reduce this to decrease the number of vehicles
    demands = []
    for t in range(0, 3600, dt):
        dem = random.uniform(0, demand)
        for n1, n2 in [[N1, S1], [S2, N2], [N3, S3], [S4, N4]]:
            W.adddemand(n1, n2, t, t + dt, dem * 0.25)
            demands.append({"start": n1.name, "dest": n2.name, "times": {"start": t, "end": t + dt}, "demand": dem})
        for n1, n2 in [[E1, W1], [N1, W1], [S2, W1], [N3, W1], [S4, W1]]:
            W.adddemand(n1, n2, t, t + dt, dem * 0.75)
            demands.append({"start": n1.name, "dest": n2.name, "times": {"start": t, "end": t + dt}, "demand": dem})

    W.exec_simulation()
    W.analyzer.print_simple_stats()

    # Capture simulation start time
    simulator_start_time = datetime.now()
    print("Simulation Start Time:", simulator_start_time)

    # Get vehicle data as pandas DataFrame
    vehicle_data = W.analyzer.vehicles_to_pandas()
    
    # Print all rows of vehicle data
    print(vehicle_data.to_string())

    # Send vehicle data to Kafka broker
    send_vehicle_data_to_kafka(vehicle_data, simulator_start_time)


# Start simulating and sending data to Kafka
simulate_and_send_data()
