# Assignment 8: Streaming Data and Kafka

Installing and importing the required packages

In [1]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [2]:
import os
from google.colab import userdata
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
import pandas as pd

Setting up Kafka

In [3]:
# prompt: untar the tgz file
# NOTE(review): 'filename.tgz' is a placeholder from the generated snippet; no
# such file exists in the working directory, so this command fails (see the tar
# error in this cell's output). The real Kafka archive is downloaded and
# extracted in the following cell.

!tar -xvzf filename.tgz

tar (child): filename.tgz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now


In [4]:
!curl -sSOL https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz
!tar -xvzf kafka_2.12-3.7.1.tgz

kafka_2.12-3.7.1/
kafka_2.12-3.7.1/LICENSE
kafka_2.12-3.7.1/NOTICE
kafka_2.12-3.7.1/bin/
kafka_2.12-3.7.1/bin/kafka-delete-records.sh
kafka_2.12-3.7.1/bin/trogdor.sh
kafka_2.12-3.7.1/bin/kafka-jmx.sh
kafka_2.12-3.7.1/bin/connect-mirror-maker.sh
kafka_2.12-3.7.1/bin/kafka-console-consumer.sh
kafka_2.12-3.7.1/bin/kafka-consumer-perf-test.sh
kafka_2.12-3.7.1/bin/kafka-log-dirs.sh
kafka_2.12-3.7.1/bin/kafka-metadata-quorum.sh
kafka_2.12-3.7.1/bin/zookeeper-server-stop.sh
kafka_2.12-3.7.1/bin/kafka-verifiable-consumer.sh
kafka_2.12-3.7.1/bin/kafka-features.sh
kafka_2.12-3.7.1/bin/kafka-acls.sh
kafka_2.12-3.7.1/bin/zookeeper-server-start.sh
kafka_2.12-3.7.1/bin/kafka-server-stop.sh
kafka_2.12-3.7.1/bin/kafka-configs.sh
kafka_2.12-3.7.1/bin/kafka-reassign-partitions.sh
kafka_2.12-3.7.1/bin/connect-plugin-path.sh
kafka_2.12-3.7.1/bin/kafka-leader-election.sh
kafka_2.12-3.7.1/bin/kafka-producer-perf-test.sh
kafka_2.12-3.7.1/bin/kafka-transactions.sh
kafka_2.12-3.7.1/bin/kafka-topics.sh
kafka_2.

In [5]:
# List the working directory (oldest first, human-readable sizes) to confirm
# that the archive was downloaded and the kafka_2.12-3.7.1 directory exists.
!ls -ltrh

total 115M
drwxr-xr-x 7 root root 4.0K Jun 18 21:32 kafka_2.12-3.7.1
drwxr-xr-x 1 root root 4.0K Oct 24 13:20 sample_data
-rw-r--r-- 1 root root 115M Oct 26 00:38 kafka_2.12-3.7.1.tgz


In [6]:
# Start ZooKeeper first, then the Kafka broker, both as background daemons
# using the default property files shipped in the extracted distribution.
!./kafka_2.12-3.7.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.7.1/config/zookeeper.properties
!./kafka_2.12-3.7.1/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.7.1/config/server.properties
# The daemons start asynchronously, so pause for a fixed 10 seconds before
# later cells try to talk to the broker.
!echo "Give the processes 10 seconds to start before proceeding."
!sleep 10

Give the processes 10 seconds to start before proceeding.


Checking whether Kafka is running

In [7]:
# Two long-running java processes are expected (ZooKeeper and the Kafka broker).
# The `bash -c` and `grep java` entries in the listing are this command itself.
!ps -ef | grep java

root         899       1 22 00:38 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:
root        1312       1 84 00:38 ?        00:00:08 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxG
root        1441     277  0 00:38 ?        00:00:00 /bin/bash -c ps -ef | grep java
root        1443    1441  0 00:38 ?        00:00:00 grep java


