In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
cschema = StructType([
StructField('Date',StringType(), True),
StructField('Location',StringType(), True),
StructField('MinTemp',FloatType(), True),
StructField('MaxTemp',FloatType(), True),
StructField('Rainfall',FloatType(), True),
StructField('Evaporation',FloatType(), True),
StructField('Sunshine',FloatType(), True),
StructField('WindGustDir',StringType(), True),
StructField('WindGustSpeed',IntegerType(), True),
StructField('WindDir9am',StringType(), True),
StructField('WindDir3pm',StringType(), True),
StructField('WindSpeed9am',IntegerType(), True),
StructField('WindSpeed3pm',IntegerType(), True),
StructField('Humidity9am',IntegerType(), True),
StructField('Humidity3pm',IntegerType(), True),
StructField('Pressure9am',FloatType(), True),
StructField('Pressure3pm',FloatType(), True),
StructField('Cloud9am',IntegerType(), True),
StructField('Cloud3pm',IntegerType(), True),
StructField('Temp9am',FloatType(), True),
StructField('Temp3pm',FloatType(), True),
StructField('RainToday',StringType(), True),
StructField('RainTomorrow',StringType(), True)])

In [None]:
df = spark.read.csv('weatherAUS.csv', header=True, schema=cschema)

In [None]:
df.show(5)

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|       null|    null|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|    null|   16.9|   21.8|       No|          No|
|2008-12-02|

