# 0. Introduction

**This project is the final project for Distributed Big Data Systems for MSc1 Data Science and AI, University Nice Côte d'Azur.**

**Professors: Fabrice Huet and Boris Shminke.**

**Student: NGUYEN Huyen Trang.**

**In this final project, I will use Apache Spark to build a machine learning pipeline to:**

- Read data

- Transform data (extracting features, dealing with missing values, etc)

- Exploratory data analysis (EDA)

- Build different classification models (LogisticRegression, DecisionTreeClassifier, LinearSVC, GBTClassifier), each model with different parameters (hyperparameters tuning)

- Evaluate quality (using cross-validation and train/test split)

- Model evaluation (Evaluate different metrics: AreaUnderROC and AreaUnderPR).

**Data being used in this final project: binary classification problem about the possibility of rain in each day in Australia (target column: RainTomorrow, Binary output: ('Yes' or 'No'))**

**The data can be found in https://www.kaggle.com/datasets/jsphyg/weather-dataset-rattle-package.**

# 1. Reading data

## 1.1. Import important libraries

In [0]:
import json
#import pandas 
import os
from pyspark import pandas as pd
from pyspark.sql.functions import col, count, isnan, to_timestamp, when

import numpy as np
import pandas as pd
import plotly.express as px
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StandardScaler,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml.stat import Correlation
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, to_timestamp, when
from pyspark.sql.types import DoubleType



## 1.2. Reading data

In [0]:
dbutils.fs.ls("dbfs:///FileStore") 
dbutils.fs.mkdirs("file:///root/.kaggle/") 
dbutils.fs.cp("dbfs:///FileStore/kaggle.json","file:///root/.kaggle/") 
dbutils.fs.ls("file:///root/.kaggle/") 

Out[2]: [FileInfo(path='file:/root/.kaggle/kaggle.json', name='kaggle.json', size=68, modificationTime=1673566098201)]

In [0]:
!pip install kaggle 
import kaggle 

!kaggle datasets download -d jsphyg/weather-dataset-rattle-package     # ! is used for computer command system
#!unzip weather-dataset-rattle-package.zip     #if cluster in databricks is exist, no need to unzip again
!ls
rain_file = "weatherAUS.csv " #string of name file
file_path = os.path.join(os.getcwd(), rain_file)  
print(file_path)
file_path="file:///databricks/driver/weatherAUS.csv"

Collecting kaggle
  Using cached kaggle-1.5.12-py3-none-any.whl
Collecting python-slugify
  Using cached python_slugify-7.0.0-py2.py3-none-any.whl (9.4 kB)
Collecting tqdm
  Using cached tqdm-4.64.1-py2.py3-none-any.whl (78 kB)
Collecting text-unidecode>=1.3
  Using cached text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
Installing collected packages: text-unidecode, tqdm, python-slugify, kaggle
Successfully installed kaggle-1.5.12 python-slugify-7.0.0 text-unidecode-1.3 tqdm-4.64.1
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-ce13015d-6ae6-420c-8409-72480e496f71/bin/python -m pip install --upgrade pip' command.[0m
weather-dataset-rattle-package.zip: Skipping, found more recently modified local copy (use --force to force download)
azure	   hadoop_accessed_config.lst  weather-dataset-rattle-package.zip
conf	   logs			       weatherAUS.csv
eventlogs  metastore_db
ganglia    preload_class.lst
/databricks/driver/weatherAUS.csv 


**Here, I only choose option ('header' = True), I don't consider other options to output the datatype of each column, because in the beginning of EDA (Exploratory data analysis), I want to analyse all valuess in term of strings. After that, I will change the datatype of columns if nessesary.**

**In other dataset, with different purposes, we can always choose other options in reading file, such as inferSchema, delimiter, etc. as we want.**

In [0]:
df = spark.read.option("header", True).csv(file_path)

# 2. Exploratory data analysis (EDA)

**This part will contain:**

**2.1. Data overview**

**2.2. Dealing with missing values**

**2.3. Features extracting**

## 2.1. Data overview

We want to have a summary overview of the data set

In [0]:
df.summary().show()
#df.printSchema()  #uncomment this to check datatype in each columns

