## Streaming Application


First, import all required libraries.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pprint import pprint
from datetime import datetime, date

Define the function that writes the streamed records into MongoDB.

In [2]:
# Layout of the 'created_time' strings in the incoming records,
# e.g. '2019-05-20 18:42:57.622646'.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'


def _build_hotspot_model(hotspots):
    """Collapse the hotspot records of one group into a single summary document.

    'confidence' and 'surface_temperature_celcius' are averaged over all
    records; position, timestamp and '_id' are taken from the last record.
    With no hotspot records the placeholder model is returned unchanged
    ('created_time' stays None and no '_id' key is added).
    """
    hotspot_model = {'confidence': 0.0,
                     'created_time': None,
                     'latitude': 0.0,
                     'longitude': 0.0,
                     'surface_temperature_celcius': 0.0}

    for hotspot in hotspots:
        hotspot_model['confidence'] += hotspot['confidence']
        hotspot_model['surface_temperature_celcius'] += hotspot['surface_temperature_celcius']
        hotspot_model['created_time'] = datetime.strptime(hotspot['created_time'], _TIMESTAMP_FORMAT)
        hotspot_model['latitude'] = hotspot['latitude']
        hotspot_model['longitude'] = hotspot['longitude']
        hotspot_model['_id'] = hotspot['created_time']

    if hotspots:
        # Turn the running sums into averages.
        count = float(len(hotspots))
        hotspot_model['confidence'] = hotspot_model['confidence'] / count
        hotspot_model['surface_temperature_celcius'] = hotspot_model['surface_temperature_celcius'] / count

    return hotspot_model


def _build_climate_model(climate, hotspot_model):
    """Build the MongoDB document for one climate record, embedding the hotspot summary if any."""
    return {'air_temperature_celcius': climate['air_temperature_celcius'],
            'created_time': datetime.strptime(climate['created_time'], _TIMESTAMP_FORMAT),
            'latitude': climate['latitude'],
            'longitude': climate['longitude'],
            'max_wind_speed': climate['max_wind_speed'],
            'precipitation': climate['precipitation'],
            'relative_humidity': climate['relative_humidity'],
            # A hotspot summary without a timestamp means no hotspot was seen.
            'hotspots': [hotspot_model] if hotspot_model['created_time'] is not None else [],
            'windspeed_knots': climate['windspeed_knots'],
            # The raw timestamp string doubles as the document key.
            '_id': climate['created_time']}


def sendDataToDB(raw_record):
    """Store one partition of grouped stream records in MongoDB.

    Parameters
    ----------
    raw_record : iterable of (key, records)
        Each element pairs a grouping key with an iterable of record dicts.
        Records whose "sender_id" is '1' are treated as climate readings;
        every other record is treated as a hotspot reading.

    For every group that contains at least one climate record, one document
    per climate record is inserted into ``streaming_db.climate``; the
    group's hotspot records are averaged into a single embedded summary.
    Groups without a climate record are skipped.

    Insert failures are reported with ``print`` and do not stop processing.
    Any other error (for example a record with a missing field) propagates
    to the caller, but the MongoDB connection is closed in every case.
    """
    client = MongoClient()
    try:
        climate_collection = client.streaming_db.climate

        for tmp_tuple in raw_record:
            hash_key = tmp_tuple[0]
            print("hash key", hash_key)

            # Split the group's records by the kind of producer that sent them.
            climates, hotspots = [], []
            for record in tmp_tuple[1]:
                print("record", record)
                if record["sender_id"] == '1':
                    climates.append(record)
                else:
                    hotspots.append(record)

            if not climates:
                continue

            hotspot_model = _build_hotspot_model(hotspots)

            for climate in climates:
                climate_model = _build_climate_model(climate, hotspot_model)
                print("Saving climate to mongo", climate_model)

                try:
                    # insert_one replaces Collection.insert, which is
                    # deprecated in PyMongo 3 and removed in PyMongo 4.
                    climate_collection.insert_one(climate_model)
                except Exception as ex:
                    # Best effort: report the failed insert and carry on
                    # with the remaining records.
                    print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Release the connection even when a record could not be processed.
        client.close()


Main routine: read the stream from the Kafka producers and store it in MongoDB.

In [None]:

print("Starting....")

n_secs = 10      # micro-batch interval, in seconds
topic = "fire"
run_secs = 600   # keep the stream alive for 10 minutes, then shut down

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate() never returns None, so the configuration must be passed to it
# directly; otherwise the app name and master set above are silently ignored.
# If a SparkContext is already running, that existing context is reused.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

print("Creating streaming connections")

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'climate', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary


print("fetching.....")


def _key_by_geohash(message):
    """Decode one Kafka (key, value) message and pair the record with its geohash."""
    record = json.loads(message[1])  # decode the JSON payload once
    return (record["geo_hash"], record)


# Map each record to (geohash, record).
parsed = kafkaStream.map(_key_by_geohash)

# Group the records of every batch by geohash.
group = parsed.groupByKey()

# Apply the writer to each partition of each RDD in the stream.
group.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Wait up to run_secs for the stream; unlike a plain sleep this returns
    # early if the streaming job stops and re-raises any error it hit.
    ssc.awaitTerminationOrTimeout(run_secs)
finally:
    # Always shut the streaming and Spark contexts down, even on
    # interruption or failure.
    ssc.stop(stopSparkContext=True,stopGraceFully=True)

Starting....
Creating streaming connections
fetching.....
