# Projeto Final - Limpeza de Dados

## Setup

In [1]:
from pyspark.sql import functions as sf
import pyspark.sql.functions as f

# Modeling
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Criar a sessao do Spark
from pyspark.sql import SparkSession
spark = SparkSession \
            .builder \
            .master("local[4]") \
            .appName("nyc_caio_garcia") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,com.microsoft.azure:azure-storage:8.6.6") \
            .getOrCreate()

In [3]:
# Acesso aos dados na nuvem
STORAGE_ACCOUNT = 'dlspadseastusprod'
CONTAINER = 'big-data-comp-nuvem'
FOLDER = 'airline-delay'
TOKEN = 'lSuH4ZI9BhOFEhCF/7ZQbrpPBIhgtLcPDfXjJ8lMxQZjaADW4p6tcmiZGDX9u05o7FqSE2t9d2RD+ASt0YFG8g=='

spark.conf.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)

### Schema
Schema definido de acordo com o dicionário de dados em `projeto_final_dicionário.xlsx`

In [4]:
from pyspark.sql.types import *

labels = (('FL_DATE', TimestampType()),
          ('OP_CARRIER', StringType()),
          ('OP_CARRIER_FL_NUM', IntegerType()),
          ('ORIGIN', StringType()),
          ('DEST', StringType()),
          ('CRS_DEP_TIME', IntegerType()),
          ('DEP_TIME', FloatType()),
          ('DEP_DELAY', FloatType()),
          ('TAXI_OUT', FloatType()),
          ('WHEELS_OFF', FloatType()),
          ('WHEELS_ON', FloatType()),
          ('TAXI_IN', FloatType()),
          ('CRS_ARR_TIME', IntegerType()),
          ('ARR_TIME', FloatType()),
          ('ARR_DELAY', FloatType()),
          ('CANCELLED', FloatType()),
          ('CANCELLATION_CODE', StringType()),
          ('DIVERTED', FloatType()),
          ('CRS_ELAPSED_TIME', FloatType()),
          ('ACTUAL_ELAPSED_TIME', FloatType()),
          ('AIR_TIME', FloatType()),
          ('DISTANCE', FloatType()),
          ('CARRIER_DELAY', FloatType()),
          ('WEATHER_DELAY', FloatType()),
          ('NAS_DELAY', FloatType()),
          ('SECURITY_DELAY', FloatType()),
          ('LATE_AIRCRAFT_DELAY', StringType()))

schema = StructType([StructField(x[0], x[1], True) for x in labels])

In [5]:
# Columns with values in minutes
minute_columns = ["TAXI_OUT","TAXI_IN","DEP_DELAY","ARR_DELAY","AIR_TIME","CRS_ELAPSED_TIME","ACTUAL_ELAPSED_TIME",
                  "CARRIER_DELAY","WEATHER_DELAY","NAS_DELAY","SECURITY_DELAY","LATE_AIRCRAFT_DELAY"]

# Subset from minute columns with no data leak from moment of take off
clean_min_columns = ["TAXI_OUT", "DEP_DELAY", "CRS_ELAPSED_TIME"]

# Columns with time information on format 'hhmm'
# Not proper for numerical manipulation
odd_format_columns = ["CRS_DEP_TIME","DEP_TIME","WHEELS_OFF","WHEELS_ON","ARR_TIME","CRS_ARR_TIME"]

### Carregamento de dados
Dados carregados da nuvem como spark data frame

In [6]:
config = spark.sparkContext._jsc.hadoopConfiguration()
config.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)
sc = spark.sparkContext

df = spark.read.csv("wasbs://{}@{}.blob.core.windows.net/{}/2009.csv"\
                    .format(CONTAINER, STORAGE_ACCOUNT, FOLDER), header=True, schema=schema)
df.take(2)