+-------+----------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+---------+------------+
|summary|      Date|Location|           MinTemp|           MaxTemp|         Rainfall|      Evaporation|          Sunshine|WindGustDir|     WindGustSpeed|WindDir9am|WindDir3pm|      WindSpeed9am|      WindSpeed3pm|       Humidity9am|      Humidity3pm|       Pressure9am|      Pressure3pm|          Cloud9am|          Cloud3pm|           Temp9am|           Temp3pm|RainToday|RainTomorrow|
+-------+----------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+------------------+------------------+-------------

## 2.2. Dealing with missing values

### 2.2.1. Dealing missing values in target column (RainTomorrow)

We want to analyse the target column (RainTomorrow) to see how many unique values that this column contains.

In [0]:
df.groupby("RainTomorrow").count().show()
print(f"Total number of rows in the data set: {df.count()}")

+------------+------+
|RainTomorrow| count|
+------------+------+
|          NA|  3267|
|          No|110316|
|         Yes| 31877|
+------------+------+

Total number of rows in the data set: 145460


We can see that there are 3267 missing values in target column (RainTomorrow), over 145460 rows in dataset. It means 2.2% of the total row. It is propbably safe to remove these rows to clean the dataset. 

In [0]:
df = df.filter(df["RainTomorrow"] != "NA")
df.groupby("RainTomorrow").count().show()

+------------+------+
|RainTomorrow| count|
+------------+------+
|          No|110316|
|         Yes| 31877|
+------------+------+



### 2.2.2. Dealing missing values in other columns

#### Step 1: Remove columns that contain a lot of missing values

It is nessesary to analyse the unique values in each column

In [0]:
for col_name in df.columns:
    unique_val = df.select(col_name).distinct().collect()
    print(f"--> {col_name}")
    print(f"\tunique values count: {len(unique_val)}")
    if len(unique_val) <= 1000:
        print(f"\tunique values: {[val[col_name] for val in unique_val]}")

--> Date
	unique values count: 3436
--> Location
	unique values count: 49
	unique values: ['NorfolkIsland', 'Cobar', 'SydneyAirport', 'Wollongong', 'Williamtown', 'Moree', 'Sydney', 'BadgerysCreek', 'Albury', 'WaggaWagga', 'CoffsHarbour', 'Penrith', 'NorahHead', 'Newcastle', 'Richmond', 'Bendigo', 'Canberra', 'Mildura', 'Portland', 'Brisbane', 'Sale', 'Tuggeranong', 'Ballarat', 'MelbourneAirport', 'Dartmoor', 'Nhil', 'Melbourne', 'MountGinini', 'Watsonia', 'Cairns', 'Walpole', 'Woomera', 'Adelaide', 'PerthAirport', 'Albany', 'SalmonGums', 'Perth', 'GoldCoast', 'PearceRAAF', 'Witchcliffe', 'Nuriootpa', 'MountGambier', 'Townsville', 'Hobart', 'Darwin', 'Uluru', 'Katherine', 'AliceSprings', 'Launceston']
