# Kafka

In [24]:
import confluent_kafka as kafka, socket
import os, socket, uuid, json
import pandas as pd
import faust, time

Create kafka producer and consumer

In [25]:
producer = kafka.Producer({'bootstrap.servers': "localhost:29092",
                  'client.id': socket.gethostname()})
                  
consumer = kafka.Consumer({'bootstrap.servers': "localhost:29092",
                           'client.id': socket.gethostname(),
                           'group.id': 'test_group', 
                           'auto.offset.reset': 'earliest'})

topic = "Weather"

%4|1716836719.842|TERMINATE|cristian-pc#producer-1| [thrd:app]: Producer terminating with 1 message (73 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


I have choosen the stations from Merced, Santa Barbara and Charlottesville. I read the data, sort it by timestamp and concatenate it.

In [29]:
column_names = ['WBANNO', 'UTC_DATE', 'UTC_TIME', 'LST_DATE', 'LST_TIME', 'CRX_VN', 'LONGITUDE', 'LATITUDE', 
                'AIR_TEMPERATURE', 'PRECIPITATION', 'SOLAR_RADIATION', 'SR_FLAG', 'SURFACE_TEMPERATURE', 
                'ST_TYPE', 'ST_FLAG', 'RELATIVE_HUMIDITY', 'RH_FLAG', 'SOIL_MOISTURE_5', 'SOIL_TEMPERATURE_5', 
                'WETNESS', 'WET_FLAG', 'WIND_1_5', 'WIND_FLAG']

merced = pd.read_table('CRNS0101-05-2021-CA_Merced_23_WSW.txt', sep='\s+', names=column_names)
barbara = pd.read_table('CRNS0101-05-2021-CA_Santa_Barbara_11_W.txt', sep='\s+', names=column_names)
charlotte = pd.read_table('CRNS0101-05-2021-VA_Charlottesville_2_SSE.txt', sep='\s+', names=column_names)

# Print codes for each dataset
print(f'Code for station in Merced: {merced.WBANNO[0]}')
print(f'Code for station in Santa Barbara: {barbara.WBANNO[0]}')
print(f'Code for station in Charlottesville: {charlotte.WBANNO[0]}')

all_data = pd.concat([merced, barbara, charlotte])
# Sort them by UTC_DATE and UTC_TIME
all_data = all_data.sort_values(by=['UTC_DATE', 'UTC_TIME'])
all_data.to_csv('all_data.csv', index=False)

Code for station in Merced: 93243
Code for station in Santa Barbara: 53152
Code for station in Charlottesville: 3759


In [30]:
all_data.head()

Unnamed: 0,WBANNO,UTC_DATE,UTC_TIME,LST_DATE,LST_TIME,CRX_VN,LONGITUDE,LATITUDE,AIR_TEMPERATURE,PRECIPITATION,...,ST_TYPE,ST_FLAG,RELATIVE_HUMIDITY,RH_FLAG,SOIL_MOISTURE_5,SOIL_TEMPERATURE_5,WETNESS,WET_FLAG,WIND_1_5,WIND_FLAG
0,93243,20210101,5,20201231,1605,2.622,-120.88,37.24,14.0,0.0,...,C,0,50,0,-99.0,-9999.0,1233,0,5.53,0
0,53152,20210101,5,20201231,1605,2.622,-119.88,34.41,15.3,0.0,...,C,0,51,0,-99.0,-9999.0,1216,0,1.0,0
0,3759,20210101,5,20201231,1905,2.623,-78.47,38.0,5.5,0.0,...,C,0,76,0,-99.0,-9999.0,1148,0,2.59,0
1,93243,20210101,10,20201231,1610,2.622,-120.88,37.24,13.8,0.0,...,C,0,50,0,-99.0,-9999.0,1236,0,4.91,0
1,53152,20210101,10,20201231,1610,2.622,-119.88,34.41,15.3,0.0,...,C,0,51,0,-99.0,-9999.0,1218,0,0.91,0


Generate the file to lauch using faust.

In [31]:
%%writefile faust_app.py

from typing import List
import faust
import numpy as np
import scipy.stats as stats
import math
from datetime import datetime

topic = "Weather"
# Number of messages to calculate the hourly temperature
# The messages arrive every 5-minutes
calculation_threshold = 12

# Connect to kafka
app = faust.App(topic, broker="kafka://localhost:29092")

print(f"App is {app}")

class stationMsg(faust.Record,validation=True):
    temperature: float
    timestamp: float
    stationID: str

# Create the topics
highest_temp_topic_name = 'highest_temp'
weather_topic = app.topic(topic, value_type=stationMsg)
highest_temp_topic = app.topic(highest_temp_topic_name, internal=True, partitions=1, value_type=stationMsg)