[Row(FL_DATE=datetime.datetime(2009, 1, 1, 0, 0), OP_CARRIER='XE', OP_CARRIER_FL_NUM=1204, ORIGIN='DCA', DEST='EWR', CRS_DEP_TIME=1100, DEP_TIME=1058.0, DEP_DELAY=-2.0, TAXI_OUT=18.0, WHEELS_OFF=1116.0, WHEELS_ON=1158.0, TAXI_IN=8.0, CRS_ARR_TIME=1202, ARR_TIME=1206.0, ARR_DELAY=4.0, CANCELLED=0.0, CANCELLATION_CODE=None, DIVERTED=0.0, CRS_ELAPSED_TIME=62.0, ACTUAL_ELAPSED_TIME=68.0, AIR_TIME=42.0, DISTANCE=199.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None),
 Row(FL_DATE=datetime.datetime(2009, 1, 1, 0, 0), OP_CARRIER='XE', OP_CARRIER_FL_NUM=1206, ORIGIN='EWR', DEST='IAD', CRS_DEP_TIME=1510, DEP_TIME=1509.0, DEP_DELAY=-1.0, TAXI_OUT=28.0, WHEELS_OFF=1537.0, WHEELS_ON=1620.0, TAXI_IN=4.0, CRS_ARR_TIME=1632, ARR_TIME=1624.0, ARR_DELAY=-8.0, CANCELLED=0.0, CANCELLATION_CODE=None, DIVERTED=0.0, CRS_ELAPSED_TIME=82.0, ACTUAL_ELAPSED_TIME=75.0, AIR_TIME=43.0, DISTANCE=213.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SEC

In [7]:
OBSERVACOES = df.count()
assert (OBSERVACOES == 6429338)

A base contem 6429338 observações.

In [8]:
CANCELAMENTOS = df.filter(df.CANCELLED == 1).count()
assert (CANCELAMENTOS == 87038)

Dos voos na base, 87038 foram cancelados.

## Tratamento de dados faltantes

Das 27 colunas na base de dados, 16 tem valores faltantes. Os dados ausentes na coluna CANCELATION_CODE sao 100% consistentes com a informacao de cancelamento, isto eh, apenas os voos que nao foram cancelados tem a coluna CANCELATION_CODE vazia. As outras 15 colunas podem ser agrupadas em 3 grupos: _Voo_, _Chegada_ e _Atrasos_.

### Resumo

In [9]:
missing_counts = df.select([sf.col(column).isNull().cast("int").alias(column) for column in df.columns]) \
                       .groupBy() \
                       .sum()

In [10]:
missing_counts.toPandas().transpose()

Unnamed: 0,0
sum(FL_DATE),0
sum(OP_CARRIER),0
sum(OP_CARRIER_FL_NUM),0
sum(ORIGIN),0
sum(DEST),0
sum(CRS_DEP_TIME),0
sum(DEP_TIME),82867
sum(DEP_DELAY),82867
sum(TAXI_OUT),85787
sum(WHEELS_OFF),85787


### Cancelamentos

In [11]:
assert (df.filter((df.CANCELLATION_CODE.isNull()) &
                  (df.CANCELLED == 0)).count() ==
        df.filter(df.CANCELLED == 0).count())

Todos os valores faltantes de CANCELLATION_CODE são referentes a voos que não foram cancelados.

### Voo

#### Testes

`DEP_TIME` e `DEP_DELAY`: co-ausentes, todos cancelados

In [12]:
assert (df.filter((df.DEP_TIME.isNull())   &
                  (df.DEP_DELAY.isNull())  &
                  (df.TAXI_OUT.isNull())   &
                  (df.WHEELS_OFF.isNull()) &
                  (df.WHEELS_ON.isNull())  &
                  (df.TAXI_IN.isNull())    &
                  (df.ARR_TIME.isNull())).count() ==
        df.filter(df.DEP_TIME.isNull()).count())

assert (df.filter((df.DEP_TIME.isNull())   &
                  (df.DEP_DELAY.isNull())).count() ==
        df.filter(df.DEP_TIME.isNull()).count())

In [13]:
assert (df.filter((df.DEP_TIME.isNull())   &
                  (df.CANCELLED == 1)).count() ==
        df.filter(df.DEP_TIME.isNull()).count())

`TAXI_OUT` e `WHEELS_OFF`: co-ausentes e cancelados

In [14]:
assert (df.filter((df.TAXI_OUT.isNull())   &
                  (df.WHEELS_OFF.isNull()) &
                  (df.WHEELS_ON.isNull())  &
                  (df.TAXI_IN.isNull())    &
                  (df.ARR_TIME.isNull())).count() ==
        df.filter(df.TAXI_OUT.isNull()).count())

assert (df.filter((df.TAXI_OUT.isNull())   &
                  (df.WHEELS_OFF.isNull())).count() ==
        df.filter(df.TAXI_OUT.isNull()).count())

In [15]:
assert (df.filter((df.TAXI_OUT.isNull())   &
                  (df.CANCELLED == 1)).count() ==
        df.filter(df.TAXI_OUT.isNull()).count())

`WHEELS_ON`, `TAXI_IN` e `ARR_TIME`: co-ausentes

In [16]:
assert (df.filter((df.WHEELS_ON.isNull())  &
                  (df.TAXI_IN.isNull())    &
                  (df.ARR_TIME.isNull())).count() ==
        df.filter(df.TAXI_IN.isNull()).count())

In [17]:
assert (df.filter((df.TAXI_IN.isNull())   &
                  (df.CANCELLED == 1)).count() ==
        df.filter(df.CANCELLED == 1).count())

In [18]:
assert (df.filter((df.TAXI_IN.isNull()) &
                  (df.CANCELLED == 0)   &
                  (df.DIVERTED == 1)).count() ==
        df.filter((df.TAXI_IN.isNull()) &
                  (df.CANCELLED == 0)).count())

In [19]:
assert (df.filter((df.TAXI_IN.isNull()) &
                  (df.CANCELLED == 0)   &
                  (df.DIVERTED == 1)).count() !=
        df.filter(df.DIVERTED == 1).count())

In [20]:
assert (df.filter((df.TAXI_IN.isNotNull()) &
                  (df.DIVERTED == 1)).count() == 13040)

In [21]:
assert (df.filter((df.TAXI_IN.isNull()) &
                  (df.DIVERTED == 1)).count() == 2283)

#### Análise

O grupo _Voo_ apresenta uma relação entre voos cancelados e as 7 variáveis:
 - DEP_TIME
 - DEP_DELAY
 - TAXI_OUT
 - WHEELS_OFF
 - WHEELS_ON
 - TAXI_IN
 - ARR_TIME
 
Os valores faltantes para WHEELS_ON, TAXI_IN e ARR_TIME coincidem nas mesmas observações (com uma exceção descrita mais abaixo). Todos os voos cancelados se encontram dentre essas observações. Os valores faltantes para TAXI_OUT e WHEELS_OFF coincidem nas mesmas observações, todas referentes a voos cancelados. Finalmente, os valores faltantes de DEP_TIME e DEP_DELAY coincidem nas mesmas observações, todas com valores faltantes para TAXI_OUT.

Todos os voos que não foram cancelados mas não tem informação da hora de aterrisagem (`(df.WHEELS_ON.isNull()) & (df.CANCELLED == 0)`) foram redirecionados para um aeroporto diferente do aeroporto destino original (`df.DIVERTED == 1`)

Destas relações, supomos:
 - A diferença entre DEP_TIME e WHEELS_OFF pode ser devido a voos que chegam a sair do chão antes de serem cancelados, e voos que são cancelados após o embarque mas antes da decolagem.
 - Nenhum desses valores faltantes parece implausível o suficiente para assumirmos erro nos dados baseado apenas nessa análise. Alguns desses dados podem vir a ser retirados mesmo assim por questão de propriedades dos algorítmos utilizados mais a frente.

### Chegada

### Atrasos
TODO

#### Testes

In [22]:
assert (df.filter((df.CANCELLED == 0) &
                  (df.DEP_DELAY > 0)).count() == 
        2252608)

In [23]:
assert (df.filter((df.CANCELLED == 0) &
                  (df.ARR_DELAY > 0)).count() ==
        2402990)

In [24]:
assert (df.filter((df.CANCELLED == 0) &
                  ((df.DEP_DELAY > 0) |
                   (df.ARR_DELAY > 0))).count() ==
        3052688)

In [25]:
assert (OBSERVACOES - df.filter(df.CARRIER_DELAY.isNull()).count() ==
        1170501)

In [26]:
assert (df.filter((df.CARRIER_DELAY.isNull()) &
                  (df.WEATHER_DELAY.isNull()) &
                  (df.NAS_DELAY.isNull()) &
                  (df.SECURITY_DELAY.isNull()) &
                  (df.LATE_AIRCRAFT_DELAY.isNull())).count() ==
        df.filter(df.CARRIER_DELAY.isNull()).count())

In [56]:
# assert (df.filter((df.CARRIER_DELAY.isNull()) &
#                   ((df.DEP_DELAY > 0) |
#                    (df.ARR_DELAY > 0))).count() ==
#         df.filter(df.CARRIER_DELAY.isNull()).count())

In [None]:
df.filter(df.CARRIER_DELAY == 0).select(df.OP_CARRIER_FL_NUM, df.CARRIER_DELAY).take(10)

#### Análise

Todos os dados faltantes referentes a categoria de atraso coincidem nas mesmas observações.

Há menos observações com informação sobre a causa do atraso do que voos atrasados, independente se medindo o atraso de saída ou de chegada.

### Anomalia

In [None]:
assert (df.filter((df.ACTUAL_ELAPSED_TIME.isNull()) &
                  (df.AIR_TIME == 0)                &
                  (df.WHEELS_ON.isNull())           &
                  (df.ARR_TIME.isNull())            &
                  (df.ARR_DELAY == 0)               &
                  (df.TAXI_IN == 0)                 &
                  (df.CANCELLED == 0)).count() == 1)

Uma mesma observação é responsavel pela discrepância na quantidade total de valores faltantes entre WHEELS_ON, TAXI_IN e ARR_TIME, e ARR_DELAY, ACTUAL_ELAPSED_TIME e AIR_TIME.
Um valor de `AIR_TIME == 0` nao faz sentido para um voo que não foi cancelado, e o mesmo se aplica a `TAXI_IN == 0`. Ao retirar essa observação da base, a análise de dados faltantes por grupo torna-se mais consistente.

## Consistencia

In [None]:
assert (df.filter(df.AIR_TIME + df.TAXI_IN + df.TAXI_OUT != df.ACTUAL_ELAPSED_TIME).count() == 0)

In [None]:
assert (df.filter((df.CANCELLED == 1) &
                  (df.DIVERTED == 1)).count() == 0)

In [None]:
assert (df.filter((df.DEP_TIME % 1 != 0) | (df.DEP_DELAY % 1 != 0)).count() == 0)

# Modelagem

In [29]:
# This list includes all values not known at the moment of takeoff
# except `ARR_DELAY` which will be used as target variable
take_off_leak = ["WHEELS_ON","TAXI_IN","ARR_TIME","ACTUAL_ELAPSED_TIME","AIR_TIME",
                 "CARRIER_DELAY","WEATHER_DELAY","NAS_DELAY","SECURITY_DELAY","LATE_AIRCRAFT_DELAY"]

In [30]:
take_off_df = df.drop(*take_off_leak)

## Train/Test Split

In [31]:
train_df, test_df = take_off_df.randomSplit([0.8,0.2], seed=42)
toy_df = train_df.sample(False, 0.01, seed=42)

In [32]:
print("Train set count:", train_df.count())
print("Test set count:", test_df.count())
print("Toy set count:", toy_df.count())

Train set count: 5142441
Test set count: 1286897
Toy set count: 51765


## Feature Engineering: One-Hot-Enconding

In [47]:
cat_features = ["OP_CARRIER", "ORIGIN", "DEST"] # "OP_CARRIER_FL_NUM",

indexOutputCols = [x + 'Index' for x in cat_features]

oheOutputCols = [x + 'OHE' for x in cat_features]

stringIndex = StringIndexer(inputCols = cat_features,
                            outputCols = indexOutputCols,
                            handleInvalid = 'skip')

oheEncoder = OneHotEncoder(inputCols = indexOutputCols,
                           outputCols = oheOutputCols)

In [48]:
num_features = ["TAXI_OUT", "DEP_DELAY", "CRS_ELAPSED_TIME", "DISTANCE"]

numVecAssembler = VectorAssembler(inputCols = num_features,
                                  outputCol = 'features',
                                  handleInvalid = 'skip')

stdScaler = StandardScaler(inputCol = 'features',
                           outputCol = 'features_scaled')

## Assembling dos vetores

In [49]:
assembleInputs = oheOutputCols + ['features_scaled']

vecAssembler = VectorAssembler(inputCols = assembleInputs,
                               outputCol = 'features_vector')

In [50]:
stages = [stringIndex, oheEncoder, numVecAssembler, stdScaler, vecAssembler]

## Criação do Pipeline

In [51]:
# Criacao do pipeline de transformacao
transform_pipeline = Pipeline(stages=stages)

# Aplicacao do pipeline nos dados de treino
fitted_transformer = transform_pipeline.fit(train_df)
transformed_train_df = fitted_transformer.transform(train_df)

transformed_train_df.limit(10).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,DISTANCE,OP_CARRIERIndex,ORIGINIndex,DESTIndex,OP_CARRIEROHE,ORIGINOHE,DESTOHE,features,features_scaled,features_vector
0,2009-01-01,9E,2102,AUS,IND,1655,1650.0,-5.0,19.0,1709.0,...,920.0,10.0,44.0,45.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[19.0, -5.0, 155.0, 920.0]","[1.7984320895620953, -0.1581283862793019, 2.22...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,2009-01-01,9E,2103,IND,AUS,1445,1450.0,5.0,14.0,1504.0,...,920.0,10.0,45.0,44.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[14.0, 5.0, 150.0, 920.0]","[1.3251604870457545, 0.1581283862793019, 2.149...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,2009-01-01,9E,2109,MSP,OKC,2130,2126.0,-4.0,34.0,2200.0,...,695.0,10.0,12.0,59.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[34.0, -4.0, 133.0, 695.0]","[3.218246897111118, -0.12650270902344152, 1.90...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,2009-01-01,9E,2110,OKC,MSP,1745,1740.0,-5.0,8.0,1748.0,...,695.0,10.0,59.0,12.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[8.0, -5.0, 132.0, 695.0]","[0.7572345640261454, -0.1581283862793019, 1.89...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,2009-01-01,9E,2114,ALO,MSP,700,815.0,75.0,16.0,831.0,...,166.0,10.0,276.0,12.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[16.0, 75.0, 69.0, 166.0]","[1.5144691280522908, 2.3719257941895284, 0.988...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
5,2009-01-01,9E,2118,MSP,GRR,1310,1337.0,27.0,14.0,1351.0,...,408.0,10.0,12.0,78.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[14.0, 27.0, 93.0, 408.0]","[1.3251604870457545, 0.8538932859082302, 1.332...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,2009-01-01,9E,2122,CLE,MSP,1343,1338.0,-5.0,11.0,1349.0,...,622.0,10.0,32.0,12.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[11.0, -5.0, 135.0, 622.0]","[1.04119752553595, -0.1581283862793019, 1.9343...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,2009-01-01,9E,2122,MSP,RHI,1538,1549.0,11.0,12.0,1601.0,...,190.0,10.0,12.0,294.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[12.0, 11.0, 58.0, 190.0]","[1.1358518460392182, 0.34788244981446415, 0.83...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,2009-01-01,9E,2123,DTW,GRR,1218,1214.0,-4.0,16.0,1230.0,...,120.0,10.0,8.0,78.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[16.0, -4.0, 62.0, 120.0]","[1.5144691280522908, -0.12650270902344152, 0.8...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,2009-01-01,9E,2124,GRR,MSP,1350,1345.0,-5.0,9.0,1354.0,...,408.0,10.0,78.0,12.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[9.0, -5.0, 100.0, 408.0]","[0.8518888845294136, -0.1581283862793019, 1.43...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


## Model Training

In [54]:
model = LinearRegression(maxIter = 25, # pode causar overfitting
                         solver = 'normal',
                         labelCol = 'ARR_DELAY',
                         featuresCol = 'features_vector',
                         elasticNetParam = 0.2,
                         regParam = 0.02)

pipe_stages = stages + [model]

pipe = Pipeline(stages=pipe_stages)

In [55]:
fitted_pipe = pipe.fit(train_df)

Py4JJavaError: An error occurred while calling o1712.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 166.0 failed 1 times, most recent failure: Lost task 0.0 in stage 166.0 (TID 383) (jupyter-caioag3 executor driver): scala.MatchError: [null,1.0,(612,[10,218,325,608,609,610,611],[1.0,1.0,1.0,2.2717036920784364,0.22137974079102266,2.1779428808522967,1.6244122287441223])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1174)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1168)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1267)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1228)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1214)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1214)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:451)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:345)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:327)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:184)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: scala.MatchError: [null,1.0,(612,[10,218,325,608,609,610,611],[1.0,1.0,1.0,2.2717036920784364,0.22137974079102266,2.1779428808522967,1.6244122287441223])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


## Model performance evaluation

In [None]:
preds = fitted_pipe.transform(test_df)

In [None]:
preds.limit(10).toPandas()