In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [0]:
spark = SparkSession.builder.appName("Data_Processing").getOrCreate()

In [0]:
schema_flights = StructType([StructField('DayofMonth', IntegerType(), False),
                     StructField('DayOfWeek', IntegerType(), False),
                     StructField('Carrier', StringType(), False),
                     StructField('OriginAirportID', IntegerType(), False),
                     StructField('DestAirportID', IntegerType(), False),
                     StructField('DepDelay', IntegerType(), False),
                     StructField('ArrDelay', IntegerType(), False)])
schema_airports = StructType([StructField('airport_id', IntegerType(), False),
                              StructField('city', StringType(), False),
                              StructField('state', StringType(), False),
                              StructField('name', StringType(), False)])


In [0]:
df_flights= spark.read.csv("/FileStore/tables/flights.csv",header=True,schema=schema_flights)

df_airports= spark.read.csv("/FileStore/tables/airports.csv",header=True,schema=schema_airports)


In [0]:
df_flights.schema
df_airports.schema

Out[29]: StructType([StructField('airport_id', IntegerType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('name', StringType(), True)])

In [0]:
df_flights.show(5)
df_airports.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|   

In [0]:
df_merge = df_flights.join(df_airports,df_flights.OriginAirportID == df_airports.airport_id)

In [0]:
df_merge.groupby("city").count().show(10)

+-----------------+------+
|             city| count|
+-----------------+------+
|          Phoenix| 89720|
|            Omaha| 13487|
|   Raleigh/Durham| 28301|
|        Anchorage|  7700|
|           Dallas| 19062|
|          Oakland| 25178|
|      San Antonio| 22880|
|     Philadelphia| 47446|
|       Louisville| 10915|
|Dallas/Fort Worth|104270|
+-----------------+------+
only showing top 10 rows



In [0]:
#Drop dublicate data and caluclate how many duplicate data is present
t_1 = df_merge.count()
print(f"Total number of rows : {t_1} ")
t_2 = df_merge.dropDuplicates().count()
print(f"After dropping duplicate : {t_2}")
d_1 = t_1 - t_2
print(f"no of duplacte rows : {d_1}")



Total number of rows : 2702218 
After dropping duplicate : 2696657
no of duplacte rows : 5561


In [0]:
#Delete row if there is at least one column missing data
df_row_md = df_merge.dropDuplicates().dropna(
how="any",subset=["ArrDelay","DepDelay"])

In [0]:
no_of_md = t_1 - df_row_md.count()
print(f"no_of_md are : {no_of_md}")

no_of_md are : 5561


In [0]:
#Fill missing values with mean value
mean_avr = df_merge.groupby().avg("ArrDelay").take(1)[0][0]
mean_dep = df_merge.groupby().avg("DepDelay").take(1)[0][0]
df_clean_data = df_merge.fillna(
{'ArrDelay':mean_avr,'DepDelay':mean_dep})
print(f"mean avg of arrival delay is  :  {mean_avr}")
print(f"mean avg of depature delay is  :  {mean_dep}")


mean avg of arrival delay is  :  6.6550108096386005
mean avg of depature delay is  :  10.510732294729737


In [0]:
df_clean_data.describe('DepDelay','ArrDelay').show()

+-------+------------------+------------------+
|summary|          DepDelay|          ArrDelay|
+-------+------------------+------------------+
|  count|           2702218|           2702218|
|   mean|10.510732294729737|6.6550108096386005|
| stddev|36.029756084661265| 38.54758423679121|
|    min|               -63|               -94|
|    max|              1863|              1845|
+-------+------------------+------------------+



In [0]:
corr = df_clean_data.corr('DepDelay','ArrDelay')
print(f"correlation between the above two columns is {corr}")

correlation between the above two columns is 0.9392560468773448
