# Consulting Project: Predicting Crew members
#### Your job is to create a regression model that will help predict how many crew members will be needed for future ships.

#### In other words, use the features you think will be useful to predict the value in the Crew column.

In [2]:
appname = "Linear Regression"

# Look into https://spark.apache.org/downloads.html for the latest version
spark_mirror = "https://mirrors.sonic.net/apache/spark"
spark_version = "3.3.1"
hadoop_version = "3"

# Install Java 8 (Spark 3.3 supports Java 8, 11 and 17; Java 8 is used here)
! apt-get update
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark binary distribution
! rm -rf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz spark-{spark_version}-bin-hadoop{hadoop_version}
! wget -q {spark_mirror}/spark-{spark_version}/spark-{spark_version}-bin-hadoop{hadoop_version}.tgz
! tar xzf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz

# The only 2 environment variables needed to set up Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop{hadoop_version}"

# Set up the Spark environment based on the environment variable SPARK_HOME 
! pip install -q findspark
import findspark
findspark.init()

# Get the Spark session object (basic entry point for every operation)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appname).master("local[*]").getOrCreate()


### Load data and exploratory data analysis

First, we load the dataset from Google Drive.

In [None]:
df = spark.read.format("csv").options(header = True, inferSchema = True).load("./files/cruise_ship_info.csv")

Mounted at /content/drive


In [4]:
df.show(10)
df.printSchema()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

### Preprocessing

We aim to build a linear regression model that predicts the crew size a ship requires from several quantitative features.

First, we need to convert the string column Cruise_line into a numeric one, since we expect this feature to play an important role in predicting crew size. To do this, we will use StringIndexer.



In [9]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Cruise_line", outputCol="indexed_cruise_line", handleInvalid="error")
print(indexer.explainParams())

handleInvalid: how to handle invalid data (unseen or NULL values) in features and label column of string type. Options are 'skip' (filter out rows with invalid data), error (throw an error), or 'keep' (put invalid data in a special additional bucket, at index numLabels). (default: error, current: error)
inputCol: input column name. (current: Cruise_line)
inputCols: input column names. (undefined)
outputCol: output column name. (default: StringIndexer_4fc02fae3061__output, current: indexed_cruise_line)
outputCols: output column names. (undefined)
stringOrderType: How to order labels of string column. The first label after ordering is assigned an index of 0. Supported options: frequencyDesc, frequencyAsc, alphabetDesc, alphabetAsc. Default is frequencyDesc. In case of equal frequency when under frequencyDesc/Asc, the strings are further sorted alphabetically (default: frequencyDesc)
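
To make the default `frequencyDesc` ordering described above concrete, here is a minimal plain-Python sketch of the same rule: the most frequent label gets index 0, and ties are broken alphabetically. The function name `frequency_desc_index` and the toy column are made up for illustration; this is not Spark's implementation.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy column, not the real cruise data.
toy_lines = ["Carnival", "Azamara", "Carnival", "Royal", "Royal", "Carnival"]
mapping = frequency_desc_index(toy_lines)
print(mapping)
```

Under this rule, an index such as 1.0 means the label comes second in the frequency ordering. Keep in mind that the resulting numbers encode frequency rank only, and a linear model will treat them as an ordinary numeric feature.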


In [17]:
indexer_model = indexer.fit(df)
indx_df = indexer_model.transform(df)
indx_df.show(10)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|indexed_cruise_line|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|               16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|               16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|                1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|                1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|                1.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.5

Next, we assemble the numeric features that are useful for prediction into a single vector column using VectorAssembler. In this case, I will not include the columns Age and Ship_name, since I do not consider them relevant for the prediction.
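
As a rough illustration of what assembling means, the plain-Python sketch below builds the feature vector for the first ship in the table above. The helper `assemble` is hypothetical; it only mimics the idea of picking columns in a fixed order and dropping rows with missing values, which is what `handleInvalid = "skip"` is meant to do.

```python
# Columns in the same order as the inputCols passed to VectorAssembler.
feature_cols = ["Tonnage", "passengers", "length", "cabins",
                "passenger_density", "indexed_cruise_line"]

# First row of the indexed DataFrame shown above (ship "Journey").
row = {
    "Ship_name": "Journey", "Cruise_line": "Azamara", "Age": 6,
    "Tonnage": 30.276999999999997, "passengers": 6.94, "length": 5.94,
    "cabins": 3.55, "passenger_density": 42.64, "crew": 3.55,
    "indexed_cruise_line": 16.0,
}

def assemble(row, cols):
    """Return the selected columns as one flat list, or None if any value is missing."""
    values = [row.get(c) for c in cols]
    if any(v is None for v in values):
        return None  # the row would be skipped
    return [float(v) for v in values]

features = assemble(row, feature_cols)
print(features)
```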

In [59]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["Tonnage", "passengers", "length", "cabins", "passenger_density", "indexed_cruise_line"],
                            handleInvalid = "skip", outputCol = "features")

df_transf = assembler.transform(indx_df)
df_transf.show(10)


+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-------------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|indexed_cruise_line|            features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-------------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|               16.0|[30.2769999999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|               16.0|[30.2769999999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|                1.0|[47.262,14.86,7.2...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|                1.0|[110.0,29.74,9.53...|
|    Destiny|   Carnival| 17|     

Now, we check whether there are Null values in the Crew column since they could cause problems when building the model.

In [60]:
# Count rows where crew is missing (comparing against the string "Null" would not detect nulls)
df.filter(df["crew"].isNull()).count()

0

There are no Null values to impute, so we can continue building the regression model.

In [66]:
# Train-test data split
data = df_transf.select("features", "crew")
train, test = data.randomSplit([0.7, 0.3])

train.summary().show()
test.summary().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               117|
|   mean|7.6824786324786345|
| stddev| 3.526555393205931|
|    min|              0.59|
|    25%|               5.3|
|    50%|              8.08|
|    75%|              10.0|
|    max|              19.1|
+-------+------------------+

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                41|
|   mean| 8.112926829268291|
| stddev|3.4598173679418682|
|    min|               1.6|
|    25%|              6.12|
|    50%|              8.22|
|    75%|              9.45|
|    max|              21.0|
+-------+------------------+
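
Note that the split gives 117 and 41 rows (about 74% and 26%) rather than an exact 70/30 division. The weights passed to `randomSplit` act as per-row probabilities, not exact proportions. The sketch below illustrates that idea in plain Python; `random_split` is a hypothetical helper and does not reproduce Spark's sampler or its random numbers.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent weighted random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, w in enumerate(weights):
            cumulative += w
            # The last split also catches any floating-point leftovers.
            if draw < cumulative or i == len(weights) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(158))  # 158 = 117 + 41 rows, as in the summaries above
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

`randomSplit` also accepts an optional `seed` argument, which is worth setting if the same split needs to be reproduced between runs.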

### Building the model

In [67]:
from pyspark.ml.regression import LinearRegression

# Build and train Linear Regression model
builder = LinearRegression(labelCol="crew", featuresCol="features")
model = builder.fit(train)
print("Model coefficients:", model.coefficients)
print("Explained variance:", model.summary.explainedVariance)
print("Mean squared error:", model.summary.meanSquaredError)


Model coefficients: [-0.0007574060678084888,-0.1189015894206076,0.48252822591345623,0.848270433990954,0.0061867921124471245,0.04336503401652591]
Explained variance: 11.28225643237045
Mean squared error: 1.0480408428140524
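
To see how these coefficients turn into a prediction, the sketch below computes the linear part (coefficient times feature, summed) for the ship "Journey" from the first table. The intercept is not printed in this notebook (it should be available as `model.intercept`), so it is left as a parameter and the value `0.0` used here is only a placeholder.

```python
feature_names = ["Tonnage", "passengers", "length", "cabins",
                 "passenger_density", "indexed_cruise_line"]

# Coefficients copied from the model output above.
coefficients = [-0.0007574060678084888, -0.1189015894206076,
                0.48252822591345623, 0.848270433990954,
                0.0061867921124471245, 0.04336503401652591]

def predict(features, coefficients, intercept):
    """Linear regression prediction: intercept + sum of coefficient * feature."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Feature values of the ship "Journey", in the same order as feature_names.
journey = [30.276999999999997, 6.94, 5.94, 3.55, 42.64, 16.0]

# intercept=0.0 is a placeholder, NOT the fitted value.
linear_part = predict(journey, coefficients, intercept=0.0)

for name, c, x in zip(feature_names, coefficients, journey):
    print(f"{name:>20}: {c * x:+.4f}")
print(f"{'sum (no intercept)':>20}: {linear_part:+.4f}")
```

Printing the per-feature contributions this way also shows which features dominate the prediction for a given ship.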


In [68]:
# Make predictions of test data
test_pred = model.evaluate(test)
test_pred.predictions.show(10)

# We can also obtain the same predictions with:
# model.transform(test).show(10)


+--------------------+----+------------------+
|            features|crew|        prediction|
+--------------------+----+------------------+
|[10.0,2.08,4.4,1....| 1.6|1.8116785917635914|
|[14.745,3.08,6.17...| 1.8|3.0264245037662576|
|[28.43,8.08,6.16,...| 4.0| 4.059234210552061|
|[30.2769999999999...| 4.0|3.9253276242036668|
|[30.2769999999999...|3.73|3.4573319296006586|
|[30.2769999999999...|3.55| 4.143785120507842|
|[34.25,10.52,6.15...| 4.7| 4.684238352949244|
|[42.0,14.8,7.13,7...| 6.8| 6.648610422391957|
|[47.225,13.66,6.8...| 6.7|6.0892348487052415|
|[48.563,20.2,6.92...|6.71| 5.992066332533641|
+--------------------+----+------------------+
only showing top 10 rows



### Evaluate accuracy of the linear regression model on test data

In [69]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="crew", predictionCol="prediction")
print(evaluator.explainParams())      # default: rmse

labelCol: label column name. (default: label, current: crew)
metricName: metric name in evaluation - one of:
                       rmse - root mean squared error (default)
                       mse - mean squared error
                       r2 - r^2 metric
                       mae - mean absolute error
                       var - explained variance. (default: rmse)
predictionCol: prediction column name. (default: prediction, current: prediction)
throughOrigin: whether the regression is through the origin. (default: False)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)


In [70]:
rmse = evaluator.evaluate(test_pred.predictions, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(test_pred.predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(test_pred.predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(test_pred.predictions, {evaluator.metricName: "r2"})
var = evaluator.evaluate(test_pred.predictions, {evaluator.metricName: "var"})

print(f"Root mean squared error: {rmse}")
print(f"Mean squared error: {mse}")
print(f"Mean absolute error: {mae}")
print(f"R^2 metric : {r2}")
print(f"Explained variance: {var}")

Root mean squared error: 0.7038970007389408
Mean squared error: 0.49547098764927644
Mean absolute error: 0.5752982233684258
R^2 metric : 0.9575736426256201
Explained variance: 9.699935495523055
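
For reference, the first four metrics can be computed by hand from (label, prediction) pairs. The sketch below does so for the 10 test rows displayed earlier, so its results will not match the values above, which were computed on all 41 test rows. `regression_metrics` is a hypothetical helper written for this illustration.

```python
import math

def regression_metrics(labels, predictions):
    """Compute RMSE, MSE, MAE and R^2 from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1.0 - (mse * n) / ss_tot  # 1 - SS_err / SS_tot
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": r2}

# The 10 rows shown in the predictions table (a subset of the test set).
labels = [1.6, 1.8, 4.0, 4.0, 3.73, 3.55, 4.7, 6.8, 6.7, 6.71]
predictions = [1.8116785917635914, 3.0264245037662576, 4.059234210552061,
               3.9253276242036668, 3.4573319296006586, 4.143785120507842,
               4.684238352949244, 6.648610422391957, 6.0892348487052415,
               5.992066332533641]

metrics = regression_metrics(labels, predictions)
for name, value in metrics.items():
    print(f"{name}: {value}")

# Sanity check on the reported values: RMSE squared should equal MSE.
consistent = math.isclose(0.7038970007389408 ** 2, 0.49547098764927644,
                          rel_tol=1e-9)
print("reported rmse^2 == reported mse:", consistent)
```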


In [71]:
# The same metrics can be read directly from the LinearRegressionSummary returned by model.evaluate(test)

print(f"Root mean squared error: {test_pred.rootMeanSquaredError}")
print(f"Mean squared error: {test_pred.meanSquaredError}")
print(f"Mean absolute error: {test_pred.meanAbsoluteError}")
print(f"R^2 metric : {test_pred.r2}")
print(f"Explained variance: {test_pred.explainedVariance}")


Root mean squared error: 0.7038970007389408
Mean squared error: 0.49547098764927644
Mean absolute error: 0.5752982233684258
R^2 metric : 0.9575736426256201
Explained variance: 9.699935495523055
