# Linear Regression with Spark

Scenario:

Imagine that we've been contracted by Hyundai Heavy Industries to help them build a predictive model for some ships.
We have been asked to create a machine learning model that will help predict how many crew members future ships will need.
The client also mentioned that acceptable crew counts differ between cruise lines, so the cruise line is most likely an important feature to include in the analysis!
The dataset we are using was downloaded from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/index.php

Dataset structure:

So let's start!

In [5]:
# Imports
# 1) For the Spark session
from pyspark.sql import SparkSession
# 2) For the linear regression model
from pyspark.ml.regression import LinearRegression
# 3) To convert string columns into numeric indices
from pyspark.ml.feature import StringIndexer

In [6]:
# Create a Spark session (or reuse an existing one)
spark = SparkSession.builder.appName('lr_hyunday').getOrCreate()

In [7]:
# Read the dataset: header=True takes the column names from the first row, inferSchema=True detects the column types
data = spark.read.csv('cruise_ship_info.csv', inferSchema=True, header=True)

In [8]:
#The dataset has the following schema...
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



# Data preparation

In [9]:
data.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

If we want to know whether there is a linear correlation between two specific variables, we can compute it.

In [10]:
# Import the correlation function
from pyspark.sql.functions import corr

In [11]:
# Here we compute Pearson's correlation between the variables 'crew' and 'passengers'.
# This is a useful preliminary step for deciding which variables to include as features (see below).
# Pearson's correlation coefficient p satisfies -1 <= p <= 1.
# The closer |p| is to 1, the stronger the linear relationship; a negative p means the two variables move in opposite directions.
data.select(corr('crew', 'passengers')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+
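To make the formula concrete, here is a minimal pure-Python sketch of Pearson's correlation: the covariance of the two variables divided by the product of their standard deviations. It is not how Spark computes `corr` internally. The sample values are the `passengers` and `crew` columns of the first seven rows printed by `data.show()` above, so the result will not match the full-dataset value of 0.915.

```python
import math

def pearson(xs, ys):
    """Pearson's correlation coefficient of two equal-length sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of the same length (at least 2)")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance and variances share the same 1/n factor, so it cancels out.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# passengers and crew of the first seven ships shown by data.show()
passengers = [6.94, 6.94, 14.86, 29.74, 26.42, 20.52, 20.52]
crew = [3.55, 3.55, 6.7, 19.1, 10.0, 9.2, 9.2]
r = pearson(passengers, crew)
print(round(r, 3))
```

Perfectly proportional data gives 1 and perfectly inverse data gives -1, which is a quick way to sanity-check the function.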



In [12]:
# Group by 'Cruise_line' to see how many cruise lines the dataset contains and how many ships each one has
data.groupBy('Cruise_line').count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



As mentioned before, acceptable crew counts differ between cruise lines. So the next step is to convert the cruise line string into a numeric index, which gives the model a new feature it can use.
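With its default `stringOrderType="frequencyDesc"`, `StringIndexer` gives index 0.0 to the most frequent label, 1.0 to the next one, and so on. The plain-Python sketch below reproduces that ordering from the counts printed by `groupBy('Cruise_line').count()`. Breaking ties alphabetically is an assumption here, though it agrees with the `Cruise_line_Index` values printed in the next cell (Carnival → 1.0, Azamara → 16.0).

```python
# Ship counts per cruise line, copied from the groupBy('Cruise_line').count() output
counts = {
    "Costa": 11, "P&O": 6, "Cunard": 3, "Regent_Seven_Seas": 5, "MSC": 8,
    "Carnival": 22, "Crystal": 2, "Orient": 1, "Princess": 17, "Silversea": 4,
    "Seabourn": 3, "Holland_American": 14, "Windstar": 3, "Disney": 2,
    "Norwegian": 13, "Oceania": 3, "Azamara": 2, "Celebrity": 10, "Star": 6,
    "Royal_Caribbean": 23,
}

def frequency_index(label_counts):
    """Map each label to a float index: most frequent first, ties alphabetical (assumed)."""
    ordered = sorted(label_counts, key=lambda label: (-label_counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

index = frequency_index(counts)
print(index["Royal_Caribbean"], index["Carnival"], index["Azamara"])  # 0.0 1.0 16.0
```

Keep in mind that the linear model will treat these indices as ordinary ordered numbers, even though the cruise lines themselves have no natural order.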

In [13]:
# Create the StringIndexer object
# The result of the conversion goes into the output column "Cruise_line_Index"
indexer = StringIndexer(inputCol="Cruise_line", outputCol="Cruise_line_Index")

# Fit the indexer on the data (this learns the label-to-index mapping), then transform the data to add the new column
indexed = indexer.fit(data).transform(data)

# Show the result
indexed.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_Index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|              1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|              1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|              1.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|       

Now it is time to reduce the dataset to just two columns: a "features" column and a "label" column. To do this we need to import the VectorAssembler class.

In [14]:
# Imports 
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [15]:
# List the columns to decide which ones to include as features
indexed.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_Index']

In [16]:
# Instantiate the VectorAssembler.
# Note: as input columns we pass only the numerical variables useful for the analysis;
# we leave out 'Ship_name' and 'Cruise_line' (the latter is represented by 'Cruise_line_Index').
# 'crew' is also left out because it is the label we want to predict.

assembler = VectorAssembler(inputCols=['Age','Tonnage','passengers','length','cabins','passenger_density','Cruise_line_Index'],
                             outputCol='features')
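Conceptually, `VectorAssembler` concatenates the listed input columns, in the order given by `inputCols`, into one vector per row. The plain-Python sketch below applies that idea to the first row (the ship "Journey"); it ignores Spark details such as storing the result as a dense or sparse vector.

```python
# Same column order as the inputCols passed to VectorAssembler above
feature_cols = ['Age', 'Tonnage', 'passengers', 'length', 'cabins',
                'passenger_density', 'Cruise_line_Index']

# First row of `indexed` (the ship "Journey"), copied from the output above
row = {'Ship_name': 'Journey', 'Cruise_line': 'Azamara', 'Age': 6,
       'Tonnage': 30.276999999999997, 'passengers': 6.94, 'length': 5.94,
       'cabins': 3.55, 'passenger_density': 42.64, 'crew': 3.55,
       'Cruise_line_Index': 16.0}

def assemble(record, cols):
    """Concatenate the selected numeric columns into one list of floats."""
    return [float(record[c]) for c in cols]

features = assemble(row, feature_cols)
print(features)  # [6.0, 30.276999999999997, 6.94, 5.94, 3.55, 42.64, 16.0]
```

The first values match the `[6.0,30.276999999...` vector shown for this ship in the `final_data` output.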

In [17]:
# Transform the dataset: add the assembled 'features' column
output = assembler.transform(indexed)

# The schema now contains a new column, "features", of type vector.
output.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Cruise_line_Index: double (nullable = true)
 |-- features: vector (nullable = true)



In [18]:
# Now we keep only the 'crew' column (the target, or label) and the 'features' column (the independent variables)
final_data = output.select('crew', 'features')
final_data.show()

+----+--------------------+
|crew|            features|
+----+--------------------+
|3.55|[6.0,30.276999999...|
|3.55|[6.0,30.276999999...|
| 6.7|[26.0,47.262,14.8...|
|19.1|[11.0,110.0,29.74...|
|10.0|[17.0,101.353,26....|
| 9.2|[22.0,70.367,20.5...|
| 9.2|[15.0,70.367,20.5...|
| 9.2|[23.0,70.367,20.5...|
| 9.2|[19.0,70.367,20.5...|
|11.5|[6.0,110.23899999...|
|11.6|[10.0,110.0,29.74...|
| 6.6|[28.0,46.052,14.5...|
| 9.2|[18.0,70.367,20.5...|
| 9.2|[17.0,70.367,20.5...|
| 9.3|[11.0,86.0,21.24,...|
|11.6|[8.0,110.0,29.74,...|
|10.3|[9.0,88.5,21.24,9...|
| 9.2|[15.0,70.367,20.5...|
| 9.3|[12.0,88.5,21.24,...|
| 9.2|[20.0,70.367,20.5...|
+----+--------------------+
only showing top 20 rows



# Creation of the model

In [19]:
# First we create the training set and the test set (70% and 30%).
# The split is random, so the exact sizes vary between runs unless a seed is given.
training_set, test_set = final_data.randomSplit([0.7, 0.3])
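Because `randomSplit` assigns rows at random, the 70/30 proportions are only approximate (122 and 36 rows in the output below). The rough pure-Python illustration here puts each row in the training set independently with probability 0.7; Spark's actual sampling works per partition and differs in detail, so treat this as a sketch of the idea only.

```python
import random

def random_split(rows, train_fraction=0.7, seed=None):
    """Put each row in the training list with probability train_fraction."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(158))  # stand-ins for the 158 ships
train_rows, test_rows = random_split(rows, seed=42)
print(len(train_rows), len(test_rows))  # roughly 70% / 30%, not exactly
```

Every row lands in exactly one of the two sets, and calling the function again with the same seed gives the same split.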

In [20]:
# Check the split
training_set.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               122|
|   mean| 7.951475409836066|
| stddev|3.5721301078844796|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



In [21]:
test_set.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                36|
|   mean|  7.26111111111111|
| stddev|3.2507649893329753|
|    min|              0.88|
|    max|             12.38|
+-------+------------------+



In [22]:
# Instantiate the LinearRegression object

# The labelCol parameter takes the target variable; featuresCol defaults to 'features'
lr = LinearRegression(labelCol='crew')

# Fitting
# The linear regression model is fitted on the training data
lr_model = lr.fit(training_set)
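With its default settings (`regParam=0.0`, so no regularization), `LinearRegression` fits ordinary least squares: it chooses the intercept and coefficients that minimize the sum of squared residuals. For a single feature the solution has a simple closed form, sketched below in plain Python on toy data that lie exactly on the line y = 2x + 1 (not the cruise-ship data). The model above solves the multi-feature version of the same problem.

```python
def fit_simple_ols(xs, ys):
    """Least-squares intercept and slope for y ≈ intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope

# Toy data generated from y = 2x + 1
xs = [0.0, 1.0, 2.0, 3.0, 4.0]
ys = [1.0, 3.0, 5.0, 7.0, 9.0]
intercept, slope = fit_simple_ols(xs, ys)
print(intercept, slope)  # 1.0 2.0
```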

# Evaluation of the lr model

In [23]:
# Now it is time to evaluate the model on the test set
test_result = lr_model.evaluate(test_set)

In [24]:
# First, we look at the residuals.
# A single residual is the difference between the actual value of the target variable
# (the number of crew members) and the model's prediction: actual - predicted.
test_result.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|-0.49136594014959123|
|-0.01624419173843...|
|0.018674262608828096|
|-0.26845166994477987|
|-0.01521749266098471|
|  0.4172521781371099|
|-0.10968801032535325|
| 0.31095930102150504|
|  1.1848229710261418|
|-0.38621063541529654|
| -1.1886289077020455|
|  -0.625355008271927|
| 0.08490603800521157|
|-0.23917858939303382|
| -1.6753835359241567|
|   0.933598504748903|
| -0.3122251337531763|
| -0.2575870001516005|
| -0.8560952701210587|
|  1.2185729518332185|
+--------------------+
only showing top 20 rows



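Each residual value is the error of a single prediction: a positive residual means the model under-predicted the crew size, and a negative one means it over-predicted.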
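As a quick check of the sign convention, the sketch below recomputes the first four residuals as actual minus predicted. The predictions are copied from the `prediction.show()` output at the end of this notebook; the actual crew values (0.88, 1.6, 1.6, 2.11) are not printed directly but are derived as prediction + residual from the two outputs.

```python
# First four test rows. Predictions are copied from prediction.show();
# actual values are derived as prediction + residual.
actual = [0.88, 1.6, 1.6, 2.11]
predicted = [1.3713659401495912, 1.6162441917384365,
             1.581325737391172, 2.3784516699447797]

residuals = [a - p for a, p in zip(actual, predicted)]
print([round(r, 4) for r in residuals])  # [-0.4914, -0.0162, 0.0187, -0.2685]
```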

In [25]:
# Here we evaluate the accuracy of the model using rootMeanSquaredError (RMSE).
# RMSE is the square root of the average squared residual, so it measures the typical size
# of the model's error when predicting the number of crew members.

test_result.rootMeanSquaredError

0.949239136653638

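The RMSE is expressed in the same units as the label. In this dataset `crew` is recorded in hundreds of people, so an RMSE of about 0.95 corresponds to an error of roughly 95 crew members.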
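The relationship between the mean squared error and its root can be sketched in a few lines of plain Python. The residuals here are made-up toy values, not the ones from this model.

```python
import math

def mse(residuals):
    """Mean squared error: the average of the squared residuals."""
    return sum(r * r for r in residuals) / len(residuals)

def rmse(residuals):
    """Root mean squared error, in the same units as the label."""
    return math.sqrt(mse(residuals))

toy_residuals = [1.0, -2.0, 2.0]  # made-up values for illustration
print(mse(toy_residuals), rmse(toy_residuals))  # 3.0 1.7320508075688772
```

Taking the square root brings the error back to the label's units, which is why RMSE is easier to interpret than MSE.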

In [26]:
# Compared with the spread of the label in the full dataset (mean ≈ 7.8, standard deviation ≈ 3.5),
# an RMSE of about 0.95 is small, which suggests the model performs well
final_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              158|
|   mean|7.794177215189873|
| stddev|3.503486564627034|
|    min|             0.59|
|    max|             21.0|
+-------+-----------------+



Other evaluation metrics 

In [27]:
# R² (the coefficient of determination) is the proportion of the variance of the label that the model explains.
# It is at most 1, and the closer it is to 1, the better the model fits the data; on a test set it can even be negative.

# For example, r2 = 0.94 means that the model explains 94% of the variance in the crew counts
test_result.r2

0.9122969735547289
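R² compares the model's squared error with that of a baseline that always predicts the mean of the label: R² = 1 − SS_res / SS_tot. The plain-Python sketch below uses toy values, not this notebook's data.

```python
def r2_score(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot

actual = [1.0, 2.0, 3.0, 4.0]     # toy labels
predicted = [1.1, 1.9, 3.2, 3.8]  # toy predictions
print(r2_score(actual, predicted))  # close to 0.98
print(r2_score(actual, [2.5] * 4))  # 0.0: always predicting the mean explains nothing
```

A model that does worse than the mean baseline has SS_res > SS_tot, which is how R² can become negative on unseen data.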

So far, the evaluation metrics look good. But let's explore a few more metrics...

In [25]:
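# The Mean Squared Error (the square of the RMSE).
# Note: 0.949² ≈ 0.90, not 0.556. The repeated cell numbers In [25] and In [26] suggest that
# this cell and the next one come from an earlier run on a different random split.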
test_result.meanSquaredError

0.5558621458097346

In [26]:
# The Mean Absolute Error
test_result.meanAbsoluteError

0.6051312218999878

The mean absolute error is another metric for evaluating the model. It is the average absolute difference between the predicted and actual values of the target variable; unlike RMSE, it does not give extra weight to large errors.
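To see how MAE and RMSE differ, the sketch below compares two toy sets of residuals with the same MAE: one made of equal small errors and one with a single large miss. RMSE grows when the error is concentrated in a few large misses, while MAE stays the same.

```python
import math

def mae(residuals):
    """Mean absolute error: the average size of the errors."""
    return sum(abs(r) for r in residuals) / len(residuals)

def rmse(residuals):
    """Root mean squared error: penalizes large errors more heavily."""
    return math.sqrt(sum(r * r for r in residuals) / len(residuals))

even = [1.0, -1.0, 1.0, -1.0]   # four errors of size 1
spiky = [0.0, 0.0, 0.0, 4.0]    # one error of size 4
print(mae(even), rmse(even))    # 1.0 1.0
print(mae(spiky), rmse(spiky))  # 1.0 2.0
```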

Our model seems to work well. Now it is time to apply it to unlabeled data.

# If we want to simulate the prediction on unlabeled data...

In [28]:
# To simulate unlabeled data, take the test set and keep only the "features" column
unlabeled_data = test_set.select('features')
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[25.0,5.35,1.58,4...|
|[24.0,10.0,2.08,4...|
|[27.0,10.0,2.08,4...|
|[16.0,19.2,3.2,5....|
|[14.0,33.0,4.9,5....|
|[40.0,28.0,11.5,6...|
|[21.0,38.0,10.56,...|
|[25.0,38.0,7.49,6...|
|[21.0,19.093,8.0,...|
|[18.0,51.004,9.4,...|
|[14.0,63.0,14.4,7...|
|[13.0,61.0,13.8,7...|
|[25.0,42.0,15.04,...|
|[10.0,68.0,10.8,7...|
|[15.0,78.491,24.3...|
|[23.0,48.563,20.2...|
|[22.0,69.845,15.9...|
|[10.0,58.825,15.6...|
|[18.0,70.0,18.0,8...|
|[24.0,40.05300000...|
+--------------------+
only showing top 20 rows



In [29]:
# Here we transform the unlabeled data using the model
prediction = lr_model.transform(unlabeled_data)
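For a linear regression model, each prediction is the intercept plus the dot product of the coefficient vector with the feature vector. In Spark the fitted values can be inspected as `lr_model.intercept` and `lr_model.coefficients`; the plain-Python sketch below uses hypothetical numbers, not the values fitted in this notebook.

```python
def predict(features, coefficients, intercept):
    """Linear model prediction: intercept + sum of coefficient * feature."""
    if len(features) != len(coefficients):
        raise ValueError("features and coefficients must have the same length")
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Hypothetical two-feature model, for illustration only
coefficients = [0.5, 0.25]
intercept = 1.0
print(predict([1.0, 2.0], coefficients, intercept))  # 2.0
```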

In [30]:
# And finally we have the predictions.
# Each prediction is the number of crew a ship needs (in hundreds, like the 'crew' label), based on its features
prediction.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[25.0,5.35,1.58,4...|1.3713659401495912|
|[24.0,10.0,2.08,4...|1.6162441917384365|
|[27.0,10.0,2.08,4...| 1.581325737391172|
|[16.0,19.2,3.2,5....|2.3784516699447797|
|[14.0,33.0,4.9,5....| 3.255217492660985|
|[40.0,28.0,11.5,6...|  3.38274782186289|
|[21.0,38.0,10.56,...| 4.489688010325353|
|[25.0,38.0,7.49,6...| 4.289040698978495|
|[21.0,19.093,8.0,...|3.5151770289738584|
|[18.0,51.004,9.4,...| 5.836210635415297|
|[14.0,63.0,14.4,7...| 6.798628907702046|
|[13.0,61.0,13.8,7...| 6.625355008271927|
|[25.0,42.0,15.04,...| 6.215093961994788|
|[10.0,68.0,10.8,7...| 6.599178589393034|
|[15.0,78.491,24.3...| 8.275383535924156|
|[23.0,48.563,20.2...| 5.776401495251097|
|[22.0,69.845,15.9...| 7.272225133753176|
|[10.0,58.825,15.6...|7.2575870001516005|
|[18.0,70.0,18.0,8...| 8.056095270121059|
|[24.0,40.05300000...|6.2814270481667815|
+--------------------+------------------+

Great! We have finished our project!

# Thanks for following the tutorial!