# Initial setup

In [14]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan, when, count
import pyspark.sql.functions as F

from configparser import ConfigParser

config = ConfigParser()
# create your own config.ini in root of project folder to store project configurations
config.read('config.ini')

pathfile = config.get('main', 'dirty_csv')  

spark = SparkSession.builder \
    .config("spark.driver.memory", "15g") \
    .appName("SparkFlight").getOrCreate()



In [2]:
# data = spark.read.csv(pathfile, inferSchema='true', header='true', mode='PERMISSIVE', encoding='ISO-8859-1').limit(10**6).cache()
data = spark.read.csv(pathfile, inferSchema='true', header='true', mode='PERMISSIVE', encoding='ISO-8859-1')

### Analysis cells

In [20]:
data.show()

+-----------------+-------+--------+--------------+--------+----+--------+---------------------------+------------------+------+-------------+-------+------+-------+-------------+-------------------+-------------------+-------------------+-------------------+
|ActualElapsedTime|AirTime|ArrDelay|CRSElapsedTime|DepDelay|Dest|Distance|LateAircraftAndCarrierDelay|NASAndWeatherDelay|Origin|SecurityDelay|TailNum|TaxiIn|TaxiOut|UniqueCarrier|    CRSDepTimeStamp|       DepTimestamp|       ArrTimestamp|    CRSArrTimestamp|
+-----------------+-------+--------+--------------+--------+----+--------+---------------------------+------------------+------+-------------+-------+------+-------+-------------+-------------------+-------------------+-------------------+-------------------+
|               53|     32|      -8|            65|       4| PIT|     205|                          0|                 0|   DCA|            0| N443US|     7|     14|           US|2002-10-10 15:45:00|2002-10-10 15:49:00|2

In [None]:
# check if read.csv(inferSchema='true') inferred the column types correctly
data.dtypes

In [None]:
data.count()

### {preprocess step} Filter out NA and Null values in all columns saved in var colnames

In [None]:
# filter out rows that have NA values in following columns that are assumed to be important for potential FDs:
colnames = ["ActualElapsedTime", "AirTime", "ArrDelay", "ArrTime", "CRSArrTime", "CRSDepTime", "CRSElapsedTime", "DayOfWeek", "DayofMonth", "DepDelay", "DepTime", "Dest", "Distance", "FlightNum", "Month", "Origin", "TailNum", "TaxiIn", "TaxiOut", "UniqueCarrier", "Year"]
# Specific delay columns like CarrierDelay and WeatherDelay with NA or Nul values need to have these values changed to 0 instead of filtered out.
# Tailnum still has some duplicated that do not have the same right UniqueCarrier
# also has some UNKNOW columns for TailNum. We keep these for soft dependency

# pyspark does not recognize csv "NA" strings as NaN values.
# This is reflected in the output of data.dtypes: columns that normally only have numeric values were not inferred as int type.
# Also remove these rows by filtering them out.
data_filt_nan = data.filter(data.ActualElapsedTime.isNotNull() & 
                            (data.ActualElapsedTime != "NA") &
                            data.AirTime.isNotNull() &
                            (data.AirTime != "NA") &
                            data.ArrDelay.isNotNull() &
                            (data.ArrDelay != "NA") &
                            data.ArrTime.isNotNull() &
                            (data.ArrTime != "NA") &              
                            data.CRSArrTime.isNotNull() & 
                            data.CRSDepTime.isNotNull() &
                            data.CRSElapsedTime.isNotNull() &
                            (data.CRSElapsedTime != "NA") &
                            data.DayOfWeek.isNotNull() &
                            data.DayofMonth.isNotNull() &
                            data.DepDelay.isNotNull() &
                            (data.DepDelay != "NA") &
                            data.DepTime.isNotNull() &
                            (data.DepTime != "NA") &
                            data.Dest.isNotNull() &
                            (data.Dest != "NA") &
                            data.Distance.isNotNull() &
                            (data.Distance != "NA") &
                            data.FlightNum.isNotNull() &
                            data.Month.isNotNull() &
                            data.Origin.isNotNull() &
                            (data.Origin != "NA") &
                            data.TailNum.isNotNull() &
                            (data.TailNum != "NA") &
                            data.TaxiIn.isNotNull() &
                            (data.TaxiIn != "NA") &
                            data.TaxiOut.isNotNull() &
                            (data.TaxiOut != "NA") &
                            data.UniqueCarrier.isNotNull() &
                            (data.UniqueCarrier != "NA") &
                            data.Year.isNotNull())

