<a href="https://colab.research.google.com/github/Rparekh123/PySpark/blob/main/PySpark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark SQL 
## Airline Dataset Exploration

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=2e756a3542dbf9d3a472e1d1049d5d586f1ab2f6da32c00f525d335c365e8bd1
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Create Dataframe

In [4]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('airlines_case_sql').getOrCreate()

In [5]:
# Create datafrme as a table
airlinedf = spark.read.option('header', 'true').csv('airlines1.csv')

## Explanatory Data

In [6]:
# Register dataframe as a table
airlinedf.registerTempTable('AirlineTable')



In [7]:
# Read the register table
spark.sql('select * from AirlineTable').show(5)

+---+----+-------+-----+----------+---------+----------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+--------------------+---------+-------------+--------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+----

In [8]:
# Number of rows
spark.sql('select count(*) from AirlineTable').show()

+--------+
|count(1)|
+--------+
|   50001|
+--------+



In [9]:
# Columns with datatype
spark.sql('describe AirlineTable').show(50)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                 _c0|   string|   null|
|                Year|   string|   null|
|             Quarter|   string|   null|
|               Month|   string|   null|
|          DayofMonth|   string|   null|
|           DayOfWeek|   string|   null|
|          FlightDate|   string|   null|
|   Reporting_Airline|   string|   null|
|DOT_ID_Reporting_...|   string|   null|
|IATA_CODE_Reporti...|   string|   null|
|         Tail_Number|   string|   null|
|Flight_Number_Rep...|   string|   null|
|     OriginAirportID|   string|   null|
|  OriginAirportSeqID|   string|   null|
|  OriginCityMarketID|   string|   null|
|              Origin|   string|   null|
|      OriginCityName|   string|   null|
|         OriginState|   string|   null|
|     OriginStateFips|   string|   null|
|     OriginStateName|   string|   null|
|           OriginWac|   string|   null|
|       DestAirp

In [10]:
# Selecting specific columns
spark.sql('select origin, destcityname from AirlineTable').show(5)

+------+--------------------+
|origin|        destcityname|
+------+--------------------+
|   MSP|  Salt Lake City, UT|
|   MKE|         Orlando, FL|
|   GJT|Dallas/Fort Worth...|
|   LAX|         Detroit, MI|
|   EWR|       Charlotte, NC|
+------+--------------------+
only showing top 5 rows



In [11]:
# Overall flight delays
spark.sql('select Flight_Number_Reporting_Airline, DepDelay+ArrDelay from AirlineTable').show()

+-------------------------------+---------------------+
|Flight_Number_Reporting_Airline|(DepDelay + ArrDelay)|
+-------------------------------+---------------------+
|                            675|                 42.0|
|                            671|                 -2.0|
|                           3297|                 11.0|
|                           1806|                -20.0|
|                            465|                 83.0|
|                           1198|                 11.0|
|                           1431|                 -2.0|
|                           3459|                435.0|
|                           7233|                 12.0|
|                           5932|                 45.0|
|                           2135|                  8.0|
|                           1463|                 -5.0|
|                            830|                -13.0|
|                           6474|                -23.0|
|                            674|               

In [12]:
# Average departure delay
spark.sql('select avg(DepDelayMinutes) from AirlineTable').show()

+--------------------+
|avg(DepDelayMinutes)|
+--------------------+
|  10.851455322613475|
+--------------------+



In [15]:
# count total number of flights by origin airport and total avg and maximum DepDelay time of each airport
spark.sql('select origin, count(*), avg(DepDelayMinutes), max(DepDelayMinutes) from AirlineTable where group by origin').show()

+------+--------+--------------------+--------------------+
|origin|count(1)|avg(DepDelayMinutes)|max(DepDelayMinutes)|
+------+--------+--------------------+--------------------+
|   ABE|      49|   6.659574468085107|                44.0|
|   ABI|      11|  0.5555555555555556|                 4.0|
|   ABQ|     290|   9.041958041958042|                 9.0|
|   ABR|       3|               284.0|               852.0|
|   ABY|       3|                20.0|                30.0|
|   ACK|       1|                 0.0|                 0.0|
|   ACT|      16|              3.1875|                33.0|
|   ACV|      14|   7.071428571428571|                 7.0|
|   ACY|       5|                 7.6|                38.0|
|   ADK|       2|                 6.5|                13.0|
|   ADQ|       4|   4.666666666666667|                14.0|
|   AEX|      11|   8.090909090909092|                34.0|
|   AGS|      27|  23.814814814814813|                64.0|
|   AKN|       2|                 9.5|  

In [17]:
# calculate origin state cancelled flights
spark.sql('select OriginState, count(Cancelled) from AirlineTable where Cancelled = 1 group by originstate').show()