--> MinTemp
	unique values count: 390
	unique values: ['10.7', '8.5', '20.5', '-1.2', '8.2', '2.6', '7', '7.3', '26.5', '3.1', '12.8', '14.2', '16.6', '29.4', '17.1', '-2.4', '26.7', '15', '8.3', '-1', '22.4', '4.2', '-0.1', '9.2', '11', '19.3', '18.1', '15.8', '-3.8', '12.4', '22.5', '29

As we can see above, in some columns, there are "NA" missing values. We want to find the percentage of missing values in each column. We will create a function called missing_percent for this step:


In [0]:
def missing_percent(df):
    df.select([(count(when(isnan(c)
                    | col(c).isNull()
                    | (col(c) == "NA"), c))/(df.count())*100).alias(c) for c in df.columns]).show()

missing_percent(df)

+----+--------+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------+
|Date|Location|            MinTemp|           MaxTemp|          Rainfall|      Evaporation|          Sunshine|      WindGustDir|     WindGustSpeed|       WindDir9am|        WindDir3pm|     WindSpeed9am|      WindSpeed3pm|      Humidity9am|       Humidity3pm|     Pressure9am|      Pressure3pm|         Cloud9am|         Cloud3pm|          Temp9am|           Temp3pm|         RainToday|RainTomorrow|
+----+--------+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+

As we can see, there are some columns containing a lot of missing values, such as 'Evaporation','Sunshine','Cloud9am', 'Cloud3pm', 'Pressure9am', 'Pressure3pm'.

These columns should be removed as they do not bring enough information to our project. We define here a function called drop_col to drop column as we want

In [0]:
def drop_col(col):
    global df
    if col in df.columns:
        df = df.drop(col)

for col in ['Evaporation','Sunshine','Cloud9am', 'Cloud3pm', 'Pressure9am', 'Pressure3pm']:
    drop_col(col)

#### Step 2: Filling missing values by mode value (for categorical columns), and by median value (for numeric columns)

We define the categorical and numerical columns remained.

In [0]:
num_col = ['MinTemp', 'MaxTemp', 'Rainfall', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Temp9am', 'Temp3pm']
cat_col = ['Date', 'Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday', 'RainTomorrow']

We change the datatype of all columns in list num_col above from 'string' to 'double'

In [0]:
for col in num_col:
    df = df.withColumn(col, df[col].cast("double"))

We fill missing values in categorical columns by the mode of that column:

In [0]:
from pyspark.sql import functions as F
for col_name in cat_col:
    count_mode_val = df.groupBy(col_name).count().filter(F.col(col_name).isNotNull()).agg(F.max("count")).collect()[0][0]
    mode_val = df.groupBy(col_name).count().filter(F.col(col_name).isNotNull()).filter(F.col("count") == count_mode_val).select(col_name).collect()[0][0]
    df = df.withColumn(col_name,F.when(F.col(col_name)=="NA",mode_val).otherwise(F.col(col_name)))

We fill missing values in numerical columns by the median of that column:

In [0]:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = num_col, outputCols = num_col).setStrategy('median')
df = imputer.fit(df).transform(df)

## 2.3. Features extracting

### 2.3.1. Correlation between numerical columns and target column (RainTomorrow)

We want to analyse the correlation between numerical column and target column (RainTomorrow)

In [0]:
#change data in target column from string to integer 
from pyspark.sql import functions as F
df = df.withColumn('RainTomorrow',F.when(F.col("RainTomorrow")=="Yes","1").otherwise("0")) #('Yes' == '1' and 'No' == '0')
df = df.withColumn('Raintomorrow', df['RainTomorrow'].cast("double"))                      #'1' and '0' from string to double

#dataframe that contains all numerical column and target column (RainTomorrow)
df1 = df.select(num_col + ['RainTomorrow'])  

#correlation between numerical columns and target column (RainTomorrow)
for col in df1.columns:
    print(col, df1.corr('RainTomorrow', col))

MinTemp 0.08370136845272902
MaxTemp -0.15905986368317063
Rainfall 0.23508696663173922
WindGustSpeed 0.22476600699195176
WindSpeed9am 0.0904461199509426
WindSpeed3pm 0.08697306340610542
Humidity9am 0.2552920750958114
Humidity3pm 0.4397406441924282
Temp9am -0.025652834276214842
Temp3pm -0.1904619160412507
RainTomorrow 1.0


#### Remove numerical columns that do not have strong correlation to target column

As we can see, there are some columns that have very small correlation to the target RainTomorrow, such as columns: 'Temp9am', 'WindSpeed9am', 'WindSpeed3pm', 'MinTemp'. We drop these columns as they does not strongly related in predicting the target.


In [0]:
for col in ['Temp9am','WindSpeed9am','WindSpeed3pm', 'MinTemp']:
    drop_col(col)

***Besides, it is interesting to note that the features that have strongest relations to target are Humidity3pm, Humidity9am and Rainfall. This is quite easy to understand because in reality, the posibility of raining depends a lot in humidity. It means that the data is related really well to the reality.***

### 2.3.2. Dealing with Date column

We continue to analyse the column 'Date'. We will separate this column into 3 part: day, month and year, and creat 3 new columns for each information

In [0]:
df = df.withColumn(
    colName="datetime",
    col=to_timestamp(df.Date),   #based on column 'Date', we creat column 'datetime' with datatype is timestamp
)

from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth
df = df.withColumn('dayOfMonth', dayofmonth(col('datetime')))  #separate 'datetime' into 3 new columns: dayOfMonth, month, year
df = df.withColumn('month', month(col('datetime')))
df = df.withColumn('year', year(col('datetime')))

for col in ['Date', 'datetime']:   #drop the columns 'Date' and 'datetime'
    drop_col(col)         

for col in ['dayOfMonth', 'month', 'year']:
    df = df.withColumn(col, df[col].cast("double"))

df.show(3)

+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+
|Location|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|Humidity9am|Humidity3pm|Temp3pm|RainToday|Raintomorrow|dayOfMonth|month|  year|
+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+
|  Albury|   22.9|     0.6|          W|         44.0|         W|       WNW|       71.0|       22.0|   21.8|       No|         0.0|       1.0| 12.0|2008.0|
|  Albury|   25.1|     0.0|        WNW|         44.0|       NNW|       WSW|       44.0|       25.0|   24.3|       No|         0.0|       2.0| 12.0|2008.0|
|  Albury|   25.7|     0.0|        WSW|         46.0|         W|       WSW|       38.0|       30.0|   23.2|       No|         0.0|       3.0| 12.0|2008.0|
+--------+-------+--------+-----------+-------------+----------+------

# 3. Classification

## 3.1. Splitting data

In [0]:
for col in ['dayOfMonth', 'month', 'year']:
    df = df.withColumn(col, df[col].cast("string"))

In [0]:
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)
print(f"Number of observations in the train set: {train.count()}")
print(f"Number of observations in the test set: {test.count()}")

