## ASSIGNMENT 2

### Create a Regression Model
Create a regression model that will predict how many crew members will be needed for future ships


## Import the Spark and MlLib Libraries

In [2]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext,SparkSession
from pyspark.ml.regression import LinearRegression

## Load Dataset

In [3]:
spark=SparkSession.builder.appName("Cruise").getOrCreate()

cruise_df=spark.read.option("header",'True').option('delimiter',',').csv("cruise_line_info.csv")

In [4]:
#viewing the dataframe
cruise_df.take(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age='6', Tonnage='30.277', passengers='6.94', length='5.94', cabins='3.55', passenger_density='42.64', crew='3.55')]

## Data Exploration

Print Schema in a tree format

In [5]:
cruise_df.cache()
cruise_df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Tonnage: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- length: string (nullable = true)
 |-- cabins: string (nullable = true)
 |-- passenger_density: string (nullable = true)
 |-- crew: string (nullable = true)



Perform Descriptive analytics

In [6]:
cruise_df.describe().toPandas().transpose() 

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Ship_name,158,Infinity,,Adventure,Zuiderdam
Cruise_line,158,,,Azamara,Windstar
Age,158,15.689873417721518,7.615691058751413,10,9
Tonnage,158,71.28467088607599,37.229540025907866,10,93
passengers,158,18.45740506329114,9.677094775143416,0.66,9.52
length,158,8.130632911392404,1.793473548054825,10.2,9.65
cabins,158,8.830000000000005,4.4714172221480615,0.33,9.87
passenger_density,158,39.90094936708861,8.63921711391542,17.7,71.43
crew,158,7.794177215189873,3.503486564627034,0.59,9.99


## Preparing data for Prediction

In [7]:
# Cast Course_Fees from integer type to float type 
cruise_df2 = cruise_df.withColumn("Age", cruise_df["Age"].cast('int')).withColumn("Tonnage",cruise_df["Tonnage"].cast("float")) \
.withColumn("passengers",cruise_df["passengers"].cast("float")).withColumn("length",cruise_df["length"].cast("float")) \
.withColumn("cabins",cruise_df["cabins"].cast("float")).withColumn("passenger_density",cruise_df["passenger_density"].cast("float")) \
.withColumn("crew", cruise_df["crew"].cast('float'))

In [9]:
#print the tree schema
cruise_df2.cache()
cruise_df2.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: float (nullable = true)
 |-- passengers: float (nullable = true)
 |-- length: float (nullable = true)
 |-- cabins: float (nullable = true)
 |-- passenger_density: float (nullable = true)
 |-- crew: float (nullable = true)



In [10]:
#import vector Assembler
from pyspark.ml.feature import VectorAssembler

input_cols = ['Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']

vectorAssembler = VectorAssembler(inputCols=input_cols,outputCol='features')
vcruise_df = vectorAssembler.transform(cruise_df2)
vcruise_df = vcruise_df.select(['features','crew'])
vcruise_df.show(5)



