# 3.4 Analyzing and visualizing PM2.5 data streams (Step-3)

This Jupyter notebook represents a **Kafka consumer** that analyses the data stream generated in the previous step. We will perform two types of analysis:

**a) Monitoring:** Creating a map visualization of the streamed data in "near real-time"

**b) Event Detection:** Calculating the 3-day mean value of PM2.5 concentrations for each location. If the mean exceeds a critical value (event of interest), we trigger a notification. The threshold value is defined later in this notebook and can be changed.
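The event detection idea can be sketched without Kafka. The snippet below is a minimal illustration, not the implementation used later in this notebook (which relies on pandas `groupby` and `rolling`): it keeps the last three readings per sensor in a bounded queue and compares their mean with the threshold. The names `update_and_check`, `recent`, `WINDOW` and the sample readings are hypothetical.

```python
from collections import defaultdict, deque

WINDOW = 3          # number of most recent readings per location (assumption: one reading per day)
PM_THRESHOLD = 20   # same value as pm_threshold defined later in this notebook

# One bounded queue per sensor; the oldest reading drops out automatically
recent = defaultdict(lambda: deque(maxlen=WINDOW))

def update_and_check(box_id, value):
    """Store a reading and return the window mean, or None while fewer than WINDOW readings exist."""
    window = recent[box_id]
    window.append(value)
    if len(window) < WINDOW:
        return None
    return sum(window) / WINDOW

# Hypothetical readings for a single sensor
alerts = []
for day, value in enumerate([4.2, 6.3, 12.8, 26.6, 38.3], start=1):
    mean = update_and_check("box-1", value)
    if mean is not None and mean > PM_THRESHOLD:
        alerts.append((day, round(mean, 2)))

print(alerts)
```

Keeping only the last few readings per sensor avoids recomputing the mean over the full history on every message, which may matter for long-running streams.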



In [2]:
## Import Libraries

import warnings ## ignore warnings that might be shown due to older python libraries
warnings.filterwarnings('ignore') 

import geopandas as gpd # to read and handle vector data with spatial information
import json, math # handle json and mathematical operations
import numpy as np # handle matrix type operations and manipulation on numerical data
import geojson # handle json files with spatial information 
import pandas as pd # handle tabular data
import sys # output error messages
import time # provides time related functions
import socket #  to get network properties for kafka communication
from confluent_kafka import Consumer, KafkaError, KafkaException # Kafka library for kafka consumer components

from ipyleaflet import Map, basemaps, WidgetControl, Marker, basemap_to_tiles, DrawControl, GeoJSON, MarkerCluster, AwesomeIcon # widget to enable map interactions
from ipywidgets import IntSlider, ColorPicker, jslink # widget to enable map interactions

## KAFKA CONSUMER DEFINITION

In [4]:
### START: AVOID MAKING CHANGES ###

'''
'auto.offset.reset' decides where reading starts when the consumer group has no committed offset yet.
"smallest" starts at the oldest message available in the topic and reads forward in order.
"largest" starts at the end of the topic, so only messages sent after the consumer has started are read
'''

conf = {'bootstrap.servers': 'kafka:9093',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'group.id': socket.gethostname()}

### END: AVOID MAKING CHANGES ###
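As a hedged aside, the same configuration can be written with the newer value names. To the best of our knowledge, librdkafka treats `earliest` as an alias of `smallest` and `latest` as an alias of `largest`, and recent versions of `confluent_kafka` accept `auto.offset.reset` at the top level of the configuration. The helper `build_consumer_conf` below is hypothetical and only builds the dictionary; it does not connect to a broker.

```python
import socket

def build_consumer_conf(bootstrap="kafka:9093", start_from="earliest"):
    """Build a consumer configuration dict (hypothetical helper, not part of confluent_kafka)."""
    if start_from not in ("earliest", "latest"):
        raise ValueError("start_from must be 'earliest' or 'latest'")
    return {
        "bootstrap.servers": bootstrap,
        "group.id": socket.gethostname(),  # one consumer group per host, as in the cell above
        "auto.offset.reset": start_from,   # only used when the group has no committed offset
    }

example_conf = build_consumer_conf()
print(example_conf["auto.offset.reset"])
```

The configuration in the cell above works as it is; this variant is only relevant if a library version rejects or warns about `default.topic.config`.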

In [5]:
## Set topic name as set in file: step_2_producer.ipynb
topic = "pm25_stream"

## Kafka streamed data will be stored here
df = pd.DataFrame(columns=['lat','lon','value','day','boxId'])

Initialise the consumer and subscribe to the topic

In [6]:
consumer = Consumer(conf)
consumer.subscribe([topic])

running = True

Define functions and variables that will be used for real-time processing

In [7]:
# Define the PM Threshold
pm_threshold = 20

def warning_event_response(timestamp, pm, sensebox):

    '''
    This function defines what actions should be performed when the PM 2.5 level has exceeded the threshold defined above.
    In this case we are simply returning a warning message, which the caller prints.
    '''

    ## You can insert an email trigger script here, before the message is returned
    
    return str(sensebox)+" : "+str(timestamp)+" : !!! WARNING !!! PM 2.5 threshold exceeded with 3-Day Average of "+str(pm)
    
def event_notification(df, pm_threshold):
    
    '''
    Function to handle event notifications in real-time. This function is used to define the operations that should be 
    triggered for every data point that is received by the consumer.
    '''
    
    try:
                
        ## Get rolling average of pm value by lat/lon over last 3 days
        rolling_average = df.groupby(['lat','lon']).rolling(3)['value'].mean().reset_index()
        rolling_average.dropna(inplace=True)
        
        current_lat = df.iloc[df.shape[0] - 1,:].lat
        current_lon = df.iloc[df.shape[0] - 1,:].lon
        
        ## Get the rolling average value for each unique lat/lon. This is required because the stream has data for multiple locations
        rolling_average_index = rolling_average[(rolling_average['lat'] == current_lat) & (rolling_average['lon'] == current_lon)].index[-1]
        
        ## Get the details for the pm value, time and senseboxid for the current lat/lon that was received
        pm_value = rolling_average.loc[rolling_average_index, 'value']
        timestamp = df.iloc[df.shape[0] - 1,:]['day']
        sensebox = df.iloc[df.shape[0] - 1,:]['boxId']
            
        ## Trigger check
        if pm_value > pm_threshold:
            
            ## Trigger notification
            response = warning_event_response(timestamp, round(pm_value,2), sensebox)
            print(response)
        
        else:
            
            ## PM levels are safe
            print(str(sensebox)+" : "+str(timestamp)+" : Message Received PM 2.5 Levels are safe")

    except Exception:

        ## In the beginning, when the consumer has just started, there aren't enough data points to calculate the rolling average for 3 days,
        ## hence this logic will fail in the initial 2 iterations, which can simply be handled by a try/except section
        
        pass # do nothing and continue
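The comment in `warning_event_response` mentions an email trigger. One possible shape for it, using only the Python standard library, is sketched below. The function name `build_warning_email`, the addresses and the SMTP host are assumptions for illustration; the actual sending step is left commented out because it needs a reachable mail server.

```python
from email.message import EmailMessage

def build_warning_email(sensebox, timestamp, pm, sender, recipient):
    """Compose (but do not send) a warning email for an exceeded PM 2.5 threshold."""
    msg = EmailMessage()
    msg["Subject"] = f"PM 2.5 warning for sensebox {sensebox}"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(
        f"{timestamp}: PM 2.5 threshold exceeded with 3-day average of {pm}"
    )
    return msg

# Hypothetical values; the addresses are placeholders
mail = build_warning_email(
    "5750220bed08f9680c6b4154", "2022-01-25T17:00:00.000Z", 25.88,
    sender="alerts@example.org", recipient="team@example.org",
)
print(mail["Subject"])

# Sending would look roughly like this (assumption: an SMTP server listens on localhost):
# import smtplib
# with smtplib.SMTP("localhost") as smtp:
#     smtp.send_message(mail)
```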

Trigger the Kafka consumer. The infinite loop will automatically break if no message is received for more than **10 seconds**

### EVENT DETECTION

In [8]:
try:

    ## Infinite loop which breaks automatically based on a timer

    while running:

        msg = consumer.poll(timeout=10) # wait up to 10 seconds for a message; if none arrives, consuming stops
        
        if msg is None:
            break # if no messages are received, exit this loop
        if msg.error():

            ## handle different errors that can come up (specific to kafka)

            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                    (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                sys.stderr.write('Topic unknown, creating %s topic\n' %
                                    (topic))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            
            ## This block is executed when everything is working fine and a message was successfully received

            ## Load the JSON message as a dictionary ("payload" avoids shadowing the built-in input())
            payload = json.loads(msg.value())

            ## The actual PM 2.5 value is the "key" of the above dictionary
            key = list(payload.keys())[0]
            
            ## Create a temporary dictionary with the received values for each data point. Dict to Pandas conversion is easier
            ## Each of these dicts is appended to the pandas dataframe as a row

            stream = {
                'lat': payload[key][0],  # latitude of the sensor
                'lon': payload[key][1],  # longitude of the sensor
                'day': payload[key][2],  # day of the value recording
                'value':  float(key),    # PM 2.5 Value
                'boxId': payload[key][3] # Sensebox ID
            }

            ## Append the above dict to the pandas table as a new row (pd.concat also works on pandas 2.x, where DataFrame.append no longer exists)
            df = pd.concat([df, pd.DataFrame([stream])], ignore_index=True)
            
            ### EVENT NOTIFICATION SECTION: START ###
            
            event_notification(df, pm_threshold)
            
            ### EVENT NOTIFICATION SECTION: END ###
            
        ## Committing the offset marks the messages consumed so far as processed, so they are not delivered again to this
        ## consumer group after a restart. A message whose processing failed is therefore not retried. Removing this command
        ## is possible but will require further changes to this script to perform manual de-duplication
        consumer.commit()

except KeyboardInterrupt:
    pass

finally:
    consumer.close()
    
    ## Note: Re-running this cell will not pull the data again as it is already pulled and the consumer is closed. You should
    ## re-run the 'sendStream.py' file to send the data again and then restart this notebook

5750220bed08f9680c6b4154 : 2022-01-23T15:00:00.000Z : Message Received PM 2.5 Levels are safe
5750220bed08f9680c6b4154 : 2022-01-24T16:00:00.000Z : Message Received PM 2.5 Levels are safe
5750220bed08f9680c6b4154 : 2022-01-28T20:00:00.000Z : Message Received PM 2.5 Levels are safe
591f578c51d34600116a8ea5 : 2022-01-23T15:00:00.000Z : Message Received PM 2.5 Levels are safe
591f578c51d34600116a8ea5 : 2022-01-24T16:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-23T15:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-24T16:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-25T17:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-26T18:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-27T19:00:00.000Z : Message Received PM 2.5 Levels are safe
59ad958fd67eb50011b85f6d : 2022-01-28T20:00:00.000Z : Messag
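
The message handling in the loop above can be isolated into a small function, which makes it testable without a running broker. This is a minimal sketch assuming the message layout used in this notebook, `{"<pm value>": [lat, lon, day, boxId]}`; the function name `parse_message` and the sample payload are illustrative.

```python
import json

def parse_message(raw):
    """Turn one raw Kafka message value (bytes or str) into a flat row dict."""
    payload = json.loads(raw)
    # The single key holds the PM 2.5 value, the list holds the remaining fields
    key, fields = next(iter(payload.items()))
    lat, lon, day, box_id = fields
    return {"lat": lat, "lon": lon, "day": day, "value": float(key), "boxId": box_id}

# Sample payload built from the first row of the streamed data
sample = b'{"4.190406": [51.956168, 7.651169, "2022-01-21T13:00:00.000Z", "5750220bed08f9680c6b4154"]}'

rows = []                      # collect rows in a plain list ...
rows.append(parse_message(sample))
print(rows[0])                 # ... and build a dataframe from the list when needed
```

Collecting rows in a list and building the dataframe once is usually cheaper than growing a dataframe row by row, although the event check in this notebook needs the dataframe after every message.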

In [9]:
## Read the output of the streamed file
df.head()

| | lat | lon | value | day | boxId |
|---|---|---|---|---|---|
| 0 | 51.956168 | 7.651169 | 4.190406 | 2022-01-21T13:00:00.000Z | 5750220bed08f9680c6b4154 |
| 1 | 51.956168 | 7.651169 | 6.32255 | 2022-01-22T14:00:00.000Z | 5750220bed08f9680c6b4154 |
| 2 | 51.956168 | 7.651169 | 12.757133 | 2022-01-23T15:00:00.000Z | 5750220bed08f9680c6b4154 |
| 3 | 51.956168 | 7.651169 | 26.606711 | 2022-01-24T16:00:00.000Z | 5750220bed08f9680c6b4154 |
| 4 | 51.956168 | 7.651169 | 38.289267 | 2022-01-25T17:00:00.000Z | 5750220bed08f9680c6b4154 |


The dataframe above holds one row per location and day; `df.head()` shows only the first five rows, all from the first of the **n** locations

In [10]:
## Check how many values are present for each day
df['day'].value_counts()

nan                         4
2022-01-21T13:00:00.000Z    3
2022-01-22T14:00:00.000Z    3
2022-01-23T15:00:00.000Z    3
2022-01-24T16:00:00.000Z    3
2022-01-25T17:00:00.000Z    3
2022-01-26T18:00:00.000Z    3
2022-01-27T19:00:00.000Z    3
2022-01-28T20:00:00.000Z    3
Name: day, dtype: int64

In [11]:
# Convert Pandas to GeoPandas
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat))
gdf.set_crs(epsg=4326, inplace=True, allow_override=True)
gdf.drop(['lon','lat'], axis=1, inplace=True)
gdf.head()

| | value | day | boxId | geometry |
|---|---|---|---|---|
| 0 | 4.190406 | 2022-01-21T13:00:00.000Z | 5750220bed08f9680c6b4154 | POINT (7.65117 51.95617) |
| 1 | 6.32255 | 2022-01-22T14:00:00.000Z | 5750220bed08f9680c6b4154 | POINT (7.65117 51.95617) |
| 2 | 12.757133 | 2022-01-23T15:00:00.000Z | 5750220bed08f9680c6b4154 | POINT (7.65117 51.95617) |
| 3 | 26.606711 | 2022-01-24T16:00:00.000Z | 5750220bed08f9680c6b4154 | POINT (7.65117 51.95617) |
| 4 | 38.289267 | 2022-01-25T17:00:00.000Z | 5750220bed08f9680c6b4154 | POINT (7.65117 51.95617) |


### SENSEBOX PLOTTING

In this section we plot all senseboxes and distinguish two cases:

1. Senseboxes that are live, i.e. returned a value on the most recent date, are plotted in green
2. Senseboxes that are down, i.e. did not return a value for the most recent date, are plotted in gray

Using this near real-time map visualization we can observe which sensors are actively streaming data and where they are located
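
The two conditions can be sketched on a small table before applying them to the streamed data. The snippet below is a simplified illustration with hypothetical records and a hypothetical helper `classify_boxes`; it assumes, as in the streamed data, that a sensebox without readings appears with a missing value and a missing day.

```python
import pandas as pd

# Hypothetical records: box "c" delivered no reading, so its value and day are missing
records = pd.DataFrame({
    "boxId": ["a", "a", "b", "b", "c"],
    "day": ["2022-01-27", "2022-01-28", "2022-01-27", "2022-01-28", None],
    "value": [4.2, 6.3, 12.8, 9.1, float("nan")],
})

def classify_boxes(frame):
    """Return a dict mapping each boxId to 'active' or 'inactive'."""
    valid = frame[frame["value"].notna()]
    recent_day = valid["day"].max()  # ISO-formatted date strings sort chronologically
    active = set(frame.loc[frame["day"] == recent_day, "boxId"])
    # Assumption: a box with a reading on the most recent day counts as active
    inactive = set(frame.loc[frame["value"].isna(), "boxId"]) - active
    status = {box: "inactive" for box in inactive}
    status.update({box: "active" for box in active})
    return status

status = classify_boxes(records)
print(status)
```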

In [2]:
## Define Icons for Map

icon_active = AwesomeIcon(
    name='map-marker',
    marker_color='green',
    icon_color='green',
    spin=False
)

icon_inactive = AwesomeIcon(
    name='map-marker',
    marker_color='gray',
    icon_color='gray',
    spin=False
)

In [12]:
gdf['valid'] = gdf['value'].notna() # True where a PM 2.5 value was received, False where it is missing (NaN)

In [13]:
## Get most recent date

valid_boxes = gdf[gdf['valid'] == True]
recent_date = valid_boxes['day'].max()

In [14]:
## Keep one row per sensebox that reported on the most recent date; .copy() avoids modifying a slice of gdf
active_boxes = gdf[gdf['day'] == recent_date].drop_duplicates(subset=['boxId'])[['geometry']].copy()
active_boxes['status'] = 'active'
active_boxes

| | geometry | status |
|---|---|---|
| 7 | POINT (7.65117 51.95617) | active |
| 15 | POINT (7.64522 51.96422) | active |
| 24 | POINT (7.63528 51.90300) | active |


In [15]:
inactive_boxes = gdf[gdf['valid'] == False].drop_duplicates(subset=['geometry'])[['geometry']]
inactive_boxes['status'] = 'inactive'
inactive_boxes

| | geometry | status |
|---|---|---|
| 16 | POINT (7.68419 51.92934) | inactive |
| 25 | POINT (7.62677 51.94632) | inactive |
| 26 | POINT (7.64146 51.95335) | inactive |
| 27 | POINT (7.64143 51.96043) | inactive |
27,POINT (7.64143 51.96043),inactive


Set up marker icons to display the inactive and active sensors separately

In [16]:
## Create a cluster of active senseboxes as points

active_markers = []
for coords in active_boxes['geometry']:
    
    active_markers.append(
        Marker(location=(coords.y, coords.x), icon=icon_active, draggable=False)
    )

## Create a cluster of inactive senseboxes as points

inactive_markers = []
for coords in inactive_boxes['geometry']:
    
    inactive_markers.append(
        Marker(location=(coords.y, coords.x), icon=icon_inactive, draggable=False)
    )


In [17]:
lat = 51.9500023
lng = 7.6240147

center = (lat, lng)

m = Map(center=center, zoom=11)

active_boxes_cluster = MarkerCluster(
    markers=tuple(active_markers)
)

inactive_boxes_cluster = MarkerCluster(
    markers=tuple(inactive_markers)
)

m.add_layer(active_boxes_cluster)
m.add_layer(inactive_boxes_cluster)

display(m)

Map(center=[51.9500023, 7.6240147], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title'…

This marks the end of our Kafka streaming workflow. You should now see a map of all sensebox locations, marked as active or inactive

#### END OF TUTORIAL