In [8]:
# Create the topic 'sample-streaming-data' on the local broker with a single
# partition and a replication factor of 1.
!./kafka_2.12-3.7.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic sample-streaming-data

Created topic sample-streaming-data.


In [9]:
# Show the partition count, replication factor and leader of the topic created
# above to verify that it exists.
!./kafka_2.12-3.7.1/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic sample-streaming-data

Topic: sample-streaming-data	TopicId: YMsQDkyORpuq8Kp67Cx9aw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: sample-streaming-data	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


## Task I - Create a new Kafka topic and publish data into it as a Producer.

In [10]:
import requests

In [11]:
# Read the weather API key from the Colab secret named 'open_weather' so the
# key is never written into the notebook itself.
api_key = userdata.get('open_weather')

In [12]:
# City whose current weather is polled.
city = "Draper"

# OpenWeatherMap current-weather endpoint, queried with units=imperial.
# HTTPS is used so the API key carried in the query string is not sent over
# the network in clear text.
url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=imperial"

# NOTE(review): the only topic created explicitly in this notebook is
# 'sample-streaming-data'; 'temp' presumably relies on the broker creating
# topics automatically on first use - confirm this is intended.
KAFKA_TOPIC = "temp"  # Kafka topic to store data

In [13]:
def error_callback(exc):
    """Errback for producer sends: re-raise the failure as a generic ``Exception``.

    Args:
        exc: The error reported for the failed send.

    Raises:
        Exception: Always; the message is a fixed prefix followed by
            ``str(exc)``.
    """
    detail = str(exc)
    raise Exception(f'Error while sending data to Kafka: {detail}')

In [14]:
def write_to_kafka(topic_name, items):
    """Publish ``(message, key)`` string pairs to partition 0 of a Kafka topic.

    A producer for the local broker at 127.0.0.1:9092 is created, every item
    is sent with ``error_callback`` registered as its errback, pending sends
    are flushed, and the number of items handed to the producer is printed.

    Args:
        topic_name: Name of the Kafka topic to publish to.
        items: Iterable of ``(message, key)`` pairs of ``str``; both parts are
            UTF-8 encoded before sending.
    """
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    count = 0
    try:
        for message, key in items:
            producer.send(
                topic_name,
                key=key.encode('utf-8'),
                value=message.encode('utf-8'),
                partition=0,
            ).add_errback(error_callback)
            count += 1
        # Block until all buffered sends have completed.
        producer.flush()
    finally:
        # Always release the producer's resources, even when encoding or
        # sending raised part-way through; previously it was never closed.
        producer.close()
    print(f"Wrote {count} messages into topic: {topic_name}")

# Sample the current temperature at a fixed interval, then publish the batch
def produce_weather_data(iterations=60, interval_seconds=5, request_timeout=10):
    """Poll the weather API and publish the collected readings to Kafka.

    Each reading becomes a ``(temperature, timestamp)`` pair of strings; the
    whole batch is handed to ``write_to_kafka`` once polling has finished, so
    nothing reaches Kafka until the last sample has been taken.

    Args:
        iterations: Number of samples to take. The default of 60 samples at
            5-second spacing covers roughly five minutes.
        interval_seconds: Pause between consecutive samples.
        request_timeout: Seconds to wait for the HTTP response before giving
            up, so a stalled connection cannot hang the cell forever.

    Raises:
        requests.HTTPError: If the API answers with an error status (for
            example an invalid API key), instead of failing later with a
            ``KeyError`` on the missing ``'main'`` field.
    """
    items = []
    for sample_index in range(iterations):
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()
        data = response.json()
        current_temp = data['main']['temp']
        timestamp = str(time.time())

        # Message value is the temperature; the timestamp serves as the key.
        items.append((str(current_temp), timestamp))
        print(f"Produced: {timestamp} - {current_temp}")

        # No need to wait after the final sample has been taken.
        if sample_index < iterations - 1:
            time.sleep(interval_seconds)

    write_to_kafka(KAFKA_TOPIC, items)

