In [6]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import concat_ws, to_date, desc


import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [8]:
spark = (SparkSession.builder
.master("local[*]")
.appName('Flight_delay')
.getOrCreate()
)

In [9]:
df = spark.read.csv('flights.csv',     
                    sep=',',
                    header=True)

In [10]:
df.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [11]:
df.count()

                                                                                

5819079

In [12]:
df = df.withColumn('DATE', concat_ws("-", "YEAR", "MONTH", 'DAY')).withColumn('DATE', to_date('DATE'))


In [13]:
df = df.withColumn('TAXI_OUT', df['TAXI_OUT'].cast('float'))

In [14]:
df.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: float (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- DI

In [15]:
df.createOrReplaceTempView("insurance_df")
df2 = spark.sql('''
    SELECT * FROM(
    SELECT ORIGIN_AIRPORT as o, 
    sum(TAXI_OUT) as sum_out,
    count(TAXI_OUT) as count_out  
    FROM insurance_df 
    WHERE DATE BETWEEN '2015-01-01' AND '2015-09-30'
    GROUP BY ORIGIN_AIRPORT) AS OA
    INNER JOIN
    (SELECT
    DESTINATION_AIRPORT as d,
    sum(TAXI_IN) as sum_in, 
    count(TAXI_IN) as count_in
    FROM insurance_df 
    GROUP BY DESTINATION_AIRPORT)
    ON
    o=d
''')

22/12/04 16:12:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [16]:
df2 = df2.withColumn('PROB', (df2.sum_out + df2.count_out)/(df2.count_out + df2.count_in))
df2.sort(desc('PROB')).show()

                                                                                

+---+---------+---------+---+---------+--------+------------------+
|  o|  sum_out|count_out|  d|   sum_in|count_in|              PROB|
+---+---------+---------+---+---------+--------+------------------+
|JFK|2086383.0|    76002|JFK| 802635.0|   91811|12.885682277296754|
|LGA|2167189.0|    79591|LGA| 844765.0|   94918|12.874866052753726|
|PHL|1186655.0|    53110|PHL| 434263.0|   64836| 10.51129330371526|
|ISN|  43228.0|     2116|ISN|  14463.0|    2513| 9.795636206524087|
|BOS|1764813.0|    86750|BOS| 742615.0|  105185| 9.646823143251622|
|EWR|1637769.0|    80024|EWR| 958485.0|   98454| 9.624676430708547|
|MMH|   2424.0|      130|MMH|    592.0|     140| 9.459259259259259|
|STC|   1376.0|       78|STC|    327.0|      77| 9.380645161290323|
|ORD|4475823.0|   227410|ORD|3653600.0|  276527| 9.332978130202783|
|CLT|1587373.0|    80868|CLT| 952931.0|   98971| 9.276302692964263|
|CIU|   9346.0|      481|CIU|   2683.0|     597|  9.11595547309833|
|BRD|   9193.0|      481|BRD|   3075.0|     583|