+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.277000427...|3.55|
|[6.0,30.277000427...|3.55|
|[26.0,47.26200103...| 6.7|
|[11.0,110.0,29.73...|19.1|
|[17.0,101.3529968...|10.0|
+--------------------+----+
only showing top 5 rows



## Spliting the Transformed  Data

In [11]:
#spliting the data into training and testing sets
splits = vcruise_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [12]:
test_df.show()

+--------------------+-----+
|            features| crew|
+--------------------+-----+
|[5.0,133.5,39.590...|13.13|
|[5.0,160.0,36.340...| 13.6|
|[6.0,90.0,20.0,9....|  9.0|
|[6.0,110.23899841...| 11.5|
|[6.0,158.0,43.700...| 13.6|
|[7.0,158.0,43.700...| 13.6|
|[8.0,91.0,22.4400...| 11.0|
|[8.0,110.0,29.739...| 11.6|
|[9.0,90.089996337...| 8.69|
|[9.0,105.0,27.200...|10.68|
|[10.0,46.0,7.0,6....| 4.47|
|[10.0,77.0,20.159...|  9.0|
|[10.0,110.0,29.73...| 11.6|
|[11.0,85.0,18.479...|  8.0|
|[11.0,90.0,22.399...| 11.0|
|[11.0,110.0,29.73...| 19.1|
|[12.0,25.0,3.8800...| 2.87|
|[12.0,77.10399627...| 9.59|
|[12.0,88.5,21.239...|  9.3|
|[12.0,91.0,20.319...| 9.99|
+--------------------+-----+
only showing top 20 rows



## Model Training and Prediction

In [13]:
lr = LinearRegression(featuresCol = 'features', labelCol='crew', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.015481269317929005,0.0,0.4236953185227631,0.4124017227861518,0.0]
Intercept: -0.38371407416748227


### Summarize the model and print the training summary

In [14]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.902518
r2: 0.929520


R Squared at 0.93 indicates 93% of variability in 'crew'

In [15]:
train_df.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               106|
|   mean| 7.809339609910857|
| stddev|3.4157166980743097|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



## Prediction  of Crew for the Test Data

In [16]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","crew","features").show()

from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="crew",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----+--------------------+
|        prediction| crew|            features|
+------------------+-----+--------------------+
|13.065041888641995|13.13|[5.0,133.5,39.590...|
|14.298120244674335| 13.6|[5.0,160.0,36.340...|
| 9.337636892207433|  9.0|[6.0,90.0,20.0,9....|
|11.484681696366335| 11.5|[6.0,110.23899841...|
|14.252129821597116| 13.6|[6.0,158.0,43.700...|
|14.197049381701076| 13.6|[7.0,158.0,43.700...|
| 9.740888535665547| 11.0|[8.0,91.0,22.4400...|
|11.480981697566794| 11.6|[8.0,110.0,29.739...|
| 9.598617011921391| 8.69|[9.0,90.089996337...|
|10.604874911471585|10.68|[9.0,105.0,27.200...|
|3.9177540248484233| 4.47|[10.0,46.0,7.0,6....|
| 8.456092564822514|  9.0|[10.0,77.0,20.159...|
|11.480981697566794| 11.6|[10.0,110.0,29.73...|
| 8.772128218136901|  8.0|[11.0,85.0,18.479...|
| 9.717159043109367| 11.0|[11.0,90.0,22.399...|
|11.493579505441355| 19.1|[11.0,110.0,29.73...|
| 3.332837987269773| 2.87|[12.0,25.0,3.8800...|
| 8.552215951024069| 9.59|[12.0,77.10399

In [17]:
#print RMSE for test data
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 1.29024


In [18]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.5000000000000004, 0.4231538672963231, 0.11358701388073573, 0.11160100233876376, 0.10754283733636905, 0.10674764595422753, 0.10579947526720045, 0.1033690392974422, 0.10344550525945101, 0.10277190265081265, 0.10275484491293874]
+--------------------+
|           residuals|
+--------------------+
|  1.8349097733594917|
|  -1.128208840080843|
|   0.672115777937087|
| -2.0149733697147063|
| -0.5157886837647272|
| -0.5157886837647272|
|  1.0088477861968705|
| -0.6655568888302099|
| 0.18389336718849947|
|-0.46324157860027704|
| 0.13744955923471203|
|  0.5361822732837247|
| 0.13121141444544726|
|  0.8065507812672035|
|-0.26177864243119764|
|  0.8537297156100383|
| 0.11478163374154349|
|  1.4711771930995656|
|  1.4711771930995656|
| 0.19732186094010373|
+--------------------+
only showing top 20 rows



### Using our Linear Regression model to make some predictions:

In [19]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","crew","features").show()

+------------------+-----+--------------------+
|        prediction| crew|            features|
+------------------+-----+--------------------+
|13.065041888641995|13.13|[5.0,133.5,39.590...|
|14.298120244674335| 13.6|[5.0,160.0,36.340...|
| 9.337636892207433|  9.0|[6.0,90.0,20.0,9....|
|11.484681696366335| 11.5|[6.0,110.23899841...|
|14.252129821597116| 13.6|[6.0,158.0,43.700...|
|14.197049381701076| 13.6|[7.0,158.0,43.700...|
| 9.740888535665547| 11.0|[8.0,91.0,22.4400...|
|11.480981697566794| 11.6|[8.0,110.0,29.739...|
| 9.598617011921391| 8.69|[9.0,90.089996337...|
|10.604874911471585|10.68|[9.0,105.0,27.200...|
|3.9177540248484233| 4.47|[10.0,46.0,7.0,6....|
| 8.456092564822514|  9.0|[10.0,77.0,20.159...|
|11.480981697566794| 11.6|[10.0,110.0,29.73...|
| 8.772128218136901|  8.0|[11.0,85.0,18.479...|
| 9.717159043109367| 11.0|[11.0,90.0,22.399...|
|11.493579505441355| 19.1|[11.0,110.0,29.73...|
| 3.332837987269773| 2.87|[12.0,25.0,3.8800...|
| 8.552215951024069| 9.59|[12.0,77.10399