# Docker 

In order to set up Kafka and Zookeeper, we are going to utilize docker containers. We just need to run the "docker-compose up -d" command, and it is going to create the required containers for us.

In [None]:
from confluent_kafka import Producer,  Consumer, KafkaException, KafkaError
import json
import duckdb
import json

# Producer

Here we define the producer, which is going to publish messages to the 'yellow-taxi' topic. 
This topic is automatically created in our Kafka instance, which is running on the local machine on port 9092.
When we send a message we only log its delivery status to the console: if delivery fails we notify the user; otherwise we print the topic we are writing to, the partition, and the message offset. 

In [None]:
# Address of the Kafka broker; passed as 'bootstrap.servers' when the producer is built.
BOOTSTRAP_SERVERS = 'localhost:9092'
# Name of the topic the taxi records are published to.
YELLOW_TAXI_TOPIC = 'yellow-taxi'

def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Intended to be handed to ``producer.produce`` as its ``callback``.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message; on success its topic, partition and offset are
            printed.
    """
    if err is None:
        topic, partition, offset = msg.topic(), msg.partition(), msg.offset()
        print(f"Message delivered to topic '{topic}' [{partition}] @ offset {offset}")
        return
    print(f"Message delivery failed: {err}")

def create_producer():
    """Build a confluent-kafka ``Producer`` pointed at ``BOOTSTRAP_SERVERS``.

    Returns:
        The new producer, or ``None`` if construction raised; the error is
        printed rather than propagated.
    """
    settings = {}
    settings['bootstrap.servers'] = BOOTSTRAP_SERVERS
    settings['client.id'] = 'python-producer'

    try:
        new_producer = Producer(settings)
        print("Confluent Kafka Producer initialized successfully.")
        return new_producer
    except Exception as e:
        print(f"Error initializing Confluent Kafka Producer: {e}")
    return None

def send_message(producer, topic_name, message_value):
    """Serialize ``message_value`` as JSON and enqueue it on ``topic_name``.

    Args:
        producer: A confluent-kafka ``Producer``, or ``None`` (in which case
            a notice is printed and nothing is sent).
        topic_name: Name of the destination topic.
        message_value: JSON-serializable payload.

    Returns:
        None. Errors other than a full local queue are printed, not raised.
    """
    if producer is None:
        print("Producer is not initialized. Cannot send message.")
        return
    try:
        payload = json.dumps(message_value).encode('utf-8')
        while True:
            try:
                producer.produce(topic_name,
                                 value=payload,
                                 callback=delivery_report)
                break
            except BufferError:
                # produce() raises BufferError when the local queue is full.
                # Serving delivery callbacks frees queue slots, so wait for
                # some to complete and retry instead of dropping the record.
                producer.poll(1.0)

        # Non-blocking poll so delivery callbacks of earlier messages run.
        producer.poll(0)
    except Exception as e:
        print(f"Error sending message: {e}")


## Generator functions

We are going to utilize the DuckDB library to fetch the data from the Parquet files. We decided to do it this way (even though it is not optimal) because our files are not sorted by pickup date, and we are low on storage to write them to the drive once again.
We fetch the data in batches (by default 100,000 rows for the High Volume dataset and 10,000,000 rows for the Yellow Taxi dataset) into a Pandas DataFrame, and yield each batch as a list of records so it can later be used by the producer.
We provide functions for both the High Volume dataset and the Yellow Taxi dataset, because we are going to utilize both for the final project.

In [None]:
def get_batch_high_volume(batch_size = 1_000_00):
    high_volume_path = 'data/trip_record_partitioned/high_volume/year=2021/*.parquet'
    offset = 0

    while True:
        df = duckdb.sql(f"""
            select 
                pickup_datetime, 
                dropoff_datetime, 
                PULocationID, 
                DOLocationID, 
                trip_miles as distance, 
                base_passenger_fare as base_fare, 
                congestion_surcharge as congestion_charge, 
                tips
            from '{high_volume_path}'
            order by pickup_datetime
            limit {batch_size}
            offset {offset}
        """).df()
        df['pickup_datetime'] = df['pickup_datetime'].astype('str')
        df['dropoff_datetime'] = df['dropoff_datetime'].astype('str')

        offset = offset + batch_size

        yield df.to_dict(orient='records')

In [None]:
def get_batch_yellow_taxi(batch_size=10_000_000):
    """Yield the 2021 yellow-taxi trip records in pickup-time order, batch by batch.

    Each iteration runs a DuckDB query over the partitioned Parquet files with
    ``limit``/``offset`` paging and yields the page as a list of dicts (one
    per row). The two datetime columns are converted to strings so the
    records can be JSON-encoded.

    The generator never stops on its own: once the offset passes the end of
    the data it keeps yielding empty lists, so the caller must break on an
    empty batch.

    Args:
        batch_size: Number of rows per batch (default 10,000,000).

    Yields:
        list[dict]: The rows of the current batch.
    """
    yellow_taxi_path = 'data/trip_record_partitioned/yellow-taxi/year=2021/*.parquet'
    offset = 0

    while True:
        # Every selected column takes part in the ordering. Sorting on
        # pickup_datetime alone leaves rows with equal timestamps in an
        # arbitrary order that may differ between queries, so consecutive
        # limit/offset pages could repeat or skip rows.
        df = duckdb.sql(f'''
            select 
                tpep_pickup_datetime as pickup_datetime,
                tpep_dropoff_datetime as dropoff_datetime,
                PULocationID,
                DOLocationID,
                trip_distance as distance,
                fare_amount as base_fare,
                passenger_count,
                payment_type
            from '{yellow_taxi_path}'
            order by
                pickup_datetime,
                dropoff_datetime,
                PULocationID,
                DOLocationID,
                distance,
                base_fare,
                passenger_count,
                payment_type
            limit {batch_size}
            offset {offset}
        ''').df()
        df['pickup_datetime'] = df['pickup_datetime'].astype('str')
        df['dropoff_datetime'] = df['dropoff_datetime'].astype('str')

        offset = offset + batch_size
        yield df.to_dict(orient='records')

### Running the producer

We need to initialize our producer, and generator function in order to publish data to the topic. 
After we initialize them, we take our rows of data batch by batch and publish them to the topic. We do this until we run out of the rows in the dataset. 
After each batch we flush the producer so that its queue does not fill up.

In [None]:
# Stream the yellow-taxi dataset to Kafka one batch at a time.
gen = get_batch_yellow_taxi()

producer = create_producer()

# Compare against None explicitly instead of relying on truthiness:
# Producer implements __len__ (the number of messages still queued), so a
# healthy but idle producer can evaluate as falsy and nothing would be sent.
if producer is not None:
    for records in gen:
        # The generator yields an empty batch once the dataset is exhausted.
        if not records:
            break

        for rec in records:
            send_message(producer, YELLOW_TAXI_TOPIC, rec)

        # Wait for this batch to be delivered before fetching the next one.
        producer.flush()
    print("All messages sent and flushed.")

# Consumer

We need to define the consumer, which is going to subscribe to the same topic as our producer. So, as before, it subscribes to the 'yellow-taxi' topic on the local machine, using port 9092. It is going to start processing from the earliest available message.
Each received message is logged to the console.


In [None]:
# Consumer group id; passed as 'group.id' when the consumer is built.
GROUP_ID = 'my_confluent_consumer_group'
# Address of the Kafka broker; passed as 'bootstrap.servers'.
BOOTSTRAP_SERVERS = 'localhost:9092'
# Name of the topic the consumer subscribes to.
YELLOW_TAXI_TOPIC = 'yellow-taxi'

def create_consumer():
    """Build a confluent-kafka ``Consumer`` for ``GROUP_ID`` on ``BOOTSTRAP_SERVERS``.

    Offsets are reset to the earliest message and auto-committed every
    1000 ms, as configured below.

    Returns:
        The new consumer, or ``None`` if construction raised; the error is
        printed rather than propagated.
    """
    connection = {
        'bootstrap.servers': BOOTSTRAP_SERVERS,
        'group.id': GROUP_ID,
    }
    offset_policy = {
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'auto.commit.interval.ms': 1000,
    }

    try:
        new_consumer = Consumer({**connection, **offset_policy})
        print("Confluent Kafka Consumer initialized successfully.")
        return new_consumer
    except Exception as e:
        print(f"Error initializing Confluent Kafka Consumer: {e}")
    return None

def consume_messages(consumer):
    """Subscribe to ``YELLOW_TAXI_TOPIC`` and print every decoded message.

    Loops until an exception (including ``KeyboardInterrupt``) ends it. The
    consumer is always closed on the way out, and the exception still
    propagates to the caller.

    Args:
        consumer: A confluent-kafka ``Consumer``. It is closed when this
            function exits and must not be reused afterwards.

    Raises:
        KafkaException: If a polled message carries an error.
    """
    try:
        consumer.subscribe([YELLOW_TAXI_TOPIC])
        print(f"Topic: {YELLOW_TAXI_TOPIC}, Group: {GROUP_ID}")

        while True:
            msg = consumer.poll(1.0)

            # poll() returned nothing within the timeout; try again.
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())

            decoded_value = json.loads(msg.value().decode('utf-8'))
            print(decoded_value)
    finally:
        # Close on every exit path so the consumer is not left open after an
        # error or an interrupt.
        consumer.close()




Here we create our consumer and use it to receive the messages from the topic. In order to stop it, we can terminate the execution.

In [None]:
# create_consumer() returns None when the consumer could not be built;
# only start the (endless) consume loop when we actually got one.
consumer = create_consumer()
if consumer is not None:
    consume_messages(consumer)

### Running in parallel

We can run the consumer and the producer in parallel if we run 'producer.py' and 'consumer.py' from different terminal sessions. This way we can see the true power of the Kafka messaging system.

# Faust

Here we define a consumer with the Faust library. Same as before, it is going to subscribe to the 'yellow-taxi' topic on the local machine, on port 9092. 
Because boroughs are not in the provided data, we need to provide the mapping to the consumer function so we can do our analysis. We load the mapping from the location_to_borough.csv file and use it later in the program. We chose some interesting locations for which we are going to do the analysis; these locations include JFK Airport, Times Square, and some others. 
We need to define the class which is going to represent the data that is present in the Kafka topic. With this class defined, we can be assured that all the data that we receive is going to follow this pattern. 
We also define a StatsData class, which we use to store our running statistics in Faust tables backed by RocksDB, an embedded key-value store. The tables use 300-second tumbling windows, and each window is kept for 360 seconds before it expires, so we can print out the necessary statistics. 
Whenever we receive a new message we update the required data for the given borough, and for the location if it is among the ones we are interested in. 
Every 30 seconds we print out the required information for the current 300-second window to the user. 

We cannot run this application from inside the notebook, so we provided the faust_consumer.py script, which can be run from the terminal to test its performance.

In [None]:
import faust
import math
import pandas as pd
from datetime import timedelta


# Kafka broker address and the topic the Faust app reads from.
BOOTSTRAP_SERVERS = 'localhost:9092'
YELLOW_TAXI_TOPIC = 'yellow-taxi'
# Size of each tumbling window of the stats tables, in seconds.
WINDOW_SIZE_SECONDS = 300  
# Value passed as `expires` for the windowed tables, in seconds.
WINDOW_EXPIRES_SECONDS = 360

# LocationID -> Borough lookup, read once from the CSV when the module loads.
LOCATION_TO_BOROUGH = pd.read_csv('location_to_borough.csv').set_index('LocationID')['Borough'].to_dict()

# Pickup location IDs that additionally get their own per-location statistics.
CHOSEN_LOCATIONS = [
    132,  # JFK Airport
    138,  # LaGuardia Airport
    230,  # Times Square
    161,  # Midtown Center
    237,  # Upper East Side South
    79,   # East Harlem South
    186,  # Penn Station / Madison Sq West
    249,  # West Village
    90,   # Flatiron
    48,   # Clinton East
]

# The Faust application: connects to the broker above, uses JSON as the value
# serializer and 'rocksdb://' as the table store.
app = faust.App(
    'taxi-stats-consumer',
    broker=f'kafka://{BOOTSTRAP_SERVERS}',
    value_serializer='json',
    store='rocksdb://',  
)


class TaxiRide(faust.Record, serializer='json'):
    """One taxi trip as carried (JSON-encoded) on the taxi topic.

    The field names match the column aliases selected by
    ``get_batch_yellow_taxi``.
    """
    # Pickup / drop-off timestamps, transported as strings.
    pickup_datetime: str
    dropoff_datetime: str
    # Pickup / drop-off location IDs (PULocationID is the key into LOCATION_TO_BOROUGH).
    PULocationID: int
    DOLocationID: int
    distance: float
    base_fare: float
    passenger_count: int 
    payment_type: int

class StatsData(faust.Record):
    """Running totals for one key of a stats table.

    Sums and sums of squares are stored so that
    ``calculate_and_format_stats`` can derive the mean and standard deviation
    without keeping the individual rides.
    """
    # Number of rides accumulated.
    count: int = 0
    # Sum and sum of squares of ride distances.
    distance_sum: float = 0.0
    distance_sq_sum: float = 0.0
    # Sum and sum of squares of base fares.
    fare_sum: float = 0.0
    fare_sq_sum: float = 0.0
    # Sum and sum of squares of passenger counts.
    passenger_sum: int = 0
    passenger_sq_sum: int = 0


# Topic of incoming rides; values are deserialized into TaxiRide records.
taxi_topic = app.topic(YELLOW_TAXI_TOPIC, value_type=TaxiRide)

# Tumbling windows: Non-overlapping, fixed-size windows.
# Per-borough running totals, keyed by borough name.
borough_stats_table = app.Table(
    'borough_stats',
    default=StatsData,
    key_type=str,
    value_type=StatsData,
    partitions=1
).tumbling(
    timedelta(seconds=WINDOW_SIZE_SECONDS),
    expires=timedelta(seconds=WINDOW_EXPIRES_SECONDS),
    key_index=True 
)

# Per-location running totals, keyed by pickup location ID; only the IDs in
# CHOSEN_LOCATIONS are ever written by process_taxi_ride.
location_stats_table = app.Table(
    'location_stats',
    default=StatsData,
    key_type=int,
    value_type=StatsData,
    partitions=1
).tumbling(
    timedelta(seconds=WINDOW_SIZE_SECONDS),
    expires=timedelta(seconds=WINDOW_EXPIRES_SECONDS),
    key_index=True
)

def calculate_and_format_stats(stats, name):
    """Render mean and standard deviation of the accumulated totals as text.

    Args:
        stats: Object exposing ``count`` plus the ``*_sum`` / ``*_sq_sum``
            totals for distance, fare and passengers.
        name: Label placed in the header line of the report.

    Returns:
        A multi-line report, or a one-line notice when ``stats.count`` is 0.
    """
    n = stats.count
    if n == 0:
        return f"{name}: No data in current window."

    def summarize(total, squared_total):
        # Population std-dev from the totals: sqrt(E[x^2] - E[x]^2), clamped
        # at zero so rounding error cannot produce a negative variance.
        mean = total / n
        spread = math.sqrt(max(0, (squared_total / n) - (mean ** 2)))
        return mean, spread

    mean_dist, std_dist = summarize(stats.distance_sum, stats.distance_sq_sum)
    mean_fare, std_fare = summarize(stats.fare_sum, stats.fare_sq_sum)
    mean_pass, std_pass = summarize(stats.passenger_sum, stats.passenger_sq_sum)

    report_lines = [
        f"--- {name} (Window) ---\n",
        f"  Count: {n}\n",
        f"  Distance: Mean={mean_dist:.2f}, StdDev={std_dist:.2f}\n",
        f"  Fare:     Mean={mean_fare:.2f}, StdDev={std_fare:.2f}\n",
        f"  Passengers: Mean={mean_pass:.2f}, StdDev={std_pass:.2f}\n",
    ]
    return "".join(report_lines)


def _add_ride_to_stats(stats, ride):
    """Fold one ride into the running totals and return the same object."""
    stats.count += 1
    stats.distance_sum += ride.distance
    stats.distance_sq_sum += ride.distance ** 2
    stats.fare_sum += ride.base_fare
    stats.fare_sq_sum += ride.base_fare ** 2
    stats.passenger_sum += ride.passenger_count
    stats.passenger_sq_sum += ride.passenger_count ** 2
    return stats


@app.agent(taxi_topic)
async def process_taxi_ride(stream):
    """Update the borough and per-location stats tables for every incoming ride.

    The pickup location decides the borough (``"Unknown"`` when the ID is not
    in ``LOCATION_TO_BOROUGH``). Per-location totals are only kept for IDs
    listed in ``CHOSEN_LOCATIONS``.
    """
    async for ride in stream:
        location_id = int(ride.PULocationID)
        borough = LOCATION_TO_BOROUGH.get(location_id, "Unknown")

        # Read the current totals, add the ride, and write them back so the
        # table records the change.
        borough_stats_table[borough] = _add_ride_to_stats(
            borough_stats_table[borough].value(), ride
        )

        if location_id not in CHOSEN_LOCATIONS:
            continue

        location_stats_table[location_id] = _add_ride_to_stats(
            location_stats_table[location_id].value(), ride
        )


@app.timer(interval=30.0)  
async def print_stats():
    """Print the stats of every borough and chosen location (timer interval: 30 s).

    Values are read with ``.now()`` from the windowed tables; entries whose
    count is zero are not formatted.
    """
    rule = "=" * 40

    def report(label, window_stats):
        # Only keys that saw at least one ride get a formatted report.
        if window_stats.count > 0:
            print(calculate_and_format_stats(window_stats, label))

    print("\n" + rule)
    print(f"ROLLING STATISTICS (Window: {WINDOW_SIZE_SECONDS}s)")
    print(rule)

    print("\n## Borough Statistics ##")
    for borough in set(LOCATION_TO_BOROUGH.values()):
        borough_stats = borough_stats_table[borough].now()
        # NOTE(review): this raw dump is printed for every borough, even empty
        # ones, and has no counterpart in the location loop - looks like
        # leftover debug output; confirm before removing.
        print(f"{borough_stats}")
        report(f"Borough: {borough}", borough_stats)

    print("\n## Chosen Location Statistics ##")
    for loc_id in CHOSEN_LOCATIONS:
        report(f"Location ID: {loc_id}", location_stats_table[loc_id].now())

    print(rule + "\n")


# Hand control to the Faust app when this file is executed as a script.
if __name__ == '__main__':
    app.main()