<a href="https://colab.research.google.com/github/BhuvaneswariRV/MachineLearning/blob/main/Spark_LinearRegression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Linear Regression Using Spark

In [1]:
#Install PySpark
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

In [3]:
session = SparkSession.builder.appName('Spark_linear_regression').getOrCreate()

In [4]:
# Read csv
data = session.read.csv('cruise_ship_info.csv',inferSchema=True,header=True)

In [5]:
data.head()

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)

In [6]:
data.describe().show()

+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|summary|Ship_name|Cruise_line|               Age|           Tonnage|       passengers|           length|            cabins|passenger_density|             crew|
+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  count|      158|        158|               158|               158|              158|              158|               158|              158|              158|
|   mean| Infinity|       null|15.689873417721518| 71.28467088607599|18.45740506329114|8.130632911392404| 8.830000000000005|39.90094936708861|7.794177215189873|
| stddev|     null|       null| 7.615691058751413|37.229540025907866|9.677094775143416|1.793473548054825|4.4714172221480615| 8.63921711391542|3.503486564627034|
|    min|Adventure|    Azamara|   

In [7]:
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



### Check for Null in data set

In [8]:
data.createOrReplaceTempView("ship")

In [9]:
session.sql("select * from ship where Cruise_line is null").show()

+---------+-----------+---+-------+----------+------+------+-----------------+----+
|Ship_name|Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density|crew|
+---------+-----------+---+-------+----------+------+------+-----------------+----+
+---------+-----------+---+-------+----------+------+------+-----------------+----+



### Replace String to categories

In [10]:
from pyspark.sql.functions import countDistinct
data.select(countDistinct('Cruise_line')).show()

+---------------------------+
|count(DISTINCT Cruise_line)|
+---------------------------+
|                         20|
+---------------------------+



In [11]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Cruise_line", outputCol="Cruise_Index")
indexed = indexer.fit(data).transform(data)
indexed.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_Index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|        16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|        16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|         1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|         1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|         1.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|         1.0|
|    Elati

## SetUp DataFrame

In [12]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [13]:
indexed.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_Index']

In [14]:
assembler = VectorAssembler(
    inputCols=['Age','Tonnage', 'passengers', 'length', 'cabins', 'passenger_density', 'Cruise_Index'],
    outputCol="features")
data = assembler.transform(indexed)
data.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_Index|            features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|        16.0|[6.0,30.276999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|        16.0|[6.0,30.276999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|         1.0|[26.0,47.262,14.8...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|         1.0|[11.0,110.0,29.74...|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|          

In [15]:
data = data.select('features','crew')

## Apply PCA

In [16]:
from pyspark.ml.feature import PCA

In [17]:
pca = PCA(k=5, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data).transform(data)
model.select('pcaFeatures').show()

+--------------------+
|         pcaFeatures|
+--------------------+
|[28.9069668920941...|
|[28.9069668920941...|
|[46.1671959257090...|
|[112.237912587011...|
|[102.249639517772...|
|[70.3748283363941...|
|[71.2163003726564...|
|[70.2670224725733...|
|[70.7354592090780...|
|[114.872573212663...|
|[112.356178641863...|
|[44.6756224114142...|
|[70.8556694999726...|
|[70.9758797908672...|
|[86.8070790602045...|
|[112.596599223652...|
|[89.4195605544128...|
|[71.2163003726564...|
|[89.1684098333524...|
|[70.6152489181834...|
+--------------------+
only showing top 20 rows



## Create and evaluate the linear model with and without PCA

In [18]:
final_data = data.select("features",'crew')

In [19]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [20]:
from pyspark.ml.regression import LinearRegression
linearReg = LinearRegression(labelCol='crew')
linearRegModel = linearReg.fit(train_data)
print("Coefficients: {} Intercept: {}".format(linearRegModel.coefficients,linearRegModel.intercept))

Coefficients: [-0.009252190109599344,0.00839468529458932,-0.1025811781225782,0.43184313367301286,0.7702357987966898,0.005738094889316517,0.038288937732637265] Intercept: -1.4948801905445954


In [21]:
test_results = linearRegModel.evaluate(test_data)
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| -1.2857381527488911|
| -0.6231386047457494|
|  -0.557746807258658|
|-0.08307477479330494|
| 0.49337685153943944|
|  0.6980955640209796|
| -0.7099687944962323|
|-0.18102821646823486|
| -0.2649606646655709|
|-0.49602465042434574|
| -0.5625606628064883|
|  0.7434166656416341|
|  0.2859961543601973|
|  0.2852970989645689|
| -0.3533084726968898|
| 0.11156911743573161|
| -1.3935649112055986|
|-0.42528614898103534|
|  0.7619210458608325|
| -1.0843127210959986|
+--------------------+
only showing top 20 rows



In [22]:
test_results.r2

0.9052718342415075

In [23]:
test_results.rootMeanSquaredError

0.9158696082466363

In [24]:
pca_final_data = model.select("pcaFeatures",'crew')

In [25]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [26]:
from pyspark.ml.regression import LinearRegression
linearReg = LinearRegression(labelCol='crew')
linearRegModel = linearReg.fit(train_data)
print("Coefficients: {} Intercept: {}".format(linearRegModel.coefficients,linearRegModel.intercept))

Coefficients: [-0.012505598939363559,0.01691241249824924,-0.20306908386212258,0.3742058687154651,0.940043025258437,-0.011488722278504792,0.05463818538364151] Intercept: -0.6453980318952609


In [27]:
test_results = linearRegModel.evaluate(test_data)
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| 0.38397728098039074|
| 0.14598193718228458|
| -1.7544320527174708|
| -1.1593568484659187|
|  0.4634336043910956|
|  0.6200535412514654|
|  0.5282407010512564|
|-0.19276886452729514|
|  0.3732586226260981|
| -0.6424559427165004|
| -0.6829628784882065|
|   0.711624685473085|
| -0.6336924024642911|
| -0.3000019695763445|
|  1.8744510941244523|
| -0.2768119589898985|
| -0.3699107962047741|
|   1.172501461822927|
|  0.9130855775143996|
|-0.21962940062747016|
+--------------------+
only showing top 20 rows



In [28]:
test_results.r2

0.9607582957308911

In [29]:
test_results.rootMeanSquaredError

0.6765486739125269