In [None]:
import os
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pbspark import MessageConverter

from driver_fatigue_pb2 import FatigueDetection
from seat_detection_pb2 import SeatDetection
from bus_location_pb2 import BusLocation

In [None]:
# Maven coordinates handed to `spark.jars.packages` (comma-separated): the
# Structured Streaming Kafka source and the DataStax Cassandra connector, both
# built for Scala 2.12 (the `_2.12` suffix).
# NOTE(review): the spark-sql-kafka version should match the running Spark version — confirm.
jarsPackages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1",
    "com.datastax.spark:spark-cassandra-connector_2.12:3.2.0",
])

In [None]:
# Local Spark session with the Kafka and Cassandra connector jars on the classpath.
# Cassandra connection settings are read from the environment so credentials are
# not committed with the notebook; the fallbacks are the values previously
# hard-coded here, so behavior is unchanged when the variables are unset.
# TODO: remove the credential fallbacks once CASSANDRA_USERNAME / CASSANDRA_PASSWORD
# are provided in every environment this notebook runs in.
# NOTE: getOrCreate() reuses a session already running in this kernel; settings such
# as spark.jars.packages may then not take effect — restart the kernel after changing them.
spark = SparkSession.builder.master("local[*]") \
                    .appName('Spark Structured Streaming from Sensor EV-BUS') \
                    .config("spark.jars.packages", jarsPackages) \
                    .config("spark.cassandra.connection.host",
                            os.environ.get('CASSANDRA_HOST', '10.184.0.2')) \
                    .config('spark.cassandra.auth.username',
                            os.environ.get('CASSANDRA_USERNAME', 'cassandra')) \
                    .config('spark.cassandra.auth.password',
                            os.environ.get('CASSANDRA_PASSWORD', 'cassandra')) \
                    .config('spark.cassandra.connection.keepAliveMS', '3600000') \
                    .getOrCreate()
# keepAliveMS above is 3600000 ms = 1 hour.

In [None]:
def writeToConsole(ds, OutputMode):
    """Start a debug streaming query that prints each micro-batch of ``ds``.

    Args:
        ds: streaming DataFrame to display.
        OutputMode: Structured Streaming output mode handed to
            ``outputMode`` (e.g. ``'append'``).

    Returns:
        The StreamingQuery returned by ``start()``.
    """
    consoleWriter = ds.writeStream.outputMode(OutputMode)
    # truncate=False: print long cell values in full instead of cutting them off.
    consoleWriter = consoleWriter.format('console').option('truncate', False)
    return consoleWriter.start()

In [None]:
# One Kafka source subscribed to all three sensor topics. Record headers are
# requested because later cells route and label records by header values.
kafkaReaderOptions = {
    "kafka.bootstrap.servers": "103.146.34.72:9094",
    "subscribe": 'driver_fatigue_detection,seat_detection,bus_location',
    "includeHeaders": "true",
}

kafkaStream = (
    spark.readStream
    .format("kafka")
    .options(**kafkaReaderOptions)
    .load()
    .selectExpr("value", "headers", 'timestamp')
)

In [None]:
def selectTopicByHeader(stream, topicName):
    """Keep ``headers`` and ``value`` for records whose first header equals ``topicName``.

    NOTE(review): routing relies on header position 0 carrying the topic name,
    not on a header key — confirm every producer emits headers in that order.
    """
    firstHeaderValue = col('headers')[0]['value'].cast('string')
    return stream.select(col('headers'), col('value')).where(firstHeaderValue == topicName)


topicBusLocation = selectTopicByHeader(kafkaStream, 'bus_location')
topicFatigueDriver = selectTopicByHeader(kafkaStream, "driver_fatigue_detection")
topicSeatDetection = selectTopicByHeader(kafkaStream, "seat_detection")

In [None]:
def parsedRawData(df, proto):
    """Decode the protobuf payload in ``value`` and flatten it into columns.

    Args:
        df: streaming DataFrame with ``headers`` and ``value`` columns.
        proto: protobuf message class used to decode ``value``
            (e.g. ``BusLocation``).

    Returns:
        A DataFrame with ``topic`` (value of header 0), ``bus_id`` (value of
        header 2), every field of the decoded message, and ``timestamp``
        replaced by ``to_timestamp(timestamp / 1000)``.
    """
    converter = MessageConverter()

    def _headerValue(position):
        # Headers are read by position rather than by key.
        # NOTE(review): assumes a fixed header order from every producer — confirm.
        return col('headers')[position]['value'].cast('string')

    # Protobuf is a serialization format (not encryption): from_protobuf decodes
    # the raw bytes into a struct column named `parsed`.
    decoded = df.withColumn('parsed', converter.from_protobuf('value', proto))
    tagged = decoded.withColumn('topic', _headerValue(0)).withColumn('bus_id', _headerValue(2))
    flattened = tagged.select('topic', 'bus_id', 'parsed.*')

    # After the select only topic, bus_id and the decoded fields remain, so
    # `timestamp` must be a field of the protobuf message. The division by 1000
    # suggests epoch milliseconds — TODO confirm against the .proto definitions.
    return flattened.withColumn('timestamp', to_timestamp(col('timestamp') / 1000))

