#TASK 1: Batch output to filesystem

In [None]:
#Comment these lines if running from VSCode
#import findspark
#findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import SparkFiles
from pyspark.sql.functions import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.types import *
from pyspark.sql.functions import window, col

#Build spark batch session

In [3]:
# Create (or reuse) the SparkSession used by the batch part of the notebook.
spark = (
    SparkSession.builder
    .appName("batch_app")
    .getOrCreate()
)

sc = spark.sparkContext

# Register the remote OpenFlights routes file with the SparkContext so that
# it can later be located through SparkFiles.get("routes.dat").
url = "https://raw.githubusercontent.com/jpatokal/openflights/master/data/routes.dat"
sc.addFile(url)

#Filter raw data to drop \N values (null)

In [4]:
# Read the comma-separated routes file registered with SparkFiles. No header
# option is set, so the columns carry Spark's default names _c0 ... _c8.
raw_df = spark.read.option("sep", ",").csv("file:///" + SparkFiles.get("routes.dat"))

# Delete rows with \N (null) values: keep a row only when none of its nine
# columns holds the literal string "\N".
# The filters are applied in a loop rather than as nine copy-pasted calls, and
# the statement no longer ends with a dangling line-continuation backslash.
routes_filter_df = raw_df
for column_name in [f"_c{index}" for index in range(9)]:
    routes_filter_df = routes_filter_df.filter(raw_df[column_name] != "\\N")

routes_filter_df.show(10)


+---+----+---+----+---+----+---+---+---+
|_c0| _c1|_c2| _c3|_c4| _c5|_c6|_c7|_c8|
+---+----+---+----+---+----+---+---+---+
| 2P| 897|GES|2402|MNL|2397|  Y|  0|320|
| 2P| 897|MNL|2397|GES|2402|  Y|  0|320|
| 4M|3201|DFW|3670|EZE|3988|  Y|  0|777|
| 4M|3201|EZE|3988|DFW|3670|  Y|  0|777|
| 4M|3201|EZE|3988|JFK|3797|  Y|  0|777|
| 4M|3201|JFK|3797|EZE|3988|  Y|  0|777|
| 5N| 503|ARH|4362|CSH|6110|  Y|  0|AN4|
| 5N| 503|ARH|4362|MMK|2949|  Y|  0|AN4|
| 5N| 503|ARH|4362|USK|4369|  Y|  0|AN4|
| 5N| 503|CSH|6110|ARH|4362|  Y|  0|AN4|
+---+----+---+----+---+----+---+---+---+
only showing top 10 rows



#Renaming Columns for clarity

In [5]:
# Mapping from Spark's default column names to descriptive ones.
column_names = {
    "_c0": "Airline",
    "_c1": "Airline_ID",
    "_c2": "Source_airport",
    "_c3": "Source_airport_ID",
    "_c4": "Destination_airport",
    "_c5": "Destination_airport_ID",
    "_c6": "Codeshare",
    "_c7": "Stops",
    "_c8": "Equipment",
}

# Apply the renames one column at a time.
df_new = routes_filter_df
for old_name, new_name in column_names.items():
    df_new = df_new.withColumnRenamed(old_name, new_name)

df_new.show()

# Expose the renamed frame to Spark SQL under the name "flights_information".
df_new.createOrReplaceTempView("flights_information")

+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|Airline|Airline_ID|Source_airport|Source_airport_ID|Destination_airport|Destination_airport_ID|Codeshare|Stops|Equipment|
+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|     2P|       897|           GES|             2402|                MNL|                  2397|        Y|    0|      320|
|     2P|       897|           MNL|             2397|                GES|                  2402|        Y|    0|      320|
|     4M|      3201|           DFW|             3670|                EZE|                  3988|        Y|    0|      777|
|     4M|      3201|           EZE|             3988|                DFW|                  3670|        Y|    0|      777|
|     4M|      3201|           EZE|             3988|                JFK|                  3797|        Y|    0|      777|
|     4M|      3

#Select top 10 airports by source airport using spark.sql

In [6]:
#In spark.sql

# Count the routes per source airport in the "flights_information" view and
# keep the ten airports with the highest counts.
# The GROUP BY column is spelled exactly like the selected column
# (Source_airport) so the query does not depend on case-insensitive
# identifier resolution.
top_airports = spark.sql("""
SELECT Source_airport, COUNT(Source_airport) AS Count
FROM flights_information
GROUP BY Source_airport
ORDER BY Count DESC
LIMIT 10
""")

top_airports.show()

# Alternative: dense_rank can be used for the ranking.

+--------------+-----+
|Source_airport|Count|
+--------------+-----+
|           ATL|  633|
|           ORD|  277|
|           LAX|  209|
|           LHR|  202|
|           VIE|  159|
|           CDG|  155|
|           FRA|  150|
|           AMS|  145|
|           DFW|  143|
|           DEN|  136|
+--------------+-----+



#Select top 10 airports by source airport using pyspark

In [7]:
# Same top-10 ranking expressed with the DataFrame API:
# count the rows per source airport ...
route_counts = df_new.select("Source_airport").groupBy("Source_airport").count()

# ... then keep the ten largest counts.
top_airports = route_counts.orderBy(desc("count")).limit(10)

top_airports.show()

+--------------+-----+
|Source_airport|count|
+--------------+-----+
|           ATL|  633|
|           ORD|  277|
|           LAX|  209|
|           LHR|  202|
|           VIE|  159|
|           CDG|  155|
|           FRA|  150|
|           AMS|  145|
|           DFW|  143|
|           DEN|  136|
+--------------+-----+



