# FIT5202 - Data Processing for Big Data S2 2021 
# Assessment 2B-Task3: Streaming Application


Student information
- Family Name: Aggarwal
- Given Name: Naval
- Student ID: 31153054
- Student email: nagg0001@student.monash.edu

### Import Libraries

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import SparkConf

### Creating a Spark session

In [2]:
# Run Spark locally; "local" uses a single worker thread.
master = "local"
# Application name shown in the Spark UI
app_name = "Assignment-2B-Task3"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Create (or reuse) the session with its SQL session time zone set to UTC.
# The config key is case-sensitive and must be spelled "spark.sql.session.timeZone";
# the lower-case "timezone" spelling is not a recognised key, so the session
# would silently stay on the JVM's default time zone.
spark = SparkSession\
    .builder\
    .config(conf=spark_conf)\
    .config("spark.sql.session.timeZone", "UTC")\
    .getOrCreate()
sc = spark.sparkContext
# Only show errors in the notebook output, hiding INFO/WARN log chatter
sc.setLogLevel('ERROR')

### Ingesting the streaming data from Kafka into Spark

In [3]:
# The topic of producer
topic = 'flightTopic'  # Kafka topic the flight producer publishes to

# Streaming source: subscribe to `topic` on the local Kafka broker.
df_flights = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "127.0.0.1:9092",
        "subscribe": topic,
    })
    .load()
)


# Kafka delivers key and value as binary; keep only those two columns and
# decode them to strings so the JSON payload can be parsed downstream.
df_flights = df_flights.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [4]:
# Show the schema of the raw stream after the string casts (key, value)
df_flights.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



### Transforming the data into the proper format

In [5]:
# Creating a schema as per our data 
# Schema of one Kafka message: a JSON array of flight records, each a struct
# with the (nullable) columns listed below. Time and delay columns are declared
# as strings here; cast them downstream if numeric values are needed.
schema = ArrayType(StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('YEAR', IntegerType()),
        ('MONTH', IntegerType()),
        ('DAY', IntegerType()),
        ('DAY_OF_WEEK', IntegerType()),
        ('AIRLINE', StringType()),
        ('FLIGHT_NUMBER', IntegerType()),
        ('TAIL_NUMBER', StringType()),
        ('ORIGIN_AIRPORT', StringType()),
        ('DESTINATION_AIRPORT', StringType()),
        ('SCHEDULED_DEPARTURE', IntegerType()),
        ('DEPARTURE_TIME', StringType()),
        ('DEPARTURE_DELAY', StringType()),
        ('TAXI_OUT', StringType()),
        ('WHEELS_OFF', StringType()),
        ('SCHEDULED_TIME', StringType()),
        ('ELAPSED_TIME', StringType()),
        ('AIR_TIME', StringType()),
        ('DISTANCE', IntegerType()),
        ('WHEELS_ON', StringType()),
        ('TAXI_IN', StringType()),
        ('SCHEDULED_ARRIVAL', IntegerType()),
        ('ARRIVAL_TIME', StringType()),
        ('ARRIVAL_DELAY', StringType()),
        ('DIVERTED', IntegerType()),
        ('CANCELLED', IntegerType()),
    )
]))


In [6]:
# Parse each message's JSON string with from_json against `schema`.
# Each Kafka message is expected to carry a JSON array of flight records
# (hence the ArrayType schema); an unparseable message yields null.
# The cast is a no-op safeguard: `value` was already cast to string above.
df_parsed = df_flights.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))

df_parsed.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- YEAR: integer (nullable = true)
 |    |    |-- MONTH: integer (nullable = true)
 |    |    |-- DAY: integer (nullable = true)
 |    |    |-- DAY_OF_WEEK: integer (nullable = true)
 |    |    |-- AIRLINE: string (nullable = true)
 |    |    |-- FLIGHT_NUMBER: integer (nullable = true)
 |    |    |-- TAIL_NUMBER: string (nullable = true)
 |    |    |-- ORIGIN_AIRPORT: string (nullable = true)
 |    |    |-- DESTINATION_AIRPORT: string (nullable = true)
 |    |    |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |    |    |-- DEPARTURE_TIME: string (nullable = true)
 |    |    |-- DEPARTURE_DELAY: string (nullable = true)
 |    |    |-- TAXI_OUT: string (nullable = true)
 |    |    |-- WHEELS_OFF: string (nullable = true)
 |    |    |-- SCHEDULED_TIME: string (nullable = true)
 |    |    |-- ELAPSED_TIME: string (nullable = true)
 |    |    |-- AIR_TIME: string (nullable = true)

