### Using PySpark in Jupyter Notebook

In [3]:
# Call findspark.init() before pyspark is imported (next cell) so the kernel can
# locate the local Spark installation. findspark.find() returns the Spark home
# path it found, which is echoed as this cell's output.
# NOTE(review): that output exposes a local user directory; clear it before sharing.
import findspark
findspark.init()
findspark.find()

'C:\\Users\\trang\\Downloads\\spark-3.1.1-bin-hadoop2.7\\spark-3.1.1-bin-hadoop2.7'

Import PySpark and create the SparkSession

In [4]:
# Obtain the SparkSession used by every later cell as `spark`
# (getOrCreate reuses an already-running session if there is one).
from pyspark.sql import SparkSession
import pyspark

spark = (
    SparkSession.builder
    .getOrCreate()
)

### Step 1: Importing data and exploring the features

In [7]:
import os

# Location of the weatherAUS CSV. Set the WEATHER_AUS_CSV environment variable to
# point elsewhere instead of editing this machine-specific default path.
DATA_PATH = os.environ.get("WEATHER_AUS_CSV", "C:/Users/trang/Downloads/weatherAUS.csv")

# The file writes missing entries as the literal string "NA", so inferSchema leaves
# most numeric columns typed as strings (see the schema below); Step 2 casts them.
df = spark.read.load(DATA_PATH, format="csv", sep=",", inferSchema="true", header="true")

In [8]:
# Inspect the inferred schema: almost every measurement column comes back as a
# string here (see output below), which is why Step 2 casts them to float.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)



### Step 2: Convert the data types of features

Import data types from pyspark.sql.types

In [9]:
from pyspark.sql.types import *

In [10]:
# Numeric measurement columns that were read as strings.
features = [
    "MinTemp", "MaxTemp", "Rainfall", "Evaporation", "Sunshine",
    "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm",
    "Cloud9am", "Cloud3pm", "Temp9am", "Temp3pm",
]
# Cast them to float in one projection, keeping every other column (and the
# column order) as-is; unparsable strings such as "NA" become null.
df = df.select([
    df[name].cast(FloatType()).alias(name) if name in features else df[name]
    for name in df.columns
])
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Evaporation: float (nullable = true)
 |-- Sunshine: float (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: float (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: float (nullable = true)
 |-- WindSpeed3pm: float (nullable = true)
 |-- Humidity9am: float (nullable = true)
 |-- Humidity3pm: float (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: float (nullable = true)
 |-- Cloud3pm: float (nullable = true)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)



### Step 3: Exploring missing values

In [11]:
from pyspark.sql import functions as F

# Count real nulls in every column with one Spark job (the per-column filter/count
# loop launched one job per column). Only the columns cast in Step 2 can contain
# nulls here; the string columns mark missing data with "NA" (next cell).
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first().asDict()
for column_name, n_null in null_counts.items():
    if n_null:
        print(column_name, "\t", "with null values:", n_null)

MinTemp 	 with mull values: 637
MaxTemp 	 with mull values: 322
Rainfall 	 with mull values: 1406
Evaporation 	 with mull values: 60843
Sunshine 	 with mull values: 67816
WindGustSpeed 	 with mull values: 9270
WindSpeed9am 	 with mull values: 1348
WindSpeed3pm 	 with mull values: 2630
Humidity9am 	 with mull values: 1774
Humidity3pm 	 with mull values: 3610
Pressure9am 	 with mull values: 14014
Pressure3pm 	 with mull values: 13981
Cloud9am 	 with mull values: 53657
Cloud3pm 	 with mull values: 57094
Temp9am 	 with mull values: 904
Temp3pm 	 with mull values: 2726


In [12]:
from pyspark.sql import functions as F

# Count the literal "NA" placeholder in every column with one Spark job. These are
# missing values encoded as strings, not SQL nulls, so the label says "NA".
na_counts = df.select(
    [F.count(F.when(F.col(c) == "NA", 1)).alias(c) for c in df.columns]
).first().asDict()
for column_name, n_na in na_counts.items():
    if n_na:
        print(column_name, "\t", "with 'NA' values:", n_na)

WindGustDir 	 with null values: 9330
WindDir9am 	 with null values: 10013
WindDir3pm 	 with null values: 3778
RainToday 	 with null values: 1406


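### Step 4: Drop the feature RISK_MM

RISK_MM is the amount of rain recorded on the following day, so it directly encodes the target RainTomorrow; it is dropped to avoid target leakage.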

In [13]:
# Remove RISK_MM from the DataFrame.
# NOTE(review): presumably because RISK_MM measures next-day rainfall and would
# leak the RainTomorrow label — confirm against the dataset description.
df = df.drop('RISK_MM')

### Step 5: Processing the Date feature

In [14]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

Convert the Date string to a date with to_date(), then extract its year, month and day using year(), month() and dayofmonth(), storing them as the new features year, month and day.

In [15]:
# Parse the Date string, derive year/month/day from it, then drop the parsed column.
# With Spark's default case-insensitive column resolution, "date" replaces the
# original "Date" column, so the final drop removes the date entirely
# (the assembled output in Step 10 has no Date column).
# NOTE(review): year/month/day are integer columns, so the float-only
# numeric_feats selection in Step 7 does not include them as model inputs.
df = df.withColumn("date", to_date(df['Date']))
for part_name, extract in (("year", year), ("month", month), ("day", dayofmonth)):
    df = df.withColumn(part_name, extract(df['date']))
df = df.drop('date')

### Step 6: Handling missing values of the categorical features

In [16]:
# Frequency table of each categorical column, including its "NA" placeholder,
# used to pick the replacement value in the next step.
for category_col in ["WindGustDir", "WindDir9am", "WindDir3pm", "RainToday"]:
    df.groupBy(category_col).count().show()

+-----------+-----+
|WindGustDir|count|
+-----------+-----+
|        SSE| 8993|
|         SW| 8797|
|         NW| 8003|
|         NA| 9330|
|          E| 9071|
|        WSW| 8901|
|        ENE| 7992|
|         NE| 7060|
|        NNW| 6561|
|          N| 9033|
|        SSW| 8610|
|          W| 9780|
|          S| 8949|
|         SE| 9309|
|        WNW| 8066|
|        NNE| 6433|
|        ESE| 7305|
+-----------+-----+

+----------+-----+
|WindDir9am|count|
+----------+-----+
|       SSE| 8966|
|        SW| 8237|
|        NW| 8552|
|        NA|10013|
|         E| 9024|
|       WSW| 6843|
|       ENE| 7735|
|        NE| 7527|
|       NNW| 7840|
|         N|11393|
|       SSW| 7448|
|         W| 8260|
|         S| 8493|
|        SE| 9162|
|       WNW| 7194|
|       NNE| 7948|
|       ESE| 7558|
+----------+-----+

+----------+-----+
|WindDir3pm|count|
+----------+-----+
|       SSE| 9142|
|        NW| 8468|
|        SW| 9182|
|        NA| 3778|
|         E| 8342|
|       WSW| 9329|
|       

For each of the categorical features with missing values (“NA”), replace “NA” with the most frequent value of that feature, using when(), lit() and otherwise().

In [20]:
from pyspark.sql.functions import when, lit

# Replace the "NA" placeholder in each categorical column with that column's most
# frequent real value (its mode). The mode is computed from the data rather than
# hard-coded, so it stays correct if the dataset changes.
for cat_col in ["WindGustDir", "WindDir9am", "WindDir3pm", "RainToday"]:
    mode_row = (
        df.filter(df[cat_col] != "NA")          # also drops nulls
        .groupBy(cat_col).count()
        # Highest count first; ties broken alphabetically for a deterministic pick.
        .orderBy(["count", cat_col], ascending=[False, True])
        .first()
    )
    if mode_row is None:
        print(cat_col, "\t", "has no non-NA values; left unchanged")
        continue
    mode_value = mode_row[cat_col]
    print(cat_col, "\t", "NA ->", mode_value)
    df = df.withColumn(
        cat_col, when(df[cat_col] == "NA", lit(mode_value)).otherwise(df[cat_col])
    )

### Step 7: Handling missing values of the numerical features

In [21]:
# Names of all float-typed columns (the measurements cast in Step 2).
numeric_feats = [name for name, dtype in df.dtypes if dtype.startswith('float')]
print(numeric_feats)

['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm']


Use pyspark.ml.feature.Imputer to fill in the missing values of the numerical features with the column mean.

In [22]:
from pyspark.ml.feature import Imputer

# Fill nulls in every numerical column with that column's mean. Output columns
# reuse the input names, so the imputed values overwrite the originals in place.
imputer = Imputer(inputCols=numeric_feats, outputCols=numeric_feats, strategy='mean')
imputer_model = imputer.fit(df)
df = imputer_model.transform(df)

### Step 8: Transform the features

In [23]:
from pyspark.sql.functions import skewness

# Features whose skewness exceeds this value get a log1p transform in the next cell.
SKEW_THRESHOLD = 0.75

# Compute the skewness of every numerical feature in a single aggregation
# (one Spark job instead of one per column).
skew_row = df.select([skewness(df[c]).alias(c) for c in numeric_feats]).first()

skewed_feats = []
for feat in numeric_feats:
    skew_value = skew_row[feat]
    # skewness() yields null for a column with no non-null values; skip such columns
    # instead of crashing on the comparison.
    if skew_value is not None and skew_value > SKEW_THRESHOLD:
        print(feat, "\t", skew_value)
        skewed_feats.append(feat)
print(skewed_feats)

Rainfall 	 9.93720720656584
Evaporation 	 4.9535525141524825
WindGustSpeed 	 0.9042674361671061
WindSpeed9am 	 0.7791876023517204
['Rainfall', 'Evaporation', 'WindGustSpeed', 'WindSpeed9am']


Apply a log transformation to the features whose skewness is larger than 0.75, using log1p() from the pyspark.sql.functions module.

In [25]:
from pyspark.sql.functions import log1p

# Replace each skewed feature x with log(1 + x) to compress its long right tail.
for skewed_col in skewed_feats:
    df = df.withColumn(skewed_col, log1p(df[skewed_col]))

### Step 9: Converting categorical features

In [26]:
# All string-typed columns are categorical features, except the label RainTomorrow.
string_cols = [name for name, dtype in df.dtypes if dtype.startswith('string')]
string_cols.remove('RainTomorrow')
categorical_feats = string_cols
print(categorical_feats)

['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']


Convert categorical features into dummy/indicator features

In [27]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# For each categorical column, fit a StringIndexer and add a "<name>_indexed"
# column holding the numeric category index; df itself is left unchanged.
td = df
for feat in categorical_feats:
    td = (
        StringIndexer(inputCol=feat, outputCol=feat + "_indexed")
        .fit(td)
        .transform(td)
    )

In [28]:
# One-hot encode every "<name>_indexed" column into a "<name>_ohe" vector column.
indexed_cols = [name + "_indexed" for name in categorical_feats]
ohe_cols = [name + "_ohe" for name in categorical_feats]
encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=ohe_cols)
model = encoder.fit(td)
encoded = model.transform(td)

### Step 10: Training the logistic regression (classification) model

In [29]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: the one-hot vectors followed by the numerical features,
# merged into a single 'features' vector column.
updated_feats = [name + "_ohe" for name in categorical_feats] + numeric_feats
assembler = VectorAssembler(inputCols=updated_feats, outputCol='features')
output = assembler.transform(encoded)
output.show()

+--------+-------+-------+-------------------+------------------+--------+-----------+------------------+----------+----------+------------------+------------+-----------+-----------+-----------+-----------+---------+---------+-------+-------+---------+------------+----+-----+---+----------------+-------------------+------------------+------------------+-----------------+-------------+---------------+---------------+---------------+---------------+--------------------+
|Location|MinTemp|MaxTemp|           Rainfall|       Evaporation|Sunshine|WindGustDir|     WindGustSpeed|WindDir9am|WindDir3pm|      WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm| Cloud9am| Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|year|month|day|Location_indexed|WindGustDir_indexed|WindDir9am_indexed|WindDir3pm_indexed|RainToday_indexed|RainToday_ohe| WindDir3pm_ohe| WindDir9am_ohe|WindGustDir_ohe|   Location_ohe|            features|
+--------+-------+-------+-------------------+------

Transform the target variable RainTomorrow.

In [31]:
from pyspark.sql.functions import when

# Encode the label as 0/1: "Yes" -> 1, every other value -> 0.
# Reference the column through `output` itself rather than `df`, so resolution
# does not depend on `output` having been derived from the current `df`.
output = output.withColumn(
    'RainTomorrow', when(output['RainTomorrow'] == "Yes", 1).otherwise(0)
)
output.select('RainTomorrow').show()

+------------+
|RainTomorrow|
+------------+
|           0|
|           0|
|           0|
|           0|
|           0|
|           0|
|           0|
|           0|
|           1|
|           0|
|           1|
|           1|
|           1|
|           0|
|           0|
|           1|
|           1|
|           0|
|           0|
|           0|
+------------+
only showing top 20 rows



Split data into training data and testing data

In [32]:
# 70/30 train/test split. A fixed seed makes the split, and hence the accuracy
# reported below, reproducible across Restart & Run All.
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed=42)

### Build logistic regression model

In [33]:
from pyspark.ml.classification import LogisticRegression

# Binary classifier predicting RainTomorrow (0/1) from the assembled 'features' vector.
lr = LogisticRegression(labelCol='RainTomorrow', featuresCol='features')
lrmodel = lr.fit(trainingData)

In [34]:
# Score the held-out test rows; adds (among others) the 'prediction' column
# that the evaluator below compares against RainTomorrow.
predictions = lrmodel.transform(testData)

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fraction of test rows whose predicted class matches RainTomorrow.
evaluator = MulticlassClassificationEvaluator(labelCol="RainTomorrow",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

Accuracy = 0.846365


### Step 11: Pipeline

In [36]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import when

# Start again from the preprocessed df (no indexing/encoding applied yet), so that
# every learned transformation is fitted inside the pipeline on training data only.
# The label is encoded as 0/1: "Yes" -> 1, every other value -> 0.
td = df.withColumn('RainTomorrow', when(df['RainTomorrow'] == "Yes", 1).otherwise(0))

# Fixed seed so the split, and the accuracy reported below, is reproducible.
(trainingData, testData) = td.randomSplit([0.7, 0.3], seed=42)

In [46]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

ML_stages = []

# 1. One StringIndexer per categorical feature: category string -> category index.
#    The pipeline is fitted on trainingData but applied to testData, so
#    handleInvalid="keep" maps a category that never appeared in training to an
#    extra index instead of raising during transform.
for feat in categorical_feats:
    stringIndexer = StringIndexer(inputCol=feat, outputCol=feat+"_indexed", handleInvalid="keep")
    ML_stages.append(stringIndexer)

# 2. Map every category-index column to a binary (one-hot) vector column.
encoder = OneHotEncoder(inputCols=[feat+"_indexed" for feat in categorical_feats],
                        outputCols=[feat+"_ohe" for feat in categorical_feats])

# 3. Merge the one-hot vectors and the numerical features into one 'features' vector.
updated_feats = [feat+"_ohe" for feat in categorical_feats]
updated_feats += numeric_feats
assembler = VectorAssembler(inputCols=updated_feats, outputCol='features')

# 4. Logistic regression on the assembled feature vector.
lr = LogisticRegression(featuresCol='features', labelCol='RainTomorrow')

# 5. Chain indexers -> encoder -> assembler -> classifier into a single Pipeline.
ML_stages.extend([encoder, assembler, lr])
ML_pipeline = Pipeline(stages=ML_stages)

# 6. Fit every stage on the training split only ...
lrmodel = ML_pipeline.fit(trainingData)

# 7. ... and apply the fitted stages to the held-out test split.
predictions = lrmodel.transform(testData)

evaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

Accuracy = 0.849754
