In [5]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import when
from pyspark.sql import functions as F
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

In [6]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Obtain a SparkSession through the builder (an existing session is reused,
# otherwise a new one is created) and bind it to `spark`.
spark = SparkSession.builder.getOrCreate()

In [7]:
import pyspark
# Rebinds `spark` using the same builder call as the previous cell, then
# exposes the session's SparkContext as `sc`.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # not referenced again in the visible part of this notebook

In [11]:
# Load the weather observations: the first row is the header and column
# types are inferred from the data. Preview the first five rows.
dataset = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('weatherAUS.csv')
)
dataset.show(5)

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|          No|
|2008-12-02|

In [12]:
# Remove four columns from the frame, then list the columns that remain.
dataset = dataset.drop(
    *("Evaporation", "Sunshine", 'Cloud9am', 'Cloud3pm')
)
dataset.columns

['Date',
 'Location',
 'MinTemp',
 'MaxTemp',
 'Rainfall',
 'WindGustDir',
 'WindGustSpeed',
 'WindDir9am',
 'WindDir3pm',
 'WindSpeed9am',
 'WindSpeed3pm',
 'Humidity9am',
 'Humidity3pm',
 'Pressure9am',
 'Pressure3pm',
 'Temp9am',
 'Temp3pm',
 'RainToday',
 'RainTomorrow']

In [13]:
# Keep only rows whose target is not the "NA" placeholder, then print how
# many rows fall into each RainTomorrow class, most frequent first.
dataset2 = dataset.where(dataset["RainTomorrow"] != "NA")
dataset2.groupBy("RainTomorrow").count().sort("count", ascending=False).show(n=90, truncate=False)

+------------+------+
|RainTomorrow|count |
+------------+------+
|No          |110316|
|Yes         |31877 |
+------------+------+



In [17]:
from pyspark.sql.functions import avg

def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    """Compute the mean of each listed column of a Spark DataFrame.

    All averages are computed in one aggregation, so the data is scanned
    once instead of once per column.

    Args:
        df: Spark DataFrame that contains the columns.
        numeric_cols: Iterable of column names to average.
        verbose: When true, print each aggregate column name and its mean.

    Returns:
        A list of ``[column_name, mean]`` pairs in the order of
        ``numeric_cols``. The list is empty when no columns are given.
    """
    numeric_cols = list(numeric_cols)
    if not numeric_cols:
        # Nothing to aggregate; DataFrame.agg requires at least one expression.
        return []

    means_df = df.agg(*[avg(df[name]) for name in numeric_cols])
    means_row = means_df.first()

    col_with_mean = []
    # Rows are positional, so this also works if a column name is repeated.
    for name, avg_col, value in zip(numeric_cols, means_df.columns, means_row):
        if verbose:
            print(avg_col, "\t", value)
        col_with_mean.append([name, value])
    return col_with_mean

In [22]:
from pyspark.sql.functions import when, lit

def fill_missing_with_mean(df, numeric_cols):
    """Return ``df`` with missing values in ``numeric_cols`` replaced by the column mean.

    Each listed column is cast to double before it is filled. The CSV in
    this notebook stores missing readings as the text "NA", so a column that
    contains one is read as a string column: a plain ``isNull()`` test never
    matches the placeholder, and the column would stay string-typed. Casting
    turns non-numeric text into null, which is then replaced by the mean.

    Args:
        df: Spark DataFrame to fill.
        numeric_cols: Names of the columns to cast and fill.

    Returns:
        A new DataFrame in which every listed column is of type double and
        its nulls are replaced by that column's mean.
    """
    col_with_mean = mean_of_pyspark_columns(df, numeric_cols)

    for col, mean_value in col_with_mean:
        # NOTE(review): assumes Spark's non-ANSI cast semantics, where
        # unparseable text becomes null instead of raising -- confirm if
        # ANSI mode is enabled on the cluster.
        as_double = df[col].cast("double")
        df = df.withColumn(
            col,
            when(as_double.isNull(), lit(mean_value)).otherwise(as_double),
        )

    return df

In [23]:
# Columns that are passed to fill_missing_with_mean.
numeric_cols = [
    'MinTemp', 'MaxTemp', 'Rainfall',
    'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm',
    'Humidity9am', 'Humidity3pm',
    'Pressure9am', 'Pressure3pm',
    'Temp9am', 'Temp3pm',
]
dataset2 = fill_missing_with_mean(df=dataset2, numeric_cols=numeric_cols)
# Column names of `dataset`, kept for the later column selection.
cols = list(dataset.columns)