In [7]:
# Flatten the per-message array: every flight record becomes its own row,
# held in a single struct column named `value`.
flightsDf = df_parsed.selectExpr("explode(parsed_value) AS value")

flightsDf.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- YEAR: integer (nullable = true)
 |    |-- MONTH: integer (nullable = true)
 |    |-- DAY: integer (nullable = true)
 |    |-- DAY_OF_WEEK: integer (nullable = true)
 |    |-- AIRLINE: string (nullable = true)
 |    |-- FLIGHT_NUMBER: integer (nullable = true)
 |    |-- TAIL_NUMBER: string (nullable = true)
 |    |-- ORIGIN_AIRPORT: string (nullable = true)
 |    |-- DESTINATION_AIRPORT: string (nullable = true)
 |    |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |    |-- DEPARTURE_TIME: string (nullable = true)
 |    |-- DEPARTURE_DELAY: string (nullable = true)
 |    |-- TAXI_OUT: string (nullable = true)
 |    |-- WHEELS_OFF: string (nullable = true)
 |    |-- SCHEDULED_TIME: string (nullable = true)
 |    |-- ELAPSED_TIME: string (nullable = true)
 |    |-- AIR_TIME: string (nullable = true)
 |    |-- DISTANCE: integer (nullable = true)
 |    |-- WHEELS_ON: string (nullable = true)
 |    |-- TAXI_IN: string (nullable = true)


### Persisting the transformed streaming data

In [11]:
# Continuously append each micro-batch of flight rows to a Parquet file sink.
# NOTE(review): the checkpoint directory lives inside the sink's output path;
# a sibling directory is the more common layout. Confirm before relying on this,
# and only relocate it together with a fresh output path.
flights_query = (
    flightsDf.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "flight.parquet/check")
    .start(path="flight.parquet")
)


In [None]:
# Stop the Parquet file-sink streaming query started in the previous cell.
# NOTE(review): this cell is marked "In [None]" (not executed in the recorded
# run), so the query was presumably still running when the data was read back.
flights_query.stop()

In [12]:
# Read the saved parquet data
# Batch-read the Parquet output written by the streaming file sink.
# NOTE(review): this rebinds `flights_query` (previously the StreamingQuery
# handle) to a DataFrame, so the stream can no longer be stopped via this name;
# a distinct name would be clearer, but the following cell expects this one.
flights_query = spark.read.format("parquet").load("flight.parquet")

In [13]:
# Confirm the persisted data kept the nested `value` struct schema of flightsDf
flights_query.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- YEAR: integer (nullable = true)
 |    |-- MONTH: integer (nullable = true)
 |    |-- DAY: integer (nullable = true)
 |    |-- DAY_OF_WEEK: integer (nullable = true)
 |    |-- AIRLINE: string (nullable = true)
 |    |-- FLIGHT_NUMBER: integer (nullable = true)
 |    |-- TAIL_NUMBER: string (nullable = true)
 |    |-- ORIGIN_AIRPORT: string (nullable = true)
 |    |-- DESTINATION_AIRPORT: string (nullable = true)
 |    |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |    |-- DEPARTURE_TIME: string (nullable = true)
 |    |-- DEPARTURE_DELAY: string (nullable = true)
 |    |-- TAXI_OUT: string (nullable = true)
 |    |-- WHEELS_OFF: string (nullable = true)
 |    |-- SCHEDULED_TIME: string (nullable = true)
 |    |-- ELAPSED_TIME: string (nullable = true)
 |    |-- AIR_TIME: string (nullable = true)
 |    |-- DISTANCE: integer (nullable = true)
 |    |-- WHEELS_ON: string (nullable = true)
 |    |-- TAXI_IN: string (nullable = true)
