In [72]:
!pip install pyspark



In [73]:
from pyspark.sql.types import *
customSchema = StructType([
      StructField("Date",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("MinTemp",FloatType(),True),
      StructField("MaxTemp",FloatType(),True),
      StructField("Rainfall",FloatType(),True),
      StructField("Evaporation",StringType(),True),
      StructField("Sunshine",StringType(),True),
      StructField("WindGustDir",StringType(),True),
      StructField("WindGustSpeed",FloatType(),True),
      StructField("WindDir9am",StringType(),True),
      StructField("WindDir3pm",StringType(),True),
      StructField("WindSpeed9am",FloatType(),True),
      StructField("WindSpeed3pm",FloatType(),True),
      StructField("Humidity9am",FloatType(),True),
      StructField("Humidity3pm",FloatType(),True),
      StructField("Pressure9am",FloatType(),True),
      StructField("Pressure3pm",FloatType(),True),
      StructField("Cloud9am",FloatType(),True),
      StructField("Cloud3pm",FloatType(),True),
      StructField("Temp9am",FloatType(),True),
      StructField("Temp3pm",FloatType(),True),
      StructField("RainToday",StringType(),True),
      StructField("RainTomorrow",StringType(),True)
])
# Defining Custom Schema

### "Rain in Australia" Prediction

In [74]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [75]:
# importing the data and setting the NA as Null values as NA
data = spark.read.csv("weatherAUS.csv", header="true", schema=customSchema,nullValue= 'NA')
data = data.drop("Date", "Evaporation","Sunshine","Cloud9am", "Cloud3pm", 'WindGustDir', 'WindGustSpeed')


In [None]:
data = data.na.drop() # Dropping the Null values
data.show()

In [77]:
# Split the data 80/20 train/test, using a seed of 12345
(train, test) = data.randomSplit([0.8, 0.2])

In [78]:
# Listing the categoricalColumns i.e., String columns except the RainTomorrow
categoricalColumns = ["Location", "WindDir9am", "WindDir3pm", "RainToday"]

In [79]:
stages = [] # stages in Pipeline

Convert categorical variables into one-hot encoded variables

In [80]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.
    stages += [stringIndexer, encoder]

In [81]:
stages

[StringIndexer_f46cc6a5689f,
 OneHotEncoder_54aa1465a250,
 StringIndexer_2e36fd8bf3da,
 OneHotEncoder_10bdb3b1cfc5,
 StringIndexer_62226c854877,
 OneHotEncoder_249850633b0b,
 StringIndexer_3be38c7d41d2,
 OneHotEncoder_45dc978f049e]

In [82]:
# Creating the label column
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")
# setHandleInvalid("skip"), the indexer adds new indexes when it sees new labels.
stages += [label_stringIdx]

In [83]:
# Transform all features into a vector using VectorAssembler
numericCols = ["MinTemp", "MaxTemp", "WindSpeed9am", "WindSpeed3pm", "Humidity9am", "Humidity3pm", "Temp9am", "Temp3pm", "Pressure9am", "Pressure3pm"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
dtree = DecisionTreeClassifier(labelCol="label", featuresCol=assembler.getOutputCol())


Use a parameter grid to determine the best parameters for:
impurity - gini, entropy
maxBins - 5, 10, 15
minInfoGain - 0.0, 0.2, 0.4
maxDepth - 3, 5, 7

In [84]:
paramGrid = (ParamGridBuilder()
    .addGrid(dtree.impurity, ['gini', 'entropy'])
    .addGrid(dtree.maxBins, [5, 10, 15])
    .addGrid(dtree.minInfoGain, [0.0, 0.2, 0.4])
    .addGrid(dtree.maxDepth, [3, 5, 7])
    .build())

In [85]:
evaluator = BinaryClassificationEvaluator()

Cross-validate with 4 folds

In [86]:
cv = CrossValidator(estimator=dtree, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)
stages += [cv]

In [87]:
from pyspark.ml import Pipeline


Use a pipeline to encapsulate all steps

In [88]:
# Creating the pipeline with all the above steps
pipeline = Pipeline().setStages(stages)


In [89]:
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)


Print the parameters from the best model selected

In [90]:
best_model = pipeline_model.stages[-1].bestModel

In [91]:
best_modelobj = best_model._java_obj.parent()

best_modeldepth = best_modelobj.getMaxDepth()
best_modelbins = best_modelobj.getMaxBins()
best_modelimpurity = best_modelobj.getImpurity()
best_modelgain = best_modelobj.getMinInfoGain()

In [92]:
print("Best model grid params are ")
print(best_modeldepth)
print(best_modelbins)
print(best_modelimpurity)
print(best_modelgain)

Best model grid params are 
7
10
entropy
0.0


Calculate and print the Area under ROC Curve and Area under Precision-Recall Curve scores for your training and test data sets

In [93]:
evaluator.evaluate(predictions)

0.42225775465215

Area under ROC Curve

In [94]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.42225775465215


Area under Precision-Recall

In [95]:
evaluator2 = BinaryClassificationEvaluator()
print("Test Area Under PR: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})))

Test Area Under PR: 0.27348693322723666


https://docs.databricks.com/_static/notebooks/binary-classification.html

https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa