## EVENT CONSUMER CLIENT - Apache Spark Streaming<hr />

The consumer client is implemented as an Apache Spark Streaming application that listens to the Kafka topic into which the data is being ingested. The client accepts streams of data from all the producers, performs stream-level operations according to several operational conditions, and eventually ingests the results into the respective collections in the MongoDB database.
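
Each message reaches the consumer as a (key, value) pair whose value is the record serialized as text. The sketch below shows one way such a value could be decoded into a dictionary. It assumes that the producers send either valid JSON or the `str()` form of a Python dictionary (an assumption suggested by the quote replacement in the consumer code); `parse_record` is a hypothetical helper and is not part of the application.

```python
import ast
import json


def parse_record(value):
    """Decode one message value into a dict (hypothetical helper)."""
    try:
        # Preferred path: the producer sent valid JSON.
        return json.loads(value)
    except ValueError:
        # Fallback: the producer sent str(dict), e.g. "{'latitude': -37.9}".
        # literal_eval only parses Python literals and does not execute code.
        return ast.literal_eval(value)


print(parse_record('{"latitude": -37.9, "longitude": 144.9}'))
print(parse_record("{'sender_id': 'climate_producer_1'}"))
```

Unlike replacing every single quote with a double quote, this approach also copes with values that themselves contain an apostrophe.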

During the implementation of the Spark streaming engine, we made some assumptions to ensure that streaming, processing and ingestion into the database happen seamlessly. The assumptions are as follows:

<ol>
    <li>The streaming application will ingest the processed data into the collections created to hold the historic data.</li>
    <li>The streaming application accepts data from a single stream as batches of RDDs, each split into partitions. The engine goes through each partition, checks the conditions defined in the business logic and takes an action, either operational or ingestional.</li>
    <li>For each chunk of data received from each producer within an accepted stream, we append a <b>geohash</b> key. The geohash is computed with a third-party library (pygeohash) from the latitude and longitude information present in each chunk of the streamed data.</li>
    <li>To compare the geohashes of two data chunks, we consider only their first 3 characters. If the first 3 characters of the geohashes of any two chunks from any two producers are the same, we assume that the two chunks belong to the same region, since a geohash represents a rectangular region on the geographical map in which the data points are plotted.</li>
    <li>We come across 3 conditions while processing streamed data whose geohashes share the same prefix:</li>
    <ul>
        <li>Both chunks are from Hotspot (AQUA and TERRA): we average surface_temperature and confidence and ingest the transformed data into the database.</li>
        <li>One chunk is from Climate and the other from Hotspot (AQUA or TERRA): we append the ObjectId of the Hotspot chunk to the Climate chunk and ingest the modified chunks into the Climate and Hotspot collections.</li>
        <li>Both chunks are from Climate: we ingest them directly into the database without performing any transformation.</li>
    </ul>
    <li>If the geohashes of the chunks do not match, we ingest the data directly into the database. We do this to ensure that no data is lost, irrespective of whether it is currently useful or relevant. The streamed and ingested data may become significant later and serve as historic data for future processing.</li>
</ol>
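
The prefix comparison and the three conditions above can be sketched as a small pure-Python routine. This is a minimal illustration under stated assumptions, not the consumer's actual code: the helper names `same_region`, `classify_pair` and `average_hotspots`, the rule labels and the sample geohash strings are hypothetical, while the `sender_id` values and field names are taken from the consumer code in this notebook.

```python
FIRE_SENDERS = {"fire_AQUA_producer_2", "fire_TERRA_producer_3"}


def same_region(hash_a, hash_b, prefix_len=3):
    """Return True when two geohashes share their first prefix_len characters."""
    return hash_a[:prefix_len] == hash_b[:prefix_len]


def classify_pair(rec_a, rec_b):
    """Name the rule that applies to two records (labels are illustrative)."""
    if not same_region(rec_a["geohash"], rec_b["geohash"]):
        return "no_match"
    a_fire = rec_a["sender_id"] in FIRE_SENDERS
    b_fire = rec_b["sender_id"] in FIRE_SENDERS
    if a_fire and b_fire:
        return "average_hotspots"
    if not a_fire and not b_fire:
        return "store_both_climate"
    return "link_climate_to_hotspot"


def average_hotspots(rec_a, rec_b):
    """Return a copy of rec_a with the two measured fields averaged."""
    merged = dict(rec_a)
    for key in ("confidence", "surface_temperature_celcius"):
        merged[key] = (rec_a[key] + rec_b[key]) / 2
    return merged


# Sample records with made-up values, for illustration only.
aqua = {"sender_id": "fire_AQUA_producer_2", "geohash": "r1r0f",
        "confidence": 80, "surface_temperature_celcius": 50}
terra = {"sender_id": "fire_TERRA_producer_3", "geohash": "r1r2k",
         "confidence": 60, "surface_temperature_celcius": 40}
climate = {"sender_id": "climate_producer_1", "geohash": "r1q9x"}

print(classify_pair(aqua, terra))
print(average_hotspots(aqua, terra))
print(classify_pair(aqua, climate))
```

Keeping the classification separate from the database writes makes the rules easy to check without a running MongoDB instance.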

In [None]:
# importing os package to set the environment for the pyspark streaming application
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

# importing all other relevant libraries
import ast
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pygeohash as pgh
import pandas as pd
import string
from random import sample, choice
import random


# function to ingest the streamed data to the database to respective collections
def sendDataToDB(iter):
    
    #establishing a connection to the client
    client = MongoClient()
    
    #creating a database/referencing an existing database if it exists
    db = client.assignment2
    
    # creating or referencing an existing collection for climate data
    climate = db.climate
    
    # creating or referencing an existing collection for hotspot data
    fire = db.fire
    
    # list to hold the records of a particular partition
    lst=[]
    
    # counter to keep track of the number of chunks in the partition
    count = 0
    
    # a loop that goes through each record of the partition and enriches it before the conditions are checked
    for record in iter:
        
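        # parsing the message value (record[1]) into a dictionary; single quotes are replaced with double quotes
        # so that a dict-style string becomes valid JSON
        x = json.loads(str(record[1]).replace("'",'"'))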
        
        # creating a new dictionary for each of the json objects read to append additional information
        ins = {}
        
        # incrementing the count for each record of the partition
        count += 1
        
        # replicating the data items in the dictionary to the new dictionary
        ins.update(x)
        
        # appending the geohash of the record, computed from its latitude and longitude, under the key "geohash"
        ins["geohash"] = pgh.encode(float(x["latitude"]), float(x["longitude"]), precision=5)
        
        # appending the updated dictionary to a list for further processing
        lst.append(ins)        
    
    # check if the number of records in the streamed partition is more than 1
    if count > 1:
        
        # perform a serial comparison with every element in the list with every other element and check conditions
        for i in range(len(lst)):
            for j in range(i+1,len(lst)):
                
                # checking if the first 3 characters of the two geohashes are equal
                if lst[i]["geohash"][:3] == lst[j]["geohash"][:3]:
                    
                    # checking if both records are from the hotspot producers (Producer 2 and Producer 3); if so, average
                    # confidence and surface temperature and ingest the merged record into the database
                    if lst[i]["sender_id"] in ["fire_AQUA_producer_2","fire_TERRA_producer_3"] and lst[j]["sender_id"] in ["fire_AQUA_producer_2","fire_TERRA_producer_3"]:
                        lst[i]["confidence"] = (lst[i]["confidence"]+lst[j]["confidence"])/2
                        lst[i]["surface_temperature_celcius"] = (lst[i]["surface_temperature_celcius"]+lst[j]["surface_temperature_celcius"])/2
                        # lst[j] carries an "_id" only once it has been inserted, so .get() avoids a KeyError
                        lst[i]["match"] = lst[j].get("_id")
                        fire.insert(lst[i])
                    
                    # checking if both records are from climate; if so, we ingest both of them into the climate collection
                    elif lst[i]["sender_id"] == "climate_producer_1" and lst[j]["sender_id"] == "climate_producer_1":

                        climate.insert(lst[i])
                        climate.insert(lst[j])

                    else:
                        # if neither of the above conditions is met, one record must be from climate and the other
                        # from hotspot. The hotspot record is inserted first because pymongo adds the generated "_id"
                        # to the inserted dictionary; that ObjectId is then stored under the key "match" of the
                        # climate record before it is inserted.
                        if lst[i]["sender_id"] == "climate_producer_1":
                            fire.insert(lst[j])
                            lst[i]["match"] = lst[j]["_id"]
                            climate.insert(lst[i])

                        else:
                            fire.insert(lst[i])
                            lst[j]["match"] = lst[i]["_id"]
                            climate.insert(lst[j])
                
                # if the geohashes do not match, identify the producer of the data chunk and insert it into the database.
                # Once it is added, we break the inner loop so that the same data chunk is not compared and inserted
                # again, which prevents duplication of data in the database
                else:
                    if lst[i]["sender_id"] == "climate_producer_1":
                        climate.insert(lst[i])
                        break
                    else:
                        fire.insert(lst[i])
                        break
    
    # if the partition holds exactly one record and it is from Producer 1, we insert it directly into the database;
    # the single record is lst[0]
    elif count == 1:
        if lst[0]["sender_id"] == "climate_producer_1":
            climate.insert(lst[0])
            
    # close the connection to the database after the database operations are completed
    client.close()
        

        
# batch interval of the streaming application, in seconds
n_secs = 10

# topic for the streaming spark application to connect.
topic = "test"

# setting a local streaming application with 2 threads.
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")

# creating a spark context with the configuration above, or getting the existing one
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")

# streaming context that runs on the spark context with the specified batch interval in seconds
ssc = StreamingContext(sc, n_secs)

# creating a direct stream with the specified topic and the Kafka server instance
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'week11-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

# Printing the contents of the read stream
kafkaStream.pprint()

# for each partition of every RDD, calling sendDataToDB
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

# starting the streaming context
ssc.start()

# run the stream for at most 10 minutes in case no producer is detected, then stop gracefully
ssc.awaitTerminationOrTimeout(600)
ssc.stop(stopSparkContext=True,stopGraceFully=True)