
Date: 18/10/2019

Version: 1.0

Environment: Python 3.7.0 (64-bit)

Libraries used:

* [Matplotlib Official Documentation](https://matplotlib.org/3.1.1/api/pyplot_summary.html)
        pip install matplotlib
        
 
* [OS Official Documentation](https://docs.python.org/3/library/os.html)
        # part of the Python standard library, no installation needed



# Rain in Australia: Predict rain tomorrow in Australia

## Introduction
This assignment comprises two parts. Predicting rain or weather is a common problem in machine learning, and different machine learning algorithms can be used to model and predict rainfall. In this part, we ask you to complete the analysis to predict whether there will be rain tomorrow or not. In particular, you are required to apply the tools of machine learning to visualize and predict the possibility of rainfall in Australia.

Following is the given dataset:
* [Rain in Australia](https://www.kaggle.com/jsphyg/weather-dataset-rattle-package)

Following are the steps to be performed for Part A:
1. [__Step 01__](#Step_01): Import **`SparkContext`** from **`pyspark`**, which is the main entry point for Spark Core functionality, and create a **`SparkSession`** object, which provides methods used to create DataFrames from various input sources.
2. [__Step 02__](#Step_02): Create a dataframe from the input dataset.
3. [__Step 03__](#Step_03): Drop the columns that are not essential in the dataset.
4. [__Step 04__](#Step_04): Print the number of missing values in each column of the dataset.
5. [__Step 05__](#Step_05): Fill the missing values with the average value for numeric columns and the most frequent value for categorical columns.
6. [__Step 06__](#Step_06): Perform data transformation, i.e., cast the numeric columns from string to double and use the StringIndexer method to convert the categorical columns into numbers.
7. [__Step 07__](#Step_07): Create a feature vector and randomly split the data.
8. [__Step 08__](#Step_08): Apply different machine learning classification algorithms to the dataset and compare their accuracies.
9. [__Step 09__](#Step_09): Calculate the confusion matrix, recall, precision and F1 score for the different machine learning classification algorithms.

More details for each step are given in the following sections.

# A. Creating Spark Session and Loading the Data

## Step 01: Import  Spark Session and initialize Spark <a id='Step_01' ></a>

### SparkContext and SparkSession
The Apache Spark community released a powerful Python package, **`pyspark`**. Using **`pyspark`**, we can initialise Spark, create RDDs from the data, and sort, filter and sample the data. In particular, we will import **`SparkContext`** from **`pyspark`**, which is the main entry point for Spark Core functionality. The **`SparkSession`** object provides methods used to create DataFrames from various input sources.

Spark applications run as independent sets of processes on a cluster, which is specified by the **`SparkContext`** object. **`SparkContext`** can connect to several types of cluster managers (local (standalone), Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (passed to `SparkContext`) to the executors. Finally, **`SparkContext`** sends tasks to the executors to run.

Write the code to create a SparkSession object with 4 local cores. To create a SparkSession with 4 cores, configure the master as `local[4]`. Give a name to your program using `setAppName()`, here `Assignment 2_Rainfall`.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
# create entry points to spark
from pyspark import SparkContext,SparkConf # Spark
from pyspark.sql import SparkSession # Spark SQL
# local[4]: run Spark locally with 4 worker threads (ideally one per logical core on your machine).
# The `appName` field is set to `Assignment 2_Rainfall`.
conf = SparkConf().setAppName("Assignment 2_Rainfall").setMaster("local[4]")
sc = SparkContext(conf=conf)
spark = SparkSession\
        .builder\
        .appName("Assignment 2_Rainfall")\
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")\
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/fit5202_db.wk04_coll")\
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/fit5202_db.wk04_coll")\
        .getOrCreate()
sc

## Step 02: Load the dataset and print the schema and total number of entries<a id='Step_02' ></a>

First, I read the CSV file using `spark.read.csv` and stored it in a dataframe. Then I printed the number of entries in the dataframe.

In [2]:
# Reading data from input csv file & displaying number of entries in the dataframe
weather_df = spark.read.csv("weatherAUS.csv", inferSchema=True, header=True)
print("Number of entries in the given dataframe:",weather_df.count())

Number of entries in the given dataframe: 142193


After obtaining the total number of entries, I viewed the datatype of each column using `printSchema` and displayed the first few rows of the dataframe.

In [3]:
weather_df.printSchema() # Printed schema for the given dataframe

root
 |-- Date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [4]:
weather_df.show(5) # Printing the contents of weather dataframe

+-------------------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|               Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+-------------------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01 00:00:00|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|

# B. Data Cleaning and Processing
Data cleaning and processing is an important aspect of any machine learning task. We have to look carefully into the data and plan our cleaning procedures based on the types and quality of the data.

## Step 03: Delete columns from the dataset<a id='Step_03' ></a>

During the data cleaning and processing phase, we delete unnecessary data from
the dataset to improve the efficiency and accuracy of our model. You have to think
which columns are not contributing to the rain prediction. To keep things simple, you are
required to delete the following columns due to data quality and accuracy.

- Date
- Location
- Evaporation
- Sunshine
- Cloud9am
- Cloud3pm
- Temp9am
- Temp3pm

In [5]:
# Dropping the listed columns from the dataframe using `drop`.
weather_df = weather_df.drop('Date','Location','Evaporation','Sunshine','Cloud9am','Cloud3pm','Temp9am','Temp3pm')

In [6]:
# Viewing the dataframe after dropping the above columns
weather_df.show(5)

+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|   13.4|   22.9|     0.6|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       No|          No|
|    7.4|   25.1|       0|        WNW|           44|       NNW|       WSW|           4|          22|         44|         25|     1010.6|     1007.8|       No|          No|
|   12.9|   25.7|       0|        WSW|           46|         W|       WSW|          19|          26|         38|         30|     1007.6|    

In [7]:
weather_df.printSchema()

root
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



## Step 04: Print the number of missing data in each column<a id='Step_04' ></a>

The dataset marks missing values with the string `NA`. Using a for loop, I counted the NA (null) values in each column and printed the count for each column.

In [8]:
# Counting the number of NA(null) values in each column
for column_name in weather_df.columns:
    print("Number of NA(null) values in",column_name,":",weather_df.filter(weather_df[column_name] == "NA").count())

Number of NA(null) values in MinTemp : 637
Number of NA(null) values in MaxTemp : 322
Number of NA(null) values in Rainfall : 1406
Number of NA(null) values in WindGustDir : 9330
Number of NA(null) values in WindGustSpeed : 9270
Number of NA(null) values in WindDir9am : 10013
Number of NA(null) values in WindDir3pm : 3778
Number of NA(null) values in WindSpeed9am : 1348
Number of NA(null) values in WindSpeed3pm : 2630
Number of NA(null) values in Humidity9am : 1774
Number of NA(null) values in Humidity3pm : 3610
Number of NA(null) values in Pressure9am : 14014
Number of NA(null) values in Pressure3pm : 13981
Number of NA(null) values in RainToday : 1406
Number of NA(null) values in RainTomorrow : 0


## Step 05: Fill the missing data with average value and maximum occurrence value<a id='Step_05' ></a>

- In this step you have to fill in all the missing data with average value (for numeric column) or maximum frequency value (for non-numeric column).
- Firstly, identify the columns which have numeric values (e.g., MinTemp, MaxTemp), calculate the average and fill the null value with the average.
- Secondly, identify the columns with non-numeric values (e.g., WindGustDir, WindDir9am) and find the most frequent item (e.g., wind direction). Now fill the null values with that item for that particular column.

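Using a for loop over all columns and the aggregate function, I calculated the `average` value of each column and stored it in the `Average` dictionary. Averages are only meaningful for the numerical columns; for the categorical columns the average is null, so a placeholder value is stored that is overwritten in the next cell.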

In [None]:
# Calculating the mean for numerical columns
Average = {}
for column in weather_df.columns:
    value = weather_df.agg({column:'avg'}).collect()[0][0]
    if value is not None:
        Average[column] = value
    else:
        # Non-numeric column: store a placeholder that is replaced in the next cell
        Average[column] = weather_df.agg({column:'max'}).collect()[0][0]
Average

Using `groupBy` and the aggregate function, I counted the occurrences of each value in the categorical columns, and using `orderBy` I selected the value with the maximum count.

In [None]:
# Finding the most frequent value of each categorical column to impute its NA values
for column in ['WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday', 'RainTomorrow']:
    # Ignore "NA" itself so that it can never be chosen as the replacement value
    counts = weather_df.filter(weather_df[column] != "NA").groupBy(column).count()
    # Sort by count in descending order and keep the value of the first row
    Average[column] = counts.orderBy('count', ascending=False).first()[0]

In [None]:
Average

In [None]:
# Imputing the NA values of every column with the value stored for it in Average
from pyspark.sql.functions import when
for column in weather_df.columns:
    weather_df = weather_df.withColumn(column, when(weather_df[column] == "NA", Average[column]).otherwise(weather_df[column]))

In [None]:
# Viewing the dataframe after imputation
weather_df.show()
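
The imputation rule used in this step can be illustrated on a tiny example without Spark. The following is a minimal plain-Python sketch, assuming missing entries are marked with the string `NA` as in this dataset. The helper names `impute_mean` and `impute_mode` and the sample values are hypothetical.

```python
from collections import Counter

MISSING = "NA"  # the dataset marks missing entries with the string "NA"

def impute_mean(values):
    """Replace "NA" in a numeric column (stored as strings) with the column mean."""
    present = [float(v) for v in values if v != MISSING]
    mean = sum(present) / len(present)
    return [mean if v == MISSING else float(v) for v in values]

def impute_mode(values):
    """Replace "NA" in a categorical column with its most frequent value."""
    counts = Counter(v for v in values if v != MISSING)
    mode = counts.most_common(1)[0][0]
    return [mode if v == MISSING else v for v in values]

# Hypothetical sample values, not taken from the dataset
min_temp = ["10.0", "NA", "14.0", "12.0"]
wind_dir = ["W", "NA", "W", "SE"]

print(impute_mean(min_temp))  # [10.0, 12.0, 14.0, 12.0]
print(impute_mode(wind_dir))  # ['W', 'W', 'W', 'SE']
```

In both helpers the `NA` entries are left out when the replacement value is computed, so the marker itself can never become the imputed value.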

## Step 06: Data transformation<a id='Step_06' ></a>

 - Before transforming your non-numerical data, do the type casting (to double) of the numerical value columns as they are defined as “String” (see the schema of the dataset).
 - For the non-numerical value columns (i.e., WindGustDir, WindDir9am, WindDir3pm, RainTomorrow) use the StringIndexer method to convert them into numbers.

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

In [None]:
# Filtering only the numeric columns in a separate list
numeric=["MinTemp","MaxTemp","Rainfall","WindGustSpeed","WindSpeed9am","WindSpeed3pm","Humidity9am",
                  "Humidity3pm","Pressure9am","Pressure3pm"]

In [None]:
# Type casting numerical columns from string to double type
for column in numeric:
    weather_df=weather_df.withColumn(column,weather_df[column].cast(DoubleType()))

In [None]:
# Viewing the datatype of columns after typecasting with the usage of `printSchema`
weather_df.printSchema()

In [None]:
# Importing String Indexer for changing the datatype of categorical column
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# Filtering only the categorical columns in a separate list
categorical=['WindGustDir','WindDir9am','WindDir3pm','RainToday','RainTomorrow']

A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

In [None]:
# Creating one StringIndexer per categorical column; each writes to <column>labelIndex
l_indexer = [StringIndexer(inputCol=column, outputCol=column+"labelIndex") for column in categorical]
# Chaining the indexers into a single pipeline
pipeline = Pipeline(stages=l_indexer)
# Fitting the pipeline on the dataframe learns the string-to-index mapping of each column
pipelineModel = pipeline.fit(weather_df)
# Transforming the dataframe adds the indexed columns
weather_df2 = pipelineModel.transform(weather_df)

In [None]:
# Dropping the original categorical columns from the dataframe using `drop`.
weather_df2 = weather_df2.drop('WindGustDir','WindDir9am','WindDir3pm','RainToday','RainTomorrow')

In [None]:
weather_df2.show(5)
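
To make the meaning of the new `...labelIndex` columns concrete, here is a minimal plain-Python sketch of frequency-based indexing. It assumes the default ordering of `StringIndexer`, where the most frequent label receives index 0. The helper `frequency_index` and the sample labels are hypothetical and only illustrate the idea; ties between equally frequent labels are not considered.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct label to an index, most frequent label first."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical labels, not taken from the dataset; "No" is the majority here
rain_tomorrow = ["No", "No", "Yes", "No", "Yes"]
mapping = frequency_index(rain_tomorrow)

print(mapping)                              # {'No': 0.0, 'Yes': 1.0}
print([mapping[v] for v in rain_tomorrow])  # [0.0, 0.0, 1.0, 0.0, 1.0]
```

Under this assumption, the majority class of a column ends up as index 0.0, which matters later when the label index is compared with the predictions.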

## Step 07: Create the feature vector and divide the dataset<a id='Step_07' ></a>

 - Create the feature vector from the given columns.
 - When you create your feature vector, remember to exclude the column that you will be using for testing the accuracy of your model.
 - After creating a feature vector, split the dataset randomly into two parts (e.g., training and testing) of 70 percent and 30 percent.

In [None]:
weather_df2.printSchema()

In [None]:
# Creating a list of the columns of datatype double to use for the feature vector
vectorlist = [column[0] for column in weather_df2.dtypes if column[1] == "double"]
vectorlist

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Creating a feature vector using VectorAssembler; the last double column
# (RainTomorrowlabelIndex, the label) is excluded from the features
vector_assembler = VectorAssembler(inputCols=vectorlist[:-1], outputCol="features")
feature_vect = vector_assembler.transform(weather_df2)
feature_vect.select('features').show(3)

In [None]:
# Split the data into train (70%) and test (30%) sets
(trainingData,testData) = feature_vect.randomSplit([0.7,0.3])
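
The idea behind a weighted random split can be sketched in plain Python. This sketch assumes that each row is assigned to the training or the test set independently with probability 0.7 and 0.3, which is how I understand `randomSplit` to work, so the resulting sizes are only approximately 70/30. The function `random_split` and its `seed` default are hypothetical; `randomSplit` also accepts a `seed` argument if a reproducible split is needed.

```python
import random

def random_split(rows, train_weight=0.7, seed=42):
    """Assign each row to train or test independently with the given probability."""
    rng = random.Random(seed)  # fixed seed so that the split is reproducible
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

# Hypothetical rows: the numbers 0..999 stand in for dataframe rows
rows = list(range(1000))
train, test = random_split(rows)

# The two sizes add up to 1000 and are close to, but usually not exactly, 700 and 300
print(len(train), len(test))
```

Without a fixed seed every run produces a different split, so the accuracies reported for the models can vary slightly between runs.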

# C. Apply Machine Learning Algorithms

## Step 08: Apply machine learning classification algorithms on the dataset and compare their accuracy. Plot the accuracy as bar graph.<a id='Step_08' ></a>

 - Use the `DecisionTreeClassifier()`, `RandomForestClassifier()`, `LogisticRegression()` & `GBTClassifier()` methods in Spark to calculate the probability of rainfall tomorrow based on the other related data points (e.g., temperature, wind, humidity).
 - Draw the graph (e.g. bar chart) to demonstrate the comparison of their accuracy.

To predict the probability of rainfall tomorrow, I used four different machine learning algorithms and compared their accuracies.

1. __`Decision Tree`__ : Decision Trees are a non-parametric supervised learning method used for both classification and regression tasks. The goal is to create a model that predicts the value of a target variable by learning simple decision rules inferred from the data features. This is achieved in pyspark using function `DecisionTreeClassifier`.
2. __`Random Forest`__ : Random forests or random decision forests are an ensemble learning method for classification, regression and other tasks that operates by constructing a multitude of decision trees at training time and outputting the class that is the mode of the classes (classification) or mean prediction (regression) of the individual trees. Random decision forests correct for decision trees' habit of overfitting to their training set. This is achieved in pyspark using function `RandomForestClassifier`.
3. __`LogisticRegression`__ : Logistic regression is another technique borrowed by machine learning from the field of statistics. It is the go-to method for binary classification problems (problems with two class values). This is achieved in pyspark using function `LogisticRegression`.
4. __`GBTClassifier`__ : Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. The spark.ml implementation supports GBTs for binary classification and for regression, using both continuous and categorical features. This is achieved in pyspark using function `GBTClassifier`. 

In [None]:
# Importing functions of different Machine Learning algorithms
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Creating a dictionary to store predicted values of each algorithm
prediction_dict = {}

### 1. Decision Tree Algorithm

In [None]:
# Apply DecisionTreeClassifier to classify them.
decision_tree = DecisionTreeClassifier(labelCol="RainTomorrowlabelIndex", featuresCol="features")
# Fitting the above model on training data
model = decision_tree.fit(trainingData)
# Predicting the model on test data
predictions = model.transform(testData)
# Viewing the top 5 contents of predicted values on raintomorrow column
predictions.select("prediction", "RainTomorrowlabelIndex").show(5)
# Evaluating the prediction column using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrowlabelIndex", 
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy_decisiontree = evaluator.evaluate(predictions)
# Printing Error & Accuracy for Decision tree algorithm
print("Test Error = %g " % (1.0 - accuracy_decisiontree))
print("Test accuracy = %g " % (accuracy_decisiontree*100),"%")
# Storing the predicted value of Decision tree to dictionary
prediction_dict['DecisionTreeClassifier'] = predictions
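
What the `accuracy` metric measures can be shown on a handful of values. This is a minimal plain-Python sketch, assuming accuracy is the fraction of rows whose prediction equals the label. The function `accuracy` and the sample pairs are hypothetical and are not output of the models above.

```python
def accuracy(pairs):
    """Fraction of (prediction, label) pairs where the two values agree."""
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / len(pairs)

# Hypothetical (prediction, label) pairs, not results from the models
pairs = [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0), (1.0, 0.0)]
acc = accuracy(pairs)

print("Test accuracy = %g %%" % (acc * 100))  # Test accuracy = 60 %
print("Test Error = %g" % (1.0 - acc))        # Test Error = 0.4
```

Three of the five pairs agree, which gives an accuracy of 0.6 and a test error of 0.4.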

### 2. RandomForest Algorithm

In [None]:
# Apply RandomForestClassifier to classify them.
random_forest = RandomForestClassifier(labelCol="RainTomorrowlabelIndex", featuresCol="features")
# Fitting the above model on training data
model = random_forest.fit(trainingData)
# Predicting the model on test data
predictions = model.transform(testData)
# Viewing the top 5 contents of predicted values on raintomorrow column
predictions.select("prediction", "RainTomorrowlabelIndex").show(5)
# Evaluating the prediction column using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrowlabelIndex", 
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy_randomforest = evaluator.evaluate(predictions)
# Printing Error & Accuracy for Random Forest algorithm
print("Test Error = %g " % (1.0 - accuracy_randomforest))
print("Test accuracy = %g " % (accuracy_randomforest*100),"%")
# Storing the predicted value of Random Forest to dictionary
prediction_dict['RandomForestClassifier'] = predictions

### 3. Logistic Regression

In [None]:
# Apply LogisticRegression to classify them.
logistic_regression = LogisticRegression(labelCol="RainTomorrowlabelIndex", featuresCol="features")
# Fitting the above model on training data
model = logistic_regression.fit(trainingData)
# Predicting the model on test data
predictions = model.transform(testData)
# Viewing the top 5 contents of predicted values on raintomorrow column
predictions.select("prediction", "RainTomorrowlabelIndex").show(5)
# Evaluating the prediction column using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrowlabelIndex", 
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy_logistic = evaluator.evaluate(predictions)
# Printing Error & Accuracy for Logistic Regression algorithm
print("Test Error = %g " % (1.0 - accuracy_logistic))
print("Test accuracy = %g " % (accuracy_logistic*100),"%")
# Storing the predicted value of Logistic Regression to dictionary
prediction_dict['LogisticRegression'] = predictions

### 4. GBT Classifier

In [None]:
# Apply GBTClassifier to classify them.
gbt_classifier = GBTClassifier(labelCol="RainTomorrowlabelIndex", featuresCol="features")
# Fitting the above model on training data
model = gbt_classifier.fit(trainingData)
# Predicting the model on test data
predictions = model.transform(testData)
# Viewing the top 5 contents of predicted values on raintomorrow column
predictions.select("prediction", "RainTomorrowlabelIndex").show(5)
# Evaluating the prediction column using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrowlabelIndex", 
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy_GBT = evaluator.evaluate(predictions)
# Printing Error & Accuracy for GBTClassifier algorithm
print("Test Error = %g " % (1.0 - accuracy_GBT))
print("Test accuracy = %g " % (accuracy_GBT*100),"%")
# Storing the predicted value of GBTClassifier to dictionary
prediction_dict['GBTClassifier'] = predictions

 __matplotlib__ : This library is used for plotting the contents of the extracted data.
 
 I used the matplotlib __`ggplot`__ style.
 
 I plotted the __`bar graph`__ with the `accuracies` of each algorithm on the `Y-axis` and the `algorithms` on the `X-axis`.
 
__Understandings__:
 - The accuracy of the different algorithms is plotted as a bar chart.
 - For the algorithms `Decision Tree`, `Random Forest` and `Logistic Regression` the accuracy values are almost the same, at up to 83%.
 - Comparatively, the accuracy is a bit higher for the `GBT Classifier` algorithm.

In [None]:
# Plotting the bar chart
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
accuracies=[accuracy_decisiontree*100,accuracy_randomforest*100,accuracy_logistic*100,accuracy_GBT*100]
algorithms=['DecisionTreeClassifier','RandomForestClassifier','LogisticRegression','GBTClassifier']
plt.figure(figsize=(15,10))
bars = plt.bar(algorithms,accuracies,color = 'C1')
plt.xticks(rotation=45)
plt.xlabel('Classification Algorithms',fontsize = 15)
plt.ylabel('Accuracies for different algorithms',fontsize = 15)
plt.title('Comparison of Accuracies for different Machine Learning algorithms ',fontsize = 15)
# Displaying the accuracy value (rounded to two decimals) above each bar
for bar in bars:
    value = bar.get_height()
    plt.text(bar.get_x(), value, '%.2f' % value, fontsize = 13)
plt.show()

In [None]:
# Viewing the contents of dictionary of predicted values for each algorithm
prediction_dict

## Step 09 : Calculate the confusion matrix and find the precision, recall, and F1 score of each classification algorithm. <a id='Step_09' ></a>

 - Calculate the Confusion matrix using confusionMatrix() method.
 - Find the Precision, recall and F1 score of each classification algorithm.

- __`Confusion Matrix`__ : A confusion matrix is a table that is often used to describe the performance of a classification model (or “classifier”) on a set of test data for which the true values are known. It allows the visualization of the performance of an algorithm. Using the `MulticlassMetrics` class and its `confusionMatrix()` method, I calculated the confusion matrix for each classification algorithm.

While there are many different types of classification algorithms, the evaluation of classification models all share similar principles. In a supervised classification problem, there exists a true output and a model-generated predicted output for each data point. For this reason, the results for each data point can be assigned to one of four categories:

 - `True Positive (TP)` - label is positive and prediction is also positive
 - `True Negative (TN)` - label is negative and prediction is also negative
 - `False Positive (FP)` - label is negative but prediction is positive
 - `False Negative (FN)` - label is positive but prediction is negative

These four numbers are the building blocks for most classifier evaluation metrics.
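
The four categories can be tallied directly from pairs of predictions and labels. The following plain-Python sketch assumes `(prediction, label)` pairs and a chosen positive class. The function `confusion_counts` and the sample pairs are hypothetical. It also shows that the counts change when the other class is treated as positive.

```python
def confusion_counts(pairs, positive=1.0):
    """Tally TP, TN, FP, FN from (prediction, label) pairs for one positive class."""
    tp = tn = fp = fn = 0
    for prediction, label in pairs:
        if label == positive and prediction == positive:
            tp += 1  # label is positive and prediction is also positive
        elif label != positive and prediction != positive:
            tn += 1  # label is negative and prediction is also negative
        elif label != positive and prediction == positive:
            fp += 1  # label is negative but prediction is positive
        else:
            fn += 1  # label is positive but prediction is negative
    return tp, tn, fp, fn

# Hypothetical (prediction, label) pairs, not results from the models
pairs = [(1.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0),
         (0.0, 0.0), (0.0, 1.0), (0.0, 1.0)]

print(confusion_counts(pairs))                # (1, 3, 1, 2)
print(confusion_counts(pairs, positive=0.0))  # (3, 1, 2, 1)
```

Because the counts depend on which class is called positive, any metric derived from them should state the positive class it refers to.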

- __`Precision`__ : Measures the fraction of the members predicted as positive that are actually positive. Also called `positive predictive value`.

\begin{gather*}
    \text{Precision} = \frac{\text{True Positive}}{\text{True Positive} + \text{False Positive}}
\end{gather*}

- __`Recall`__ : Measures the fraction of the actual positive members that are correctly predicted as positive. Also called `Sensitivity`.

\begin{gather*}
    \text{Recall} = \frac{\text{True Positive}}{\text{True Positive} + \text{False Negative}}
\end{gather*}

Both precision and recall are therefore based on an understanding and measure of relevance. 

- __`F1 Score`__ : Measures the balance between Precision & Recall as their harmonic mean. Also called `F-score` or `F-measure`.

\begin{gather*}
    \text{F1 Score} = \frac{2 \times (\text{Precision} \times \text{Recall})}{\text{Precision} + \text{Recall}}
\end{gather*}
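
As a worked example of the three formulas, the following plain-Python sketch computes precision, recall and F1 score from a set of counts. The function `precision_recall_f1` and the counts (80 true positives, 20 false positives, 10 false negatives) are hypothetical and are not results from the models.

```python
def precision_recall_f1(tp, fp, fn):
    """Compute precision, recall and F1 score from confusion-matrix counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * (precision * recall) / (precision + recall)
    return precision, recall, f1

# Hypothetical counts, not taken from the models
precision, recall, f1 = precision_recall_f1(tp=80, fp=20, fn=10)

print("Precision: %.4f" % precision)  # Precision: 0.8000
print("Recall: %.4f" % recall)        # Recall: 0.8889
print("F1 Score: %.4f" % f1)          # F1 Score: 0.8421
```

Here precision is 80/100 and recall is 80/90, and the F1 score lies between the two, closer to the smaller value.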

In [None]:
# Importing MulticlassMetrics library for calculating Confusion Matrix
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# Looping over the stored predictions of each algorithm
for key,value in prediction_dict.items():
    predict = value.select("prediction", "RainTomorrowlabelIndex")
    # Converting the (prediction, label) rows to an RDD
    rdd_predict = predict.rdd
    # Passing the RDD to MulticlassMetrics
    confusion_matrix = MulticlassMetrics(rdd_predict)
    # Calculating the confusion matrix
    ConfusionMatrix = confusion_matrix.confusionMatrix().toArray()
    # Counting TP, TN, FP and FN with label index 0 treated as the positive class
    # (StringIndexer assigns index 0 to the most frequent label by default)
    tp = value[(value.RainTomorrowlabelIndex == 0) & (value.prediction == 0)].count()
    tn = value[(value.RainTomorrowlabelIndex == 1) & (value.prediction == 1)].count()
    fp = value[(value.RainTomorrowlabelIndex == 1) & (value.prediction == 0)].count()
    fn = value[(value.RainTomorrowlabelIndex == 0) & (value.prediction == 1)].count()
    # Calculating Precision
    Precision = tp/(tp+fp)
    # Calculating Recall
    Recall = tp/(tp+fn)
    # Calculating F1 Score
    F1Score = 2*(Precision * Recall) / (Precision + Recall)
    print('Classification Algorithm:',key)
    print('Confusion Matrix:\n',ConfusionMatrix)
    print('Precision:\n',Precision)
    print('Recall:\n',Recall)
    print('F1 Score:\n',F1Score)
    print('-'*20 + '\n')

## Explain how the accuracy of the prediction can be improved

From the above models we can infer that the `GBT Classifier` algorithm has slightly higher accuracy than the other three algorithms. Higher accuracy leads to better prediction.

There are also a few other ways to improve the accuracy of the prediction:

1. Tune the model parameters, e.g. set the `maxDepth` parameter of the `Decision Tree` algorithm, change the number of trees (`numTrees`) in `Random Forest`, and set the `maxIter` parameter of the `Logistic Regression` & `GBT Classifier` algorithms.

2. To make our prediction more reliable, we can `change the random sampling rate of training & test data`, or perform `cross-validation`, which means holding out a sample on which the model is not trained and testing the model on that sample before finalizing it.

3. Another way of achieving better accuracy is the `Ensemble Method`, which combines the results of many weak machine learning models to give better results. One common approach is `Bagging`, also called `Bootstrap Aggregating`. Even though this method is more complex than a single model, it can improve accuracy.

4. Another way is to treat both `Missing Values or Null Values` & `Outlier values` effectively. Here I imputed the numerical columns with the mean of their values. Instead, prediction models like `KNN` or `Linear Regression` can be used to predict the missing values, which may yield better accuracy than mean imputation. Removing outliers before imputing may also improve accuracy.

These are a few general ways to improve the accuracy of the prediction.
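
The cross-validation idea from point 2 can be sketched in plain Python as a k-fold split of row indices. The function `k_fold_indices` is hypothetical and only illustrates the bookkeeping. In practice the rows would be shuffled first, and in Spark the `CrossValidator` class in `pyspark.ml.tuning` could be used instead of a hand-written split.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_indices, validation_indices) for each of k folds."""
    indices = list(range(n_rows))
    # Spread the remainder over the first folds so that all rows are used
    fold_sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        validation = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, validation
        start += size

# Hypothetical example: 10 rows split into 3 folds
for train, validation in k_fold_indices(n_rows=10, k=3):
    print(validation, "held out;", len(train), "rows used for training")
```

Each row is held out exactly once, so averaging the accuracy over the folds gives an estimate that depends less on one particular random split.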

# Summary

This assessment analyzes rain data in Australia. Different machine learning algorithms can be used to model and predict rainfall in Python using Apache PySpark. The main outcomes achieved while applying these techniques were:

- __Import pyspark and initialize Spark__
- __Load the dataset and print the schema and total number of entries__
- __Delete columns from the dataset__
- __Print the number of missing data in each column__
- __Fill the missing data with average value and maximum occurrence value__
- __Data transformation__
- __Create the feature vector and divide the dataset__
- __Apply machine learning classification algorithms on the dataset and compare their accuracy. Plot the accuracy as bar graph__
- __Calculate the confusion matrix and find the precision, recall, and F1 score of each classification algorithm__

# References

* [Matplotlib Official Documentation](https://matplotlib.org/3.1.1/api/pyplot_summary.html)
* https://www.analyticsvidhya.com/blog/2015/12/improve-machine-learning-results/
* https://stackoverflow.com/questions/41032256/get-same-value-for-precision-recall-and-f-score-in-apache-spark-logistic-regres
* [Rain in Australia](https://www.kaggle.com/jsphyg/weather-dataset-rattle-package)