# CSIT 5800 Introduction to Big Data
## Spring 2020
### Assignment 2 - PySpark

### Description
In this assignment, you will have an opportunity to:
<ul>
<li>apply data pre-processing techniques that you learned in the class to a problem using Spark</li>
<li>apply machine learning techniques that you learned in the class to a problem using Spark</li>
</ul>

<br/>
To get started on this assignment, download the given dataset and carefully read the description on this page. Please note that your entire implementation must be written in Python.
<br/>

### Intended Learning Outcomes

- Upon completion of this assignment, you should be able to:
<ol>
    <li>Demonstrate your understanding of how to pre-process data using the algorithms and techniques described in the class.</li>
    <li>Demonstrate your understanding of how to make predictions using the machine learning algorithms and techniques described in the class.</li>
    <li>Use PySpark to construct a Python program that pre-processes data, trains a machine learning model on the training data, and classifies the testing set.</li>
</ol>

### Dataset
The dataset contains daily weather observations from numerous Australian weather stations.
The task is to predict whether it will rain tomorrow by training a binary classification model on the target variable RainTomorrow.
RainTomorrow answers the question "Will it rain the next day?" with Yes or No.

Note: You should exclude the variable RISK_MM when training a binary classification model. Not excluding it will leak the answers to your model, so the measured accuracy would be misleadingly high and would not reflect real predictive power.
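
To make the leakage concrete, here is a small plain-Python sketch (no Spark needed). It assumes that RainTomorrow was derived from RISK_MM (the next day's rainfall in mm) with a 1 mm threshold; both the threshold and the toy rows are assumptions made for illustration only.

```python
# Hypothetical toy rows: (RISK_MM, RainTomorrow). Values are invented for illustration.
rows = [(0.0, "No"), (0.4, "No"), (3.6, "Yes"), (12.2, "Yes"), (1.0, "No")]

def predict_from_risk_mm(risk_mm, threshold=1.0):
    """A 'model' that only looks at the leaked column (the threshold is an assumption)."""
    return "Yes" if risk_mm > threshold else "No"

# Count how often the trivial rule reproduces the target.
correct = sum(predict_from_risk_mm(r) == label for r, label in rows)
leak_accuracy = correct / len(rows)
```

A rule that reads only the leaked column reproduces the target on every toy row, which is why a model trained with RISK_MM would tell you nothing about how well rain can be forecast from the real observations.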

<font color="red">Note: The suggested functions below are for reference only. You can use any functions from PySpark.</font>

## Step 0: Installing PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"




### Step 0.1 : Install Java
#### Step 0.1.1: Check Java Version

In command prompt:
<pre>java -version</pre>

Note: Spark 2.4.x requires Java 8. Newer Java versions are only supported from Spark 3.0 onwards.

#### Step 0.1.2
Install Java from the official download website:
<a href="https://www.oracle.com/java/technologies/javase-jdk8-downloads.html">https://www.oracle.com/java/technologies/javase-jdk8-downloads.html</a>


### Step 0.2: Install Apache Spark on Windows
<ol>
    <li>Go to the <a href="http://spark.apache.org/downloads.html">Spark download</a> page.</li>
    <li>Select the latest stable release (2.4.5 as of May-2020) of Spark for "Choose a Spark release".</li>
    <li>Select a version that is pre-built for the latest version of Hadoop such as Pre-built for Hadoop 2.7 and later
        for "Choose a package type"</li>
    <li>Click the link next to Download Spark to download the spark-2.4.5-bin-hadoop2.7.tgz</li>
    <li>Extract the files from the downloaded .tgz archive using WinZip or an equivalent tool (right-click the downloaded file and click "Extract here").</li>
    <li>Make sure that the folder path and the folder name containing Spark files do not contain any spaces.</li>
    <li>Create a folder called "Spark" on your desktop and extract the downloaded file into it as a folder called spark-2.4.5-bin-hadoop2.7. All Spark files will then be in C:\Users\[your_user_name]\Desktop\Spark\spark-2.4.5-bin-hadoop2.7. This folder will be referred to as SPARK_HOME.</li>
    <li>To test if your installation was successful, open Anaconda Prompt, change to SPARK_HOME directory and type bin\pyspark. This should start the PySpark shell which can be used to interactively work with Spark. </li>
    <li>Create a system environment variable in Windows called SPARK_HOME that points to the SPARK_HOME folder path.</li>
</ol>

### Step 0.2: Install Apache Spark on Mac

You can use Homebrew to install Apache Spark.
<ol>
    <li>Install Homebrew using the following command in your terminal:<br/>
        <pre>/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"</pre></li>
    <li>Install Spark with Homebrew
         <ol>
         <li>In your terminal, type: <br />
         <pre>brew install apache-spark</pre></li>
         <li>You can check the version of spark: <br />
         <pre>pyspark --version</pre></li>
         </ol>
    </li>
    <li>You may need to install PySpark by:<br />
    <pre>pip install pyspark</pre>
    </li>
    <li>To know where spark is installed: <br/>
    <pre>brew info apache-spark</pre>
    </li>
    <li>Set the environment variables:<br />
    <pre>export SPARK_HOME="[your_path]/libexec/"</pre>
    </li>
</ol>

### Step 0.3 Using PySpark in Jupyter Notebook

findspark is a library that locates your Spark installation and makes the pyspark package importable from a regular Python session or notebook.
To install findspark, run the following in your shell:<br />
<pre>pip install findspark</pre>

In [2]:
import findspark
findspark.init()
findspark.find()

'/content/spark-2.4.5-bin-hadoop2.7'
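
If findspark.init() fails, the usual cause is a missing or malformed environment variable. The following is a minimal sketch of a sanity check in plain Python; `check_spark_env` is a hypothetical helper written for this note, not part of findspark or PySpark.

```python
import os
import shutil

def check_spark_env(env=None):
    """Return a list of problems found in the given environment mapping.

    Hypothetical helper: it only inspects variables, it does not start Spark.
    """
    env = os.environ if env is None else env
    problems = []
    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif " " in spark_home:
        # Step 0.2 asks for a Spark path without spaces.
        problems.append("SPARK_HOME contains spaces")
    if not env.get("JAVA_HOME") and shutil.which("java") is None:
        problems.append("Java not found (JAVA_HOME unset and no java on PATH)")
    return problems
```

Calling `check_spark_env()` with no argument inspects the current process environment; an empty list means none of these common problems was detected.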

## Step 1: Importing data and exploring the features (8 points)

In [0]:
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Step 1.1
Read the csv file 'weatherAUS.csv' using 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.SparkSession">SparkSession.read</a> to create a dataframe.

In [0]:
data = spark.read.csv('/content/weatherAUS.csv',header=True)



### Step 1.2
Use show() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to print out the first rows (20 by default) of the dataframe created.

In [5]:
data.show()



+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RISK_MM|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|  

### Step 1.3
Use printSchema() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to print out the schema of the dataframe created.

In [6]:
data.printSchema()



root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



Which of the features above have a wrong datatype in the schema? What should the correct datatypes be?

<font color=red>
[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RISK_MM]
</font>
<br> The above features should have the double datatype.




## Step 2: Convert the data types of features (7 points)

### Step 2.1
Import data types from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#module-pyspark.sql.types">pyspark.sql.types</a>

In [0]:
from pyspark.sql.types import DoubleType



### Step 2.2
Convert the datatype of features using:
<ul>
<li>withColumn() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>, and</li>
<li>cast() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.Column">pyspark.sql.Column</a>
to convert the features (columns) into appropriate types 
(<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#module-pyspark.sql.types">pyspark.sql.types</a>)</li>
</ul>

In [8]:
str_features = ['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 
                'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 
                'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RISK_MM']

# Cast each numeric feature to double; strings that are not numbers (such as "NA") become null
for c in str_features:
  data = data.withColumn(c, data[c].cast(DoubleType()))

data.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: double (nullable = true)
 |-- MaxTemp: double (nullable = true)
 |-- Rainfall: double (nullable = true)
 |-- Evaporation: double (nullable = true)
 |-- Sunshine: double (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: double (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: double (nullable = true)
 |-- WindSpeed3pm: double (nullable = true)
 |-- Humidity9am: double (nullable = true)
 |-- Humidity3pm: double (nullable = true)
 |-- Pressure9am: double (nullable = true)
 |-- Pressure3pm: double (nullable = true)
 |-- Cloud9am: double (nullable = true)
 |-- Cloud3pm: double (nullable = true)
 |-- Temp9am: double (nullable = true)
 |-- Temp3pm: double (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)
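
The cast also explains where the null values counted in the next step come from: a string such as "NA" cannot be parsed as a number, so the cast yields null for it. The plain-Python sketch below only mimics that behaviour for illustration; `cast_to_double` is a hypothetical stand-in, not a Spark function.

```python
def cast_to_double(value):
    """Mimic casting a string to double: text that is not a number becomes None."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

raw = ["13.4", "NA", "22.9", None]           # sample cell values as read from the CSV
casted = [cast_to_double(v) for v in raw]    # "NA" and None both end up as None
```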



## Step 3: Exploring missing values (10 points)

### Step 3.1 
<ul>
<li>Count the missing values in features using:
<ul>
<li>isNull() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.Column">pyspark.sql.Column</a>
to see whether there are missing values in a certain feature (column).</li>
<li>filter() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to filter rows given a condition, and</li>
<li>count() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to count the number of rows in a dataframe</li>
</ul>
<li>Print the features with missing values and their corresponding number of rows with missing values</li>
</ul>


In [9]:
for c in data.columns:
  cnt = data.filter(data[c].isNull()).count()
  if cnt !=0:
    print(c + '\t\t' + str(cnt))

MinTemp		637
MaxTemp		322
Rainfall		1406
Evaporation		60843
Sunshine		67816
WindGustSpeed		9270
WindSpeed9am		1348
WindSpeed3pm		2630
Humidity9am		1774
Humidity3pm		3610
Pressure9am		14014
Pressure3pm		13981
Cloud9am		53657
Cloud3pm		57094
Temp9am		904
Temp3pm		2726


### Step 3.2
Some features have values of "NA", which should be regarded as missing values.<br/>
Print those features with values of "NA" and their corresponding number of rows with values "NA".

In [10]:
for c in data.columns:
  cnt = data.filter(data[c] == 'NA').count()
  if cnt !=0:
    print(c + '\t\t' + str(cnt))



WindGustDir		9330
WindDir9am		10013
WindDir3pm		3778
RainToday		1406
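
When counting "NA" placeholders, note that substring matching and exact matching can disagree. The plain-Python sketch below shows the difference on made-up values; on the real columns above both approaches happen to find the same rows, but exact matching is the safer choice.

```python
# "BANANA" is an invented value, included only to show where the two tests differ.
values = ["N", "NA", "NNE", "NA", "BANANA"]

substring_hits = sum("NA" in v for v in values)   # analogous to Column.contains('NA')
exact_hits = sum(v == "NA" for v in values)       # analogous to Column == 'NA'
```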


## Step 4: Drop the feature, RISK_MM (2 points)
As described in the dataset description, we will need to drop the feature, RISK_MM, using drop() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>

In [11]:
data = data.drop('RISK_MM')
data.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: double (nullable = true)
 |-- MaxTemp: double (nullable = true)
 |-- Rainfall: double (nullable = true)
 |-- Evaporation: double (nullable = true)
 |-- Sunshine: double (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: double (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: double (nullable = true)
 |-- WindSpeed3pm: double (nullable = true)
 |-- Humidity9am: double (nullable = true)
 |-- Humidity3pm: double (nullable = true)
 |-- Pressure9am: double (nullable = true)
 |-- Pressure3pm: double (nullable = true)
 |-- Cloud9am: double (nullable = true)
 |-- Cloud3pm: double (nullable = true)
 |-- Temp9am: double (nullable = true)
 |-- Temp3pm: double (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



## Step 5: Processing the date feature (5 points)

### Step 5.1
Import the to_date(), year(), month(), dayofmonth()
from
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [0]:
from pyspark.sql.functions import to_date, year, month, dayofmonth



### Step 5.2
Use the to_date() function to convert the <strong>Date</strong> attribute to the date type.

In [0]:
data = data.withColumn('Date',to_date(data['Date']))


### Step 5.3
Extract the <strong>year</strong>, <strong>month</strong>, <strong>day</strong> attributes of the converted <strong>Date</strong> attribute using year(), month() & dayofmonth()
and create the corresponding new features <strong>Year</strong>, <strong>Month</strong> and <strong>Day</strong>.

In [0]:
data = data.withColumn('Year',year(data['Date']))
data = data.withColumn('Month',month(data['Date']))
data = data.withColumn('Day',dayofmonth(data['Date']))
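
For intuition, the same extraction on a single value looks like this in plain Python, using the first Date value shown in Step 1.2. This is only an illustration of what year(), month() and dayofmonth() return for one row.

```python
from datetime import date

d = date.fromisoformat("2008-12-01")   # parse an ISO-formatted date string
parts = {"Year": d.year, "Month": d.month, "Day": d.day}
```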


### Step 5.4
Drop the original <strong>Date</strong> feature.

In [0]:
data = data.drop('Date')


## Step 6: Handling missing values of the categorical features (8 marks)
### Step 6.1 Find the most frequent value for categorical features with "NA" values 
Use groupBy(), count() & show() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a> to list the distinct values of a feature and count the corresponding number of rows.

In [16]:
cate_fea = ['WindGustDir', 'WindDir9am','WindDir3pm','RainToday']
for c in cate_fea:
  data.groupBy(c).count().show()



+-----------+-----+
|WindGustDir|count|
+-----------+-----+
|        SSE| 8993|
|         SW| 8797|
|         NW| 8003|
|         NA| 9330|
|          E| 9071|
|        WSW| 8901|
|        ENE| 7992|
|         NE| 7060|
|        NNW| 6561|
|          N| 9033|
|        SSW| 8610|
|          W| 9780|
|          S| 8949|
|         SE| 9309|
|        WNW| 8066|
|        NNE| 6433|
|        ESE| 7305|
+-----------+-----+

+----------+-----+
|WindDir9am|count|
+----------+-----+
|       SSE| 8966|
|        SW| 8237|
|        NW| 8552|
|        NA|10013|
|         E| 9024|
|       WSW| 6843|
|       ENE| 7735|
|        NE| 7527|
|       NNW| 7840|
|         N|11393|
|       SSW| 7448|
|         W| 8260|
|         S| 8493|
|        SE| 9162|
|       WNW| 7194|
|       NNE| 7948|
|       ESE| 7558|
+----------+-----+

+----------+-----+
|WindDir3pm|count|
+----------+-----+
|       SSE| 9142|
|        NW| 8468|
|        SW| 9182|
|        NA| 3778|
|         E| 8342|
|       WSW| 9329|
|       

For the categorical features with missing values ("NA"), what is the most frequent value of each of the features?

<font color="red">
WindGustDir: W <br>
WindDir9am: N <br>
WindDir3pm: SE <br>
RainToday: No <br>
</font>



### Step 6.2
Import when(), lit() from
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [0]:
from pyspark.sql.functions import when, lit



### Step 6.3
For each of the categorical features with missing values ("NA"), replace "NA" with the corresponding most frequent value of each of the features using when(), lit(), otherwise().

In [0]:
# Replace the "NA" placeholder with the most frequent value found in Step 6.1
most_frequent = {'WindGustDir': 'W', 'WindDir9am': 'N', 'WindDir3pm': 'SE', 'RainToday': 'No'}
for c, value in most_frequent.items():
  data = data.withColumn(c, when(data[c] == 'NA', lit(value)).otherwise(data[c]))
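
The idea behind this step is mode imputation: find the most frequent real value and use it wherever the placeholder appears. A minimal plain-Python sketch on made-up values, where `fill_with_mode` is a hypothetical helper:

```python
from collections import Counter

def fill_with_mode(values, missing="NA"):
    """Replace the missing marker with the most frequent non-missing value."""
    counts = Counter(v for v in values if v != missing)
    mode = counts.most_common(1)[0][0]
    return [mode if v == missing else v for v in values]

filled = fill_with_mode(["W", "NA", "SE", "W", "NA", "N"])
```

Note that the placeholder itself is excluded when computing the mode; otherwise a column with many "NA" entries could end up being filled with "NA".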


## Step 7 Handling missing values of the numerical features (10 marks)
### Step 7.1 
Print the list of numerical features.

In [19]:
num_list = [name for name, dtype in data.dtypes if dtype.startswith(('double', 'int'))]
print(num_list)


['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'Year', 'Month', 'Day']


### Step 7.2
Import Imputer from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module

In [0]:
from pyspark.ml.feature import Imputer

### Step 7.3
Use <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer">pyspark.ml.feature.Imputer</a> to fill in the missing values of each numerical feature with the mean of that feature.

In [0]:
double_list = [item[0] for item in data.dtypes if (item[1].startswith('double'))]
imputer = Imputer(inputCols = double_list, outputCols = double_list,strategy='mean')
data = imputer.fit(data).transform(data)
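
What mean imputation does to a single column can be sketched in a few lines of plain Python. The values are made up and `impute_mean` is a hypothetical helper; it is not how Imputer is implemented, only what the result looks like.

```python
def impute_mean(values):
    """Replace None with the mean of the observed (non-missing) values."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]

imputed = impute_mean([10.0, None, 20.0, 30.0, None])
```

The mean is computed from the observed values only, so the missing entries do not distort it.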


## Step 8: Transform the features (10 marks)
### Step 8.1
Import skewness from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [0]:
from pyspark.sql.functions import skewness


### Step 8.2
We can get the skewness using skewness() from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

Find the features whose skewness is larger than 0.75, and print each feature together with its skewness value.

In [23]:
for c in double_list:
  num = data.select(skewness(data[c])).collect()[0][0]
  if num>0.75:
    print(c + '\t\t\t' +str(num))


Rainfall			9.937207166986028
Evaporation			4.9535525197618835
WindGustSpeed			0.9042674275194919
WindSpeed9am			0.7791876036690152


### Step 8.3
Import log1p from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [0]:
from pyspark.sql.functions import log1p



### Step 8.4

Apply log transformation on those features with skewness values larger than 0.75 using log1p() from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions">pyspark.sql.functions</a> module.

In [0]:
ske_list = ['Rainfall','Evaporation','WindGustSpeed','WindSpeed9am']

for c in ske_list:
  data = data.withColumn(c,log1p(data[c]))

# keep a reference to the preprocessed data for the pipeline in Step 11
data_pip = data
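
To see why log1p helps with right-skewed features, the sketch below computes the skewness of a made-up sample before and after the transformation. It uses the population skewness formula (third central moment divided by the 1.5th power of the second), which is assumed here to match what Spark's skewness() reports.

```python
import math

def skewness(xs):
    """Population skewness: m3 / m2**1.5, with central moments m2 and m3."""
    n = len(xs)
    mean = sum(xs) / n
    m2 = sum((x - mean) ** 2 for x in xs) / n
    m3 = sum((x - mean) ** 3 for x in xs) / n
    return m3 / m2 ** 1.5

# Invented rainfall-like values: mostly small, with one large outlier.
rainfall = [0.0, 0.0, 0.2, 0.6, 1.0, 2.4, 5.0, 40.0]
before = skewness(rainfall)
after = skewness([math.log1p(x) for x in rainfall])
```

log1p(x) computes log(1 + x), so zero values stay at zero instead of producing an undefined logarithm, while large values are compressed and the long right tail shrinks.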

## Step 9: Converting categorical features (10 points)

### Step 9.1 List categorical features
Get and print the list of categorical features (excluding "RainTomorrow").

In [26]:
cat_list = [item[0] for item in data.dtypes if (item[1].startswith('string'))]
cat_list.remove('RainTomorrow')
print(cat_list)


['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']


### Step 9.2 Convert categorical features into dummy/indicator features
#### Step 9.2.1
Import <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer">StringIndexer</a> and <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator">OneHotEncoderEstimator</a> from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module.

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator


#### Step 9.2.2
Use StringIndexer to convert the categorical values of each categorical feature into category indices.

In [0]:
for c in cat_list:
  indexer = StringIndexer(inputCol=c, outputCol=str(c+'_idx'))
  data = indexer.fit(data).transform(data)

for c in cat_list: 
  data = data.drop(c)


#### Step 9.2.3
Use OneHotEncoderEstimator to map each column of category indices to a column of binary vectors.

In [0]:
cat_list2 = [item+'_idx' for item in cat_list]

encoder = OneHotEncoderEstimator(inputCols=cat_list2,outputCols=cat_list)
data = encoder.fit(data).transform(data)

for c in cat_list2:
  data = data.drop(c)
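
The two-stage encoding can be sketched in plain Python. The sketch assumes the default behaviour of the two Spark classes: indices are assigned by descending frequency, and the one-hot vector drops the last category so that it is encoded as all zeros. `fit_string_indexer` and `one_hot` are hypothetical helpers, and the sample values are made up.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first (assumed default ordering)."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a binary vector; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

rain_today = ["No", "No", "Yes", "No"]
mapping = fit_string_indexer(rain_today)
encoded = [one_hot(mapping[v], len(mapping)) for v in rain_today]
```

With two categories the encoded vector has a single slot, so "No" becomes [1.0] and "Yes" becomes [0.0]. Dropping one category avoids a redundant column, since the last category is implied when all other slots are zero.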


## Step 10: Training the logistic regression model (20 points)

### Step 10.1: Creating the feature vector

#### Step 10.1.1
Import VectorAssembler from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature">pyspark.ml.feature</a> module.


In [0]:
from pyspark.ml.feature import VectorAssembler



#### Step 10.1.2
Use <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler">VectorAssembler</a>, a feature transformer, to merge the columns created in Step 9.2.3 and the columns of numerical features into a single vector column.

In [0]:
int_list = [item[0] for item in data.dtypes if (item[1].startswith('int'))]
f_list = double_list + cat_list + int_list

vecAssembler = VectorAssembler(inputCols=f_list, outputCol="features",handleInvalid="keep")
data = vecAssembler.transform(data)
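
The assembled vector is simply the input columns laid out side by side, which makes the sparse indices printed in Step 10.4.5 easier to read. The sketch below reconstructs that layout in plain Python. The block widths assume one-hot vectors with the last category dropped; the count of 49 locations is inferred from the vector size of 113 reported in the output, not read from the data.

```python
# (name, width) for each block, in the order used for f_list in Step 10.1.2.
blocks = [
    ("numeric doubles", 16),      # MinTemp ... Temp3pm
    ("Location", 49 - 1),         # 49 locations is an inferred count (assumption)
    ("WindGustDir", 16 - 1),      # 16 compass directions once "NA" is replaced
    ("WindDir9am", 16 - 1),
    ("WindDir3pm", 16 - 1),
    ("RainToday", 2 - 1),         # Yes / No
    ("Year, Month, Day", 3),
]

# Compute the starting index of every block inside the assembled vector.
offsets = {}
start = 0
for name, width in blocks:
    offsets[name] = start
    start += width
total_size = start
```

Under these assumptions the blocks start at indices 0, 16, 64, 79, 94, 109 and 110, which lines up with the non-zero positions 64, 79, 94 and 109 seen in the sample rows.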


### Step 10.2 
Is there any other transformation on the dataframe needed?

In [0]:
# convert RainTomorrow to a numeric 0/1 label (No -> 0, anything else -> 1)
data = data.withColumn('label', when(data['RainTomorrow'] == 'No', lit(0)).otherwise(lit(1)))


### Step 10.3 Split the data into training data and testing data

Use randomSplit() of <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame">pyspark.sql.DataFrame</a>
to randomly split the DataFrame into training and testing datasets with a ratio of 0.7 to 0.3.

In [0]:
df = data.select(['features','label'])
train_set, test_set = df.randomSplit([0.7, 0.3], seed=0)
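
A random split assigns each row to a subset by chance, so the resulting sizes are only approximately 70% and 30%, and a fixed seed makes the assignment repeatable. The plain-Python sketch below illustrates the idea; `random_split` is a hypothetical helper and is not how Spark implements randomSplit internally.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one of two splits by drawing one uniform number per row."""
    rng = random.Random(seed)
    cut = weights[0] / sum(weights)   # normalise the weights into a probability
    first, second = [], []
    for row in rows:
        (first if rng.random() < cut else second).append(row)
    return first, second

rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=0)
```

Every row lands in exactly one of the two subsets, and calling the function again with the same seed returns the same split.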


### Step 10.4 Build the logistic regression model

#### Step 10.4.1
Import LogisticRegression from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification">pyspark.ml.classification</a> module.

In [0]:
from pyspark.ml.classification import LogisticRegression



#### Step 10.4.2
<ol>
<li>Initialize a Logistic Regression model by <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression">LogisticRegression()</a> function using the feature vector generated in Step 10.1.2.</li>
<li>Use fit() function to train the logistic regression model using the training feature data.</li>
</ol>

In [0]:
clf = LogisticRegression(featuresCol='features',labelCol='label').fit(train_set)


#### Step 10.4.3
<ul>
    <li>Get the summary of the model trained on the training set.</li>
    <li>Print the accuracy, objective history, total iterations of the trained model.</li>
</ul>

In [36]:
clf_info = clf.summary
print('summary information:')
print('accuracy: ' + str(clf_info.accuracy))
print('objective history: ' + str(clf_info.objectiveHistory))
print('total iterations: ' + str(clf_info.totalIterations))


summary information:
accuracy: 0.8472351919350142
objective history: [0.5319942642901524, 0.531898180666283, 0.531898180666283, 0.5318037511311586, 0.5318000291127355, 0.5317977099598908, 0.5317829746898022, 0.5317528533710458, 0.531665695001189, 0.5314468068658789, 0.5308700638794406, 0.5293923403795086, 0.5256414538007222, 0.5164672166432003, 0.4955449674308512, 0.45617976005284466, 0.4167738536110965, 0.4027853164078687, 0.39682594503489405, 0.3944890298274528, 0.3941063483647683, 0.394069874168949, 0.39406928882900555, 0.3940662639878889, 0.39406032188619383, 0.3940426866348724, 0.3939986244959741, 0.3938819467755806, 0.39358300184676076, 0.39282664828617253, 0.39101995581231747, 0.38723642951595083, 0.38153048026638836, 0.38064428274217305, 0.37626663399632576, 0.3755013238982979, 0.37533635386935693, 0.3753320419571259, 0.37527581321735726, 0.3751659875981029, 0.37484530945495215, 0.37408186988234393, 0.3723104191387503, 0.3690616794802842, 0.36523965545396514, 0.3632165485148966

#### Step 10.4.4
Predict the target values for the testing feature data using the transform() function.

In [0]:
pred = clf.transform(test_set)
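
Conceptually, a binary logistic regression model turns each feature vector into a probability with the sigmoid function and then applies a threshold (0.5 is assumed here) to obtain the prediction. The weights, intercept and feature values in this plain-Python sketch are invented for illustration and are unrelated to the trained model.

```python
import math

def predict(features, weights, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label) for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    probability = 1.0 / (1.0 + math.exp(-z))   # sigmoid of the linear score
    return probability, (1.0 if probability > threshold else 0.0)

# Hypothetical two-feature example (e.g. two humidity readings).
prob, label = predict([81.0, 47.0], weights=[0.01, 0.03], intercept=-3.0)
```

Here the linear score is negative, so the probability falls below 0.5 and the predicted label is 0.0.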



#### Step 10.4.5
List the top 5 rows of the results from Step 10.4.4 (for example with show() or head()), showing only "prediction", the label derived from "RainTomorrow", and the featuresCol specified in the LogisticRegression model.

In [38]:
pred['prediction','label','features'].head(5)



[Row(prediction=0.0, label=1, features=SparseVector(113, {0: 0.2, 1: 17.0, 2: 0.6931, 3: 1.5686, 4: 7.6249, 5: 3.7132, 6: 2.7082, 7: 18.6376, 8: 81.0, 9: 47.0, 10: 1017.6538, 11: 1015.2582, 12: 4.4372, 13: 4.5032, 14: 8.5, 15: 15.3, 16: 1.0, 64: 1.0, 79: 1.0, 94: 1.0, 109: 1.0, 110: 2009.0, 111: 10.0, 112: 17.0})),
 Row(prediction=0.0, label=0, features=SparseVector(113, {0: 4.5, 1: 15.5, 2: 0.1823, 3: 0.7885, 4: 7.6249, 5: 3.7132, 6: 2.7082, 7: 18.6376, 8: 78.0, 9: 45.0, 10: 1017.6538, 11: 1015.2582, 12: 4.4372, 13: 4.5032, 14: 9.7, 15: 15.0, 16: 1.0, 64: 1.0, 79: 1.0, 94: 1.0, 109: 1.0, 110: 2009.0, 111: 8.0, 112: 13.0})),
 Row(prediction=0.0, label=0, features=SparseVector(113, {0: 5.3, 1: 15.3, 2: 0.3365, 3: 1.4351, 4: 7.6249, 5: 3.7132, 6: 2.7082, 7: 18.6376, 8: 46.0, 9: 37.0, 10: 1017.6538, 11: 1015.2582, 12: 4.4372, 13: 4.5032, 14: 10.2, 15: 15.1, 16: 1.0, 64: 1.0, 79: 1.0, 94: 1.0, 109: 1.0, 110: 2009.0, 111: 9.0, 112: 1.0})),
 Row(prediction=0.0, label=0, features=SparseVector

### Step 10.5 Evaluate the results
#### Step 10.5.1 
Import MulticlassClassificationEvaluator from <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.evaluation">pyspark.ml.evaluation</a> module.

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


#### Step 10.5.2
Use <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator">pyspark.ml.evaluation.MulticlassClassificationEvaluator</a> to evaluate the predictions from Step 10.4.4.
Print the evaluation results.

In [40]:
acc = MulticlassClassificationEvaluator(labelCol='label',metricName='accuracy').evaluate(pred)
print('accuracy: ' + str(acc))

accuracy: 0.8435519459180321
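
The accuracy metric is the fraction of rows whose prediction equals the label. A plain-Python sketch on four made-up rows, with `accuracy` as a hypothetical helper:

```python
def accuracy(predictions, labels):
    """Fraction of positions where the prediction equals the label."""
    matches = sum(p == y for p, y in zip(predictions, labels))
    return matches / len(labels)

acc_example = accuracy([0.0, 0.0, 0.0, 1.0], [1, 0, 0, 1])
```

Because accuracy treats both classes equally, it is worth comparing the score against the share of the majority class when the classes are imbalanced.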


## Step 11: Pipeline (10 marks)

### Step 11.1
Import <a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline">Pipeline</a> from 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml">pyspark.ml</a> package.

In [0]:
from pyspark.ml import Pipeline


### Step 11.2

Rewrite Steps 9.2 to 10.5:
<ol>
    <li>Configure an ML pipeline which consists of the StringIndexer, OneHotEncoderEstimator, VectorAssembler, LogisticRegression</li>
    <li>Fit the pipeline to the training data.</li>
    <li>Make predictions on the testing data.</li>
    <li>Evaluate the prediction results as in step 10.5.</li>
</ol>

Note: 
<ul>
    <li>It is fine to have the steps 9.2 to 10.5 slightly rearranged in this step.</li>
    <li>Hence, the evaluation results may be slightly different.</li>
</ul>

In [42]:
cat_list = ['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']
cat_list2 = [item+'_idx' for item in cat_list]
cat_list3 = [item+'_vec' for item in cat_list]
feature_list = ['MinTemp','MaxTemp','Rainfall','Evaporation','Sunshine','WindGustSpeed','WindSpeed9am',
                'WindSpeed3pm','Humidity9am','Humidity3pm','Pressure9am','Pressure3pm','Cloud9am','Cloud3pm',
                'Temp9am','Temp3pm','Location_vec','WindGustDir_vec','WindDir9am_vec','WindDir3pm_vec',
                'RainToday_vec','Year','Month','Day']
indexer_list = []

# split the data
data_pip = data_pip.withColumn('label', when(data_pip['RainTomorrow'] == 'No', lit(0)).otherwise(lit(1)))
train_pip, test_pip = data_pip.randomSplit([0.7,0.3],0)

# 
# construct pipeline:
#

# String Indexer
for c in cat_list:
  indexer_list.append( StringIndexer(inputCol=c, outputCol=str(c+'_idx')) )

# One Hot Encoder
encoder = OneHotEncoderEstimator(inputCols=cat_list2,outputCols=cat_list3)

# Vector Assembler
vecAssembler = VectorAssembler(inputCols=feature_list, outputCol="features",handleInvalid="keep")

# Logistic Regressor
clf = LogisticRegression(featuresCol='features',labelCol='label')

pipe = Pipeline(stages = indexer_list + [encoder,vecAssembler,clf])

# fit the pipeline
pipe_model = pipe.fit(train_pip)

# make predictions
pipe_pred = pipe_model.transform(test_pip)

# evaluate the prediction
pipe_acc = MulticlassClassificationEvaluator(labelCol='label',metricName='accuracy').evaluate(pipe_pred)
print('accuracy: ' + str(pipe_acc))

accuracy: 0.8463217689310361
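
The key property of a pipeline is that every stage is fitted on the training data only, and the testing data merely passes through the already fitted stages. The plain-Python sketch below imitates that fit/transform contract with two toy stages; `MeanCenter`, `Scale` and `SimplePipeline` are hypothetical classes written for this illustration and are not PySpark classes.

```python
class MeanCenter:
    """Toy stage: learns the mean during fit and subtracts it during transform."""
    def fit(self, rows):
        self.mean = sum(rows) / len(rows)
        return self
    def transform(self, rows):
        return [r - self.mean for r in rows]

class Scale:
    """Toy stage: learns the largest absolute value and divides by it."""
    def fit(self, rows):
        self.max_abs = max(abs(r) for r in rows) or 1.0
        return self
    def transform(self, rows):
        return [r / self.max_abs for r in rows]

class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        # Each stage is fitted on the output of the previous fitted stage.
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self
    def transform(self, rows):
        # New data only goes through transform; nothing is re-learned.
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

model = SimplePipeline([MeanCenter(), Scale()]).fit([1.0, 2.0, 3.0])
out = model.transform([4.0])
```

The statistics learned from the training values (mean 2.0, largest absolute deviation 1.0) are reused unchanged for the new value, which is the same reason the pipeline in this step is fitted on train_pip and only applied to test_pip.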


## Step 12 Submission
Submit your Jupyter notebook (.ipynb) to Canvas.

<center>The end of HW2</center>