# CZĘŚĆ 1 - OPTYMALIZACJA

## OPIS ZESTAWU DANYCH
Dane składają się z informacji o przylotach i odlotach wszystkich lotów komercyjnych w USA od października 1987 do kwietnia 2008 – przede wszystkim o ich opóźnieniach. \
Zbiór danych jest bardzo duży (120mln rekordów, 12GB danych) – na potrzeby projektu wykorzystamy jedynie dane z roku 2007 co ograniczy rozmiar przetwarzanych danych.

In [12]:
from typing import Union

import pandas as pd
import numpy as np
from pandas import DataFrame, Series
from sklearn import preprocessing
import pyarrow as pa
from matplotlib.pyplot import figure
from matplotlib import pyplot as plt
import statistics
from sklearn.impute import KNNImputer
import seaborn as sns
import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, col, hour, mean, lit, stddev,abs
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.sql.types import IntegerType, StructField, LongType, DoubleType, StructType, StringType

#### OPIS KLAS
W sumie klas jest 29, opisują one następujące informacje:
* rok
* miesiąc
* dzień miesiąca
* dzień tygodnia
* rzeczywisty czas odlotu
* zaplanowany czas odlotu
* rzeczywisty czas przylotu
* zaplanowany czas przylotu
* kod przewoźnika
* numer lotu
* numer ogonowy samolotu
* całkowity czas lotu w minutach
* rzeczywisty czas lotu
* całkowity czas w powietrzu
* opóźnienie lotu w minutach
* miejsce startu
* miejsce docelowe
* odległość w milach
* dane dotyczące przyjazdu taksówki
* informacje o tym czy lot był anulowany
* powód anulowania (pogoda, przewoźnik, ochrona, NAS)
* przekierowanie (tak/nie)
* opóźnienie przewoźnika w minutach
* opóźnienie pogodowe w minutach
* opóźnienie NAS w minutach
* opóźnienie z powodów bezpieczeństwa w minutach
* sumaryczne opóźnienie w minutach

Celem projektu jest przewidywanie sumarycznego opóźnienia samolotu - zmienna objaśniana - na podstawie podzbioru pozostałych kolumn (zmiennych objaśniających), które wybierzemy na podstawie dalszej analizy.

In [13]:
columns =  ["Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "UniqueCarrier", "FlightNum", "TailNum", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "ArrDelay", "DepDelay", "Origin", "Dest", "Distance", "TaxiIn", "TaxiOut", "Cancelled", "CancellationCode", "Diverted", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Project").config("spark.driver.memory", "15g").getOrCreate()
sparkDF = spark.read.orc("data2006-2008.orc")

In [15]:
sparkDF = spark.createDataFrame(data = sparkDF.rdd, schema = columns)

In [16]:
sparkDF = sparkDF.filter(sparkDF.Year!='Year')

In [17]:
for c in columns:
    if c in ["UniqueCarrier", "TailNum", "Origin", "Dest"]:
        sparkDF = sparkDF.withColumn(c,col(c).cast(StringType()))
    else:
        sparkDF = sparkDF.withColumn(c,col(c).cast(IntegerType()))