# Here I receive the hourly temperatures from the weather_topic
# I keep track of the hourly temperatures for each station
# to calculate the station with the highest temperature
@app.agent(highest_temp_topic)
async def highest_temp(msgs):
    hourlyTemperatures = {}
    timeToWait = 12
    counter = 0
    async for msg in msgs:
        if msg.stationID not in hourlyTemperatures:
            hourlyTemperatures[msg.stationID] = []
        
        # I keep track of the hourly temperatures for each station
        hourlyTemperatures[msg.stationID].append((msg.temperature, msg.timestamp))
        
        # Wait for 12 messages to calculate the station with maximum temperature
        # I need to wait because I'm not sure if I will receive the messages in order 
        # and I don't know if I have received at least one message for each station
        # because I don't know how many stations there are in advance
        counter += 1
        if counter > timeToWait:
            keys = hourlyTemperatures.keys()
            lens = [len(hourlyTemperatures[key]) for key in keys]
            minLen = np.min(lens)
            
            # Find the station with the highest temperature for each hour that I have received messages from all stations
            for i in range(minLen):
                temps = {key: hourlyTemperatures[key][i][0] for key in keys}
                max_key = max(temps, key=temps.get)
                time = datetime.fromtimestamp(hourlyTemperatures[max_key][i][1]).strftime('%Y-%m-%d %H:%M:%S')
                print(f"For hour {time} station with highest temperature is {max_key} with value {temps[max_key]}")
            
            # Remove the messages I have already processed
            counter = 0
            for key in keys:
                hourlyTemperatures[key] = hourlyTemperatures[key][minLen:]


# Here I accumulate the temperatures I receive from kafka
# After 12 messages for a station I calculate an "hourly" temperature
# I send the hourly temperature to the highest_temp_topic
# I also keep track of all the outliers for each station

@app.agent(weather_topic)
async def weather(msgs):
    temperatureBatch = {}
    outlierBatch = {}
    async for msg in msgs:
        if msg.stationID not in outlierBatch:
            outlierBatch[msg.stationID] = []

        if msg.stationID not in temperatureBatch:
            temperatureBatch[msg.stationID] = []
        
        # I assume the temperature is in celsius
        # I also assume that the temperature is in a reasonable range 
        # (not higher or lower than the maximum and minimum temperatures on earth)
        if msg.temperature > -88 and msg.temperature < 58: 
            temperatureBatch[msg.stationID].append(msg.temperature)
        else:
            print(f"Probable station mulfunction for station {msg.stationID} at time {datetime.fromtimestamp(msg.timestamp).strftime('%Y-%m-%d %H:%M:%S')}: record temperature {msg.temperature}")
    
        # Calculate the hourly temperature (threshold is 12 messages)
        # Send the hourly temperature to the highest_temp_topic to find the highest one
        if len(temperatureBatch[msg.stationID]) > calculation_threshold:
            batch = temperatureBatch[msg.stationID]
            avg = sum(batch)/len(batch)
            temperatureBatch[msg.stationID] = []

            await highest_temp_topic.send(value=stationMsg(temperature=avg, timestamp=msg.timestamp, stationID=msg.stationID))
        
        # Outlier detection
        # I keep track of all the temperatures for a station
        outlierBatch[msg.stationID].append(msg.temperature)

        # Every day hours I calculate the 99.9% confidence interval to find outliers
        if len(outlierBatch[msg.stationID]) > 12 * calculation_threshold:
            batch = outlierBatch[msg.stationID]

            # Statistics for the batch
            batch_mean = np.mean(batch)
            batch_std = np.std(batch)
            n = len(batch)
            batch_se = batch_std / math.sqrt(n)

            # calculating the t-score for a 99.9% confidence interval
            ci = 99.99
            dof = n - 1
            t_value = stats.t.ppf(q= (100+ci)/2/100 , df=dof)
            confidence_interval = (batch_mean - t_value * batch_se, batch_mean + t_value * batch_se)

            # Find outliers outside the confidence interval
            outside_interval = [temp for temp in batch if temp < confidence_interval[0] or temp > confidence_interval[1] or temp > 58 or temp < -88]
            outlierBatch[msg.stationID] = []
            if len(outside_interval) > 0:
                print(f"Values outside the {ci}% confidence interval for station {msg.stationID}: {outside_interval}")
            


Overwriting faust_app.py


In this section I generate the messages taken from all the data and send using the producer. 

The messages are made of temperature, timestamp (simulated one) and station ID.

To run the simulation is needed to have the docker container running using "docker-compose up", run the previous file using faust "faust -A faust_app worker -l info" and the following script to send the messages.


In [None]:
def get_msg(line):
    new_message = {"temperature": line['AIR_TEMPERATURE'],"timestamp": time.time(),"stationID": line['WBANNO']}
    return new_message

for index, line in all_data.iterrows():
    record_key = str(uuid.uuid4())
    msg = get_msg(line)
    record_value = json.dumps(msg)
    producer.produce(topic, key=record_key, value=record_value)
    producer.poll(0)  
    time.sleep(0.1)

# Ensure all messages have been sent
producer.flush()

Close the consumer

In [35]:
consumer.close()

# Conclusion

In conclusion the highest hourly temperature seems to be split between station ID 53152 (Santa Barbara) and 93243 (Merced). This is a reasonable result since they are both in California, while Charlottesville is in Virginia (that normally has a lower temperature). On average Santa Barbara has the highest temperatures that change from around 6 to 17 (hourly). We have to take into account the the first data retrieved is from january and I did not run the simulation for the all year (so the range can increase drastically). 

We can see also that the outliers for each station change and while for some station a value is normal for other stations it can be an outlier.