In [1]:
!pip install pyspark




In [26]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import when
from pyspark.sql import functions as F
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

Creating Spark Session

In [4]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Step1 - Defining the Schema before hand

In [5]:
from pyspark.sql.types import *
customSchema = StructType([
      StructField("Date",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("MinTemp",FloatType(),True),
      StructField("MaxTemp",FloatType(),True),
      StructField("Rainfall",FloatType(),True),
      StructField("Evaporation",StringType(),True),
      StructField("Sunshine",StringType(),True),
      StructField("WindGustDir",StringType(),True),
      StructField("WindGustSpeed",FloatType(),True),
      StructField("WindDir9am",StringType(),True),
      StructField("WindDir3pm",StringType(),True),
      StructField("WindSpeed9am",FloatType(),True),
      StructField("WindSpeed3pm",FloatType(),True),
      StructField("Humidity9am",FloatType(),True),
      StructField("Humidity3pm",FloatType(),True),
      StructField("Pressure9am",FloatType(),True),
      StructField("Pressure3pm",FloatType(),True),
      StructField("Cloud9am",FloatType(),True),
      StructField("Cloud3pm",FloatType(),True),
      StructField("Temp9am",FloatType(),True),
      StructField("Temp3pm",FloatType(),True),
      StructField("RainToday",StringType(),True),
      StructField("RainTomorrow",StringType(),True)])

In [6]:
df = spark.read.csv("/weatherAUS.csv", header = True, schema = customSchema, nullValue = 'NA')
df = df.drop("Date", "Evaporation","Sunshine","Cloud9am", "Cloud3pm", 'WindGustDir', 'WindGustSpeed')

Dropping all the NA Values, I have tried imputing the NA values with the average but for some reason that dataframe kept on throwing some error. So, I am just dropping the NA values.

In [27]:
df = df.na.drop()

Step2 - Splitting the data into train and test. Train data allocates 80 percent of the total data while test data allocate about 20 percent of the total data


In [8]:
(train,test) = df.randomSplit([0.8,0.2])

Step3 -
1. Usging the StringIndexer and OneHotEncoder to convert the categorical values into binay SparseVectors
2. Creating label column for 'RainTomorrow' in binary format
3. Using VectorAssembler to transform the features


In [12]:
categoricalColumns = ["Location", "WindDir9am", "WindDir3pm", "RainToday"]
stages = [] # stages in Pipeline

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index",)
    #stringIndexer = stringIndexer.setHandleInvalid("keep")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.
    stages += [stringIndexer, encoder]

In [13]:
#stages

[StringIndexer_6d98386282b7,
 OneHotEncoder_650aa580bdf2,
 StringIndexer_23bcc2545898,
 OneHotEncoder_db38dce4208a,
 StringIndexer_0e3827627722,
 OneHotEncoder_59d5742f20a1,
 StringIndexer_bc8521dbe404,
 OneHotEncoder_e46ba23eae78]

In [14]:
# Creating the label column
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")
# setHandleInvalid("skip"), the indexer adds new indexes when it sees new labels.
stages += [label_stringIdx]

In [15]:
# Transform all features into a vector using VectorAssembler
numericCols = ["MinTemp", "MaxTemp", "WindSpeed9am", "WindSpeed3pm", "Humidity9am", "Humidity3pm", "Temp9am", "Temp3pm", "Pressure9am", "Pressure3pm"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
#assembler = assembler.setHandleInvalid("skip")
stages += [assembler]


dtree = DecisionTreeClassifier(labelCol="label", featuresCol=assembler.getOutputCol())

Step 4 - Fitting the Machine Learning Model

In [16]:
paramGrid = (ParamGridBuilder()
    .addGrid(dtree.impurity, ['gini', 'entropy'])
    .addGrid(dtree.maxBins, [5, 10, 15])
    .addGrid(dtree.minInfoGain, [0.0, 0.2, 0.4])
    .addGrid(dtree.maxDepth, [3, 5, 7])
    .build())

In [17]:
evaluator = BinaryClassificationEvaluator()

In [18]:
cv = CrossValidator(estimator=dtree, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)
stages += [cv]

Step 5 - Creating pipeline to combine everything so far

In [19]:
from pyspark.ml import Pipeline

# Creating the pipeline with all the above steps
pipeline = Pipeline().setStages(stages)
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

In [20]:
best_model = pipeline_model.stages[-1].bestModel

In [21]:
best_modelobj = best_model._java_obj.parent()

best_modeldepth = best_modelobj.getMaxDepth()
best_modelbins = best_modelobj.getMaxBins()
best_modelimpurity = best_modelobj.getImpurity()
best_modelgain = best_modelobj.getMinInfoGain()

In [22]:
print("Best model grid params are ")
print(best_modeldepth)
print(best_modelbins)
print(best_modelimpurity)
print(best_modelgain)

Best model grid params are 
3
5
gini
0.2


In [23]:
evaluator.evaluate(predictions)

0.5

In [24]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5


In [25]:
evaluator2 = BinaryClassificationEvaluator()
print("Test Area Under PR: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})))

Test Area Under PR: 0.2266174703964304