+-----------+----------------+
|OriginState|count(Cancelled)|
+-----------+----------------+
|         SC|               8|
|         AZ|              12|
|         LA|               5|
|         MN|              26|
|         NJ|              27|
|         OR|               5|
|         VA|              41|
|         RI|               4|
|         KY|              15|
|         WY|               1|
|         NH|               1|
|         MI|              24|
|         NV|              10|
|         WI|               6|
|         ID|               5|
|         CA|              99|
|         NE|               2|
|         CT|               4|
|         NC|              31|
|         MD|               6|
+-----------+----------------+
only showing top 20 rows



In [25]:
# Condition on 2 columns
spark.sql('select origin, count(*) from AirlineTable where year = 2010 and month = 6 group by origin order by count(*) desc').show()

+------+--------+
|origin|count(1)|
+------+--------+
|   ATL|       8|
|   ORD|       7|
|   PHX|       6|
|   DFW|       6|
|   MCO|       6|
|   LAX|       5|
|   IAH|       5|
|   CLE|       4|
|   CMH|       3|
|   TPA|       3|
|   CLT|       3|
|   MIA|       3|
|   MEM|       3|
|   BOS|       3|
|   IAD|       3|
|   SFO|       3|
|   LAS|       3|
|   SYR|       2|
|   EWR|       2|
|   COS|       2|
+------+--------+
only showing top 20 rows



In [26]:
# Conditions on different columns
spark.sql("select DOT_ID_REPORTING_AIRLINE from AirlineTable where year = 2015 and month = 12 and origin = 'JFK'").show()

+------------------------+
|DOT_ID_REPORTING_AIRLINE|
+------------------------+
|                   20409|
|                   19805|
+------------------------+



## Time flight performance - year wise

In [31]:
df = spark.sql('select year, count(*) AS TOTAL_FLIGHT, count(arrdelay == 0) AS TOTAL_ON_TIME, sum(cancelled) AS TOTAL_CANCELLED, sum(diverted) AS TOTAL_DIVERTED from AirlineTable where group by year')

In [32]:
df.registerTempTable('AirlineTable1')



In [33]:
spark.sql('select * from AirlineTable1').show()

+----+------------+-------------+---------------+--------------+
|year|TOTAL_FLIGHT|TOTAL_ON_TIME|TOTAL_CANCELLED|TOTAL_DIVERTED|
+----+------------+-------------+---------------+--------------+
|1987|         339|          336|            3.0|           0.0|
|2016|        1502|         1484|           17.0|           1.0|
|2020|         462|          438|           24.0|           0.0|
|2012|        1519|         1497|           20.0|           2.0|
|1988|        1310|         1293|           15.0|           2.0|
|2019|        1900|         1862|           34.0|           4.0|
|2017|        1449|         1421|           22.0|           6.0|
|2014|        1513|         1476|           31.0|           6.0|
|2013|        1690|         1663|           22.0|           5.0|
|2005|        1812|         1776|           33.0|           3.0|
|2000|        1477|         1432|           42.0|           3.0|
|2002|        1371|         1360|           11.0|           0.0|
|2009|        1604|      

In [36]:
# percentage of performance
spark.sql('select year, TOTAL_FLIGHT, TOTAL_ON_TIME, TOTAL_ON_TIME*100/TOTAL_FLIGHT AS ON_TIME_PER, TOTAL_CANCELLED, TOTAL_CANCELLED*100/TOTAL_FLIGHT AS CANCELLED_PER, TOTAL_DIVERTED, TOTAL_DIVERTED*100/TOTAL_FLIGHT AS DIVERTED_PER from AirlineTable1;').show()

+----+------------+-------------+-----------------+---------------+------------------+--------------+-------------------+
|year|TOTAL_FLIGHT|TOTAL_ON_TIME|      ON_TIME_PER|TOTAL_CANCELLED|     CANCELLED_PER|TOTAL_DIVERTED|       DIVERTED_PER|
+----+------------+-------------+-----------------+---------------+------------------+--------------+-------------------+
|1987|         339|          336|99.11504424778761|            3.0|0.8849557522123894|           0.0|                0.0|
|2016|        1502|         1484|98.80159786950732|           17.0|1.1318242343541944|           1.0|0.06657789613848203|
|2020|         462|          438| 94.8051948051948|           24.0| 5.194805194805195|           0.0|                0.0|
|2012|        1519|         1497|98.55167873601053|           20.0| 1.316655694535879|           2.0| 0.1316655694535879|
|1988|        1310|         1293|98.70229007633588|           15.0|1.1450381679389312|           2.0|0.15267175572519084|
|2019|        1900|     

## Time flight performance - origin city name

In [38]:
df2 = spark.sql('select origincityname, count(*) AS TOTAL_FLIGHT, count(arrdelay==0) AS TOTAL_ON_TIME, sum(cancelled) AS TOTAL_CANCELLED, sum(diverted) AS TOTAL_DIVERTED from AirlineTable where group by origincityname;')

In [39]:
df2.registerTempTable('AirlineTable2')



In [40]:
spark.sql('select * from AirlineTable2').show()