#Print result to filesystem

In [8]:
#Write to sink

# Destination of the batch result; "overwrite" mode replaces earlier output
# at this path, and a header row is included.
csvPath = "./output/batch"
(
    top_airports.write
    .mode("overwrite")
    .option("header", "true")
    .csv(csvPath)
)

print("Output .csv file saved in", csvPath)

Output .csv file saved in ./output/batch


#Stop batch session

#TASK 2: Convert to Structured Streaming

In [9]:
checkDirectory = "./output/streaming/check"

# Define schema required for Structured streaming.
# Field order follows the column order used for the batch DataFrame.
fileSchema = StructType([
    StructField("Airline", StringType(), True),
    StructField("Airline_ID", IntegerType()),
    StructField("Source_airport", StringType(), True),
    StructField("Source_airport_ID", IntegerType()),
    StructField("Destination_airport", StringType(), True),
    StructField("Destination_airport_ID", IntegerType()),
    StructField("Codeshare", StringType(), True),
    StructField("Stops", IntegerType()),
    StructField("Equipment", StringType(), True),
])


In [10]:
# Streaming source over the SparkFiles root directory, parsed as header-less
# CSV with the explicit schema defined for the stream.
inputDF_streaming = (spark
  .readStream
  .schema(fileSchema)
  .option("header", False)
  # The key must match the documented option name exactly (no trailing
  # whitespace), otherwise the per-trigger file limit is not applied.
  .option("maxFilesPerTrigger", 1)
  .csv(SparkFiles.getRootDirectory())) #stream needs to read from directory

In [11]:
# Echo the raw (non-aggregated) stream to the console.
# Only "append" is set as the output mode: the stream has no aggregation, and
# a second outputMode() call would simply override the first one anyway.
query = (
    inputDF_streaming.writeStream
    .trigger(availableNow=True)
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", checkDirectory)
    .start()
)


In [12]:
 #cleaning & including dummy timestamp
    
from pyspark.sql import Row

# Every column of the stream must be populated for a record to be kept.
required_columns = (
    "Airline",
    "Airline_ID",
    "Source_airport",
    "Source_airport_ID",
    "Destination_airport",
    "Destination_airport_ID",
    "Codeshare",
    "Stops",
    "Equipment",
)

# Drop any record that has a null in one of the required columns.
transformed_df = inputDF_streaming
for column_name in required_columns:
    transformed_df = transformed_df.filter(inputDF_streaming[column_name].isNotNull())

# Add a synthetic event-time column computed from Destination_airport_ID.
transformed_df_withEvents = transformed_df.selectExpr(
    "*",
    "cast(cast(Destination_airport_ID as int)/1000000 as timestamp) as dummy_event_time",
)

# Keep only the records for which the synthetic timestamp is present.
transformed_df_withEvents = transformed_df_withEvents.filter(
    transformed_df_withEvents.dummy_event_time.isNotNull()
)
    


#Select top airports (streaming_top_airports)

In [13]:
from pyspark.sql.functions import *

# Use five shuffle partitions for the aggregations that follow.
spark.conf.set("spark.sql.shuffle.partitions", 5)

# Count the records per source airport on the cleaned stream ...
source_airport_counts = (
    transformed_df_withEvents
    .select("Source_airport")
    .groupBy("Source_airport")
    .count()
)

# ... and keep the ten airports with the highest counts.
streaming_top_airports = source_airport_counts.orderBy(desc("count")).limit(10)

checkDirectory = "./output/streaming/check"

# Show the result schema and confirm that the frame is a streaming one.
streaming_top_airports.printSchema()
print(f"Streaming DataFrame : {streaming_top_airports.isStreaming}")

root
 |-- Source_airport: string (nullable = true)
 |-- count: long (nullable = false)

Streaming DataFrame : True


In [14]:
# Write the streaming output to the console

checkDirectory = "./output/streaming/check"

# Checkpoint directory dedicated to this query. The raw-stream console query
# already checkpoints into checkDirectory, so this query gets its own path
# instead of sharing that one.
topAirportsCheckDirectory = checkDirectory + "_top_airports"

# "checkpointLocation" is the option key that sets the checkpoint directory.
# Complete mode re-emits the whole aggregated result on each trigger.
streamingQuery = (
    streaming_top_airports
    .writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", topAirportsCheckDirectory)
    .trigger(processingTime="10 minutes")
    .start()
)

#.awaitTermination() 
    


#TASK 3: Aggregations using sliding windows

In [15]:
# Count records per source airport over sliding event-time windows:
# each window is 10 minutes long and a new window starts every 5 minutes.

from pyspark.sql.functions import window, col

sliding_window = window(col("dummy_event_time"), "10 minutes", "5 minutes")

# Group by (window, source airport), then keep the ten largest counts.
streaming_df = (
    transformed_df_withEvents
    .groupBy(sliding_window, col("Source_airport"))
    .count()
    .orderBy(desc("count"))
    .limit(10)
)

streaming_df.printSchema()


root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- Source_airport: string (nullable = true)
 |-- count: long (nullable = false)



In [16]:
#Write output to console

# Start the windowed aggregation as a console query in complete output mode,
# triggered every 20 seconds.
streamingQuery = (
    streaming_df.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime="20 seconds")
    .start()
)