sparkDF.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: integer (nullable = true)
 |-- Diverted: integer (nullable = true

In [18]:
print(sparkDF.limit(5).toPandas())

   Year  Month  DayofMonth  DayOfWeek  DepTime  CRSDepTime  ArrTime  \
0  2008      1           3          4     1343        1325     1451   
1  2008      1           3          4     1125        1120     1247   
2  2008      1           3          4     2009        2015     2136   
3  2008      1           3          4      903         855     1203   
4  2008      1           3          4     1423        1400     1726   

   CRSArrTime UniqueCarrier  FlightNum  ... TaxiIn  TaxiOut  Cancelled  \
0        1435            WN        588  ...      4        9          0   
1        1245            WN       1343  ...      3        8          0   
2        2140            WN       3841  ...      2       14          0   
3        1205            WN          3  ...      5        7          0   
4        1710            WN         25  ...      6       10          0   

   CancellationCode  Diverted  CarrierDelay WeatherDelay NASDelay  \
0               NaN         0          16.0          0.0   

In [19]:
# KRZYSZTOF - poprwić żeby działało na sparku :)
# for column in ['DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime']:
#     sparkDF[column] = sparkDF[column] // 100 + (sparkDF[column] % 100) / 60

  Jak widać w powyższej tabeli, niektóre kolumny zawierają dane tekstowe - UniqueCarrier, TailNum, Origin, Dest i CancellationCode. \
Z racji tego, że w projekcie chcielibyśmy się skupić na powiązaniach między opóźnieniami/odwołaniami lotów, a momentem ich odbywania, część danych będzie nam zbędna. Dlatego też zdecydowaliśmy się na usunięcie kolumn:
- UniqueCarrier - indywidualny kod przewoźnika
- TailNum - numer ogonowy
- Origin - miejsce rozpoczęcia podróży
- Dest - cel podróży
- CancellationCode - kod odwołania

Ponadto usuwamy także poniższe kolumny:
- FlightNum - ponieważ pełni on rolę numeru ID, więc nie będzie miało większego sensu uwzględnianie go w modelu.
- TaxiIn, TaxiOut - ponieważ dane na temat taksówki naszym zdaniem nie mają wpływu na opóźnienie/odwołanie lotu.

In [20]:
sparkDF.drop('UniqueCarrier', 'TailNum', 'Origin', 'Dest','CancellationCode', 'FlightNum', 'TaxiIn', 'TaxiOut')

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: int, CRSDepTime: int, ArrTime: int, CRSArrTime: int, ActualElapsedTime: int, CRSElapsedTime: int, AirTime: int, ArrDelay: int, DepDelay: int, Distance: int, Cancelled: int, Diverted: int, CarrierDelay: int, WeatherDelay: int, NASDelay: int, SecurityDelay: int, LateAircraftDelay: int]

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: int, CRSDepTime: int, ArrTime: int, CRSArrTime: int, ActualElapsedTime: int, CRSElapsedTime: int, AirTime: int, ArrDelay: int, DepDelay: int, Distance: int, Cancelled: int, Diverted: int, CarrierDelay: int, WeatherDelay: int, NASDelay: int, SecurityDelay: int, LateAircraftDelay: int]

In [21]:
sparkDF.Cancelled

Column<'Cancelled'>

Column<'Cancelled'>

In [22]:
cancelledDF = sparkDF.filter(sparkDF.Cancelled==1)

In [24]:
notCancelledDF = sparkDF.filter(sparkDF.Cancelled==0)

In [25]:
pandasDF = cancelledDF.sample(fraction=0.2).toPandas()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 54354)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\klaud\Desktop\MOW\BigDataMachineLearning\venv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\klaud\AppData\Local\Programs\Python\Python37\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\klaud\Desktop\MOW\BigDataMachineLearning\venv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\klaud\Desktop\MOW\BigDataMachineLearning\venv\lib\site-packages\py4j\clien

ConnectionRefusedError: [WinError 10061] Nie można nawiązać połączenia, ponieważ komputer docelowy aktywnie go odmawia

In [None]:
pandasDFNC = notCancelledDF.sample(fraction=0.2).toPandas()

In [None]:
with pd.option_context('display.float_format', '{:.2f}'.format, 'display.max_rows', None, 'display.max_columns', None):

    display(pandasDF.describe())

Jak widać w powyższej tabeli dane obejmują okres od stycznia do czerwca 2007 i są dość równo rozłożone w tym okresie - średni miesiąc to między marcem a kwietniem, dni miesiąca oraz dni tygodnia mają równo rozłożone kwartyle. \



## WIZUALIZACJA DANYCH

### BOXPLOTY

In [None]:
columns_for_visualisation = [column for column in pandasDF.columns if column != 'Year']

In [None]:
fig, axes = plt.subplots(nrows=5, ncols=4, figsize=(20, 20))
axes = axes.flatten()
for i, column in enumerate(columns_for_visualisation):
    sns.boxplot(pandasDF[column], ax=axes[i], orient='v')
    axes[i].set_title(column, fontsize=15)
    axes[i].set_ylabel('')
    axes[i].set_xticks([])
plt.show()

#### TO DO: Dodać komentarz dotyczący boxplotów

### HISTOGRAMY

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(20, 10))
axes = axes.flatten()
for i, column in enumerate(columns_for_visualisation):
    sns.histplot(pandasDF[column], ax=axes[i], bins=20)
