# Assignment 2A

**Name:** Nabilah Anuwar

**Student ID:** 31282016

In [10]:
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col

# 1. Data Loading, Cleaning, Labelling, and Exploration (45%)

## 1.1 Data Loading

In [1]:
from pyspark import SparkConf
master = "local[*]"
app_name = "31282016 Assignment 2A"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [2]:
flightsRawDf = spark.read.csv("flight-delays/flight*.csv", header = True, inferSchema = True)

In [3]:
allColumnFlights = flightsRawDf.count()

## 1.2 Data Cleaning

### Part 1

In [167]:
schema1 = StructType([
    StructField("column_name", StringType()),
    StructField("no_of_row", IntegerType()),
    StructField("missing_percentage", FloatType())
])

def calc_miss(df, schema):
    """Return a DataFrame with the missing-value count and percentage for each column of df."""
    total_rows = df.count()
    # Count NaN or null values in every column in a single pass.
    # https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/
    total_miss = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]

    # One row per column: name, missing count, missing percentage (2 decimal places).
    result = [[c, total_miss[c], round((total_miss[c] / total_rows) * 100, 2)] for c in df.columns]

    # Sort so that the columns with the most missing values come first.
    the_df = spark.createDataFrame(result, schema = schema).orderBy(col("missing_percentage").desc(), col("no_of_row").desc())
    return the_df

In [88]:
calc_miss(flightsRawDf, schema1).show(len(flightsRawDf.columns))

+-------------------+---------+------------------+
|        column_name|no_of_row|missing_percentage|
+-------------------+---------+------------------+
|CANCELLATION_REASON|   573213|             98.46|
|      AIRLINE_DELAY|   475831|             81.73|
|LATE_AIRCRAFT_DELAY|   475831|             81.73|
|      WEATHER_DELAY|   475831|             81.73|
|   AIR_SYSTEM_DELAY|   475831|             81.73|
|     SECURITY_DELAY|   475831|             81.73|
|      ARRIVAL_DELAY|    10455|               1.8|
|       ELAPSED_TIME|    10455|               1.8|
|           AIR_TIME|    10455|               1.8|
|            TAXI_IN|     9257|              1.59|
|       ARRIVAL_TIME|     9257|              1.59|
|          WHEELS_ON|     9257|              1.59|
|         WHEELS_OFF|     8891|              1.53|
|           TAXI_OUT|     8891|              1.53|
|     DEPARTURE_TIME|     8633|              1.48|
|    DEPARTURE_DELAY|     8633|              1.48|
|        TAIL_NUMBER|     1462|

#### CANCELLATION_REASON

`CANCELLATION_REASON` is only filled in when a flight is cancelled, so it is empty for every flight that was not cancelled. This can be checked by counting the non-cancelled flights below.

In [86]:
flightsRawDf.select("CANCELLED").filter(col("CANCELLED")==0).count()

573213

The count above (573213) matches the number of missing values in `CANCELLATION_REASON`, which is consistent with the column being empty because those flights were not cancelled.
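As a quick arithmetic check, the missing percentage for `CANCELLATION_REASON` can be reproduced in plain Python from two counts printed in this notebook, using the same formula and rounding as `calc_miss`. The variable names are illustrative only, and the figures are copied from the outputs rather than recomputed from the data.

```python
# Counts copied from the notebook outputs (not recomputed from the data).
non_cancelled = 573213   # rows where CANCELLED == 0
total_rows = 582184      # total row count printed in Part 2 of section 1.2

# Same formula as in calc_miss: share of missing values as a percentage.
missing_percentage = round((non_cancelled / total_rows) * 100, 2)

# The remaining rows are the cancelled flights, which do have a reason.
cancelled = total_rows - non_cancelled

print(missing_percentage)  # 98.46
print(cancelled)           # 8971
```

The result agrees with the 98.46 reported for `CANCELLATION_REASON` in the table.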

#### AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY

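These five columns have the same number of missing values (475831), so they appear to be empty on the same rows. They are recorded as input rather than calculated from other columns, so the missing values are most likely due to human error.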
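Equal missing counts alone do not prove that the five columns are empty on the same rows. A minimal plain-Python sketch of that check is shown here on a few made-up records; `rows`, `partly` and `missing_together` are hypothetical names, only `None` is treated as missing, and on the real data the same idea would have to be applied to `flightsRawDf`.

```python
DELAY_COLS = [
    "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY",
    "AIR_SYSTEM_DELAY", "SECURITY_DELAY",
]

def missing_together(rows, cols):
    """Return True if, in every row, the columns are all missing or all present."""
    for row in rows:
        n_missing = sum(row.get(c) is None for c in cols)
        if n_missing not in (0, len(cols)):
            return False
    return True

# Toy records (made up for illustration, not taken from the dataset).
rows = [
    {c: None for c in DELAY_COLS},  # all five delay columns missing
    {c: 0 for c in DELAY_COLS},     # all five delay columns present
]
# A record where only one of the five columns is missing.
partly = dict(rows[1], WEATHER_DELAY=None)

print(missing_together(rows, DELAY_COLS))             # True
print(missing_together(rows + [partly], DELAY_COLS))  # False
```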

#### ELAPSED_TIME

`ELAPSED_TIME` depends on `AIR_TIME`, so a missing `AIR_TIME` is likely to produce a null in `ELAPSED_TIME`.
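The null propagation described here can be illustrated with a small sketch. It assumes the definition `ELAPSED_TIME = AIR_TIME + TAXI_IN + TAXI_OUT`, which comes from the dataset's published column descriptions rather than from this notebook, and `elapsed_time` is a hypothetical helper.

```python
from typing import Optional

def elapsed_time(air_time: Optional[float],
                 taxi_in: Optional[float],
                 taxi_out: Optional[float]) -> Optional[float]:
    """Sum the three components; return None if any of them is missing.

    Assumes ELAPSED_TIME = AIR_TIME + TAXI_IN + TAXI_OUT (dataset description).
    """
    parts = (air_time, taxi_in, taxi_out)
    if any(p is None for p in parts):
        return None
    return sum(parts)

print(elapsed_time(120, 6, 14))   # 140
print(elapsed_time(None, 6, 14))  # None
```

Under this assumption, a row with a missing `AIR_TIME` cannot have a valid `ELAPSED_TIME`, which fits the two columns having the same missing count (10455).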

#### ARRIVAL_DELAY

This column depends on `ARRIVAL_TIME` and `SCHEDULED_ARRIVAL`, so a null `ARRIVAL_TIME` can cause a null in this column.

#### AIR_TIME

Although `AIR_TIME` is not a calculated column, its value is still based on `WHEELS_ON` and `WHEELS_OFF`, so nulls in either of those columns can result in nulls in `AIR_TIME`.

#### TAXI_IN, ARRIVAL_TIME, WHEELS_ON

`ARRIVAL_TIME` depends on the other two columns, so a null in either of them will most likely produce a null in `ARRIVAL_TIME`. All three columns have the same number of nulls (9257), which suggests that the affected rows have no information in any of these columns.

#### TAXI_OUT, WHEELS_OFF

Although `TAXI_OUT` is not a calculated column, it still depends on `WHEELS_OFF`: the time elapsed between departure from the gate and wheels off cannot be determined without the `WHEELS_OFF` time.

#### DEPARTURE_DELAY, DEPARTURE_TIME

`DEPARTURE_DELAY` cannot be determined without `DEPARTURE_TIME`, even though it is not a calculated column. This is consistent with both columns having the same number of nulls (8633).

#### TAIL_NUMBER

This column could be missing at random, as it depends entirely on human input.

### Part 2

In [149]:
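x = 10  # threshold: maximum allowed percentage of missing values per column

def find_removed_columns(x, flightsRawDf):
    # Return the names of the columns whose missing percentage is greater than x.
    data = calc_miss(flightsRawDf, schema1)
    filtered = data.filter(col("missing_percentage") > x).select("column_name").collect()
    removedColumns = [str(row['column_name']) for row in filtered]
    return removedColumns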

In [150]:
def eliminate_columns(removedColumns, flightsRawDf):
    # Drop the listed columns and return the reduced DataFrame.
    return flightsRawDf.drop(*[str(c) for c in removedColumns])

In [154]:
q122 = eliminate_columns(find_removed_columns(x, flightsRawDf), flightsRawDf)
print(f"The number of columns is {len(q122.columns)}")
print(f"The number of rows is {q122.count()}")

The number of columns is 25
The number of rows is 582184


In [170]:
flightsDf = q122.na.drop("any")
print(f"The number of columns is {len(flightsDf.columns)}")
print(f"The number of rows is {flightsDf.count()}")

The number of columns is 25
The number of rows is 571729
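The row counts before and after `na.drop("any")` can be compared with the missing-value table from Part 1. The short sketch below uses only figures printed in this notebook; the variable names are illustrative.

```python
# Figures copied from the notebook outputs.
rows_before = 582184   # after removing columns above the threshold
rows_after = 571729    # after dropping rows with any missing value
largest_remaining_missing = 10455  # ARRIVAL_DELAY, ELAPSED_TIME and AIR_TIME

rows_dropped = rows_before - rows_after

print(rows_dropped)                               # 10455
print(rows_dropped == largest_remaining_missing)  # True
```

The number of dropped rows equals the largest missing count among the remaining columns. Assuming the table and the row counts describe the same data, this implies that every row with a missing value in any remaining column is also missing `ARRIVAL_DELAY`.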


## 1.3 Data Labelling

### Part 1

In [175]:
flightsDf = flightsDf\
.withColumn("binaryArrDelay", when(col("ARRIVAL_DELAY") > 0, 1).otherwise(0))\
.withColumn("binaryDeptDelay", when(col("DEPARTURE_DELAY") > 0, 1).otherwise(0))

### Part 2

Here we classify the delay (in minutes) as follows:

* less than 5, including negative values, as *early* or 0

* between 5 and 20 (inclusive) as *on time* or 1

* more than 20 as *late* or 2

In [182]:
flightsDf = flightsDf.withColumn("multiclassArrDelay", when(col("ARRIVAL_DELAY")> 20,2).when(col("ARRIVAL_DELAY") < 5,0).otherwise(1))\
.withColumn("multiclassDeptDelay", when(col("DEPARTURE_DELAY")> 20,2).when(col("DEPARTURE_DELAY") < 5,0).otherwise(1))
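The thresholds in the list above can be checked on boundary values with a plain-Python sketch. `multiclass_label` is a hypothetical helper that mirrors the order of the `when`/`otherwise` chain; it is not part of the notebook and assumes the delay is never null, which holds after the `na.drop("any")` step.

```python
def multiclass_label(delay_minutes: float) -> int:
    """Map a delay in minutes to 0 (early), 1 (on time) or 2 (late)."""
    if delay_minutes > 20:
        return 2   # late: more than 20 minutes
    if delay_minutes < 5:
        return 0   # early: less than 5 minutes, including negative values
    return 1       # on time: between 5 and 20 minutes, inclusive

# Values on either side of both thresholds.
for d in (-3, 4, 5, 20, 21):
    print(d, multiclass_label(d))
```

Delays of exactly 5 and exactly 20 minutes both fall in the *on time* class, because the comparisons in the code are strict.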