# Prerequisits

In [95]:
# I use pandas to collect the data from the URL and save it locally for spark to read it
# i have searched everywhere to find a way to read in the dataset from the URL in parrelel but could not find it
import pandas as pd

url = 'https://raw.githubusercontent.com/jpatokal/openflights/master/data/routes.dat'

# get the data from the URL and cast it to the variable pandas_df as a dataframe
pandas_df = pd.read_csv(url, names=["Airline", "Airline ID", "Source airport", 
                                    "Source airport ID", "Destination airport", "Destination airport ID", "Codeshare", "Stops", "Equipment"])
# save the file 
pandas_df.to_csv('routes.csv')

# Task 1

In [96]:
from pyspark.sql import SparkSession

# The entry point into all functionality in Spark is the SparkSession class
# To create a basic SparkSession, just use SparkSession.builder
spark = SparkSession.builder.appName("Airline Routes").getOrCreate()

# this will output a link named Spark UI to see more about your sessions
spark

df = spark.read.csv('routes.csv', inferSchema=True, nullValue='NA', header=True)

adf = df.groupBy('Source airport').count()

from pyspark.sql.functions import desc, asc

adf = adf.sort(desc("count"))
save_df = adf.limit(10)
save_df = save_df.toPandas()
save_df.to_csv('top_10_airports.csv')

df.show()

spark.stop()

+---+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|_c0|Airline|Airline ID|Source airport|Source airport ID|Destination airport|Destination airport ID|Codeshare|Stops|Equipment|
+---+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|  0|     2B|       410|           AER|             2965|                KZN|                  2990|     null|    0|      CR2|
|  1|     2B|       410|           ASF|             2966|                KZN|                  2990|     null|    0|      CR2|
|  2|     2B|       410|           ASF|             2966|                MRV|                  2962|     null|    0|      CR2|
|  3|     2B|       410|           CEK|             2968|                KZN|                  2990|     null|    0|      CR2|
|  4|     2B|       410|           CEK|             2968|                OVB|                  4078|     null| 

# Task 2

In [97]:
# By reading the theory. The stream of data is regarded as a table to which data is continiously appended
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.types import StructType

# Read all the csv files written atomically in a directory
spark = SparkSession.builder.appName("Structured Streaming").getOrCreate()

# refrence example
# userSchema = StructType().add("name", "string").add("age", "integer")

userSchema = StructType().add("Airline", "string").add("Airline ID", "integer").add("Source airport", "string")\
.add("Source airport ID", "integer").add("Destination airport", "string").add("Destination airport ID", "integer")\
.add("Codeshare", "string").add("Stops", "integer").add("Equipment", "string")

csv_sdf = spark.readStream.csv('routes.csv', schema = userSchema)

csv_sdf.isStreaming



True

In [98]:
# csv_sdf.writeStream.format("memory").queryName("whole").start()
# spark.sql("select * from whole").show()

In [99]:
csv_sdf.createOrReplaceTempView("planes")
planes = spark.sql("select * from planes")

In [100]:
query = planes.writeStream.format("console").start()

In [101]:
query.awaitTermination()

StreamingQueryException: [STREAM_FAILED] Query [id = 64a0a879-c945-4a19-a32f-c178fcf61699, runId = 684c4aab-fc94-445a-af62-1e78538fcae8] terminated with exception: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Window Operations on Event Time

Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

In [None]:
from pyspark.sql.functions import window
path = "C:/Users/ozcan/Desktop/Schiphol"
agg_csv = csv_sdf.groupBy("Source airport").count()
# agg_csv = csv_sdf.groupBy("Source airport",window("Airline", windowDuration="10 minutes", slideDuration="5 minutes")).count()
agg_csv.isStreaming

True