# PDA

In [1]:
# Libraries importing
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import math
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType, ArrayType, FloatType

In [3]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 22

# location of your Hive database in HDFS
warehouse = "/user/team22/project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("spark.executor.cores", 10)\
        .config("spark.executor.memory", "10g")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [3]:
# spark.sql("SHOW DATABASES").show()
spark.sql("USE team22_projectdb")
spark.sql("SHOW TABLES").show()

+----------------+----------+-----------+
|       namespace| tableName|isTemporary|
+----------------+----------+-----------+
|team22_projectdb|   flights|      false|
|team22_projectdb|q1_results|      false|
|team22_projectdb|q2_results|      false|
|team22_projectdb|q3_results|      false|
|team22_projectdb|q4_results|      false|
|team22_projectdb|q5_results|      false|
|team22_projectdb|q6_results|      false|
+----------------+----------+-----------+



## Data loading

In [4]:
# Load dataset
df = spark.sql("SELECT * FROM team22_projectdb.flights")

In [5]:
# Shape
rows = df.count()
cols = len(df.columns)
print(f'Rows: {rows}\nColumns: {cols}')

Rows: 6311871
Columns: 61


In [60]:
# Schema
df.printSchema()

root
 |-- flightdate: date (nullable = true)
 |-- airline: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- cancelled: boolean (nullable = true)
 |-- diverted: boolean (nullable = true)
 |-- crsdeptime: timestamp (nullable = true)
 |-- deptime: timestamp (nullable = true)
 |-- depdelayminutes: integer (nullable = true)
 |-- depdelay: integer (nullable = true)
 |-- arrtime: timestamp (nullable = true)
 |-- arrdelayminutes: integer (nullable = true)
 |-- airtime: integer (nullable = true)
 |-- crselapsedtime: integer (nullable = true)
 |-- actualelapsedtime: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- marketing_airline_network: string (nullable = true)
 |-- operated_or_branded_code_share_partners: string (nullable = true)
 |-- dot_id_marketing_air

In [7]:
# 1 sample
df.show(1)

+----------+--------------------+------+----+---------+--------+-------------------+-------------------+---------------+--------+-------------------+---------------+-------+--------------+-----------------+--------+----+-------+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+-------------------+-------------------+------+-------------------+--------+--------+------------------+----------+-------------+------------------+-----+
|flightdate|             airline|origin|dest|

The original dataset contains 6,311,871 rows and 61 columns, some of which have null values. Since nulls are contained in almost all rows for which the label value is True, it was decided not to drop such rows, but to perform custom imputing.

In [5]:
# Dropped rows
from pyspark.sql.functions import coalesce

def fillna_mean(df, include=set()): 
    means = df.agg(*(
        F.mean(x).alias(x) for x in df.columns if x in include
    ))
    return df.fillna(means.first().asDict())

def fillna_mode(df, column): 
    moda = df.groupby(column).count()\
                             .orderBy('count', ascending=False)\
                             .select(column)\
                             .collect()[1][column]
    return df.na.fill(value = moda, subset = [column])

# Small number of rows containing nulls
df = df.dropna(subset = ["crselapsedtime", 'divairportlandings'])

# Fill DepTime nulls
df = df.withColumn("deptime", coalesce(df.deptime,
                                         df.crsdeptime))
# Fill ArrTime nulls
df = df.withColumn("arrtime", coalesce(df.arrtime,
                                         df.crsarrtime))
 # Fill ActualElapsedTime nulls
df = df.withColumn("actualelapsedtime", coalesce(df.actualelapsedtime, 
                                                   df.crselapsedtime))
# Fill null values with mean
df = fillna_mean(df, ['taxiout', 'wheelsoff', 'wheelson', 'taxiin'])
# Fill null values with mode
df = fillna_mode(df, 'tail_number')

# Fill AirTime null values based on airtime formula
df = df.withColumn('tmp', (df['actualelapsedtime'] - df['taxiout'] - df['taxiin']))
df = df.withColumn("airtime", coalesce(df.airtime, 
                                         df.tmp)).drop('tmp')

# Fill with zeroes and False
df = df.na.fill(value = 0, subset = ["depdelayminutes", 'depdelay', 
                                       "arrdelayminutes", 'arrdelay',
                                       'arrivaldelaygroups', 
                                       'departuredelaygroups'])
