In [1]:
!pip install pyspark



In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession 

In [4]:
spark=SparkSession.builder.appName('business_case').master('local').getOrCreate()

## Creating a dataframe

In [5]:
data=[('Alice',27,250000),('Jack',22,19000),('Daniel',27,35000)]

In [6]:
#Converting a list into a dataframe
df=spark.createDataFrame(data,['Name','Age','Salary'])

In [7]:
df.show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|  Jack| 22| 19000|
|Daniel| 27| 35000|
+------+---+------+



In [8]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Salary: long (nullable = true)



## Creating a table from DataFrame

In [9]:
#Creating a table named person
df.createOrReplaceTempView("person")

In [10]:
spark.sql('SELECT * FROM PERSON').show()

#spark.sql('SELECT NAME FROM PERSON').show() --> gives us only the name from the table person created
#spark.sql('SELECT NAME,AGE FROM PERSON').show() --> gives the selected columns from tehr table person created

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|  Jack| 22| 19000|
|Daniel| 27| 35000|
+------+---+------+



## Creating a DataFrame from table

In [11]:
new_df=spark.table('Person') #not case sensitive

In [12]:
new_df.show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|  Jack| 22| 19000|
|Daniel| 27| 35000|
+------+---+------+



## Alernative ways to create a table from a dataframe

In [13]:
df.registerTempTable('person1')

In [14]:
spark.sql('SELECT * FROM PERSON1').show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|  Jack| 22| 19000|
|Daniel| 27| 35000|
+------+---+------+



## Drop Table

In [15]:
spark.catalog.dropTempView('person')

## Basic SQL Functions

In [16]:
spark.sql('SELECT max(SALARY) from person1').show()

+-----------+
|max(SALARY)|
+-----------+
|     250000|
+-----------+



In [17]:
spark.sql('SELECT avg(SALARY) from person1').show()

+------------------+
|       avg(SALARY)|
+------------------+
|101333.33333333333|
+------------------+



In [18]:
spark.sql("SELECT sum(SALARY) as Total_Salary from person1").show()

+------------+
|Total_Salary|
+------------+
|      304000|
+------------+



In [19]:
spark.sql("SELECT * from person1 where Salary > 25000").show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|Daniel| 27| 35000|
+------+---+------+



In [20]:
spark.sql('SELECT * FROM PERSON1 WHERE SALARY >20000 AND AGE >25').show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 27|250000|
|Daniel| 27| 35000|
+------+---+------+



In [21]:
#consider two tables
x=[('Ninja',7),('Pirate',1),('Monkey',2),('Spaghetti',4)]
df1=spark.createDataFrame(x,['Name','id'])

In [22]:
y=[('Pirate',5),('Rutubaga',1),('Ninja',3),('Darth Vader',4)]
df2=spark.createDataFrame(y,['Name','id'])

In [23]:
df1.createOrReplaceTempView('table1')
df2.createOrReplaceTempView('table2')

In [24]:
spark.sql('select * from table1 left join table2 on table1.id =table2.id').show()

+---------+---+-----------+----+
|     Name| id|       Name|  id|
+---------+---+-----------+----+
|    Ninja|  7|       null|null|
|   Pirate|  1|   Rutubaga|   1|
|   Monkey|  2|       null|null|
|Spaghetti|  4|Darth Vader|   4|
+---------+---+-----------+----+



In [25]:
spark.sql('select * from table1 right join table2 on table1.id =table2.id').show()

+---------+----+-----------+---+
|     Name|  id|       Name| id|
+---------+----+-----------+---+
|     null|null|     Pirate|  5|
|   Pirate|   1|   Rutubaga|  1|
|     null|null|      Ninja|  3|
|Spaghetti|   4|Darth Vader|  4|
+---------+----+-----------+---+



In [26]:
#left anti join 
spark.sql('select * from table1 left anti join table2 on table1.id =table2.id').show()

+------+---+
|  Name| id|
+------+---+
| Ninja|  7|
|Monkey|  2|
+------+---+



In [27]:
#left anti join 
spark.sql('select * from table1 left semi join table2 on table1.id =table2.id').show()

+---------+---+
|     Name| id|
+---------+---+
|   Pirate|  1|
|Spaghetti|  4|
+---------+---+



In [51]:
df2=spark.read.format('csv').option('header','true').option('path','file:///home/gautham_reddy096/airlines.csv').load()

In [52]:
df2.show()

+---+----+-------+-----+----------+---------+----------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+--------------------+---------+-------------+--------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+--

In [53]:
df2.registerTempTable('airlines')

In [56]:
spark.sql("select * from airlines limit 10").show(5)

+---+----+-------+-----+----------+---------+----------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+--------------------+---------+-------------+--------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+----

In [57]:
spark.sql("select origin,destcityname from airlines").show(5)

