In [188]:
from pyspark.sql import SparkSession

In [189]:
spark = SparkSession.builder.getOrCreate()

In [190]:
df = spark.read.options(header = "true", inferSchema = "true", nullValue = "NA").csv("weatherAUS.csv")

In [191]:
df = df.na.drop()

In [192]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: double (nullable = true)
 |-- MaxTemp: double (nullable = true)
 |-- Rainfall: double (nullable = true)
 |-- Evaporation: double (nullable = true)
 |-- Sunshine: double (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: double (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: double (nullable = true)
 |-- WindSpeed3pm: double (nullable = true)
 |-- Humidity9am: double (nullable = true)
 |-- Humidity3pm: double (nullable = true)
 |-- Pressure9am: double (nullable = true)
 |-- Pressure3pm: double (nullable = true)
 |-- Cloud9am: double (nullable = true)
 |-- Cloud3pm: double (nullable = true)
 |-- Temp9am: double (nullable = true)
 |-- Temp3pm: double (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: integer (nullable = true)



In [193]:
numeric_features = [field for (field, dataType) in train.dtypes if dataType != "string"]
categorical_features = [field for (field, dataType) in train.dtypes if dataType == "string"]

In [194]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,GBTClassifier,LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.stat import Correlation

In [195]:
train, test = df.randomSplit([.8, .2], seed=12345)

In [196]:
from pyspark.ml.feature import StringIndexer

indexOutputCols = [x + "Index" for x in categorical_features]

stringIndexer = StringIndexer(inputCols=categorical_features, outputCols=indexOutputCols, handleInvalid="skip")

In [197]:
from pyspark.ml.feature import VectorAssembler

assemblerInputs = indexOutputCols + numeric_features
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [198]:
dtc = DecisionTreeClassifier(labelCol="RainTomorrow")

In [199]:
from pyspark.ml import Pipeline

In [200]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
            .addGrid(dtc.impurity, ['gini','entropy'])
            .addGrid(dtc.maxBins, [3400, 3415, 3720])
             .addGrid(dtc.minInfoGain, [0.0,0.2,0.4])
            .addGrid(dtc.maxDepth, [3,5,7])
            .build())

cv = CrossValidator(estimator=dtc, 
                    evaluator=MulticlassClassificationEvaluator(labelCol="RainTomorrow"),
                    estimatorParamMaps=paramGrid, 
                    numFolds=4, 
                    seed=42)

In [201]:
stages = [stringIndexer, vecAssembler, cv]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train)

In [202]:
pred = pipelineModel.transform(test)

In [203]:
pred.show()

+----------+-------------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+---------+-------------+----------------+---------------+---------------+--------------+--------------------+-----------------+-------------+----------+
|      Date|     Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|DateIndex|LocationIndex|WindGustDirIndex|WindDir9amIndex|WindDir3pmIndex|RainTodayIndex|            features|    rawPrediction|  probability|prediction|
+----------+-------------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+----------

In [204]:
#I have encoded the "RainTomorrow" data in jupter notebook using python where I put 1 for "No" and 2 for "Yes".
#I was not able to encode it in pyspark.S
pred.select('prediction').show()
#1-No
#2-Yes

+----------+
|prediction|
+----------+
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       2.0|
|       1.0|
|       1.0|
|       1.0|
|       2.0|
|       1.0|
|       1.0|
|       1.0|
|       2.0|
|       1.0|
+----------+
only showing top 20 rows



In [205]:
from sklearn.metrics import confusion_matrix

y_pred=pred.select("prediction").collect()
y_orig=pred.select("RainTomorrow").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Confusion Matrix:
[[  30    0    0]
 [   0 8758    0]
 [   0    0 2441]]
