<a id='tablecontents'></a>

# PySpark for Machine Learning - Foundations
<h5>2023, Andrea Paviglianiti</h5>

<hr>

## Table of Contents:

- [Set Spark Environment](#section1)
- [Linear Regression](#section2)
    - [Import Modules](#s1p1)
    - [ML from .txt data](#s1p2)
    - [ML from .csv data](#s1p3)
    - [Performance Evaluation](#s1p4)
    - [Save predictions to a .csv file](#s1p5)
- [Decision Trees](#section3)
- [Movies Recommendations](#section4)
- [Close Spark Session](#section5)
<br>
<hr>

<a id='section1'></a>

## Set Spark Environment

Popular ML libraries for the Spark engine are:

- MLlib
- spark-sklearn

However, <b>MLlib</b> tends to outperform <b>scikit-learn</b> on Spark because it was specifically designed to run on it.

#### ML capabilities of MLlib:

- Feature extraction
- Statistics
- Linear Regression
- Logistic Regression
- Support Vector Machines
- Naive Bayes Classifier
- Decision Trees
- K-Means
- PCA
- Singular Value Decomposition
- Recommendations using Alternating Least Squares


#### Summary of use cases:

- <b>Predicting values</b>: is a customer going to churn?
- <b>Classification</b>: how will this document be classified? / is this a dog or a cat?
- <b>Personalized Recommendations</b>: which movie best suits our audience?

In [1]:
import numpy as np
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, DoubleType

In [2]:
#Create a spark session
spark = SparkSession.builder.master('local[*]').appName("SparkML").getOrCreate()

def check_spark_version():
    # Report which MLlib API to use, based on the major Spark version
    this_version = spark.version
    major = int(this_version.split('.')[0])  # also works for multi-digit major versions
    if major >= 3:
        print(f'RDD-based mllib is legacy for Spark version {this_version}: use the DataFrame-based MLlib API (pyspark.ml).')
    else:
        print(f'Spark version {this_version} allows for the old RDD-based MLlib API.')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/23 17:29:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/23 17:29:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/23 17:29:09 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/06/23 17:29:09 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/06/23 17:29:09 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [3]:
check_spark_version()

RDD-based mllib is legacy for Spark version 3.2.1: use the DataFrame-based MLlib API (pyspark.ml).


In [4]:
spark

<a id='section2'></a>
## Linear Regression

<a id='s1p1'></a>

### Import Modules

In [5]:
#Import Linear Regression module
from pyspark.ml.regression import LinearRegression as spark_LR
from pyspark.ml.linalg import Vectors

<a id='s1p2'></a>

### ML from .txt data

In [6]:
os.listdir('input')

['regression.csv',
 'duplicates.csv',
 'regression.txt',
 'original.csv',
 '.ipynb_checkpoints']

In [7]:
#Select Target File
reg_file = 'input/regression.txt'

<br>

In PySpark, we pack our features into vectors, which the model then uses to make its predictions.

- A <b>vector</b> contains all the features that make the prediction possible. Think of the vector as `X` or `X1`
- A <b>label</b> is, instead, the value to be predicted. Think of the label as `y`

<br>

<u>A vector can be either <i>dense</i> or <i>sparse</i>:</u>

- it is <b>dense</b> when all variable values are stored, including zeros;
- it is <b>sparse</b> when only non-zero variable values are stored (together with their positions), which is more memory efficient when most values are zero

<br>

`pyspark.ml.linalg` provides the `DenseVector` and `SparseVector` classes for these two cases.
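
To make the difference concrete, here is a minimal plain-Python sketch (it does not use the Spark API). The helper names `to_sparse` and `to_dense` and the sample values are made up for illustration; the sparse form keeps the size, the indices of the non-zero entries and their values.

```python
# Plain-Python illustration (not the Spark API): one vector stored two ways.
dense = [0.0, 3.0, 0.0, 0.0, 0.0, 1.5, 0.0, 0.0]  # made-up sample values

def to_sparse(values):
    """Return (size, indices, non_zero_values) for a dense list."""
    indices = [i for i, v in enumerate(values) if v != 0.0]
    return len(values), indices, [values[i] for i in indices]

def to_dense(size, indices, non_zero_values):
    """Rebuild the dense list from its sparse representation."""
    out = [0.0] * size
    for i, v in zip(indices, non_zero_values):
        out[i] = v
    return out

sparse = to_sparse(dense)
print(sparse)                      # (8, [1, 5], [3.0, 1.5])
print(to_dense(*sparse) == dense)  # True
```

Here the sparse form stores two values and two indices instead of eight values; the saving grows with the share of zeros in the vector.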

In [8]:
#Target file
inputLines = spark.sparkContext.textFile(reg_file)

#Parse each line into (label, features): column 0 is the label, column 1 is the single feature wrapped in a dense vector
data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))

#Create header
my_cols = ['label', 'features']

#Convert to dataframe
df=data.toDF(my_cols)

df.show(5)

+-----+--------+
|label|features|
+-----+--------+
|-1.74|  [1.66]|
| 1.24| [-1.18]|
| 0.29|  [-0.4]|
|-0.13|  [0.09]|
|-0.39|  [0.38]|
+-----+--------+
only showing top 5 rows



                                                                                

##### Create a Linear Regression Model

In [9]:
# Split the dataframe into two subsets (80% for training, 20% for testing)
df_train, df_test = df.randomSplit([0.8, 0.2])
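
A weighted random split assigns each row to a subset by a random draw, so the resulting sizes are only approximately 80/20, and a fixed seed makes the split reproducible. The plain-Python sketch below illustrates the idea under that assumption; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by an independent random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point leftovers.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200; exact counts depend on the seed
```

In PySpark, `randomSplit` also accepts an optional `seed` argument, which is useful when the results need to be comparable across runs.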

In [10]:
# create a model using MLlib
lir = spark_LR(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# fit the model
this_model = lir.fit(df_train)

23/06/23 17:29:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/23 17:29:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [11]:
this_model

LinearRegressionModel: uid=LinearRegression_41cf16ac1ba3, numFeatures=1

In [12]:
# generate your predictions
df_pred = this_model.transform(df_test).cache()

df_pred.show(5) 

+-----+--------+-------------------+
|label|features|         prediction|
+-----+--------+-------------------+
|-2.54|  [2.39]| -1.699858778224386|
|-2.29|  [2.35]| -1.671264795768647|
|-2.26|  [2.25]|   -1.5997798396293|
|-2.09|  [1.97]|-1.3996219624391282|
|-1.79|  [1.73]|-1.2280580677046953|
+-----+--------+-------------------+
only showing top 5 rows



In [13]:
# extract prediction values
pred = df_pred.select('prediction').rdd.map(lambda  x: x[0])
labels = df_pred.select('label').rdd.map(lambda  x: x[0])

# zip values together with labels
prediction_set = pred.zip(labels).collect()

In [14]:
# print results (in range 0, 5)
prediction_set[0:5]

[(-1.699858778224386, -2.54),
 (-1.671264795768647, -2.29),
 (-1.5997798396293, -2.26),
 (-1.3996219624391282, -2.09),
 (-1.2280580677046953, -1.79)]

<br>

<h3>  </h3>

<a id='s1p3'></a>

### ML from .csv data

In [15]:
df = spark.read.option('header','true').csv('input/regression.csv')
df.show(5)

+------+--------+
|labels|feature1|
+------+--------+
| -1.74|    1.66|
|  1.24|    -1.1|
|  0.29|    -0.4|
| -0.13|    0.09|
| -0.39|    0.38|
+------+--------+
only showing top 5 rows



In [16]:
# Ensure the feature column is a float
df = df.withColumn('feature1', df.feature1.cast('float'))

# Ensure that the `labels` column is numeric as well
df = df.withColumn('labels', df.labels.cast('float'))
df.dtypes

[('labels', 'float'), ('feature1', 'float')]

In [17]:
#Check for null values
df.filter((df.feature1.isNull()) | (df.labels.isNull())).show()

+------+--------+
|labels|feature1|
+------+--------+
| -1.11|    null|
| -0.94|    null|
| -0.85|    null|
| -1.12|    null|
| -1.22|    null|
| -0.53|    null|
| -2.12|    null|
| -1.05|    null|
| -1.25|    null|
|  -0.8|    null|
| -0.31|    null|
| -0.22|    null|
|  -1.2|    null|
| -0.33|    null|
| -0.82|    null|
| -0.09|    null|
| -0.68|    null|
| -1.34|    null|
|  -0.8|    null|
|  -0.1|    null|
+------+--------+
only showing top 20 rows



`Null` values are not accepted by MLlib, so they must be handled before training.

For this demonstration we drop the rows containing `Null`; other options include filling them with the column mean or with the most frequent value.
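
The three options can be compared on a toy column with a plain-Python sketch. The values are made up and `None` stands in for `Null`; on a real Spark dataframe, the `Imputer` transformer in `pyspark.ml.feature` covers the fill-in strategies.

```python
from statistics import mean, mode

# Made-up column with missing entries (None plays the role of Null).
values = [1.0, None, 3.0, 3.0, None]
observed = [v for v in values if v is not None]

# Option 1: drop the rows with missing values.
dropped = observed

# Option 2: replace missing values with the mean of the observed ones.
col_mean = mean(observed)
mean_filled = [col_mean if v is None else v for v in values]

# Option 3: replace missing values with the most frequent observed value.
col_mode = mode(observed)
mode_filled = [col_mode if v is None else v for v in values]

print(dropped)
print(mean_filled)
print(mode_filled)
```

Dropping is the simplest choice but shrinks the dataset; filling keeps every row at the cost of introducing values that were never observed.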

In [18]:
#Check if 0 values are present to decide how to treat Null
df.filter(df.feature1 == 0).show(5)

+------+--------+
|labels|feature1|
+------+--------+
|  0.13|    -0.0|
|  0.05|    -0.0|
| -0.07|    -0.0|
|   0.0|    -0.0|
|  0.11|    -0.0|
+------+--------+
only showing top 5 rows



In [19]:
# Drop all rows where feature1 is null
df = df.filter(df.feature1.isNotNull())
df.show(5)

+------+--------+
|labels|feature1|
+------+--------+
| -1.74|    1.66|
|  1.24|    -1.1|
|  0.29|    -0.4|
| -0.13|    0.09|
| -0.39|    0.38|
+------+--------+
only showing top 5 rows



In [20]:
#Combine all feature columns into a single vector column `features`
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=['feature1'], outputCol="features")
df1 = vecAssembler.transform(df)

In [21]:
df1.show(5)

+------+--------+--------------------+
|labels|feature1|            features|
+------+--------+--------------------+
| -1.74|    1.66| [1.659999966621399]|
|  1.24|    -1.1|[-1.100000023841858]|
|  0.29|    -0.4|[-0.4000000059604...|
| -0.13|    0.09|[0.09000000357627...|
| -0.39|    0.38|[0.3799999952316284]|
+------+--------+--------------------+
only showing top 5 rows



- The `label` column holds the value to predict
- The `features` column holds the vector of feature values for that label

In [22]:
#Prepare the dataframe: copy `labels` into a `label` column and keep only the model inputs
dfx = df1.withColumn('label', df1.labels).select('label','features')
dfx.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|-1.74| [1.659999966621399]|
| 1.24|[-1.100000023841858]|
| 0.29|[-0.4000000059604...|
|-0.13|[0.09000000357627...|
|-0.39|[0.3799999952316284]|
+-----+--------------------+
only showing top 5 rows



In [23]:
# Split the dataframe into two subsets (70% for training, 30% for testing)
dfx_train, dfx_test = dfx.randomSplit([0.7, 0.3])

In [24]:
dfx_train

DataFrame[label: float, features: vector]

In [25]:
#Configure the Linear Regression estimator
regression1 = spark_LR(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [26]:
# fit the model
model_x = regression1.fit(dfx_train)

In [27]:
model_x

LinearRegressionModel: uid=LinearRegression_15e10be9e457, numFeatures=1

In [28]:
# generate your predictions
dfx_pred = model_x.transform(dfx_test).cache()

# extract prediction values
pred_x = dfx_pred.select('prediction').rdd.map(lambda  x: x[0])
labels_x = dfx_pred.select('label').rdd.map(lambda  x: x[0])

# zip values together with labels
prediction_set_x = pred_x.zip(labels_x).collect()

In [29]:
# print results (in range 0, 5)
prediction_set_x[0:5]

[(-2.3366550223253086, -3.2300000190734863),
 (-2.068863216162242, -2.890000104904175),
 (-1.7069822850932712, -2.5399999618530273),
 (-1.7793584022837554, -2.5399999618530273),
 (-1.6707940539397543, -2.430000066757202)]

<br>

<h3>  </h3>

<a id='s1p4'></a>

### Performance Evaluation

We create a dataframe from the results of our linear regression.

Then, we use the values in the dataframe to evaluate the performance of the ML model.

In this case, we will compute the Mean Absolute Error (MAE) and the Mean Squared Error (MSE).
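
As a reference for what these two metrics measure, here is a minimal plain-Python sketch: MAE averages the absolute differences between prediction and label, MSE averages the squared differences. The function names and the sample pairs are made up for illustration and are not taken from the model output.

```python
def mean_absolute_error(pairs):
    """Average of |prediction - label| over (prediction, label) pairs."""
    return sum(abs(p - y) for p, y in pairs) / len(pairs)

def mean_squared_error(pairs):
    """Average of (prediction - label)^2 over (prediction, label) pairs."""
    return sum((p - y) ** 2 for p, y in pairs) / len(pairs)

# Made-up (prediction, label) pairs with errors of 1, 0 and 2.
pairs = [(1.0, 2.0), (3.0, 3.0), (0.0, -2.0)]

mae = mean_absolute_error(pairs)  # (1 + 0 + 2) / 3
mse = mean_squared_error(pairs)   # (1 + 0 + 4) / 3
print(f'MAE:\t{mae}\nMSE:\t{mse}')
```

Because the errors are squared, MSE penalizes large errors more heavily than MAE does.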

In [30]:
predschema = StructType([
    StructField("prediction", DoubleType(), True),          #predicted value
    StructField("label", DoubleType(), True)                #actual value
])

predf = spark.createDataFrame(prediction_set_x, predschema)
predf.show(5)

+-------------------+-------------------+
|         prediction|              label|
+-------------------+-------------------+
|-2.3366550223253086|-3.2300000190734863|
| -2.068863216162242| -2.890000104904175|
|-1.7069822850932712|-2.5399999618530273|
|-1.7793584022837554|-2.5399999618530273|
|-1.6707940539397543| -2.430000066757202|
+-------------------+-------------------+
only showing top 5 rows



In [31]:
# Calculate MSE and MAE
from pyspark.ml.evaluation import RegressionEvaluator

# Calculate MSE
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predf)

# Then, calculate MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(predf)

print(f'MAE:\t{mae} \nMSE:\t{mse}')
print()

MAE:	0.23655989472289898 
MSE:	0.08868880854517451



<h3>   </h3>

<a id='s1p5'></a>

### Save predictions to a .csv file

In [32]:
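#Save dataframe as csv (left commented out so the notebook can be re-run safely)
#Note: Spark writes a directory of part files at this path, not a single .csv file
#try:
#    predf.write.csv('output/linear_regression_predictions.csv', header=True, encoding='UTF-8')
#    print('File saved')
#except Exception as err:
#    print(f'Cannot save. Maybe the output path already exists: {err}')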

<a id='section3'></a>
## Decision Trees

<a id='section4'></a>
## Movies Recommendations

<a id='section5'></a>
## Close Spark Session

In [36]:
spark.stop()

In [38]:
print('\n     End of the Notebook :)')


     End of the Notebook :)