plt.show()

#### TO DO: Dodać komentarz dotyczący histogramów

### SCATTER PLOT

In [None]:
columns_for_scatter = ['DepTime', 'CRSDepTime',
       'ArrTime', 'CRSArrTime']

In [None]:
#scatter plot matrix
sns.pairplot(pandasDF, vars=columns_for_scatter)

#### TO DO: Dodać komentarz dotyczący scatterplot

## PROBLEMY Z DANYMI - DANE BRAKUJĄCE, NIEPRAWIDŁOWE, ODSTAJĄCE

### SPRAWDZENIE POPRAWNOŚCI TYPÓW DANYCH

In [None]:
table = pa.Table.from_pandas(pandasDF)
print(table.schema)

Jak widać wyżej - wszystkie dane występują w poprawnym formacie.

### NAPRAWA WIERSZY Z PUSTYMI DANYMI

In [None]:
# Obliczenie ilosci pustych danych
# Obliczenie ilosci pustych danych
# # KRZYSZTOF - zrobic usuwanie pustych w sparku
# np.sum(data.isna())

In [None]:
# sizeBeforeDeleteNull= data.count()
# dataWithoutNull = data.dropna()
# sizeAfterDeleteNull =  dataWithoutNull.count()
# print("Usunieto: ", sizeBeforeDeleteNull - sizeAfterDeleteNull)
#
# print("Percent od reduced rows: ", 100*sum(sizeBeforeDeleteNull - sizeAfterDeleteNull)/sum(sizeBeforeDeleteNull))

In [None]:
### TO DO: ZMIENIC SPOSÓB RADZENIA SOBIE Z PUSTYMI DANYMI
### TO DO: Dodać komentarz dotyczący danych pustych

### USUWANIE DANYCH ODSTAJĄCYCH

In [None]:
# sizeBefore = np.shape(data)[0]
# for col in data.columns:
#     data = data[np.abs(data[col]-data[col].mean()) <= (3*data[col].std())]
# sizeAfter =  np.shape(data)[0]
# print("Count of reduced rows: ", sizeBefore - sizeAfter)
# print("Percent od reduced rows: ", 100*(sizeBefore - sizeAfter)/sizeBefore)

#### TO DO: Dodać komentarz dotyczący danych odstających

# KORELACJE

In [None]:
plt.figure(figsize=(20, 20), dpi=80)
corrMatrix = sparkDF.corr()
sns.heatmap(corrMatrix, annot=True)
plt.show()

#### TO DO: Dodać komentarz dotyczący korelacji między danymi, ew dodać pairploty do wybranych danych

In [None]:
### TO DO: "Normalizacja danych (przedstawić wyniki min-max i standaryzacji). Zastanowić się nad zakresem skalowania danych"


## REDUKCJA WYMIAROWOŚCI

In [None]:
### TO DO: Genetyczna optymalizacja cech

In [None]:
# Genetyczna optymalizacja cech - NA RAZIE SAMA SELEKCJA NA PODSTAWIE KORELACJI
sparkDF = sparkDF.filter(["DayOfWeek", "DayofMonth", "Distance", "DepTime", "Cancelled", "Diverted", "LateAircraftDelay"], axis = 1)

# PRZYGOTOWANIE MODELI REGRESJI

### PODZIAŁ DANYCH

In [None]:

vector = VectorAssembler(inputCols = ['DayOfWeek', 'DayofMonth', 'Distance', 'DepTime', 'Cancelled', 'Diverted'
], outputCol = 'features')
vectorData = vector.transform(sparkDF).select(['features', 'LateAircraftDelay'])
vectorData.plot.bar(x = 'parameter', y= 'Coefficients')
plt.show()

vectorData.show(5)

In [None]:
train_df, test_df = sparkDF.randomSplit([0.7, 0.3])
print("train.rows: ", train_df.count())
print("test.rows: ", test_df.count())

In [None]:
### MIARY SKUTECZNOŚCI

evaluatorRMSE = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
evaluatorR2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")


def effectivenessMeasures(model, predictions):
    print("Coefficients: " + str(model.coefficients))
    print("Intercept: " + str(model.intercept))
    print(predictions.show(5))
    rmse = evaluatorRMSE.evaluate(predictions)
    r2 = evaluatorR2.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    print("R2 on test data = %g" % r2)