### Analysis cells

In [None]:
# this suggests the correct amount of rows are removed
data_filt_nan.count()

In [None]:
# in the original dataframe there are still columns with NaN value that should be filtered out
data.select(*colnames).show()

In [None]:
# NaN values in colnames should be filtered out
# TailNum still shows weird ascci signs sometimes.
data_filt_nan.select(*colnames).show(50)

### {preprocess step} Take +- 15 mil rows now before any further transformations

In [None]:
data_limit = data_filt_nan.limit(15_000_000)

### Analysis cells

In [None]:
# data.printSchema()
# data.schema.names

In [None]:
# data_filt_nan

### {preprocess step} Transform certain existing columns that store minutes as string to timestamp type

In [None]:
# transform existing date columns to be able to use them for to_timestamp transformation
# this can then be used to calculate with timestamps for total time difference between two attributes (delta dependencies)
# example: transform 1 minute to hhm:mm -> 00:01
date_cols = ["Year", "Month", "DayofMonth", "CRSDepTime"]
data_timestamp = (data_limit.withColumn('Month', F.when(F.length(F.col('Month')) == 1, F.concat(F.lit('0'), F.col('Month'))).otherwise(F.col('Month')))
                    .withColumn('DayofMonth', F.when(F.length(F.col('DayofMonth')) == 1, F.concat(F.lit('0'), F.col('DayofMonth'))).otherwise(F.col('DayofMonth')))
                    .withColumn('CRSDepTime', F.when(F.length(F.col('CRSDepTime')) == 1, F.concat(F.lit('000'), F.col('CRSDepTime')))
                                        .when(F.length(F.col('CRSDepTime')) == 2, F.concat(F.lit('00'), F.col('CRSDepTime')))
                                        .when(F.length(F.col('CRSDepTime')) == 3, F.concat(F.lit('0'), F.col('CRSDepTime')))
                                        .otherwise(F.col('CRSDepTime')))
                    .withColumn('CRSDepTimeStamp', F.to_timestamp(F.concat(*date_cols), format='yyyyMMddHHmm'))
                 )

### Analysis cells

In [None]:
data_timestamp.show()

### {preprocess step} Transform DepTime with minutes only to full Timestamp with a date as new column using DepDelay

In [None]:
# Deptimestamp calculating by using CRSDepTimeStamp and DepDelay because actual DepTime and CRSDepTime can differ by at most one full day = 1440 mins
data_deptime = data_timestamp.withColumn("DepTimestamp", (col("CRSDepTimeStamp").cast("long") + (col("DepDelay").cast("long"))*60).cast("timestamp"))



### Analysis cells

In [None]:
data_deptime.show(500)

In [None]:
data_deptime.filter(data_deptime.DepDelay > 1300).show()

### {preprocess step} Transform ArrTime with minutes only to full Timestamp with a date as new column by adding ActualElapsedTime to DepTimestamp 

In [None]:
data_arrtime = data_deptime.withColumn("ArrTimestamp", (col("DepTimestamp").cast("long") + (col("ActualElapsedTime").cast("long"))*60).cast("timestamp"))

### Analysis cells

In [None]:
data_arrtime.show(20)

### {preprocess step} Transform CRSArrTime with minutes only to full Timestamp with a date as new column by adding CRSElapsedTime to CRSDepTime 

In [None]:
# CRSArrTimestamp calculating by using CRSDepTimestamp and CRSElapsedTime
data_crs_arrtime = data_arrtime.withColumn("CRSArrTimestamp", (col("CRSDepTimestamp").cast("long") + (col("CRSElapsedTime").cast("long"))*60).cast("timestamp"))

### Analysis cells

In [None]:
data_crs_arrtime.show(20)

In [None]:
#  check if there are duplicates in DepTimestamp for delta dependency
data_crs_arrtime.groupBy("DepTimestamp").count().orderBy(col("count").desc()).show(500)

In [None]:
#  check if there are duplicates in ArrTimestamp for delta dependency
data_crs_arrtime.groupBy("ArrTimestamp").count().orderBy(col("count").desc()).show(500)

### {preprocess step} Filter out rows that have have cancelled == 1 or Diverted == 1 as they leave out very valuable info
### This is done before removing these columns to filter out low information rows first

