# Data preparation, part 2
Markdowns added later, after using it in Big Data project. Also, original notebook splitted into two different parts: data preparation and modelin. At the start of this notebook prepared data lies in three tables:
1. Flights_data, that contain filtered by nulls data only about Southwest airlines, but with all columns. All columns, that was dropped in previous part will be dropped here too
2. Routes_data, contains info routes id as ORIGIN->DESTINATION. Not useful except for better containing in database
3. Airports_data with IATA and latitude/longitude

## Data integration
Load data:

In [3]:
from pyspark.sql import SparkSession
import math
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import DoubleType

# Add here your team number teamx
team = 37

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

spark.sql("SHOW DATABASES").show()
spark.sql("USE team37_projectdb").show()
spark.sql("SHOW TABLES").show()
spark.sql("SELECT * FROM team37_projectdb.flight_data_bucketed LIMIT 10").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team22_projectdb|
|    team23_projectdb|
|    team24_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team27_projectdb|
+--------------------+
only showing top 20 rows

++
||
++
++

+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team37_projectdb|       airports_data|      false|
|team37_projectdb|  airports_data_load|      false|
|team37_projectdb|         flight_data|      false|
|team37_projectdb|flight_data_bucketed|      false|
|team37_projectdb|          q1_results|      

In [4]:
flights = spark.read.format("avro").table('team37_projectdb.flight_data_bucketed')
flights.head(5)

