In [180]:
!pip install pyspark



In [None]:
from pyspark.sql.types import FloatType, StringType, StructField, StructType

# Explicit schema for weatherAUS.csv, as (column name, Spark type) pairs.
# Spark applies a user-supplied CSV schema by position (enforceSchema defaults to true),
# so this order must match the file's column order. Every field is nullable.
# Evaporation and Sunshine are read as strings; they are dropped right after loading.
_WEATHER_COLUMNS = [
    ("Date", StringType()),
    ("Location", StringType()),
    ("MinTemp", FloatType()),
    ("MaxTemp", FloatType()),
    ("Rainfall", FloatType()),
    ("Evaporation", StringType()),
    ("Sunshine", StringType()),
    ("WindGustDir", StringType()),
    ("WindGustSpeed", FloatType()),
    ("WindDir9am", StringType()),
    ("WindDir3pm", StringType()),
    ("WindSpeed9am", FloatType()),
    ("WindSpeed3pm", FloatType()),
    ("Humidity9am", FloatType()),
    ("Humidity3pm", FloatType()),
    ("Pressure9am", FloatType()),
    ("Pressure3pm", FloatType()),
    ("Cloud9am", FloatType()),
    ("Cloud3pm", FloatType()),
    ("Temp9am", FloatType()),
    ("Temp3pm", FloatType()),
    ("RainToday", StringType()),
    ("RainTomorrow", StringType()),
]
customSchema = StructType(
    [StructField(name, dtype, True) for name, dtype in _WEATHER_COLUMNS]
)

In [181]:
from pyspark.sql import SparkSession

# Attach to the active Spark session, creating one if none exists yet.
spark = (SparkSession
         .builder
         .getOrCreate())

In [215]:
# Load the raw CSV with the explicit schema; cells containing the string "NA" are read as null.
WEATHER_CSV_PATH = '/content/sample_data/weatherAUS.csv'
DF = spark.read.csv(WEATHER_CSV_PATH, header="true", schema=customSchema, nullValue='NA')

# Columns excluded from the model.
UNUSED_COLUMNS = ("Date", "Evaporation", "Sunshine", "Cloud9am", "Cloud3pm", 'WindGustDir', 'WindGustSpeed')
DF = DF.drop(*UNUSED_COLUMNS)

# Data cleaning

In [217]:
# Keep only complete rows: a null in any remaining column removes the row.
DF = DF.dropna()
DF.show(10)

+--------+-------+-------+--------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+
|Location|MinTemp|MaxTemp|Rainfall|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+--------+-------+-------+--------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+
|  Albury|   13.4|   22.9|     0.6|         W|       WNW|        20.0|        24.0|       71.0|       22.0|     1007.7|     1007.1|   16.9|   21.8|       No|          No|
|  Albury|    7.4|   25.1|     0.0|       NNW|       WSW|         4.0|        22.0|       44.0|       25.0|     1010.6|     1007.8|   17.2|   24.3|       No|          No|
|  Albury|   12.9|   25.7|     0.0|         W|       WSW|        19.0|        26.0|       38.0|       30.0|     1007.6|     1008.7|   21.0|   23.

In [233]:
# 80/20 train/test split, seeded (12345) so the split is repeatable.
training, test = DF.randomSplit([0.8, 0.2], seed=12345)

In [219]:
# String-valued feature columns to index and one-hot encode.
# RainTomorrow is deliberately left out: it is the prediction target.
categoricalColumns = [
    "Location",
    "WindDir9am",
    "WindDir3pm",
    "RainToday",
]

In [221]:
# Accumulates the Pipeline stages built by the cells below, in execution order.
stages: list = []

In [222]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

for categoricalCol in categoricalColumns:
    # Map each category string to a numeric index. handleInvalid="keep" assigns categories
    # that never occur in the training split an extra index instead of making
    # transform() fail on the test split (the default, "error", raises).
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + "Index",
        handleInvalid="keep",
    )
    # One-hot encode the index into a sparse binary vector named "<col>classVec";
    # the VectorAssembler inputs are built from this exact suffix.
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    # NOTE: appends to `stages`; re-running this cell without resetting `stages`
    # adds duplicate stages.
    stages += [stringIndexer, encoder]

In [223]:
# Inspect the stages collected so far.
list(stages)

[StringIndexer_8d0dfef2d056,
 OneHotEncoder_8f9923fed1c1,
 StringIndexer_d3ffaec92cac,
 OneHotEncoder_16c0154fb0ef,
 StringIndexer_4a746a3e9f61,
 OneHotEncoder_700a0e6bcefe,
 StringIndexer_8afbc100b907,
 OneHotEncoder_37bdba7a6fda]

