# Kafka Demo

### Connect to the Kafka Broker Server
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
The connection details for the Kafka server are on the Lab 2 assignment page on Canvas.

### To kill the connection (close the SSH tunnel)
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [3]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Kafka topic that the producer cell below writes to.
# Update this for your own recitation section :)
topic = 'topic-apoorv'  # replace with your own topic name

### Producer Mode -> Writes Data to Broker

In [4]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Address of the Kafka bootstrap server. "localhost:9092" presumably is the
# local end of the SSH tunnel above -- adjust it to your <local_port>.
# Every value is JSON-encoded before sending, so readers must JSON-decode it.
producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ["Delhi", "Mumbai", "Indore", "Chicago", "NY"]

# Write one reading per second, formatted as "<timestamp>,<city>,<temp>ºC".
print("Writing to Kafka Broker")
for _ in range(10):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    city = cities[randint(0, len(cities) - 1)]
    data = f'{timestamp},{city},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous and only buffers the record; flush() blocks until
# every buffered record has actually been delivered to the broker.
producer.flush()

Writing to Kafka Broker
Writing: 2025-02-06 23:25:46,Mumbai,21ºC
Writing: 2025-02-06 23:25:47,Delhi,23ºC
Writing: 2025-02-06 23:25:48,Chicago,19ºC
Writing: 2025-02-06 23:25:49,Chicago,19ºC
Writing: 2025-02-06 23:25:50,Indore,31ºC
Writing: 2025-02-06 23:25:51,Chicago,32ºC
Writing: 2025-02-06 23:25:52,Mumbai,28ºC
Writing: 2025-02-06 23:25:53,Chicago,26ºC
Writing: 2025-02-06 23:25:54,Mumbai,19ºC
Writing: 2025-02-06 23:25:55,Chicago,28ºC


### Consumer Mode -> Reads Data from Broker

In [None]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
import csv
import os
from json import loads

# Single CSV file that receives both the header and the data rows.
LOG_PATH = 'kafka_log.csv'


def _deserialize(raw):
    """Decode one Kafka message value into a Python object.

    The producer cell encodes values with ``dumps(x).encode('utf-8')``, so the
    bytes are JSON-decoded to recover the original string. Payloads that are
    not valid JSON (e.g. sent by hand with kcat) are returned as plain text
    instead of aborting the consumer loop.
    """
    if raw is None:
        return None
    text = raw.decode('utf-8')
    try:
        return loads(text)
    except ValueError:
        return text


consumer = KafkaConsumer(
    topic,  # the same topic variable the producer writes to
    bootstrap_servers=["localhost:9092"],
    # Where to start when this group has no committed offset yet.
    auto_offset_reset='earliest',  # Options: 'earliest', 'latest', 'none'
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    group_id='my_consumer_group',
    value_deserializer=_deserialize,
)

print('Reading Kafka Broker')
# Write the CSV header once, when the log file is first created.
if not os.path.exists(LOG_PATH):
    with open(LOG_PATH, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(['timestamp', 'city', 'temperature'])

# NOTE: iterating a KafkaConsumer blocks for new messages and, without
# consumer_timeout_ms, never ends on its own -- 'finished' is only printed
# if such a timeout is configured.
for message in consumer:
    message_value = str(message.value)
    try:
        timestamp, city, temperature_str = message_value.strip().split(',', 2)
        # Normalise the unit to the degree sign (U+00B0); the producer emits
        # the look-alike masculine ordinal indicator 'º' (U+00BA).
        temperature = (temperature_str.split()[0]
                       .replace('degreeCelcius', '°C')
                       .replace('ºC', '°C'))
    except (ValueError, IndexError) as e:
        # Malformed message: report it and keep consuming.
        print(f"Error processing message: {message_value} | Error: {str(e)}")
        continue
    # Reopen in append mode per message so each row reaches disk even though
    # the loop runs indefinitely.
    with open(LOG_PATH, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow([timestamp, city, temperature])
print('finished')

Reading Kafka Broker


# Use kcat!
kcat (previously known as kafkacat) is a command-line interface (CLI) for producing and consuming Kafka messages.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
# kcat command: connect to the local Kafka broker, specify a topic, and consume messages from the earliest offset
# e.g. !kcat -b localhost:9092 -t topic-apoorv -C -o beginning