In [None]:
from pymongo import MongoClient
from pprint import pprint
import folium  
from datetime import datetime
import time

In [None]:
# Question 1a
# import statements
from time import sleep
from kafka import KafkaConsumer
import datetime as dt
import matplotlib.pyplot as plt
import ast


# select the interactive notebook backend so the live figure can be redrawn in place (fig.canvas.draw() is called repeatedly below)
%matplotlib notebook

# Kafka topic name; connect_kafka_consumer() passes it to KafkaConsumer as the topic to subscribe to
topic = 'same_topic'

def annotate_max(x, y, ax = None):
    """Mark the highest value of the plotted series with an arrowed text label.

    x  -- list of x-axis values, parallel to ``y``.
    y  -- non-empty list of plotted values (the peak is located with ``max``
          followed by ``list.index``).
    ax -- axes to draw on; a falsy value falls back to the current pyplot axes.
    """
    peak_value = max(y)
    # list.index returns the first match, so ties resolve to the earliest point
    peak_time = x[y.index(peak_value)]
    label = 'Max: Time={}, Value={}'.format(peak_time, peak_value)
    target_axes = ax or plt.gca()
    target_axes.annotate(
        label,
        xy=(peak_time, peak_value),
        xytext=(peak_time, peak_value + 5),  # text sits 5 data units above the point
        arrowprops={'facecolor': 'red', 'shrink': 0.05},
    )
    
def annotate_min(x, y, ax = None):
    """Mark the lowest value of the plotted series with an arrowed text label.

    x  -- list of x-axis values, parallel to ``y``.
    y  -- non-empty list of plotted values (the trough is located with ``min``
          followed by ``list.index``).
    ax -- axes to draw on; a falsy value falls back to the current pyplot axes.
    """
    trough_value = min(y)
    # list.index returns the first match, so ties resolve to the earliest point
    trough_time = x[y.index(trough_value)]
    label = 'Min: Time={}, Value={}'.format(trough_time, trough_value)
    target_axes = ax or plt.gca()
    target_axes.annotate(
        label,
        xy=(trough_time, trough_value),
        xytext=(trough_time, trough_value + 5),  # text sits 5 data units above the point
        arrowprops={'facecolor': 'orange', 'shrink': 0.05},
    )