In [None]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Evaporation: float (nullable = true)
 |-- Sunshine: float (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: integer (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: integer (nullable = true)
 |-- WindSpeed3pm: integer (nullable = true)
 |-- Humidity9am: integer (nullable = true)
 |-- Humidity3pm: integer (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: integer (nullable = true)
 |-- Cloud3pm: integer (nullable = true)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

In [None]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils


In [None]:
(trainingData, testData) =df.randomSplit([0.8, 0.2],12345)

In [None]:
#https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|   0|       0|      0|      0|       0|          0|       0|          0|            0|         0|         0|           0|           0|          0|          0|          0|          0|       0|       0|      0|      0|        0|           0|
+----+--------+-------+-------+-----

In [None]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|   0|       0|   1485|   1261|    3261|      62790|   69835|          0|        10263|         0|         0|        1767|        3062|       2654|       4507|      15065|      15028|   55888|   59358|   1767|   3609|        0|           0|
+----+--------+-------+-------+-----

Dropping the columns Sunshine and Evaporation as they have high number of null values

In [None]:
df  = df.drop("Evaporation", "Sunshine","Cloud9am","Cloud3pm")

In [None]:
from pyspark.ml.feature import Imputer,OneHotEncoder

In [None]:
imputer = Imputer(inputCols=["MinTemp", "MaxTemp","Rainfall","WindGustSpeed","WindSpeed9am","WindSpeed3pm",
                             "Humidity9am","Humidity3pm","Pressure9am","Pressure3pm","Temp9am","Temp3pm"],outputCols=["OutMinTemp", "OutMaxTemp","OutRainfall","OutWindGustSpeed","OutWindSpeed9am","OutWindSpeed3pm",
                             "Humidity9am","OutHumidity3pm","OutPressure9am","OutPressure3pm","OutTemp9am","OutTemp3pm"])
model = imputer.fit(df)

df=model.transform(df)

In [None]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+--------------+--------------+--------------+----------+----------+
|Date|Location|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|OutMinTemp|OutMaxTemp|OutRainfall|OutWindGustSpeed|OutWindSpeed9am|OutWindSpeed3pm|OutHumidity3pm|OutPressure9am|OutPressure3pm|OutTemp9am|OutTemp3pm|
+----+--------+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+------------

In [None]:
stage1 = StringIndexer(inputCol="Date", outputCol="indexeddate").fit(df)
stage2 = StringIndexer(inputCol="Location",outputCol="indexedlocation").fit(df)
stage3 = StringIndexer(inputCol="WindGustDir",outputCol="indexedWindGustDir").fit(df)
stage4 = StringIndexer(inputCol="WindDir9am",outputCol="indexedWindDir9am").fit(df)
stage5 = StringIndexer(inputCol="WindDir3pm",outputCol="indexedWindDir3pm").fit(df)
stage6 = StringIndexer(inputCol="RainToday",outputCol="indexedRainToday").fit(df)

df = stage1.transform(df)
df = stage2.transform(df)
df = stage3.transform(df)
df = stage4.transform(df)
df = stage5.transform(df)
df = stage6.transform(df)

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T


In [None]:
df = df.withColumn("WindGustDir",F.when(F.col("WindGustDir") == 'NA','W').otherwise(F.col("WindGustDir")))
df = df.withColumn("WindDir9am",F.when(F.col("WindDir9am") == 'NA','N').otherwise(F.col("WindDir9am")))
df = df.withColumn("WindDir3pm",F.when(F.col("WindDir3pm") == 'NA','SE').otherwise(F.col("WindDir3pm")))
df = df.withColumn("RainToday",F.when(F.col("RainToday") == 'NA','No').otherwise(F.col("RainToday")))
df = df.withColumn("RainTomorrow",F.when(F.col("RainTomorrow") == 'NA','No').otherwise(F.col("RainTomorrow")))

In [None]:
label_index = StringIndexer(inputCol="RainTomorrow",outputCol="label")
df = label_index.fit(df).transform(df)

In [None]:
OHE = OneHotEncoder(inputCols=["indexeddate","indexedlocation","indexedWindGustDir","indexedWindDir9am","indexedWindDir3pm","indexedRainToday"],
                         outputCols=["encodeddate","encodedlocation","encodedwindgust","encodedWindDir9am","encodedWindDir3pm","encodedRainToday"])



In [None]:
df = OHE.fit(df).transform(df)

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
df.show(5)

+----------+--------+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+--------------+--------------+--------------+----------+----------+-----------+---------------+------------------+-----------------+-----------------+----------------+-----+-------------------+---------------+---------------+-----------------+-----------------+----------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|OutMinTemp|OutMaxTemp|OutRainfall|OutWindGustSpeed|OutWindSpeed9am|OutWindSpeed3pm|OutHumidity3pm|OutPressure9am|OutPressure3pm|OutTemp9am|OutTemp3pm|indexeddate|indexedlocation|indexedWindGustDir|indexedWindDir9am|indexedWi

In [None]:
assembler = VectorAssembler(inputCols = ['OutMinTemp',
 'OutMaxTemp',
 'OutRainfall',
 'OutWindGustSpeed',
 'OutWindSpeed9am',
 'OutWindSpeed3pm',
 'OutHumidity3pm',
 'OutPressure9am',
 'OutPressure3pm',
 'OutTemp9am',
 'OutTemp3pm',"encodeddate","encodedlocation","encodedwindgust","encodedWindDir9am","encodedWindDir3pm","encodedRainToday"], outputCol = "features")

In [None]:
df = assembler.transform(df)

In [None]:
df.show(5)

+----------+--------+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+-------+-------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+--------------+--------------+--------------+----------+----------+-----------+---------------+------------------+-----------------+-----------------+----------------+-----+-------------------+---------------+---------------+-----------------+-----------------+----------------+--------------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|OutMinTemp|OutMaxTemp|OutRainfall|OutWindGustSpeed|OutWindSpeed9am|OutWindSpeed3pm|OutHumidity3pm|OutPressure9am|OutPressure3pm|OutTemp9am|OutTemp3pm|indexeddate|indexedlocation|indexedWindGustDir|indexe

In [None]:
(trainingData, testData) =df.randomSplit([0.8, 0.2],12345)

In [85]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features",maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [86]:
predictions = dtModel.transform(testData)


In [87]:
selected = predictions.select("label", "prediction", "probability", "Location", "RainToday")
selected.show(5)

+-----+----------+--------------------+--------+---------+
|label|prediction|         probability|Location|RainToday|
+-----+----------+--------------------+--------+---------+
|  0.0|       0.0|[0.85673122440158...|Canberra|       No|
|  0.0|       0.0|[0.85673122440158...|Canberra|       No|
|  0.0|       0.0|[0.85673122440158...|Canberra|       No|
|  1.0|       0.0|[0.85673122440158...|Canberra|       No|
|  1.0|       0.0|[0.61169987282746...|Canberra|       No|
+-----+----------+--------------------+--------+---------+
only showing top 5 rows



In [None]:
dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.impurity, ["gini","entropy"])
             .addGrid(dt.maxDepth, [5])
             .addGrid(dt.maxBins, [5, 10, 15])
             .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
             .build())

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [90]:
dt = DecisionTreeClassifier(labelCol='label' , featuresCol="features")

In [91]:
dtevaluator = BinaryClassificationEvaluator().setLabelCol("label")

In [92]:
cv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 4)

In [None]:
stages=[cv]

In [None]:
pipeline = Pipeline(stages=[OHE,cv])

In [None]:
pipelinemodel = pipeline.fit(trainingData)

IllegalArgumentException: ignored

In [None]:
print('Decision Tree Accuracy (gini):', gini_ac.evaluate(dt_predictions))
print('Decision Tree F1 (gini):', gini_f1.evaluate(dt_predictions))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(predictions)


In [None]:
#from pyspark.ml.evaluation import BinaryClassificationEvaluator
#evaluator = BinaryClassificationEvaluator()


In [None]:
#predictions = cvModel.transform(testData)
#evaluator.evaluate(predictions)


In [None]:
#predictionAndLabels = testData.map(lambda lp: (float(cvModel.predict(lp.features)), lp.label))

In [None]:
cv1 = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid1, evaluator=evaluator, numFolds=4)

# Run cross validations
cvModel1 = cv1.fit(trainingData)
predictions1 = cvModel1.transform(testData)
evaluator.evaluate(predictions1)

In [None]:
pipeline = Pipeline(stages=[label_index, assembler, dt,dt1,cv])
model1 = pipeline.fit(trainingData)

In [None]:
out_df = model.transform(testData)