<a href="https://colab.research.google.com/github/adamskyy/labs/blob/main/PWr_12_01_2023_Regression_Pyspark_workshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Spark


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [None]:
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()

# Create session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql import Window

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

# Download data

In [None]:
!wget "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data" 

Task:
Using the Adult dataset, predict whether a person's yearly income is above $50K.

Label: earnings (two values: <=50K and >50K).
Features: the remaining columns (fnlwgt is dropped).

This is a binary classification task, so we solve it with logistic regression (not linear regression).

In [None]:
col_names = ["age", "workclass", "fnlwgt", "education", "education-num","marital-status", "occupation", 
             "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", 
             "native-country", "earnings"]

df = spark.read.csv("adult.data", header=False, inferSchema=True, ignoreLeadingWhiteSpace=True)
df = df.select(*[f.col(old).alias(new) for old, new in zip(df.columns, col_names)]).drop("fnlwgt").dropna("any")
df.show(3)

In [None]:
df.dropDuplicates(['earnings']).show(3)

# Split data into training and test sets


In [None]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)
print(trainingData.count())
print(testData.count())

# ML pipeline creation

For data processing and model training we use the Spark Pipeline API.
Each transformation of the dataset is added, in order, to the list of pipeline stages.
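To make the stage mechanics concrete, here is a toy, pure-Python model of how a pipeline chains stages. It is an illustration under simplifying assumptions, not Spark's implementation: rows are plain dicts, and the class names (`FrequencyIndexer`, `UpperCaser`, `ToyPipeline`) are hypothetical. Estimators (like StringIndexer or LogisticRegression) are fitted and replaced by the model they return; transformers (like VectorAssembler) are used as-is; each stage sees the columns produced by the stages before it.

```python
class FrequencyIndexer:
    """Hypothetical estimator: fit() learns label -> index (most frequent first)."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        counts = {}
        for r in rows:
            counts[r[self.col]] = counts.get(r[self.col], 0) + 1
        # Most frequent label first, ties broken alphabetically.
        ordered = sorted(counts, key=lambda v: (-counts[v], v))
        return IndexerModel(self.col, {v: float(i) for i, v in enumerate(ordered)})


class IndexerModel:
    """Fitted transformer returned by FrequencyIndexer.fit()."""

    def __init__(self, col, mapping):
        self.col = col
        self.mapping = mapping

    def transform(self, rows):
        return [{**r, self.col + "_index": self.mapping[r[self.col]]} for r in rows]


class UpperCaser:
    """Hypothetical transformer: no fit(), only transform()."""

    def __init__(self, col):
        self.col = col

    def transform(self, rows):
        return [{**r, self.col + "_upper": r[self.col].upper()} for r in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            # Pass the transformed rows on so later stages see earlier outputs.
            # (Spark skips this after the last estimator; the toy keeps it simple.)
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"c": v} for v in ["a", "b", "c", "a", "a", "c"]]
model = ToyPipeline([FrequencyIndexer("c"), UpperCaser("c")]).fit(train)
print(model.transform([{"c": "b"}]))  # [{'c': 'b', 'c_index': 2.0, 'c_upper': 'B'}]
```

As in Spark, `fit()` on the pipeline returns a separate fitted model, and only that model is used to transform new data.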

Steps:
1. Replace each categorical column with a numeric index (label encoding); by default the most frequent label gets index 0.0. We will use **StringIndexer** https://spark.apache.org/docs/1.5.1/ml-features.html

```
  #  id | category | category_index
  # ----|----------|---------------
  #  0  | a        | 0.0
  #  1  | b        | 2.0
  #  2  | c        | 1.0
  #  3  | a        | 0.0
  #  4  | a        | 0.0
  #  5  | c        | 1.0
```

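2. One-hot encode the indices from step 1. By default (dropLast=True) the last index is dropped, so that category is encoded as an all-zeros vector. We will use **OneHotEncoder** https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html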

```
  #  id | category       | category_index | category_a | category_c
  # ----|----------------|----------------|------------|-----------
  #  0  | a              | 0.0            |1           |0
  #  1  | b              | 2.0            |0           |0
  #  2  | c              | 1.0            |0           |1
  #  3  | a              | 0.0            |1           |0
  #  4  | a              | 0.0            |1           |0
  #  5  | c              | 1.0            |0           |1
```
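When handleInvalid is set to 'keep', an extra category for invalid values is added as the last category. With dropLast=True it is this extra category that gets dropped, so invalid values become an all-zeros vector and every known category keeps its own column: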

```
  #  id | category       | category_index | category_a | category_b | category_c
  # ----|----------------|----------------|------------|------------|-----------
  #  0  | a              | 0.0            |1           |0           |0
  #  1  | b              | 2.0            |0           |1           |0
  #  2  | c              | 1.0            |0           |0           |1
  #  3  | a              | 0.0            |1           |0           |0
  #  4  | a              | 0.0            |1           |0           |0
  #  5  | c              | 1.0            |0           |0           |1
```

3. Numeric (continuous) columns stay unchanged.
4. All features are collected into a single vector that is later used for model training. We will use **VectorAssembler** https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html?highlight=vectorassembler

```
  #  id | category       | category_index | category_ohe_vector
  # ----|----------------|----------------|---------------------
  #  0  | a              | 0.0            |[1, 0, 0]
  #  1  | b              | 2.0            |[0, 0, 1]
  #  2  | c              | 1.0            |[0, 1, 0]
  #  3  | a              | 0.0            |[1, 0, 0]
  #  4  | a              | 0.0            |[1, 0, 0]
  #  5  | c              | 1.0            |[0, 1, 0]
```
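Steps 1, 2 and 4 can be sketched in plain Python for the toy column used in the tables above. This is an illustration that mimics Spark's defaults as documented (stringOrderType="frequencyDesc" with alphabetical tie-breaking, dropLast=True); it is not Spark's code, Spark stores the result as a SparseVector rather than a list, and the numeric values 39.0 and 40.0 are made-up stand-ins for columns such as age and hours-per-week.

```python
from collections import Counter


def string_index(values):
    """Step 1: most frequent label -> 0.0, next -> 1.0, ...; ties sorted alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Step 2: one-hot encode an index; with drop_last the last index maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if int(index) < size:
        vec[int(index)] = 1
    return vec


def assemble(*parts):
    """Step 4: concatenate vector pieces and plain numbers into one feature vector."""
    out = []
    for p in parts:
        out.extend(p if isinstance(p, list) else [p])
    return out


categories = ["a", "b", "c", "a", "a", "c"]
mapping = string_index(categories)
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}

for cat in categories:
    idx = mapping[cat]
    # dropLast=True vs. all categories kept
    print(cat, idx, one_hot(idx, len(mapping)), one_hot(idx, len(mapping), drop_last=False))

# One encoded categorical column followed by two numeric columns.
print(assemble(one_hot(mapping["c"], len(mapping)), 39.0, 40.0))  # [0, 1, 39.0, 40.0]
```

The vector position of each category follows its index, which is why b (index 2.0) lands in the last slot and is the one removed by dropLast.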

## Data processing

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categorical_col_names = ["workclass", "education", "marital-status", 
             "occupation", "relationship", "race", "sex",
             "native-country"]

pipeline_steps = []

for categorical_col_name in categorical_col_names:
  indexer = StringIndexer(inputCol=categorical_col_name, outputCol=categorical_col_name+"_index", handleInvalid='keep') 
  encoder = OneHotEncoder(inputCol=categorical_col_name+"_index", outputCol=categorical_col_name+"_class")
  pipeline_steps += [indexer, encoder]


indexer_label = StringIndexer(inputCol="earnings", outputCol="earnings_index") 
pipeline_steps += [indexer_label]


## Features vector creation

We do not add earnings_index to the feature vector, since it is the label.

In [None]:
from pyspark.ml.feature import VectorAssembler

numeric_col_names = ["age", "education-num", "capital-gain", "capital-loss", "hours-per-week"]
assembler_inputs = list(map(lambda c: c + "_class", categorical_col_names))+ numeric_col_names
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
pipeline_steps += [assembler]

In [None]:
# Inspect the distinct values of one categorical column
df.select('relationship').distinct().show()

## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
regressor = LogisticRegression(labelCol="earnings_index", featuresCol="features", maxIter=50)
pipeline_steps += [regressor]

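## Fit the pipeline on training data and predict on test data

fit() works like scikit-learn's fit(), and transform() plays the role of scikit-learn's predict(): it adds the rawPrediction, probability and prediction columns to the DataFrame.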

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=pipeline_steps)

model = pipeline.fit(trainingData)
predictions = model.transform(testData)

In [None]:
predictions.printSchema()

In [None]:
print('Coefficients:', model.stages[-1].coefficients)
print('Model Intercept: ', model.stages[-1].intercept)
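The coefficients and intercept printed above fully determine the prediction columns. For a binary LogisticRegressionModel, the margin is `intercept + coefficients · features`, rawPrediction is `[-margin, margin]`, probability is `[1 - sigmoid(margin), sigmoid(margin)]`, and prediction is 1.0 when the class-1 probability exceeds the threshold (0.5 by default). The sketch below reproduces that arithmetic for one dense feature vector; `predict_binary` is a hypothetical helper and the numbers in the usage line are made up.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def predict_binary(coefficients, intercept, features, threshold=0.5):
    """Mimic a binary logistic regression model on one dense feature vector."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    p1 = sigmoid(margin)                      # probability of class 1 (>50K)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


# Made-up coefficients and features, for illustration only.
print(predict_binary([0.5, -0.25], -1.0, [4.0, 2.0]))
```

On the real model, `model.stages[-1].coefficients` is a DenseVector of the same length as the assembled `features` column, so the same formula applies term by term.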

In [None]:
selected = predictions.select("earnings_index", "prediction", "probability", "age", "occupation")
selected.show(10)

# Evaluate model

## **BinaryClassificationEvaluator**

https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.html


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol='earnings_index')

for line in evaluator.explainParams().split('\n'):
    print(line)

evaluator.evaluate(predictions)

In [None]:
evaluator.getMetricName()
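The default metric is areaUnderROC. One way to read it: the probability that a randomly chosen positive example (>50K) gets a higher score than a randomly chosen negative one, with ties counting one half. The pure-Python sketch below computes that pairwise version on toy data; it is an illustration, not Spark's implementation, which builds the ROC curve from score thresholds (and may bin scores, see numBins) and can therefore differ slightly on large data. Because rawPrediction[1] (the margin) is a monotone function of the class-1 probability, both give the same ranking and hence the same AUC.

```python
def area_under_roc(labels, scores):
    """AUC as P(score of random positive > score of random negative), ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy labels and scores (not from this dataset).
print(area_under_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

A value of 0.5 means the ranking is no better than chance, and 1.0 means every positive outranks every negative.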

## RegressionEvaluator

RMSE (root mean squared error) is the square root of the mean of the squared residuals, $\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$. It measures the absolute fit of the model to the data: how close the observed data points are to the model's predicted values. Whereas R-squared is a relative measure of fit, RMSE is an absolute measure of fit, expressed in the same units as the response variable, and it can be read as the typical size of a residual. Lower values of RMSE indicate a better fit. RMSE is a good measure of how accurately the model predicts the response, and it is the most important fit criterion when the main purpose of the model is prediction. Note that here both earnings_index and prediction take only the values 0 and 1, so these regression metrics only summarize classification errors.
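Because earnings_index and prediction are both 0/1 here, every residual is -1, 0 or 1. The sketch below (plain Python, toy data, hypothetical helper name) computes RMSE, MAE and R-squared with the usual formulas and shows that MAE equals the misclassification rate and RMSE is its square root.

```python
import math


def regression_metrics(labels, predictions):
    """Return (RMSE, MAE, R-squared) for two equal-length numeric sequences."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    mse = sum(r * r for r in residuals) / n
    mae = sum(abs(r) for r in residuals) / n
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    if ss_tot == 0:
        raise ValueError("R-squared is undefined when all labels are equal")
    r2 = 1.0 - (mse * n) / ss_tot
    return math.sqrt(mse), mae, r2


# Toy 0/1 labels and predictions: 2 of 5 are misclassified.
labels = [0, 0, 1, 1, 0]
preds = [0, 1, 1, 0, 0]
rmse, mae, r2 = regression_metrics(labels, preds)
print(rmse, mae, r2)
```

With 2 errors out of 5, MAE is 0.4 (the error rate) and RMSE is sqrt(0.4); a negative R-squared simply means the hard 0/1 predictions fit worse than always predicting the mean label.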

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol='earnings_index')

for line in evaluator.explainParams().split('\n'):
    print(line)

print('RMSE:', evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}))
print('R-squared:', evaluator.evaluate(predictions, {evaluator.metricName: "r2"}))
print('MAE:', evaluator.evaluate(predictions, {evaluator.metricName: "mae"}))

# Hyperparameter tuning

In [None]:
for line in regressor.explainParams().split('\n'):
    print(line)

Create a parameter grid for cross-validation (the equivalent of scikit-learn's GridSearchCV).

We use a 5-fold CrossValidator.


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(regressor.regParam, [0.01, 0.5, 2.0])
             .addGrid(regressor.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(regressor.maxIter, [1, 5, 10, 100])
             .build())

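# `evaluator` is still the RegressionEvaluator defined above (default metric: RMSE),
# so the grid search keeps the parameter combination with the lowest average RMSE.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)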
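ParamGridBuilder produces the Cartesian product of the listed values, and CrossValidator fits the whole pipeline once per parameter combination per fold, then refits the best combination on the full training set. The plain-Python sketch below counts those fits for the grid above, which helps explain the long running time; the dict keys are just labels mirroring the Spark parameter names.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10, 100]
num_folds = 5

# Cartesian product of all values, like ParamGridBuilder().addGrid(...).build()
grid = [
    {"regParam": r, "elasticNetParam": e, "maxIter": m}
    for r, e, m in product(reg_params, elastic_net_params, max_iters)
]

# One pipeline fit per (combination, fold), plus one final refit of the best combination.
num_fits = len(grid) * num_folds + 1
print(len(grid), num_fits)  # 36 181
```

Shrinking any one list (for example dropping maxIter=1, which barely trains the model) reduces the work proportionally.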


Run cross-validation; this may take a while.


In [None]:
cvModel = cv.fit(trainingData) # usually takes up to 12 minutes

Evaluate on the held-out test set to measure how well the model performs on unseen data.


In [None]:
predictions = cvModel.transform(testData)

cvModel.transform() uses the best model found during cross-validation (cvModel.bestModel), which Spark refits on the whole training set with the winning parameters.
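The selection rule itself is simple: take the parameter combination whose average metric across folds is best, where "best" means largest or smallest depending on the metric. A minimal sketch in plain Python, with a hypothetical helper and made-up metric values; in this notebook the real inputs would be `paramGrid`, `cvModel.avgMetrics` (one average per entry of `paramGrid`) and `evaluator.isLargerBetter()` (False for RMSE).

```python
def select_best(param_maps, avg_metrics, larger_is_better):
    """Return (param_map, metric) for the best average cross-validation metric."""
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics[best_index]


# Hypothetical values for illustration only, not results from this notebook.
param_maps = [{"regParam": 0.01}, {"regParam": 0.5}, {"regParam": 2.0}]
avg_rmse = [0.39, 0.42, 0.45]
print(select_best(param_maps, avg_rmse, larger_is_better=False))  # ({'regParam': 0.01}, 0.39)
```

In the notebook, `select_best(paramGrid, cvModel.avgMetrics, evaluator.isLargerBetter())` should point at the same combination that `cvModel.bestModel` was built from.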

In [None]:
print('RMSE:', evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}))
print('R-squared:', evaluator.evaluate(predictions, {evaluator.metricName: "r2"}))
print('MAE:', evaluator.evaluate(predictions, {evaluator.metricName: "mae"}))

In [None]:
print('Model Intercept: ', cvModel.bestModel.stages[-1].intercept)

Inspect the best model's predictions and the probability of each class.


In [None]:
selected = predictions.select("earnings_index", "prediction", "probability", "age", "occupation")
selected.show(10)