
***
# Classification - ML Section

***


## Importing Packages 

In [65]:
 pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [66]:
import warnings


import pyspark
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import substring_index
from pyspark.ml import Pipeline

#for EDA 
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np 


# for ml Classification 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, trim, lower


warnings.filterwarnings('ignore')

## Loading Dataset 

First Connect to spark server >> to access to spark cluster, we using SparkSession

In [67]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [68]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [69]:
#read csv file with header 
df = spark.read.csv('/content/drive/MyDrive/SaudiWeather.csv', header='true', inferSchema='true', sep=',')

In [70]:
df.show()

+----+--------------------+-----------+-------------------+----------+----------+---------+--------------------+---------+---------------+---------+-------------------+---------------+-------------------------+--------------------+-----+---+----+------+------+-----------+--------------+--------------------------+
|YEAR|        station_name| station_id|   observation_date|  latitude| longitude|elevation|wind_direction_angle|wind_type|wind_speed_rate|sky_cavok|visibility_distance|air_temperature|air_temperature_dew_point|            GEOPOINT|month|day|hour|minute|season|Season_name|humidity_level|air_temperature_categories|
+----+--------------------+-----------+-------------------+----------+----------+---------+--------------------+---------+---------------+---------+-------------------+---------------+-------------------------+--------------------+-----+---+----+------+------+-----------+--------------+--------------------------+
|2022|             AL BAHA|41055099999|2022-12-10 00:00

In [71]:
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- station_name: string (nullable = true)
 |-- station_id: long (nullable = true)
 |-- observation_date: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- wind_direction_angle: integer (nullable = true)
 |-- wind_type: string (nullable = true)
 |-- wind_speed_rate: double (nullable = true)
 |-- sky_cavok: string (nullable = true)
 |-- visibility_distance: integer (nullable = true)
 |-- air_temperature: double (nullable = true)
 |-- air_temperature_dew_point: integer (nullable = true)
 |-- GEOPOINT: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- season: integer (nullable = true)
 |-- Season_name: string (nullable = true)
 |-- humidity_level: string (nullable = true)
 |-- air_temperature_categories: string (nul

***
# ML Classification Section 

***

## Feature Engineering and Feature Selection

Here we will encode all the categorical columns choosing as a features using StringIndexer and drop the original columns.

In [72]:
## define variable contain all columns needed

cat_cols=['station_name','wind_type','sky_cavok','Season_name','humidity_level','air_temperature_categories']

In [73]:
#This step will label encode all the categorical columns and store them in different columns with the same name + '_', 

for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col+'_')
    df = indexer.fit(df).transform(df) #here we fit and transform the data altogether
    
df = df.drop(*cat_cols) #we will drop all the categorical columns we defined earlier

In [74]:
## drop unedded column in ML 

drop_cols = ['GEOPOINT' , 'station_id','observation_date']

df = df.drop(*drop_cols)


In [75]:
df.show()

+----+----------+----------+---------+--------------------+---------------+-------------------+---------------+-------------------------+-----+---+----+------+------+-------------+----------+----------+------------+---------------+---------------------------+
|YEAR|  latitude| longitude|elevation|wind_direction_angle|wind_speed_rate|visibility_distance|air_temperature|air_temperature_dew_point|month|day|hour|minute|season|station_name_|wind_type_|sky_cavok_|Season_name_|humidity_level_|air_temperature_categories_|
+----+----------+----------+---------+--------------------+---------------+-------------------+---------------+-------------------------+-----+---+----+------+------+-------------+----------+----------+------------+---------------+---------------------------+
|2022| 20.296139| 41.634277|  1672.13|                 350|            4.6|               9900|           17.0|                       14|   12| 10|   0|     0|     3|         23.0|       0.0|       1.0|         3.0|     

>> First Combining Feature Columns

In [76]:
#define columns
cols = df.columns
cols.remove('air_temperature_categories_') #remove air_temperature_categories_ -> we need this to be our label

