# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
See the Lab 2 assignment page on Canvas for the Kafka server connection details.
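
Once the tunnel is running, a plain TCP connect from Python is a quick way to confirm that the forwarded port is reachable. This is a minimal sketch: `port_is_open` is a hypothetical helper, and 9092 is assumed to be the local port only because the cells below connect to `localhost:9092`.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


# Assumption: 9092 is your <local_port>; change it if you forwarded another port.
print(port_is_open("localhost", 9092))
```

A result of `False` usually means the tunnel is not running or a different local port was forwarded.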

### Kill the Connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [1]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-x' # x could be b, c, d, e, f

### Producer Mode -> Writes Data to Broker

In [2]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set bootstrap_servers to the address of your Kafka bootstrap server
producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ["Mumbai", "Delhi", "Hyderabad", "Kolkata", "Banglore", "Chennai", "Vizag"]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

Writing to Kafka Broker
Writing: 2025-02-06 23:27:39,Kolkata,31ºC
Writing: 2025-02-06 23:27:40,Delhi,28ºC
Writing: 2025-02-06 23:27:41,Banglore,27ºC
Writing: 2025-02-06 23:27:42,Hyderabad,26ºC
Writing: 2025-02-06 23:27:43,Vizag,32ºC
Writing: 2025-02-06 23:27:44,Vizag,26ºC
Writing: 2025-02-06 23:27:45,Hyderabad,26ºC
Writing: 2025-02-06 23:27:46,Hyderabad,27ºC
Writing: 2025-02-06 23:27:47,Banglore,21ºC
Writing: 2025-02-06 23:27:48,Banglore,23ºC
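
The producer above sends each reading as one comma-separated string. Some of the records read back from the topic later in this notebook are JSON objects with `timestamp`, `city`, and `temperature` keys, which are easier to parse downstream. A minimal sketch of building and serializing such a record, assuming that shape; `make_record` and `serialize` are hypothetical helpers, not part of kafka-python:

```python
from datetime import datetime
from json import dumps, loads
from random import choice, randint


def make_record(cities):
    """Build one reading as a dict instead of a comma-separated string."""
    return {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "city": choice(cities),
        "temperature": f"{randint(18, 32)}ºC",
    }


def serialize(record):
    """Same logic as the value_serializer lambda passed to KafkaProducer above."""
    return dumps(record).encode("utf-8")


record = make_record(["Mumbai", "Delhi"])
payload = serialize(record)
print(payload)

# With the producer from the cell above (requires a reachable broker):
#   producer.send(topic, value=record)
#   producer.flush()  # block until buffered messages have been sent
```

`send()` is asynchronous, so calling `flush()` before the script exits helps make sure buffered messages actually reach the broker.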


### Consumer Mode -> Reads Data from Broker

In [5]:
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
topic = 'recitation-x'  # Use the same topic the producer wrote to
# [TODO]: Adjust the parameters/arguments below using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=["localhost:9092"],
    # Where to start when no committed offset exists; experiment with "earliest" and "latest"
    auto_offset_reset="earliest",
    # Periodically commit the offsets that have been read
    enable_auto_commit=True,
    # How often (in ms) to tell Kafka that an offset has been read
    auto_commit_interval_ms=1000,
    # Stop iterating if no new message arrives within 5 seconds
    consumer_timeout_ms=5000
)

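print('Reading Kafka Broker')
with open("kafka_log.csv", "a", encoding="utf-8") as log_file:
    for message in consumer:
        # message.value is bytes by default, so decode it first
        text = message.value.decode('utf-8')
        print(loads(text))
        # Append the raw payload to the log without going through the shell
        log_file.write(text + "\n")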

# Closing consumer and producer connections
consumer.close()
producer.close()

Reading Kafka Broker
{'timestamp': '2025-02-03 16:02:00', 'city': 'Denver', 'temperature': '25ºC'}
{'timestamp': '2025-02-03 16:02:02', 'city': 'New York', 'temperature': '28ºC'}
{'timestamp': '2025-02-03 16:02:03', 'city': 'Chicago', 'temperature': '20ºC'}
{'timestamp': '2025-02-03 16:02:04', 'city': 'New Jersey', 'temperature': '23ºC'}
{'timestamp': '2025-02-03 16:02:05', 'city': 'New York', 'temperature': '25ºC'}
{'timestamp': '2025-02-03 16:02:06', 'city': 'Denver', 'temperature': '20ºC'}
{'timestamp': '2025-02-03 16:02:07', 'city': 'Denver', 'temperature': '32ºC'}
{'timestamp': '2025-02-03 16:02:08', 'city': 'Los Angeles', 'temperature': '24ºC'}
{'timestamp': '2025-02-03 16:02:09', 'city': 'Los Angeles', 'temperature': '29ºC'}
{'timestamp': '2025-02-03 16:02:10', 'city': 'Chicago', 'temperature': '19ºC'}
2025-02-03 21:21:13,tx,27ºC
2025-02-03 21:21:14,chicago,19ºC
2025-02-03 21:21:15,nyc,27ºC
2025-02-03 21:21:16,tx,28ºC
2025-02-03 21:21:17,tx,20ºC
2025-02-03 21:21:18,chicago,26ºC
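
The output above mixes two payload shapes on the same topic: JSON objects and JSON-encoded comma-separated strings. A consumer that needs uniform records has to handle both. A minimal sketch, assuming only these two shapes occur; `parse_payload` is a hypothetical helper:

```python
from json import loads


def parse_payload(raw: bytes) -> dict:
    """Normalize a message value into a dict with timestamp, city, temperature."""
    value = loads(raw.decode("utf-8"))
    if isinstance(value, dict):
        # Already a JSON object
        return value
    # Otherwise assume a "timestamp,city,temperature" string
    timestamp, city, temperature = value.split(",", 2)
    return {"timestamp": timestamp, "city": city, "temperature": temperature}


print(parse_payload(b'{"timestamp": "2025-02-03 16:02:00", "city": "Denver", "temperature": "25\\u00baC"}'))
print(parse_payload(b'"2025-02-03 21:21:13,tx,27\\u00baC"'))
```

Inside the consumer loop this would be called as `parse_payload(message.value)`.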


# Use kcat!
kcat (previously known as kafkacat) is a command-line interface (CLI) for producing to and consuming from Kafka.

Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [7]:
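# kcat command: connect to the local Kafka broker (-b), specify a topic (-t), and consume (-C)
# messages from the earliest offset (-o beginning), exiting at the end of the topic (-e).
# -f sets the output format: %k/%K = key and its length, %s/%S = value and its length,
# %T = timestamp, %p = partition, %o = offset.
kcat_command = f"kcat -b localhost:9092 -t {topic} -o beginning -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n' -e"
os.system(kcat_command)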




Key (-1 bytes): 	
Value (82 bytes): {"timestamp": "2025-02-03 16:02:00", "city": "Denver", "temperature": "25\u00baC"}
Timestamp: 1738620121358	Partition: 0	Offset: 0
--

Key (-1 bytes): 	
Value (84 bytes): {"timestamp": "2025-02-03 16:02:02", "city": "New York", "temperature": "28\u00baC"}
Timestamp: 1738620122359	Partition: 0	Offset: 1
--

Key (-1 bytes): 	
Value (83 bytes): {"timestamp": "2025-02-03 16:02:03", "city": "Chicago", "temperature": "20\u00baC"}
Timestamp: 1738620123360	Partition: 0	Offset: 2
--

Key (-1 bytes): 	
Value (86 bytes): {"timestamp": "2025-02-03 16:02:04", "city": "New Jersey", "temperature": "23\u00baC"}
Timestamp: 1738620124364	Partition: 0	Offset: 3
--

Key (-1 bytes): 	
Value (84 bytes): {"timestamp": "2025-02-03 16:02:05", "city": "New York", "temperature": "25\u00baC"}
Timestamp: 1738620125367	Partition: 0	Offset: 4
--

Key (-1 bytes): 	
Value (82 bytes): {"timestamp": "2025-02-03 16:02:06", "city": "Denver", "temperature": "20\u00baC"}
Timestamp: 17386

% Reached end of topic recitation-x [0] at offset 390: exiting


0
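
The `Timestamp` field printed by kcat is in milliseconds since the Unix epoch. A small sketch that converts it to a readable UTC datetime; `kafka_ts_to_datetime` is a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def kafka_ts_to_datetime(ts_ms: int) -> datetime:
    """Convert a Kafka timestamp (milliseconds since the Unix epoch) to UTC."""
    # Integer millisecond arithmetic avoids floating-point rounding
    return EPOCH + timedelta(milliseconds=ts_ms)


# Timestamp of the message at offset 0 in the output above
print(kafka_ts_to_datetime(1738620121358).isoformat())
# 2025-02-03T22:02:01.358000+00:00
```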

In [9]:
os.system("kcat -b localhost:9092 -L")

Metadata for all topics (from broker 1: localhost:9092/1):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 34 topics:
  topic "recitation-x" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "movielog3" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "movielog2" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "movielog1" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "recitation-abcd" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "recitation-rohan" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "movielog10" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "topic-apoorv-2" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "nutrition_facts" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "topic-apoorv-3" with 1 partitions:
    partition 0, lea

0
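
`os.system` only returns the exit status (the `0` above), so the metadata listing cannot be used from Python directly. If the text is captured instead, for example with `subprocess.run(..., capture_output=True, text=True).stdout`, the topic names can be pulled out with a regular expression. A rough sketch that assumes the line format shown above; `parse_topics` is a hypothetical helper:

```python
import re

# Matches lines such as:  topic "recitation-x" with 1 partitions:
TOPIC_LINE = re.compile(r'topic "([^"]+)" with (\d+) partitions')


def parse_topics(metadata_text: str) -> dict:
    """Map each topic name to its partition count."""
    return {name: int(count) for name, count in TOPIC_LINE.findall(metadata_text)}


sample = '''  topic "recitation-x" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "movielog2" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
'''
print(parse_topics(sample))
```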

In [None]:
os.system("kcat -b localhost:9092 -t movielog2 -C -e > movies_data.csv")


% Reached end of topic movielog2 [0] at offset 3968: exiting


0
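
The shell redirection above can also be done with `subprocess`, which avoids building a shell string and raises an error if kcat exits with a non-zero status. A minimal sketch using only the kcat flags already shown in this notebook; `kcat_args` and `dump_topic` are hypothetical helpers, and `dump_topic` needs kcat installed and a reachable broker:

```python
import subprocess


def kcat_args(topic: str, broker: str = "localhost:9092") -> list:
    """Argument list for consuming a topic to its end and then exiting."""
    return ["kcat", "-b", broker, "-t", topic, "-C", "-e"]


def dump_topic(topic: str, path: str, broker: str = "localhost:9092") -> None:
    """Write every message currently in the topic to a file, one per line."""
    with open(path, "wb") as out:
        # check=True raises CalledProcessError if kcat fails
        subprocess.run(kcat_args(topic, broker), stdout=out, check=True)


print(kcat_args("movielog2"))
# Example call: dump_topic("movielog2", "movies_data.csv")
```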
