# Spark ML: Predicting Avocado Prices

This notebook shows how to train a machine learning model with Spark ML.  It is based on an excellent Towards Data Science article, [First Steps in Machine Learning with Apache Spark](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3), and uses the [Avocado Prices dataset](https://www.kaggle.com/datasets/neuromusic/avocado-prices) from Kaggle.

The objective of this model is to predict the average price of avocados from the date, supply amounts, and avocado type.  The dataset also contains a region column, but the pipeline built below does not use it.

## Spark Cluster Preparation

In [1]:
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [2]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
    !tar xf spark-3.5.5-bin-hadoop3.tgz
    !mv spark-3.5.5-bin-hadoop3 spark
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"

In [3]:
import findspark
findspark.init()

In [4]:
spark_url = 'local'

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [6]:
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark SQL')\
        .getOrCreate()

## Data Preparation

First, we read a CSV file.  We can provide options such as the delimiter and whether the file has a header row.  We then rename the columns, replacing each space (' ') in the names with an underscore.

In [7]:
path = 'avocado.csv'

In [8]:
df_avocado = spark.read.csv(path, header=True, inferSchema=True)

In [9]:
cols = [c.replace(' ', '_') for c in df_avocado.columns]
df_avocado = df_avocado.toDF(*cols)

In [10]:
df_avocado.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- AveragePrice: double (nullable = true)
 |-- Total_Volume: double (nullable = true)
 |-- 4046: double (nullable = true)
 |-- 4225: double (nullable = true)
 |-- 4770: double (nullable = true)
 |-- Total_Bags: double (nullable = true)
 |-- Small_Bags: double (nullable = true)
 |-- Large_Bags: double (nullable = true)
 |-- XLarge_Bags: double (nullable = true)
 |-- type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- region: string (nullable = true)



We then split data into training and testing datasets.

In [19]:
(df_avocado_train, df_avocado_test) = df_avocado.randomSplit([0.75, 0.25], seed=214)

In [21]:
df_avocado_train.show(4)

+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
| Id|      Date|AveragePrice|Total_Volume|     4046|     4225|    4770|Total_Bags|Small_Bags|Large_Bags|XLarge_Bags|        type|year|          region|
+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
|  0|2015-12-27|        0.49|  1137707.43| 738314.8|286858.37|11642.46|  100891.8|  70749.02|  30142.78|        0.0|conventional|2015|   PhoenixTucson|
|  0|2015-12-27|        0.71|   776404.39|451904.51|141599.36|15486.97| 167413.55| 123158.22|  33065.33|    11190.0|conventional|2015|WestTexNewMexico|
|  0|2015-12-27|         0.8|  1020390.64|494425.64|276556.76|84912.97| 164495.27| 136560.04|   12277.7|   15657.53|conventional|2015|   DallasFtWorth|
|  0|2015-12-27|         0.8|  2326942.14|976982.58|455203.42|86202.11| 808554.03| 72278

## Create ML Pipeline
For this pipeline, we will create several transformers using built-in estimators/transformers.  These include:


| SparkML Feature | Feature Type | Data Type |
|:-----------------|:--------------:|:--------------:|
| SQLTransformer  | Transformer  | Numerical |
| MinMaxScaler    | Estimator    | Numerical |
| StandardScaler  | Estimator    | Numerical |
| StringIndexer   | Estimator    | Categorical |
| VectorAssembler | Transformer  | Both |

Using these components, we create the following pipeline:

| Pipeline Stage | SparkML Feature |
|:----------|:----------|
| sql_transformer | SQLTransformer |
| month_vec_asm_transformer | VectorAssembler |
| month_scaler_transformer | MinMaxScaler |
| numerical_vec_asm_transformer | VectorAssembler |
| std_scaler_transformer | StandardScaler |
| type_indexer_transformer | StringIndexer |
| categorical_vec_asm_transformer | VectorAssembler |
| all_vec_asm_transformer | VectorAssembler |
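
The difference between an estimator and a transformer in the first table can be illustrated without Spark. The sketch below is a toy in plain Python, not Spark's implementation: `ToyCentering` and `ToyCenteringModel` are hypothetical names. The estimator's `fit()` learns a statistic from the training data and returns a transformer that applies it to any data.

```python
class ToyCenteringModel:
    """Transformer: applies a statistic that was learned earlier."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


class ToyCentering:
    """Estimator: fit() inspects the data and returns a transformer."""

    def fit(self, values):
        return ToyCenteringModel(sum(values) / len(values))


train = [1.0, 2.0, 3.0]
model = ToyCentering().fit(train)            # learns mean = 2.0 from the training data
centered_test = model.transform([2.0, 5.0])  # reuses the learned mean on other data
```

This is why the scalers and the indexer below are fitted on the training data first: the fitted model carries the training statistics, so other data is transformed in the same way.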

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer, MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler, StringIndexer

### Numerical Feature Transformers

#### sql_transformer: numeric column selection and log-transform
Create a transformer to select columns and log-transform some numerical columns

In [13]:
cols = ['AveragePrice', 'type']
cols = [f"`{col}`" for col in cols]
cols

['`AveragePrice`', '`type`']

In [14]:
log_cols =  ['4225', '4770', 'Small_Bags', 'Large_Bags', 'XLarge_Bags']
log_cols = [f"LOG(`{col}`+1) AS `LOG_{col}`" for col in log_cols] # logarithm, +1 in case it is 0
log_cols

['LOG(`4225`+1) AS `LOG_4225`',
 'LOG(`4770`+1) AS `LOG_4770`',
 'LOG(`Small_Bags`+1) AS `LOG_Small_Bags`',
 'LOG(`Large_Bags`+1) AS `LOG_Large_Bags`',
 'LOG(`XLarge_Bags`+1) AS `LOG_XLarge_Bags`']
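
To make the `+1` shift concrete, here is a small plain-Python sketch of what each generated expression computes for a single value. It assumes that the single-argument SQL `LOG` is the natural logarithm; `log_plus_one` is a hypothetical helper name, and the two inputs are taken from the first row displayed earlier.

```python
import math


def log_plus_one(x):
    """Mirror the SQL expression LOG(x + 1): natural log, shifted so that 0 maps to 0."""
    return math.log(x + 1)


small = log_plus_one(0.0)        # XLarge_Bags in the first displayed row
large = log_plus_one(286858.37)  # column 4225 in the first displayed row
```

Without the shift, a volume of 0 would have no finite logarithm.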

In [16]:
statement = f"""SELECT {', '.join(cols)}, {', '.join(log_cols)},
    YEAR(__THIS__.Date)-2000 AS year, MONTH(__THIS__.Date) AS month
    FROM __THIS__
    """
statement

'SELECT `AveragePrice`, `type`, LOG(`4225`+1) AS `LOG_4225`, LOG(`4770`+1) AS `LOG_4770`, LOG(`Small_Bags`+1) AS `LOG_Small_Bags`, LOG(`Large_Bags`+1) AS `LOG_Large_Bags`, LOG(`XLarge_Bags`+1) AS `LOG_XLarge_Bags`, \n    YEAR(__THIS__.Date)-2000 AS year, MONTH(__THIS__.Date) AS month\n    FROM __THIS__\n    '

In [17]:
sql_transformer = SQLTransformer(statement=statement)

In [18]:
df_avocado_train.show(4)

+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
| Id|      Date|AveragePrice|Total_Volume|     4046|     4225|    4770|Total_Bags|Small_Bags|Large_Bags|XLarge_Bags|        type|year|          region|
+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
|  0|2015-12-27|        0.49|  1137707.43| 738314.8|286858.37|11642.46|  100891.8|  70749.02|  30142.78|        0.0|conventional|2015|   PhoenixTucson|
|  0|2015-12-27|        0.71|   776404.39|451904.51|141599.36|15486.97| 167413.55| 123158.22|  33065.33|    11190.0|conventional|2015|WestTexNewMexico|
|  0|2015-12-27|         0.8|  1020390.64|494425.64|276556.76|84912.97| 164495.27| 136560.04|   12277.7|   15657.53|conventional|2015|   DallasFtWorth|
|  0|2015-12-27|         0.8|  2326942.14|976982.58|455203.42|86202.11| 808554.03| 72278

In [22]:
sql_transformer.transform(df_avocado_train).show(4)

+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+
|AveragePrice|        type|          LOG_4225|          LOG_4770|    LOG_Small_Bags|    LOG_Large_Bags|  LOG_XLarge_Bags|year|month|
+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+
|        0.49|conventional|12.566747374652527| 9.362499927974252|11.166908098190957|10.313733879047971|              0.0|  15|   12|
|        0.71|conventional|11.860764002611406| 9.647818872531012| 11.72123326879331| 10.40627082310141|9.322865162818028|  15|   12|
|         0.8|conventional| 12.53017497505446|11.349393905288467|11.824526973139381| 9.415621332905047|9.658771095406955|  15|   12|
|         0.8|conventional|13.028501871764691|11.364461534887267|13.490872079413348| 11.21667384527801|9.342104328605496|  15|   12|
+------------+------------+------------------+------------------+----

In [24]:
df_avocado_train.show(4) # still the same

+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
| Id|      Date|AveragePrice|Total_Volume|     4046|     4225|    4770|Total_Bags|Small_Bags|Large_Bags|XLarge_Bags|        type|year|          region|
+---+----------+------------+------------+---------+---------+--------+----------+----------+----------+-----------+------------+----+----------------+
|  0|2015-12-27|        0.49|  1137707.43| 738314.8|286858.37|11642.46|  100891.8|  70749.02|  30142.78|        0.0|conventional|2015|   PhoenixTucson|
|  0|2015-12-27|        0.71|   776404.39|451904.51|141599.36|15486.97| 167413.55| 123158.22|  33065.33|    11190.0|conventional|2015|WestTexNewMexico|
|  0|2015-12-27|         0.8|  1020390.64|494425.64|276556.76|84912.97| 164495.27| 136560.04|   12277.7|   15657.53|conventional|2015|   DallasFtWorth|
|  0|2015-12-27|         0.8|  2326942.14|976982.58|455203.42|86202.11| 808554.03| 72278

#### month_vec_asm_transformer / month_scaler_transformer: create month vectors and normalize their values

After applying the SQLTransformer, we assemble the *'month'* column into a vector and then normalize its values.

In [25]:
month_vec_asm_transformer = VectorAssembler(inputCols=['month'], outputCol='month_vec')

df_avocado_month_ass = month_vec_asm_transformer.transform(sql_transformer.transform(df_avocado_train))
df_avocado_month_ass.show(4)

+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+---------+
|AveragePrice|        type|          LOG_4225|          LOG_4770|    LOG_Small_Bags|    LOG_Large_Bags|  LOG_XLarge_Bags|year|month|month_vec|
+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+---------+
|        0.49|conventional|12.566747374652527| 9.362499927974252|11.166908098190957|10.313733879047971|              0.0|  15|   12|   [12.0]|
|        0.71|conventional|11.860764002611406| 9.647818872531012| 11.72123326879331| 10.40627082310141|9.322865162818028|  15|   12|   [12.0]|
|         0.8|conventional| 12.53017497505446|11.349393905288467|11.824526973139381| 9.415621332905047|9.658771095406955|  15|   12|   [12.0]|
|         0.8|conventional|13.028501871764691|11.364461534887267|13.490872079413348| 11.21667384527801|9.342104328605496|  15|   12|   [12.0]|

Create a transformer that normalizes the month vector by fitting the *"MinMaxScaler"* estimator.

In [31]:
month_scaler_estimator = MinMaxScaler(inputCol='month_vec', outputCol='month_scaled')
month_scaler_transformer = month_scaler_estimator.fit(df_avocado_month_ass)

month_scaler_transformer.transform(df_avocado_month_ass)\
    .select( ['month', 'month_vec', 'month_scaled'] )\
    .where(df_avocado_month_ass.month == 2).show(10)

+-----+---------+--------------------+
|month|month_vec|        month_scaled|
+-----+---------+--------------------+
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
|    2|    [2.0]|[0.09090909090909...|
+-----+---------+--------------------+
only showing top 10 rows
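
The scaled value for February can be reproduced by hand. The following sketch applies the min-max rescaling formula in plain Python, assuming the training data contains months 1 through 12 and the scaler keeps its default output range of 0 to 1; `min_max_scale` is a hypothetical helper, not a Spark function.

```python
def min_max_scale(value, col_min, col_max, new_min=0.0, new_max=1.0):
    """Rescale value from [col_min, col_max] to [new_min, new_max]."""
    if col_max == col_min:
        # A constant column is mapped to the middle of the output range.
        return 0.5 * (new_max + new_min)
    return (value - col_min) / (col_max - col_min) * (new_max - new_min) + new_min


february = min_max_scale(2, col_min=1, col_max=12)   # (2 - 1) / (12 - 1)
december = min_max_scale(12, col_min=1, col_max=12)  # the column maximum
```

Under these assumptions February maps to 1/11, which agrees with the `0.09090909090909...` shown in the output.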



#### numerical_vec_asm_transformer/std_scaler_transformer : assemble numerical features vector and scale all numerical features

In [32]:
numerical_vec_asm_transformer = VectorAssembler(
    inputCols=[
      'year', 'month_scaled', 'LOG_4225',
      'LOG_4770', 'LOG_Small_Bags',
      'LOG_Large_Bags', 'LOG_XLarge_Bags'
    ],
    outputCol='features_num'
)
df_avocado_numerical = numerical_vec_asm_transformer.transform(month_scaler_transformer.transform(df_avocado_month_ass))
df_avocado_numerical.select('year', 'month_scaled', 'LOG_4225','features_num').show(4)

+----+------------+------------------+--------------------+
|year|month_scaled|          LOG_4225|        features_num|
+----+------------+------------------+--------------------+
|  15|       [1.0]|12.566747374652527|[15.0,1.0,12.5667...|
|  15|       [1.0]|11.860764002611406|[15.0,1.0,11.8607...|
|  15|       [1.0]| 12.53017497505446|[15.0,1.0,12.5301...|
|  15|       [1.0]|13.028501871764691|[15.0,1.0,13.0285...|
+----+------------+------------------+--------------------+
only showing top 4 rows
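
Note that *'month_scaled'* is already a vector while the other inputs are plain numbers. The assembler concatenates them into one flat vector, as the `features_num` column shows. A plain-Python sketch of that behaviour, with `assemble` as a hypothetical stand-in for what happens to a single row:

```python
def assemble(*columns):
    """Concatenate scalars and vectors into one flat list of floats."""
    flat = []
    for value in columns:
        if isinstance(value, (list, tuple)):
            flat.extend(float(v) for v in value)  # vector input: splice in its elements
        else:
            flat.append(float(value))             # scalar input: append as a float
    return flat


# year, month_scaled and LOG_4225 from the first displayed row
row = assemble(15, [1.0], 12.566747374652527)
```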



In [33]:
# Scaling the numerical features using a StandardScaler
std_scaler_estimator = StandardScaler(
    inputCol="features_num",
    outputCol="features_scaled",
    withStd=True,
    withMean=True
)

std_scaler_transformer = std_scaler_estimator.fit(df_avocado_numerical)
std_scaler_transformer.transform(df_avocado_numerical).select(['features_scaled']).show(5, False)

+----------------------------------------------------------------------------------------------------------------------------------------+
|features_scaled                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295]|
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976]  |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594] |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053] |
|[-1.2177154955881637,1.648
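
With `withMean=True` and `withStd=True`, each feature is shifted to zero mean and divided by its standard deviation. The sketch below does this for a single toy column in plain Python. It assumes the sample standard deviation (dividing by n - 1), which is how I understand Spark's StandardScaler to be documented. `standardize` is a hypothetical helper and the input values are made up for illustration.

```python
import statistics


def standardize(column):
    """Center a column on its mean and scale it to unit sample standard deviation."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)  # sample standard deviation (n - 1 in the denominator)
    return [(v - mean) / std for v in column]


scaled = standardize([15.0, 16.0, 17.0, 18.0])  # toy 'year' values, not taken from the dataset
```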

### Categorical Feature Transformers
Transforming categorical features usually involves converting text into numbers, e.g. index encoding or one-hot encoding.

#### type_indexer_transformer: encoding categorical data
We create a transformer using "StringIndexer", which is an estimator that produces a StringIndexerModel.  It maps each distinct string to a numeric index (label encoding).  Unlike one-hot encoding, it outputs a single index column rather than one binary column per category.

In [34]:
type_indexer_estimator = StringIndexer(inputCol="type", outputCol="type_index")
type_indexer_transformer = type_indexer_estimator.fit(df_avocado_train)

type_indexer_transformer.transform(df_avocado_train)\
  .select( ["type", "type_index"] ).show(4)

+------------+----------+
|        type|type_index|
+------------+----------+
|conventional|       0.0|
|conventional|       0.0|
|conventional|       0.0|
|conventional|       0.0|
+------------+----------+
only showing top 4 rows
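
The mapping learned by the indexer can be sketched in plain Python. This assumes the default ordering, in which the most frequent label gets index 0.0 and ties are broken alphabetically. `fit_string_index` is a hypothetical helper and the three labels are illustrative.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


mapping = fit_string_index(["conventional", "organic", "conventional"])
```

Each label becomes a single number, so the output is one column regardless of how many distinct labels exist.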



In [35]:
categorical_vec_asm_transformer = VectorAssembler(
    inputCols=['type_index'],
    outputCol='features_cat'
)
categorical_vec_asm_transformer.transform(
    type_indexer_transformer.transform(df_avocado_train)
).select('type', 'type_index', 'features_cat').show(4)

+------------+----------+------------+
|        type|type_index|features_cat|
+------------+----------+------------+
|conventional|       0.0|       [0.0]|
|conventional|       0.0|       [0.0]|
|conventional|       0.0|       [0.0]|
|conventional|       0.0|       [0.0]|
+------------+----------+------------+
only showing top 4 rows



### Create a pipeline: merge both numerical and categorical features

In [36]:
all_vec_asm_transformer = VectorAssembler(
        inputCols=['features_scaled', 'features_cat'],
        outputCol='features')

In [37]:
feature_prep_pipeline = Pipeline(stages=[sql_transformer, month_vec_asm_transformer,
                                         month_scaler_transformer,
                                         numerical_vec_asm_transformer,
                                         std_scaler_transformer,
                                         type_indexer_transformer,
                                         categorical_vec_asm_transformer,
                                         all_vec_asm_transformer])

In [38]:
pipeline_model = feature_prep_pipeline.fit(df_avocado_train)

### Transform training dataset using the pipeline

In [39]:
df_avocado_train_transformed = pipeline_model.transform(df_avocado_train)

In [40]:
df_avocado_train_transformed.select('features', 'AveragePrice').show(5, False)

+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|features                                                                                                                                    |AveragePrice|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49        |
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0]  |0.71        |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8         |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.50

## Model Training
We will train a linear regression model on the transformed training dataset.  To do this, we fit the *'LinearRegression'* estimator to the transformed training data.  The result is a model, which is a *transformer* that can then be used to generate predictions.

Note that this example focuses on how to create a pipeline.  Spark also provides hyperparameter tuning utilities, but they are outside the scope of this example.  Please refer to [First Steps in Machine Learning with Apache Spark](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3) for more details.

In [41]:
from pyspark.ml.regression import LinearRegression

In [42]:
linear_reg_estimator = LinearRegression(
    featuresCol='features',
    labelCol='AveragePrice',
    predictionCol='prediction',

    # Hyperparameters
    maxIter=1000,
    regParam=0.3,       # Regularization
    elasticNetParam=0.8 # Regularization mixing parameter. 1 for L1, 0 for L2.
)
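
As a rough illustration of how `regParam` and `elasticNetParam` interact, the sketch below computes the elastic-net penalty that is added to the training loss. It assumes the usual formulation, regParam * (alpha * L1 + (1 - alpha) / 2 * L2 squared). The helper name and the weight values are hypothetical.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared
    )


# Made-up weights, with the same hyperparameters as the estimator above
penalty = elastic_net_penalty([0.5, -0.25, 0.0], reg_param=0.3, elastic_net_param=0.8)
```

With `elasticNetParam=0.8` most of the penalty comes from the L1 term, which tends to push some coefficients to exactly zero.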

In [43]:
linear_reg_model = linear_reg_estimator.fit(df_avocado_train_transformed)

### Inference on the training dataset

In [44]:
df_avocado_train_pred = linear_reg_model.transform(df_avocado_train_transformed)
df_avocado_train_pred.select(
  ['AveragePrice', 'prediction']
).sample(False, 0.1, 0).show(5, False)

+------------+------------------+
|AveragePrice|prediction        |
+------------+------------------+
|0.8         |1.4003505112793717|
|0.95        |1.4003505112793717|
|0.98        |1.4003505112793717|
|1.07        |1.4116333911023091|
|1.39        |1.4116333911023091|
+------------+------------------+
only showing top 5 rows



## Model Evaluation
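Spark provides several evaluators.  Because this is a regression problem, we use *RegressionEvaluator* with root mean squared error (RMSE) as the metric.  Note that the score is computed on the training predictions.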
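
For reference, RMSE is the square root of the mean squared difference between label and prediction. The plain-Python sketch below applies it to the five sampled rows displayed in the previous section. `rmse` is a hypothetical helper, and the result covers only those five rows, so it is not expected to equal the score for the full dataset.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


labels = [0.8, 0.95, 0.98, 1.07, 1.39]
predictions = [
    1.4003505112793717,
    1.4003505112793717,
    1.4003505112793717,
    1.4116333911023091,
    1.4116333911023091,
]
sample_rmse = rmse(labels, predictions)
```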

In [45]:
from pyspark.ml.evaluation import RegressionEvaluator

In [46]:
reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse' # Root mean squared error
)

In [47]:
reg_eval.evaluate(df_avocado_train_pred)

0.3978489578943717
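
The score above is measured on the data the model was trained on, and `df_avocado_test` from the split has not been used yet. One way to score held-out data is sketched below. `evaluate_on` is a hypothetical helper that relies only on the `transform` and `evaluate` methods of the objects created in this notebook.

```python
def evaluate_on(df, feature_model, regression_model, evaluator):
    """Apply the fitted feature pipeline, then the fitted regressor, then score."""
    features = feature_model.transform(df)
    predictions = regression_model.transform(features)
    return evaluator.evaluate(predictions)


# In this notebook, using the names defined in the cells above:
# rmse_test = evaluate_on(df_avocado_test, pipeline_model, linear_reg_model, reg_eval)
```

Because the pipeline and the regression model were fitted on the training split only, the test rows are transformed with the training statistics and never influence them.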

## THE END

In [48]:
spark.stop()