In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from time import sleep
import pandas as pd

# Spark connection settings: standalone master URL, application name, and
# driver/executor sizing. The setters return the conf, so they are chained.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Stream_Processor")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The Spark session is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

bucket = 'temp_bucket_prj2'

# Register the GCS connector classes for the gs:// scheme and set the
# 'temporaryGcsBucket' option on the session to the bucket above.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set('temporaryGcsBucket', bucket)

# Schema of the JSON payload carried in the Kafka message value; every field
# is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("flight_number", StringType()),
        ("flight_status", IntegerType()),
        ("planned_departure_time", StringType()),
        ("actual_departure_time", StringType()),
    )
])

# Streaming source: the "flight" Kafka topic, starting from the latest offsets.
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("failOnDataLoss", "false")
    .option("subscribe", "flight")
    .option("startingOffsets", "latest")
    .load()
)

# Cast the Kafka value to a string and parse it as JSON into a single struct
# column named "parsed_value".
df = kafkaStream.select(
    from_json(col("value").cast("string"), schema).alias("parsed_value")
)
df.printSchema()

@pandas_udf(DoubleType())
def calculate_delay(actual_departure_time: pd.Series, planned_departure_time: pd.Series) -> pd.Series:
    """Return the departure delay in minutes for each row of the batch.

    Both inputs are parsed as 'HH:MM' clock times. When the actual time is
    earlier than the planned time, the departure is treated as having
    happened on the following day, so the result always lies in [0, 1440).

    Args:
        actual_departure_time: Series of 'HH:MM' strings (may contain nulls).
        planned_departure_time: Series of 'HH:MM' strings (may contain nulls).

    Returns:
        A float Series of delays in minutes, aligned with the inputs. Rows
        where either time is missing or cannot be parsed yield NaN instead
        of raising, so one bad record does not fail the whole batch.
    """
    minutes_per_day = 24 * 60

    # errors='coerce' turns unparseable values into NaT rather than raising.
    actual = pd.to_datetime(actual_departure_time, format='%H:%M', errors='coerce')
    planned = pd.to_datetime(planned_departure_time, format='%H:%M', errors='coerce')

    # Signed difference in minutes; negative when actual is before planned.
    delay_minutes = (actual - planned).dt.total_seconds() / 60

    # Modulo with a positive divisor wraps negative differences forward by one
    # day (e.g. -30 -> 1410) and leaves non-negative ones unchanged.
    return delay_minutes % minutes_per_day

# Fully qualified BigQuery table that receives the aggregated result.
output_table = "data-eng-assignment-2.assignment2.spark-data"

# Fields of the parsed Kafka payload that are kept as top-level columns.
flight_columns = [
    "parsed_value.flight_number",
    "parsed_value.flight_status",
    "parsed_value.planned_departure_time",
    "parsed_value.actual_departure_time",
]

# Add the per-flight delay column, then flatten the struct into plain columns.
processed_df = (
    df.withColumn(
        "delay_time",
        calculate_delay(
            col("parsed_value.actual_departure_time"),
            col("parsed_value.planned_departure_time"),
        ),
    )
    .select(*flight_columns, "delay_time")
)

# Single-row aggregate: the mean of delay_time over the stream.
average_delay = processed_df.selectExpr("avg(delay_time) as average_delay")


def my_foreach_batch_function(df, batch_id):
    """Display one micro-batch and save it to BigQuery.

    Passed to ``foreachBatch`` on the streaming writer. The batch DataFrame is
    printed with ``show()`` and then written with the batch ``write`` API to
    ``output_table`` in "overwrite" mode.

    Args:
        df: DataFrame holding the current micro-batch result.
        batch_id: Identifier of the micro-batch (unused here).
    """
    df.show()

    bigquery_writer = df.write.format('bigquery').option('table', output_table)
    bigquery_writer.mode("overwrite").save()

# Start the streaming query: "complete" output mode, a 2-second
# processing-time trigger, and each micro-batch handed to
# my_foreach_batch_function.
query = (
    average_delay.writeStream
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .foreachBatch(my_foreach_batch_function)
    .start()
)

try:
    # Block until the query terminates.
    query.awaitTermination()
except KeyboardInterrupt:
    # Manual interrupt: stop the query first, then the Spark session.
    query.stop()
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- parsed_value: struct (nullable = true)
 |    |-- flight_number: string (nullable = true)
 |    |-- flight_status: integer (nullable = true)
 |    |-- planned_departure_time: string (nullable = true)
 |    |-- actual_departure_time: string (nullable = true)

+-------------+
|average_delay|
+-------------+
|         NULL|
+-------------+

+-------------+
|average_delay|
+-------------+
|         29.8|
+-------------+

+------------------+
|     average_delay|
+------------------+
|29.407608695652176|
+------------------+

+------------------+
|     average_delay|
+------------------+
|30.033755274261605|
+------------------+

+------------------+
|     average_delay|
+------------------+
|30.463087248322147|
+------------------+

+------------------+
|     average_delay|
+------------------+
|32.137362637362635|
+------------------+

+------------------+
|     average_delay|
+------------------+
|31.615384615384617|
+------------------+

+------------------+
|     average_delay

In [74]:
# Stop the Spark session. This is a separate notebook cell from the
# streaming job.
spark.stop()
