## Read data file and create data schema

In [1]:
from pyspark.sql.types import *

#create session
appName = "data preprocessing in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('dataset/raw-flight-data.csv', 
                         schema=flightSchema, header=True)
flights.show(2)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [2]:
airportSchema = StructType([
  StructField("airport_id", IntegerType(), False),
  StructField("city", StringType(), False),
  StructField("state", StringType(), False),
  StructField("name", StringType(), False),
])

airports = spark.read.csv('dataset/airports.csv', header=True, 
                          schema=airportSchema)
airports.show(2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



Merge two dataFrame (flight and airports), and show how many flights from each City

In [3]:
flightsByOrigin = flights.join(airports,
                               flights.OriginAirportID == 
                               airports.airport_id).groupBy("city").count()
flightsByOrigin.show(3)

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|90281|
|         Omaha|13537|
|Raleigh/Durham|28436|
+--------------+-----+
only showing top 3 rows



## Handle duplicated data
Drop duplicated data and calculate how many duplicated data

In [4]:
#count the number of original data rows
n1 = flights.count()
print("number of original data rows: ", n1)
#count the number of data rows after deleting duplicated data
n2 = flights.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicated data: ", n3)

number of original data rows:  2719418
number of data rows after deleting duplicated data:  2696983
number of duplicated data:  22435


We can also specify the duplicate criterion by certain column

In [5]:
df = spark.createDataFrame([("Rony",27, 168), 
                            ("Rony",15, 165), 
                            ("Rony",27, 168)], 
                           ["name","age","height"])
df.show()
df.dropDuplicates().show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
|Rony| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
+----+---+------+



In [6]:
df.dropDuplicates(['name']).show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
+----+---+------+



## Handle missing data

1. Delete row if there is at least one (column) missing data

In [7]:
flightsNoMissingValue = flights.dropDuplicates().dropna(
    how="any", subset=["ArrDelay", "DepDelay"])# use how="all" for all column missing data
numberOfMissingValueAny = n1 - flightsNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  46233


2. Fill the missing data using mean value of each corresponding column data

In [8]:
#take mean value
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]
print("mean ArrDelay: ", meanArrDelay)
meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]
print("mean DepDelay: ", meanDepDelay)
#drop duplicated data and fill missing data with mean value
flightsCleanData=flights.fillna(
    {'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})
#just for experiment
flights.groupBy().avg("ArrDelay").show()

mean ArrDelay:  6.63768791455498
mean DepDelay:  10.53686662649788
+----------------+
|   avg(ArrDelay)|
+----------------+
|6.63768791455498|
+----------------+



## Explore the statistic of our data

In [9]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

+-------+------------------+-----------------+
|summary|          DepDelay|         ArrDelay|
+-------+------------------+-----------------+
|  count|           2719418|          2719418|
|   mean|10.531448640848888|6.630879842672218|
| stddev| 35.91695039008075|38.44200618946938|
|    min|               -63|              -94|
|    max|              1863|             1845|
+-------+------------------+-----------------+



We can also calculate the correlation between two variables to know whether the varible is related each other or not

In [10]:
correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("correlation between departure delay and arrival delay: ", 
      correlation)

correlation between departure delay and arrival delay:  0.939353821557276
