# In [None]:
from time import sleep

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, ShortType, FloatType, ByteType

# Cluster and resource settings for this application, built as one fluent chain
# (every SparkConf setter returns the conf itself).
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Lab7_1")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine; getOrCreate()
# returns an already-active session if there is one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Explicit schema handed to the streaming CSV reader. Every field is nullable.
dataSchema = StructType(
    [
        StructField("ArrDelay", FloatType(), True),
        StructField("ArrTime", FloatType(), True),
        StructField("DayOfWeek", ByteType(), True),
        StructField("DayofMonth", ByteType(), True),
        # Dest and UniqueCarrier are still object-typed (strings) for now;
        # converting them has not been worked out yet.
        StructField("Dest", StringType(), True),
        # The original was missing the comma after this field, which made the
        # whole file fail to parse.
        StructField("Month", ByteType(), True),
        StructField("UniqueCarrier", StringType(), True),
        StructField("Year", ShortType(), True),
    ]
)

# Streaming CSV source over the airline data directory, limited to one file
# per trigger via the maxFilesPerTrigger option.
stream_reader = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)
sdf = stream_reader.csv("/home/jovyan/data/airline")

# Discard the columns that the rest of the script does not use.
unused_columns = ["DayOfWeek", "Dest", "DayofMonth", "Year"]
new_sdf = sdf.drop(*unused_columns)

# Add an event_time column next to the existing ones: ArrTime is cast to
# double, divided by 1e9 and the quotient is cast to a timestamp.
# NOTE(review): the 1e9 divisor presumably treats ArrTime as nanoseconds since
# the epoch -- confirm against the actual contents of the data files.
arr_time_scaled = new_sdf["ArrTime"].cast("double") / 1000000000
withEventTimedf = new_sdf.withColumn("event_time", arr_time_scaled.cast("timestamp"))

# Show the resulting column layout.
withEventTimedf.printSchema()

# Start the streaming aggregation: group rows by 1-minute event-time window and
# carrier, average them, and keep the full ("complete") result in an in-memory
# table that can be queried by name with spark.sql.
# `window` and `col` are provided by pyspark.sql.functions; the original file
# used them without importing them, which raised NameError here.
# NOTE(review): avg() without arguments averages every remaining numeric column,
# not only ArrDelay -- confirm whether avg("ArrDelay") was intended.
(
    withEventTimedf
    .groupBy(window(col("event_time"), "1 minute"), "UniqueCarrier")
    .avg()
    .writeStream
    .queryName("avg_arrdelay_per_carrier")
    .format("memory")
    .outputMode("complete")
    .start()
)

# Print the current contents of the in-memory result table 100 times,
# waiting 10 seconds after each print.
for _ in range(100):
    snapshot = spark.sql("SELECT * FROM avg_arrdelay_per_carrier")
    snapshot.show()
    sleep(10)