# Kafka Demo

## Important: Connect to Kafka Broker Server FIRST

**Before running any code in this notebook**, establish an SSH tunnel to the Kafka server:

```bash
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```

Find connection details (remote_server, user, password, ports) on the Canvas lab 2 assignment page.

**Verify your connection is active:**
```bash
lsof -i :<local_port>  # Should show an ssh process
```
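If `lsof` is unavailable (for example on Windows), a rough alternative is to check from Python whether anything is accepting connections on the local end of the tunnel. This is a minimal sketch that uses only the standard library. `tunnel_is_open` is a hypothetical helper name. A successful connection only shows that *something* is listening on the port. It does not prove that the listener is your SSH tunnel or that the broker behind it is healthy.

```python
import socket


def tunnel_is_open(port: int, host: str = "localhost", timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper: a listener on the port is a necessary, not a
    sufficient, sign that the SSH tunnel to the Kafka broker is up.
    """
    try:
        # create_connection resolves the host and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or the host could not be resolved
        return False
```

For example, `tunnel_is_open(9092)` should return `True` while the tunnel is up. Here 9092 stands in for whatever `<local_port>` you chose.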

**To kill the connection when done:**
```bash
lsof -ti:<local_port> | xargs kill -9
```

---

## Setup

It is recommended to set up a Python virtual environment for this lab (and for all other labs).
```bash
python -m venv <environment_name>
source <environment_name>/bin/activate  # On Windows: <environment_name>\Scripts\activate
```

Then install the requirements:
```bash
pip install -r requirements.txt
```
Or manually:
```bash
pip install kafka-python
```
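A common source of `ModuleNotFoundError: No module named 'kafka'` is a notebook kernel that runs outside the virtual environment you created. You can check this by comparing `sys.prefix` with `sys.base_prefix`, because the two differ inside environments created with `venv`. The sketch below is minimal. `in_virtualenv` and `kafka_python_installed` are hypothetical helper names. The check does not detect every kind of environment, and some conda setups in particular may not register.

```python
import importlib.util
import sys


def in_virtualenv() -> bool:
    """Return True when running inside a venv-created environment."""
    # venv points sys.prefix at the environment directory, while
    # sys.base_prefix still points at the base Python installation
    return sys.prefix != sys.base_prefix


def kafka_python_installed() -> bool:
    """Return True if the `kafka` package (installed by kafka-python) is importable."""
    return importlib.util.find_spec("kafka") is not None


print(f"Running in a virtual environment: {in_virtualenv()}")
print(f"Python executable: {sys.executable}")
print(f"kafka-python importable: {kafka_python_installed()}")
```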

```python
import os
from datetime import datetime
from json import dumps, loads
from random import randint
from time import sleep
from typing import Any, Dict

from kafka import KafkaConsumer, KafkaProducer

# [TODO]: Fill in a unique identifier so your topic doesn't collide with others'
# Replace the value below with your andrew_id as a string (e.g., "asmith") or any other unique identifier
andrew_id = "yuboc"  # Example: andrew_id = "asmith"
topic = f"lab02-{andrew_id}"
print(f"Topic: {topic}")
```

```
Topic: lab02-yuboc
```
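Everyone shares the same broker, so the topic name must be both unique and legal. Kafka restricts topic names to ASCII letters, digits, `.`, `_`, and `-`, with a maximum length of 249 characters, and it rejects the names `.` and `..`. The sketch below checks a candidate name against those rules before you use it. `is_valid_topic_name` is a hypothetical helper and is not part of kafka-python.

```python
import re

# Characters Kafka accepts in topic names: ASCII letters, digits, '.', '_' and '-'
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
_MAX_TOPIC_LENGTH = 249


def is_valid_topic_name(name: str) -> bool:
    """Return True if `name` satisfies Kafka's topic naming rules."""
    if name in ("", ".", ".."):
        return False
    if len(name) > _MAX_TOPIC_LENGTH:
        return False
    # fullmatch requires every character to be legal, not just a prefix
    return _LEGAL_TOPIC.fullmatch(name) is not None


print(is_valid_topic_name("lab02-asmith"))  # True
print(is_valid_topic_name("lab02 asmith"))  # False: contains a space
```

In the notebook, `is_valid_topic_name(topic)` should return `True` for any plain Andrew ID.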


### Producer Mode -> Writes Data to Broker

```python
# Message schema for one city reading. Changing the city data is optional.
def make_city_data(city: str, temperature_f: float) -> Dict[str, Any]:
    return {
        "city": city,
        # Wall-clock time when this dict is created, formatted as "YYYY-MM-DD HH:MM:SS"
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temperature_f": temperature_f,  # temperature in degrees Fahrenheit
    }
```

```python
# Create a producer to write data to Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Fill in the address of your Kafka bootstrap server
# [TODO]: Kafka expects messages as bytes. Explore the documentation and decide how to serialize Python dict objects into bytes.
# Hint: You may want to convert your Python dict → JSON string → UTF-8 bytes.

# "localhost:9092" assumes 9092 is the <local_port> of your SSH tunnel; use your own port if it differs
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    # dict → JSON string → UTF-8 bytes (dumps is imported in the first cell)
    value_serializer=lambda value: dumps(value).encode("utf-8"),
)
```
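The `value_serializer` argument takes a function that turns a Python object into `bytes`. If you write that function and its inverse as named functions, you can test the dict → JSON string → UTF-8 bytes round trip without a broker. This is a sketch, and `serialize_value` and `deserialize_value` are hypothetical names. kafka-python's `KafkaConsumer` accepts a matching `value_deserializer` argument, so you could pass the inverse function there instead of decoding by hand.

```python
import json
from typing import Any, Dict


def serialize_value(value: Dict[str, Any]) -> bytes:
    """Python dict → JSON string → UTF-8 bytes (usable as value_serializer)."""
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw: bytes) -> Dict[str, Any]:
    """UTF-8 bytes → JSON string → Python dict (usable as value_deserializer)."""
    return json.loads(raw.decode("utf-8"))


record = {"city": "Pittsburgh", "timestamp": "2026-01-21 21:05:32", "temperature_f": 64}
encoded = serialize_value(record)
print(encoded)
# b'{"city": "Pittsburgh", "timestamp": "2026-01-21 21:05:32", "temperature_f": 64}'
print(deserialize_value(encoded) == record)  # True: lossless for JSON-compatible dicts
```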

```python
# [TODO]: Add a few more examples of city data below
# Note: each record's timestamp is fixed here, when make_city_data is called, not when the record is sent
cities = [
    make_city_data("Pittsburgh", 64),
    make_city_data("Nanjing", 32),
    make_city_data("San Diego", 58),
]

print("Writing to Kafka Broker")
for _ in range(10):
    data = cities[randint(0, len(cities) - 1)]  # pick a random city record
    producer.send(topic=topic, value=data)
    sleep(1)

# Send any buffered records and block until their requests complete
producer.flush()
print(f"Data written to topic: {topic}")
```

```
Writing to Kafka Broker
Data written to topic: lab02-yuboc
```
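`producer.send()` is asynchronous. It queues the record and immediately returns a future, which is why the loop above ends with `producer.flush()`. To confirm where each record landed, you can block on that future. In kafka-python, `.get(timeout=...)` returns a `RecordMetadata` whose `partition` and `offset` fields identify the record in the log. The sketch below wraps this in a hypothetical helper, `send_and_confirm`. It takes the producer as an argument, so you can exercise it with a stand-in object when no broker is reachable.

```python
from typing import Any, Tuple


def send_and_confirm(producer: Any, topic: str, value: Any, timeout: float = 10.0) -> Tuple[int, int]:
    """Send one record and block until the broker acknowledges it.

    Returns (partition, offset). Any exception raised by the future
    (for example a timeout) propagates to the caller.
    """
    future = producer.send(topic, value=value)
    metadata = future.get(timeout=timeout)  # blocks until acknowledged or timed out
    return metadata.partition, metadata.offset


# Hypothetical usage in this notebook, once `producer` and `topic` exist:
# partition, offset = send_and_confirm(producer, topic, make_city_data("Pittsburgh", 64))
# print(f"Written to partition {partition} at offset {offset}")
```

Blocking on every send trades throughput for certainty. That trade-off suits a demo like this one better than a high-volume producer.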


### Consumer Mode -> Reads Data from Broker

```python
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Fill in the missing parameters:
#   1. First parameter: topic name (should match the topic you used in the producer)
#   2. bootstrap_servers: the same address you used in the producer (e.g., ['localhost:9092'])
#   3. auto_offset_reset: try 'earliest' to read from the beginning, 'latest' for new messages only
# Note: no value_deserializer is set on this consumer, so message.value arrives as raw bytes.
# We decode and parse the JSON ourselves.

consumer = KafkaConsumer(
    topic,  # [TODO]: Use your topic variable here
    bootstrap_servers=["localhost:9092"],  # [TODO]: Same bootstrap server as the producer (no space before the port)
    auto_offset_reset='earliest',  # [TODO]: Try 'earliest', 'latest', or 'none'
    # Periodically commit the offsets of messages that have been read.
    # Commits only happen when a group_id is set; with kafka-python's default group_id=None they are disabled.
    enable_auto_commit=True,
    # How often (in milliseconds) to commit consumed offsets to Kafka
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
for message in consumer:  # blocks waiting for new messages; interrupt the kernel to stop
    # The producer serialized each value to JSON bytes, so decode and parse it here
    message_str = message.value.decode('utf-8')
    message_dict = loads(message_str)
    print(message_dict)
    # Append the raw JSON record as one line, without passing message text through the shell
    with open("kafka_log.csv", "a", encoding="utf-8") as log_file:
        log_file.write(message_str + "\n")
```
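`auto_offset_reset` only matters when the consumer has no usable committed offset for a partition. That happens the first time a group reads a topic. It also happens on every run when `group_id` is left unset, because nothing is ever committed. The following is a simplified, hypothetical model of that decision for a single partition. The real client also handles rebalancing and per-partition commits. `starting_offset` and its parameters are illustrative names, not kafka-python API.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer would begin reading from in one partition (toy model).

    committed  -- next offset to read, as committed by the consumer group, or None
    log_start  -- offset of the oldest message still retained in the partition
    log_end    -- offset the next produced message will receive
    """
    # A committed offset that is still in range wins: the group resumes where it left off
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"No valid committed offset and reset policy is {auto_offset_reset!r}")


# A partition holding offsets 0..9 (so log_end is 10):
print(starting_offset(None, 0, 10, "earliest"))  # 0: replays all 10 messages
print(starting_offset(None, 0, 10, "latest"))    # 10: only messages produced from now on
print(starting_offset(7, 0, 10, "earliest"))     # 7: resumes at the committed position
```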

### Use kcat

kcat (previously known as kafkacat) is a command-line interface (CLI) for producing and consuming Kafka messages.

Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

```bash
# [TODO]: Use kcat to consume the first 5 messages from your topic. Use the -f flag to include the offset in the output.
# Example command structure:
# kcat -b localhost:<local_port> -t <your_topic_name> -C -o beginning -c 5 -f "%o: %s\n"
#
# Where:
# - -b: broker address (same port as your SSH tunnel)
# - -t: your topic name (e.g., "lab02-asmith")
# - -C: consumer mode
# - -o beginning: start from the earliest available offset
# - -c 5: exit after consuming 5 messages
# - -f "%o: %s\n": output format showing each message's offset (%o) and payload (%s)
#
# Paste the output below and explain what the offset refers to.
```

```
0: {"city": "Nanjing", "timestamp": "2026-01-21 21:05:32", "temperature_f": 32}
1: {"city": "Pittsburgh", "timestamp": "2026-01-21 21:05:32", "temperature_f": 64}
2: {"city": "San Diego", "timestamp": "2026-01-21 21:05:32", "temperature_f": 58}
3: {"city": "Pittsburgh", "timestamp": "2026-01-21 21:05:32", "temperature_f": 64}
4: {"city": "Pittsburgh", "timestamp": "2026-01-21 21:05:32", "temperature_f": 64}
```

The offset is each message's position within its partition of the topic. Kafka assigns offsets sequentially (0, 1, 2, ...) as messages are appended and never reuses them, so a consumer can resume or replay from any offset. In this output, `-o beginning` starts at offset 0.
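To make the offset concrete, here is a toy in-memory model of a single partition as an append-only log. It is a hypothetical sketch: `PartitionLog` is not a Kafka or kafka-python class, and the model ignores replication and retention. Each appended record receives the next sequential offset. Reading from a starting offset returns records in order, much as `kcat -o beginning -c 5` does.

```python
from typing import List, Tuple


class PartitionLog:
    """Toy append-only log: offsets are assigned sequentially and never reused."""

    def __init__(self) -> None:
        self._records: List[str] = []

    def append(self, record: str) -> int:
        """Append a record and return the offset it was assigned."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, start: int, count: int) -> List[Tuple[int, str]]:
        """Return up to `count` (offset, record) pairs beginning at offset `start`."""
        if start < 0:
            raise ValueError("offsets are non-negative")
        end = min(start + count, len(self._records))
        return [(offset, self._records[offset]) for offset in range(start, end)]


log = PartitionLog()
for city in ["Nanjing", "Pittsburgh", "San Diego", "Pittsburgh", "Pittsburgh", "Nanjing"]:
    log.append(city)

# Mimic kcat's "%o: %s\n" format for the first five records
for offset, record in log.read(0, 5):
    print(f"{offset}: {record}")
```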