In [24]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Major version of the installed PySpark, parsed straight from the version
# string. distutils (and its LooseVersion) was removed from the standard
# library in Python 3.12, so it is no longer used for this check.
spark_major_version = int(pyspark.__version__.split(".")[0])

# The multi-column one-hot encoder is OneHotEncoderEstimator before Spark 3.0
# and OneHotEncoder from 3.0 on. Resolve the class once, not per iteration.
if spark_major_version < 3:
    from pyspark.ml.feature import OneHotEncoderEstimator
    one_hot_encoder_cls = OneHotEncoderEstimator
else:
    from pyspark.ml.feature import OneHotEncoder
    one_hot_encoder_cls = OneHotEncoder

# NOTE(review): 'Date' is treated as a categorical feature here, so it gets
# one indicator per distinct date -- confirm this is intended.
categoricalColumns = ['Date',  'Location',  'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']
stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # One-hot encode the index column into a "<column>classVec" vector column
    encoder = one_hot_encoder_cls(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [25]:
# Index the target column "RainTomorrow" into a "label" column and queue the
# indexer as the next pipeline stage.
label_stringIdx = StringIndexer(outputCol="label", inputCol="RainTomorrow")
stages.append(label_stringIdx)

In [26]:
numericCols = [
    'MinTemp', 'MaxTemp', 'Rainfall',
    'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm',
    'Humidity9am', 'Humidity3pm',
    'Pressure9am', 'Pressure3pm',
    'Temp9am', 'Temp3pm',
]
# Feature vector inputs: the "classVec" column of every categorical column
# first, followed by the numeric columns.
assemblerInputs = [name + "classVec" for name in categoricalColumns]
assemblerInputs.extend(numericCols)
assembler = VectorAssembler(outputCol="features", inputCols=assemblerInputs)
stages.append(assembler)

In [None]:
from pyspark.ml.classification import LogisticRegression
# Bundle every queued stage into one Pipeline, fit it on the cleaned data,
# and apply the fitted stages to that same data.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(dataset2)
preppedDataDF = pipelineModel.transform(dataset2)

In [None]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data.
# display(model, df, "ROC") is a Databricks notebook feature; in Jupyter it
# only prints the objects. Plot the curve from the model's training summary
# with matplotlib instead.
lr_training_summary = lrModel.summary
roc_points = lr_training_summary.roc.collect()  # rows with FPR and TPR fields

fig, ax = plt.subplots()
ax.plot([p["FPR"] for p in roc_points], [p["TPR"] for p in roc_points], label="Logistic regression")
ax.plot([0, 1], [0, 1], linestyle="--", label="Chance")
ax.set(
    xlabel="False positive rate",
    ylabel="True positive rate",
    title="Training ROC (AUC = {:.3f})".format(lr_training_summary.areaUnderROC),
)
ax.legend()
plt.show()

In [None]:
# Keep the model inputs ("label", "features") together with the columns
# listed in `cols`, and preview the result.
selectedcols = ["label", "features", *cols]
dataset2 = preppedDataDF.select(*selectedcols)
dataset2.show(n=5)

In [30]:
# Seeded random 80/20 split into training and test sets; print the row count
# of each part, one per line.
trainingData, testData = dataset2.randomSplit(weights=[0.8, 0.2], seed=12345)
print(trainingData.count(), testData.count(), sep="\n")

113672
28521


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Baseline decision tree: depth limited to 3, impurity left at its default.
dt = (
    DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setMaxDepth(3)
)

# Fit the baseline tree on the training split.
dtModel = dt.fit(trainingData)


In [None]:
# Make predictions on test data using the Transformer.transform() method.
# The result keeps the test rows and adds the model's output columns;
# printSchema() lists every column of the result.
predictions = dtModel.transform(testData)
predictions.printSchema()

In [None]:
# For a few test rows, show the true label next to the predicted class, the
# class probabilities, and two of the original columns.
selected = predictions.select(
    ["label", "prediction", "probability", "Location", "RainToday"]
)
selected.show(n=5)

In [33]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# 3 x 3 x 3 = 27 candidate settings for the tree `dt`.
paramGrid = (
    ParamGridBuilder()
    .addGrid(param=dt.maxBins, values=[5, 10, 15])
    .addGrid(param=dt.minInfoGain, values=[0.0, 0.2, 0.4])
    .addGrid(param=dt.maxDepth, values=[3, 5, 7])
    .build()
)

In [34]:
# Show the impurity measure configured on `dt`; it was not set explicitly
# when `dt` was created, so this displays the default.
dt.getImpurity()

'gini'

In [None]:
# Score the test split with the baseline tree (the same transform that
# produced `predictions` earlier), kept under its own name for the metrics below.
dt_predictions = dtModel.transform(testData)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Two evaluators over the "label" column: one reports accuracy, the other F1.
multi_evaluator_ac = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='label')
multi_evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1', labelCol='label')

