## Multiple Linear Regression in Spark MLlib

Reference - https://github.com/DeltaOptimist/Multiple_Linear_Regression_PySpark/blob/main/Pyspark_Linear_Regression.ipynb

In [None]:
!pip install pyspark



In [None]:
import pyspark
from pyspark.sql import SparkSession

# SparkSession is the entry point to Spark's DataFrame and ML libraries.
# Name the application, then obtain a session from the builder.
session_builder = SparkSession.builder.appName("cruise_ship_model")
spark = session_builder.getOrCreate()

# Load the CSV: the first line supplies column names and column types are inferred.
df = spark.read.csv("cruise_ship_info.csv", header=True, inferSchema=True)
df.show(n=10)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [None]:
# Print each column's name, inferred data type, and nullability.
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [None]:
# List the column names of the loaded DataFrame.
df.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

The following are the features identified from the dataframe:
1. Cruise Line
2. age
3. Tonnage
4. Passengers
5. Length
6. Cabins
7. Passenger Density
8. Ship name

Spark MLlib expects every feature to be numeric.
Using StringIndexer, we will convert the categorical (string) columns, `Cruise_line` and `Ship_name`, into numeric category indices.

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the cruise line as a numeric category index in a new 'cruise_cat' column.
indexer = StringIndexer(inputCol="Cruise_line", outputCol="cruise_cat")
cruise_line_model = indexer.fit(df)
indexed = cruise_line_model.transform(df)

# Encode the ship name the same way, on top of the already-indexed frame.
indexer = StringIndexer(inputCol="Ship_name", outputCol="ship_cat")
ship_name_model = indexer.fit(indexed)
indexed2 = ship_name_model.transform(indexed)

In [None]:
# Peek at the first five rows, leaving a blank line after each one.
first_rows = indexed2.head(5)
for row in first_rows:
  print(f"{row}\n")

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0, ship_cat=64.0)

Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0, ship_cat=98.0)

Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7, cruise_cat=1.0, ship_cat=27.0)

Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, Tonnage=110.0, passengers=29.74, length=9.53, cabins=14.88, passenger_density=36.99, crew=19.1, cruise_cat=1.0, ship_cat=31.0)

Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, Tonnage=101.353, passengers=26.42, length=8.92, cabins=13.21, passenger_density=38.36, crew=10.0, cruise_cat=1.0, ship_cat=34.0)



In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack all predictor columns into a single vector column named "features".
feature_columns = [
    'Age', 'Tonnage', 'passengers', 'length',
    'cabins', 'passenger_density', 'cruise_cat', 'ship_cat',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
output = assembler.transform(indexed2)

# Preview the assembled feature vector next to the label column.
output.select('features', 'crew').show(n=5)

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
+--------------------+----+
only showing top 5 rows



In [None]:
# The modelling data keeps only the feature vector and the label (crew).
final_data = output.select('features', 'crew')

# 70/30 train/test split. A fixed seed keeps the split repeatable, so the
# metrics reported below can be reproduced when the notebook is re-run.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

# Summary statistics for the training split.
train_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              107|
|   mean|7.882897196261694|
| stddev|3.639836718643886|
|    min|             0.59|
|    max|             21.0|
+-------+-----------------+



In [None]:
# Summary statistics for the test split.
test_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                51|
|   mean| 7.608039215686273|
| stddev|3.2253458850844776|
|    min|              0.59|
|    max|              13.0|
+-------+------------------+



In [None]:
from pyspark.ml.regression import LinearRegression

# Configure the estimator: predictors come from 'features', the target is 'crew'.
ship_lr = LinearRegression(labelCol='crew', featuresCol='features')

# Fit on the training split.
trained_model = ship_lr.fit(train_data)

# Score the fitted model on the same data it was trained on, using R-squared.
ship_results = trained_model.evaluate(train_data)
training_r2 = ship_results.r2

print("R2 score: ", training_r2)

R2 score:  0.9336801625439916


In [None]:
# Score the fitted model on the test split, again using R-squared.
ship_results = trained_model.evaluate(test_data)
test_r2 = ship_results.r2
print("R2 score: ", test_r2)

R2 score:  0.9021009713574997


Since the R² score on the test data is high and close to the training score, the model appears to generalize well, so it is reasonable to use it for predictive analysis.

In [None]:
# Keep only the feature vectors from the test split (the 'crew' label is dropped).
unlabeled_data = test_data.select("features")
unlabeled_data.show(n=5)

+--------------------+
|            features|
+--------------------+
|[5.0,115.0,35.74,...|
|[6.0,30.276999999...|
|[6.0,90.0,20.0,9....|
|[6.0,110.23899999...|
|[8.0,77.499,19.5,...|
+--------------------+
only showing top 5 rows



In [None]:
# Run the fitted model over the unlabeled feature vectors;
# the resulting DataFrame gains a 'prediction' column next to 'features'.
predictions = trained_model.transform(unlabeled_data)
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[5.0,115.0,35.74,...|11.808864475084377|
|[6.0,30.276999999...| 4.340127554464244|
|[6.0,90.0,20.0,9....| 9.791862904248294|
|[6.0,110.23899999...| 11.42468464733363|
|[8.0,77.499,19.5,...| 8.678961871920702|
|[8.0,110.0,29.74,...|12.091272251949228|
|[9.0,105.0,27.2,8...|11.155516323580656|
|[9.0,113.0,26.74,...|11.063686012691909|
|[10.0,151.4,26.2,...|10.444064405563914|
|[11.0,58.6,15.66,...| 7.292946539927653|
|[11.0,90.09,25.01...| 8.904083026462613|
|[11.0,91.62700000...|  9.05438034008314|
|[11.0,108.977,26....|11.030699324037503|
|[12.0,25.0,3.88,5...|2.8774942789071623|
|[12.0,42.0,14.8,7...| 6.766491535258293|
|[12.0,58.6,15.66,...|7.4773238001790965|
|[12.0,91.0,20.32,...| 8.852756956297132|
|[13.0,61.0,13.8,7...| 6.507085668111442|
|[13.0,85.619,21.1...| 9.581440282035443|
|[13.0,91.0,20.32,...|  8.92022196321972|
+--------------------+------------