#vector assembler will take all the columns and convert them into one column called features
assembler = VectorAssembler(inputCols=cols, outputCol='features')

#the .transform will apply the changes here
df = assembler.transform(df)

In [77]:
#Initialize Standard Scaler
stdScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

#Fit the Standard Scaler to the indexed Dataframe
scaler = stdScaler.fit(df)

#Transform the dataframe
df_scaled =scaler.transform(df)

In [78]:
#show database after scaled
df.show(5)

+----+----------+----------+---------+--------------------+---------------+-------------------+---------------+-------------------------+-----+---+----+------+------+-------------+----------+----------+------------+---------------+---------------------------+--------------------+
|YEAR|  latitude| longitude|elevation|wind_direction_angle|wind_speed_rate|visibility_distance|air_temperature|air_temperature_dew_point|month|day|hour|minute|season|station_name_|wind_type_|sky_cavok_|Season_name_|humidity_level_|air_temperature_categories_|            features|
+----+----------+----------+---------+--------------------+---------------+-------------------+---------------+-------------------------+-----+---+----+------+------+-------------+----------+----------+------------+---------------+---------------------------+--------------------+
|2022| 20.296139| 41.634277|  1672.13|                 350|            4.6|               9900|           17.0|                       14|   12| 10|   0|     

## Split Data

>> Just like always, before building a model we shall split our scaled dataset into training & test sets. Training Dataset = 80% Test Dataset = 20%

In [79]:
# We have created a new dataframe only consisting of the features column and the label column 
df_data = df_scaled.select(F.col('features_scaled'), F.col('air_temperature_categories_').alias('label'))

#simple data splitting
df_train, df_test = df_data.randomSplit([0.8, 0.2])

In [80]:
df_data.show()