In [224]:
# Target: index the string RainTomorrow column into a numeric "label" column
# ("label" is also the column BinaryClassificationEvaluator reads by default).
# StringIndexer's default ordering (frequencyDesc) gives the most frequent value index 0.0.
# handleInvalid is left at its default ("error"). For reference: "skip" would drop rows
# with unseen labels, and "keep" would put them into an extra index.
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")
stages += [label_stringIdx]

In [225]:
# Combine every feature (one-hot vectors + numeric columns) into a single "features" vector.
# NOTE(review): Rainfall is loaded and kept after cleaning but is not listed here -
# confirm that excluding it is intentional.
numericCols = ["MinTemp", "MaxTemp", "WindSpeed9am", "WindSpeed3pm", "Humidity9am", "Humidity3pm", "Temp9am", "Temp3pm", "Pressure9am", "Pressure3pm"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
# Train on "label", the column written by the RainTomorrow StringIndexer. No
# "RainTomorrowIndex" column is ever produced (RainTomorrow is not in categoricalColumns),
# so using that name made fit() fail.
dtree = DecisionTreeClassifier(labelCol="label", featuresCol="features")


In [226]:
# Hyper-parameter grid: 2 impurities x 3 maxBins x 3 minInfoGain x 3 maxDepth = 54 candidates.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(dtree.impurity, ['gini', 'entropy'])
grid_builder.addGrid(dtree.maxBins, [5, 10, 15])
grid_builder.addGrid(dtree.minInfoGain, [0.0, 0.2, 0.4])
grid_builder.addGrid(dtree.maxDepth, [3, 5, 7])
paramGrid = grid_builder.build()

In [227]:
# Binary evaluator with its defaults spelled out: scores "rawPrediction" against "label"
# using area under the ROC curve.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)

In [228]:
# 4-fold cross-validation of the decision tree over paramGrid. It is the final pipeline
# stage, so the fitted pipeline's last stage is the cross-validated model.
cv = CrossValidator(
    estimator=dtree,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4,
)
stages.append(cv)

In [229]:
from pyspark.ml import Pipeline

In [230]:
# Chain every accumulated stage into one estimator.
pipeline = Pipeline(stages=stages)

In [None]:
# Fit all stages (indexers, encoders, assembler, cross-validated tree) on the training
# split only, then score the held-out split. The split cell names these `training`/`test`;
# the previous `trainingData`/`testData` were undefined (NameError).
pipeline_model = pipeline.fit(training)
predictions = pipeline_model.transform(test)

In [200]:
# The last fitted stage is the cross-validation model; take the best tree it selected.
cv_model = pipeline_model.stages[-1]
best_model = cv_model.bestModel

In [201]:
# Read the winning hyper-parameters through the fitted model's public getters
# (available on tree models since Spark 3.0) instead of the private
# `_java_obj.parent()` JVM handle, which is an internal detail and not part of the API.
best_modeldepth = best_model.getMaxDepth()
best_modelbins = best_model.getMaxBins()
best_modelimpurity = best_model.getImpurity()
best_modelgain = best_model.getMinInfoGain()

In [202]:
# Print depth, bins, impurity and min info gain, one value per line.
print("Best model grid params are ")
for best_value in (best_modeldepth, best_modelbins, best_modelimpurity, best_modelgain):
    print(best_value)

Best model grid params are 
7
10
gini
0.0


In [203]:
# Score the test predictions with the evaluator's configured metric (areaUnderROC by default).
evaluator.evaluate(dataset=predictions)

0.4958957106271647

In [211]:
# Area under the ROC curve on the test predictions, using a dedicated evaluator
# (previously `evaluator1` was created but never used).
# NOTE(review): the saved run reports ~0.50, i.e. no better than chance - investigate
# before trusting this model.
evaluator1 = BinaryClassificationEvaluator(metricName="areaUnderROC")
print("Area Under ROC: " + str(evaluator1.evaluate(predictions)))

Area Under ROC: 0.4958957106271647


In [212]:
# Area under the precision-recall curve on the test predictions, using a dedicated
# evaluator (previously `evaluator2` was created but never used).
evaluator2 = BinaryClassificationEvaluator(metricName="areaUnderPR")
print("Area Under PR: " + str(evaluator2.evaluate(predictions)))

Area Under PR: 0.21444079379852393
