In [None]:
!pip install pyspark

In [8]:
from pyspark.sql import SparkSession


In [9]:
# Obtain the SparkSession used by every later cell (bound to `spark`);
# getOrCreate() reuses an already-active session instead of starting a second one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [10]:
# Load both raw CSVs, taking column names from the first row. No schema inference
# is requested, so every column arrives as a string; numeric casts happen later.
Flights, Planes = (
    spark.read.option("header", True).csv(csv_path)
    for csv_path in ("Flights.csv", "planes.csv")
)

In [34]:
# The flights data has its own `year` column, so rename the planes' `year`
# (presumably the aircraft's build year — TODO confirm) to `plane_year`.
# This keeps the join below from producing two columns called `year`.
planes = Planes.withColumnRenamed(existing="year", new="plane_year")


In [35]:
# Attach aircraft details to each flight by tail number. The left outer join keeps
# flights with no matching `tailnum` in planes; their plane columns are null.
model_data = Flights.join(
    other=planes,
    on="tailnum",
    how="leftouter",
)

In [None]:
# Peek at the joined data (first 20 rows, the same as show()'s default).
model_data.show(n=20)
# Next step: the CSVs were read without schema inference, so numeric columns are
# still strings. Cast them to integers before modeling (Spark ML needs numeric inputs).

In [37]:
# Cast the numeric columns to integers. The CSVs were read without schema inference,
# so every column is a string until cast here.
#  - With ANSI mode off, strings that don't parse as integers become null, and the
#    missing-value filter further down drops them. With ANSI mode on, the cast raises.
#  - `year` is included because plane_age subtracts it. Otherwise that arithmetic
#    relies on Spark's implicit string coercion.
#  - `dep_delay` is included because the missing-value filter checks it for NULL.
#    As a string, a non-numeric placeholder would pass that check.
#    NOTE(review): assumes the raw file uses a non-numeric token for missing delays — confirm.
for col_name in ["arr_delay", "air_time", "month", "plane_year", "year", "dep_delay"]:
    model_data = model_data.withColumn(col_name, model_data[col_name].cast("integer"))

In [38]:
# Create the column plane_age = flight `year` - `plane_year`.
# `year` is read from the CSV as a string. Casting it explicitly makes this integer
# subtraction instead of relying on Spark's implicit string-to-numeric coercion
# (a double result in non-ANSI mode). The cast is harmless if `year` is already an
# integer. The result is null when either operand is null.
model_data = model_data.withColumn(
    "plane_age", model_data.year.cast("integer") - model_data.plane_year
)

In [39]:
# Target: a flight is "late" when its arr_delay is positive.
model_data = model_data.withColumn("is_late", model_data["arr_delay"] > 0)

# Spark ML classifiers expect a numeric label, so expose the boolean as 0/1.
model_data = model_data.withColumn("label", model_data["is_late"].cast("integer"))

# Drop rows missing any of these values. A null arr_delay would also have
# left is_late/label null above.
model_data = model_data.filter(
    model_data["arr_delay"].isNotNull()
    & model_data["dep_delay"].isNotNull()
    & model_data["air_time"].isNotNull()
    & model_data["plane_year"].isNotNull()
)

In [50]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder


def _categorical_stages(column):
    """Build the (StringIndexer, OneHotEncoder) pair for one string column.

    The indexer maps each distinct string in `column` to a numeric index stored in
    `<column>_index`. The encoder turns that index into a one-hot vector stored in
    `<column>_fact`.
    """
    index_col = f"{column}_index"
    indexer = StringIndexer(inputCol=column, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=f"{column}_fact")
    return indexer, encoder


carr_indexer, carr_encoder = _categorical_stages("carrier")
dest_indexer, dest_encoder = _categorical_stages("dest")

In [51]:
from pyspark.ml.feature import VectorAssembler

# Pack the model inputs into a single vector column named "features".
# carrier_fact / dest_fact are the one-hot vectors produced by the encoder stages.
feature_columns = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
vec_assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [52]:
from pyspark.ml import Pipeline

# Stage order matters: each indexer must run before the encoder that reads its
# *_index column, and the assembler runs last because it consumes the *_fact vectors.
flights_pipe = Pipeline(
    stages=[
        dest_indexer,
        dest_encoder,
        carr_indexer,
        carr_encoder,
        vec_assembler,
    ]
)

In [53]:
# Fit the pipeline on model_data (the indexers learn their category mappings),
# then apply the fitted pipeline to those same rows.
# NOTE(review): fitting before the train/test split lets test rows influence the
# category encoding (minor leakage). Consider fitting on the training split only,
# with handleInvalid="keep" on the indexers.
flights_pipe_model = flights_pipe.fit(model_data)
piped_data = flights_pipe_model.transform(model_data)

In [54]:
# Split the data into training (~60%) and test (~40%) sets.
# randomSplit is random; a fixed seed makes the split, and every result downstream,
# reproducible across Restart & Run All. This holds as long as the input's partitioning
# and row order stay the same.
training, test = piped_data.randomSplit([0.6, 0.4], seed=42)

In [56]:
# Preview the first 5 training rows (long cell values truncated, as by default).
training.show(n=5, truncate=True)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
| N102UW|2014|    5|  7|    1311|        6|    2115|        2|     US|  1

In [57]:
# Preview the first 5 test rows (long cell values truncated, as by default).
test.show(n=5, truncate=True)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
| N102UW|2014|   11|  9|    2220|       -5|     555|      -11|     US|  1