<a href="https://colab.research.google.com/github/Bishop1303/ML_PySpark/blob/dev/ML_RandomForestEnsamble.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Carica il drive con i dati:
from google.colab import drive
drive.mount('/content/drive')

# Getting the softwares:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xvf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

# To use spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

# SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [67]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.sql.functions import round
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

### DATA PREPARATION ###

# Read raw data to pyspark
flights = spark.read.csv("/content/drive/My Drive/flights.csv", inferSchema=True, header=True, mode='FAILFAST')
#flights.show()
#flights.printSchema()

### DATA PREPARATION ###

# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Remove records with missing 'delay' values or NA values
flights_none_missing = flights_drop_column.filter((flights_drop_column.delay.isNotNull()) & \
                                                  (flights_drop_column.delay != 'NA'))

# Change delay type to int
flights_none_missing = flights_none_missing.withColumn('delay', flights_none_missing['delay'].cast('int'))

# Check on dataframe
#print('The Schema is: ')
#flights_none_missing.printSchema()
#print('=====================================================')
#print('Informative rows after dropping malformed: ',flights_none_missing.count())


# Conversion: 'mile' to 'km' and drop 'mile' column
flights_km = flights_none_missing.withColumn('km', round(flights.mile * 1.60934, 0)).drop('mile')

# Creating 'label' column indicating whether flight delayed (1) or not (0)
flights_ready = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check records
#flights_ready.show(5)
#flights_ready.printSchema()


In [68]:
### DATA PREPARATION FOR ML ###

# Create buckets at 3 hour intervals through the day
#splits=[0,3,6,9,12,15,18,21,24]
#buckets = Bucketizer(splits=splits, inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
#bucketed = buckets.transform(flights_ready)
#bucketed.select('depart','depart_bucket').show(5)

# Create a one-hot encoder
#onehot = OneHotEncoder(inputCol=buckets.getOutputCol(), outputCol='depart_dummy')

# One-hot encode the bucketed departure times
#flights_onehot = onehot.fit(bucketed).transform(bucketed)
#flights_onehot.select('depart', 'depart_bucket', 'depart_dummy').show(5)

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon','depart','duration'], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(bucketed)

# Check the resulting column
flights_assembled = flights_assembled.select('mon','depart','duration','features','label')

# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=7)

In [66]:

# Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

# Compare AUC on testing data
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(tree.transform(flights_test))
evaluator.evaluate(gbt.transform(flights_test))

# Find the number of trees and the relative importance of features
print('The number of trees is: ',gbt.getNumTrees)
print('The importance of the features is: ',gbt.featureImportances)

The number of trees is:  20
The importance of the features is:  (3,[0,1,2],[0.3530586081199667,0.314137501885979,0.33280388999405425])


In [69]:


forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator=forest,
                    estimatorParamMaps=params, 
                    evaluator=evaluator, 
                    numFolds=5
                    ).fit(flights_train)


# Evaluating Random Forest

* cv - a cross-validator which has already been fit to the training data

* evaluator — a BinaryClassificationEvaluator object

In [71]:
# Average AUC for each parameter combination in grid
avg_auc = cv.avgMetrics
print(avg_auc)

# Average AUC for the best model
print(max(cv.avgMetrics)) 

# What's the optimal parameter value for maxDepth?
print(cv.bestModel.explainParam('maxDepth'))
# What's the optimal parameter value for featureSubsetStrategy?
print(cv.bestModel.explainParam('featureSubsetStrategy'))

# AUC for best model on testing data
print('AUC for best model on testing data: ','{:,.3f}'.format(evaluator.evaluate(cv.transform(flights_test))))

[0.6237401501928062, 0.6627639948562409, 0.671435148541434, 0.6467624333232634, 0.6649439350208881, 0.6760481739876215, 0.6402999535732408, 0.6648675676141262, 0.6720475336770333, 0.6402999535732408, 0.6648675676141262, 0.6720475336770333]
0.6760481739876215
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto' (default: auto, current: oneth