# 1. Start Spark Session and Import Libraries

In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, DateType, BooleanType
from pyspark.ml import PipelineModel
from pyspark.ml.classification import RandomForestClassificationModel

findspark.init()

kafka_package = "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.1"


spark = SparkSession.builder \
    .appName("FlightDelayStreamingPrediction") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.jars.packages", kafka_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/hadoop/spark-3.5.4/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jj/.ivy2/cache
The jars for the packages stored in: /home/jj/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ce135d54-381e-446d-a23d-cef0da19c2f1;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
	found org.apache.commons#commons-pool2;2.11.1 in

# 2. Load Models & Pipeline

In [2]:
pipeline_path = "./flight_delay_pipeline_model"
model_path = "./flight_delay_rf_model"

loaded_pipeline_model = PipelineModel.load(pipeline_path)
loaded_rf_model = RandomForestClassificationModel.load(model_path)

                                                                                

# 3. Create Kafka Topic

In [5]:
kafka_topic = "flight_data_stream"
kafka_bootstrap_servers = "localhost:9092"

kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
kafka_df.printSchema()

root
 |-- value: string (nullable = true)



# 4. Prepare for Streaming

In [6]:
json_schema = StructType([
    StructField("FL_DATE", StringType(), True),
    StructField("AIRLINE", StringType(), True),
    StructField("AIRLINE_CODE", StringType(), True),
    StructField("ORIGIN", StringType(), True),
    StructField("DEST", StringType(), True),
    StructField("CRS_DEP_TIME", IntegerType(), True),
    StructField("CRS_ARR_TIME", IntegerType(), True),
    StructField("CRS_ELAPSED_TIME", DoubleType(), True),
    StructField("DISTANCE", DoubleType(), True),
])

parsed_stream_df = kafka_df \
    .select(F.from_json(F.col("value"), json_schema).alias("data")) \
    .select("data.*")

parsed_stream_df = parsed_stream_df.withColumn("FL_DATE", F.to_date(F.col("FL_DATE"), "yyyy-MM-dd"))
parsed_stream_df = parsed_stream_df.withColumn("DEP_HOUR", (F.col("CRS_DEP_TIME") / 100).cast("integer"))
parsed_stream_df = parsed_stream_df.withColumn("DEP_MINUTE", (F.col("CRS_DEP_TIME") % 100).cast("integer"))
parsed_stream_df = parsed_stream_df.withColumn("ARR_HOUR", (F.col("CRS_ARR_TIME") / 100).cast("integer"))
parsed_stream_df = parsed_stream_df.withColumn("ARR_MINUTE", (F.col("CRS_ARR_TIME") % 100).cast("integer"))
parsed_stream_df = parsed_stream_df.withColumn("DEP_DAY_OF_WEEK", F.dayofweek(F.col("FL_DATE")))
parsed_stream_df = parsed_stream_df.withColumn("DEP_MONTH", F.month(F.col("FL_DATE")))
parsed_stream_df = parsed_stream_df.withColumn("DEP_DAY_OF_MONTH", F.dayofmonth(F.col("FL_DATE")))
parsed_stream_df = parsed_stream_df.withColumn("DEP_WEEK_OF_YEAR", F.weekofyear(F.col("FL_DATE")))
parsed_stream_df = parsed_stream_df.withColumn("IS_WEEKEND", F.when(F.col("DEP_DAY_OF_WEEK").isin([1, 7]), 1).otherwise(0))
parsed_stream_df = parsed_stream_df.withColumn("DISTANCE_PER_MINUTE", F.col("DISTANCE") / (F.col("CRS_ELAPSED_TIME") + 1e-6))


feature_columns = [
    "AIRLINE_CODE", "ORIGIN", "DEST", "CRS_ELAPSED_TIME", "DISTANCE",
    "DEP_HOUR", "DEP_MINUTE", "ARR_HOUR", "ARR_MINUTE",
    "DEP_DAY_OF_WEEK", "DEP_MONTH", "DEP_DAY_OF_MONTH", "DEP_WEEK_OF_YEAR",
    "IS_WEEKEND", "DISTANCE_PER_MINUTE"
]
parsed_stream_df = parsed_stream_df.dropna(subset=feature_columns + ["FL_DATE"])
parsed_stream_df.printSchema()

processed_stream_df = loaded_pipeline_model.transform(parsed_stream_df)

predictions_df = loaded_rf_model.transform(processed_stream_df)

output_df = predictions_df.select(
    "FL_DATE",
    "AIRLINE_CODE",
    "ORIGIN",
    "DEST",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "DISTANCE",
    "prediction",
    "probability"
)

output_df = output_df.withColumn(
    "Prediction_Label",
     F.when(F.col("prediction") == 1, "Severe Delay Predicted")
     .otherwise("No Severe Delay Predicted")
)

root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DEP_HOUR: integer (nullable = true)
 |-- DEP_MINUTE: integer (nullable = true)
 |-- ARR_HOUR: integer (nullable = true)
 |-- ARR_MINUTE: integer (nullable = true)
 |-- DEP_DAY_OF_WEEK: integer (nullable = true)
 |-- DEP_MONTH: integer (nullable = true)
 |-- DEP_DAY_OF_MONTH: integer (nullable = true)
 |-- DEP_WEEK_OF_YEAR: integer (nullable = true)
 |-- IS_WEEKEND: integer (nullable = false)
 |-- DISTANCE_PER_MINUTE: double (nullable = true)



In [7]:
prob_udf = F.udf(lambda prob: float(prob[1]), DoubleType())
output_df = output_df.withColumn("Probability_Severe_Delay", prob_udf(F.col("probability")))

output_df.printSchema()

query = output_df \
    .select("FL_DATE", "AIRLINE_CODE", "ORIGIN", "DEST", "CRS_DEP_TIME", "Prediction_Label", "Probability_Severe_Delay") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

try:
    query.awaitTermination()
except KeyboardInterrupt:
    print("Streaming query interrupted by user.")

root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- prediction: double (nullable = false)
 |-- probability: vector (nullable = true)
 |-- Prediction_Label: string (nullable = false)
 |-- Probability_Severe_Delay: double (nullable = true)



25/04/17 20:34:05 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fba00bd5-233f-493f-9edf-6a46ddc2cd76. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 20:34:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 20:34:06 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------------+------+----+------------+----------------+------------------------+
|FL_DATE|AIRLINE_CODE|ORIGIN|DEST|CRS_DEP_TIME|Prediction_Label|Probability_Severe_Delay|
+-------+------------+------+----+------------+----------------+------------------------+
+-------+------------+------+----+------------+----------------+------------------------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/jj/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/jj/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Streaming query interrupted by user.
