# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
See the Lab 2 assignment page on Canvas for the Kafka server connection details.
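
Before running the notebook cells, it can help to confirm that the tunnel's local end is accepting connections. The following is a minimal sketch using only the standard library. The helper name `is_port_open` and the port `9092` are assumptions for illustration, so substitute the `<local_port>` you passed to `ssh -L`. A `True` result only shows that something is listening locally. It does not prove the remote broker is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False


# 9092 is an assumed local port; use the <local_port> from your ssh -L command
print(is_port_open("localhost", 9092))
```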

### To kill connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [26]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = "recitation-e"  # Replace "e" with your section letter: b, c, d, e, or f

### Producer Mode -> Writes Data to Broker

In [27]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [DONE]: Replace '...' with the address of your Kafka bootstrap server
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: dumps(x).encode("utf-8"),
)

# [DONE]: Add cities of your choice
cities = ["Pittsburgh", "New York", "Los Angeles", "San Francisco", "Chicago"]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

Writing to Kafka Broker
Writing: 2025-01-24 11:54:57,Los Angeles,20ºC
Writing: 2025-01-24 11:54:58,Chicago,32ºC
Writing: 2025-01-24 11:54:59,San Francisco,29ºC
Writing: 2025-01-24 11:55:00,Los Angeles,30ºC
Writing: 2025-01-24 11:55:01,Chicago,30ºC
Writing: 2025-01-24 11:55:02,Pittsburgh,18ºC
Writing: 2025-01-24 11:55:03,Los Angeles,32ºC
Writing: 2025-01-24 11:55:04,San Francisco,24ºC
Writing: 2025-01-24 11:55:05,Chicago,24ºC
Writing: 2025-01-24 11:55:06,Chicago,29ºC
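
In kafka-python, `producer.send` is asynchronous and returns a future, so a printed "Writing" line does not by itself confirm that the broker received the record. The sketch below shows one possible way to wait for confirmation, by calling `flush()` and then `get()` on each future. The helper names `make_reading` and `send_and_confirm` are hypothetical, and the function accepts any producer-like object, so it makes no claim about how the lab expects this to be done.

```python
from datetime import datetime
from random import choice, randint


def make_reading(cities, now=None):
    """Build one 'timestamp,city,temperature' record like the producer cell."""
    now = now or datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S},{choice(cities)},{randint(18, 32)}ºC"


def send_and_confirm(producer, topic, cities, count=10, timeout=10):
    """Send `count` readings, flush, and return one result per record."""
    futures = [
        producer.send(topic, value=make_reading(cities)) for _ in range(count)
    ]
    # Block until all buffered records have been handed to the broker
    producer.flush()
    # With a real KafkaProducer, each result carries topic, partition, and offset
    return [future.get(timeout=timeout) for future in futures]
```

With the `producer`, `topic`, and `cities` defined above, a call might look like `send_and_confirm(producer, topic, cities)`, and each returned item's `partition` and `offset` can then be printed.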


### Consumer Mode -> Reads Data from Broker

In [30]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [DONE]: Complete the missing ... parameters/arguments using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",  # Experiment with different values
    # Automatically commit the offsets of messages that have been read
    enable_auto_commit=True,
    # How often, in milliseconds, to commit offsets to Kafka
    auto_commit_interval_ms=5,
)

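print("Reading Kafka Broker")
message_count = 0
# Append to the log file directly instead of passing message text to a shell
with open("kafka_log.csv", "a", encoding="utf-8") as log_file:
    for message in consumer:
        # message.value is bytes by default: decode it, then parse the JSON string
        record = loads(message.value.decode("utf-8"))
        print(record)
        log_file.write(record + "\n")
        message_count += 1
        if message_count >= 10:
            break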

consumer.close()

Reading Kafka Broker
2025-01-23 12:06:33,Boston,25ºC
2025-01-23 12:06:35,New York,18ºC
2025-01-23 12:06:36,Pittsburgh,28ºC
2025-01-23 12:06:37,New York,25ºC
2025-01-23 12:06:38,Pittsburgh,18ºC
2025-01-23 12:06:39,Pittsburgh,27ºC
2025-01-23 12:06:40,Pittsburgh,24ºC
2025-01-23 12:06:41,Boston,28ºC
2025-01-23 12:06:42,New York,18ºC
2025-01-23 12:06:43,Pittsburgh,31ºC
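
Because the producer serializes each value as UTF-8 encoded JSON, the consumer has to reverse both steps before it can use a record. The sketch below shows that round trip without a broker, then splits a record into fields. The helper names `serialize`, `deserialize`, and `parse_reading` and the `temperature_c` key are assumptions for illustration. As an alternative to decoding inside the loop, `deserialize` could be passed as the `value_deserializer` argument of `KafkaConsumer`.

```python
from json import dumps, loads


def serialize(value):
    """Mirror the producer's value_serializer: JSON text encoded as UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(raw):
    """Reverse serialize(): decode the bytes, then parse the JSON text."""
    return loads(raw.decode("utf-8"))


def parse_reading(record):
    """Split a 'timestamp,city,temperature' record into named fields."""
    timestamp, city, temperature = record.split(",")
    return {
        "timestamp": timestamp,
        "city": city,
        # Drop the trailing unit characters before converting to int
        "temperature_c": int(temperature.rstrip("ºC")),
    }


raw = serialize("2025-01-23 12:06:33,Boston,25ºC")
print(type(raw).__name__)  # bytes
print(parse_reading(deserialize(raw)))
```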


### Use kcat!
kcat, previously known as kafkacat, is a command-line interface (CLI) for Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [29]:
# kcat command: connect to local Kafka broker, specify a topic, and consume messages from the earliest offset
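
The comment above describes a kcat invocation without showing it. As a hedged sketch, the snippet below assembles one plausible command from Python, using kcat's `-b` (broker), `-t` (topic), `-C` (consumer mode), `-o beginning` (start at the earliest offset), and `-e` (exit after the last message) flags. The broker address and topic are the ones used earlier in this notebook, and the helper names `kcat_consume_command` and `consume_with_kcat` are hypothetical. The exact command intended by the lab may differ.

```python
import shlex
import shutil
import subprocess


def kcat_consume_command(broker, topic, offset="beginning"):
    """Return the argument list for consuming a topic with kcat."""
    return ["kcat", "-b", broker, "-t", topic, "-C", "-o", offset, "-e"]


def consume_with_kcat(broker, topic):
    """Run kcat if it is installed; return its exit code, or None if it is missing."""
    if shutil.which("kcat") is None:
        return None
    return subprocess.run(kcat_consume_command(broker, topic), check=False).returncode


command = kcat_consume_command("localhost:9092", "recitation-e")
print(shlex.join(command))
# prints: kcat -b localhost:9092 -t recitation-e -C -o beginning -e
```

Calling `consume_with_kcat("localhost:9092", "recitation-e")` would run the command when kcat is on the `PATH`. In a notebook cell, the printed command can also be run directly with a leading `!`.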