# Assignment 5 - Kafka
#### Stefanela Stevanović

## Problem description

In this assignment, we use Apache Kafka to process meteorological data from three US cities: Austin, Bronte, and Spokane. The dataset contains a variety of meteorological measurements collected every 5 minutes. However, for this assignment, we are specifically interested in analyzing the air temperature data. The goal of this assignment is to simulate real-time data streaming and perform the following tasks:

* Compute the hourly temperature (hourly mean) for each station.
* Stream temperature data from the three stations and report the station with the highest hourly temperature using subhourly data.
* Implement an algorithm to detect outliers in the temperature data stream.


To achieve these goals, we will set up Kafka producers and consumers, and implement data processing using Faust.

## Getting Started

First, we import the libraries needed for this project. We use 'confluent_kafka' to write the Kafka Producer and Consumer, and 'threading' to run them in parallel.

In [1]:
# Importing libraries

import confluent_kafka as kafka
from confluent_kafka import KafkaError
import os
import socket
import pandas as pd
import json
import time
from datetime import datetime
import threading
from queue import Queue
import logging

logging.basicConfig(level=logging.INFO)

## Defining Kafka Producer

First, we define the producer configuration and specify the paths to the data files. We then implement the 'run_producer' function, which will sequentially send data to the Kafka InputData topic from three text files, each corresponding to a different city: Austin, Bronte, and Spokane.

In [2]:
# Defining producer configuration
producer = kafka.Producer({'bootstrap.servers': "localhost:29092",
                  'client.id': socket.gethostname()})

# Defining path to the data
data_path = './data/'
filenames = ["Austin_data.txt", "Bronte_data.txt", "Spokane_data.txt"]

columns = [
    "WBANNO", "UTC_DATE", "UTC_TIME", "LST_DATE", "LST_TIME", "CRX_VN",
    "LONGITUDE", "LATITUDE", "AIR_TEMPERATURE", "PRECIPITATION",
    "SOLAR_RADIATION", "SR_FLAG", "SURFACE_TEMPERATURE", "ST_TYPE",
    "ST_FLAG", "RELATIVE_HUMIDITY", "RH_FLAG", "SOIL_MOISTURE_5",
    "SOIL_TEMPERATURE_5", "WETNESS", "WET_FLAG", "WIND_1_5", "WIND_FLAG"
]

# Function for printing error message
def error_report(err, msg):
    if err is not None:
        logging.error(f"Message delivery failed: {err}")

# Function for running the producer
def run_producer():
    for filename in filenames:
        file_path = os.path.join(data_path, filename)
        if os.path.isfile(file_path):
            df = pd.read_csv(file_path, delimiter=r'\s+', names=columns)
            for _, row in df.iterrows():
                data = row.to_dict()
                data['source_file'] = filename  
                producer.produce('InputData', key=None, value=json.dumps(data), callback=error_report)
                producer.poll(0)
            
    producer.flush()
    logging.info("All messages delivered to InputData Topic!")
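
To make the message format concrete, here is a minimal, stdlib-only sketch of how one whitespace-delimited line could be turned into the JSON payload sent to 'InputData'. The helper names `line_to_message` and `_convert`, the shortened column list, and the sample line are all hypothetical and used only for illustration; the notebook itself relies on pandas for parsing.

```python
import json

# Shortened, hypothetical column list for illustration only
SAMPLE_COLUMNS = ["WBANNO", "LST_DATE", "LST_TIME", "AIR_TEMPERATURE"]

def _convert(token):
    """Convert a token to int or float when possible, else keep the string."""
    for cast in (int, float):
        try:
            return cast(token)
        except ValueError:
            continue
    return token

def line_to_message(line, source_file, columns=SAMPLE_COLUMNS):
    """Split a whitespace-delimited line and serialize it as a JSON string."""
    values = [_convert(tok) for tok in line.split()]
    data = dict(zip(columns, values))
    data["source_file"] = source_file
    return json.dumps(data)

# Made-up line, not taken from the dataset
message = line_to_message("23907 20210101 0005 1.5", "Austin_data.txt")
print(message)
```

Note that a time such as `0005` becomes the integer `5` once parsed numerically, which is presumably why the Faust code later zero-pads `LST_TIME` with `:04d` before extracting the hour.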

## Stream Processing with Faust

We will use Faust to process data from the 'InputData' topic, which is populated by our producer. The code for this processing lives in a separate Python script, 'faust_app.py', because it is run from the terminal in parallel with the Producer and Consumer. Below, we highlight some important sections of that script.

Additionally, we have created two more Kafka topics: 'Output' and 'Outliers'. These topics are specified in our 'faust_app.py' script and will be used to store the processed data from the Faust application.

```python

# Defining Faust app
app = faust.App('temperature-stream', broker='kafka://localhost:29092')

# Defining the input and output Kafka topics
input_topic = app.topic('InputData', value_type=bytes)
output_topic = app.topic('Output', value_type=bytes)
outliers_topic = app.topic('Outliers', value_type=bytes)
```

Next, we define a stream processor using the @agent decorator to consume data from the Kafka 'InputData' topic. The processor will detect outliers and compute the mean hourly air temperature for each station and each date. Temperatures below -60°C and above 60°C are considered outliers. If an outlier is detected, it is excluded from the calculation of the mean hourly temperature. Outlier data is written to the 'Outliers' topic, while the mean hourly air temperature for each station is written to the 'Output' topic. The code section for this part is shown below.

```python

# Function for detecting outliers
def detect_outliers(record):
    if record.AIR_TEMPERATURE < -60 or record.AIR_TEMPERATURE > 60:
        outlier = OutlierRecord(
            station=record.WBANNO,
            lst_date=f"{str(record.LST_DATE)[:4]}-{str(record.LST_DATE)[4:6]}-{str(record.LST_DATE)[6:]}",
            # Zero-pad LST_TIME so that e.g. 5 is rendered as 00:05
            lst_time=f"{record.LST_TIME:04d}"[:2] + ":" + f"{record.LST_TIME:04d}"[2:],
            temp=record.AIR_TEMPERATURE,
            source_file = record.source_file
        )
        return outlier
    return None

# Function for computing mean temperature 
def compute_mean_temp(station, date, hour, temps):
    mean_temp = sum(temps) / len(temps)
    hourly_temp = HourlyTemperature(
        station=station,
        lst_date=f"{str(date)[:4]}-{str(date)[4:6]}-{str(date)[6:]}",
        lst_hour=hour,
        mean_temp=mean_temp
    )
    return hourly_temp


@app.agent(input_topic)
async def process_temperature(records):
    current_date = None
    current_hour = None
    current_station = None
    temps = []
    async for record in records:
        record = InputRecord(**record)

        # Check if record is an outlier and skip the rest of the loop if it is
        outlier = detect_outliers(record)
        if outlier:
            await outliers_topic.send(value=outlier)
            continue
        
        station = record.WBANNO
        date = record.LST_DATE
        lst_time = f"{record.LST_TIME:04d}"
        hour = lst_time[:2]
        temp = record.AIR_TEMPERATURE

        # Check if the station, date or hour has changed and compute hourly mean temperature
        if (current_station is not None and
                (station != current_station or hour != current_hour or date != current_date)):
            hourly_temp = compute_mean_temp(current_station, current_date, current_hour, temps)
            await output_topic.send(value=hourly_temp)
            temps = []

        # Update current hour, date and station
        current_hour = hour
        current_station = station
        current_date =  date

        # Append the temperature for current record to the list
        temps.append(temp)

    hourly_temp = compute_mean_temp(current_station, current_date, current_hour, temps)
    await output_topic.send(value=hourly_temp)
```
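
The grouping logic of the agent can also be tried without Kafka or Faust. The following is a minimal sketch, assuming a finite, ordered list of readings; the `Reading` tuple, `is_outlier`, and `hourly_means` are hypothetical stand-ins for the Faust record and agent, and the sample values are made up (the `-9999.0` entry mirrors the outlier values reported in this notebook).

```python
from collections import namedtuple

# Hypothetical minimal record type standing in for the Faust InputRecord
Reading = namedtuple("Reading", ["station", "date", "time", "temp"])

def is_outlier(temp, low=-60.0, high=60.0):
    """Threshold rule from the text: outside [-60, 60] degrees C is an outlier."""
    return temp < low or temp > high

def hourly_means(readings):
    """Yield (station, date, hour, mean) each time the grouping key changes."""
    current_key = None
    temps = []
    for r in readings:
        if is_outlier(r.temp):
            continue  # outliers are excluded from the mean
        key = (r.station, r.date, f"{r.time:04d}"[:2])
        if current_key is not None and key != current_key:
            yield (*current_key, sum(temps) / len(temps))
            temps = []
        current_key = key
        temps.append(r.temp)
    # Flush the last group once the finite input is exhausted
    if temps:
        yield (*current_key, sum(temps) / len(temps))

sample = [
    Reading(1, 20210101, 5, 1.0),
    Reading(1, 20210101, 10, 3.0),
    Reading(1, 20210101, 15, -9999.0),  # filtered out as an outlier
    Reading(1, 20210101, 100, 5.0),
]
results = list(hourly_means(sample))
print(results)
```

Using the whole (station, date, hour) tuple as the grouping key means a change in any one of the three closes the current group.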

## Defining Kafka Consumer

Next, we define a Kafka consumer function, 'run_consumer', that reads messages from a specified Kafka topic. The consumer is configured to start reading from the earliest available message. Inside the main loop, the consumer polls for messages and can print them to the console (if the 'print_output' argument is True) or store them in a queue (if 'queue_storage' is specified). The loop can be interrupted by an external stop flag; for the 'Output' and 'MaxTemp' topics it also stops automatically once more than 10 consecutive polls return no message.

In [3]:
# Consumer function
def run_consumer(topic_name, print_output=True, queue_storage=None, stop_flag=None):
    
    consumer = kafka.Consumer({
        'bootstrap.servers': 'localhost:29092',
        'client.id': socket.gethostname(),
        'group.id': 'test_group',
        'auto.offset.reset': 'earliest'})
    consumer.subscribe([topic_name])
    logging.info(f"Consumer subscribed to topic: {topic_name}")

    if print_output:
        print(f"Received messages from {topic_name}:")

    try:
        count = 0
        while True:
            # Break out of the loop if stop_flag is set
            if stop_flag and stop_flag.is_set():
                break
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Increment count if no message received
                count += 1
                # Break out of the loop if more than 10 consecutive polls return no message (Output and MaxTemp topics only)
                if (topic_name == 'Output' or topic_name == 'MaxTemp') and count > 10:
                    logging.info('No message received.')
                    break
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    logging.error('Error occurred: {}'.format(msg.error()))
                    print(msg.error())
                    break
            else:
                data = json.loads(msg.value().decode('utf-8'))
                if print_output:
                    print(f"{data}")
                if queue_storage:
                    queue_storage.put(data)
                count = 0

    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
        logging.info(f"Consumer for {topic_name} closed")
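
The idle-timeout logic of the consumer loop can be isolated from Kafka. The sketch below assumes a hypothetical `poll` callable that returns either a message or `None`, in the same way `consumer.poll` returns `None` on timeout; `drain` and `fake_poll` are illustrative names, not part of the notebook.

```python
def drain(poll, max_idle_polls=10):
    """Collect messages until `poll` returns None more than `max_idle_polls` times in a row."""
    received = []
    idle = 0
    while True:
        msg = poll()
        if msg is None:
            idle += 1
            if idle > max_idle_polls:
                break
            continue
        received.append(msg)
        idle = 0  # any message resets the idle counter
    return received

# Fake poll source: two messages, a short gap, one more message, then silence
script = iter(["a", "b", None, None, "c"])

def fake_poll():
    return next(script, None)

messages = drain(fake_poll, max_idle_polls=3)
print(messages)
```

Because the counter is reset on every received message, a short gap in the stream does not end the loop; only a sustained silence does.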

## Running Producer and Consumer

To simulate real-time data streaming and processing, we need to run the 'faust_app.py' script, the Producer, and the Consumers at the same time. We use Python's threading library to run the Producer and Consumer threads concurrently within a Jupyter Notebook, while the 'faust_app.py' script is executed from the terminal. Since there are two Consumers reading from different topics, 'Output' and 'Outliers', we configure the Consumer that reads mean hourly temperatures from the 'Output' topic to print the data, and the Consumer that reads from the 'Outliers' topic to store the data in a queue. This keeps the outputs of the two Consumers from being interleaved.

In [6]:
# Defining the queue for storing outliers
outliers_queue = Queue()

# Define stop flag
stop_flag = threading.Event()

# Creating threads for one producer and 2 consumers
producer_thread = threading.Thread(target=run_producer)
consumer_thread_1 = threading.Thread(target=run_consumer, args=('Output', True, None, None))
consumer_thread_2 = threading.Thread(target=run_consumer, args=('Outliers', False, outliers_queue, stop_flag))

# Run producer and consumer threads in parallel
producer_thread.start()
consumer_thread_1.start()
consumer_thread_2.start()

producer_thread.join()
consumer_thread_1.join()

# Set the stop flag to stop the second consumer thread once the first consumer has finished
stop_flag.set()

consumer_thread_2.join()

INFO:root:Consumer subscribed to topic: Output
INFO:root:Consumer subscribed to topic: Outliers


Received messages from Output:
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '18', 'mean_temp': 0.7181818181818181, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '19', 'mean_temp': 0.5333333333333333, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '20', 'mean_temp': 0.3916666666666666, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '21', 'mean_temp': 0.46666666666666673, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '22', 'mean_temp': 0.43333333333333335, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2020-12-31', 'lst_hour': '23', 'mean_temp': 0.29999999999999993, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 23907, 'lst_date': '2021-01-01', 'lst_hour': '00', 'mean_temp': 0.274999999999

INFO:root:All messages delivered to InputData Topic!


{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '03', 'mean_temp': 0.8166666666666668, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '04', 'mean_temp': 0.5666666666666668, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '05', 'mean_temp': -0.15000000000000002, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '06', 'mean_temp': -2.275, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '07', 'mean_temp': -2.8249999999999997, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '08', 'mean_temp': -0.8083333333333335, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'station': 3072, 'lst_date': '2021-01-07', 'lst_hour': '09', 'mean_temp': 2.725, '__faust': {'ns': 'faust_app.HourlyTemperature'}}
{'stat

INFO:root:No message received.
INFO:root:Consumer for Output closed
INFO:root:Consumer for Outliers closed


In [7]:
# Print collected outliers from Outliers topic
print("Outliers Data:")
while not outliers_queue.empty():
    outlier = outliers_queue.get()
    print(outlier)

Outliers Data:
{'station': 23907, 'lst_date': '2021-01-06', 'lst_time': '11:25', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'station': 23907, 'lst_date': '2021-01-06', 'lst_time': '11:30', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'station': 23907, 'lst_date': '2021-01-06', 'lst_time': '11:35', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'station': 23907, 'lst_date': '2021-02-27', 'lst_time': '14:10', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'station': 23907, 'lst_date': '2021-03-03', 'lst_time': '12:20', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'station': 23907, 'lst_date': '2021-03-03', 'lst_time': '12:25', 'temp': -9999.0, 'source_file': 'Austin_data.txt', '__faust': {'ns': 'faust_app.OutlierRecord'}}
{'stati
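
The stop-flag and queue pattern used for the 'Outliers' consumer can be illustrated on its own. This is a minimal sketch with a hypothetical `worker` function that stands in for the consumer thread; the sleep durations are arbitrary.

```python
import threading
import time
from queue import Queue

def worker(out_queue, stop_flag):
    """Put increasing integers on the queue until the stop flag is set."""
    n = 0
    while not stop_flag.is_set():
        out_queue.put(n)
        n += 1
        time.sleep(0.01)

collected = Queue()
stop = threading.Event()
t = threading.Thread(target=worker, args=(collected, stop))
t.start()

time.sleep(0.1)   # stand-in for waiting on the first consumer to finish
stop.set()        # signal the worker to exit its loop
t.join(timeout=2)

# Drain the queue from the main thread after the worker has stopped
items = []
while not collected.empty():
    items.append(collected.get())
print(len(items), "items collected; worker alive:", t.is_alive())
```

Draining the queue only after `join` keeps the printed results in one place instead of interleaving them with output from other threads.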

## Parallel Streaming and Data Processing

Next, we want to determine the station with the highest air temperature each hour. For this task we need to use parallel streaming and data processing techniques. We begin by creating a Kafka topic 'Partitioned' with three partitions. Each producer is responsible for handling one text file (data for one station) and for sending its data to a unique partition within the same topic. Utilizing Faust, we process the incoming data to identify the station recording the maximum air temperature every hour. The results are written back to a separate MaxTemp topic. For demonstration purposes, we focus on data from a single date: January 1, 2021.

In [4]:
# Defining producer configuration
producer = kafka.Producer({'bootstrap.servers': "localhost:29092",
                  'client.id': socket.gethostname()})


def run_producer_for_partition(file_path, partition):
    if os.path.isfile(file_path):
        df = pd.read_csv(file_path, delimiter=r'\s+', names=columns)
        # Filter for only one date
        df = df[df['LST_DATE'] == 20210101]                         
        for _, row in df.iterrows():
            data = row.to_dict()
            data['source_file'] = file_path  
            producer.produce('Partitioned', key=None, value=json.dumps(data), partition=partition, callback=error_report)
            producer.flush()
            
    logging.info(f"Messages from {file_path} delivered to Partitioned Topic at partition {partition}!")


def run_producers_in_parallel():
    threads = []
    for i, filename in enumerate(filenames):
        file_path = os.path.join(data_path, filename)
        thread = threading.Thread(target=run_producer_for_partition, args=(file_path, i))
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

The following code section is from 'faust_app.py'. Since the data was collected every 5 minutes, we gather 12 records from each partition (one partition per station) and take the maximum over all of them. This maximum is the highest air temperature for the current hour, and the record that holds it is written to the 'MaxTemp' topic. The process is repeated for each hour to continuously identify the station with the maximum air temperature.

```python
# Defining the input and output Kafka topics
partitioned_topic = app.topic('Partitioned', value_type=bytes)
max_temp_topic = app.topic('MaxTemp', value_type=bytes)

# Defining Faust Schema for the output
class MaxTemperatureRecord(faust.Record):
    max_temp: float
    station: str
    lst_date: str
    lst_hour: str
    source_file: str

# Dictionary to hold the current records for comparison
current_records = {0: [], 1: [],2: []}

@app.agent(partitioned_topic)
async def max_temp_process(stream):
    async for event in stream.events():
        record = InputRecord(**event.value)
        partition = event.message.partition

        current_records[partition].append(record)

        # Check if we have 12 records for each partition
        if all(len(records) >= 12 for records in current_records.values()):
            combined_records = [rec for recs in current_records.values() for rec in recs[:12]]

            # Find the record with the maximum temperature
            max_record = max(combined_records, key=lambda x: x.AIR_TEMPERATURE)

            # Create the max temperature record
            date = max_record.LST_DATE
            # Take the hour from the winning record, not from the most recently received one
            lst_time = f"{max_record.LST_TIME:04d}"
            max_temp_record = MaxTemperatureRecord(
                max_temp=max_record.AIR_TEMPERATURE,
                station=max_record.WBANNO,
                lst_date=f"{str(date)[:4]}-{str(date)[4:6]}-{str(date)[6:]}",
                lst_hour=lst_time[:2],
                source_file=max_record.source_file
            )
            await max_temp_topic.send(value=max_temp_record)


            # Reset the current_records dictionary
            for partition in current_records.keys():
                current_records[partition] = current_records[partition][12:]
```
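
The batching idea behind this agent can be sketched in plain Python. The function `max_per_batch` and the tiny event stream below are hypothetical and use two partitions with batches of two readings to keep the example short; the agent above uses three partitions and batches of 12.

```python
def max_per_batch(events, n_partitions=3, batch_size=12):
    """Yield the (partition, temp) pair with the highest temp for each complete batch.

    `events` is an iterable of (partition, temp) tuples in arrival order.
    """
    buffers = {p: [] for p in range(n_partitions)}
    for partition, temp in events:
        buffers[partition].append((partition, temp))
        # Emit only once every partition has contributed a full batch
        if all(len(buf) >= batch_size for buf in buffers.values()):
            combined = [item for buf in buffers.values() for item in buf[:batch_size]]
            yield max(combined, key=lambda item: item[1])
            # Keep any surplus readings for the next batch
            for p in buffers:
                buffers[p] = buffers[p][batch_size:]

# Tiny made-up stream: 2 partitions, batches of 2 readings each
events = [(0, 1.0), (0, 2.0), (0, 9.0), (1, 3.0), (1, 0.5), (0, 4.0), (1, 7.0), (1, 8.0)]
winners = list(max_per_batch(events, n_partitions=2, batch_size=2))
print(winners)
```

Counting records rather than inspecting timestamps keeps the logic simple, but it assumes every partition delivers exactly one full batch per hour; a missing reading in one partition would shift its later batches relative to the others.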

Finally, we run 'faust_app.py' in the terminal alongside the producer and consumer threads, so that data is produced, processed, and consumed concurrently.

In [5]:
# Creating threads for the parallel producers and the consumer
parallel_producer_threads = threading.Thread(target=run_producers_in_parallel)
consumer_thread = threading.Thread(target=run_consumer, args = ('MaxTemp', True, None))

# Run producer and consumer threads in parallel
parallel_producer_threads.start()
consumer_thread.start()

parallel_producer_threads.join()
consumer_thread.join()

INFO:root:Consumer subscribed to topic: MaxTemp


Received messages from MaxTemp:
{'max_temp': 2.4, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '00', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}
{'max_temp': 2.1, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '01', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}
{'max_temp': 2.2, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '02', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}
{'max_temp': 2.2, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '03', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}
{'max_temp': 2.0, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '04', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}
{'max_temp': 2.2, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '05', 'source_file': './d

INFO:root:Messages from ./data/Spokane_data.txt delivered to Partitioned Topic at partition 2!
INFO:root:Messages from ./data/Bronte_data.txt delivered to Partitioned Topic at partition 1!
INFO:root:Messages from ./data/Austin_data.txt delivered to Partitioned Topic at partition 0!


{'max_temp': 3.3, 'station': 4136, 'lst_date': '2021-01-01', 'lst_hour': '23', 'source_file': './data/Spokane_data.txt', '__faust': {'ns': 'faust_app.MaxTemperatureRecord'}}


INFO:root:No message received.
INFO:root:Consumer for MaxTemp closed