### LOGISTIC REGRESSION

In [None]:

linear_reg = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=20, regParam=0.3, elasticNetParam=0.8)
linear_model = linear_reg.fit(train_df)
linear_predictions = linear_model.transform(test_df)
effectivenessMeasures(linear_model, linear_predictions)

## RANDOM FOREST regressor

In [None]:
rf = RandomForestRegressor(featuresCol = 'features', labelCol='label')

rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)

effectivenessMeasures(rf_model, rf_predictions)

## DECISION TREE REGRESSOR

In [None]:
dt = DecisionTreeRegressor(featuresCol = 'features', labelCol='label')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

effectivenessMeasures(dt_model, dt_predictions)

# KLASYFIKACJA

### MIARY SKUTECZNOŚCI - KLASYFIKACJA

In [None]:

def effectivenessMeasuresClassifier(predictions):
    predictionCol = "features"
    labelCol = "Cancelled"
    acc=MulticlassClassificationEvaluator(predictionCol=predictionCol, labelCol=labelCol, metricName='accuracy').evaluate(predictions)
    print("Prediction Accuracy: ", acc)

    f1=MulticlassClassificationEvaluator(predictionCol=predictionCol, labelCol=labelCol, metricName='f1').evaluate(predictions)
    print("F1: ", f1)

    precision=MulticlassClassificationEvaluator(predictionCol=predictionCol, labelCol=labelCol, metricName='weightedPrecision').evaluate(predictions)
    print("Precision: ", precision)

    recall=MulticlassClassificationEvaluator(predictionCol=predictionCol, labelCol=labelCol, metricName='weightedRecall').evaluate(predictions)
    print("Recall: ", recall)

    y_pred=predictions.select(predictionCol).collect()
    y_orig=predictions.select(labelCol).collect()

    cm = confusion_matrix(y_orig, y_pred)
    print("Confusion Matrix:")
    print(cm)

    return acc, f1, precision, recall


### DECISION TREE CLASSIFIER

In [None]:

(trainingData, testData) = vectorData.randomSplit([0.7, 0.3])
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="Cancelled").fit(trainingData)
pred = dtc.transform(testData)
# pred.show()
eff_dtc = effectivenessMeasuresClassifier(pred)

### RANDOM FOREST CLASSIFIER

In [None]:
rfc = RandomForestClassifier(labelCol="Cancelled", featuresCol="features", numTrees=10).fit(trainingData)
pred = rfc.transform(testData)
eff_rfc = effectivenessMeasuresClassifier(pred)

### MULTILAYER PERCEPTRON CLASSIFIER

In [None]:
mpc = MultilayerPerceptronClassifier(labelCol="Cancelled", featuresCol="features", numTrees=10).fit(trainingData)
pred = mpc.transform(testData)
eff_mpc = effectivenessMeasuresClassifier(pred)

### GBT CLASSIFIER

In [None]:
gbt = GBTClassifier(labelCol="Cancelled", featuresCol="features", numTrees=10).fit(trainingData)
pred = gbt.transform(testData)
eff_gbt = effectivenessMeasuresClassifier(pred)

### ENSAMBLE CLASSIFICATOR - EXTRA TREES CLASSIFIER

In [None]:
from sklearn.ensemble import ExtraTreesClassifier
etr = ExtraTreesClassifier(n_estimators = 100, criterion ='mse', max_features = "auto")
etr_model = etr.fit(trainingData)
pred = etr_model.transform(testData)
eff_etr = effectivenessMeasuresClassifier(pred)

### ENSAMBLE CLASSIFICATOR - VOTING CLASSIFIER

In [None]:
from sklearn.ensemble import VotingClassifier

lr_model = linear_model.LinearClassifier()
dc_model = DecisionTreeClassifier()
rf_model = RandomForestClassifier()
estimators = [('lr', lr_model), ('dc', dc_model), ('rf', rf_model)]
vc = VotingClassifier(estimators)
vc_model = vc.fit(trainingData)
pred = vc_model.transform(testData)
vc_etr = effectivenessMeasuresClassifier(pred)

### ENSAMBLE CLASSIFICATOR - STACKING CLASSIFIER

