# Machine Learning with Pyspark

In [0]:
# Fundamentals
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f
# NOTE(review): this wildcard import shadows Python builtins such as sum, min, max,
# abs and round with their Spark column versions; prefer the `f.` prefix above.
from pyspark.sql.functions import *
import numpy as np
import pandas as pd
import time

from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder,  StandardScaler, MinMaxScaler

# Correlation
from pyspark.ml.stat import Correlation

# PCA
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler

# Linear Regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator # default metric: rmse

# Logistic Regression and classification evaluators
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator # default metric: areaUnderROC; does not support accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # default metric: f1; set metricName="accuracy" for accuracy (also works for binary classification)

# Linear SVC
from pyspark.ml.classification import LinearSVC

# KMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Legacy SQL entry point. NOTE(review): no Spark session is created here; later cells
# use a pre-existing `spark` object (presumably provided by the Databricks runtime) -- confirm
# before running this notebook elsewhere.
from pyspark.sql import SQLContext

# Getting Started

#### Example Data 

This data was curated from Kaggle.com and serves as an example for machine learning in PySpark.
The data is of California house prices and traits aggregated on the census block level.
Below we will explore the data dictionary and the machine learning process to predict `median_house_value` as a regression problem. 

Data: 
https://www.kaggle.com/camnugent/california-housing-prices

Inspiration: 
https://www.analyticsvidhya.com/blog/2019/11/build-machine-learning-pipelines-pyspark/

In [0]:
# Load the CSV with pandas, then hand it to Spark as a distributed DataFrame.
# NOTE(review): `spark` is not created in this notebook; it is assumed to be supplied by the runtime.
dat = spark.createDataFrame(
    pd.read_csv("https://raw.githubusercontent.com/BYUI451/guide_machine_learning/main/housing.csv")
)

# The raw data has no primary key, so attach a 1-based sequential row id.
# monotonically_increasing_id() follows the load order; row_number() over that
# ordering turns the (gappy) ids into 1, 2, 3, ...
# NOTE: an un-partitioned Window moves every row into one partition -- fine for
# this ~20k-row dataset, but it does not scale to large data.
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

row_order = Window.orderBy(monotonically_increasing_id())
dat = (
    dat
    .withColumn("index", row_number().over(row_order))
    .select(
        'index', 'longitude', 'latitude', 'housing_median_age', 'total_rooms',
        'total_bedrooms', 'population', 'households', 'median_income',
        'median_house_value', 'ocean_proximity',
    )
)

# How many rows are missing `total_bedrooms`?
dat.filter('total_bedrooms IS NULL').count()

# Preprocessing Data

Luckily, the data is nearly machine-learning friendly. There are two exceptions: the categorical `ocean_proximity` string column, which must be converted to numbers, and some missing `total_bedrooms` values. Our class project will also require one-hot encoding of categorical and string variables, so the process shown here carries over directly.

In [0]:
# Count missing values (NULL, plus NaN for floating-point columns) in each column.
# The `f.`-prefixed functions are used on purpose: the bare name `col` gets rebound
# to a plain string by the column-type loop in the next cell, which would break
# a re-run of this cell.
def _missing_condition(column_name, dtype):
    """Return a boolean Column that is true where `column_name` is missing.

    NaN can only exist in float/double columns. Calling isnan() on other types
    relies on an implicit cast (which may fail for strings under ANSI mode), so
    other types are only checked for NULL.
    """
    is_missing = f.col(column_name).isNull()
    if dtype in ("float", "double"):
        is_missing = is_missing | f.isnan(column_name)
    return is_missing


dat.select([
    f.count(f.when(_missing_condition(name, dtype), name)).alias(name)
    for name, dtype in dat.dtypes
]).show()

In [0]:
# Split the columns into numeric and categorical (string) groups using the schema.
# `dat.dtypes` already lists every (name, type) pair, so no per-column select() is
# needed. Avoiding a loop variable named `col` also keeps pyspark's col()
# function (from the wildcard import) usable in other cells.
numeric_columns = [name for name, dtype in dat.dtypes if dtype != "string"]
categorical_column = [name for name, dtype in dat.dtypes if dtype == "string"]

print("Numeric columns",numeric_columns)
print("categorical columns",categorical_column)

#### Replacing Missing Values

In [0]:
from pyspark.ml.feature import Imputer

# Fill missing numeric values with each column's mean ("median" and "mode" are the
# other strategies). Columns are overwritten in place because inputCols == outputCols.
# NOTE(review): this is fit on the full dataset before any train/test split, and it
# also covers `index` and the target `median_house_value`, which are not features.
imputer = Imputer(strategy="mean", inputCols=numeric_columns, outputCols=numeric_columns)
imputer_model = imputer.fit(dat)
dat = imputer_model.transform(dat)
dat.show()

#### Removing Duplicates

In [0]:
# Count rows that exactly duplicate another row.
# `index` is excluded from the comparison. It is a unique row id assigned with
# row_number(), so including it would make every row distinct and the check
# would always come back empty.
dedup_columns = [c for c in dat.columns if c != "index"]
(
    dat.groupBy(dedup_columns)
    .count()
    .where(f.col("count") > 1)
    # total rows that belong to a duplicated group (null when there are none)
    .select(f.sum("count").alias("rows_in_duplicate_groups"))
    .show()
)

# To drop duplicates (keeping one row per group), compare on the same subset:
# dat = dat.dropDuplicates(dedup_columns)

#### Removing Null Values

In [0]:
# Keep only rows that have `total_bedrooms`, and report how many remain.
# NOTE(review): after the mean imputation above, this filter (and the dropna below)
# may remove nothing -- confirm on your data.
dat_valid = dat.where(f.col('total_bedrooms').isNotNull())
print(dat_valid.count())

# Then drop any row that still has a null/NaN in any column, and report the count.
dat = dat_valid.dropna()
print(dat.count())

# Dimensions and Description: preview the first five rows
dat.limit(5).display()

index,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
1,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
2,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
3,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
4,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
5,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [0]:
# Summary statistics (count, mean, stddev, min, max) per variable, transposed so
# each variable is a row. The string column `ocean_proximity` is left out.
summary_stats = dat.drop('ocean_proximity').describe()
summary_stats.toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,20640,10320.5,5958.399113856003,1,20640
longitude,20640,-119.56970445736148,2.003531723502584,-124.35,-114.31
latitude,20640,35.6318614341087,2.135952397457101,32.54,41.95
housing_median_age,20640,28.639486434108527,12.58555761211163,1.0,52.0
total_rooms,20640,2635.7630813953488,2181.6152515827944,2.0,39320.0
total_bedrooms,20640,537.8705525375639,419.26659232552385,1.0,6445.0
population,20640,1425.4767441860465,1132.46212176534,3.0,35682.0
households,20640,499.5396802325581,382.3297528316098,1.0,6082.0
median_income,20640,3.8706710029070246,1.899821717945263,0.4999,15.0001


#### Categorical to Indexed Numerical Values

In [0]:
# https://towardsdev.com/how-to-write-pyspark-one-hot-encoding-results-to-an-interpretable-csv-file-626ecb973962

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# Map each `ocean_proximity` category to a numeric index
indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_prox_idx')

# fit() learns the category -> index mapping from the data
indexer_model = indexer.fit(dat)

# transform() adds the numeric `ocean_prox_idx` column
dat_indexed = indexer_model.transform(dat)

# Check the DataFrame. show() prints the table itself and returns None, so it is
# not wrapped in print() (which would print an extra "None").
dat_indexed.show()

# With the default stringOrderType ("frequencyDesc"), index 0 is the most frequent
# category, 1 the next most frequent, and so on. Grouping on both columns shows the mapping.
dat_indexed.groupby('ocean_proximity', 'ocean_prox_idx').count().orderBy('ocean_prox_idx').show()


#### Vector Assembling

> A vector assembler combines a given list of columns into a single vector column.
> This is typically used at the end of the data exploration and pre-processing steps. At this stage, we usually work with a few raw or transformed features that can be used to train our model.
> The Vector Assembler converts them into a single feature column in order to train the machine learning model (such as Logistic Regression). It accepts numeric, boolean and vector type columns.

https://www.analyticsvidhya.com/blog/2019/11/build-machine-learning-pipelines-pyspark/

In [0]:
# Combine the predictor columns into a single `features` vector column.
# `index` is deliberately NOT a predictor. It is only a row id (row_number over the
# load order), so anything a model learns from it reflects the order of the CSV
# rows, not a property of the district.
# NOTE(review): `ocean_prox_idx` enters as an ordinal number. The one-hot
# `type_dummy` column is created in the next cell, so it is not used here.
assembler = VectorAssembler(inputCols=[
  'longitude',
  'latitude',
  'housing_median_age',
  'total_rooms',
  'total_bedrooms',
  'population',
  'households',
  'median_income',
  'ocean_prox_idx'
], outputCol='features')

# Consolidate predictor columns
dat_assembled = assembler.transform(dat_indexed)
dat_assembled.show()

# Check the resulting vector next to the regression target
dat_assembled.select(['features', 'median_house_value']).show()

#### One Hot Encoding

In [0]:
# One-hot encode the indexed category into a sparse dummy vector `type_dummy`.
# By default OneHotEncoder drops the last category (dropLast=True), so k categories
# become k-1 dummy slots.
onehot_encoder = OneHotEncoder(inputCols=['ocean_prox_idx'], outputCols=['type_dummy'])

# Fit and apply the one-hot encoder. The fitted model gets its own name: rebinding
# the estimator variable to the model would make a re-run of this cell fail,
# because the model has no fit().
onehot_model = onehot_encoder.fit(dat_assembled)
dat = onehot_model.transform(dat_assembled)

# Check the results: one row per category with its index and dummy vector
dat.select('ocean_proximity', 'ocean_prox_idx', 'type_dummy').distinct().sort('ocean_prox_idx').show()

dat.show()

#### Standardization

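Centering and scaling happen independently on each feature, using statistics (mean and standard deviation) computed on the samples the scaler is fit on. Normally that is the training set only.
- `withStd`: True by default. Scales the data to unit standard deviation.
- `withMean`: False by default. Centers the data with the mean before scaling.

The fitted model can then transform a vector column so its features have unit standard deviation and/or zero mean.

Note: in this notebook the scaler is fit on the full dataset before the train/test split, so test-set statistics leak into the scaling. In a real project, fit it on the training split only.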

In [0]:
# Scale each feature to unit standard deviation. withMean=False means the data is
# NOT centered, so this is scaling only.
# NOTE(review): the statistics come from the whole dataset, not just the training split.
scaler = StandardScaler(inputCol="features", outputCol="Scaled_Features", withStd=True, withMean=False)
scaler_model = scaler.fit(dat)
scaledData = scaler_model.transform(dat)
scaledData.show()


# Alternative: rescale each feature linearly into the range [0, 1]
minmax = MinMaxScaler(min=0.0, max=1.0, inputCol="features", outputCol="MinMax_Scaled_Feat")
minmaxData = minmax.fit(dat).transform(dat)
minmaxData.show()

# Modeling

### What Spark ML offers for *regression problems*: 

https://spark.apache.org/docs/latest/ml-classification-regression.html#regression

- Linear Regression
- Generalized linear regression and Families
- Decision tree regression
- Random forest regression
- Gradient-boosted tree regression
- Survival regression
- Isotonic regression
- Factorization machines regressor

### Spark ML also contains models and examples for *classification problems*:

https://spark.apache.org/docs/latest/ml-classification-regression.html#classification

- Logistic regression
  - Binomial logistic regression
  - Multinomial logistic regression
- Decision tree classifier
- Random forest classifier
- Gradient-boosted tree classifier
- Multilayer perceptron classifier
- Linear Support Vector Machine
- One-vs-Rest classifier (a.k.a. One-vs-All)
- Naive Bayes
- Factorization machines classifier

##  Example: Simple Linear Regression

### Target == `median_house_value`

#### Prepping / Splitting the Data

In [0]:
# Keep only the feature vector and the regression target. A dedicated name is used
# so `dat_assembled` (input to the one-hot cell) is not clobbered.
lr_data = scaledData.select(['Scaled_Features', 'median_house_value']) # <- choosing our target and feature vectors
lr_data.limit(5).show()

# 70/30 train/test split. The fixed seed makes the split, and every metric computed
# from it, reproducible across runs.
SPLIT_SEED = 42
train_df, test_df = lr_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

train_df.limit(5).show()
test_df.limit(5).show()

# Count the full data once and reuse it for both percentages
total_rows = lr_data.count()
print('Training Percentage: ' + str(train_df.count() / total_rows))
print('Testing Percentage: ' + str(test_df.count() / total_rows))


#### Running a Linear Regression

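Set the model's target column and hyperparameters (iteration limit, regularization strength, elastic-net mixing), then fit it on the training split.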

In [0]:
# Fit a linear regression that predicts a district's median house value.
# Elastic-net regularization: regParam is the overall penalty strength, and
# elasticNetParam=0.8 blends 80% L1 with 20% L2. maxIter caps the optimizer iterations.
lr = LinearRegression(
    featuresCol='Scaled_Features',
    labelCol='median_house_value',  # <- the target
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

#### Evaluating the Model

In [0]:
# Fitted parameters
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# Training-set metrics: how well the model fits the data it was trained on
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

# Held-out metrics. Training metrics alone can hide overfitting, so also score
# the test split the model never saw.
lr_predictions = lr_model.transform(test_df)
for metric in ("rmse", "r2"):
    lr_evaluator = RegressionEvaluator(
        labelCol="median_house_value", predictionCol="prediction", metricName=metric)
    print("Test %s: %f" % (metric, lr_evaluator.evaluate(lr_predictions)))

## Decision Tree Regressor

### Target == `total_bedrooms` (continuous)

<br>

1. Build a model
2. Explore parameters and tuning
3. Explore a way to explain through graphs or metrics
4. Share what you found

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

# Build a feature vector that does NOT contain the target. The shared
# `Scaled_Features` column already includes `total_bedrooms`, so a tree trained on
# it could simply read the answer (target leakage). `index` (a row id) is left out
# too. Trees are insensitive to feature scale, so raw values are fine.
dt_feature_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'population',
    'households',
    'median_income',
    'ocean_prox_idx',
]
dt_assembler = VectorAssembler(inputCols=dt_feature_cols, outputCol='dt_features')
dt_data = dt_assembler.transform(dat).select('dt_features', 'total_bedrooms') # <- features and target
dt_data.limit(5).show()

# Reproducible 70/30 train/test split. Separate names keep the linear-regression
# split (`train_df` / `test_df`) intact.
dt_train_df, dt_test_df = dt_data.randomSplit([0.7, 0.3], seed=42)

dt_train_df.limit(5).show()
dt_test_df.limit(5).show()

# Train Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol='dt_features', labelCol='total_bedrooms')
dt_model = dt.fit(dt_train_df)

# Predict on the held-out split
dt_predictions = dt_model.transform(dt_test_df)
dt_predictions.select("total_bedrooms", "prediction", "dt_features").show(5)

# Evaluate
dt_evaluator = RegressionEvaluator(
    labelCol="total_bedrooms", predictionCol="prediction", metricName="rmse")

rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Feature importances labelled by column name, most important first
pd.Series(dt_model.featureImportances.toArray(), index=dt_feature_cols).sort_values(ascending=False)


## Random Forest Classifier

### Target == `ocean_proximity` (multi-class)

<br>

1. Build a model
2. Explore parameters and tuning
3. Explore a way to explain through graphs or metrics
4. Share what you found

https://towardsdatascience.com/a-guide-to-exploit-random-forest-classifier-in-pyspark-46d6999cb5db

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Build a feature vector WITHOUT the target. `Scaled_Features` contains
# `ocean_prox_idx` itself, so a classifier trained on it could simply read the label
# (target leakage). `type_dummy` also encodes the target, and `index` is only a
# row id, so neither is used.
rfc_feature_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]
rfc_assembler = VectorAssembler(inputCols=rfc_feature_cols, outputCol='rfc_features')
rfc_data = rfc_assembler.transform(dat).select('rfc_features', 'ocean_prox_idx') # <- features and target
rfc_data.limit(5).show()

# Reproducible 70/30 train/test split. Separate names keep other models' splits intact.
rfc_train_df, rfc_test_df = rfc_data.randomSplit([0.7, 0.3], seed=42)

rfc_train_df.limit(5).show()
rfc_test_df.limit(5).show()

# Train a RandomForest model (seeded so the trees are reproducible)
rfc = RandomForestClassifier(featuresCol='rfc_features', labelCol='ocean_prox_idx', numTrees=10, seed=42)
model = rfc.fit(rfc_train_df)

# Predict on the held-out split
predictions = model.transform(rfc_test_df)
predictions.select("ocean_prox_idx", "prediction", "rfc_features").show(5)

# Accuracy on the held-out split
evaluator = MulticlassClassificationEvaluator(
    labelCol="ocean_prox_idx", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

# Confusion matrix. MulticlassMetrics expects (prediction, label) pairs of doubles;
# in the result, rows are true labels and columns are predicted labels, both in
# ascending label order.
preds_and_labels = predictions.select(
    f.col("prediction").cast("double"),
    f.col("ocean_prox_idx").cast("double"),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())