df = df.na.fill(value = False, subset = ['depdel15', 'arrdel15'])

## Feature extraction

In [6]:
# Features' categories
numerical = ['depdelayminutes', 'depdelay', 'arrdelayminutes', 'airtime',
             'crselapsedtime', 'actualelapsedtime', 'distance', 'year', 
              'flight_number_marketing_airline', 'flight_number_operating_airline', 
             'originairportid', 'originairportseqid', 'origincitymarketid', 
             'originstatefips', 'originwac', 'destairportid', 'destairportseqid', 
             'destcitymarketid', 'deststatefips', 'destwac',  
             'departuredelaygroups', 'taxiout', 'taxiin', 'arrdelay', 
              'arrivaldelaygroups', 'distancegroup']

cyclical = ['quarter', 'month', 'dayofmonth', 'dayofweek']
time_features = ['crsdeptime', 'deptime', 'crsarrtime', 'arrtime',  'wheelson', 'wheelsoff']
date_features = ['flightdate']
boolean_features = ['diverted', 'depdel15','arrdel15']

categorical_as_cont = ['origin', 'dest', 'tail_number', 'origincityname', 
                       'destcityname']
categorical_ohe = ['airline', 'marketing_airline_network', 
                   'operated_or_branded_code_share_partners', 
                   'iata_code_marketing_airline', 'operating_airline', 
                   'iata_code_operating_airline', 'originstate', 
                   'originstatename', 'deststate', 'deststatename', 
                   'deptimeblk', 'arrtimeblk', 
                   'divairportlandings', 'dot_id_marketing_airline','dot_id_operating_airline']

label = 'cancelled'

print(f"Categorical features: {', '.join(categorical_ohe)}\n" + 
      f"Categorical features with many uniques: {', '.join(categorical_as_cont)}\n" + 
      f"Numerical features: {', '.join(numerical)}\n" + 
      f"Time features: {', '.join(time_features)}\n" + 
      f"Boolean features: {', '.join(boolean_features)}\n" + 
      f"Target: {label}")

Categorical features: airline, marketing_airline_network, operated_or_branded_code_share_partners, iata_code_marketing_airline, operating_airline, iata_code_operating_airline, originstate, originstatename, deststate, deststatename, deptimeblk, arrtimeblk, divairportlandings, dot_id_marketing_airline, dot_id_operating_airline
Categorical features with many uniques: origin, dest, tail_number, origincityname, destcityname
Numerical features: depdelayminutes, depdelay, arrdelayminutes, airtime, crselapsedtime, actualelapsedtime, distance, year, flight_number_marketing_airline, flight_number_operating_airline, originairportid, originairportseqid, origincitymarketid, originstatefips, originwac, destairportid, destairportseqid, destcitymarketid, deststatefips, destwac, departuredelaygroups, taxiout, taxiin, arrdelay, arrivaldelaygroups, distancegroup
Time features: crsdeptime, deptime, crsarrtime, arrtime, wheelson, wheelsoff
Boolean features: diverted, depdel15, arrdel15
Target: cancelled


After the stage of nulls elimination, one need to do proper sampling for dataset, as it is very imbalanced in term of label columns 'cancelled'. The reason for that is simply that dataset contains less records about cancelled flights, than successfully completed.

In [7]:
# Sample data as it's really imbalansed wrt label
major_df = df.filter(F.col(label) == 'false')
minor_df = df.filter(F.col(label) == 'true')

result_frac = minor_df.count()/major_df.count()

df_sampled = df.sampleBy(label, fractions={False: result_frac, True: 1}, seed=123)

A common method for encoding different cyclical data, including date and time, is to transform this data into two dimensions using a sine and cosine transformations. 

In [8]:
# Custom transformer to encode cyclical features with sin and cos
class TimeTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        super(TimeTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        self.set_params(**kwargs)
        self.set_coef()

    def set_coef(self, coef: float = 12.0):
        self.coef = coef
        return self

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame):
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        output_col_cos = output_col + "_cos"
        output_col_sin = output_col + "_sin"

        transform_udf = F.udf(lambda x: str(2*math.pi*int(x)/self.coef), StringType())
        return df.withColumn(output_col_cos, F.cos(transform_udf(input_col)))\
                 .withColumn(output_col_sin, F.sin(transform_udf(input_col)))

The main pipeline for features extraction includes indexers, time transformer, one-hot encoder and minmax scaler.

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