In [None]:
# Rows with cancelled == 1 or Diverted == 1 were probably already filtered because of filtered out NaN values for the important time columns
# Just to be sure
data_filt_div_canc = data_crs_arrtime.filter((data_crs_arrtime.Cancelled == 0) | (data_crs_arrtime.Diverted == 0))

### Analysis cells

In [None]:
# check if there are no rows left with cancelled == 1
data_filt_div_canc.orderBy(col("Cancelled").desc()).show(20)

### {preprocess step} Drop columns that have already been transformed to other columns or columns that seem less useful
- Original dataset has 28 columns which we want to try to reduce due to computational complexity
- The date columns like Year and Month have already been transformed to CRSDepTimeStamp
- Flightnumber only has a few thousand unique numbers over a million records and seems to have no correlation with others
- CancellationCode, Cancelled and Diverted seem to be very messy and occur only very rarely
    - Cancelled and Diverted only have two possible values: 1 and 0 and 1 only occurs very rarely
    - CancellationCode is also not always set when a flight is cancelled
- After this step number of cols is reduced from 28 -> 21

In [None]:
columns_to_drop = ["CancellationCode", "Cancelled", "Diverted", "Year", "Month", "DayOfWeek", "DayofMonth", "ArrTime", "CRSArrTime", "CRSDepTime", "DepTime", "FlightNum"]
# drop as many (mostly) useless columns as we can
data_drop_cols = data_filt_div_canc.drop(*columns_to_drop)

### Analysis cells

In [None]:
# data_drop_cols.show(20)
len(data_drop_cols.columns)

In [None]:
data_drop_cols.columns

### {preprocess step} Combine several specific delay columns to one column to reduce amount of columns
### Also cast columns that store minutes to int (turned out to be bigint)


In [None]:
# Combine LateAircraftDelay and CarrierDelay into one column.
# Combine NASDelay and WeatherDelay into one column
# Cast columns with string type to bigint (will get transformed later)
def transform_and_cast_cols(x):
    ActualElapsedTime= int(x.ActualElapsedTime)
    AirTime= int(x.AirTime)
    ArrDelay= int(x.ArrDelay)
    CRSElapsedTime= int(x.CRSElapsedTime)
    DepDelay= int(x.DepDelay)
    Dest= x.Dest
    Distance= int(x.Distance)
    LateAircraftDelay = x.LateAircraftDelay
    CarrierDelay = x.CarrierDelay
    
    if LateAircraftDelay == "NA":
        LateAircraftDelay = 0
    if CarrierDelay == "NA":
        CarrierDelay = 0 
    
    LateAircraftAndCarrierDelay = int(LateAircraftDelay) + int(CarrierDelay)
    
    NASDelay = x.NASDelay
    WeatherDelay = x.WeatherDelay
    if NASDelay == "NA":
        NASDelay = 0
    if WeatherDelay == "NA":
        WeatherDelay = 0 
    
    NASAndWeatherDelay= int(NASDelay) + int(WeatherDelay)
    Origin=x.Origin
    SecurityDelay = x.SecurityDelay
    if SecurityDelay == "NA":
        SecurityDelay = 0
        
    SecurityDelay= int(SecurityDelay)
    TailNum=x.TailNum
    TaxiIn=int(x.TaxiIn)
    TaxiOut=int(x.TaxiOut)
    UniqueCarrier=x.UniqueCarrier
    CRSDepTimeStamp=x.CRSDepTimeStamp
    DepTimestamp=x.DepTimestamp
    ArrTimestamp=x.ArrTimestamp
    CRSArrTimestamp=x.CRSArrTimestamp
    return (ActualElapsedTime,
            AirTime,
            ArrDelay,
            CRSElapsedTime,
            DepDelay,
            Dest,
            Distance,
            LateAircraftAndCarrierDelay,
            NASAndWeatherDelay,
            Origin,
            SecurityDelay,
            TailNum,
            TaxiIn,
            TaxiOut,
            UniqueCarrier,
            CRSDepTimeStamp,
            DepTimestamp,
            ArrTimestamp,
            CRSArrTimestamp)


In [None]:
# first transform dataframe to rdd as only rdd has map transform function
rdd_trans_n_cast_cols = data_drop_cols.rdd.map(lambda x: transform_and_cast_cols(x))  

