In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName('consultingProject').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/03 19:50:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/03 19:50:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


22/12/03 20:31:10 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1075867 ms exceeds timeout 120000 ms
22/12/03 20:31:10 WARN SparkContext: Killing executors is not supported by current scheduler.


In [2]:
data = spark.read.csv(r'../../Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Linear_Regression/cruise_ship_info.csv', header=True, inferSchema=True)

In [5]:
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [4]:
# We will use stringIndexer to give the string an index
#? https://medium.com/@nutanbhogendrasharma/role-of-stringindexer-and-pipelines-in-pyspark-ml-feature-b79085bb8a6c

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.linalg import Vectors

In [7]:
# StringIndexing the Cruise_line column
cruiseLine_indexer = StringIndexer(inputCol="Cruise_line", outputCol="CruiseLine_Index")
#Fits a model to the input dataset with optional parameters.
indexedData = cruiseLine_indexer.fit(data).transform(data)
indexedData.show(5)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+----------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|CruiseLine_Index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+----------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|            16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|            16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|             1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|             1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|             1.0|
+-----------+-----------+---+------------------+----------+------+------+---------------

In [8]:
# Prepping the Data for MLib by making the features column
assembler = VectorAssembler(inputCols=['CruiseLine_Index', 'Tonnage', 'passengers', 'length', 'cabins'], outputCol='features')
featureData = assembler.transform(indexedData)

In [10]:
# Creating the Model
lr = LinearRegression(featuresCol='features', labelCol='crew')

In [13]:
# Splitting the Dataset into Training and Test Data
ml_data = featureData.select(['features', 'crew'])
trainingData, testData = ml_data.randomSplit([0.7, 0.3])

In [14]:
# Training/Fitting our Model on the Training Dataset
cruiseModel = lr.fit(trainingData)
cruiseModelSummary = cruiseModel.summary

22/12/03 21:22:11 WARN Instrumentation: [138cec1b] regParam is zero, which might cause numerical instability and overfitting.
22/12/03 21:22:11 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/12/03 21:22:11 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/12/03 21:22:11 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/12/03 21:22:11 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [15]:
# Checking the RMS, R2 to be checked with data.describe().mean , the closer they are the better
print(cruiseModelSummary.r2)
print(cruiseModelSummary.rootMeanSquaredError) 

0.8999417469732716
1.0554712846206729


In [18]:
# Evaluating our model on testData
testDataEval = cruiseModel.evaluate(testData)
testDataEval.residuals.show(5)

+--------------------+
|           residuals|
+--------------------+
|-0.33844825164594816|
| -0.3612448243969002|
|  -0.193941816043111|
| -1.4908965662311982|
| -1.4008965662311983|
+--------------------+
only showing top 5 rows



In [19]:
# Making Predictions by using unLabeled Data
unlabeledData = featureData.select('features')

preedictedData = cruiseModel.transform(unlabeledData)

In [20]:
preedictedData.show(5)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[16.0,30.27699999...| 4.333391317959263|
|[16.0,30.27699999...| 4.333391317959263|
|[1.0,47.262,14.86...| 6.268256097240258|
|[1.0,110.0,29.74,...|12.337964416685729|
|[1.0,101.353,26.4...|11.010421778997763|
+--------------------+------------------+
only showing top 5 rows



In [23]:
# Checking the Pearson Corelation to check does any 2 columns share any co-relation or not
# checking co-relation helps us choose better independent variables as the dependent variale depend upon these
from pyspark.sql.functions import corr

data.select(corr('crew', 'passengers')).show()
data.select(corr('crew', 'age')).show()
data.select(corr('crew', 'cabins')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+

+-------------------+
|    corr(crew, age)|
+-------------------+
|-0.5306565039638852|
+-------------------+

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+

