# CSIT 5800 Introduction to Big Data
## Spring 2020
### Assignment 2 - PySpark

### Description
In this assignment, you will have an opportunity to:
<ul>
<li>apply data pre-processing techniques that you learned in the class to a problem using Spark</li>
<li>apply machine learning techniques that you learned in the class to a problem using Spark</li>
</ul>

<br/>
To get started on this assignment, download the given dataset and carefully read the description on this page. Please note that your program must be implemented entirely in Python.
<br/>

### Intended Learning Outcomes

- Upon completion of this assignment, you should be able to:
<ol>
    <li>Demonstrate your understanding of how to pre-process data using the algorithms / techniques described in the class.</li>
    <li>Demonstrate your understanding of how to make predictions using the machine learning algorithms / techniques described in the class.</li>
    <li>Use PySpark to write a Python program that pre-processes data, trains a machine learning model on the training data and classifies the testing set.</li>
</ol>

### Dataset
The dataset contains daily weather observations from numerous Australian weather stations.
The problem is to predict whether or not it will rain tomorrow by training a binary classification model on the target RainTomorrow.
The target variable RainTomorrow answers the question: will it rain the next day? Its value is Yes or No.

Note: You should exclude the variable RISK_MM when training the binary classification model. Keeping it leaks the answers to your model, so the model looks accurate during evaluation but cannot make genuine predictions. Read more about it here.
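
To see why such a feature is a leak, here is a minimal plain-Python sketch. The rows are made up for illustration, and the rule that RainTomorrow was derived by thresholding RISK_MM at 1 mm is an assumption that this assignment does not state. Under that assumption, a "model" that only reads RISK_MM is perfect without learning anything about the weather.

```python
# Toy rows, made up for illustration: (Humidity3pm, RISK_MM, RainTomorrow).
rows = [
    (22.0, 0.0, "No"),
    (80.0, 5.2, "Yes"),
    (45.0, 0.4, "No"),
    (91.0, 12.6, "Yes"),
]

# Assumed labeling rule (hypothetical, not given in the assignment text).
ASSUMED_THRESHOLD_MM = 1.0


def predict_from_risk_mm(risk_mm):
    """'Predict' tomorrow's rain by reading tomorrow's rainfall amount."""
    return "Yes" if risk_mm >= ASSUMED_THRESHOLD_MM else "No"


labels = [label for _, _, label in rows]
leaky_predictions = [predict_from_risk_mm(risk) for _, risk, _ in rows]

# Fraction of rows where the leaky prediction matches the label.
leaky_accuracy = sum(p == y for p, y in zip(leaky_predictions, labels)) / len(labels)
print(leaky_accuracy)
```

A perfect score obtained this way says nothing about how the model would behave on a day for which tomorrow's rainfall is not yet known.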

<font color="red">Note: The suggested functions below are for reference only. You can use any functions from PySpark.</font>

## Step 0: Installing PySpark
### Step 0.1 : Install Java
#### Step 0.1.1: Check Java Version

In command prompt:
<pre>java -version</pre>

Note: Spark 2.4.5 requires Java 8; Java 7 is no longer supported.

#### Step 0.1.2: Install Java
Install Java from the official download website:
<url> https://www.oracle.com/java/technologies/javase-jdk8-downloads.html </url>


### Step 0.2: Install Apache Spark on Windows
<ol>
    <li>Go to the <a href="http://spark.apache.org/downloads.html">Spark download</a>.</li>
    <li>Select the latest stable release (2.4.5 as of May-2020) of Spark for "Choose a Spark release".</li>
    <li>Select a version that is pre-built for the latest version of Hadoop such as Pre-built for Hadoop 2.7 and later
        for "Choose a package type"</li>
    <li>Click the link next to Download Spark to download the spark-2.4.5-bin-hadoop2.7.tgz</li>
    <li>Extract the files from the downloaded .tgz archive using WinZip or equivalent (right click on the downloaded file and click extract here).</li>
    <li>Make sure that the folder path and the folder name containing Spark files do not contain any spaces.</li>
    <li>Create a folder called "spark" on your desktop and unpack the archive that you downloaded into it as a folder called spark-2.4.5-bin-hadoop2.7. So, all Spark files will be in a folder called C:\Users\[your_user_name]\Desktop\spark\spark-2.4.5-bin-hadoop2.7. This will be referred to as SPARK_HOME.</li>
    <li>To test if your installation was successful, open Anaconda Prompt, change to SPARK_HOME directory and type bin\pyspark. This should start the PySpark shell which can be used to interactively work with Spark. </li>
    <li>Create a system environment variable in Windows called SPARK_HOME that points to the SPARK_HOME folder path.</li>
</ol>

### Step 0.2: Install Apache Spark on Mac

You can use Homebrew to install Apache Spark.
<ol>
    <li>Install Homebrew using the following command in your terminal:<br/>
        <pre>/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"</pre></li>
    <li>Install Spark with Homebrew
         <ol>
         <li>In your terminal, type: <br />
         <pre>brew install apache-spark</pre></li>
         <li>You can check the version of spark: <br />
         <pre>pyspark --version</pre></li>
         </ol>
    </li>
    <li>You may need to install PySpark by:<br />
    <pre>pip install pyspark</pre>
    </li>
    <li>To know where spark is installed: <br/>
    <pre>brew info apache-spark</pre>
    </li>
    <li>Set the environment variables:<br />
    <pre>export SPARK_HOME="[your_path]/libexec/"</pre>
    </li>
</ol>

### Step 0.3: Using PySpark in Jupyter Notebook

findspark is a library that locates your Spark installation and sets up the environment so that the pyspark library can be imported.
To install findspark, run the following in your shell:<br />
<pre>pip install findspark</pre>

In [1500]:
import findspark
findspark.init()
findspark.find()

'/usr/lib/spark/spark-3.0.0-preview2-bin-hadoop2.7'

## Step 1: Importing data and exploring the features (8 points)

In [1501]:
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Step 1.1
Read the csv file 'weatherAUS.csv' using 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.SparkSession">SparkSession.read()</a> to create a dataframe.

In [1502]:
# Put your statement here
data= spark.read.csv(path='weatherAUS.csv',inferSchema=True,header=True)


### Step 1.2
Use show() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to print out the first rows of the dataframe created.

In [1503]:
# Put your statement here
data.show()


+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RISK_MM|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|  

### Step 1.3
Use printSchema() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to print out the schema of the dataframe created.

In [1504]:
# Put your statement here
data.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)



Which of the features above have a wrong datatype in the schema? What should the correct datatypes be?

<font color=red>MinTemp float64</font>

<font color=red>MaxTemp float64</font>

<font color=red>Rainfall float64</font>

<font color=red>Evaporation float64</font>

<font color=red>Sunshine float64</font>

<font color=red>WindGustSpeed float64</font>

<font color=red>WindSpeed9am float64</font>

<font color=red>WindSpeed3pm float64</font>

<font color=red>Humidity9am float64</font>

<font color=red>Humidity3pm float64</font>

<font color=red>Pressure9am float64</font>

<font color=red>Pressure3pm float64</font>

<font color=red>Cloud9am float64</font>

<font color=red>Cloud3pm float64</font>

<font color=red>Temp9am float64</font>

<font color=red>Temp3pm float64 </font>



## Step 2: Convert the data types of features (7 points)

### Step 2.1
Import data types from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#module-pyspark.sql.types">pyspark.sql.types</a>

In [1505]:
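# Put your statement here
# FloatType is a concrete type that columns can be cast to (DataType is only the abstract base class)
from pyspark.sql.types import FloatType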


### Step 2.2
Convert the datatype of features using:
<ul>
<li>withColumn() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>, and</li>
<li>cast() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.Column">pyspark.sql.Column</a>
to convert the features (columns) into appropriate types 
(<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#module-pyspark.sql.types">pyspark.sql.types</a>)</li>
</ul>

In [1506]:
# Put your statements here
# Columns that were read as strings but hold numeric measurements
numeric_cols = ['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine',
                'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm',
                'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm',
                'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm']
for c in numeric_cols:
    data = data.withColumn(c, data[c].cast('float'))
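
The cast also explains the counts in Steps 3.1 and 3.2: after casting, the numerical columns contain nulls rather than the string "NA". The plain-Python sketch below mimics that behavior on made-up values. It assumes Spark's default (non-ANSI) cast, which yields null for a string that cannot be parsed; `cast_to_float` is a hypothetical helper, not a PySpark function.

```python
def cast_to_float(value):
    """Return float(value), or None when the string is not a number.

    Mirrors how Spark's default (non-ANSI) cast is assumed to behave.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


raw_min_temp = ["13.4", "NA", "7.4", "NA"]  # made-up string column
cast_min_temp = [cast_to_float(v) for v in raw_min_temp]

na_strings = sum(v == "NA" for v in cast_min_temp)  # what Step 3.2 counts
nulls = sum(v is None for v in cast_min_temp)       # what Step 3.1 counts
print(cast_min_temp, na_strings, nulls)
```

Columns that stay as strings, such as WindGustDir, keep their literal "NA" values, which is why they show up in Step 3.2 instead.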

## Step 3: Exploring missing values (10 points)

### Step 3.1 
<ul>
<li>Count the missing values in features using:
<ul>
<li>isNull() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.Column">pyspark.sql.Column</a>
to see whether there are missing values in a certain feature (column).</li>
<li>filter() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to filter rows given a condition, and</li>
<li>count() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to count the number of rows in a dataframe</li>
</ul>
<li>Print the features with missing values and their corresponding number of rows with missing values</li>
</ul>


In [1507]:
# Put your statements here
for col in data.columns:
    print('Feature %s : %d '%(col,data.filter(data[col].isNull()).count()))



Feature Date : 0 
Feature Location : 0 
Feature MinTemp : 637 
Feature MaxTemp : 322 
Feature Rainfall : 1406 
Feature Evaporation : 60843 
Feature Sunshine : 67816 
Feature WindGustDir : 0 
Feature WindGustSpeed : 9270 
Feature WindDir9am : 0 
Feature WindDir3pm : 0 
Feature WindSpeed9am : 1348 
Feature WindSpeed3pm : 2630 
Feature Humidity9am : 1774 
Feature Humidity3pm : 3610 
Feature Pressure9am : 14014 
Feature Pressure3pm : 13981 
Feature Cloud9am : 53657 
Feature Cloud3pm : 57094 
Feature Temp9am : 904 
Feature Temp3pm : 2726 
Feature RainToday : 0 
Feature RISK_MM : 0 
Feature RainTomorrow : 0 


### Step 3.2
Some features have values of "NA", which should be regarded as missing values.<br/>
Print those features with values of "NA" and their corresponding number of rows with values "NA".

In [1508]:
# Put your statements here
for col in data.columns:
    print('Feature %s : %d '%(col,data.filter(data[col]=='NA').count()))


Feature Date : 0 
Feature Location : 0 
Feature MinTemp : 0 
Feature MaxTemp : 0 
Feature Rainfall : 0 
Feature Evaporation : 0 
Feature Sunshine : 0 
Feature WindGustDir : 9330 
Feature WindGustSpeed : 0 
Feature WindDir9am : 10013 
Feature WindDir3pm : 3778 
Feature WindSpeed9am : 0 
Feature WindSpeed3pm : 0 
Feature Humidity9am : 0 
Feature Humidity3pm : 0 
Feature Pressure9am : 0 
Feature Pressure3pm : 0 
Feature Cloud9am : 0 
Feature Cloud3pm : 0 
Feature Temp9am : 0 
Feature Temp3pm : 0 
Feature RainToday : 1406 
Feature RISK_MM : 0 
Feature RainTomorrow : 0 


## Step 4: Drop the feature, RISK_MM (2 points)
As explained in the dataset description, we need to drop the feature RISK_MM using drop() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>.

In [1509]:
# Put your statement here
data=data.drop('RISK_MM')

## Step 5: Processing the date feature (5 points)

### Step 5.1
Import the to_date(), year(), month(), dayofmonth()
from
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [1510]:
# Put your statement here
from pyspark.sql.functions import to_date,year,month,dayofmonth


### Step 5.2
Use the to_date() function to convert the <strong>Date</strong> attribute to datetype.

In [1511]:
# Put your statement here
data=data.withColumn('Date',to_date(data['Date']))

### Step 5.3
Extract the <strong>year</strong>, <strong>month</strong>, <strong>day</strong> attributes of the converted <strong>Date</strong> attribute using year(), month() & dayofmonth()
and create the corresponding new features <strong>Year</strong>, <strong>Month</strong> and <strong>Day</strong>.

In [1512]:
# Put your statements here
data=data.withColumn('Year',year(data['Date']))
data=data.withColumn('Month',month(data['Date']))
data=data.withColumn('Day',dayofmonth(data['Date']))

### Step 5.4
Drop the original <strong>Date</strong> feature.

In [1513]:
# Put your statement here
data=data.drop('Date')


## Step 6: Handling missing values of the categorical features (8 marks)
### Step 6.1 Find the most frequent value for categorical features with "NA" values 
Use groupBy(), count() & show() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to list the distinct values of a feature and count the corresponding number of rows.

For the categorical features with missing values ("NA"), what is the most frequent value of each of the features?

In [1514]:
# Put your statements here
from pyspark.sql.functions import desc
for index in data.columns:
    if data.select(index).dtypes[0][1]=='string' and data.filter(data[index]=='NA').count()!=0:
        data.groupBy(index).count().sort(desc("count")).show(1)

+-----------+-----+
|WindGustDir|count|
+-----------+-----+
|          W| 9780|
+-----------+-----+
only showing top 1 row

+----------+-----+
|WindDir9am|count|
+----------+-----+
|         N|11393|
+----------+-----+
only showing top 1 row

+----------+-----+
|WindDir3pm|count|
+----------+-----+
|        SE|10663|
+----------+-----+
only showing top 1 row

+---------+------+
|RainToday| count|
+---------+------+
|       No|109332|
+---------+------+
only showing top 1 row



<font color="red">WindGustDir: W</font>
    
<font color="red">WindDir9am: N</font>

<font color="red">WindDir3pm: SE</font>

<font color="red">RainToday: No</font>

### Step 6.2
Import when(), lit() from
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [1515]:
# Put your statement here
from pyspark.sql.functions import when,lit

### Step 6.3
For each of the categorical features with missing values ("NA"), replace "NA" with the corresponding most frequent value of each of the features using when(), lit(), otherwise().

In [1516]:
# Put your statements here
data=data.withColumn('WindGustDir',when(data['WindGustDir']=='NA',lit('W')).otherwise(data['WindGustDir']))
data=data.withColumn('WindDir9am',when(data['WindDir9am']=='NA',lit('N')).otherwise(data['WindDir9am']))
data=data.withColumn('WindDir3pm',when(data['WindDir3pm']=='NA',lit('SE')).otherwise(data['WindDir3pm']))
data=data.withColumn('RainToday',when(data['RainToday']=='NA',lit('No')).otherwise(data['RainToday']))
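
As an illustration of what Steps 6.1 to 6.3 achieve, here is a plain-Python sketch of mode imputation on made-up values. `fill_with_mode` is a hypothetical helper. Unlike a simple "top 1" count, it ignores "NA" when looking for the most frequent value, which matters if "NA" itself happens to be the most common entry.

```python
from collections import Counter


def fill_with_mode(values, missing="NA"):
    """Replace the missing marker with the most frequent observed value."""
    counts = Counter(v for v in values if v != missing)
    mode, _ = counts.most_common(1)[0]
    return [mode if v == missing else v for v in values], mode


wind_dir = ["W", "NA", "N", "W", "SE", "NA", "W"]  # made-up column
filled, mode = fill_with_mode(wind_dir)
print(mode, filled)
```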

## Step 7: Handling missing values of the numerical features (10 marks)
### Step 7.1 
Print the list of numerical features.

In [1517]:
# Put your statement(s) here
num_f=[]
for index in data.columns:
    if data.select(index).dtypes[0][1]!='string':
        num_f.append(index)
print(num_f)


['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'Year', 'Month', 'Day']


### Step 7.2
Import Imputer from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module

In [1518]:
# Put your statement here
from pyspark.ml.feature import Imputer


### Step 7.3
Use <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer">pyspark.ml.feature.Imputer</a> to fill in the missing values of the numerical features with the mean.

In [1519]:
# Put your statements here
# Fit the Imputer on the numerical columns and overwrite them in place with mean-filled values
imputer = Imputer(strategy='mean', inputCols=num_f, outputCols=num_f)
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)
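
The effect of mean imputation can be sketched in plain Python on made-up values. `impute_mean` is a hypothetical helper: the mean is computed from the observed values only, and every missing entry receives that same value.

```python
def impute_mean(values):
    """Fill None entries with the mean of the observed (non-None) entries."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values], mean


sunshine = [9.0, None, 7.0, None, 11.0]  # made-up column
imputed, mean = impute_mean(sunshine)
print(mean, imputed)
```

Features such as Sunshine and Evaporation have tens of thousands of missing rows in Step 3.1, so after this step a large share of their values will be identical. That is worth keeping in mind when interpreting the model.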

## Step 8: Transform the features (10 marks)
### Step 8.1
Import skewness from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [1520]:
# Put your statement here
from pyspark.sql.functions import skewness


### Step 8.2
We can get the skewness using skewness() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

Find the features whose skewness is larger than 0.75, and print these features together with their skewness values.

In [1521]:
# Put your statements here
skew_index=[]
for index in data.columns:
    if data.select(index).dtypes[0][1]!='string':
        if data.select(skewness(index)).take(1)[0][-1]>0.75:
            data.select(skewness(index)).show()
            skew_index.append(index)

+------------------+
|skewness(Rainfall)|
+------------------+
|  9.93720720656584|
+------------------+

+---------------------+
|skewness(Evaporation)|
+---------------------+
|   4.9535525141524825|
+---------------------+

+-----------------------+
|skewness(WindGustSpeed)|
+-----------------------+
|     0.9042674361671061|
+-----------------------+

+----------------------+
|skewness(WindSpeed9am)|
+----------------------+
|    0.7791876023517204|
+----------------------+



### Step 8.3
Import log1p from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [1522]:
# Put your statement here
from pyspark.sql.functions import log1p


### Step 8.4

Apply log transformation on those features with skewness values larger than 0.75 using log1p() from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [1523]:
# Put your statements here
for i in skew_index:
    data=data.withColumn(i,log1p(data[i]))
pipline_data=data
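
The sketch below illustrates in plain Python why log1p is a common choice here. The values are made up and chosen so that their logarithms are evenly spaced, and `population_skewness` is a hypothetical helper. log1p(x) computes log(1 + x), so zero values, for example days without rain, remain valid and map to 0.

```python
import math


def population_skewness(values):
    """Third central moment divided by the 1.5th power of the second."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5


# Made-up, strongly right-skewed values (each is 2**k - 1).
raw = [0.0, 1.0, 3.0, 7.0, 15.0, 31.0, 63.0, 127.0]
transformed = [math.log1p(v) for v in raw]

skew_before = population_skewness(raw)
skew_after = population_skewness(transformed)
print(skew_before, skew_after)
```

On real data the skewness will rarely drop all the way to zero as it does in this constructed example, but the long right tail is compressed in the same way.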

## Step 9: Converting Categorical features (10 points)

### Step 9.1 List categorical features
Get and print the list of categorical features (excluding "RainTomorrow").

In [1524]:
# Put your statements here
cat_f=[]
cat_trans=[]
for index in data.columns:
    if data.select(index).dtypes[0][1]=='string' and index!='RainTomorrow':
        cat_f.append(index)
        cat_trans.append(index+'_onehot')
print(cat_f)


['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']


### Step 9.2 Convert categorical features into dummy/indicator features
#### Step 9.2.1
Import <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer">StringIndexer</a> and <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator">OneHotEncoderEstimator</a> from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module.

In [1525]:
# Put your statement here
# In Spark 3.x, OneHotEncoder replaces the OneHotEncoderEstimator class of Spark 2.4
from pyspark.ml.feature import StringIndexer,OneHotEncoder


#### Step 9.2.2
Use StringIndexer to convert categorical values into category indices for each of the categorical features.

In [1526]:
# Put your statement here
for i in cat_f:
    stringindexer = StringIndexer(inputCol=i, outputCol=i+'_indexer')
    stringindexer=stringindexer.fit(data)
    data=stringindexer.transform(data)
    data=data.drop(i)
    data=data.withColumnRenamed(i+'_indexer',i)
    
label_indexer = StringIndexer(inputCol='RainTomorrow', outputCol='RainTomorrow_num').fit(data)
data = label_indexer.transform(data)


#### Step 9.2.3
Use OneHotEncoderEstimator to map a column of category indices to a column of binary vectors for the categorical features.

In [1527]:
# Put your statements here
encoder = OneHotEncoder(dropLast=False, inputCols=cat_f, outputCols=cat_trans)
encoder=encoder.fit(data)
data=encoder.transform(data)
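
The two-stage conversion in Steps 9.2.2 and 9.2.3 can be sketched in plain Python as follows. The helpers `fit_string_indexer` and `one_hot` are hypothetical, the data are made up, and the ordering rule (most frequent label gets index 0, ties broken alphabetically) is an assumption about StringIndexer's default behavior.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, size):
    """Dense binary vector with a single 1.0 at the given position."""
    return [1.0 if i == index else 0.0 for i in range(size)]


rain_today = ["No", "Yes", "No", "No", "Yes"]  # made-up column
mapping = fit_string_indexer(rain_today)
indices = [mapping[v] for v in rain_today]
vectors = [one_hot(i, len(mapping)) for i in indices]
print(mapping, vectors[:2])
```

Keeping one position per category corresponds to dropLast=False in the cell above. Spark stores the result as sparse vectors rather than Python lists.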


## Step 10: Training the Regression model  (20 points)

### Step 10.1: Creating the feature vector

#### Step 10.1.1
Import VectorAssembler from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module.


In [1528]:
# Put your statement here
from pyspark.ml.feature import VectorAssembler


#### Step 10.1.2
Use <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler">VectorAssembler</a>, a feature transformer, to merge the columns created in step 9.2.3 and the columns of numerical features into a vector column.

In [1529]:
# Put your statements here
# Assemble only the numerical features here; the one-hot columns are appended in Step 10.2, after normalization
f_all = num_f[:]
vector = VectorAssembler(inputCols=f_all, outputCol="features")
new_data = vector.transform(data)

### Step 10.2 
Is there any other transformation on the dataframe needed?

In [1530]:
# Put your statement here
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(p=2.0, inputCol='features', outputCol='features_norm')
new_data = normalizer.transform(new_data)
new_data=new_data.drop('features')

f_all=['features_norm']
f_all.extend(cat_trans)
vector=VectorAssembler(inputCols=f_all,outputCol="features")
new_data=vector.transform(new_data)
new_data=new_data.drop('features_norm')
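
Note that Normalizer rescales each row vector to unit norm; it does not rescale each feature column. The plain-Python sketch below shows what p=2.0 means for a single row. `l2_normalize` is a hypothetical helper, and the second row uses the MinTemp, MaxTemp and Pressure9am values from the first record shown in Step 1.2.

```python
import math


def l2_normalize(row):
    """Divide a row vector by its Euclidean (L2) norm."""
    norm = math.sqrt(sum(x * x for x in row))
    return [x / norm for x in row] if norm > 0 else list(row)


unit = l2_normalize([3.0, 4.0])
weather = l2_normalize([13.4, 22.9, 1007.7])  # MinTemp, MaxTemp, Pressure9am
print(unit, weather)
```

Because pressure is so much larger in magnitude than the other measurements, it dominates the normalized row. If per-feature scaling is wanted instead, a column-wise scaler such as pyspark.ml.feature.StandardScaler may be a better fit.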

### Step 10.3 Split the data into training data and testing data

Use randomSplit of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to randomly split the DataFrame into training and testing datasets with a ratio of 0.7 to 0.3.

In [1531]:
# Put your statement here
train_data,test_data=new_data.randomSplit([0.7,0.3])


### Step 10.4 Build the logistic regression model

#### Step 10.4.1
Import LogisticRegression from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification">pyspark.ml.classification</a> module.

In [1532]:
# Put your statement here
from pyspark.ml.classification import LogisticRegression


#### Step 10.4.2
<ol>
<li>Initialize a Logistic Regression model by <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression">LogisticRegression()</a> function using the feature vector generated in Step 10.1.2.</li>
<li>Use fit() function to train the logistic regression model using the training feature data.</li>
</ol>

In [1533]:
# Put your statements here
blor=LogisticRegression(regParam=0.01, maxIter=1000,labelCol = 'RainTomorrow_num').fit(train_data)




#### Step 10.4.3
<ul>
    <li>Get the summary of the model trained on the training set. </li>
    <li>Print the accuracy, objective history, total iterations of the trained model.</li>
</ul>

In [1534]:
# Put your statements here
trainingSummary = blor.summary
print("Accuracy: " + str(trainingSummary.accuracy))
print("Iterations: " + str(trainingSummary.totalIterations))
objectiveHistory = trainingSummary.objectiveHistory
print("ObjectiveHistory:")
for objective in objectiveHistory:
    print(objective)





Accuracy: 0.8476166557838164
Iterations: 302
ObjectiveHistory:
0.5319232091540478
0.5312988787331828
0.50115352804076
0.40836677237548574
0.4026176540546599
0.4017958286217634
0.4017562332761363
0.4017525386128751
0.40173256387834544
0.40165202475701756
0.4014933952570464
0.4010430264585518
0.3999713290324089
0.39748484631192993
0.392771833561181
0.38678951981117665
0.3847319840632798
0.38127307365806967
0.3802263319781325
0.38017078166599455
0.37970355934625016
0.37604208896810126
0.37342685857351826
0.37169360102877125
0.37149328664619086
0.3714836963347408
0.3714788596284995
0.37146775010599226
0.37143922198309975
0.37143332059003686
0.37137333805895323
0.37118388670150526
0.3708708357963599
0.37057151521350606
0.3694303266782195
0.36757964596374143
0.36618925001035135
0.3657991142304294
0.36575031951269965
0.36573022101476826
0.36572543948705194
0.36571602053145286
0.36555201438215235
0.3643807248445877
0.3641454438896196
0.3640098674367555
0.364004489416136
0.3640013815231301
0.36
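
The objective history lists the value of the training objective after each iteration, so it should decrease as training proceeds. As a rough sketch, assuming the objective is the mean logistic loss (the regularization term added through regParam is left out), it can be computed in plain Python as shown below. The labels and probabilities are made up and `log_loss` is a hypothetical helper.

```python
import math


def log_loss(labels, probabilities):
    """Mean negative log-likelihood of binary labels under predicted P(y=1)."""
    total = 0.0
    for y, p in zip(labels, probabilities):
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(labels)


labels = [0, 0, 0, 1]
# Same probability for every row (the share of positive labels).
uninformed = log_loss(labels, [0.25, 0.25, 0.25, 0.25])
# Probabilities that lean towards the correct class.
informed = log_loss(labels, [0.1, 0.1, 0.1, 0.9])
print(uninformed, informed)
```

Predictions that assign higher probability to the correct class give a lower loss, which is the direction in which the values printed above move.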

#### Step 10.4.4
Predict the target values for the testing feature data using the transform() function.

In [1535]:
# Put your statement here
predict=blor.transform(test_data)


#### Step 10.4.5
Use show() to list the top 5 rows of results in Step 10.4.4, show only "prediction", "RainTomorrow" and the featuresCol specified in the LogisticRegression model.

In [1536]:
# Put your statement here
show_col=num_f[:]
show_col.append('prediction')
show_col.append('RainTomorrow_num')
predict.select(show_col).show(5)


+-------+-------+------------------+------------------+--------+------------------+------------------+------------+-----------+-----------+-----------+-----------+---------+---------+-------+-------+----+-----+---+----------+----------------+
|MinTemp|MaxTemp|          Rainfall|       Evaporation|Sunshine|     WindGustSpeed|      WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm| Cloud9am| Cloud3pm|Temp9am|Temp3pm|Year|Month|Day|prediction|RainTomorrow_num|
+-------+-------+------------------+------------------+--------+------------------+------------------+------------+-----------+-----------+-----------+-----------+---------+---------+-------+-------+----+-----+---+----------+----------------+
|   -3.2|   15.7|               0.0|1.4350845366425882|     9.1| 3.044522437723423|1.0986122886681096|        13.0|       71.0|       36.0|     1026.2|     1023.0|      0.0|      0.0|    5.5|   15.0|2011|    7| 12|       0.0|             0.0|
|   -3.1|   16.9|           

### Step 10.5 Evaluate the results
#### Step 10.5.1 
Import MulticlassClassificationEvaluator from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.evaluation">pyspark.ml.evaluation</a> module.

In [1537]:
# Put your statement here
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


#### Step 10.5.2
Using <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator">pyspark.ml.evaluation.MulticlassClassificationEvaluator</a> to evaluate the predictions from Step 10.4.4.
Print the evaluation results.

In [1538]:
# Put your statements here
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrow_num", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predict)
print("Accuracy: " + str(accuracy))



Accuracy: 0.845057212530482
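
Accuracy is the fraction of rows whose prediction equals the label. When one class is much more common than the other, it helps to compare it against the accuracy of always predicting the majority class. The plain-Python sketch below uses made-up labels and predictions; `accuracy` and `majority_baseline` are hypothetical helpers.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of positions where prediction and label agree."""
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)


def majority_baseline(labels):
    """Accuracy obtained by always predicting the most common label."""
    return Counter(labels).most_common(1)[0][1] / len(labels)


labels = [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0]       # made up
predictions = [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]  # made up

model_accuracy = accuracy(labels, predictions)
baseline_accuracy = majority_baseline(labels)
print(model_accuracy, baseline_accuracy)
```

The gap between the two numbers is a more informative measure of what the model has learned than the accuracy on its own.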


## Step 11: Pipeline (10 marks)

### Step 11.1
Import <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline">Pipeline</a> from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml">pyspark.ml</a> package.

In [1539]:
# Put your statement here
from pyspark.ml import Pipeline


### Step 11.2

Rewrite Steps 9.2 to 10.5:
<ol>
    <li>Configure an ML pipeline which consists of the StringIndexer, OneHotEncoderEstimator, VectorAssembler, LogisticRegression</li>
    <li>Fit the pipeline to the training data.</li>
    <li>Make predictions on the testing data.</li>
    <li>Evaluate the prediction results as in step 10.5.</li>
</ol>

Note: 
<ul>
    <li>It is fine to have the steps 9.2 to 10.5 slightly rearranged in this step.</li>
    <li>Hence, the evaluation results may be slightly different.</li>
</ul>

In [None]:
# Put your statements here
# Split the data saved before Step 9 so that every stage is fitted on the training set only
train_data, test_data = pipline_data.randomSplit([0.7, 0.3])

# Numerical columns are used directly; categorical columns (except the label) are indexed and one-hot encoded
feature_col = [name for name, dtype in pipline_data.dtypes if dtype != 'string']
cat_f = [name for name, dtype in pipline_data.dtypes if dtype == 'string' and name != 'RainTomorrow']
feature_col += [c + '_onehot' for c in cat_f]

# One StringIndexer per categorical feature, plus one for the label
string_indexers = [StringIndexer(inputCol=c, outputCol=c + '_indexer') for c in cat_f]
string_indexers.append(StringIndexer(inputCol='RainTomorrow', outputCol='RainTomorrow_num'))

encoder = OneHotEncoder(dropLast=False, inputCols=[c + '_indexer' for c in cat_f], outputCols=[c + '_onehot' for c in cat_f])
vector = VectorAssembler(inputCols=feature_col, outputCol="features")
lr = LogisticRegression(regParam=0.01, maxIter=1000, labelCol='RainTomorrow_num', featuresCol="features")

# Fit all stages on the training data, then score the held-out testing data
pipeline = Pipeline(stages=string_indexers + [encoder, vector, lr]).fit(train_data)
predictions = pipeline.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrow_num", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: " + str(accuracy))
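
Conceptually, fitting a pipeline fits each stage on the output of the stages before it, and transforming new data reuses what was learned from the training data. The plain-Python sketch below illustrates this idea only. `MeanCenter`, `Scale` and `ToyPipeline` are hypothetical toy classes that do not reproduce Spark's API; for instance, Spark's Pipeline.fit() returns a separate fitted model, while the toy version returns itself.

```python
class MeanCenter:
    """Toy stage: learns the mean in fit(), subtracts it in transform()."""

    def fit(self, rows):
        self.mean = sum(rows) / len(rows)
        return self

    def transform(self, rows):
        return [r - self.mean for r in rows]


class Scale:
    """Toy stage: learns the largest absolute value, divides by it."""

    def fit(self, rows):
        self.max_abs = max(abs(r) for r in rows) or 1.0
        return self

    def transform(self, rows):
        return [r / self.max_abs for r in rows]


class ToyPipeline:
    """Chains stages; each stage is fitted on the previous stage's output."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [1.0, 2.0, 3.0]  # made-up training values
test = [4.0]             # made-up testing value

model = ToyPipeline([MeanCenter(), Scale()]).fit(train)
train_out = model.transform(train)
test_out = model.transform(test)
print(train_out, test_out)
```

The testing value is transformed with the mean and scale learned from the training values, which is the same reason the split is done before the pipeline is fitted in the cell above.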



## Step 12 Submission
Submit your Jupyter notebook (.ipynb) to Canvas.

<center>The end of HW2</center>