Number of observations in the train set: 113916
Number of observations in the test set: 28277


We define here the categorical and numerical columns for data set in fitting models

In [0]:
CAT_COLS_PRED = ['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday', 'dayOfMonth', 'year', 'month']
NUM_COLS_PRED = ['MaxTemp', 'Rainfall', 'WindGustSpeed', 'Humidity9am', 'Humidity3pm', 'Temp3pm', 'Raintomorrow']
TARGET_COL = 'Raintomorrow'

## 3.2. OneHotEncoder for categorical columns

Preprocess categorical columns by OneHotEncoder

In [0]:
CAT_COLS_INDEXER = [f"{cat_col}_indexer" for cat_col in CAT_COLS_PRED]
CAT_COLS_ONEHOT = [f"{cat_col}_vec" for cat_col in CAT_COLS_PRED]

cat_stages = [
    StringIndexer(
        inputCols=CAT_COLS_PRED,
        outputCols=CAT_COLS_INDEXER,
    ),
    OneHotEncoder(
        inputCols=CAT_COLS_INDEXER,
        outputCols=CAT_COLS_ONEHOT,
        dropLast=True,
    ),
]

#Pipeline
Pipeline(stages=cat_stages).fit(train).transform(train).show(2)

+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+----------------+-------------------+------------------+------------------+-----------------+------------------+------------+-------------+--------------+---------------+--------------+--------------+-------------+---------------+--------------+--------------+
|Location|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|Humidity9am|Humidity3pm|Temp3pm|RainToday|Raintomorrow|dayOfMonth|month|  year|Location_indexer|WindGustDir_indexer|WindDir9am_indexer|WindDir3pm_indexer|RainToday_indexer|dayOfMonth_indexer|year_indexer|month_indexer|  Location_vec|WindGustDir_vec|WindDir9am_vec|WindDir3pm_vec|RainToday_vec| dayOfMonth_vec|      year_vec|     month_vec|
+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+----------------

## 3.3. Scaling numerical columns

Preprocess numerical columns by scaling

In [0]:
num_stages = [
    VectorAssembler(
        inputCols=NUM_COLS_PRED,
        outputCol="assembled_num",
    ),
    StandardScaler(
        inputCol="assembled_num",
        outputCol="scaled_num",
    ),
]

#Pipeline
Pipeline(stages=num_stages).fit(train).transform(train).show(20, truncate=False)

+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+---------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|Location|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|Humidity9am|Humidity3pm|Temp3pm|RainToday|Raintomorrow|dayOfMonth|month|year  |assembled_num                    |scaled_num                                                                                                                             |
+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+---------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|Albury  |6.8    |10.0   

In [0]:
feature_assembler = [
    VectorAssembler(
        inputCols=CAT_COLS_ONEHOT + ["scaled_num"],
        outputCol="features",
    )
]

