## Hands-on PySpark 
<br>
The goal of this project is to predict whether a flight will be **delayed by more than 15 minutes** using the Spark MLlib API.

In [0]:
df = spark.read.format("csv") \
               .option("header", "true") \
               .load("dbfs:/FileStore/shared_uploads/Idrissa.ndiaye@codeworks.fr/flight_delays_train.csv")

In [0]:
df.show(5)

In [0]:
df.columns

In [0]:
df.printSchema()

In [0]:
df.select('UniqueCarrier', 'dep_delayed_15min').show(3)

In [0]:
df.filter(df['dep_delayed_15min'] == 'Y')  \
  .groupBy('UniqueCarrier')  \
  .count()   \
  .orderBy('count', ascending=False) \
  .show(5, False) 

In [0]:
df.groupBy('dep_delayed_15min').count().show()

In [0]:
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.functions import udf, pandas_udf, split

PySpark UDFs (user-defined functions) apply a Python function to column values row by row, much like the pandas **.map()** and **.apply()** methods do for Series and DataFrames.
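
As a rough illustration of that analogy, the sketch below uses only plain Python: the function is called once per value, and a missing value arrives as `None`, which is also how Spark hands a SQL NULL to a Python UDF. The function `bin_hour` and the sample values are hypothetical, made up for this example.

```python
# Plain-Python sketch of element-wise application (no Spark required).
# bin_hour and the sample values are hypothetical, for illustration only.

def bin_hour(dep_time):
    """Map an HHMM integer to a coarse label; pass missing values through."""
    if dep_time is None:  # a NULL cell reaches a Python UDF as None
        return None
    return "AM" if dep_time < 1200 else "PM"

dep_times = [530, 1245, None, 2359]

# Equivalent in spirit to pandas Series.map(bin_hour) or a Spark UDF:
# the function runs once per element.
binned = [bin_hour(t) for t in dep_times]
print(binned)
```

Without the `None` check, the comparison `dep_time < 1200` would raise a `TypeError` on the missing value.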

In [0]:
def binDepTime(DepTime):
  # a NULL value arrives as None; comparing None with an int raises a TypeError
  if DepTime is None:
    return None
  
  if DepTime < 600:
    return "Night"
  
  elif DepTime < 1200:
    return "Morning"
  
  elif DepTime < 1800:
    return "Afternoon"
  
  else:
    return "Evening"
  
# create a UDF from the Python function
dep_time_udf = udf(binDepTime, StringType())

# cast DepTime to INT, then apply the UDF to the DataFrame
df = df.withColumn("DepTime", df["DepTime"].cast(IntegerType()))
df = df.withColumn("binDepTime", dep_time_udf(df["DepTime"]))

df.show(2)

In [0]:
time_features = ("Month", "DayofMonth", "DayOfWeek")

for time in time_features:
  df = df.withColumn("new"+time, split(df[time], "-").getItem(1).cast(IntegerType()))

df.show(2)

In [0]:
label_udf = udf(lambda x: 1 if x == "Y" else 0, IntegerType())

df = df.withColumn("label", label_udf(df.dep_delayed_15min))
df = df.withColumn("Distance", df.Distance.cast(IntegerType()))

df.show(5)

- **StringIndexer** encodes a string column of labels to a column of label indices.
- **Bucketizer** transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users.
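
To make these two transformers concrete, here is a minimal plain-Python sketch of what they compute. It is not Spark's implementation: it assumes StringIndexer's default ordering (the most frequent label gets index 0) and Bucketizer's convention that each bucket includes its lower bound and excludes its upper bound, except the last bucket, which also includes its upper bound. The helper names `fit_string_index` and `bucketize` and the sample values are hypothetical.

```python
from bisect import bisect_right
from collections import Counter

def fit_string_index(labels):
    """Assign index 0 to the most frequent label, 1 to the next, and so on."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) containing value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the splits")
    # Clamp so that a value equal to the last split falls in the last bucket.
    return float(min(bisect_right(splits, value) - 1, len(splits) - 2))

mapping = fit_string_index(
    ["Morning", "Evening", "Morning", "Night", "Morning", "Evening"]
)
print(mapping)

splits = [0.0, 500.0, 1000.0, 1500.0, 2000.0, float("inf")]
buckets = [bucketize(d, splits) for d in (120, 500, 1999, 2500)]
print(buckets)
```

Both helpers return floats because the corresponding Spark output columns are of double type.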

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer

# We transform the binDepTime feature to numerical form

bin_dep_time_indexer = StringIndexer(inputCol="binDepTime", outputCol="binDepTime_Num").fit(df)
df = bin_dep_time_indexer.transform(df)


# We create a new feature called bucketedDistance based on distance level

distanceBins = [0.0, 500.0, 1000.0, 1500.0, 2000.0, float("inf")]  # arbitrary
distance_bucketizer = Bucketizer(splits=distanceBins, inputCol="Distance", outputCol="bucketedDistance")
df = distance_bucketizer.transform(df)

df.show(5)

- **VectorAssembler** is a transformer that combines a given list of columns into a single vector column. <br>It is useful for combining raw features and features generated by different feature transformers into a single feature vector, **in order to train ML models** like logistic regression and decision trees.
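
Conceptually, the assembler concatenates the chosen columns of each row, in order, into one numeric vector. The plain-Python sketch below shows that idea only; the helper `assemble` and the sample row are hypothetical, and Spark's own implementation produces a vector column rather than a Python list.

```python
# Plain-Python sketch of vector assembly (no Spark required).
# The helper assemble() and the sample row are hypothetical.

def assemble(row, input_cols):
    """Concatenate the selected columns of one row into a flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # An existing vector-like column is flattened into the result.
            features.extend(float(v) for v in value)
        else:
            # A numeric column contributes a single entry.
            features.append(float(value))
    return features

row = {"DepTime": 1934, "Distance": 732, "bucketedDistance": 1.0, "label": 0}
features = assemble(row, ["DepTime", "Distance", "bucketedDistance"])
print(features)
```

Columns that are not listed, such as `label` here, are left out of the feature vector.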

In [0]:
from pyspark.ml.feature import VectorAssembler

inputCols = ['DepTime', 'Distance', 'newMonth', 'newDayofMonth', 'newDayOfWeek', 'binDepTime_Num', 'bucketedDistance']

df_assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
df_train = df_assembler.transform(df)

df_train.show(3)

In [0]:
df_train.select(['features', 'label']).show(5, False)

In [0]:
import pyspark

df_model = df_train.select(['features', 'label']).persist(pyspark.StorageLevel.MEMORY_AND_DISK)

training_df, valid_df = df_model.randomSplit(weights=[0.75, 0.25], seed=2022)

print(f"Number of training rows : {training_df.count()}")
print(f"Number of valid rows : {valid_df.count()}")
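
The two counts printed above are usually close to, but not exactly, 75% and 25% of the data, because a weighted random split assigns each row independently. The plain-Python sketch below illustrates that behaviour under the assumption of one uniform draw per row; the helper `random_split` is hypothetical and is not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Send each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative, normalised upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        idx = next(i for i, b in enumerate(bounds) if x < b)
        splits[idx].append(row)
    return splits

train, valid = random_split(list(range(1000)), [0.75, 0.25], seed=2022)
print(len(train), len(valid))
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed=2022`.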

- **Logistic regression**: a classification algorithm that predicts a categorical response. It is a special case of generalized linear models that predicts the probability of each outcome.
- **Gradient-boosted tree classifier**: GBTs are a popular classification and regression method that uses ensembles of decision trees.
- **Multilayer perceptron classifier**: the multilayer perceptron classifier (MLPC) is based on the [feedforward artificial neural network](https://en.wikipedia.org/wiki/Feedforward_neural_network). It consists of multiple layers of nodes, and each layer is fully connected to the next layer in the network.
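
For the first model in the list, the predicted probability comes from passing a weighted sum of the features through the logistic (sigmoid) function. The sketch below shows that formula in plain Python; the weights, intercept, and input are hypothetical values chosen only to illustrate it, not coefficients learned from the flight data.

```python
import math

def predict_proba(features, weights, intercept):
    """Probability of the positive class under a logistic regression model."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

# Hypothetical coefficients, chosen only to illustrate the formula.
weights = [0.8, -0.5]
intercept = -0.1

p = predict_proba([1.0, 2.0], weights, intercept)
label = 1 if p >= 0.5 else 0  # 0.5 is a common default decision threshold
print(round(p, 4), label)
```

A weighted sum of zero maps to a probability of exactly 0.5, which is why the sign of the sum decides the predicted class at that threshold.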

In [0]:
%%time

from pyspark.ml.classification import LogisticRegression, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

logreg_model = LogisticRegression(labelCol='label').fit(training_df)
gbt_model = GBTClassifier(labelCol='label').fit(training_df)


# specify the layers of the neural network:
# an input layer of size 7 (one node per feature), two hidden layers of sizes 5 and 4,
# and an output layer of size 2 (one node per class)

layers = [7, 5, 4, 2]

mlpc_model = MultilayerPerceptronClassifier(labelCol='label', maxIter=100, layers=layers, blockSize=128, seed=2022).fit(training_df)

In [0]:
logreg_predictions = logreg_model.transform(valid_df)
gbt_predictions = gbt_model.transform(valid_df)
mlpc_predictions = mlpc_model.transform(valid_df)

logreg_predictions.show(3, False)

In [0]:
logreg_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC").evaluate(logreg_predictions)
logreg_auc
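
The `areaUnderROC` metric can be read as the probability that a randomly chosen delayed flight receives a higher score than a randomly chosen on-time flight. The plain-Python sketch below computes it that way by comparing every positive/negative pair; the helper `area_under_roc` and the labels and scores are hypothetical, and this is not the procedure Spark's evaluator uses internally.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Tied scores count as half a correct ranking.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))

# Hypothetical labels (1 = delayed) and model scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which gives a scale for reading `logreg_auc`.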