In [None]:
# Decode each topic-filtered stream with its matching protobuf message class.
DriverFatigueRaw, SeatDetectionRaw, BusLocationRaw = [
    parsedRawData(topicStream, messageType)
    for topicStream, messageType in (
        (topicFatigueDriver, FatigueDetection),
        (topicSeatDetection, SeatDetection),
        (topicBusLocation, BusLocation),
    )
]

In [None]:
"topic_bus_location": "bus_location",
"topic_driver_fatigue": "driver_fatigue_detection",
"topic_seat_detection": "seat_detection",
"hadoop_job_path": "hdfs://namenode:9000/user/parallels/job",
"hadoop_checkpoint_path": "hdfs://namenode:9000/user/parallels/checkpoint"

In [None]:
def writeToHDFS(ds, topic,
                basePath="hdfs://192.168.193.64:9000/user/parallels",
                processingTime='10 seconds'):
    """Start an append-mode streaming file sink on HDFS for one topic.

    Each topic gets its own output directory ``<basePath>/job/<topic>/`` and
    its own checkpoint ``<basePath>/checkpoint/<topic>/``. Separate output
    directories are required: a file sink keeps its commit log in
    ``<path>/_spark_metadata``, so several queries writing to one shared path
    conflict on that log. (Previously every topic wrote to ``<basePath>/job/``.)

    No ``format`` is set, so Spark's default data source
    (``spark.sql.sources.default``) is used.

    Args:
        ds: streaming DataFrame to persist; must have ``topic`` and ``bus_id``
            columns, which are used as partition columns.
        topic: topic name; used as the output and checkpoint sub-directory.
        basePath: HDFS prefix containing the ``job`` and ``checkpoint`` dirs.
        processingTime: micro-batch trigger interval.

    Returns:
        The StreamingQuery returned by ``start()``.

    Raises:
        ValueError: if ``topic`` is empty (its sink would share the parent
            directory with other queries again).
    """
    if not topic:
        raise ValueError("topic must be a non-empty string")
    root = basePath.rstrip('/')
    return ds \
        .writeStream \
        .outputMode('append') \
        .option('path', f"{root}/job/{topic}/") \
        .option('checkpointLocation', f"{root}/checkpoint/{topic}/") \
        .partitionBy('topic', 'bus_id') \
        .trigger(processingTime=processingTime) \
        .start()

In [None]:
# Start one HDFS file-sink query per topic; each call returns the running
# StreamingQuery, kept here so the queries can be inspected or stopped later.
DriverFatigueDs = writeToHDFS(ds=DriverFatigueRaw, topic='driver_fatigue_detection')
SeatDetectionDs = writeToHDFS(ds=SeatDetectionRaw, topic='seat_detection')
BusLocationDs = writeToHDFS(ds=BusLocationRaw, topic='bus_location')

In [None]:
# Debug sinks: print each decoded stream to the driver console. The original cell
# discarded the returned StreamingQuery handles, so these queries could not be
# stopped individually; keep them (e.g. `for q in consoleQueries: q.stop()`).
consoleQueries = [
    writeToConsole(DriverFatigueRaw, 'append'),
    writeToConsole(BusLocationRaw, 'append'),
    writeToConsole(SeatDetectionRaw, 'append'),
]
consoleQueries

In [None]:
def forEachBatchAction(batch_df, batch_id):
    """Append one micro-batch to Cassandra table ``busev.raw_data_bus_location``.

    Used as the ``foreachBatch`` callback of the bus-location streaming query.

    Args:
        batch_df: DataFrame holding this micro-batch's rows.
        batch_id: micro-batch id supplied by Spark; unused here.

    Returns:
        Whatever ``save()`` returns.
    """
    cassandraWriter = batch_df.write.format("org.apache.spark.sql.cassandra")
    cassandraWriter = cassandraWriter.options(table='raw_data_bus_location', keyspace="busev")
    return cassandraWriter.mode('append').save()

In [None]:
# Stream decoded bus-location records into Cassandra every 5 seconds via
# forEachBatchAction.
# - A dedicated checkpoint lets the query resume from its last committed Kafka
#   offsets after a restart; without one Spark falls back to a temporary checkpoint
#   directory and progress is lost. The directory is distinct from the HDFS sink's
#   checkpoint for the same topic so the two queries never share state.
# - `cassandra` now holds the StreamingQuery itself; previously it was bound to the
#   None returned by awaitTermination().
cassandra = BusLocationRaw.writeStream \
            .trigger(processingTime='5 seconds') \
            .foreachBatch(forEachBatchAction) \
            .outputMode('update') \
            .option('checkpointLocation',
                    "hdfs://192.168.193.64:9000/user/parallels/checkpoint/cassandra_bus_location/") \
            .start()

# Blocks this cell (and the kernel) until this query stops or fails; the other
# streaming queries keep running independently in the background.
cassandra.awaitTermination()