# Convert boolean
for b in boolean_features + [label]:
    df_sampled = df_sampled.withColumn(b, F.col(b).cast('integer'))

# String indexers for categorical features
indexer = StringIndexer()
indexer.setHandleInvalid("keep")
indexer.setInputCols(categorical_ohe)
features_idx = list(map(lambda x : x + "_idx", categorical_ohe))
indexer.setOutputCols(features_idx)

cat_indexer = StringIndexer()
cat_indexer.setHandleInvalid("keep")
cat_indexer.setInputCols(categorical_as_cont)
features_cat_idx = list(map(lambda x : x + "_idx", categorical_as_cont))
cat_indexer.setOutputCols(features_cat_idx)

# One-hot encoding for categorical
encoders = []
for f in features_idx:
    encoders.append(OneHotEncoder(inputCol = f, outputCol = f + "_enc"))

# Encode parsed cyclical
period = {
    "quarter": 4, 
    "month": 12, 
    "dayofmonth": 31, 
    "dayofweek": 7
}
cyclical_encoders = []
for c in cyclical:
    cyclical_encoders.append(TimeTransformer(input_col = c, output_col = c).set_coef(coef = period[c]))
    
# Encode time
time_idx = []
time_encoders = []
for t in time_features:
    time_idx.append(t + '_hour')
    time_idx.append(t + '_minute')
    time_idx.append(t + '_day')
    time_idx.append(t + '_month')

    df_sampled = df_sampled.withColumn(t + '_hour', F.hour(t))
    df_sampled = df_sampled.withColumn(t + '_minute', F.minute(t))
    df_sampled = df_sampled.withColumn(t + '_day', F.dayofmonth(t))
    df_sampled = df_sampled.withColumn(t + '_month', F.month(t))
    
    time_encoders.append(TimeTransformer(input_col = t + '_hour', output_col = t + '_hour').set_coef(coef = 24.0))
    time_encoders.append(TimeTransformer(input_col = t + '_minute', output_col = t + '_minute').set_coef(coef = 60.0))
    time_encoders.append(TimeTransformer(input_col = t + '_day', output_col = t + '_day').set_coef(coef = 31.0))
    time_encoders.append(TimeTransformer(input_col = t + '_month', output_col = t + '_month').set_coef(coef = 12.0))

# Encode date
date_idx = []
date_encoders = []
for d in date_features:
    date_idx.append(d + '_day')
    date_idx.append(d + '_month')
    
    df_sampled = df_sampled.withColumn(d + '_day', F.dayofmonth(t))
    df_sampled = df_sampled.withColumn(d + '_month', F.month(t))
    
    date_encoders.append(TimeTransformer(input_col = d + '_day', 
                                         output_col = d + '_day').set_coef(coef = 31.0))
    date_encoders.append(TimeTransformer(input_col = d + '_month', 
                                         output_col = d + '_month').set_coef(coef = 12.0))


# Assemble all features
assembler = VectorAssembler(
    inputCols = [f + "_enc" for f in features_idx] + features_cat_idx + numerical + boolean_features + [f + "_sin" for f in time_idx] + \
                [f + "_cos" for f in time_idx] + [f + "_sin" for f in date_idx] + [f + "_cos" for f in date_idx] + [f + "_sin" for f in cyclical] + \
                [f + "_cos" for f in cyclical],
    outputCol='features_unscaled'
    )

# MinMax Scaler
scaler = MinMaxScaler(inputCol = "features_unscaled", outputCol = "features")

# Apply pipeline
pipeline = Pipeline(stages = [indexer, cat_indexer] + encoders + time_encoders + date_encoders + cyclical_encoders + [assembler, scaler])
features_pipeline_model = pipeline.fit(df_sampled)
df_enc = features_pipeline_model.transform(df_sampled)

# Indexing the labels
label_indexer = StringIndexer()
label_indexer.setInputCol(label)
label_indexer.setOutputCol('label')
label_idx_model = label_indexer.fit(df_enc)

# Apply the indexer
df_labeled = label_idx_model.transform(df_enc)