produce_weather_data()

Produced: 1729903132.7834206 - 56.21
Produced: 1729903137.831676 - 56.21
Produced: 1729903142.8775756 - 56.21
Produced: 1729903147.9256742 - 56.21
Produced: 1729903152.9713817 - 56.21
Produced: 1729903158.0173013 - 56.21
Produced: 1729903163.0804596 - 56.21
Produced: 1729903168.1263874 - 56.21
Produced: 1729903173.1690884 - 56.21
Produced: 1729903178.2158055 - 56.21
Produced: 1729903183.261487 - 56.21
Produced: 1729903188.3073165 - 56.21
Produced: 1729903193.3963296 - 56.21
Produced: 1729903198.4397647 - 56.21
Produced: 1729903203.4855978 - 56.21
Produced: 1729903208.5313141 - 56.21
Produced: 1729903213.5779002 - 56.21
Produced: 1729903218.6238618 - 56.21
Produced: 1729903223.7365322 - 56.07
Produced: 1729903228.782333 - 56.07
Produced: 1729903233.8292758 - 56.07
Produced: 1729903238.8731556 - 56.07
Produced: 1729903243.9194744 - 56.07
Produced: 1729903248.9648213 - 56.07
Produced: 1729903254.0151389 - 56.21
Produced: 1729903259.1013887 - 56.21
Produced: 1729903264.1490302 - 56.21
Prod

## Task II - Extract data from the Kafka stream as a Consumer.

In [15]:
from kafka import KafkaConsumer

# Kafka consumer configuration
bootstrap_servers = ['127.0.0.1:9092']
group_id = 'weather_consumer'
topic_name = KAFKA_TOPIC  # The same topic the producer in Task I writes to

# Initialize Kafka consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    # Start from the oldest available message when the group has no committed offset
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id=group_id,
    # Stop iterating once no message has arrived for 10 seconds. Without this
    # the consumer iterator blocks indefinitely, so next(consumer, None) would
    # never return None and the cell would hang whenever the topic holds fewer
    # messages than requested.
    consumer_timeout_ms=10_000,
    # Decodes the message value
    value_deserializer=lambda x: x.decode('utf-8'),
    # Decodes the message key
    key_deserializer=lambda x: x.decode('utf-8')
)

def consume_weather_data(message_count=60):
    """Pull up to ``message_count`` records from the notebook-level ``consumer``.

    Each record's key is taken as the timestamp and its value is converted to
    ``float`` as the temperature. Reading stops early if the consumer iterator
    is exhausted. The consumer is closed before returning, whether or not an
    error occurred, so a later call would operate on a closed consumer.

    Args:
        message_count: Maximum number of records to read.

    Returns:
        List of ``(timestamp, temperature)`` tuples in the order received.
    """
    readings = []
    try:
        for _ in range(message_count):
            record = next(consumer, None)  # None signals an exhausted iterator
            if record is None:
                print("No more messages available.")
                break
            reading = (record.key, float(record.value))
            print(f"Consumed: {reading[0]} - {reading[1]}")
            readings.append(reading)
    finally:
        consumer.close()
    return readings

# Run the consumer once and keep the readings for the database step
weather_data = consume_weather_data()

Consumed: 1729903132.7834206 - 56.21
Consumed: 1729903137.831676 - 56.21
Consumed: 1729903142.8775756 - 56.21
Consumed: 1729903147.9256742 - 56.21
Consumed: 1729903152.9713817 - 56.21
Consumed: 1729903158.0173013 - 56.21
Consumed: 1729903163.0804596 - 56.21
Consumed: 1729903168.1263874 - 56.21
Consumed: 1729903173.1690884 - 56.21
Consumed: 1729903178.2158055 - 56.21
Consumed: 1729903183.261487 - 56.21
Consumed: 1729903188.3073165 - 56.21
Consumed: 1729903193.3963296 - 56.21
Consumed: 1729903198.4397647 - 56.21
Consumed: 1729903203.4855978 - 56.21
Consumed: 1729903208.5313141 - 56.21
Consumed: 1729903213.5779002 - 56.21
Consumed: 1729903218.6238618 - 56.21
Consumed: 1729903223.7365322 - 56.07
Consumed: 1729903228.782333 - 56.07
Consumed: 1729903233.8292758 - 56.07
Consumed: 1729903238.8731556 - 56.07
Consumed: 1729903243.9194744 - 56.07
Consumed: 1729903248.9648213 - 56.07
Consumed: 1729903254.0151389 - 56.21
Consumed: 1729903259.1013887 - 56.21
Consumed: 1729903264.1490302 - 56.21
Cons

