# Import libraries

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Linear Regression Model").getOrCreate()

In [2]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import IndexToString, StringIndexer

# Load and verify data

In [3]:
data = spark.read.csv('resources/cruise_ship_info.csv',header = True, inferSchema = True)

In [4]:
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [5]:
data.head(3)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55),
 Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55),
 Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7)]

In [6]:
for item in data.head(1)[0]:
    print(item)

Journey
Azamara
6
30.276999999999997
6.94
5.94
3.55
42.64
3.55


In [7]:
data.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

In [8]:
data.groupBy('Cruise_line').count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



In [9]:
data.groupBy('Ship_name').count().show()

+------------+-----+
|   Ship_name|count|
+------------+-----+
|       Virgo|    1|
|     Fortuna|    1|
|      Shadow|    1|
|     Empress|    1|
|        Wind|    2|
|    Paradise|    1|
|        Surf|    1|
|      Wonder|    1|
|       Magic|    1|
|    Symphony|    1|
|    Sinfonia|    1|
| Inspiration|    1|
|   Millenium|    1|
|    Solstice|    1|
|PrideofAloha|    1|
|     Majesty|    2|
|     Ventura|    1|
|   Romantica|    1|
|      Spirit|    4|
|       Oasis|    1|
+------------+-----+
only showing top 20 rows



# Data Preprocessing

In [10]:
indexer = StringIndexer(inputCols=["Ship_name","Cruise_line"], outputCols=["Ship_name_Index","Cruise_line_Index"])
indexed = indexer.fit(data).transform(data)
indexed.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+-----------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Ship_name_Index|Cruise_line_Index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+-----------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           64.0|             16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           98.0|             16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|           27.0|              1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|           31.0|              1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|          

In [11]:
indexed.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Ship_name_Index: double (nullable = false)
 |-- Cruise_line_Index: double (nullable = false)



In [12]:
indexed.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Ship_name_Index',
 'Cruise_line_Index']

In [13]:
assembler = VectorAssembler(inputCols =['Age','Tonnage','passengers','length', 'cabins', 'passenger_density', 'Ship_name_Index', 'Cruise_line_Index'],
                            outputCol='features')

In [14]:
output = assembler.transform(indexed)

In [15]:
output.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Ship_name_Index: double (nullable = false)
 |-- Cruise_line_Index: double (nullable = false)
 |-- features: vector (nullable = true)



In [16]:
print(output.features)

Column<b'features'>


In [17]:
output.head(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, Ship_name_Index=64.0, Cruise_line_Index=16.0, features=DenseVector([6.0, 30.277, 6.94, 5.94, 3.55, 42.64, 64.0, 16.0]))]

In [18]:
final_data = output.select('features','crew')

In [19]:
final_data.show()

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
|[22.0,70.367,20.5...| 9.2|
|[15.0,70.367,20.5...| 9.2|
|[23.0,70.367,20.5...| 9.2|
|[19.0,70.367,20.5...| 9.2|
|[6.0,110.23899999...|11.5|
|[10.0,110.0,29.74...|11.6|
|[28.0,46.052,14.5...| 6.6|
|[18.0,70.367,20.5...| 9.2|
|[17.0,70.367,20.5...| 9.2|
|[11.0,86.0,21.24,...| 9.3|
|[8.0,110.0,29.74,...|11.6|
|[9.0,88.5,21.24,9...|10.3|
|[15.0,70.367,20.5...| 9.2|
|[12.0,88.5,21.24,...| 9.3|
|[20.0,70.367,20.5...| 9.2|
+--------------------+----+
only showing top 20 rows



# Train Test split

In [20]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [21]:
train_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               109|
|   mean| 7.854403669724782|
| stddev|3.5383427359462307|
|    min|              0.59|
|    max|              19.1|
+-------+------------------+



In [22]:
test_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                49|
|   mean| 7.660204081632652|
| stddev|3.4570667297796907|
|    min|              0.88|
|    max|              21.0|
+-------+------------------+



# Build Model

In [23]:
regressor = LinearRegression(labelCol='crew')

In [24]:
model = regressor.fit(train_data)

# Evaluate Model

In [25]:
pred_data = model.evaluate(test_data)

In [26]:
pred_data.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|  0.4040840736170246|
| -1.2608900197831048|
|  0.5248812915064249|
|-0.16388177003419724|
|0.002477372605037...|
|  0.7031699534731093|
| -0.3980479349030617|
|  1.0819922223380498|
| -0.2848749029174851|
|  0.3039217584108158|
| -0.8129717616006129|
| -0.6061917413397513|
|  0.8540660191018592|
| -1.1300540213116879|
| -0.5766931221980247|
| -1.3996607583041394|
|  0.7843680856910407|
|   3.285233784177686|
|  -0.693427098474773|
| -1.2094475547328596|
+--------------------+
only showing top 20 rows



In [27]:
pred_data.rootMeanSquaredError

1.0414459811608836

In [28]:
pred_data.r2

0.9073569564265022

In [29]:
pred_data.meanSquaredError

1.0846097316761556

In [30]:
pred_data.meanAbsoluteError

0.7140562145356989

In [31]:
from pyspark.sql import functions as f
data.select(f.corr('crew','passengers')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+



In [32]:
data.select(f.corr('crew','cabins')).show()

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+



In [33]:
unlabeled_data = test_data.select('features')
test_predictions = model.transform(unlabeled_data)

In [34]:
test_predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[4.0,220.0,54.0,1...|20.595915926382975|
|[5.0,86.0,21.04,9...| 9.260890019783105|
|[5.0,115.0,35.74,...|11.675118708493574|
|[5.0,122.0,28.5,1...| 6.863881770034197|
|[9.0,59.058,17.0,...| 7.397522627394963|
|[9.0,88.5,21.24,9...| 9.596830046526891|
|[9.0,105.0,27.2,8...|11.078047934903061|
|[9.0,113.0,26.74,...|11.298007777661951|
|[10.0,91.62700000...| 9.284874902917485|
|[11.0,58.6,15.66,...| 7.296078241589184|
|[11.0,85.0,18.48,...| 8.812971761600613|
|[11.0,90.09,25.01...| 9.086191741339752|
|[12.0,77.104,20.0...|  8.73593398089814|
|[12.0,138.0,31.14...|12.980054021311688|
|[13.0,61.0,13.8,7...| 6.576693122198025|
|[13.0,63.0,14.4,7...| 6.709660758304139|
|[13.0,91.0,20.32,...|  9.20563191430896|
|[14.0,76.8,19.6,8...| 8.714766215822314|
|[14.0,77.104,20.0...| 8.693427098474773|
|[14.0,138.0,31.14...| 12.96944755473286|
+--------------------+------------