#SparkSQL

Working with PySpark's built-in SQL compliant functionality to investigate flights-delay data at scale


*   Understand the limitations of SparkSQL
*   Experiment with createOrReplaceGlobalView
*   Work on same executions through pyspark methods



In [None]:
#Download Data from source
Data Source Repo - https://kloudbitbucket.s3.amazonaws.com/krunal_ds/departuredelays.csv

In [10]:
#Import necessary spark components
!pip install pyspark



Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0f746797270e78849f396716bad38a43beae0c4a5cf854ff654e63b5e0c3ff01
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [11]:
from pyspark.sql import SparkSession

In [18]:
#Create a SparkSession
spark=SparkSession.builder.appName('vandana').getOrCreate()

In [34]:
#Import Data into Spark Native Dataframe
df=spark.read.option("Header","True").csv("departuredelays.csv")


In [36]:
#Convert into an SQL complaint format
sdf=df
sdf.createOrReplaceTempView("delays")


In [37]:
#Display 100 rows of data with SQL query
spark.sql("select * from delays").limit(100).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [43]:
#Get a list of all Origin and Destination airports
spark.sql("select origin,destination  from delays").show()

+------+-----------+
|origin|destination|
+------+-----------+
|   ABE|        ATL|
|   ABE|        DTW|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        ATL|
|   ABE|        DTW|
|   ABE|        ATL|
|   ABE|        DTW|
|   ABE|        ATL|
|   ABE|        DTW|
|   ABE|        ATL|
|   ABE|        ORD|
|   ABE|        DTW|
+------+-----------+
only showing top 20 rows



In [50]:
#Find the top 5 longest distance travel routes
#df.select('distance').limit(5).show()
spark.sql("select * from delays order by distance desc limit 5").show()


+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|02020959|   -9|     999|   PHX|        DSM|
|01071459|   11|     999|   DSM|        PHX|
|02040959|   -7|     999|   PHX|        DSM|
|01090751|    5|     999|   DSM|        PHX|
|02031950|   -5|     999|   PHX|        DSM|
+--------+-----+--------+------+-----------+



In [55]:
#Find top 3 origin airport with maximum flight delay occurances
spark.sql("""select origin
from delays
group by origin
order by sum(delay) desc limit 3
""").show()

+------+
|origin|
+------+
|   ORD|
|   ATL|
|   DEN|
+------+



In [56]:
#Find top 3 destination airport with maximum flight delay occurances
spark.sql(""" select destination
from delays
group by destination
order by sum(delay) desc limit(3)
""").show()


+-----------+
|destination|
+-----------+
|        ATL|
|        ORD|
|        DEN|
+-----------+



In [60]:
#Find the route with maximum delay occurances
spark.sql("""select origin, destination, max(delay)
from delays
group by origin,destination
order by max(delay) desc limit 1
""").show()

+------+-----------+----------+
|origin|destination|max(delay)|
+------+-----------+----------+
|   SMF|        SLC|       995|
+------+-----------+----------+



In [62]:
#Find the top three routes with maximum time-delay
spark.sql("""select *
from delays
order by delay desc limit 3
""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01090600|  995|     462|   SMF|        SLC|
|03191420|  994|    1590|   SJC|        ORD|
|01200645|  993|     525|   MOT|        DEN|
+--------+-----+--------+------+-----------+



In [63]:
#Find the distance for top three max time-delay routes
spark.sql(""" select * from delays
order by distance desc limit 3
""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|02010959|   -6|     999|   PHX|        DSM|
|01071459|   11|     999|   DSM|        PHX|
|02020959|   -9|     999|   PHX|        DSM|
+--------+-----+--------+------+-----------+