[Row(fl_date='2021-06-08', dot_code=19393, fl_number=5448, crs_dep_time=1100, dep_time=1120.0, dep_delay=20.0, taxi_out=12.0, wheels_off=1132.0, wheels_on=1153.0, taxi_in=3.0, crs_arr_time=1145, arr_time=1156.0, arr_delay=11.0, cancelled=0.0, cancellation_code='N', diverted=0.0, crs_elapsed_time=105.0, elapsed_time=96.0, air_time=81.0, id=624813, route_id=407),
 Row(fl_date='2021-06-08', dot_code=19393, fl_number=2427, crs_dep_time=1705, dep_time=1703.0, dep_delay=-2.0, taxi_out=9.0, wheels_off=1712.0, wheels_on=1732.0, taxi_in=4.0, crs_arr_time=1750, arr_time=1736.0, arr_delay=-14.0, cancelled=0.0, cancellation_code='N', diverted=0.0, crs_elapsed_time=105.0, elapsed_time=93.0, air_time=80.0, id=624810, route_id=407),
 Row(fl_date='2021-06-08', dot_code=19393, fl_number=2387, crs_dep_time=1130, dep_time=1127.0, dep_delay=-3.0, taxi_out=14.0, wheels_off=1141.0, wheels_on=1305.0, taxi_in=5.0, crs_arr_time=1315, arr_time=1310.0, arr_delay=-5.0, cancelled=0.0, cancellation_code='N', divert

In [5]:
routes = spark.read.format("avro").table('team37_projectdb.route_data_partitioned')
routes.head(5)

[Row(dest='LAS', origin_city='Albuquerque, NM', dest_city='Las Vegas, NV', route_id=11, most_common_distance=486.0, origin='ABQ'),
 Row(dest='PHX', origin_city='Albuquerque, NM', dest_city='Phoenix, AZ', route_id=17, most_common_distance=328.0, origin='ABQ'),
 Row(dest='MDW', origin_city='Albuquerque, NM', dest_city='Chicago, IL', route_id=18, most_common_distance=1121.0, origin='ABQ'),
 Row(dest='HOU', origin_city='Albuquerque, NM', dest_city='Houston, TX', route_id=19, most_common_distance=759.0, origin='ABQ'),
 Row(dest='DEN', origin_city='Albuquerque, NM', dest_city='Denver, CO', route_id=20, most_common_distance=349.0, origin='ABQ')]

In [6]:
airports = spark.read.format("avro").table('team37_projectdb.airports_data_load').withColumnRenamed("latitude", "origin_latitude").withColumnRenamed("longitude", "origin_longitude")
airports.head(5)

[Row(airport_name='ABQ', origin_latitude=35.04022216796875, origin_longitude=-106.60919189453125),
 Row(airport_name='ANC', origin_latitude=61.174320220947266, origin_longitude=-149.99618530273438),
 Row(airport_name='ATL', origin_latitude=33.640445709228516, origin_longitude=-84.42694091796875),
 Row(airport_name='AUS', origin_latitude=30.19453239440918, origin_longitude=-97.66986846923828),
 Row(airport_name='BDL', origin_latitude=41.938873291015625, origin_longitude=-72.6832275390625)]

In [7]:
airports1 = spark.read.format("avro").table('team37_projectdb.airports_data_load').withColumnRenamed("latitude", "dest_latitude").withColumnRenamed("longitude", "dest_longitude").withColumnRenamed("airport_name", "dest_airport_name")
airports1.head(5)

[Row(dest_airport_name='ABQ', dest_latitude=35.04022216796875, dest_longitude=-106.60919189453125),
 Row(dest_airport_name='ANC', dest_latitude=61.174320220947266, dest_longitude=-149.99618530273438),
 Row(dest_airport_name='ATL', dest_latitude=33.640445709228516, dest_longitude=-84.42694091796875),
 Row(dest_airport_name='AUS', dest_latitude=30.19453239440918, dest_longitude=-97.66986846923828),
 Row(dest_airport_name='BDL', dest_latitude=41.938873291015625, dest_longitude=-72.6832275390625)]

### Integrate data into one table. For origin and destination airports connect physical metadata

In [8]:
df = flights.join(routes, on='route_id', how='inner')
df = df.join(airports1, df.dest == airports1.dest_airport_name, how='inner')
df = df.join(airports, df.origin == airports.airport_name, how='inner')
df.head()

Row(route_id=407, fl_date='2021-06-08', dot_code=19393, fl_number=5448, crs_dep_time=1100, dep_time=1120.0, dep_delay=20.0, taxi_out=12.0, wheels_off=1132.0, wheels_on=1153.0, taxi_in=3.0, crs_arr_time=1145, arr_time=1156.0, arr_delay=11.0, cancelled=0.0, cancellation_code='N', diverted=0.0, crs_elapsed_time=105.0, elapsed_time=96.0, air_time=81.0, id=624813, dest='MSY', origin_city='Orlando, FL', dest_city='New Orleans, LA', most_common_distance=551.0, origin='MCO', dest_airport_name='MSY', dest_latitude=29.993389129638672, dest_longitude=-90.25802612304688, airport_name='MCO', origin_latitude=28.42888832092285, origin_longitude=-81.31602478027344)

## Format data
First and most important thing - transformate date with saving it's iterative natures. I have used this formul for month:
month=sin(2∗pi∗month/12)
For day: day=sin(2∗pi∗month/31)
And didn't use same for year, due to only 2021-2023 in dataset and no iterative nature in years.
Also, later I will use same thing for hours

In [9]:
class Encoder(Transformer, HasInputCol, HasOutputCols,
              DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCol, outputCols, n):
        super(Encoder, self).__init__()
        self._set(inputCol=inputCol, outputCols=outputCols)
        self.n = n

    def _transform(self, dataset):
        input_col = self.getInputCol()
        output_cols = self.getOutputCols()
        dataset = dataset.withColumn(output_cols[0],
                                     F.sin(2 * math.pi*F.col(input_col) / self.n))
        return dataset


df = df.withColumn("year", F.year("fl_date"))
df = df.withColumn("month", F.month("fl_date"))
df = df.withColumn("day", F.dayofmonth("fl_date"))
df = Encoder("month", ["month"], 12).transform(df)
df = Encoder("day", ["day"], 31).transform(df)

In [13]:
features = ['crs_dep_time', 'dep_time', 'dep_delay', 
            'taxi_out', 'wheels_off', 'wheels_on', 'taxi_in', 'crs_arr_time', 'arr_time', 'arr_delay', 'cancelled',
            'cancellation_code', 'diverted', 'crs_elapsed_time', 'elapsed_time', 'air_time','id', 'year', 'month', 'day', 'airport_name', 'dest_latitude', 'dest_longitude', 'origin_latitude', 'origin_longitude']

features

['fl_number',
 'crs_dep_time',
 'dep_time',
 'dep_delay',
 'taxi_out',
 'wheels_off',
 'wheels_on',
 'taxi_in',
 'crs_arr_time',
 'arr_time',
 'arr_delay',
 'cancelled',
 'cancellation_code',
 'diverted',
 'crs_elapsed_time',
 'elapsed_time',
 'air_time',
 'id',
 'year',
 'month',
 'day',
 'airport_name',
 'dest_latitude',
 'dest_longitude',
 'origin_latitude',
 'origin_longitude']

### Also, here was generated new variable: delta_minutes, calculated difference between scheduled start and arrival

In [25]:
from pyspark.sql.functions import col, floor, abs
def split_time(column_name):
    hours = floor(col(column_name) / 100).alias(f"{column_name}_hours")
    return hours

time_columns = ["crs_dep_time", "dep_time", "wheels_off", "wheels_on", "crs_arr_time", "arr_time"]
for col_name in time_columns:
    hours, minutes = split_time(col_name)
    df = df.withColumn(f"{col_name}_hours", hours)
    df = df.withColumn(f"{col_name}_minutes", minutes)
df = df.withColumn("delta_minutes", (col("crs_arr_time_hours") * 60 + col("crs_arr_time_minutes") - col("crs_dep_time_hours") * 60 - col("crs_dep_time_minutes")))
df = Encoder("crs_dep_time_hours", ["crs_dep_time_hours"], 24).transform(df)
df = Encoder("dep_time_hours", ["dep_time_hours"], 24).transform(df)
df = Encoder("wheels_off_hours", ["wheels_off_hours"], 24).transform(df)
df = Encoder("wheels_on_hours", ["wheels_on_hours"], 24).transform(df)
df = Encoder("crs_arr_time_hours", ["crs_arr_time_hours"], 24).transform(df)
df = Encoder("arr_time_hours", ["arr_time_hours"], 24).transform(df)
df.head(5)

[Row(route_id=407, fl_date='2021-06-08', dot_code=19393, fl_number=5448, crs_dep_time=1100, dep_time=1120.0, dep_delay=20.0, taxi_out=12.0, wheels_off=1132.0, wheels_on=1153.0, taxi_in=3.0, crs_arr_time=1145, arr_time=1156.0, arr_delay=11.0, cancelled=0.0, cancellation_code='N', diverted=0.0, crs_elapsed_time=105.0, elapsed_time=96.0, air_time=81.0, id=624813, dest='MSY', origin_city='Orlando, FL', dest_city='New Orleans, LA', most_common_distance=551.0, origin='MCO', dest_airport_name='MSY', dest_latitude=29.993389129638672, dest_longitude=-90.25802612304688, airport_name='MCO', origin_latitude=28.42888832092285, origin_longitude=-81.31602478027344, year=2021, month=1.2246467991473532e-16, day=0.9987165071710528, crs_dep_time_hours=0.258819045102521, dep_time_hours=0.258819045102521, wheels_off_hours=0.258819045102521, wheels_on_hours=0.258819045102521, crs_arr_time_hours=0.258819045102521, arr_time_hours=0.258819045102521),
 Row(route_id=407, fl_date='2021-06-08', dot_code=19393, fl_

In [26]:
df.head(1)

[Row(route_id=407, fl_date='2021-06-08', dot_code=19393, fl_number=5448, crs_dep_time=1100, dep_time=1120.0, dep_delay=20.0, taxi_out=12.0, wheels_off=1132.0, wheels_on=1153.0, taxi_in=3.0, crs_arr_time=1145, arr_time=1156.0, arr_delay=11.0, cancelled=0.0, cancellation_code='N', diverted=0.0, crs_elapsed_time=105.0, elapsed_time=96.0, air_time=81.0, id=624813, dest='MSY', origin_city='Orlando, FL', dest_city='New Orleans, LA', most_common_distance=551.0, origin='MCO', dest_airport_name='MSY', dest_latitude=29.993389129638672, dest_longitude=-90.25802612304688, airport_name='MCO', origin_latitude=28.42888832092285, origin_longitude=-81.31602478027344, year=2021, month=1.2246467991473532e-16, day=0.9987165071710528, crs_dep_time_hours=0.258819045102521, dep_time_hours=0.258819045102521, wheels_off_hours=0.258819045102521, wheels_on_hours=0.258819045102521, crs_arr_time_hours=0.258819045102521, arr_time_hours=0.258819045102521)]

In [29]:
features = ['taxi_out', 'taxi_in', 'arr_delay', 'diverted', 'crs_elapsed_time', 'elapsed_time', 'air_time', 'year', 'month', 'day', 'airport_name', 'dest_latitude', 'dest_longitude', 'origin_latitude', 'origin_longitude', 'crs_dep_time_hours',
            'dep_time_hours', 'wheels_off_hours', 'wheels_on_hours', 'crs_arr_time_hours', 'arr_time_hours']

features

['taxi_out',
 'taxi_in',
 'arr_delay',
 'diverted',
 'crs_elapsed_time',
 'elapsed_time',
 'air_time',
 'year',
 'month',
 'day',
 'airport_name',
 'dest_latitude',
 'dest_longitude',
 'origin_latitude',
 'origin_longitude',
 'crs_dep_time_hours',
 'dep_time_hours',
 'wheels_off_hours',
 'wheels_on_hours',
 'crs_arr_time_hours',
 'arr_time_hours']

In [49]:
df_filtered = df.select(*features)
df_filtered = df_filtered.withColumnRenamed("arr_delay","label")
df_filtered.head(1)

[Row(taxi_out=12.0, taxi_in=3.0, label=11.0, diverted=0.0, crs_elapsed_time=105.0, elapsed_time=96.0, air_time=81.0, year=2021, month=1.2246467991473532e-16, day=0.9987165071710528, airport_name='MCO', dest_latitude=29.993389129638672, dest_longitude=-90.25802612304688, origin_latitude=28.42888832092285, origin_longitude=-81.31602478027344, crs_dep_time_hours=0.258819045102521, dep_time_hours=0.258819045102521, wheels_off_hours=0.258819045102521, wheels_on_hours=0.258819045102521, crs_arr_time_hours=0.258819045102521, arr_time_hours=0.258819045102521)]

## Data formatting
Use encoding of original airports and scaling of all features. Also choose features, that was choosed in previous part of data preparation. And was added new variables.

In [95]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,StandardScaler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col

categoricalCols = ['airport_name']
others = ['crs_elapsed_time', 'year', 'month', 'day', 'dest_latitude', 'dest_longitude', 'origin_latitude', 'origin_longitude', 'crs_dep_time_hours', 'crs_arr_time_hours']

# StringIndexer for categorical columns
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]

# OneHotEncoder for indexed categorical columns
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]

# VectorAssembler to combine all features into a single vector
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol="features")

# StandardScaler to scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Create the pipeline with all stages
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# Fit the pipeline to the data
model = pipeline.fit(df_filtered)

# Transform the data
data = model.transform(df_filtered)
data.select('label', 'scaledFeatures').head(5)

[Row(label=11.0, scaledFeatures=DenseVector([-0.2679, -0.244, -0.2426, -0.2296, -0.2285, -0.223, -0.1974, -0.1868, 5.5993, -0.169, -0.1638, -0.1612, -0.1558, -0.1539, -0.1512, -0.1498, -0.143, -0.1375, -0.1212, -0.12, -0.116, -0.1154, -0.1132, -0.1023, -0.1014, -0.0999, -0.0975, -0.0934, -0.0902, -0.089, -0.0888, -0.086, -0.0853, -0.0819, -0.0814, -0.0813, -0.0802, -0.0802, -0.0798, -0.0784, -0.0768, -0.0746, -0.0742, -0.0732, -0.0729, -0.0726, -0.068, -0.0673, -0.0663, -0.0649, -0.0602, -0.0592, -0.0584, -0.0583, -0.058, -0.0574, -0.0573, -0.057, -0.0569, -0.0562, -0.0555, -0.0545, -0.0536, -0.0536, -0.0529, -0.0525, -0.0511, -0.051, -0.051, -0.0504, -0.0494, -0.0476, -0.0475, -0.0471, -0.0467, -0.0456, -0.0455, -0.045, -0.0423, -0.0423, -0.0419, -0.0418, -0.0412, -0.0403, -0.0394, -0.0391, -0.0391, -0.0384, -0.038, -0.0365, -0.0364, -0.0361, -0.0347, -0.0343, -0.0339, -0.0329, -0.0327, -0.0326, -0.0325, -0.0309, -0.0298, -0.0291, -0.024, -0.0239, -0.0238, -0.4096, -1.1398, -0.1123, 1