In [None]:
# transform rdd back to dataframe
# have to specify columns again because rdd has to tabular structure
data_trans_n_cast_cols = rdd_trans_n_cast_cols.toDF(['ActualElapsedTime',
                                 'AirTime',
                                 'ArrDelay',
                                 'CRSElapsedTime',
                                 'DepDelay',
                                 'Dest',
                                 'Distance',
                                 'LateAircraftAndCarrierDelay',
                                 'NASAndWeatherDelay',
                                 'Origin',
                                 'SecurityDelay',
                                 'TailNum',
                                 'TaxiIn',
                                 'TaxiOut',
                                 'UniqueCarrier',
                                 'CRSDepTimeStamp',
                                 'DepTimestamp',
                                 'ArrTimestamp',
                                 'CRSArrTimestamp'])

### Analysis cells

In [None]:
data_trans_n_cast_cols.show(20)

In [None]:
data_trans_n_cast_cols.dtypes

### {preprocess step} Cast bigint columns to int

In [None]:
int_col_list = ['ActualElapsedTime', 'AirTime', 'ArrDelay', 'CRSElapsedTime', 'DepDelay', 'Distance', 'LateAircraftAndCarrierDelay', 'NASAndWeatherDelay', 'SecurityDelay', 'TaxiIn', 'TaxiOut']
for col in int_col_list:
    data_trans_n_cast_cols = data_trans_n_cast_cols.withColumn(col, data_trans_n_cast_cols[col].cast('integer'))

### Analysis cells

In [None]:
data_trans_n_cast_cols.dtypes

In [None]:
data_trans_n_cast_cols.show(20)

### Save dataframe to CSV again

In [None]:
# will partition the dataframe into several csv files stored under a .csv directory
# to load in the preprocessed csv again you may use the path to the .csv directory
# spark will automatically load in all csv parts included in the directory
data_trans_n_cast_cols.write.csv('preprocessed_data.csv', header = True)

# started at 19:26

### Test the saved preprocessed CSV

In [2]:
preproc_data = spark.read.csv('preprocessed_data.csv', inferSchema='true', header='true', mode='PERMISSIVE', encoding='ISO-8859-1')

In [5]:
preproc_data.show(200)

+-----------------+-------+--------+--------------+--------+----+--------+---------------------------+------------------+------+-------------+---------+------+-------+-------------+-------------------+-------------------+-------------------+-------------------+
|ActualElapsedTime|AirTime|ArrDelay|CRSElapsedTime|DepDelay|Dest|Distance|LateAircraftAndCarrierDelay|NASAndWeatherDelay|Origin|SecurityDelay|  TailNum|TaxiIn|TaxiOut|UniqueCarrier|    CRSDepTimeStamp|       DepTimestamp|       ArrTimestamp|    CRSArrTimestamp|
+-----------------+-------+--------+--------------+--------+----+--------+---------------------------+------------------+------+-------------+---------+------+-------+-------------+-------------------+-------------------+-------------------+-------------------+
|               53|     32|      -8|            65|       4| PIT|     205|                          0|                 0|   DCA|            0|   N443US|     7|     14|           US|2002-10-10 15:45:00|2002-10-10 15

In [None]:
preproc_data.count()

In [None]:
preproc_data.dtypes

In [None]:
len(preproc_data.columns)

## Correlation filter
This is my proposed solution for batching. The idea is simple:
1) First query the 16.000 elements
2) Then check if two columns are correlated. And exstend from there 
3) ...
4) Profit

In [9]:
from itertools import permutations

def unique_permutations(iterable, r=None):
    previous = tuple()
    for p in permutations(sorted(iterable), r):
        print(f'This is p: {p}')
        print(f'this is previous: {previous}')
        if p > previous:
            previous = p
            yield p

In [13]:
for p in unique_permutations(preproc_data.columns, 2):
    print(f'permutation p: {p}')

