In [None]:
import heapq
import itertools
import json
import logging
import math
import os
import time

import pandas as pd
from dotenv import load_dotenv
from kafka import KafkaProducer

# Pull KEY=VALUE settings (e.g. KAFKA_BROKERS) from ~/.bashrc into os.environ.
# override=True: values found in the file win over variables already set in
# the process environment.
# .bashrc is a shell script, so python-dotenv logs a "could not parse
# statement" warning for every line that is not a plain assignment. That is
# expected noise here, so the dotenv parser's logger is raised to ERROR for
# this single call and restored afterwards.
_dotenv_logger = logging.getLogger("dotenv.main")
_previous_dotenv_level = _dotenv_logger.level
_dotenv_logger.setLevel(logging.ERROR)
try:
    load_dotenv(dotenv_path=os.path.expanduser("~/.bashrc"), override=True)
finally:
    _dotenv_logger.setLevel(_previous_dotenv_level)

# Comma-separated bootstrap servers. Surrounding whitespace and empty entries
# (a trailing comma, or KAFKA_BROKERS set to "") are dropped; if nothing
# usable remains, fall back to the local default.
broker_list = [
    broker.strip()
    for broker in os.environ.get("KAFKA_BROKERS", "localhost:9092").split(",")
    if broker.strip()
] or ["localhost:9092"]
print(broker_list)

# Values are serialized to UTF-8 JSON. No key_serializer is configured, so
# message keys must be passed to send() as bytes.
producer = KafkaProducer(
    bootstrap_servers=broker_list,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Not used in this cell. Presumably meant for the (currently disabled) Spark
# session below or for another notebook cell — confirm before removing.
master_url = os.environ.get("SPARK_MASTER_URL", "spark://localhost:7077")

#spark = SparkSession.builder \
#    .appName("TaxiTripStream") \
#    .master(master_url).getOrCreate()

# Replay sample: the first 10,000 records of the February 2025 yellow-taxi file.
df = pd.read_parquet('~/taxi_files/yellow_tripdata_2025-02.parquet')
# Explicit copy so the column added below goes into an independent frame,
# not into a slice of the full file.
df = df.head(10000).copy()

# Sequential integer id per trip (0..n-1). It links a trip's start and end
# events.
df["trip_id"] = range(len(df))


def _trip_events(frame, time_column, payload_column, event):
    """Build one event row per trip, sorted chronologically.

    Selects ``trip_id``, ``time_column`` (renamed to ``timestamp``), the
    pickup/dropoff location ids and ``payload_column`` from *frame*. Every row
    is tagged with ``event`` in an ``event`` column. Rows are sorted by
    ``timestamp`` with a stable sort, so rows that share a timestamp keep
    their original row order.
    """
    events = frame[
        ["trip_id", time_column, "PULocationID", "DOLocationID", payload_column]
    ].copy()
    events["event"] = event
    events = events.rename(columns={time_column: "timestamp"})
    return events.sort_values("timestamp", kind="stable")


start_df = _trip_events(df, "tpep_pickup_datetime", "passenger_count", "start")
end_df = _trip_events(df, "tpep_dropoff_datetime", "total_amount", "end")

# Lazily interleave the two sorted event lists into one chronological,
# single-use iterator. On equal timestamps heapq.merge yields the item from
# the earlier iterable first (like sorted(chain(...))), so a start precedes
# an end that has the same timestamp.
merged_stream = heapq.merge(
    start_df.to_dict(orient="records"),
    end_df.to_dict(orient="records"),
    key=lambda record: record["timestamp"],
)

# Replay speed factor: each real second covers SPEEDUP seconds of trip time
# (120 -> one real minute replays two hours of trips).
SPEEDUP = 60.0 * 2


def _json_safe(value):
    """Return None for a float NaN (a missing value), otherwise *value* unchanged.

    json.dumps writes NaN as the bare token ``NaN``, which is not valid JSON
    and can break strict downstream consumers.
    """
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


# Peek at the first event to anchor logical (trip) time to wall-clock time.
# The default of None avoids StopIteration when there is nothing to replay.
first_row = next(merged_stream, None)
try:
    if first_row is not None:
        start_time = first_row["timestamp"]
        real_start = time.time()

        # next() above consumed the first event; chain it back in so that it
        # is sent as well.
        for row in itertools.chain([first_row], merged_stream):
            # Sleep until this event's scaled offset from the first event.
            logical_elapsed = (row["timestamp"] - start_time).total_seconds()
            target_time = real_start + logical_elapsed / SPEEDUP
            sleep_duration = target_time - time.time()
            if sleep_duration > 0:
                time.sleep(sleep_duration)

            topic = "trips-start" if row["event"] == "start" else "trips-end"
            msg = {key: _json_safe(value) for key, value in row.items()}
            msg["timestamp"] = row["timestamp"].isoformat()
            print(msg)
            # Keying by trip_id sends both events of a trip under the same key.
            producer.send(topic, key=str(row["trip_id"]).encode("utf-8"), value=msg)
finally:
    # Deliver everything already buffered, even if the replay is interrupted.
    producer.flush()


python-dotenv could not parse statement starting at line 6
python-dotenv could not parse statement starting at line 7
python-dotenv could not parse statement starting at line 8
python-dotenv could not parse statement starting at line 16
python-dotenv could not parse statement starting at line 24
python-dotenv could not parse statement starting at line 34
python-dotenv could not parse statement starting at line 39
python-dotenv could not parse statement starting at line 40
python-dotenv could not parse statement starting at line 47
python-dotenv could not parse statement starting at line 49
python-dotenv could not parse statement starting at line 58
python-dotenv could not parse statement starting at line 64
python-dotenv could not parse statement starting at line 67
python-dotenv could not parse statement starting at line 76
python-dotenv could not parse statement starting at line 77
python-dotenv could not parse statement starting at line 78
python-dotenv could not parse statement sta

['34.118.105.77:9092']
{'trip_id': 2552, 'timestamp': '2025-01-31T22:30:00', 'PULocationID': 68, 'DOLocationID': 246, 'total_amount': 17.06, 'event': 'end'}
{'trip_id': 2553, 'timestamp': '2025-01-31T22:33:59', 'PULocationID': 48, 'DOLocationID': 163, 'passenger_count': 1.0, 'event': 'start'}
{'trip_id': 2553, 'timestamp': '2025-01-31T22:42:53', 'PULocationID': 48, 'DOLocationID': 163, 'total_amount': 15.75, 'event': 'end'}
{'trip_id': 2554, 'timestamp': '2025-01-31T23:01:44', 'PULocationID': 48, 'DOLocationID': 48, 'passenger_count': 1.0, 'event': 'start'}
{'trip_id': 2554, 'timestamp': '2025-01-31T23:04:15', 'PULocationID': 48, 'DOLocationID': 48, 'total_amount': 13.28, 'event': 'end'}
{'trip_id': 2555, 'timestamp': '2025-01-31T23:07:03', 'PULocationID': 48, 'DOLocationID': 90, 'passenger_count': 1.0, 'event': 'start'}
{'trip_id': 2555, 'timestamp': '2025-01-31T23:18:01', 'PULocationID': 48, 'DOLocationID': 90, 'total_amount': 22.31, 'event': 'end'}
{'trip_id': 2639, 'timestamp': '20