### Importing all the useful libraries

In [1]:
!pip install pyspark
import pyspark



In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [17]:
from pyspark.sql.functions import mean,when
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer,OneHotEncoder

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [6]:
spark

In [42]:
wschema = StructType([
StructField('Date',StringType(), True),
StructField('Location',StringType(), True),
StructField('MinTemp',FloatType(), True),
StructField('MaxTemp',FloatType(), True),
StructField('Rainfall',FloatType(), True),
StructField('Evaporation',FloatType(), True),
StructField('Sunshine',FloatType(), True),
StructField('WindGustDir',StringType(), True),
StructField('WindGustSpeed',IntegerType(), True),
StructField('WindDir9am',StringType(), True),
StructField('WindDir3pm',StringType(), True),
StructField('WindSpeed9am',IntegerType(), True),
StructField('WindSpeed3pm',IntegerType(), True),
StructField('Humidity9am',IntegerType(), True),
StructField('Humidity3pm',IntegerType(), True),
StructField('Pressure9am',FloatType(), True),
StructField('Pressure3pm',FloatType(), True),
StructField('Cloud9am',IntegerType(), True),
StructField('Cloud3pm',IntegerType(), True),
StructField('Temp9am',FloatType(), True),
StructField('Temp3pm',FloatType(), True),
StructField('RainToday',StringType(), True),
StructField('RainTomorrow',StringType(), True)])

Declaring schema for the file


In [83]:
df = spark.read.csv('weatherAUS.csv',schema=wschema, header = True)

In [84]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|   0|       0|   1485|   1261|    3261|      62790|   69835|          0|        10263|         0|         0|        1767|        3062|       2654|       4507|      15065|      15028|   55888|   59358|   1767|   3609|        0|           0|
+----+--------+-------+-------+-----

Dropping the columns having more null values 

In [85]:
df = df.drop('Evaporation','Sunshine','Cloud9am','Cloud3pm')

#### Used imputer to fill the null values

Updating the numeric column's null values with their respective mean 

In [87]:
imputer = Imputer(inputCols=["MinTemp", "MaxTemp","Rainfall","WindGustSpeed","WindSpeed9am","WindSpeed3pm",
                             "Humidity9am","Humidity3pm","Pressure9am","Pressure3pm","Temp9am","Temp3pm"],outputCols=["indMinTemp", "indMaxTemp","indRainfall","indWindGustSpeed","indWindSpeed9am","indWindSpeed3pm",
                             "indHumidity9am","indHumidity3pm","indPressure9am","indPressure3pm","indTemp9am","indTemp3pm"])
model = imputer.fit(df)

df=model.transform(df)

Dropping the columns with the null values after imputing

In [107]:
df=df.drop("MinTemp", "MaxTemp","Rainfall","Win2dGustSpeed","WindSpeed9am","WindSpeed3pm",
                             "Humidity9am","Humidity3pm","Pressure9am","Pressure3pm","Temp9am","Temp3pm")

Updating with null values of the string columns with relevant data

In [108]:
df = df.withColumn("WindGustDir",F.when(F.col("WindGustDir") == 'NA','N').otherwise(F.col("WindGustDir")))
df = df.withColumn("WindDir9am",F.when(F.col("WindDir9am") == 'NA','N').otherwise(F.col("WindDir9am")))
df = df.withColumn("WindDir3pm",F.when(F.col("WindDir3pm") == 'NA','N').otherwise(F.col("WindDir3pm")))
df = df.withColumn("RainToday",F.when(F.col("RainToday") == 'NA','No').otherwise(F.col("RainToday")))
df = df.withColumn("RainTomorrow",F.when(F.col("RainTomorrow") == 'NA','No').otherwise(F.col("RainTomorrow")))

Checking whether the data is having any null values

In [109]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----+--------+-----------+----------+----------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+--------------+--------------+--------------+--------------+----------+----------+
|Date|Location|WindGustDir|WindDir9am|WindDir3pm|RainToday|RainTomorrow|indMinTemp|indMaxTemp|indRainfall|indWindGustSpeed|indWindSpeed9am|indWindSpeed3pm|indHumidity9am|indHumidity3pm|indPressure9am|indPressure3pm|indTemp9am|indTemp3pm|
+----+--------+-----------+----------+----------+---------+------------+----------+----------+-----------+----------------+---------------+---------------+--------------+--------------+--------------+--------------+----------+----------+
|   0|       0|          0|         0|         0|        0|           0|         0|         0|          0|               0|              0|              0|             0|             0|             0|             0|         0|         0|
+----+--------+-----------+----------+----------

In [91]:
numeric_col = ['indMinTemp','indMaxTemp','indRainfall','indWindGustSpeed','indWindSpeed9am','indWindSpeed3pm','indHumidity9am','indHumidity3pm','indPressure9am','indHumidity3pm','indHumidity9am','indPressure3pm']

#### Creating a pipeline and using StringIndexer and Onehotencoder 

In [92]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder

 
categoricalColumns = ["WindGustDir","WindDir9am","WindDir3pm","RainToday","RainTomorrow"]
stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]


In [93]:
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numeric_col
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Decision Tree

In [94]:
dt = DecisionTreeClassifier(labelCol='RainTomorrowIndex' , featuresCol="features")

Parameters for decision tree

In [110]:
paramGrid = (ParamGridBuilder()
               .addGrid(dt.impurity, ["gini","entropy"])
             .addGrid(dt.maxDepth, [5])
             .addGrid(dt.maxBins, [5, 10, 15])
             .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
             .build())

In [111]:
evaluator = BinaryClassificationEvaluator().setLabelCol("RainTomorrowIndex")

Crossvalidator with 4 folds

In [112]:
cv = CrossValidator(estimator = dt,
                      estimatorParamMaps = paramGrid,
                      evaluator = evaluator,
                      numFolds = 4)

Adding cv to the stages

In [113]:
stages += [cv]

#### Splitting the data into train and test data

In [114]:
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=12345)

#### Running the pipeline after assigning all the stages 

In [None]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(trainingData)
preppedDataDF = pipelineModel.transform(testData)

In [None]:
model=pipelineModel.stages[-1].bestModel

In [None]:
print(model)

In [None]:
preds = preppedDataDF.select('RainTomorrowIndex','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['RainTomorrowIndex'])))

In [None]:
metrics = BinaryClassificationMetrics(preds)


ROC and AUC

In [105]:
print("Area under PR = %s" % metrics.areaUnderPR)

Area under PR = 1.0


In [106]:
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 1.0
