# Kafka Demo

## Important: Connect to Kafka Broker Server FIRST

**Before running any code in this notebook**, establish an SSH tunnel to the Kafka server:

```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```

Find connection details (remote_server, user, password, ports) on the Canvas lab 2 assignment page.

**Verify your connection is active:**
```bash
lsof -i :<local_port>  # Should show an ssh process
```
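
The same check can be sketched from Python, which is handy inside the notebook itself. The helper name `port_is_open` is hypothetical, and a successful TCP connection only shows that something is listening on the local port, not that it is a Kafka broker.

```python
import socket


def port_is_open(host: str, port: int, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False
```

For example, `port_is_open("localhost", 9092)` should return `True` while the tunnel is up, assuming 9092 is the `<local_port>` you chose.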

**To kill the connection when done:**
```
lsof -ti:<local_port> | xargs kill -9
```

---

## Setup

We recommend setting up a Python virtual environment for this lab (and for the other labs).
```
python -m venv <environment_name>
source <environment_name>/bin/activate  # On Windows: <environment_name>\Scripts\activate
```

Then install the requirements:
```
pip install -r requirements.txt
```
Or manually:
```
pip install kafka-python
```

In [61]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, Any

# [TODO]: Fill in a unique identifier so your topic doesn't collide with others'
# Replace ... with your andrew_id as a string (e.g., "asmith") or any unique identifier
andrew_id = "bbasavar"  # Example: andrew_id = "asmith"
topic = f"lab02-{andrew_id}"
print(f"Topic: {topic}")

Topic: lab02-bbasavar


### Producer Mode -> Writes Data to Broker

In [62]:
# The schema below defines each message. Changing the city data is optional.
def make_city_data(city: str, temperature_f: int) -> Dict[str, Any]:
    return {
        "city": city,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temperature_f": temperature_f, # temperature in fahrenheit
    }

In [63]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# EXPLANATION:
# 1. bootstrap_servers: Address of Kafka broker (localhost:9092 via SSH tunnel)
# 2. value_serializer: A function that converts Python objects to bytes
#    - Kafka only accepts bytes, not Python dicts
#    - This lambda function: takes a dict (v) → converts to JSON string (dumps) → encodes to UTF-8 bytes
#    - The function will be called automatically for each message you send

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Already filled correctly!
    value_serializer=lambda v: dumps(v).encode('utf-8')  # Converts dict → JSON → bytes
)
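
To see what the serializer does without touching the broker, here is a minimal round-trip sketch. The function names `serialize` and `deserialize` are hypothetical; they mirror the lambda above and the decode/parse steps a consumer would apply.

```python
from json import dumps, loads
from typing import Any, Dict


def serialize(value: Dict[str, Any]) -> bytes:
    """Same steps as the producer's value_serializer: dict -> JSON text -> UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> Dict[str, Any]:
    """Inverse steps, as a consumer would apply them: bytes -> JSON text -> dict."""
    return loads(raw.decode("utf-8"))


sample = {"city": "Pittsburgh", "temperature_f": 64}
payload = serialize(sample)
print(type(payload).__name__, payload)   # the payload is a bytes object
print(deserialize(payload) == sample)    # the round trip restores the dict
```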

In [64]:
# EXPLANATION: We create a list of city temperature data. The producer will randomly select
# one of these cities and send it to Kafka. This simulates real-time streaming data.
# [TODO]: Add a few more examples of city data below
cities = [
    make_city_data("Pittsburgh", 64),
    make_city_data("New York", 72),
    make_city_data("Los Angeles", 78),
    make_city_data("Chicago", 58),
    make_city_data("Mumbai", 84)
]

print("Writing to Kafka Broker")
# EXPLANATION: We send 10 messages, one per second. Each message contains random city data.
# producer.send() is asynchronous - it queues the message but doesn't wait for confirmation.
# producer.flush() ensures all queued messages are actually sent before we continue.
for i in range(10):
    data = cities[randint(0, len(cities)-1)]  # Random selection from our cities list
    producer.send(topic=topic, value=data)
    sleep(1)

producer.flush()  # Wait for all messages to be sent
print(f"Data written to topic: {topic}")

Writing to Kafka Broker
Data written to topic: lab02-bbasavar
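
Because `producer.send()` is asynchronous, it returns a future rather than a result. As a sketch, one way to confirm a single message is to block on that future; in kafka-python its `get()` method returns metadata that includes the partition and offset. The helper name `send_and_confirm` and the 10-second default timeout are assumptions for illustration.

```python
from typing import Any, Dict, Tuple


def send_and_confirm(producer: Any, topic: str, value: Dict[str, Any],
                     timeout_s: float = 10.0) -> Tuple[int, int]:
    """Send one message and block until the broker acknowledges it.

    Returns (partition, offset) taken from the acknowledgement metadata.
    """
    future = producer.send(topic, value=value)  # queues the message, returns a future
    metadata = future.get(timeout=timeout_s)    # blocks; raises on failure or timeout
    return metadata.partition, metadata.offset
```

With the producer from this notebook, a call might look like `partition, offset = send_and_confirm(producer, topic, cities[0])`. Blocking on every message gives up the throughput benefit of batching, so it is mainly useful for debugging.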


### Consumer Mode -> Reads Data from Broker

In [None]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# EXPLANATION:
# 1. topic: Must match the topic name used in the producer (the topic variable)
# 2. bootstrap_servers: Same address as producer (localhost:9092 via SSH tunnel)
# 3. group_id: Name of the consumer group; Kafka stores committed offsets per group,
#    so reusing the same group_id lets a restarted consumer resume where it left off
# 4. auto_offset_reset: Where to start reading when the group has no committed offset:
#    - 'earliest': Read from the beginning of the topic (all messages) - USE THIS to see all data
#    - 'latest': Only read new messages arriving after the consumer starts
#    - 'none': Raise an error if no committed offset is found
# 5. enable_auto_commit: Commit the consumer's position periodically in the background
# 6. auto_commit_interval_ms: How often to commit offsets (here, every 1000 ms = 1 second)

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    group_id='lab02-consumer-group-v4',  # Committed offsets are tracked per consumer group
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
# EXPLANATION: The consumer is an iterator that blocks and waits for messages.
# For each message received:
# 1. Decode bytes to UTF-8 string
# 2. Parse JSON string to Python dictionary
# 3. Print the message
# 4. Append to kafka_log.csv file
# Note: This loop will run indefinitely. Press Ctrl+C to stop.
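for message in consumer:
    # Producer serialized to JSON bytes, so we decode and parse
    message_str = message.value.decode('utf-8')
    message_dict = loads(message_str)
    # print(message_dict)  # Uncomment to display each message
    # Append the raw JSON line from Python. Passing it through `echo` in a shell
    # strips the double quotes and can break on special characters.
    with open("kafka_log.csv", "a", encoding="utf-8") as log_file:
        log_file.write(message_str + "\n")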

Reading Kafka Broker


KeyboardInterrupt: 
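
If you want `kafka_log.csv` to hold real comma-separated rows rather than raw JSON lines, a minimal sketch with the standard `csv` module follows. The names `FIELDS` and `append_record` are hypothetical, and the column list assumes the keys produced by `make_city_data`; a record with extra keys would make `DictWriter` raise `ValueError`.

```python
import csv
import os
from typing import Any, Dict

FIELDS = ["city", "timestamp", "temperature_f"]  # assumed: keys from make_city_data


def append_record(path: str, record: Dict[str, Any]) -> None:
    """Append one decoded message to a CSV file, writing a header for a new file."""
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        if is_new:
            writer.writeheader()  # header row only once, when the file is empty
        writer.writerow(record)
```

Inside the consumer loop this would be called as `append_record("kafka_log.csv", message_dict)`. Separately, passing `consumer_timeout_ms` to `KafkaConsumer` makes the iterator stop after that many idle milliseconds, which avoids having to interrupt the cell by hand.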

# Use kcat!
kcat is a command-line interface (CLI) tool for Kafka, previously known as kafkacat.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

# Using kcat CLI Tool to Consume Messages

## Step 1: Install kcat (if not already installed)
```bash
# macOS:
brew install kcat

# Ubuntu/Debian (on older releases the package may still be named kafkacat):
sudo apt-get install kcat
```

## Step 2: Run the kcat command
**IMPORTANT**: Make sure your SSH tunnel is active before running this command!

Run this command in your terminal (replace `9092` with your actual SSH tunnel port if different, and `lab02-bbasavar` with your own topic name):

```bash
kcat -b localhost:9092 -t lab02-bbasavar -C -o earliest -c 5 -f "%o: %s\n"
```

## Command Parameter Breakdown:

| Parameter | Explanation |
|-----------|-------------|
| `-b localhost:9092` | **Broker address** - Same port as your SSH tunnel (typically 9092) |
| `-t lab02-bbasavar` | **Topic name** - Your specific topic (from Cell 1) |
| `-C` | **Consumer mode** - Read messages from Kafka |
| `-o earliest` | **Start offset** - Start reading from the beginning of the topic |
| `-c 5` | **Count** - Consume exactly 5 messages then exit |
| `-f "%o: %s\n"` | **Format** - `%o` = offset number, `%s` = message content |

## What is an Offset?

An **offset** is a unique sequential number (starting from 0) that Kafka assigns to each message within a topic partition. Think of it as:
- A **bookmark** or **position marker** in the message stream
- A way for Kafka to track "where you are" in reading messages
- **Message continuity**: If a consumer disconnects and reconnects, it can resume from its consumer group's last committed offset
- Committed offsets let a restarted consumer avoid skipping messages and avoid re-reading most of what it already processed; messages read after the last commit may be delivered again

**Example**: If you read messages with offsets 0, 1, 2, 3, 4, those offsets are committed, and you then disconnect, a consumer that reconnects with the same consumer group resumes from offset 5 (the next unread message). `auto_offset_reset='earliest'` only applies when the group has no committed offset, in which case reading starts from the beginning of the topic.
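
The rule for where a consumer starts can be sketched as a small toy model. This is a simplification for illustration, not Kafka's actual implementation: the function name `resolve_start_offset` is hypothetical, and it ignores cases such as a committed offset that is no longer available on the broker.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], log_start: int, log_end: int,
                         auto_offset_reset: str) -> int:
    """Toy model of where a consumer in a group begins reading one partition.

    committed: the group's committed offset (the next offset to read), or None.
    log_start: first offset still available in the partition.
    log_end: offset that the next newly produced message will receive.
    """
    if committed is not None:
        return committed              # a committed offset takes precedence
    if auto_offset_reset == "earliest":
        return log_start              # no commit: start from the oldest message
    if auto_offset_reset == "latest":
        return log_end                # no commit: only see new messages
    raise LookupError("no committed offset and no reset policy")


print(resolve_start_offset(5, 0, 10, "earliest"))     # group already committed 5
print(resolve_start_offset(None, 0, 10, "earliest"))  # new group, read everything
print(resolve_start_offset(None, 0, 10, "latest"))    # new group, new messages only
```

The three calls print 5, 0, and 10, matching the example above: a group that has committed offset 5 resumes there regardless of the reset policy.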

## Expected Output Format:
```
0: {"city": "Los Angeles", "timestamp": "2026-01-21 18:44:34", "temperature_f": 78}
1: {"city": "Chicago", "timestamp": "2026-01-21 18:44:34", "temperature_f": 58}
2: {"city": "Chicago", "timestamp": "2026-01-21 18:44:34", "temperature_f": 58}
3: {"city": "Pittsburgh", "timestamp": "2026-01-21 18:44:34", "temperature_f": 64}
4: {"city": "Los Angeles", "timestamp": "2026-01-21 18:44:34", "temperature_f": 78}
```
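
If you capture this output and want to work with it in Python, each line can be split back into an offset and a message. This is a minimal sketch that assumes the `"%o: %s\n"` format used above; the function name `parse_kcat_line` is hypothetical.

```python
from json import loads
from typing import Any, Dict, Tuple


def parse_kcat_line(line: str) -> Tuple[int, Dict[str, Any]]:
    """Split one '<offset>: <json>' line into (offset, decoded message)."""
    # Split only at the first ": " because the JSON payload contains ": " too
    offset_text, payload = line.split(": ", 1)
    return int(offset_text), loads(payload)


line = '3: {"city": "Pittsburgh", "timestamp": "2026-01-21 18:44:34", "temperature_f": 64}'
offset, message = parse_kcat_line(line)
print(offset, message["city"], message["temperature_f"])  # prints: 3 Pittsburgh 64
```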

## Paste Your Actual Output Here:
```
[Run the command above and paste the output here]
```

## Explanation of Offset (for your lab report):
[Write your explanation here about what the offset refers to and why it's important for message continuity]