## Task III - Convert the messages into a format suitable for insertion into a SQLite database table

In [21]:
import sqlite3

# Open (or create) the SQLite database file that stores the readings
conn = sqlite3.connect("weather_data.db")
cursor = conn.cursor()

# The timestamp is the primary key so each reading can be stored only once.
# Because of IF NOT EXISTS, a weather table left over from an earlier run keeps
# its old schema; delete weather_data.db to pick up the key.
cursor.execute(
    '''CREATE TABLE IF NOT EXISTS weather (timestamp TEXT PRIMARY KEY, temperature REAL)'''
)
conn.commit()


def store_data_in_db(data):
    """Insert ``(timestamp, temperature)`` rows into the ``weather`` table.

    Rows whose timestamp is already stored are skipped (``INSERT OR IGNORE``),
    so re-running this cell does not duplicate readings and skew the average
    computed afterwards.

    Args:
        data: Iterable of ``(timestamp, temperature)`` pairs.
    """
    cursor.executemany(
        "INSERT OR IGNORE INTO weather (timestamp, temperature) VALUES (?, ?)",
        data,
    )
    conn.commit()


# Store the consumed data
store_data_in_db(weather_data)

# Viewing the first few rows of the data
pd.read_sql_query("SELECT * FROM weather LIMIT 5", conn)

Unnamed: 0,timestamp,temperature
0,1729903132.7834206,56.21
1,1729903137.831676,56.21
2,1729903142.8775756,56.21
3,1729903147.9256742,56.21
4,1729903152.9713817,56.21


## Task IV - SQL summarization

In [22]:
def summarize_data(db_cursor=None):
    """Print the mean of all temperatures stored in the ``weather`` table.

    Args:
        db_cursor: Optional DB-API cursor to query. When omitted, the
            notebook-level ``cursor`` is used, as before.

    For an empty table SQL ``AVG`` yields NULL (``None``); a short notice is
    printed in that case instead of "None°F". The average is shown with two
    decimals to hide floating-point noise.
    """
    active_cursor = cursor if db_cursor is None else db_cursor
    active_cursor.execute("SELECT AVG(temperature) FROM weather")
    avg_temp = active_cursor.fetchone()[0]
    if avg_temp is None:
        print("No temperature readings available to summarize.")
        return
    print(f"Average Temperature of Draper over the last 5 minutes: {avg_temp:.2f}°F")

# Print the average over every row currently stored in the weather table
summarize_data()

# Close SQLite connection; `conn` and `cursor` are unusable after this point
conn.close()

Average Temperature of Draper over the last 5 minutes: 56.195999999999835°F


In [24]:
# Mount Google Drive
import os
from google.colab import drive
drive.mount('/content/drive')

# Copy the notebook file from Drive into the working directory, then convert
# it to HTML with nbconvert.
!cp "/content/drive/MyDrive/Colab Notebooks/de_lab_8_Ahmad_Ahsan.ipynb" ./
!jupyter nbconvert --to html "de_lab_8_Ahmad_Ahsan.ipynb"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[NbConvertApp] Converting notebook de_lab_8_Ahmad_Ahsan.ipynb to html
[NbConvertApp] Writing 640462 bytes to de_lab_8_Ahmad_Ahsan.html