+------+--------------------+
|origin|        destcityname|
+------+--------------------+
|   MSP|  Salt Lake City, UT|
|   MKE|         Orlando, FL|
|   GJT|Dallas/Fort Worth...|
|   LAX|         Detroit, MI|
|   EWR|       Charlotte, NC|
+------+--------------------+
only showing top 5 rows



In [69]:
spark.sql('select round(avg(DepDelayMinutes),2) as avg_dep_delay from airlines').show(5)

+-------------+
|avg_dep_delay|
+-------------+
|        10.85|
+-------------+



In [62]:
spark.sql("Select flight_number_reporting_airline,depdelay+arrdelay as total_delay from airlines").show(5)

+-------------------------------+-----------+
|flight_number_reporting_airline|total_delay|
+-------------------------------+-----------+
|                            675|       42.0|
|                            671|       -2.0|
|                           3297|       11.0|
|                           1806|      -20.0|
|                            465|       83.0|
+-------------------------------+-----------+
only showing top 5 rows



In [74]:
spark.sql("select origin,count(1) as total, round(avg(DepDelayMinutes),2) as avg_delay,round(max(depdelayminutes),2) as max_delay from airlines where DepDelayMinutes is not null group by origin").show(10)

+------+-----+---------+---------+
|origin|total|avg_delay|max_delay|
+------+-----+---------+---------+
|   BGM|   11|      4.0|     40.0|
|   DLG|    1|      3.0|      3.0|
|   PSE|    3|      7.0|     21.0|
|   MSY|  381|    10.94|     94.0|
|   PPG|    1|      0.0|      0.0|
|   GEG|  117|     7.73|      9.0|
|   BUR|  218|     6.58|     98.0|
|   SNA|  331|     8.59|     91.0|
|   GRB|   42|      3.9|     56.0|
|   GTF|   27|     8.56|     77.0|
+------+-----+---------+---------+
only showing top 10 rows



In [76]:
spark.sql("select originstate,count(1) from airlines where cancelled=1 group by originstate").show(10)

+-----------+--------+
|originstate|count(1)|
+-----------+--------+
|         SC|       8|
|         AZ|      12|
|         LA|       5|
|         MN|      26|
|         NJ|      27|
|         OR|       5|
|         VA|      41|
|         RI|       4|
|         KY|      15|
|         WY|       1|
+-----------+--------+
only showing top 10 rows



In [86]:
spark.sql("select origin,sum(diverted),rank() over(order by sum(diverted) desc) as ranking from airlines group by origin").show()

+------+-----------------------------+-------+
|origin|sum(CAST(diverted AS DOUBLE))|ranking|
+------+-----------------------------+-------+
|   DEN|                          6.0|      1|
|   ATL|                          6.0|      1|
|   ORD|                          6.0|      1|
|   MSP|                          6.0|      1|
|   DFW|                          5.0|      5|
|   FLL|                          5.0|      5|
|   CVG|                          4.0|      7|
|   CLT|                          4.0|      7|
|   LAX|                          4.0|      7|
|   MCO|                          4.0|      7|
|   CMH|                          3.0|     11|
|   SEA|                          3.0|     11|
|   PDX|                          3.0|     11|
|   BWI|                          3.0|     11|
|   STL|                          3.0|     11|
|   PIT|                          3.0|     11|
|   PSG|                          2.0|     17|
|   DCA|                          2.0|     17|
|   IAH|     

## CASE STUDY

In [88]:
#Calculting year wise performance 
spark.sql("select year,count(1) as total_flight, count(Arrdelay ==0) as on_time_arr_count,sum(cancelled) as tot_cancel,sum(diverted) as diverted_count from airlines group by year").show()

+----+------------+-----------------+----------+--------------+
|year|total_flight|on_time_arr_count|tot_cancel|diverted_count|
+----+------------+-----------------+----------+--------------+
|1987|         339|              336|       3.0|           0.0|
|2016|        1502|             1484|      17.0|           1.0|
|2020|         462|              438|      24.0|           0.0|
|2012|        1519|             1497|      20.0|           2.0|
|1988|        1310|             1293|      15.0|           2.0|
|2019|        1900|             1862|      34.0|           4.0|
|2017|        1449|             1421|      22.0|           6.0|
|2014|        1513|             1476|      31.0|           6.0|
|2013|        1690|             1663|      22.0|           5.0|
|2005|        1812|             1776|      33.0|           3.0|
|2000|        1477|             1432|      42.0|           3.0|
|2002|        1371|             1360|      11.0|           0.0|
|2009|        1604|             1578|   

In [100]:
#Calculating year wise performance in percentage
spark.sql("select origincityname,count(1) as total_flight\
          ,count(Arrdelay ==0) as total_on_time,round(count(Arrdelay ==0)*100/count(1),0) as on_time_arr_perc\
          ,sum(cancelled),round(sum(cancelled)*100/count(1),0) as cancelled_perc, \
          sum(diverted) as diverted_count, round(sum(diverted)*100/count(1),0) as diverted_perc\
          from airlines group by origincityname").show()

