<a href="https://colab.research.google.com/github/Degananda264/PySpark-MLlib/blob/master/Multiple_Linear_Regression_Using_Spark_MLlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

In [7]:
!pip install -U Pyarrow

Collecting Pyarrow
[?25l  Downloading https://files.pythonhosted.org/packages/ba/3f/6cac1714fff444664603f92cb9fbe91c7ae25375880158b9e9691c4584c8/pyarrow-0.17.1-cp36-cp36m-manylinux2014_x86_64.whl (63.8MB)
[K     |████████████████████████████████| 63.8MB 47kB/s 
Installing collected packages: Pyarrow
  Found existing installation: pyarrow 0.14.1
    Uninstalling pyarrow-0.14.1:
      Successfully uninstalled pyarrow-0.14.1
Successfully installed Pyarrow-0.17.1


In [27]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [28]:
#Loading the Student_Grades_Data.csv file, uploaded in previous step
data=spark.read.csv("/content/Restaurant_Profit_Data.csv",header=True,inferSchema=True)

In [29]:
data.show()

+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+----------------------+---------------------+-----------+-------+---------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|             147304.06|            132446.13|  328010.68|  Tokyo| 165934.6|
|             150492.95|            122690.52|  315747.29|Chicago|162393.77|
|             110453.17|            125482.88|  309115.62| Mumbai|159941.96|

In [30]:
data.printSchema()

root
 |-- Miscellaneous_Expenses: double (nullable = true)
 |-- Food_Innovation_Spend: double (nullable = true)
 |-- Advertising: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Profit: double (nullable = true)



In [31]:
data.dtypes

[('Miscellaneous_Expenses', 'double'),
 ('Food_Innovation_Spend', 'double'),
 ('Advertising', 'double'),
 ('City', 'string'),
 ('Profit', 'double')]

In [32]:
data.count()

50

In [33]:
#Create features storing categorical & numerical variables, omitting the last column
categorical_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

['City']
['Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']


In [34]:
#Print number of categorical as well as numerical features.
print(str(len(categorical_cols)) + '  categorical features')
print(str(len(numerical_cols)) + '  numerical features')

1  categorical features
3  numerical features


In [35]:
# First using StringIndexer to convert string/text values into numerical values followed by OneHotEncoderEstimator 
# Spark MLLibto convert each Stringindexed or transformed values into One Hot Encoded values.
# VectorAssembler is being used to assemble all the features into one vector from multiple columns that contain type double 
# Also appending every step of the process in a stages array
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
stages = []
for categoricalCol in categorical_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    OHencoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_catVec"])
stages += [stringIndexer, OHencoder]
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
Vectassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [Vectassembler]

In [36]:
stages

[StringIndexer_a020386017d8,
 OneHotEncoderEstimator_2491829eb11a,
 VectorAssembler_4928c033d0f3]

In [37]:
# Using a Spark MLLib pipeline to apply all the stages of transformation
import pandas as pd
from pyspark.ml import Pipeline
cols = data.columns
pipeline = Pipeline(stages = stages)


In [38]:
pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)


In [39]:
data.show()

+----------------------+---------------------+-----------+-------+---------+---------+-------------+--------------------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|CityIndex|  City_catVec|            features|
+----------------------+---------------------+-----------+-------+---------+---------+-------------+--------------------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|      0.0|(2,[0],[1.0])|[1.0,0.0,138671.8...|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|      1.0|(2,[1],[1.0])|[0.0,1.0,153151.5...|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|      2.0|    (2,[],[])|[0.0,0.0,102919.5...|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|      0.0|(2,[0],[1.0])|[1.0,0.0,120445.8...|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|      2.0|    (2,[],[])|[0.0,0.0,93165.77...|
|             101588.71|

In [40]:
selectedCols = ['features']+cols
data = data.select(selectedCols)
pd.DataFrame(data.take(5), columns=data.columns)

Unnamed: 0,features,Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit
0,"[1.0, 0.0, 138671.8, 167497.2, 475918.1]",138671.8,167497.2,475918.1,Chicago,202443.83
1,"[0.0, 1.0, 153151.59, 164745.7, 448032.53]",153151.59,164745.7,448032.53,Mumbai,201974.06
2,"[0.0, 0.0, 102919.55, 155589.51, 412068.54]",102919.55,155589.51,412068.54,Tokyo,201232.39
3,"[1.0, 0.0, 120445.85, 146520.41, 387333.62]",120445.85,146520.41,387333.62,Chicago,193083.99
4,"[0.0, 0.0, 93165.77, 144255.34, 370302.42]",93165.77,144255.34,370302.42,Tokyo,176369.94


In [41]:
#Display the data having additional column named features. Since it's a multiple linear regression problem, hence all the
# independent variable values are shown as one vector
data.show()

+--------------------+----------------------+---------------------+-----------+-------+---------+
|            features|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+--------------------+----------------------+---------------------+-----------+-------+---------+
|[1.0,0.0,138671.8...|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|[0.0,1.0,153151.5...|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|[0.0,0.0,102919.5...|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|[1.0,0.0,120445.8...|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|[0.0,0.0,93165.77...|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|[1.0,0.0,101588.7...|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|[0.0,1.0,148972.8...|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|[0.0,0.0,147304.0..

In [42]:
#Select only Features and Label from previous dataset as we need these two entities for building machine learning model
finalized_data = data.select("features","Profit")
finalized_data.show()

+--------------------+---------+
|            features|   Profit|
+--------------------+---------+
|[1.0,0.0,138671.8...|202443.83|
|[0.0,1.0,153151.5...|201974.06|
|[0.0,0.0,102919.5...|201232.39|
|[1.0,0.0,120445.8...|193083.99|
|[0.0,0.0,93165.77...|176369.94|
|[1.0,0.0,101588.7...|167173.12|
|[0.0,1.0,148972.8...|166304.51|
|[0.0,0.0,147304.0...| 165934.6|
|[1.0,0.0,150492.9...|162393.77|
|[0.0,1.0,110453.1...|159941.96|
|[0.0,0.0,112368.1...|156303.95|
|[0.0,1.0,93564.61...| 154441.4|
|[0.0,0.0,129094.3...|151767.52|
|[0.0,1.0,137269.0...|144489.35|
|[0.0,0.0,158321.4...|142784.65|
|[1.0,0.0,124390.8...|140099.04|
|[0.0,1.0,123371.5...|137174.93|
|[1.0,0.0,146851.5...|135552.37|
|[0.0,0.0,115949.7...| 134448.9|
|[1.0,0.0,155288.1...|132958.86|
+--------------------+---------+
only showing top 20 rows



In [43]:
#Split the data into training and test model with 70% obs. going in training and 30% in testing
train_dataset, test_dataset = finalized_data.randomSplit([0.7, 0.3])

In [45]:
from pyspark.ml.regression import LinearRegression
MLR=LinearRegression(featuresCol="features",labelCol="Profit")
model=MLR.fit(train_dataset)


In [46]:
pred=model.evaluate(test_dataset)

In [49]:
pred.predictions.show()

+--------------------+---------+------------------+
|            features|   Profit|        prediction|
+--------------------+---------+------------------+
|[0.0,0.0,93165.77...|176369.94|187007.50100312495|
|[0.0,0.0,104831.4...|106960.92|110319.58950505976|
|[0.0,0.0,145909.9...|115915.54|121079.67620068719|
|[0.0,1.0,137200.9...| 52741.73|53636.934451891415|
|[0.0,1.0,137269.0...|144489.35|135631.79908098152|
|[0.0,1.0,159467.9...| 106894.8|  94387.3282542174|
|[1.0,0.0,53517.15...| 45855.41| 62990.20007470026|
|[1.0,0.0,86821.44...|106661.51|101700.57333746238|
|[1.0,0.0,101055.3...|118734.04|124903.27935098141|
|[1.0,0.0,125927.0...| 75108.08|55955.139885912555|
|[1.0,0.0,150492.9...|162393.77| 159614.2913736243|
|[1.0,0.0,154475.9...|107665.56|105049.68514783765|
|[1.0,0.0,155288.1...|132958.86|124362.65737819161|
|[1.0,0.0,155547.4...|121495.02|123588.54835146462|
+--------------------+---------+------------------+



In [50]:
#Find out coefficient value
coefficient = model.coefficients
print ("The coefficients of the model are : %a" %coefficient)

The coefficients of the model are : DenseVector([-2360.2724, -2642.7569, -0.103, 0.8367, 0.0199])


In [51]:
#Find out intercept Value
intercept = model.intercept
print ("The Intercept of the model is : %f" %intercept)

The Intercept of the model is : 68528.279165


In [52]:
#Evaluate the model using metric like Mean Absolute Error(MAE), Root Mean Square Error(RMSE) and R-Square
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="Profit", predictionCol="prediction")

# r2 - coefficient of determination
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.932


In [53]:
#Create Unlabeled dataset  to contain only feature column
unlabeled_dataset = test_dataset.select('features')

In [54]:
unlabeled_dataset.show()

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,93165.77...|
|[0.0,0.0,104831.4...|
|[0.0,0.0,145909.9...|
|[0.0,1.0,137200.9...|
|[0.0,1.0,137269.0...|
|[0.0,1.0,159467.9...|
|[1.0,0.0,53517.15...|
|[1.0,0.0,86821.44...|
|[1.0,0.0,101055.3...|
|[1.0,0.0,125927.0...|
|[1.0,0.0,150492.9...|
|[1.0,0.0,154475.9...|
|[1.0,0.0,155288.1...|
|[1.0,0.0,155547.4...|
+--------------------+



In [55]:
#Predict the model output for fresh & unseen test data using transform() method
new_predictions = model.transform(unlabeled_dataset)

In [58]:
new_predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,0.0,93165.77...|187007.50100312495|
|[0.0,0.0,104831.4...|110319.58950505976|
|[0.0,0.0,145909.9...|121079.67620068719|
|[0.0,1.0,137200.9...|53636.934451891415|
|[0.0,1.0,137269.0...|135631.79908098152|
|[0.0,1.0,159467.9...|  94387.3282542174|
|[1.0,0.0,53517.15...| 62990.20007470026|
|[1.0,0.0,86821.44...|101700.57333746238|
|[1.0,0.0,101055.3...|124903.27935098141|
|[1.0,0.0,125927.0...|55955.139885912555|
|[1.0,0.0,150492.9...| 159614.2913736243|
|[1.0,0.0,154475.9...|105049.68514783765|
|[1.0,0.0,155288.1...|124362.65737819161|
|[1.0,0.0,155547.4...|123588.54835146462|
+--------------------+------------------+