+--------------------+-----+
|     features_scaled|label|
+--------------------+-----+
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  0.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  3.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  0.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
|[1288.69145462652...|  2.0|
+--------------------+-----+
only showing top 20 rows



>> define evaluater for **multiclassclassification** 

In [81]:
#define evaluater
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

## Train and Evaluation Models



### 1st Classification Model: **Logistic Regression**

In [82]:
# -- LogisticRegression --

lr= LogisticRegression(featuresCol='features_scaled', labelCol='label',  maxIter=10, tol=1E-6, fitIntercept=True)


# train the multiclass model.
lr_model= lr.fit(df_train)

#train the model
lr_pred= lr_model.transform(df_test)

lr_pred.show(7,False)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+----------+
|features_scaled                                                                                                                                                                                                                                                                                                                            |label|rawPrediction                                                                 |probability                                                      

In [83]:
#Evaluate the LogisticRegression Model

lr_acc = evaluator.evaluate(lr_pred)

#print accuracy
print("Logistic regression accuracy =", '{:.2%}'.format(lr_acc))
print("Test Error = " ,'{:.2%}'.format (1.0 - lr_acc))

Logistic regression accuracy = 93.77%
Test Error =  6.23%


### 2nd Classification Model : **Random Forest Classification** 


In [84]:
# -- Random Forest Classifier --

rfc= RandomForestClassifier(labelCol="label", featuresCol="features_scaled", numTrees=10)

#instantiate the model
rfcـmodel= rfc.fit(df_train)

#train the model
rfc_pred= rfcـmodel.transform(df_test)


In [85]:
#Evaluate the **Random Forest Classification** Model

rfc_acc = evaluator.evaluate(rfc_pred)

#print accuracy
print("Random Forest Classification accuracy =", '{:.2%}'.format(rfc_acc))
print("Test Error = " ,'{:.2%}'.format (1.0 - rfc_acc))

Random Forest Classification accuracy = 95.56%
Test Error =  4.44%


### 3rd Classification Model: **Decision Tree Classifier**

In [86]:
# -- Decision Tree Classifier --

dtc = DecisionTreeClassifier(labelCol="label", featuresCol="features_scaled")          

#instantiate the model
dtc_model = dtc.fit(df_train)                                                        

#train the model
dtc_pred = dtc_model.transform(df_test)


In [87]:
#Evaluate the **Decision Tree Classifier** Model

dtc_acc = evaluator.evaluate(dtc_pred)

#print accuracy
print("Decision Tree Classifier accuracy =", '{:.2%}'.format(dtc_acc))
print("Test Error = " ,'{:.2%}'.format (1.0 - dtc_acc))

Decision Tree Classifier accuracy = 99.48%
Test Error =  0.52%


## Show ML Evaluation as Dataframe


In [88]:
# Accuracy Metric
evaluator_A = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# F1 Metric
evaluator_F = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Weighted Precision
evaluator_P = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

# Weighted Recall
evaluator_R = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

# Our models
models = [lr_pred,rfc_pred, dtc_pred]

# Empty lists that will store the scores for each metric for each model.
accuracy = []
F1 = []
precision = []
recall = []

# Simple loop to populate the empty lists with scores of models for each metric.
for model in models:
    accuracy.append(evaluator_A.evaluate(model))
    F1.append(evaluator_F.evaluate(model))
    precision.append(evaluator_P.evaluate(model))
    recall.append(evaluator_R.evaluate(model))

In [89]:
# Now will convert all lists created above into a dataframe for easy viewing.
df_ev = pd.DataFrame(list(zip(accuracy, F1, precision, recall)), 
                     columns = ['Accuracy', 'F1-Score', 'Weighted Precision', 'Weighted Recall'],
                     index = ['Logistic Regression','Random Forest', 'Random Decision Tree'])

In [90]:
#print the final form of result tabel

df_ev

Unnamed: 0,Accuracy,F1-Score,Weighted Precision,Weighted Recall
Logistic Regression,0.937688,0.937688,0.937717,0.937688
Random Forest,0.955556,0.953363,0.960361,0.955556
Random Decision Tree,0.994792,0.994817,0.994983,0.994792


## Model Optimization - Hyperparameter Tuning 

### Grid Search-- **1: Logistic Regression** 

In [91]:
# Create ParamGrid for Cross Validation

LrParamGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5, 2]).addGrid(lr.elasticNetParam, [0.0, 0.5,1.0]).addGrid(lr.maxIter,[1,5,10]).build())
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=LrParamGrid, evaluator=evaluator, numFolds=5)
lr_cvModel= lr_cv.fit(df_train)

#Best Model Performance
Lr_Predictions= lr_cvModel.transform(df_test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Lr_Predictions))  


Best Model Test Area Under ROC 0.9415927522744107


### Grid Search-- **2: Random Forest Classifier** 

In [92]:
ParamGrid = (ParamGridBuilder().addGrid(rfc.numTrees, [10,15,20]).addGrid(rfc.maxDepth, [10,15,20]).addGrid(rfc.maxBins, [30,40,50]).build())
cv = CrossValidator(estimator=rfc, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
rfc_cvModel= cv.fit(df_train)

#Best Model Performance
Predictions= rfc_cvModel.transform(df_test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.9980285274733623


### Grid Search-- **3: Decision Tree Classifier** 

In [93]:
# Create ParamGrid for Cross Validation

DtcParamGrid = (ParamGridBuilder().addGrid(dtc.maxDepth, [2,3,5,10,15, 20]).addGrid(dtc.maxBins, [5,10,20,30,40,50]).build())

# Create CrossValidator

dtc_cv = CrossValidator(estimator=dtc, estimatorParamMaps=DtcParamGrid, evaluator=evaluator, numFolds=5)
dtc_cvModel= dtc_cv.fit(df_train)

#Best Model Performance
DtPredictions= dtc_cvModel.transform(df_test)
print("Best Model Test Area Under ROC", evaluator.evaluate(DtPredictions))

Best Model Test Area Under ROC 0.9961368255113863
