Importing PySpark and the data

In [1]:
import findspark

In [2]:
findspark.init('/home/dimitris13/spark-3.0.0-bin-hadoop3.2')

In [3]:
"""
We import the pyspark module
"""

import pyspark

In [4]:
"""
We import a SparkSession
"""

from pyspark.sql import SparkSession

In [5]:
"""
We build a Spark session named 'cons_proj'; this application name is
what identifies the session in the Spark Web UI
"""

spark = SparkSession.builder.appName('cons_proj').getOrCreate()

In [6]:
data = spark.read.csv('/home/dimitris13/Downloads/cruise_ship_info.csv',
                     header = True, inferSchema= True)

We explore some features of our data by using the appropriate Spark methods

In [7]:
"""
After importing our data, we rename some of the columns; in
particular, we capitalise some of the column titles. This step is
purely cosmetic and completely optional.
"""

data = data.withColumnRenamed('passengers','Passengers').withColumnRenamed('length','Length').withColumnRenamed('cabins','Cabins')
data = data.withColumnRenamed('passenger_density','Passenger_density').withColumnRenamed('crew','Crew')
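If you prefer not to chain withColumnRenamed calls by hand, the renaming can be driven by a mapping. The sketch below is a hypothetical helper pair (capitalised_names and rename_columns are not part of PySpark); it assumes, as the calls above suggest, that the original CSV headers were lower-case for these five columns, and relies only on each DataFrame having a withColumnRenamed(old, new) method.

```python
# Hypothetical helpers (not part of PySpark or the original notebook).

def capitalised_names(columns):
    """Map each column name that is not already capitalised to its
    str.capitalize() form (first letter upper case, the rest lower case)."""
    return {c: c.capitalize() for c in columns if c != c.capitalize()}


def rename_columns(df, mapping):
    """Apply df.withColumnRenamed(old, new) once per entry of `mapping`
    and return the resulting DataFrame (DataFrames are immutable)."""
    for old, new in mapping.items():
        df = df.withColumnRenamed(old, new)
    return df
```

Used on the notebook's DataFrame this would read `data = rename_columns(data, capitalised_names(data.columns))`.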

In [8]:
data.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'Passengers',
 'Length',
 'Cabins',
 'Passenger_density',
 'Crew']

In [9]:
"""
head(2) returns a list of two Row objects, corresponding to the first
two rows of the DataFrame. We pick the first Row object of the list
(i.e. the first row of the DataFrame). Note that the Crew column is
recorded in hundreds, so the actual number of crew members is
data['Crew'] * 100.
"""

data.head(2)[0]

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, Passengers=6.94, Length=5.94, Cabins=3.55, Passenger_density=42.64, Crew=3.55)

In [10]:
"""
A quick example of exploring our data: we read the ship name and
the crew size from the first row.
"""

first_row = data.head()
print(f'The ship {first_row[0]} has {round(first_row[-1] * 100)} crew members')

The ship Journey has 355 crew members
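Because Crew is recorded in hundreds, converting it back to a head count means multiplying by 100. Truncating that product with int() can undercount, since some floating-point products land just below the intended integer. A minimal sketch of a rounding helper (crew_members is a hypothetical name, not from the original notebook):

```python
def crew_members(crew_in_hundreds):
    """Convert a Crew value (recorded in hundreds) to a whole head count.

    round() is used rather than int(): int() truncates, and a product such
    as 0.29 * 100 evaluates to 28.999999999999996 in floating point.
    """
    return round(crew_in_hundreds * 100)


print(int(0.29 * 100))     # 28  (truncation undercounts)
print(crew_members(0.29))  # 29
print(crew_members(3.55))  # 355
```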


We now do some feature engineering on the columns of our Spark Data Frame

We first observe that we have a categorical feature named 'Cruise_line'.
A regression model cannot use this feature in its string form, so we
convert it into a numerical column by using the StringIndexer.
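To make the idea concrete, here is a plain-Python sketch of what StringIndexer does with its default ordering (stringOrderType='frequencyDesc'): the most frequent label gets index 0.0, the next 1.0, and so on. It is an illustration, not Spark's implementation; the alphabetical tie-breaking is an assumption about how equal counts are handled, and the cruise-line values are a made-up sample.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to a float index, most frequent label first.

    Ties in frequency are broken alphabetically (assumed tie rule).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical sample of a Cruise_line column
lines = ['Royal_Caribbean', 'Carnival', 'Royal_Caribbean',
         'Azamara', 'Carnival', 'Royal_Caribbean']
index = string_index(lines)
print(index)                     # {'Royal_Caribbean': 0.0, 'Carnival': 1.0, 'Azamara': 2.0}
print([index[v] for v in lines])  # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```

Note that the resulting indices are arbitrary labels: a linear model will still treat them as ordered numbers.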

In [11]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [12]:
"""
We create a StringIndexer object, giving it the input column and
the name of the output column. Then we fit the object to our data.
"""

string_indexer = StringIndexer(inputCol='Cruise_line',outputCol='Cruise_line_index')

In [13]:
model = string_indexer.fit(data)

In [14]:
indexed_data = model.transform(data)

In [15]:
"""
After the transformation, our Spark DataFrame has an extra column
containing the index of each Cruise_line. We can now use this column
as a numerical feature.
"""
indexed_data.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'Passengers',
 'Length',
 'Cabins',
 'Passenger_density',
 'Crew',
 'Cruise_line_index']

In [16]:
"""
We now use a VectorAssembler object to gather all our feature
columns into a single vector column, which is the form Spark ML
estimators expect for their features. Then we transform our indexed_data.
"""

assembler = VectorAssembler(inputCols = ['Age', 'Tonnage', 'Passengers', 'Length', 'Cabins', 'Passenger_density', 'Cruise_line_index'],
                           outputCol='features') 
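For intuition, VectorAssembler concatenates the listed input columns of each row, in the order given, into one vector. The plain-Python sketch below (assemble is a hypothetical helper) does this for a single dict-shaped row; it always returns a dense list, whereas Spark may store the result as a sparse vector. The values mirror the first row shown earlier, except Cruise_line_index, which is a made-up placeholder.

```python
FEATURE_COLS = ['Age', 'Tonnage', 'Passengers', 'Length', 'Cabins',
                'Passenger_density', 'Cruise_line_index']


def assemble(row, input_cols=FEATURE_COLS):
    """Concatenate the given numeric columns of a dict-like row into a list
    of floats, in the order of input_cols. Other columns are ignored."""
    return [float(row[c]) for c in input_cols]


row = {'Ship_name': 'Journey', 'Cruise_line': 'Azamara', 'Age': 6,
       'Tonnage': 30.276999999999997, 'Passengers': 6.94, 'Length': 5.94,
       'Cabins': 3.55, 'Passenger_density': 42.64, 'Crew': 3.55,
       'Cruise_line_index': 0.0}  # hypothetical index value
features = assemble(row)
```

The label (Crew) and the string columns are deliberately left out of the feature vector.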

In [17]:
output = assembler.transform(indexed_data)

In [18]:
"""
All we need in our final data is the feature vector created by the
VectorAssembler object and the label, which is the Crew column.
"""

final_data = output.select(['features','Crew'])

In [19]:
"""
We randomly split final_data into a training set (about 70% of the
rows) and a test set (about 30%).
"""

train, test = final_data.randomSplit([0.7,0.3])
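randomSplit assigns rows to splits at random, so a [0.7, 0.3] split gives approximately, not exactly, 70% and 30%, and without a seed the split (and therefore the metrics below) can change between runs; passing a seed, e.g. `final_data.randomSplit([0.7, 0.3], seed=42)`, makes it repeatable on the same data and partitioning. The plain-Python sketch below (random_split is hypothetical, not Spark's implementation) shows the per-row sampling idea:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to its weight. The same seed gives the same split."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any rounding slack in the bounds
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(list(range(1000)), [0.7, 0.3], seed=1)
```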

We import and create an instance of a Linear Regression model. Note that we specify the name of our label column. We could also have specified the features column, but its default name is 'features', which matches ours, so there is no need to do this.
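With its default settings (regParam=0.0, fitIntercept=True), LinearRegression fits an ordinary least-squares model: Crew is approximated by an intercept plus a weighted sum of the features. The NumPy sketch below solves the same least-squares problem on a toy dataset; fit_ols and predict are hypothetical helpers, and Spark's own solver differs in its numerics. The fitted Spark model exposes the analogous values as regr_model.intercept and regr_model.coefficients.

```python
import numpy as np


def fit_ols(X, y):
    """Least-squares fit of y ~ intercept + X @ coefficients."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    # Prepend a column of ones so the first parameter is the intercept
    design = np.column_stack([np.ones(len(X)), X])
    beta, *_ = np.linalg.lstsq(design, y, rcond=None)
    return beta[0], beta[1:]


def predict(intercept, coefficients, X):
    """Apply the fitted linear model to new rows."""
    return intercept + np.asarray(X, dtype=float) @ coefficients


# Toy data generated exactly by y = 2 + 3*x1 - x2
X = [[1, 2], [2, 1], [3, 5], [4, 3], [5, 8]]
y = [2 + 3 * x1 - x2 for x1, x2 in X]
intercept, coefficients = fit_ols(X, y)
```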

In [20]:
from pyspark.ml.regression import LinearRegression

In [21]:
lr = LinearRegression(labelCol= 'Crew')

In [22]:
regr_model = lr.fit(train)

Our model is ready and fitted on our train data. We now need to evaluate our model on the test data.

In [23]:
test_results = regr_model.evaluate(test)

In [24]:
# Training-set summary: metrics computed on the data the model was fitted on
model_summary = regr_model.summary

In [25]:
print(f'Mean Absolute Error: {test_results.meanAbsoluteError}')
print(f'Mean Squared Error: {test_results.meanSquaredError}')
print(f'Root Mean Squared Error: {test_results.rootMeanSquaredError}')
print(f'R2: {test_results.r2}')

Mean Absolute Error: 0.728146442952454
Mean Squared Error: 1.8840822298762954
Root Mean Squared Error: 1.3726187489162078
R2: 0.875531741555629


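By observing the metrics above, we note that our model performs well: an R2 of about 0.88 on the test set means it explains roughly 88% of the variance of Crew in the test data, and an RMSE of about 1.37 corresponds to roughly 137 crew members, since Crew is recorded in hundreds. These figures depend on the particular random split.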
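For reference, the four metrics printed above follow the standard definitions: MAE is the mean absolute error, MSE the mean squared error, RMSE its square root (in the same units as Crew), and R2 = 1 - SS_res / SS_tot. A plain-Python sketch of these formulas (regression_metrics is a hypothetical helper, not a Spark API):

```python
import math


def regression_metrics(y_true, y_pred):
    """Return MAE, MSE, RMSE and R2 for paired true/predicted values."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    rmse = math.sqrt(mse)
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return {'mae': mae, 'mse': mse, 'rmse': rmse, 'r2': r2}


metrics = regression_metrics([1, 2, 3, 4], [1, 2, 3, 5])
```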

Now we shall use our model to make predictions on unlabeled data.

In [26]:
unlabeled_data = test.select('features')

In [27]:
predictions = regr_model.transform(unlabeled_data)

In [28]:
predictions.join(test,on = 'features').show()

+--------------------+-------------------+-----+
|            features|         prediction| Crew|
+--------------------+-------------------+-----+
|[5.0,160.0,36.34,...|  14.91298789368967| 13.6|
|[6.0,30.276999999...|  4.131083118172464| 3.55|
|[6.0,110.23899999...| 10.919425910041161| 11.5|
|[6.0,113.0,37.82,...| 11.437956832857893| 12.0|
|[8.0,110.0,29.74,...| 11.794433448744728| 11.6|
|[9.0,81.0,21.44,9...|  9.299298972142367| 10.0|
|[9.0,116.0,26.0,9...| 11.020301717326626| 11.0|
|[10.0,86.0,21.14,...|  9.531191650580057|  9.2|
|[11.0,58.6,15.66,...|  7.232518462769702|  7.6|
|[11.0,86.0,21.24,...|   9.35111538306697|  9.3|
|[11.0,90.09,25.01...|  8.817251861527899| 8.48|
|[11.0,110.0,29.74...|  11.81533157475251| 19.1|
|[12.0,2.329,0.94,...|0.24383414740290843|  0.6|
|[12.0,50.0,7.0,7....|  4.578341343575277| 4.45|
|[12.0,77.104,20.0...|  8.581093279824158| 9.59|
|[12.0,88.5,21.24,...|    9.3484056015759|10.29|
|[12.0,138.0,31.14...| 12.858149436990264|11.85|
|[13.0,25.0,3.82,5..
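Since transform appends a prediction column to whatever DataFrame it receives, `regr_model.transform(test).select('features', 'prediction', 'Crew')` would give the same comparison without a join. Joining on features also carries a risk: if two test rows share an identical feature vector, every matching pair is emitted and rows are duplicated. A plain-Python illustration of that inner-join behaviour (inner_join and the rows are hypothetical):

```python
def inner_join(left, right, key):
    """Naive inner join of two lists of dicts on `key`:
    every matching (left, right) pair produces one output row."""
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]


# Two hypothetical test rows that share the same feature vector
test_rows = [{'features': (6.0, 30.277), 'Crew': 3.55},
             {'features': (6.0, 30.277), 'Crew': 3.60}]
predicted = [{'features': r['features'], 'prediction': 4.13} for r in test_rows]

joined = inner_join(predicted, test_rows, 'features')
print(len(joined))  # 4 rows instead of 2
```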

Some further exploratory data analysis!

In [29]:
from pyspark.sql.functions import corr

In [30]:
data.select(corr('Passengers','Crew').alias('Passengers and Crew correlation')).show()

+-------------------------------+
|Passengers and Crew correlation|
+-------------------------------+
|             0.9152341306065384|
+-------------------------------+



In [31]:
data.select(corr('Cabins','Crew').alias('Cabins and Crew correlation')).show()

+---------------------------+
|Cabins and Crew correlation|
+---------------------------+
|         0.9508226063578497|
+---------------------------+



We see that there is a strong positive (Pearson) correlation between the number of crew members and both the passenger capacity (about 0.92) and the number of cabins (about 0.95) of a ship.
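The corr function used above computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations, giving a value between -1 and 1. A plain-Python sketch of the formula (pearson is a hypothetical helper, not the Spark implementation):

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    # The 1/n factors cancel between numerator and denominator
    return cov / math.sqrt(var_x * var_y)


r = pearson([1, 2, 3, 4], [1, 3, 2, 4])
```

A value close to 1, like the 0.95 between Cabins and Crew, means the two columns rise together almost linearly.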