In [None]:
from sklearn.ensemble import StackingClassifier

lr_model = linear_model.LinearClassifier()
dc_model = DecisionTreeClassifier()
rf_model = RandomForestClassifier()
estimators = [('lr', lr_model), ('dc', dc_model), ('rf', rf_model)]
sc = StackingClassifier(estimators)
sc_model = sc.fit(trainingData)
pred = sc_model.transform(testData)
sc_etr = effectivenessMeasuresClassifier(pred)

## K-KROTNA WALIDAJA KRZYŻOWA

In [None]:
from random import randrange


def cross_validation_split(data, folds):
    dataset = data.copy().to_numpy()
    dataset_split = list()
    dataset_copy = list(dataset)
    fold_size = int(len(dataset) / folds)
    for i in range(folds):
        fold = list()
        while len(fold) < fold_size:
            index = randrange(len(dataset_copy))
            fold.append(dataset_copy.pop(index))
        dataset_split.append(fold)
    return dataset_split

In [None]:
# TO DO STWORZYC FUNKCJE GENERUJACA WYNIKI

In [None]:
# TO DO: optymalizacja parametrow klasyfikatorow

In [None]:
import seaborn as sns
plt.bar(trainingData.columns, etr.feature_importance)
plt.xticks(rotation=40)
plt.xlabel('Feature Labels')
plt.ylabel('Feature Importances')
plt.title('Comparison of different Feature Importances')
plt.show()

figure(figsize=(8, 6), dpi=80)

corrMatrix = sparkDF.corr()
sns.heatmap(corrMatrix, annot=True)
plt.xlabel('Feature Labels')
plt.ylabel('Feature Importances')
plt.title('Comparison of different Feature Importances')
plt.show()

plt.bar(sparkDF.columns[:11], corrMatrix["Cancelled"][:11])
plt.xticks(rotation=40)
plt.xlabel('Feature Labels')
plt.ylabel('Feature Importances')
plt.title('Comparison of different Feature Importances')
plt.show()

In [None]:

size_before_opt = np.shape(sparkDF)[1] - 1
print("Ilosc wymairów zmiennych zależnych przed optymalizacją: ", size_before_opt)
data.drop('season', inplace=True, axis=1)
data.drop('mnth', inplace=True, axis=1)
data.drop('holiday', inplace=True, axis=1)
data.drop('weathersit', inplace=True, axis=1)
data.drop('temp', inplace=True, axis=1)
data.drop('hum', inplace=True, axis=1)
data.drop('windspeed', inplace=True, axis=1)
size_after_opt = np.shape(data)[1] - 1
print("Ilosc wymairów zmiennych zależnych po optymalizacji: ",size_after_opt)

x_train, x_test, y_train, y_test = splitOfData(data, 0.20)

In [None]:
# TO DO: wyniki

In [None]:
from prettytable import PrettyTable

def add_row(table, result, model):
    avg_result = [sum(x) / len(x) for x in zip(*result)]
    table.add_row([model, round(avg_result[0],5), round(avg_result[1],5), round(avg_result[2],5)])

def createSummaryTable(summary_table, results):
    add_row(summary_table, results[0], "Linear Regression")
    add_row(summary_table, results[1], "Polynominal Regression")
    add_row(summary_table, results[2], "Decision Tree Regression")
    add_row(summary_table, results[3], "Random Forrest Regression")
    add_row(summary_table, results[4], "Voting Regressor")
    add_row(summary_table, results[5], "Stacking Regressor")

def createSummary(k_fold = 0, grid_search_optimalization = False):
    summary_table = PrettyTable(['model', 'MSE', 'r2', 'Experience Variance'])
    if (k_fold < 2):
        results = prepare_result(grid_search_optimalization)
        createSummaryTable(summary_table, results)
        print("Summary table for result of regression models:\n", summary_table, "\n\n")
    else:
        results = prepare_result_with_k_fold(k_fold, grid_search_optimalization)
        createSummaryTable(summary_table, results)
        print("Summary table for result of regression models [K_fold: k =",k_fold,"]:\n", summary_table, "\n\n")

In [None]:
createSummary()
createSummary(k_fold = 2)
createSummary(k_fold = 5)
createSummary(k_fold = 10)