+------------------+------------+-------------+---------------+--------------+
|    origincityname|TOTAL_FLIGHT|TOTAL_ON_TIME|TOTAL_CANCELLED|TOTAL_DIVERTED|
+------------------+------------+-------------+---------------+--------------+
|   Gainesville, FL|          18|           18|            0.0|           0.0|
|      Richmond, VA|         146|          141|            5.0|           0.0|
|        Tucson, AZ|         157|          155|            2.0|           0.0|
|       Ontario, CA|         270|          266|            3.0|           1.0|
|        Pierre, SD|           1|            1|            0.0|           0.0|
|     Pago Pago, TT|           1|            1|            0.0|           0.0|
|       Medford, OR|          21|           21|            0.0|           0.0|
|  Myrtle Beach, SC|          41|           40|            1.0|           0.0|
|  Palm Springs, CA|          85|           80|            3.0|           2.0|
|       Redding, CA|           8|            8|     

In [41]:
# percentage of performance
spark.sql('select origincityname, TOTAL_FLIGHT, TOTAL_ON_TIME, TOTAL_ON_TIME*100/TOTAL_FLIGHT AS ON_TIME_PER, TOTAL_CANCELLED, TOTAL_CANCELLED*100/TOTAL_FLIGHT AS CANCELLED_PER, TOTAL_DIVERTED, TOTAL_DIVERTED*100/TOTAL_FLIGHT AS DIVERTED_PER from AirlineTable2;').show()

+------------------+------------+-------------+-----------------+---------------+------------------+--------------+-------------------+
|    origincityname|TOTAL_FLIGHT|TOTAL_ON_TIME|      ON_TIME_PER|TOTAL_CANCELLED|     CANCELLED_PER|TOTAL_DIVERTED|       DIVERTED_PER|
+------------------+------------+-------------+-----------------+---------------+------------------+--------------+-------------------+
|   Gainesville, FL|          18|           18|            100.0|            0.0|               0.0|           0.0|                0.0|
|      Richmond, VA|         146|          141|96.57534246575342|            5.0|3.4246575342465753|           0.0|                0.0|
|        Tucson, AZ|         157|          155|98.72611464968153|            2.0|1.2738853503184713|           0.0|                0.0|
|       Ontario, CA|         270|          266|98.51851851851852|            3.0|1.1111111111111112|           1.0|0.37037037037037035|
|        Pierre, SD|           1|            1| 

In [42]:
# Multiple gorup by columns
spark.sql('select origin, origincityname, count(*) from AirlineTable group by origin, origincityname;').show()

+------+--------------------+--------+
|origin|      origincityname|count(1)|
+------+--------------------+--------+
|   COS|Colorado Springs, CO|     100|
|   SDF|      Louisville, KY|     175|
|   CLL|College Station/B...|      10|
|   PIR|          Pierre, SD|       1|
|   MSN|         Madison, WI|      81|
|   EVV|      Evansville, IN|      18|
|   CMI|Champaign/Urbana, IL|      10|
|   GTR|        Columbus, MS|       2|
|   RIC|        Richmond, VA|     146|
|   STX|   Christiansted, VI|      11|
|   FLO|        Florence, SC|       4|
|   DCA|      Washington, DC|     704|
|   IPL|       El Centro, CA|       1|
|   CKB|Clarksburg/Fairmo...|       1|
|   GRI|    Grand Island, NE|       3|
|   ITO|            Hilo, HI|      26|
|   MSP|     Minneapolis, MN|    1128|
|   AVL|       Asheville, NC|      29|
|   CHS|      Charleston, SC|     109|
|   PIT|      Pittsburgh, PA|     641|
+------+--------------------+--------+
only showing top 20 rows



In [43]:
# rank of origin by number of flights 
spark.sql('select rank() over (ORDER BY count(*)desc), origin, origincityname, count(*) from AirlineTable where group by origin, origincityname;').show()

+------------------------------------------------------------------------------------------------+------+--------------------+--------+
|RANK() OVER (ORDER BY count(1) DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|origin|      origincityname|count(1)|
+------------------------------------------------------------------------------------------------+------+--------------------+--------+
|                                                                                               1|   ATL|         Atlanta, GA|    2669|
|                                                                                               2|   ORD|         Chicago, IL|    2536|
|                                                                                               3|   DFW|Dallas/Fort Worth...|    2188|
|                                                                                               4|   LAX|     Los Angeles, CA|    1718|
|                                               

In [44]:
# total air time and distance travel by flights
spark.sql('select AirTime, Distance from AirlineTable').show()

+-------+--------+
|AirTime|Distance|
+-------+--------+
|  153.0|   991.0|
|  141.0|  1066.0|
|  103.0|   773.0|
|  220.0|  1979.0|
|   80.0|   529.0|
|   28.0|   190.0|
|   94.0|   563.0|
|   35.0|   192.0|
|   59.0|   316.0|
|  114.0|   793.0|
|   null|   109.0|
|   77.0|   562.0|
|   null|  1045.0|
|   95.0|   677.0|
|   99.0|   733.0|
|   null|   278.0|
|   24.0|    98.0|
|  102.0|   689.0|
|  255.0|  2288.0|
|   null|   373.0|
+-------+--------+
only showing top 20 rows