def connect_kafka_consumer():
    """Create a KafkaConsumer subscribed to the module-level ``topic``.

    Returns:
        The connected consumer, or ``None`` when constructing it raises an
        ``Exception`` (the error is printed instead of being propagated).

    Anything that is not an ``Exception`` subclass (e.g. ``KeyboardInterrupt``)
    propagates to the caller.
    """
    # No `return` inside `finally`: that form silently discards *any* in-flight
    # exception, including KeyboardInterrupt, not just the ones handled below.
    try:
        return KafkaConsumer(topic,
                             consumer_timeout_ms=10000, # stop iteration if no message after 10 sec
                             auto_offset_reset='earliest', # comment this if you don't want to consume earliest available message
                             bootstrap_servers=['localhost:9092'],
                             api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

def init_plots():
    """Create and display the figure used for the live stream plot.

    Returns:
        (fig, ax): the new figure and its single subplot axes.

    Raises:
        Any exception raised while building the figure. The message is printed
        and the exception is re-raised: returning ``None`` here would only make
        the caller's ``fig, ax = init_plots()`` fail with an unrelated
        unpacking error that hides the real cause.
    """
    try:
        width = 9.5
        height = 6
        fig = plt.figure(figsize=(width, height)) # create new figure
        ax = fig.add_subplot(111) # single subplot filling the figure
        fig.suptitle('Real-time uniform stream data visualization') # giving figure a title
        ax.set_xlabel('Time')
        ax.set_ylabel('Value')
        ax.set_ylim(0, 110)
        ax.set_yticks([0, 20, 40, 60, 80, 100])
        fig.show() # displaying the figure
        fig.canvas.draw() # drawing on the canvas
        return fig, ax
    except Exception as ex:
        print(str(ex))
        raise
    
def _parse_reading(message):
    """Decode one consumed record and return the plotted reading as a float."""
    # Read the payload by field name; a positional index depends on the
    # record's field order.
    payload = ast.literal_eval(bytes.decode(message.value))
    # Index 2 is presumably the air temperature (it is plotted under that
    # y-axis label) -- confirm against the producer's message layout.
    return float(payload[2])


def consume_messages(consumer, fig, ax):
    """Plot readings from ``consumer`` on ``ax`` as a sliding window.

    Args:
        consumer: iterable of consumed records exposing the payload bytes as
            ``.value``; ``None`` (failed connection) is reported and ignored.
        fig: figure whose canvas is redrawn after every update.
        ax: axes that are cleared and re-plotted on every update.

    Each message contributes one point (elapsed seconds, reading). Drawing
    starts once more than 10 points are held; after each draw the oldest point
    is dropped, so the window stays at 10 points between messages. A message
    that cannot be parsed is reported and skipped instead of ending the stream.
    All figures are closed when the consumer stops yielding messages. Any other
    error is printed and ends consumption.
    """
    if consumer is None:
        print('No Kafka consumer available; nothing to plot')
        return
    start_time = time.time()
    try:
        # container for x and y values
        x, y = [], []
        for message in consumer:
            try:
                reading = _parse_reading(message)
            except (ValueError, SyntaxError, TypeError, LookupError) as ex:
                print('Skipping malformed message: {}'.format(ex))
                continue
            # append both coordinates only after parsing succeeded so x and y stay aligned
            x.append(time.time() - start_time)
            y.append(reading)
            # we start plotting only when we have more than 10 data points
            if len(y) > 10:
                ax.clear()
                ax.plot(x, y)
                ax.set_xlabel('Time')
                ax.set_ylabel('Air Temperature')
                # Ticks must lie inside the 0-50 range and the limits are set
                # last: setting ticks expands the view limits to include them.
                ax.set_yticks([0, 10, 20, 30, 40, 50])
                ax.set_ylim(0, 50)
                # annotate the axes we were given rather than whatever pyplot considers current
                annotate_max(x, y, ax)
                annotate_min(x, y, ax)
                fig.canvas.draw()
                x.pop(0) # removing the item in the first position
                y.pop(0)
        plt.close('all')
    except Exception as ex:
        print(str(ex))
    
if __name__ == '__main__':
    consumer = connect_kafka_consumer()
    if consumer is None:
        # connect_kafka_consumer() returns None (after printing the error) when
        # the connection fails; do not open a figure that can never receive data.
        print('Kafka consumer unavailable; skipping visualisation')
    else:
        try:
            fig, ax = init_plots()
            consume_messages(consumer, fig, ax)
        finally:
            # release the broker connection even if plotting fails or is interrupted
            consumer.close()
    
    

In [None]:
# 2a
from pprint import pprint
import pymongo
from pymongo import MongoClient
import csv
from datetime import datetime
import matplotlib.pyplot as plt
# Bar chart of the number of fire records per hour of day.
client = MongoClient() # connect on the default host and port
db = client.fit3182_assignment_db # handle to the fit3182_assignment_db database
the_collection = db.climate  # handle to the 'climate' collection
# Count the documents that carry a non-null 'confidence' value, one group per
# distinct 'datetime' value.
each_hour = the_collection.aggregate([ 
    {"$match": {"confidence": {"$ne": None}}},
    {"$group": {"_id": "$datetime", "count": {"$sum": 1}}}
])

# Several distinct 'datetime' values can fall in the same hour, so the group
# counts are summed per hour here. Plotting one bar per group would draw the
# bars of an hour on top of each other instead of showing the hourly total.
hourly_totals = {}
for item in each_hour:
    the_time = item['_id']
    # assumes '_id' is a string whose first two characters are the hour -- TODO confirm against the stored datetime format
    hour_value = int(the_time[0:2])
    hourly_totals[hour_value] = hourly_totals.get(hour_value, 0) + int(item['count'])

hours = sorted(hourly_totals)
counts = [hourly_totals[hour] for hour in hours]

fig = plt.figure(figsize = (10,7))
plt.bar(hours, counts, width= 0.4)
plt.xlabel("Hour")
plt.ylabel("Count")
plt.title("The Total Number of Fire Records Each Hour")

plt.show()

In [None]:
#2b  
# Map every document of the 'climate' collection as a marker whose popup shows
# its temperature, humidity and confidence readings.
client = MongoClient() # connect on the default host and port
db = client.fit3182_assignment_db # handle to the fit3182_assignment_db database
the_collection = db.climate  # handle to the 'climate' collection
all_locations = the_collection.find({})

# Centre the map on the first document. With an empty collection there is no
# first document; location=None then lets folium fall back to its default view
# instead of the cell failing on an undefined name.
first_entry = next(the_collection.find({}).limit(1), None)
map_centre = None
if first_entry is not None:
    map_centre = [first_entry['latitude'], first_entry['longitude']]
m = folium.Map(location=map_centre)

for entries in all_locations:
    # The popup text is rendered as HTML, where '\n' is not a line break, so
    # the fields are separated with <br>.
    full_string = '<br>'.join([
        'Surface Temp' + ' ' + str(entries['surface_temperature_celsius']),
        'Air Temp' + ' ' + str(entries['air_temperature_celsius']),
        'Humidity' + ' ' + str(entries['relative_humidity']),
        'Confidence' + ' ' + str(entries['confidence']),
    ])
    # blue marks records whose cause is 'natural'; every other cause is red
    marker_colour = "blue" if entries['cause'] == 'natural' else "red"
    folium.Marker(
        location = [entries['latitude'], entries['longitude']],
        popup = full_string,
        icon = folium.Icon(color = marker_colour),
    ).add_to(m)

m


