# Big Data Technologies - Big Data Applications: Kafka

## Task 1

#### **Kafka and Zookeeper installation via Docker (from Notebook)**

##### 1. Zookeeper: 

In [13]:
# Run Zookeeper 
!docker run -d --name zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:7.5.0

d14acc86cae730d5b40cee292864db24ecb70fa23dad7cc1fbdb0fbce5bc7329


##### 2. Kafka (linked to Zookeeper):

In [14]:
# Run Kafka (linked to Zookeeper)
!docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_DELETE_TOPIC_ENABLE=true --link zookeeper confluentinc/cp-kafka:7.5.0

ecd4b051d1365447b3d89e84e9bfeeff240044e367043c3bc7069441fcf7f2e4


#### **If needed, use the following cell to start the container(s):**

In [15]:
# Start container(s) if needed
!docker start zookeeper
!docker start kafka

zookeeper
kafka


In [16]:
# List running containers: should include Zookeeper and Kafka
!docker ps

CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS          PORTS                                        NAMES
ecd4b051d136   confluentinc/cp-kafka:7.5.0       "/etc/confluent/dock…"   30 seconds ago   Up 30 seconds   0.0.0.0:9092->9092/tcp                       kafka
d14acc86cae7   confluentinc/cp-zookeeper:7.5.0   "/etc/confluent/dock…"   34 seconds ago   Up 34 seconds   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
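
`docker ps` confirms that both containers are running, but not that the published ports accept connections from the notebook host. The following is a minimal sketch of such a check using only the Python standard library. The helper name `port_is_open` is hypothetical and not part of any Kafka tooling. An open port only shows that something is listening, not that the broker has finished starting.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


# Ports published by the two `docker run` commands above
for name, port in [("zookeeper", 2181), ("kafka", 9092)]:
    print(f"{name} port {port} reachable: {port_is_open('localhost', port)}")
```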


## Task 2

#### **Kafka Topic Creation**

Create the topic "measurements" via Docker (from Notebook):

In [17]:
# Create topic "measurements" if it doesn't exist already
# Creates the topic from within the notebook using the Kafka CLI inside the container
!docker exec kafka kafka-topics --create --topic measurements --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --if-not-exists

#### **Show that the topic now exists in the Kafka container:**

In [18]:
!docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

__consumer_offsets
measurements


## Task 3

#### **Install the Kafka Python library:**

In [19]:
# Install kafka-python if not already installed
!pip install kafka-python

Collecting kafka-python
  Using cached kafka_python-2.2.10-py2.py3-none-any.whl.metadata (10.0 kB)
Using cached kafka_python-2.2.10-py2.py3-none-any.whl (309 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.2.10


#### **Import the packages used in the following steps:**

In [20]:
from kafka import KafkaProducer, KafkaConsumer
from datetime import datetime
import time
import json
import random
import pandas as pd

#### **Kafka Python Client Connection Test**

To test the Kafka broker from within this notebook, a producer and a consumer are created:
- The producer sends a simple test message ("Test message from Jupyter") to the topic "measurements" and waits for the broker's acknowledgment.
- The consumer subscribes to the same topic and prints the received message.

A successful round trip confirms that the Kafka setup and the Python client are working correctly.

In [21]:
# Create a Kafka producer instance
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send test message in JSON format
test_message = {
    "sensor": "test",
    "value": "Test message from Jupyter",
    "timestamp": datetime.now().isoformat()
}
future = producer.send("measurements", test_message)

# Get delivery acknowledgment from the Kafka broker
result = future.get(timeout=10)
print(f"Message sent and acknowledged by Kafka: {result}")

Message sent and acknowledged by Kafka: RecordMetadata(topic='measurements', partition=0, topic_partition=TopicPartition(topic='measurements', partition=0), offset=0, timestamp=1748863942802, checksum=None, serialized_key_size=-1, serialized_value_size=99, serialized_header_size=-1)
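
The `timestamp` field in the `RecordMetadata` above is expressed in milliseconds since the Unix epoch. A small sketch for turning it into a readable UTC datetime follows. The helper name `kafka_ts_to_datetime` is hypothetical. Integer arithmetic with `timedelta` is used here to avoid floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def kafka_ts_to_datetime(timestamp_ms: int) -> datetime:
    """Convert a Kafka record timestamp (ms since the Unix epoch) to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=timestamp_ms)


# Value taken from the RecordMetadata output above
print(kafka_ts_to_datetime(1748863942802).isoformat())
# → 2025-06-02T11:32:22.802000+00:00
```

The result is in UTC, whereas the `timestamp` inside the message body comes from `datetime.now()` and is therefore naive local time. The two values would line up if the notebook ran in a UTC+2 time zone.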


In [22]:
# Create a Kafka consumer instance
consumer = KafkaConsumer(
    'measurements',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='test-group'
)

print("Messages from 'measurements' topic:")
for message in consumer:
    print(message.value.decode('utf-8'))
    break  # Just read one message for testing

# Close the consumer so it leaves the group cleanly
consumer.close()


Messages from 'measurements' topic:
{"sensor": "test", "value": "Test message from Jupyter", "timestamp": "2025-06-02T13:32:22.669518"}


## Task 4

#### **Random Measurement Generator**

This producer simulates sensor measurements by periodically generating random data and sending it to the Kafka topic "measurements".

Each message includes:
- A sensor type (temperature)
- A random value
- A unit (°C)
- A timestamp

The data is structured as a JSON object and encoded in UTF-8 before being sent to Kafka.
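
The encoding step can be tried without a broker. The sketch below mirrors the `value_serializer` lambda used by the producer and the `value_deserializer` lambda used by the consumer in Task 5. The function names `serialize` and `deserialize` and the sample value 21.5 are illustrative assumptions.

```python
import json


def serialize(value: dict) -> bytes:
    """Same logic as the producer's value_serializer: dict -> JSON text -> UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Same logic as the consumer's value_deserializer: UTF-8 bytes -> JSON text -> dict."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical sample shaped like the generator's output
sample = {
    "sensor": "temperature",
    "value": 21.5,
    "unit": "°C",
    "timestamp": "2025-06-02T13:32:26.839797",
}

payload = serialize(sample)
print(payload)                       # bytes as they would be sent to Kafka
print(deserialize(payload) == sample)
```

By default `json.dumps` escapes non-ASCII characters, so the degree sign travels as `\u00b0` inside the payload and is restored on decoding.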

In [23]:
# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # auto JSON encode
)

# Function to generate a random measurement
def rand_temp_generator(min_temp=0.0, max_temp=30.0):
    return {
        "sensor": "temperature",
        "value": round(random.uniform(min_temp, max_temp), 2), 
        "unit": "°C",
        "timestamp": datetime.now().isoformat()
    }

# Send 100 measurements with 0.2 second intervals
for _ in range(100):
    measurement = rand_temp_generator()
    producer.send("measurements", measurement)
    print(f"Sent: {measurement}")
    time.sleep(0.2)

producer.flush()
print("All measurements sent.")

Sent: {'sensor': 'temperature', 'value': 8.92, 'unit': '°C', 'timestamp': '2025-06-02T13:32:26.839797'}
Sent: {'sensor': 'temperature', 'value': 29.56, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.162315'}
Sent: {'sensor': 'temperature', 'value': 24.25, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.364947'}
Sent: {'sensor': 'temperature', 'value': 29.48, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.566260'}
Sent: {'sensor': 'temperature', 'value': 19.21, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.769791'}
Sent: {'sensor': 'temperature', 'value': 18.77, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.970788'}
Sent: {'sensor': 'temperature', 'value': 3.39, 'unit': '°C', 'timestamp': '2025-06-02T13:32:28.173204'}
Sent: {'sensor': 'temperature', 'value': 29.48, 'unit': '°C', 'timestamp': '2025-06-02T13:32:28.374201'}
Sent: {'sensor': 'temperature', 'value': 13.84, 'unit': '°C', 'timestamp': '2025-06-02T13:32:28.576981'}
Sent: {'sensor': 'temperature', 'value': 24.29, 'unit': '

## Task 5

#### **Kafka Consumer**

This consumer subscribes to the topic "measurements" and reads JSON-encoded messages produced by the temperature sensor simulator.

Each message is parsed and displayed as a structured record. The data includes:
- Sensor type
- Measurement value
- Unit
- Timestamp

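For better readability, the records are converted into a pandas DataFrame. Because this consumer group has no committed offset and reads from the earliest offset, the test message from Task 3 is also received and appears as the first row.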

In [24]:
# Create Kafka consumer
consumer = KafkaConsumer(
    'measurements',                         # Topic to subscribe to
    bootstrap_servers='localhost:9092',     # Kafka broker address
    auto_offset_reset='earliest',           # Start from the beginning if no offset is committed
    enable_auto_commit=True,                # Automatically commit read offsets
    group_id='measurement-consumer-group',  # Consumer group ID (helps track offsets)
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),  # Decode and parse JSON messages
    consumer_timeout_ms=10000               # Stop after 10 seconds of no new messages
)

# Read and collect messages until the consumer times out
records = []
for message in consumer:
    records.append(message.value)
    print(f"Received: {message.value}")

# Release the connection once the timeout has ended the loop
consumer.close()

# Display as DataFrame
measurements_df = pd.DataFrame(records)
measurements_df

Received: {'sensor': 'test', 'value': 'Test message from Jupyter', 'timestamp': '2025-06-02T13:32:22.669518'}
Received: {'sensor': 'temperature', 'value': 8.92, 'unit': '°C', 'timestamp': '2025-06-02T13:32:26.839797'}
Received: {'sensor': 'temperature', 'value': 29.56, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.162315'}
Received: {'sensor': 'temperature', 'value': 24.25, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.364947'}
Received: {'sensor': 'temperature', 'value': 29.48, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.566260'}
Received: {'sensor': 'temperature', 'value': 19.21, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.769791'}
Received: {'sensor': 'temperature', 'value': 18.77, 'unit': '°C', 'timestamp': '2025-06-02T13:32:27.970788'}
Received: {'sensor': 'temperature', 'value': 3.39, 'unit': '°C', 'timestamp': '2025-06-02T13:32:28.173204'}
Received: {'sensor': 'temperature', 'value': 29.48, 'unit': '°C', 'timestamp': '2025-06-02T13:32:28.374201'}
Received: {'sensor':

| | sensor | value | timestamp | unit |
|---|---|---|---|---|
| 0 | test | Test message from Jupyter | 2025-06-02T13:32:22.669518 | |
| 1 | temperature | 8.92 | 2025-06-02T13:32:26.839797 | °C |
| 2 | temperature | 29.56 | 2025-06-02T13:32:27.162315 | °C |
| 3 | temperature | 24.25 | 2025-06-02T13:32:27.364947 | °C |
| 4 | temperature | 29.48 | 2025-06-02T13:32:27.566260 | °C |
| ... | ... | ... | ... | ... |
| 96 | temperature | 22.78 | 2025-06-02T13:32:46.173970 | °C |
| 97 | temperature | 28.2 | 2025-06-02T13:32:46.375315 | °C |
| 98 | temperature | 28.13 | 2025-06-02T13:32:46.576658 | °C |
| 99 | temperature | 24.57 | 2025-06-02T13:32:46.777777 | °C |
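
The first row is the test message from Task 3. Its `value` is a string and it has no `unit`, so the `value` column mixes text and numbers. One possible way to separate such records before any numeric analysis is sketched below. The helper name `split_records` is hypothetical, and the sample list reuses the first three records from the output above.

```python
def split_records(records):
    """Split records into numeric temperature readings and everything else."""
    readings, others = [], []
    for record in records:
        value = record.get("value")
        is_reading = (
            record.get("sensor") == "temperature"
            and isinstance(value, (int, float))
            and not isinstance(value, bool)
        )
        (readings if is_reading else others).append(record)
    return readings, others


# First three records as received by the consumer above
sample_records = [
    {"sensor": "test", "value": "Test message from Jupyter",
     "timestamp": "2025-06-02T13:32:22.669518"},
    {"sensor": "temperature", "value": 8.92, "unit": "°C",
     "timestamp": "2025-06-02T13:32:26.839797"},
    {"sensor": "temperature", "value": 29.56, "unit": "°C",
     "timestamp": "2025-06-02T13:32:27.162315"},
]

readings, others = split_records(sample_records)
mean_value = sum(r["value"] for r in readings) / len(readings)
print(f"{len(readings)} readings, {len(others)} other record(s), mean = {mean_value:.2f}")
```

Passing `readings` instead of `records` to `pd.DataFrame` would then give a `value` column that contains only numbers.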


## Task 6

Delivered via PDF file.