# Introduction

We are interested in experimenting with linear regression in Spark. Here we work with a dataset describing various wines. It has many features (citric acid, chlorides, etc.) along with a quality score. It seems natural to model quality as a function of the remaining features.

After some massaging of the data we split it into train and test sets. We combine the features into a single vector column (required by Spark's LinearRegression) using the VectorAssembler. We then combine these steps into a pipeline so that we can easily apply them to the test set as well.

Finally we use the trained model to make predictions on the test data, and evaluate the model using Spark's RegressionEvaluator with the RMSE and R^2 metrics.

# Exploring and tidying the dataset

We explore the available datasets before opening our wine data. The file is semicolon-delimited, so the default comma-separated reader loads it as a single column; we split that column and cast the values to the data types we want.

In [0]:
# Show available datasets
display(dbutils.fs.ls('/databricks-datasets'))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,0


In [0]:
display(dbutils.fs.ls('/databricks-datasets/wine-quality'))

path,name,size,modificationTime
dbfs:/databricks-datasets/wine-quality/README.md,README.md,1066,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-red.csv,winequality-red.csv,84199,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-white.csv,winequality-white.csv,264426,1594262736000


In [0]:
# Read in the data
path = '/databricks-datasets/wine-quality/winequality-red.csv'
df = spark.read.option("header", "true").csv(path)

In [0]:
# Unfortunately this data has been read in as a single column, so we need to split it
df.show(5)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|"fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                     7.4;0.7;0;1.9;0.0...|
|                                                                                                                                                     7.8;0.88;0;2.6;0....|
|                                                                                                                                           

In [0]:
# We can see from the schema that all the data appears in a single string
df.printSchema()

root
 |-- "fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality": string (nullable = true)



In [0]:
# split() separates the values manually. A less cumbersome route would be to pass the
# delimiter to the reader, e.g. .option("sep", ";"), so that Spark splits the columns on load.

from pyspark.sql.functions import split

# Column names in the order they appear in the file's header
columns = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
           'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density',
           'pH', 'sulphates', 'alcohol', 'quality']

# Split the single string column on ';' and add one new column per value
split_col = split(df[0], ';')
for i, name in enumerate(columns):
    df = df.withColumn(name, split_col.getItem(i))

In [0]:
# We now have all columns, including the one we started with
df.show(5)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|"fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|                                       

In [0]:
# Let's drop the initial one
df = df.drop(df.columns[0])

In [0]:
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|          0|           2.6|    0.098|                 25|                  67| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|                 15|                  54|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|                 17|           

In [0]:
# We can see from the schema that every column is a string. We want numerical types.
df.printSchema()

root
 |-- fixed acidity: string (nullable = true)
 |-- volatile acidity: string (nullable = true)
 |-- citric acid: string (nullable = true)
 |-- residual sugar: string (nullable = true)
 |-- chlorides: string (nullable = true)
 |-- free sulfur dioxide: string (nullable = true)
 |-- total sulfur dioxide: string (nullable = true)
 |-- density: string (nullable = true)
 |-- pH: string (nullable = true)
 |-- sulphates: string (nullable = true)
 |-- alcohol: string (nullable = true)
 |-- quality: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

df = df.select([col(c).cast("double") for c in df.columns])

In [0]:
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: double (nullable = true)



# Linear regression model

We split our data into train and test sets and then use the VectorAssembler to combine all features into a single column, as required by Spark's LinearRegression. We train our model, examine its coefficients, and then use it to make predictions on the test data.

In [0]:
# Let's try to do a Linear Regression first
from pyspark.ml.regression import LinearRegression

In [0]:
# Split the data. First we will process the training data. Since we will combine our process into a pipeline, we will easily be able to process the test data later.
train_df, test_df = df.randomSplit([.8, .2], seed=11)
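
As a rough illustration of why an 80/20 random split rarely gives exactly 80% of the rows, the sketch below assigns each row to a set with an independent random draw. It is plain Python, not Spark's implementation; the function `bernoulli_split` and the 1000 stand-in rows are hypothetical and exist only for this example.

```python
import random

def bernoulli_split(rows, train_fraction=0.8, seed=11):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_fraction
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # hypothetical stand-in for DataFrame rows
train_rows, test_rows = bernoulli_split(rows)

# The sizes are close to 800 / 200 but usually not exact
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the sketch reproducible, which is the same reason a seed is passed to `randomSplit` above.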

In [0]:
train_df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          4.6|            0.52|       0.15|           2.1|    0.054|                8.0|                65.0| 0.9934| 3.9|     0.56|   13.1|    4.0|
|          4.7|             0.6|       0.17|           2.3|    0.058|               17.0|               106.0| 0.9932|3.85|      0.6|   12.9|    6.0|
|          4.9|            0.42|        0.0|           2.1|    0.048|               16.0|                42.0|0.99154|3.71|     0.74|   14.0|    7.0|
|          5.0|            0.38|       0.01|           1.6|    0.048|               26.0|           

In [0]:
# Spark ML estimators expect all features in a single vector column, so we use the VectorAssembler here.

from pyspark.ml.feature import VectorAssembler

# Every column except the label; the loop variable is named c to avoid confusion with the imported col function
features = [c for c in df.columns if c != "quality"]

vecAssembler = VectorAssembler(inputCols=features, outputCol="features")

vec_train_df = vecAssembler.transform(train_df)

In [0]:
vec_train_df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          4.6|            0.52|       0.15|           2.1|    0.054|                8.0|                65.0| 0.9934| 3.9|     0.56|   13.1|    4.0|[4.6,0.52,0.15,2....|
|          4.7|             0.6|       0.17|           2.3|    0.058|               17.0|               106.0| 0.9932|3.85|      0.6|   12.9|    6.0|[4.7,0.6,0.17,2.3...|
|          4.9|            0.42|        0.0|           2.1|    0.048|               16.0|                42.0|0.99154|3.71|     0.74|   14.0|    

In [0]:
# These are the two columns we need for the regression. We do not have to select them ourselves: the regression picks them out via featuresCol and labelCol
vec_train_df.select(['features', 'quality']).show(5)

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[4.6,0.52,0.15,2....|    4.0|
|[4.7,0.6,0.17,2.3...|    6.0|
|[4.9,0.42,0.0,2.1...|    7.0|
|[5.0,0.38,0.01,1....|    6.0|
|[5.0,0.4,0.5,4.3,...|    6.0|
+--------------------+-------+
only showing top 5 rows



In [0]:
# Fit the linear model
lin_reg = LinearRegression(featuresCol="features", labelCol="quality")
lin_model = lin_reg.fit(vec_train_df)

In [0]:
# Here are the fitted model's coefficients
lin_model.coefficients

Out[115]: DenseVector([0.0243, -1.0257, -0.142, 0.022, -2.2325, 0.0049, -0.0032, -13.9329, -0.3722, 0.9894, 0.2491])

In [0]:
# The fitted model's intercept
lin_model.intercept

Out[116]: 18.078660840510274
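
To make the coefficients and intercept concrete, here is a minimal plain-Python sketch of how a linear model turns one row of features into a prediction. The numbers are copied from the outputs above, with the coefficients rounded as displayed, so the result is only approximate. The row is the first one displayed by `df.show(5)` earlier; whether it ended up in the train or the test set is not known. The helper `predict` is a hypothetical name used only here.

```python
# Feature order follows the `features` list passed to the VectorAssembler
feature_names = [
    "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
    "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density",
    "pH", "sulphates", "alcohol",
]

# Coefficients (rounded, as displayed) and intercept from the fitted model
coefficients = [0.0243, -1.0257, -0.142, 0.022, -2.2325, 0.0049,
                -0.0032, -13.9329, -0.3722, 0.9894, 0.2491]
intercept = 18.078660840510274

# First row displayed earlier; its recorded quality is 5
row = [7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]

def predict(features):
    """Linear model: intercept plus the dot product of coefficients and features."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Pair each coefficient with the feature it multiplies
for name, coef in zip(feature_names, coefficients):
    print(f"{name:>22}: {coef:+.4f}")

print("predicted quality:", round(predict(row), 3))
```

Pairing names with coefficients this way also shows which features push the predicted quality up or down, although the magnitudes are not directly comparable because the features are on different scales.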

In [0]:
# The fitted model can transform the test data to make predictions, but the test data must first pass through the vector assembler. This is untidy; the pipeline below makes it much nicer.
lin_model.transform(vecAssembler.transform(test_df)).select("quality", "prediction").show(5)

+-------+-----------------+
|quality|       prediction|
+-------+-----------------+
|    8.0|6.599491419997751|
|    6.0|5.685681239969254|
|    7.0|6.327832003244092|
|    5.0| 5.26249987365507|
|    6.0|6.802500766502856|
+-------+-----------------+
only showing top 5 rows



# Pipeline

For convenience we turn our process into a pipeline, which can then be used to transform the test data.

In [0]:
# We can compact this into a pipeline, since our training data goes train -> vectorAssembler -> regression

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lin_reg])

# Fit the pipeline on the training data

pipelined_model = pipeline.fit(train_df)

In [0]:
# We can easily apply the pipeline to the test data to make predictions based on it
pred_data = pipelined_model.transform(test_df)

In [0]:
# Our predictions
pred_data[["quality", "prediction"]].show(5)

+-------+-----------------+
|quality|       prediction|
+-------+-----------------+
|    8.0|6.599491419997751|
|    6.0|5.685681239969254|
|    7.0|6.327832003244092|
|    5.0| 5.26249987365507|
|    6.0|6.802500766502856|
+-------+-----------------+
only showing top 5 rows
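
The fit-then-transform flow of a pipeline can be sketched in plain Python. This is a toy illustration of the idea, not Spark's implementation: every class and function below (`ToyAssembler`, `ToyMeanRegressor`, `fit_pipeline`, and so on) is hypothetical, the "regressor" only learns the mean label, and the rows are made-up values.

```python
class ToyAssembler:
    """Transformer: gathers the named columns of each row into a 'features' list."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [{**r, "features": [r[c] for c in self.input_cols]} for r in rows]


class ToyMeanModel:
    """Fitted model (a transformer): predicts the same constant for every row."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "prediction": self.mean} for r in rows]


class ToyMeanRegressor:
    """Estimator: learns only the mean label, standing in for a real regressor."""
    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        return ToyMeanModel(sum(r[self.label_col] for r in rows) / len(rows))


def fit_pipeline(stages, rows):
    """Fit estimators in order, passing the transformed data on to the next stage."""
    fitted = []
    for stage in stages:
        # Estimators are fitted to produce a model; transformers are used as they are
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, as when scoring the test set."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


# Made-up rows for illustration only
train = [{"alcohol": 9.4, "pH": 3.51, "quality": 5.0},
         {"alcohol": 12.9, "pH": 3.85, "quality": 6.0}]
test = [{"alcohol": 9.8, "pH": 3.2, "quality": 5.0}]

fitted = fit_pipeline([ToyAssembler(["alcohol", "pH"]), ToyMeanRegressor("quality")], train)
scored = transform_pipeline(fitted, test)
print(scored[0]["features"], scored[0]["prediction"])
```

The point of the design is that the fitted pipeline carries every preprocessing step with it, so the test data never has to be assembled by hand.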



# Model Evaluation

Finally we evaluate our model using the RMSE and R^2 metrics.

In [0]:
# We can calculate the RMSE as follows.

from pyspark.ml.evaluation import RegressionEvaluator

reg_eval = RegressionEvaluator(predictionCol = "prediction", labelCol = "quality", metricName="rmse")
rmse = reg_eval.evaluate(pred_data)

rmse

In [0]:
# To evaluate the RMSE we need to compare it to a baseline model. Instead we can use R^2, which has the baseline (always predicting the mean) built in.
# A value greater than 0 is better than predicting the mean, but we want to be as close to 1 as possible, so perhaps this model isn't great.
R_squared = reg_eval.setMetricName("r2").evaluate(pred_data)
R_squared

Out[123]: 0.4158549985983323
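
For reference, both metrics can be computed by hand. The sketch below uses the standard definitions of RMSE and R^2 in plain Python on the five (quality, prediction) pairs displayed earlier. Because it uses only those five rows rather than the full test set, its results will not match the evaluator's output above. The function names `rmse` and `r_squared` are hypothetical helpers for this example.

```python
import math

# The five (quality, prediction) pairs displayed earlier; not the full test set
labels = [8.0, 6.0, 7.0, 5.0, 6.0]
predictions = [6.599491419997751, 5.685681239969254, 6.327832003244092,
               5.26249987365507, 6.802500766502856]

def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared residual."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(y_true, y_pred)) / len(y_true))

def r_squared(y_true, y_pred):
    """1 - SS_res / SS_tot, where SS_tot is the error of always predicting the mean."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)
    return 1.0 - ss_res / ss_tot

print("RMSE:", rmse(labels, predictions))
print("R^2:", r_squared(labels, predictions))
```

Written this way it is clear why R^2 has a built-in baseline: predicting the mean for every row makes the two sums equal and gives 0, while perfect predictions give 1.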

# Saving and loading the model

We save and load the model for convenience.

In [0]:
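# We could save the fitted regression model on its own, but then we would need to keep track of what kind of model it is in order to load it. Instead we save the fitted pipeline, which keeps the assembler and the regression together.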

pipeline_path = "/tmp/lr-pipeline-model"
pipelined_model.write().overwrite().save(pipeline_path)

In [0]:
# Load the saved pipeline model

from pyspark.ml import PipelineModel

saved_pipeline_model = PipelineModel.load(pipeline_path)