This is p: ('ActualElapsedTime', 'AirTime')
this is previous: ()
permutation p: ('ActualElapsedTime', 'AirTime')
This is p: ('ActualElapsedTime', 'ArrDelay')
this is previous: ('ActualElapsedTime', 'AirTime')
permutation p: ('ActualElapsedTime', 'ArrDelay')
This is p: ('ActualElapsedTime', 'ArrTimestamp')
this is previous: ('ActualElapsedTime', 'ArrDelay')
permutation p: ('ActualElapsedTime', 'ArrTimestamp')
This is p: ('ActualElapsedTime', 'CRSArrTimestamp')
this is previous: ('ActualElapsedTime', 'ArrTimestamp')
permutation p: ('ActualElapsedTime', 'CRSArrTimestamp')
This is p: ('ActualElapsedTime', 'CRSDepTimeStamp')
this is previous: ('ActualElapsedTime', 'CRSArrTimestamp')
permutation p: ('ActualElapsedTime', 'CRSDepTimeStamp')
This is p: ('ActualElapsedTime', 'CRSElapsedTime')
this is previous: ('ActualElapsedTime', 'CRSDepTimeStamp')
permutation p: ('ActualElapsedTime', 'CRSElapsedTime')
This is p: ('ActualElapsedTime', 'DepDelay')
this is previous: ('ActualElapsedTime', 'CRSEla

Step 1: Transform all the categorical columns to numbers

In [31]:
import pandas as pd

data_limit = data.limit(16000) #this could be a pre-processing step
indexer =  data_limit.select("*").toPandas()
for col in indexer.select_dtypes(exclude=['number']).columns:
     indexer[col] = pd.Categorical(indexer[col], categories=indexer[col].unique()).codes



In [86]:
def filter_based_on_pvalue(sampledataset):
    def simple_hash(x): 
        return (x * 5399 + 7691 ) % 71
    
    def _filter(dependency):
        lhs, rhs = dependency
        
        # first we construct a column for the left hand side
        sampledataset["lhs"] = 0
        for item in lhs:
            sampledataset["lhs"] = sampledataset['lhs'] * 100_000 + sampledataset[item]
        
        sampledataset["lhs"] = simple_hash(sampledataset["lhs"])
        sampledataset["rhs"] = simple_hash(sampledataset[rhs])
        
        crosstab =  pd.crosstab(sampledataset['lhs'], sampledataset['rhs'])
        _, p, _, _ = stats.chi2_contingency(crosstab)
        
        return p < 0.05
    return _filter

def find_correlected_dependencies_with_spark(dependencies_to_check):
    rdd=spark.sparkContext.parallelize(dependencies_to_check)
    filtered = rdd.filter(filter_based_on_pvalue(indexer))
    return filtered.collect()
    

In [87]:
import hashlib
from scipy import stats

# in this list we add all the elements that must be checked by spark
q = []

# We have already seen these combinations
cachedCombinations = []

foundcorrelations = []

# first we add all the single columns. For instance A->B, B->C BUT NOT B-> A
for p in unique_permutations(data.columns, 2):
    if (tuple([p[-1]]),p[0:-1]) not in cachedCombinations and len(p[0:-1]) > 0:
        cachedCombinations.append((p[0:-1],tuple([p[-1]])))
        q.append((list(p[0:-1]),p[-1]))

# single_correlated is a list with tuples, the 0th index of the tuple is a list for the LHS the
# 1th index is a single column foro the RHS 
single_correlated = find_correlected_dependencies_with_spark(q)  
foundcorrelations = foundcorrelations + single_correlated


# this is where I stop, we now should construct dependecies with 2 LHS columns, and 3 etc.



[(['ActualElapsedTime'], 'AirTime'), (['ActualElapsedTime'], 'ArrDelay'), (['ActualElapsedTime'], 'CRSElapsedTime'), (['ActualElapsedTime'], 'DepDelay'), (['ActualElapsedTime'], 'DepTimestamp'), (['ActualElapsedTime'], 'Dest'), (['ActualElapsedTime'], 'Distance'), (['ActualElapsedTime'], 'Origin'), (['ActualElapsedTime'], 'TaxiIn'), (['ActualElapsedTime'], 'TaxiOut'), (['ActualElapsedTime'], 'UniqueCarrier'), (['AirTime'], 'CRSElapsedTime'), (['AirTime'], 'Dest'), (['AirTime'], 'Distance'), (['AirTime'], 'NASAndWeatherDelay'), (['AirTime'], 'Origin'), (['AirTime'], 'TaxiIn'), (['AirTime'], 'TaxiOut'), (['AirTime'], 'UniqueCarrier'), (['ArrDelay'], 'CRSElapsedTime'), (['ArrDelay'], 'DepDelay'), (['ArrDelay'], 'Dest'), (['ArrDelay'], 'LateAircraftAndCarrierDelay'), (['ArrDelay'], 'NASAndWeatherDelay'), (['ArrDelay'], 'SecurityDelay'), (['ArrDelay'], 'TaxiIn'), (['ArrDelay'], 'TaxiOut'), (['ArrDelay'], 'UniqueCarrier'), (['CRSElapsedTime'], 'DepDelay'), (['CRSElapsedTime'], 'Dest'), (['CR

In [None]:
data_limit.toDF(*data_limit.columns).show()