In [1]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [2]:
# Read the cruise-ship CSV: the first row supplies the column names and the
# column types are inferred from the data. Then preview the loaded rows.
cruise_data = spark.read.csv(
    "/FileStore/tables/cruise_ship_info.csv",
    header=True,
    inferSchema=True,
)
cruise_data.show()

In [3]:
# Print the column names and the types inferred while reading the CSV.
cruise_data.printSchema()

In [4]:
# Check how strongly each numeric predictor correlates with the dependent
# variable (crew), i.e. the column the regression will predict.

from pyspark.sql.functions import corr

# One single-value correlation table is shown per predictor, in this order.
for predictor in ('Tonnage', 'Age', 'passengers', 'length', 'cabins'):
    cruise_data.select(corr(predictor, 'crew')).show()

In [5]:
# Encode the categorical Cruise_line column as a numeric index column
# (Cruise_line_index) using StringIndexer.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='Cruise_line', outputCol='Cruise_line_index')

# Fit learns the label -> index mapping; transform appends the index column.
indexer_model = indexer.fit(cruise_data)
indexed_data = indexer_model.transform(cruise_data)
indexed_data.show()

In [6]:
# List the column names, which now include the added Cruise_line_index column.
indexed_data.columns

In [7]:
# Combine all predictor columns into a single vector column named 'features',
# which is the input format the regression estimator consumes.
# NOTE(review): Cruise_line_index is a category index, not a true quantity;
# feeding it to a linear model as-is imposes an ordering on cruise lines --
# consider one-hot encoding it instead. Confirm before relying on coefficients.
feature_columns = [
    'Cruise_line_index',
    'Age',
    'Tonnage',
    'passengers',
    'length',
    'cabins',
    'passenger_density',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
vectorized_data = assembler.transform(indexed_data)
vectorized_data.show()

In [8]:
# Keep only what the model needs: the assembled feature vector and the label.
model_columns = ['features', 'crew']
final_data = vectorized_data.select(*model_columns)
final_data.show()

In [9]:
# Train/test split (70% / 30%).
#
# A fixed seed keeps the split -- and therefore every metric computed below --
# stable across re-runs of the notebook (for the same input data/partitioning).
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

# DataFrame.show() prints the table itself and returns None, so it must not be
# passed as an argument to print(): that printed the table first and then a
# misleading "Train data: None" line. Print the label, then show the summary.
print("Train data:")
train_data.describe().show()
print("Test data:")
test_data.describe().show()

In [10]:
# Configure a linear regression that reads the assembled 'features' vector
# and predicts the 'crew' label, then fit it on the training split.
lr = LinearRegression(featuresCol='features', labelCol='crew')
lr_model = lr.fit(train_data)

# Despite its name, test_model is the evaluation result on the held-out split
# (the object the later cells read rootMeanSquaredError and r2 from), not a model.
test_model = lr_model.evaluate(test_data)

In [11]:
# Root-mean-squared error of the fitted model on the held-out test split.
test_model.rootMeanSquaredError

In [12]:
# R^2 (coefficient of determination) of the fitted model on the test split.
test_model.r2

In [13]:
# Keep only the feature vector from the test split (the 'crew' label is left out).
unlabeled_data = test_data.select('features')
# Apply the fitted model to the unlabeled rows to generate predictions.
predictions = lr_model.transform(unlabeled_data)
# Preview the resulting rows.
predictions.show()