+------------------+------------+-------------+----------------+------------------------------+--------------+--------------+-------------+
|    origincityname|total_flight|total_on_time|on_time_arr_perc|sum(CAST(cancelled AS DOUBLE))|cancelled_perc|diverted_count|diverted_perc|
+------------------+------------+-------------+----------------+------------------------------+--------------+--------------+-------------+
|   Gainesville, FL|          18|           18|           100.0|                           0.0|           0.0|           0.0|          0.0|
|      Richmond, VA|         146|          141|            97.0|                           5.0|           3.0|           0.0|          0.0|
|        Tucson, AZ|         157|          155|            99.0|                           2.0|           1.0|           0.0|          0.0|
|       Ontario, CA|         270|          266|            99.0|                           3.0|           1.0|           1.0|          0.0|
|        Pierre, SD|

In [104]:
spark.sql("select origin,origincityname,count(*) as total from airlines group by origin,origincityname").show()

+------+--------------------+-----+
|origin|      origincityname|total|
+------+--------------------+-----+
|   COS|Colorado Springs, CO|  100|
|   SDF|      Louisville, KY|  175|
|   CLL|College Station/B...|   10|
|   PIR|          Pierre, SD|    1|
|   MSN|         Madison, WI|   81|
|   EVV|      Evansville, IN|   18|
|   CMI|Champaign/Urbana, IL|   10|
|   GTR|        Columbus, MS|    2|
|   RIC|        Richmond, VA|  146|
|   STX|   Christiansted, VI|   11|
|   FLO|        Florence, SC|    4|
|   DCA|      Washington, DC|  704|
|   IPL|       El Centro, CA|    1|
|   CKB|Clarksburg/Fairmo...|    1|
|   GUM|            Guam, TT|    4|
|   GRI|    Grand Island, NE|    3|
|   ITO|            Hilo, HI|   26|
|   MSP|     Minneapolis, MN| 1128|
|   AVL|       Asheville, NC|   29|
|   CHS|      Charleston, SC|  109|
+------+--------------------+-----+
only showing top 20 rows



In [105]:
spark.sql("select count(1),rank() over(order by count(1) desc),origin,origincityname from airlines group by origin,origincityname ").show()

+--------+------------------------------------------------------------------------------------------------+------+--------------------+
|count(1)|RANK() OVER (ORDER BY count(1) DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|origin|      origincityname|
+--------+------------------------------------------------------------------------------------------------+------+--------------------+
|    2669|                                                                                               1|   ATL|         Atlanta, GA|
|    2536|                                                                                               2|   ORD|         Chicago, IL|
|    2188|                                                                                               3|   DFW|Dallas/Fort Worth...|
|    1718|                                                                                               4|   LAX|     Los Angeles, CA|
|    1592|                                      

In [110]:
spark.sql("select distance,airtime from airlines order by distance desc,airtime desc").show()

+--------+-------+
|distance|airtime|
+--------+-------+
|   999.0|  173.0|
|   999.0|  159.0|
|   999.0|  158.0|
|   999.0|  154.0|
|   999.0|  152.0|
|   999.0|  136.0|
|   999.0|  133.0|
|   999.0|  132.0|
|   999.0|  129.0|
|   999.0|  126.0|
|   999.0|  124.0|
|   999.0|  123.0|
|   999.0|  120.0|
|   999.0|   null|
|   999.0|   null|
|   999.0|   null|
|   999.0|   null|
|   998.0|  166.0|
|   998.0|  157.0|
|   998.0|  155.0|
+--------+-------+
only showing top 20 rows



In [111]:
df3=spark.sql("select airtime/60 as airtime_hour, distance from airlines")

In [113]:
df3.registerTempTable("airtime")

In [118]:
spark.sql("select airtime_hour,Distance,Distance/airtime_hour as nmph from airtime ").show()

+------------------+--------+------------------+
|      airtime_hour|Distance|              nmph|
+------------------+--------+------------------+
|              2.55|   991.0| 388.6274509803922|
|              2.35|  1066.0| 453.6170212765957|
|1.7166666666666666|   773.0|450.29126213592235|
|3.6666666666666665|  1979.0| 539.7272727272727|
|1.3333333333333333|   529.0|            396.75|
|0.4666666666666667|   190.0| 407.1428571428571|
|1.5666666666666667|   563.0|359.36170212765956|
|0.5833333333333334|   192.0| 329.1428571428571|
|0.9833333333333333|   316.0|321.35593220338984|
|               1.9|   793.0| 417.3684210526316|
|              null|   109.0|              null|
|1.2833333333333334|   562.0| 437.9220779220779|
|              null|  1045.0|              null|
|1.5833333333333333|   677.0| 427.5789473684211|
|              1.65|   733.0|444.24242424242425|
|              null|   278.0|              null|
|               0.4|    98.0|             245.0|
|               1.7|