In [0]:
target_stages = [StringIndexer(inputCol=TARGET_COL, outputCol='label')]

In [0]:
# Show the effect of the target_stages
Pipeline(stages=target_stages).fit(train).transform(train).show(2)

+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+-----+
|Location|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|Humidity9am|Humidity3pm|Temp3pm|RainToday|Raintomorrow|dayOfMonth|month|  year|label|
+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+-----+
|  Albury|    6.8|    10.0|         NE|         30.0|         S|       SSE|       96.0|       93.0|    6.4|      Yes|         1.0|      15.0|  7.0|2015.0|  1.0|
|  Albury|    7.5|     0.0|        ENE|         15.0|         N|        SE|       99.0|       79.0|    7.4|       No|         0.0|      26.0|  6.0|2016.0|  0.0|
+--------+-------+--------+-----------+-------------+----------+----------+-----------+-----------+-------+---------+------------+----------+-----+------+-----+
only showing top 2 rows



In [0]:
pipe = Pipeline(stages=cat_stages + num_stages + feature_assembler + target_stages)
preproc_model = pipe.fit(train)

In [0]:
preproc_train = preproc_model.transform(train).select("features", "label")
preproc_train.show(2, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                    |label|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(152,[7,61,68,83,112,125,138,145,146,147,148,149,150,151],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.9558934446874955,1.176565290001742,2.2810103971514066,5.0744856724110345,4.5311528538563355,0.9305910792972667,2.3979574974202857])|1.0  |
|(152,[7,59,63,78,93,118,124,137,145,147,148,149,150],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1

In [0]:
preproc_test = preproc_model.transform(test).select("features", "label")
preproc_test.show(2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                 |label|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(152,[7,60,65,83,93,115,127,137,145,147,148,149,150],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.1526950362408033,1.1405051985757033,5.23306334967388,3.5079893062113565,1.1486983635075636])                     |0.0  |
|(152,[7,57,72,84,93,98,128,137,145,147,148,149,150,151],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.1808095493198474,2.813246156486735,4.281597286096811,4.4824307801589

## 3.4. Training models

**With the binary classification in this project, I used 4 different models: Logistic Regression, Decision Tree, Linear SVC, Gradient-Boosted Tree. In each model, I tried with different hyperparameters.**

**The crossvalidation will be used in building models also.**

**There are 2 metrics for binary classifcation will be presented below: AreaUnderROC and AreaUnderPR as being suggested from pyspark Binary Classification.**

*https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html*

In [0]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, LinearSVC, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder

**Model Logistic Regression with different parameters (by grid).**

In [0]:
lr = LogisticRegression()
grid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [80, 100])
    .addGrid(lr.regParam, [0.0, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 1.0])
    .build()
)
print(grid)
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    parallelism=8,
    numFolds=5,
)
cvModel = cv.fit(preproc_train)

print(f"Result of cross validation for each combination of parameters (Logistic Regression): {cvModel.avgMetrics}")

print(f"Result of linear regression on the test set (area Under ROC) (Logistic Regression): {evaluator.evaluate(cvModel.transform(preproc_test))}")

print(f"Result of linear regression on the test set (area Under PR) (Logistic Regression): {evaluator.evaluate(cvModel.transform(preproc_test), {evaluator.metricName: 'areaUnderPR'})}")


