# This notebook demonstrates a Spark Structured Streaming job that listens to a Kafka topic and persists the events in MongoDB Atlas (hosted on AWS).



In [0]:
from pyspark import SparkConf, SparkContext
import sys
import math

assert sys.version_info >= (3, 5)  # make sure we have Python 3.5+
import re, datetime, uuid
from pyspark.sql import SQLContext, Row, SparkSession, functions, types
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType

# Spark Streaming process — the `foreachBatch` sink used below requires Spark 2.4 or later


**The streaming process handles the ETL layer: it captures events from a Kafka topic and transforms the data so that it can be loaded into MongoDB.**

**MongoDB Atlas (hosted on AWS) is used as the sink; the Kafka and MongoDB connector dependencies are supplied through the `spark.jars.packages` configuration parameter.**

In [0]:
def save_batch(df, epoch_id, database='iot_prediction', collection='battery_1'):
    """Append one streaming micro-batch to a MongoDB collection.

    Intended as a ``DataStreamWriter.foreachBatch`` callback, which Spark
    invokes with the micro-batch DataFrame and its batch (epoch) id.

    Args:
        df: Micro-batch DataFrame to persist.
        epoch_id: Batch id supplied by Spark; not used here.
        database: Target MongoDB database (defaults to the previously
            hard-coded ``'iot_prediction'``).
        collection: Target MongoDB collection (defaults to the previously
            hard-coded ``'battery_1'``).
    """
    # NOTE: foreachBatch gives at-least-once guarantees; if a batch is
    # re-executed after a failure it is appended again. epoch_id could be
    # used to deduplicate if that matters downstream.
    df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append") \
        .option("database", database) \
        .option("collection", collection) \
        .save()


def main(topic, freq, database_name, collection_name, bootstrap_server, mongo_uri=None):
    """Stream comma-separated events from Kafka into a MongoDB collection.

    Subscribes to ``topic`` on ``bootstrap_server``, splits each message value
    on commas into the named fields below, and appends every micro-batch to
    ``database_name.collection_name`` in MongoDB.

    Args:
        topic: Kafka topic to subscribe to.
        freq: Timeout in seconds passed to ``StreamingQuery.awaitTermination``;
            the query is stopped once it returns.
        database_name: Target MongoDB database.
        collection_name: Target MongoDB collection.
        bootstrap_server: Kafka bootstrap server(s), e.g. ``'host:9092'``.
        mongo_uri: MongoDB connection string. When omitted, it is read from
            the ``MONGODB_URI`` environment variable so that credentials are
            not hard-coded in the notebook.

    Raises:
        ValueError: if no MongoDB connection string is available.
    """
    import os

    mongo_uri = mongo_uri or os.environ.get('MONGODB_URI')
    if not mongo_uri:
        raise ValueError("MongoDB connection string missing: pass mongo_uri "
                         "or set the MONGODB_URI environment variable")

    spark = SparkSession.builder.appName('Read_Stream') \
        .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,org.mongodb.spark:mongo-spark-connector_2.11:2.3.0') \
        .config('spark.mongodb.input.uri', mongo_uri) \
        .config('spark.mongodb.output.uri', mongo_uri) \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    messages = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', bootstrap_server) \
        .option('subscribe', topic).load()

    values = messages.select(messages['value'].cast('string'))
    split_val = functions.split(values['value'], ',')

    # Positional layout of each comma-separated event; values are specific to
    # the business requirement. NOTE: 'Langitude' (sic) is kept as-is because
    # it is the field name persisted in MongoDB; renaming it would break
    # existing readers of the collection.
    field_names = [
        'Langitude', 'Battery_Level', 'Latitude', 'Battery_Cycle_No',
        'Location', 'u_id', 'Battery_Type', 'Battery_Status', 'User_Type',
        'DateTime', 'full_name',
    ]
    # Keep the raw 'value' column first, then one column per field, which
    # matches the column order produced by successive withColumn calls.
    values = values.select(
        'value',
        *[split_val.getItem(i).alias(name) for i, name in enumerate(field_names)]
    )

    def _write_batch(batch_df, epoch_id):
        # foreachBatch callback: honour the requested database/collection
        # instead of a hard-coded target.
        batch_df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append") \
            .option("database", database_name) \
            .option("collection", collection_name) \
            .save()

    stream = values.writeStream.foreachBatch(_write_batch).start()
    try:
        # awaitTermination(timeout) only waits; on timeout it returns without
        # stopping the query, so stop it explicitly to end the job.
        stream.awaitTermination(freq)
    finally:
        stream.stop()
    

   

**The cell below invokes `main`; all run-specific settings (Kafka topic, run timeout, MongoDB database and collection, Kafka bootstrap server) are passed to it here.**


In [0]:
if __name__ == "__main__":
    # Run configuration for the stream. These stay as notebook-level names so
    # they can be inspected after the cell runs.
    topic, bootstrap_server = "battery-1", '207.23.196.89:9092'
    database_name, collection_name = 'iot_prediction', 'battery_1'
    termination_time = 6000  # forwarded to main() as `freq`

    main(topic=topic,
         freq=termination_time,
         database_name=database_name,
         collection_name=collection_name,
         bootstrap_server=bootstrap_server)