In [1]:
# SETTING UP PYSPARK ENVIRONMENT

In [2]:
!pip install pyarrow==0.15.1

Collecting pyarrow==0.15.1
  Downloading pyarrow-0.15.1-cp37-cp37m-manylinux2010_x86_64.whl (59.2 MB)
[K     |████████████████████████████████| 59.2 MB 1.8 MB/s 
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 6.0.1
    Uninstalling pyarrow-6.0.1:
      Successfully uninstalled pyarrow-6.0.1
Successfully installed pyarrow-0.15.1


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xf spark-3.0.3-bin-hadoop3.2.tgz
!pip install -q findspark

In [4]:
!ls /usr/lib/jvm/

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [7]:
import sys
from pyspark.sql.functions import *

In [8]:
df=spark.read.csv("/content/cruise_ship_info.csv",inferSchema=True,header=True) # imported the dataset

In [9]:
df.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [10]:
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [11]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [12]:
feat=VectorAssembler(inputCols=["Age","Tonnage","passengers","length","cabins"],outputCol="independent")

In [13]:
output=feat.transform(df) # actual transformation 

In [14]:
output.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|         independent|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|[6.0,30.276999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|[6.0,30.276999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|[26.0,47.262,14.8...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|[11.0,110.0,29.74...|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|[17.0,101.353,26....|
|    Ecstasy|   Carnival| 22|            70.367|     20.

In [15]:
final_df=output.select("independent","crew") # final dataset for training & testing 

In [16]:
final_df.show()

+--------------------+----+
|         independent|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
|[22.0,70.367,20.5...| 9.2|
|[15.0,70.367,20.5...| 9.2|
|[23.0,70.367,20.5...| 9.2|
|[19.0,70.367,20.5...| 9.2|
|[6.0,110.23899999...|11.5|
|[10.0,110.0,29.74...|11.6|
|[28.0,46.052,14.5...| 6.6|
|[18.0,70.367,20.5...| 9.2|
|[17.0,70.367,20.5...| 9.2|
|[11.0,86.0,21.24,...| 9.3|
|[8.0,110.0,29.74,...|11.6|
|[9.0,88.5,21.24,9...|10.3|
|[15.0,70.367,20.5...| 9.2|
|[12.0,88.5,21.24,...| 9.3|
|[20.0,70.367,20.5...| 9.2|
+--------------------+----+
only showing top 20 rows



In [17]:
train,test=final_df.randomSplit([0.75,0.25]) # Split 75% training 25% testing 

In [18]:
train.show()

+--------------------+-----+
|         independent| crew|
+--------------------+-----+
|[4.0,220.0,54.0,1...| 21.0|
|[5.0,115.0,35.74,...| 12.2|
|[5.0,122.0,28.5,1...|  6.7|
|[5.0,160.0,36.34,...| 13.6|
|[6.0,30.276999999...| 3.55|
|[6.0,90.0,20.0,9....|  9.0|
|[6.0,93.0,23.94,9...|11.09|
|[6.0,110.23899999...| 11.5|
|[6.0,113.0,37.82,...| 12.0|
|[6.0,158.0,43.7,1...| 13.6|
|[7.0,89.6,25.5,9....| 9.87|
|[7.0,116.0,31.0,9...| 12.0|
|[7.0,158.0,43.7,1...| 13.6|
|[8.0,77.499,19.5,...|  9.0|
|[8.0,91.0,22.44,9...| 11.0|
|[8.0,110.0,29.74,...| 11.6|
|[9.0,59.058,17.0,...|  7.4|
|[9.0,81.0,21.44,9...| 10.0|
|[9.0,88.5,21.24,9...| 10.3|
|[9.0,90.09,25.01,...| 8.69|
+--------------------+-----+
only showing top 20 rows



In [19]:
test.show()

+--------------------+-----+
|         independent| crew|
+--------------------+-----+
|[5.0,86.0,21.04,9...|  8.0|
|[5.0,133.5,39.59,...|13.13|
|[6.0,30.276999999...| 3.55|
|[6.0,112.0,38.0,9...| 10.9|
|[9.0,85.0,19.68,9...| 8.69|
|[9.0,113.0,26.74,...|12.38|
|[10.0,68.0,10.8,7...| 6.36|
|[10.0,91.62700000...|  9.0|
|[10.0,105.0,27.2,...|10.68|
|[11.0,86.0,21.24,...|  9.3|
|[11.0,90.09,25.01...| 8.48|
|[11.0,91.62700000...|  9.0|
|[12.0,77.104,20.0...| 9.59|
|[12.0,90.09,25.01...| 8.68|
|[12.0,108.865,27....| 11.0|
|[13.0,63.0,14.4,7...| 5.31|
|[14.0,33.0,4.9,5....| 3.24|
|[15.0,70.367,20.5...|  9.2|
|[15.0,75.33800000...| 13.0|
|[16.0,19.2,3.2,5....| 2.11|
+--------------------+-----+
only showing top 20 rows



In [20]:
# Model Building
  # Linear Regression

In [21]:
from pyspark.ml.regression import LinearRegression

In [22]:
reg=LinearRegression(featuresCol="independent",labelCol="crew")

In [23]:
reg=reg.fit(train)   # model training

In [24]:
reg.coefficients   # all m values

DenseVector([-0.0176, 0.012, -0.1388, 0.4076, 0.7792])

In [25]:
reg.intercept  # Y intercept value

-0.4250248679158688

In [26]:
# Determining the Accuracy

In [27]:
train_sum=reg.summary

In [28]:
train_sum.rootMeanSquaredError

0.9760509809149426

In [29]:
train_sum.r2 # r2 score 

0.9229619282044998

In [30]:
train_sum.r2adj # adjustd r2 score 

0.9196124468220868

In [31]:
pred=reg.evaluate(test) # Prediction for testing data 

In [32]:
pred.predictions.show()

+--------------------+-----+------------------+
|         independent| crew|        prediction|
+--------------------+-----+------------------+
|[5.0,86.0,21.04,9...|  8.0|  9.37932726181633|
|[5.0,133.5,39.59,...|13.13|12.807711130772383|
|[6.0,30.276999999...| 3.55|  4.05752104404188|
|[6.0,112.0,38.0,9...| 10.9| 11.10594476782108|
|[9.0,85.0,19.68,9...| 8.69| 9.185500991628984|
|[9.0,113.0,26.74,...|12.38|11.358015708139884|
|[10.0,68.0,10.8,7...| 6.36| 6.223344335232164|
|[10.0,91.62700000...|  9.0| 9.380856877022508|
|[10.0,105.0,27.2,...|10.68|11.079747824039039|
|[11.0,86.0,21.24,...|  9.3| 9.667681580150921|
|[11.0,90.09,25.01...| 8.48| 9.096014205893786|
|[11.0,91.62700000...|  9.0| 9.363255087329733|
|[12.0,77.104,20.0...| 9.59| 8.788753767595887|
|[12.0,90.09,25.01...| 8.68| 9.078412416201012|
|[12.0,108.865,27....| 11.0|10.850592299217322|
|[13.0,63.0,14.4,7...| 5.31| 6.882324182791856|
|[14.0,33.0,4.9,5....| 3.24|3.2369309407910736|
|[15.0,70.367,20.5...|  9.2| 8.741717914

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator # Evaluate Regression model

In [34]:
pred_eval=RegressionEvaluator(predictionCol="prediction",labelCol="crew",metricName="r2")

In [35]:
pred_eval.evaluate(pred.predictions)

0.9260766184395052

In [36]:
# So the model is 92.60% accurate by linear regressor

In [37]:
# Random Forest

In [38]:
from pyspark.ml.regression import RandomForestRegressor

In [39]:
rf = RandomForestRegressor(featuresCol="independent",labelCol="crew" )

In [40]:
rfmodel = rf.fit(train)

In [41]:
# Make predictions.
predictions = rfmodel.transform(test)

In [42]:
evaluator = RegressionEvaluator(
    labelCol="crew", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print ("r2 on test data = %g" % r2)

r2 on test data = 0.931353


In [44]:
# here Random Forest Shows more accuracy than that of Linear Regression