In [10]:
# Get features and label
df_proj = df_labeled.select('features', 'label')
df_proj.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  0.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  1.0|
|(481,[11,22,33,47...|  0.0|
|(481,[11,22,33,47...|  1.0|
+--------------------+-----+
only showing top 10 rows



In [31]:
# Number of new features
df_proj.select('features').collect()[0]['features'].toArray().shape

(481,)

## Modeling

In [11]:
# Train/Test split
trainRatio = 0.7

train_df, test_df = df_proj.randomSplit([trainRatio, 1 - trainRatio], seed = 42)
print(f"Ratio: {trainRatio}\nTrain size: {train_df.count()}\nTest size: {test_df.count()}")

Ratio: 0.7
Train size: 155402
Test size: 66485


In [12]:
# A function to run commands
import os
def run(command):
    return os.popen(command).read()

# Save to HDFS
train_df.select("features", "label")\
    .write\
    .mode("overwrite")\
    .format('json')\
    .save("/user/team22/project/data/train")

# Add localy
run("hdfs dfs -cat /user/team22/project/data/train/*.json > ~/bigdata-final-project/data/train.json")

# Save to HDFS
test_df.select("features", "label")\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("/user/team22/project/data/test")

# Add localy
run("hdfs dfs -cat /user/team22/project/data/test/*.json > ~/bigdata-final-project/data/test.json")

''

## Model 1

Random Forest is an ensemble learning method that combines multiple decision trees to create a more robust and accurate model. It builds multiple decision trees to improve the overall performance using a subset of the training data.

In [13]:
# Train first model
from pyspark.ml.classification import RandomForestClassifier

rf_calssifier = RandomForestClassifier()
rf_model = rf_calssifier.fit(train_df)

In [14]:
# Test first model
rf_predictions = rf_model.transform(test_df)
rf_predictions.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(481,[0,24,32,49,...|  1.0|[1.69139069648060...|[0.08456953482403...|       1.0|
|(481,[0,24,32,49,...|  0.0|[19.3256412923311...|[0.96628206461655...|       0.0|
|(481,[0,24,32,49,...|  0.0|[19.3256266565640...|[0.96628133282820...|       0.0|
|(481,[0,24,32,49,...|  0.0|[19.5285136831634...|[0.97642568415817...|       0.0|
|(481,[0,24,32,49,...|  1.0|[0.81929420337911...|[0.04096471016895...|       1.0|
|(481,[0,24,32,49,...|  0.0|[19.6574881924735...|[0.98287440962367...|       0.0|
|(481,[0,24,32,49,...|  0.0|[19.6578216672810...|[0.98289108336405...|       0.0|
|(481,[0,24,32,49,...|  0.0|[19.8502231911957...|[0.99251115955978...|       0.0|
|(481,[0,24,32,49,...|  0.0|[19.3374913360979...|[0.96687456680489...|       0.0|
|(481,[0,24,32,4

In [15]:
# Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf_evaluator_roc = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderROC")

rf_roc = rf_evaluator_roc.evaluate(rf_predictions)

rf_evaluator_pr = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderPR")

rf_pr = rf_evaluator_pr.evaluate(rf_predictions)

print(f"Test area under ROC: {rf_roc}\nTest area under PR: {rf_pr}")

Test area under ROC: 0.9990340030748656
Test area under PR: 0.9987893832699755


In [16]:
# Hyper-parameter optimization
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

print(f"Hyperparameters:\n\t maxDepth - {[5, 10]}\n\t impurity - {['entropy', 'gini']}\n\t numTrees - {[10, 20]}")

rf_grid = ParamGridBuilder()
rf_grid = rf_grid.addGrid(rf_model.maxDepth, [5, 10])\
                 .addGrid(rf_model.impurity, ['entropy', 'gini'])\
                 .addGrid(rf_model.numTrees, [10, 20])\
                 .build()

rf_cv = CrossValidator(estimator = rf_calssifier,
                    estimatorParamMaps = rf_grid,
                    evaluator = rf_evaluator_roc,
                    parallelism = 5,
                    numFolds=3)

rf_cvModel = rf_cv.fit(train_df)
model1 = rf_cvModel.bestModel

print(f"\nBest hyperparameters:\n\t maxDepth - {model1.getMaxDepth()}\n\t impurity - {model1.getImpurity()}\n\t numTrees - {model1.getNumTrees}\n")

model1

Hyperparameters:
	 maxDepth - [5, 10]
	 impurity - ['entropy', 'gini']
	 numTrees - [10, 20]

Best hyperparameters:
	 maxDepth - 10
	 impurity - gini
	 numTrees - 10



RandomForestClassificationModel: uid=RandomForestClassifier_a343737ec309, numTrees=10, numClasses=2, numFeatures=481

In [47]:
model1.write().overwrite().save("/user/team22/project/models/model1")
run("hdfs dfs -get /user/team22/project/models/model1 ~/bigdata-final-project/models/model1")

''

In [17]:
rf_predictions_grid = model1.transform(test_df)
rf_predictions_grid.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(481,[0,24,32,49,...|  1.0|          [0.0,10.0]|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  1.0|          [0.0,10.0]|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,49,...|  0.0|[9.99977270144334...|[0.99997727014433...|       0.0|
|(481,[0,24,32,4

In [21]:
rf_predictions_grid.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("/user/team22/project/output/model1_predictions")
run("hdfs dfs -cat /user/team22/project/output/model1_predictions/*.csv > ~/bigdata-final-project/output/model1_predictions.csv")

''

In [18]:
rf_evaluator_best_roc = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderROC")

rf_best_roc = rf_evaluator_best_roc.evaluate(rf_predictions_grid)

rf_evaluator_best_pr = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderPR")

rf_best_pr = rf_evaluator_best_pr.evaluate(rf_predictions_grid)

print(f"Test area under ROC for best model: {rf_best_roc}\nTest area under PR for best model: {rf_best_pr}")

Test area under ROC for best model: 0.9999400910615863
Test area under PR for best model: 0.9999701718559475


In [22]:
print(f"Test area under ROC for first model: {rf_roc}\nTest area under PR for first model: {rf_pr}")
print(f"Test area under ROC for best model: {rf_best_roc}\nTest area under PR for best model: {rf_best_pr}")
print(f"\nTest area under ROC increased by {rf_best_roc - rf_roc} with optimization\nTest area under PR increased " + 
      f"by {rf_best_pr - rf_pr} with optimization")

Test area under ROC for first model: 0.9990340030748656
Test area under PR for first model: 0.9987893832699755
Test area under ROC for best model: 0.9999400910615863
Test area under PR for best model: 0.9999701718559475

Test area under ROC increased by 0.0009060879867207605 with optimization
Test area under PR increased by 0.0011807885859720368 with optimization


## Model 2

The Factorization Machines algorithm is a supervised learning algorithm which is an extension of a linear model designed to capture interactions between features within high dimensional sparse datasets economically.

In [23]:
# Train second model
from pyspark.ml.classification import FMClassifier

fm_calssifier = FMClassifier()
fm_model = fm_calssifier.fit(train_df)

In [24]:
# Test second model
fm_predictions = fm_model.transform(test_df)
fm_predictions.show(10)

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|(481,[0,24,32,49,...|  1.0|[-544.39373904847...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-2778.9815954978...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-1510.2971542198...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-1666.4601865016...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-485.29834042243...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-3013.5754113979...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-4548.9441311390...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-3781.1434407947...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-1910.1580735405...|  [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-3206.8077174193...|  [0.0,1.0]|       1.0|
+--------------------+-----+--------------------+--

In [25]:
# Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator

fm_evaluator_roc = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderROC")

fm_roc = fm_evaluator_roc.evaluate(fm_predictions)

fm_evaluator_pr = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderPR")

fm_pr = fm_evaluator_pr.evaluate(fm_predictions)

print(f"Test area under ROC: {fm_roc}\nTest area under PR: {fm_pr}")

Test area under ROC: 0.9988274728734019
Test area under PR: 0.9988706038360329


In [26]:
# Hyper-parameter optimization
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
print(f"Hyperparameters:\n\t regParam - {[0.0, 0.5]}\n\t initStd - {[0.01, 0.05]}\n\t factorSize - {[4, 8]}")

fm_grid = ParamGridBuilder()
fm_grid = fm_grid.addGrid(fm_model.regParam, [0.0, 0.5])\
                 .addGrid(fm_model.initStd, [0.01, 0.05])\
                 .addGrid(fm_model.factorSize, [4, 8])\
                 .build()

fm_cv = CrossValidator(estimator = fm_calssifier,
                    estimatorParamMaps = fm_grid,
                    evaluator = fm_evaluator_roc,
                    parallelism = 5,
                    numFolds=3)

fm_cvModel = fm_cv.fit(train_df)
model2 = fm_cvModel.bestModel

print(f"\nBest hyperparameters:\n\t initStd - {model2.getInitStd()}\n\t regParam - {model2.getRegParam()}\n\t factorSize - {model2.getFactorSize()}\n")

model2

Hyperparameters:
	 regParam - [0.0, 0.5]
	 initStd - [0.01, 0.05]
	 factorSize - [4, 8]

Best hyperparameters:
	 initStd - 0.05
	 regParam - 0.0
	 factorSize - 8



FMClassificationModel: uid=FMClassifier_3c5286ea03fb, numClasses=2, numFeatures=481, factorSize=8, fitLinear=true, fitIntercept=true

In [48]:
model2.write().overwrite().save("/user/team22/project/models/model2")
run("hdfs dfs -get /user/team22/project/models/model2 ~/bigdata-final-project/models/model2")

''

In [31]:
fm_predictions_grid = model2.transform(test_df)
fm_predictions_grid.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(481,[0,24,32,49,...|  0.0|[597.955625362807...|[1.0,2.0472528384...|       0.0|
|(481,[0,24,32,49,...|  1.0|[-674.62556143144...|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  1.0|[-1117.4210474668...|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  0.0|[836.823753741428...|           [1.0,0.0]|       0.0|
|(481,[0,24,32,49,...|  0.0|[805.559720376552...|           [1.0,0.0]|       0.0|
|(481,[0,24,32,49,...|  1.0|[-709.01286573540...|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  0.0|[309.718960200438...|[1.0,3.0957411791...|       0.0|
|(481,[0,24,32,49,...|  1.0|[-1600.6903232091...|           [0.0,1.0]|       1.0|
|(481,[0,24,32,49,...|  0.0|[376.616634874766...|[1.0,2.7382536857...|       0.0|
|(481,[0,24,32,4

In [35]:
fm_predictions_grid.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("/user/team22/project/output/model2_predictions")
run("hdfs dfs -cat /user/team22/project/output/model2_predictions/*.csv > ~/bigdata-final-project/output/model2_predictions.csv")

''

In [32]:
fm_evaluator_best_roc = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderROC")

fm_best_roc = fm_evaluator_best_roc.evaluate(fm_predictions_grid)

fm_evaluator_best_pr = BinaryClassificationEvaluator()\
  .setLabelCol("label")\
  .setRawPredictionCol("prediction")\
  .setMetricName("areaUnderPR")

fm_best_pr = fm_evaluator_best_pr.evaluate(fm_predictions_grid)

print(f"Test area under ROC for best model: {fm_best_roc}\nTest area under PR for best model: {fm_best_pr}")

Test area under ROC for best model: 0.999143825636828
Test area under PR for best model: 0.9989817614080468


In [33]:
print(f"Test area under ROC for first model: {fm_roc}\nTest area under PR for first model: {fm_pr}")
print(f"Test area under ROC for best model: {fm_best_roc}\nTest area under PR for best model: {fm_best_pr}")
print(f"\nTest area under ROC increased by {fm_best_roc - fm_roc} with optimization\nTest area under PR increased " + 
      f"by {fm_best_pr - fm_pr} with optimization")

Test area under ROC for first model: 0.9988274728734019
Test area under PR for first model: 0.9988706038360329
Test area under ROC for best model: 0.999143825636828
Test area under PR for best model: 0.9989817614080468

Test area under ROC increased by 0.00031635276342611984 with optimization
Test area under PR increased by 0.00011115757201396459 with optimization


## Comparison

In [36]:
# Create data frame to report performance of the models
models = [
    [str(model1),rf_best_roc, rf_best_pr], 
    [str(model2),fm_best_roc, fm_best_pr]
]

result_df = spark.createDataFrame(models, ["model", "Area under ROC", "Area under PR"])
result_df.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|model                                                                                                                               |Area under ROC    |Area under PR     |
+------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|RandomForestClassificationModel: uid=RandomForestClassifier_a343737ec309, numTrees=10, numClasses=2, numFeatures=481                |0.9999400910615863|0.9999701718559475|
|FMClassificationModel: uid=FMClassifier_3c5286ea03fb, numClasses=2, numFeatures=481, factorSize=8, fitLinear=true, fitIntercept=true|0.999143825636828 |0.9989817614080468|
+------------------------------------------------------------------------------------------------------------------------------------+-

In [50]:
# Save it to HDFS
result_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ";")\
    .option("header","true")\
    .save("/user/team22/project/output/evaluation")
run("hdfs dfs -cat /user/team22/project/output/evaluation/*.csv > ~/bigdata-final-project/output/evaluation.csv")

''

## Store additional results

### Get best hyperparameters

In [38]:
# Create data frame to report performance of the models
hypoparams = [
    [str(model1),f'maxDepth = {model1.getMaxDepth()}', f'impurity = {model1.getImpurity()}', f'numTrees = {model1.getNumTrees}'], 
    [str(model2),f'initStd = {model2.getInitStd()}', f'regParam = {model2.getRegParam()}', f'factorSize = {model2.getFactorSize()}']
]

hypo_df = spark.createDataFrame(hypoparams, ["model", "Parameter 1", "Parameter 2", "Parameter 3"])
hypo_df.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------+--------------+---------------+--------------+
|model                                                                                                                               |Parameter 1   |Parameter 2    |Parameter 3   |
+------------------------------------------------------------------------------------------------------------------------------------+--------------+---------------+--------------+
|RandomForestClassificationModel: uid=RandomForestClassifier_a343737ec309, numTrees=10, numClasses=2, numFeatures=481                |maxDepth = 10 |impurity = gini|numTrees = 10 |
|FMClassificationModel: uid=FMClassifier_3c5286ea03fb, numClasses=2, numFeatures=481, factorSize=8, fitLinear=true, fitIntercept=true|initStd = 0.05|regParam = 0.0 |factorSize = 8|
+----------------------------------------------------------------------------------------------

In [51]:
# Save it to HDFS
hypo_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ";")\
    .option("header","true")\
    .save("/user/team22/project/output/hyperparameters")
run("hdfs dfs -cat /user/team22/project/output/hyperparameters/*.csv > ~/bigdata-final-project/output/hyperparameters.csv")

''

### Get result of optimization

In [40]:
# Create data frame to report performance of the models
optim_models = [
    [str(model1),rf_roc, rf_best_roc, rf_best_roc - rf_roc, rf_pr, rf_best_pr, rf_best_pr - rf_pr], 
    [str(model2),fm_roc, fm_best_roc, fm_best_roc - fm_roc, fm_pr, fm_best_pr, fm_best_pr - fm_pr]
]

optim_df = spark.createDataFrame(optim_models, ["model", "Initial area under ROC", "Optimized area under ROC", "Increase of area under ROC", 
                                                "Initial area under PR", "Optimized area under PR", "Increase of area under PR"])
optim_df.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------+----------------------+------------------------+--------------------------+---------------------+-----------------------+-------------------------+
|model                                                                                                                               |Initial area under ROC|Optimized area under ROC|Increase of area under ROC|Initial area under PR|Optimized area under PR|Increase of area under PR|
+------------------------------------------------------------------------------------------------------------------------------------+----------------------+------------------------+--------------------------+---------------------+-----------------------+-------------------------+
|RandomForestClassificationModel: uid=RandomForestClassifier_a343737ec309, numTrees=10, numClasses=2, numFeatures=481                |0.9990340030748656  

In [52]:
# Save it to HDFS
optim_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ";")\
    .option("header","true")\
    .save("/user/team22/project/output/optimization")

run("hdfs dfs -cat /user/team22/project/output/optimization/*.csv > ~/bigdata-final-project/output/optimization.csv")

''

### Predict sample on both models

In [118]:
# Get random sample and predict on both models
prediction_sample = df_labeled.sample(fraction=10/df_labeled.count(), seed=123)
models_pred = model2.transform(model1.transform(prediction_sample)\
                 .withColumnRenamed("prediction", "model1_prediction")\
                 .withColumnRenamed("rawPrediction", "model1_rawPrediction")\
                 .withColumnRenamed("probability", "model1_probability"))\
                 .withColumnRenamed("prediction", "model2_prediction")

In [119]:
prediction_result = models_pred.select(*(numerical + cyclical +\
                                               time_features + date_features +\
                                               boolean_features + categorical_as_cont +\
                                               categorical_ohe + ['label', "model1_prediction", "model2_prediction"]))

In [120]:
# Save it to HDFS
prediction_result.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ";")\
    .option("header","true")\
    .save("/user/team22/project/output/prediction_samples")

run("hdfs dfs -cat /user/team22/project/output/prediction_samples/*.csv > ~/bigdata-final-project/output/prediction_samples.csv")

''