[{Param(parent='LogisticRegression_2310168739e2', name='maxIter', doc='max number of iterations (>= 0).'): 80, Param(parent='LogisticRegression_2310168739e2', name='regParam', doc='regularization parameter (>= 0).'): 0.0, Param(parent='LogisticRegression_2310168739e2', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}, {Param(parent='LogisticRegression_2310168739e2', name='maxIter', doc='max number of iterations (>= 0).'): 80, Param(parent='LogisticRegression_2310168739e2', name='regParam', doc='regularization parameter (>= 0).'): 0.0, Param(parent='LogisticRegression_2310168739e2', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0}, {Param(parent='LogisticRegression_2310168739e2', name='maxIter', doc='max number of iterations (>= 0).'): 80, Param(parent=

**Model Decision Tree with different parameters (by grid)**

In [0]:
tree = DecisionTreeClassifier()
grid = (
    ParamGridBuilder()
    .addGrid(tree.maxDepth, [5, 10])
    .addGrid(tree.maxBins, [16, 32])
    .build()
)
print(grid)
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=tree,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    parallelism=8,
    numFolds=5,
)
cvModel = cv.fit(preproc_train)
print(f"Result of cross validation for each combination of parameters (Decision Tree): {cvModel.avgMetrics}")

print(f"Result of linear regression on the test set (area Under ROC) (Decision Tree): {evaluator.evaluate(cvModel.transform(preproc_test))}")

print(f"Result of linear regression on the test set (area Under PR) (Decision Tree): {evaluator.evaluate(cvModel.transform(preproc_test), {evaluator.metricName: 'areaUnderPR'})}")


[{Param(parent='DecisionTreeClassifier_87cd726e4b43', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='DecisionTreeClassifier_87cd726e4b43', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 16}, {Param(parent='DecisionTreeClassifier_87cd726e4b43', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='DecisionTreeClassifier_87cd726e4b43', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32}, {Param(parent='DecisionTreeClassifier_87cd726e4b43', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; d

**Model Linear SVC with different parameters (by grid)**

In [0]:
SVC = LinearSVC()
grid = (
    ParamGridBuilder()
    .addGrid(SVC.maxIter, [50, 100])
    .build()
)
print(grid)
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=SVC,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    parallelism=8,
    numFolds=5,
)
cvModel = cv.fit(preproc_train)

print(f"Result of cross validation for each combination of parameters (SVC): {cvModel.avgMetrics}")

print(f"Result of linear regression on the test set (area Under ROC) (SVC): {evaluator.evaluate(cvModel.transform(preproc_test))}")

print(f"Result of linear regression on the test set (area Under PR) (SVC): {evaluator.evaluate(cvModel.transform(preproc_test), {evaluator.metricName: 'areaUnderPR'})}")


[{Param(parent='LinearSVC_9d099c264c26', name='maxIter', doc='max number of iterations (>= 0).'): 50}, {Param(parent='LinearSVC_9d099c264c26', name='maxIter', doc='max number of iterations (>= 0).'): 100}]
result of cross validation for each combination of parameters (SVC): [0.9999995257344445, 0.9999994959389837]
Result of linear regression on the test set (area Under ROC) (SVC): 0.9999992992032725
Result of linear regression on the test set (area Under PR) (SVC): 0.9999975816117964


**Model Gradient Boosted Trees with different parameters (by grid)**

In [0]:
gbt = GBTClassifier()
grid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10])
    .addGrid(gbt.maxBins, [16, 32])
    .build()
)
print(grid)
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    parallelism=8,
    numFolds=5,
)
cvModel = cv.fit(preproc_train)

print(f"Result of cross validation for each combination of parameters (Gradient-Boosted Trees): {cvModel.avgMetrics}")

print(f"Result of linear regression on the test set (area Under ROC) (Gradient-Boosted Trees): {evaluator.evaluate(cvModel.transform(preproc_test))}")

print(f"Result of linear regression on the test set (area Under PR) (Gradient-Boosted Trees): {evaluator.evaluate(cvModel.transform(preproc_test), {evaluator.metricName: 'areaUnderPR'})}")

[{Param(parent='GBTClassifier_d0242e1c8bad', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='GBTClassifier_d0242e1c8bad', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 16}, {Param(parent='GBTClassifier_d0242e1c8bad', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='GBTClassifier_d0242e1c8bad', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32}, {Param(parent='GBTClassifier_d0242e1c8bad', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. 

### Summary

**The purpose of this project (managing to use Apache Spark in creating machine learning pipeline) has been presented. The principal of a machine learning pipeline have been performed by Apache Spark such as:**

- Read data

- Transform data (extracting features, dealing with missing values, etc)

- Exploratory data analysis (EDA)

- Build different classification models (LogisticRegression, DecisionTreeClassifier, LinearSVC, GBTClassifier), each model with different parameters (hyperparameters tuning)

- Evaluate quality (using cross-validation and train/test split)

- Model evaluation (Evaluate different metrics: AreaUnderROC and AreaUnderPR).

*In the end, all four models (Logistic Regression, Decision Tree, Linear SVC, Gradient-Boosted Tree) perform well in this dataset. For the other dataset and classification problem, the prediction result can be changed.*