In [112]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Our task is to develop a regression model that will predict the number of  crew members required for future ships from the given features. 

### Read the data Crew.csv into spark dataframe
- inferSchema=True and header=True.
- Print the schema and show the first few rows.
- Use df.describe() to see the statistical properties of the data.

In [113]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import StringIndexer ,OneHotEncoder,VectorAssembler 
spark = SparkSession.builder.getOrCreate()

In [114]:
df = spark.read.csv('Crew.csv',inferSchema=True , header=True)

In [115]:
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [116]:
df.show(20)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [117]:
df.describe().show()

+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|summary|Ship_name|Cruise_line|               Age|           Tonnage|       passengers|           length|            cabins|passenger_density|             crew|
+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  count|      158|        158|               158|               158|              158|              158|               158|              158|              158|
|   mean| Infinity|       null|15.689873417721518| 71.28467088607599|18.45740506329114|8.130632911392404| 8.830000000000005|39.90094936708861|7.794177215189873|
| stddev|     null|       null| 7.615691058751413|37.229540025907866|9.677094775143416|1.793473548054825|4.4714172221480615| 8.63921711391542|3.503486564627034|
|    min|Adventure|    Azamara|   

### StringIndexer and OneHotEncoder 
- Create StringIndexer and OneHotEncoder to process the data.
- StringIndexer is for any string data type.
- OneHotEncoder will be applied to the StringIndexer columns.
- Convert all obtained columns from OneHotEncoder and the other numeric columns into a feature column (use VectorAssembler) 

In [118]:
N_Col=[x for (x,y) in df.dtypes if y!='string' and x!='crew']
C_Col=[x for (x,y) in df.dtypes if y=='string']

In [119]:
C_Col

['Ship_name', 'Cruise_line']

In [120]:
N_Col

['Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']

In [121]:
in_out_col=[x+'_Index' for x in C_Col ]
in_out_col

['Ship_name_Index', 'Cruise_line_Index']

In [122]:
H_out_col=[x+'_OHE' for x in C_Col ]
H_out_col

['Ship_name_OHE', 'Cruise_line_OHE']

In [123]:
assem_in=H_out_col+N_Col
assem_in

['Ship_name_OHE',
 'Cruise_line_OHE',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density']

In [124]:
stringIn=StringIndexer(inputCols=C_Col,outputCols=in_out_col,handleInvalid='skip')

In [125]:
oneHCoder=OneHotEncoder(inputCols=in_out_col,outputCols=H_out_col)

In [126]:
vecassem=VectorAssembler(inputCols=assem_in,outputCol='features')

### Divide the data into Train/Test

In [127]:
traind,testd =df.randomSplit([0.7,0.3],seed=42)

### Create a Linear Regression Model 

In [128]:
from pyspark.ml.regression import LinearRegression
lr=LinearRegression(featuresCol='features',labelCol='crew',predictionCol='prediction')

### Create a Pipeline model

In [129]:
piplin=Pipeline(stages=[stringIn,oneHCoder,vecassem,lr])

### Fit the Pipeline model to the trainig data

In [130]:
piplineModel=piplin.fit(traind)
pred=piplineModel.transform(traind)
pred.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Ship_name_Index: double (nullable = false)
 |-- Cruise_line_Index: double (nullable = false)
 |-- Ship_name_OHE: vector (nullable = true)
 |-- Cruise_line_OHE: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [131]:
pred.select('crew','prediction').show()

+-----+------------------+
| crew|        prediction|
+-----+------------------+
|11.85|11.859115210442312|
|  4.0|3.9938453621421974|
| 8.69| 8.694404097750937|
| 0.59|0.5895337450928255|
|  7.0| 6.998990972466501|
|  9.2| 9.201308992837903|
| 11.0|   11.003613422078|
|  6.7| 6.689256805669288|
| 8.58|  8.57865788768911|
| 9.99| 9.993344203424568|
|  9.0| 9.002525069782441|
|  4.7| 4.690042135142349|
| 11.0|10.999826946741864|
| 10.0| 9.996875323092382|
|  9.2| 9.192620186516647|
|  9.2| 9.194517606384647|
| 12.0|11.999704489819475|
| 6.36| 6.354229425232944|
| 1.46|1.4547302962870243|
|  9.2| 9.193433366460065|
+-----+------------------+
only showing top 20 rows



### Make a prediction for the same training data and evaluate the model performance using RMSE and r2

In [132]:
reg_r2 = RegressionEvaluator(predictionCol='prediction',labelCol='crew',metricName='r2')
reg_rmse = RegressionEvaluator(predictionCol='prediction',labelCol='crew',metricName='rmse' )

In [133]:
pred1=piplineModel.transform(traind)

In [134]:
reg_r2.evaluate(pred1)

0.9999168731626313

In [135]:
reg_rmse.evaluate(pred1)

0.030567488828054396

### Make a prediction for the test data and evaluate the model performance using RMSE and r2

In [136]:
pred2=piplineModel.transform(testd)

In [137]:
reg_r2.evaluate(pred2)

0.2396721911544757

In [138]:
reg_rmse.evaluate(pred2)

3.894164672579743