In [None]:
# Report accuracy and F1 of the baseline tree on the test split.
for metric_label, metric_evaluator in (
    ('Decision Tree Accuracy (gini):', multi_evaluator_ac),
    ('Decision Tree F1 (gini):', multi_evaluator_f1),
):
    print(metric_label, metric_evaluator.evaluate(dt_predictions))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
# The evaluator is built without arguments, so it uses its default metric
# and default column names.
evaluator = BinaryClassificationEvaluator()
# When the cells run in order, `predictions` is the baseline tree's output
# on the test split at this point.
evaluator.evaluate(predictions)

In [None]:
# Create 3-fold CrossValidator
# The seed fixes how rows are assigned to folds, so a re-run selects the same
# model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=12345,
)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

In [None]:
# Use test set to measure the accuracy of the model on new data
# Note: this rebinds `predictions`, replacing the baseline tree's output.
predictions = cvModel.transform(testData)

In [None]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
# (same `evaluator` instance that was passed to the CrossValidator)
evaluator.evaluate(predictions)

In [None]:
# For a few test rows, show the cross-validated model's predicted class and
# class probabilities next to the true label and two original columns.
selected = predictions.select(
    ["label", "prediction", "probability", "Location", "RainToday"]
)
selected.show(n=5)

In [None]:
# Second baseline tree: same depth limit as `dt`, but with entropy impurity.
dt1 = (
    DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setImpurity('entropy')
    .setMaxDepth(3)
)

# Fit the entropy tree on the training split.
dtModel1 = dt1.fit(trainingData)

In [None]:
# Make predictions on test data using the Transformer.transform() method.
# `predictions1` holds the entropy tree's output on the test split.
predictions1 = dtModel1.transform(testData)

In [None]:
# Show the impurity measure configured on `dt1` (set to 'entropy' when it was created).
dt1.getImpurity()

In [None]:
# For a few test rows, show the entropy tree's predicted class and class
# probabilities next to the true label and two original columns.
selected1 = predictions1.select(
    ["label", "prediction", "probability", "Location", "RainToday"]
)
selected1.show(n=5)

In [None]:
# Evaluate the entropy Decision Tree with the binary classification evaluator.
evaluator = BinaryClassificationEvaluator()
# Score `predictions1` (the entropy tree's output). `predictions` still holds
# the gini cross-validation output at this point, so it is not the model
# being assessed in this section.
evaluator.evaluate(predictions1)

In [None]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Grid for the entropy tree `dt1`. Every addGrid call takes a list of
# candidate values; a bare string would be expanded character by character,
# so the single impurity value is wrapped in a list.
paramGrid1 = (ParamGridBuilder()
             .addGrid(dt1.impurity, ["entropy"])
             .addGrid(dt1.maxBins, [5, 10, 15])
             .addGrid(dt1.minInfoGain, [0.0, 0.2, 0.4])
             .addGrid(dt1.maxDepth, [3, 5, 7])
             .build())

In [None]:
# Create 3-fold CrossValidator
# The estimator must be `dt1`: the entries of paramGrid1 are keyed on dt1's
# params, so pairing the grid with `dt` would not tune the entropy tree.
# The seed fixes the fold assignment and matches the train/test split seed.
cv1 = CrossValidator(
    estimator=dt1,
    estimatorParamMaps=paramGrid1,
    evaluator=evaluator,
    numFolds=3,
    seed=12345,
)

# Run cross validations
cvModel1 = cv1.fit(trainingData)

In [None]:
# Use test set to measure the accuracy of the model on new data
# Note: this rebinds `predictions1`, replacing the un-tuned entropy tree's output.
predictions1 = cvModel1.transform(testData)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions1)

In [None]:
# For a few test rows, show the second cross-validated model's predicted
# class and class probabilities next to the true label and two original columns.
selected1 = predictions1.select(
    ["label", "prediction", "probability", "Location", "RainToday"]
)
selected1.show(n=5)

In [None]:
# Score the test split with the un-tuned entropy tree and report its
# accuracy and F1.
dt_predictions1 = dtModel1.transform(testData)
for metric_label, metric_evaluator in (
    ('Decision Tree Accuracy (entropy):', multi_evaluator_ac),
    ('Decision Tree F1 (entropy):', multi_evaluator_f1),
):
    print(metric_label, metric_evaluator.evaluate(dt_predictions1))