# Importing Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create SparkSession

In [2]:
spark = (SparkSession.builder
                     .appName("MLLib")
                     .getOrCreate())

23/07/10 10:33:01 WARN Utils: Your hostname, myThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.163.222 instead (on interface wlp3s0)
23/07/10 10:33:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/10 10:33:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/10 10:33:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [17]:
  from pyspark import SparkContext 
  sc = SparkContext()
  print(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=MLLib, master=local[2]) created by getOrCreate at /tmp/ipykernel_291642/3522750921.py:3 

In [4]:
path = 'Data_Science_Bootcamp/Regression_Algorithms/Multiple_Linear_Regression/Restaurant_Profit_Data.csv'


# Read in the student data
df = spark.read.csv(path, header=True, inferSchema=True)
print('-----Read the raw Restaurant csv data--------')
df.printSchema()
df.limit(5).show(truncate=False)


-----Read the raw Restaurant csv data--------
root
 |-- Miscellaneous_Expenses: double (nullable = true)
 |-- Food_Innovation_Spend: double (nullable = true)
 |-- Advertising: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Profit: double (nullable = true)

+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|City   |Profit   |
+----------------------+---------------------+-----------+-------+---------+
|138671.8              |167497.2             |475918.1   |Chicago|202443.83|
|153151.59             |164745.7             |448032.53  |Mumbai |201974.06|
|102919.55             |155589.51            |412068.54  |Tokyo  |201232.39|
|120445.85             |146520.41            |387333.62  |Chicago|193083.99|
|93165.77              |144255.34            |370302.42  |Tokyo  |176369.94|
+----------------------+---------------------+-----------+-------+---------+



In [5]:
#Create features storing categorical & numerical variables, omitting the last column
categorical_cols = [item[0] for item in df.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in df.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

print("----------------------------")
#Print number of categorical as well as numerical features.
print(str(len(categorical_cols)) + '  categorical features')
print(str(len(numerical_cols)) + '  numerical features')

['City']
['Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']
----------------------------
1  categorical features
3  numerical features


In [6]:
#Convert categorical column City to numerical
'''
indexer = StringIndexer(inputCol="City", outputCol="City_Dummy")
df_new = indexer.fit(df).transform(df)
df_new.show()
df_new = df_new.drop("City")
df_new.printSchema()
df_new.show()
'''

'\nindexer = StringIndexer(inputCol="City", outputCol="City_Dummy")\ndf_new = indexer.fit(df).transform(df)\ndf_new.show()\ndf_new = df_new.drop("City")\ndf_new.printSchema()\ndf_new.show()\n'

# Example to convert categorical column to numerical

In [7]:
from pyspark.ml.feature import StringIndexer

testdf = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
testdf.show()
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(testdf).transform(df)
indexed.show()

                                                                                

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+



[Stage 7:>                                                          (0 + 2) / 2]

23/07/10 10:33:49 WARN StringIndexerModel: Input column category does not exist during transformation. Skip StringIndexerModel for this column.
+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+----------------------+---------------------+-----------+-------+---------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|             147304.06|            132446.13|  328010.68|  Tokyo| 165934.6|
|        

                                                                                

## First using StringIndexer to convert string/text values into numerical values followed by OneHotEncoderEstimator 
## Spark MLLibto convert each Stringindexed or transformed values into One Hot Encoded values.
## VectorAssembler is being used to assemble all the features into one vector from multiple columns that contain type double 
## Also appending every step of the process in a stages array

In [8]:
# First using StringIndexer to convert string/text values into numerical values followed by OneHotEncoderEstimator 
# Spark MLLibto convert each Stringindexed or transformed values into One Hot Encoded values.
# VectorAssembler is being used to assemble all the features into one vector from multiple columns that contain type double 
# Also appending every step of the process in a stages array
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
stages = []
for categoricalCol in categorical_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    OHencoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_catVec"])

print(stringIndexer)
print(OHencoder)
stages += [stringIndexer, OHencoder]
print(stages)
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
print(assemblerInputs)
Vectassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
print(Vectassembler)
stages += [Vectassembler]
print(stages)

StringIndexer_808613d60161
OneHotEncoder_4e2c8f80e2d0
[StringIndexer_808613d60161, OneHotEncoder_4e2c8f80e2d0]
['City_catVec', 'Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']
VectorAssembler_ef1c69f55352
[StringIndexer_808613d60161, OneHotEncoder_4e2c8f80e2d0, VectorAssembler_ef1c69f55352]


In [9]:
# Using a Spark MLLib pipeline to apply all the stages of transformation
from pyspark.ml import Pipeline
import pandas as pd
cols = df.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
data = pipelineModel.transform(df)
selectedCols = ['features']+cols
data = data.select(selectedCols)
pd.DataFrame(data.take(5), columns=data.columns)

Unnamed: 0,features,Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit
0,"[1.0, 0.0, 138671.8, 167497.2, 475918.1]",138671.8,167497.2,475918.1,Chicago,202443.83
1,"[0.0, 1.0, 153151.59, 164745.7, 448032.53]",153151.59,164745.7,448032.53,Mumbai,201974.06
2,"[0.0, 0.0, 102919.55, 155589.51, 412068.54]",102919.55,155589.51,412068.54,Tokyo,201232.39
3,"[1.0, 0.0, 120445.85, 146520.41, 387333.62]",120445.85,146520.41,387333.62,Chicago,193083.99
4,"[0.0, 0.0, 93165.77, 144255.34, 370302.42]",93165.77,144255.34,370302.42,Tokyo,176369.94


In [10]:
#Display the data having additional column named features. Since it's a multiple linear regression problem, hence all the
# independent variable values are shown as one vector
data.show()

+--------------------+----------------------+---------------------+-----------+-------+---------+
|            features|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+--------------------+----------------------+---------------------+-----------+-------+---------+
|[1.0,0.0,138671.8...|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|[0.0,1.0,153151.5...|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|[0.0,0.0,102919.5...|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|[1.0,0.0,120445.8...|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|[0.0,0.0,93165.77...|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|[1.0,0.0,101588.7...|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|[0.0,1.0,148972.8...|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|[0.0,0.0,147304.0..

In [11]:
#Select only Features and Label from previous dataset as we need these two entities for building machine learning model
finalized_data = data.select("features","Profit")

finalized_data.show()

+--------------------+---------+
|            features|   Profit|
+--------------------+---------+
|[1.0,0.0,138671.8...|202443.83|
|[0.0,1.0,153151.5...|201974.06|
|[0.0,0.0,102919.5...|201232.39|
|[1.0,0.0,120445.8...|193083.99|
|[0.0,0.0,93165.77...|176369.94|
|[1.0,0.0,101588.7...|167173.12|
|[0.0,1.0,148972.8...|166304.51|
|[0.0,0.0,147304.0...| 165934.6|
|[1.0,0.0,150492.9...|162393.77|
|[0.0,1.0,110453.1...|159941.96|
|[0.0,0.0,112368.1...|156303.95|
|[0.0,1.0,93564.61...| 154441.4|
|[0.0,0.0,129094.3...|151767.52|
|[0.0,1.0,137269.0...|144489.35|
|[0.0,0.0,158321.4...|142784.65|
|[1.0,0.0,124390.8...|140099.04|
|[0.0,1.0,123371.5...|137174.93|
|[1.0,0.0,146851.5...|135552.37|
|[0.0,0.0,115949.7...| 134448.9|
|[1.0,0.0,155288.1...|132958.86|
+--------------------+---------+
only showing top 20 rows



# Create a Feature array by omitting the last column

In [12]:
#Select only Features and Label from previous dataset as we need these two entities for building machine learning model

finalized_data = data.select("features","Profit")
finalized_data.show()

+--------------------+---------+
|            features|   Profit|
+--------------------+---------+
|[1.0,0.0,138671.8...|202443.83|
|[0.0,1.0,153151.5...|201974.06|
|[0.0,0.0,102919.5...|201232.39|
|[1.0,0.0,120445.8...|193083.99|
|[0.0,0.0,93165.77...|176369.94|
|[1.0,0.0,101588.7...|167173.12|
|[0.0,1.0,148972.8...|166304.51|
|[0.0,0.0,147304.0...| 165934.6|
|[1.0,0.0,150492.9...|162393.77|
|[0.0,1.0,110453.1...|159941.96|
|[0.0,0.0,112368.1...|156303.95|
|[0.0,1.0,93564.61...| 154441.4|
|[0.0,0.0,129094.3...|151767.52|
|[0.0,1.0,137269.0...|144489.35|
|[0.0,0.0,158321.4...|142784.65|
|[1.0,0.0,124390.8...|140099.04|
|[0.0,1.0,123371.5...|137174.93|
|[1.0,0.0,146851.5...|135552.37|
|[0.0,0.0,115949.7...| 134448.9|
|[1.0,0.0,155288.1...|132958.86|
+--------------------+---------+
only showing top 20 rows



# Split the data into train and test dataset

In [13]:
#Split the data into training and test model with 70% obs. going in training and 30% in testing
train_dataset, test_dataset = finalized_data.randomSplit([0.8, 0.2])
train_dataset.describe().show()
test_dataset.describe().show()

+-------+------------------+
|summary|            Profit|
+-------+------------------+
|  count|                37|
|   mean|123397.53324324326|
| stddev| 43396.33854490875|
|    min|           24863.4|
|    max|         202443.83|
+-------+------------------+

+-------+------------------+
|summary|            Profit|
+-------+------------------+
|  count|                13|
|   mean|118771.01769230771|
| stddev| 31094.19644871723|
|    min|          75382.33|
|    max|         176369.94|
+-------+------------------+



# Perform Linear Regression

In [14]:
#Import Linear Regression class called LinearRegression
from pyspark.ml.regression import LinearRegression

#Create the Linear Regression object named having feature column as features and Label column as Time_to_Study
LinReg = LinearRegression(featuresCol="features", labelCol="Profit")

#Train the model on the training using fit() method.
model = LinReg.fit(train_dataset)

#Predict the Grades using the evulate method
pred = model.evaluate(test_dataset)

#Show the predicted Grade values along side actual Grade values
pred.predictions.show()

23/07/10 10:34:17 WARN Instrumentation: [0c0d9568] regParam is zero, which might cause numerical instability and overfitting.
+--------------------+---------+------------------+
|            features|   Profit|        prediction|
+--------------------+---------+------------------+
|[0.0,0.0,86484.77...| 87980.83| 82003.39136829594|
|[0.0,0.0,93165.77...|176369.94|183248.08673250358|
|[0.0,0.0,112368.1...|156303.95| 144984.7631362705|
|[0.0,0.0,128830.2...|100890.19| 81123.05789257553|
|[0.0,0.0,184419.5...|113464.38| 108059.5331207614|
|[0.0,1.0,120320.0...| 88421.91| 82194.60993303913|
|[0.0,1.0,156580.1...| 75382.33| 71437.49500920527|
|[0.0,1.0,159467.9...| 106894.8| 95904.00873297648|
|[1.0,0.0,86821.44...|106661.51|101531.38324942328|
|[1.0,0.0,124390.8...|140099.04|158993.41618579935|
|[1.0,0.0,150492.9...|162393.77|163764.43717146514|
|[1.0,0.0,154475.9...|107665.56|108102.13724443913|
|[1.0,0.0,155547.4...|121495.02|127350.73519832618|
+--------------------+---------+----------

In [15]:
#Find out coefficient value
coefficient = model.coefficients
print ("The coefficient of the model is : %a" %coefficient)

#Find out intercept Value
intercept = model.intercept
print ("The Intercept of the model is : %f" %intercept)

#Evaluate the model using metric like Mean Absolute Error(MAE), Root Mean Square Error(RMSE) and R-Square
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="Profit", predictionCol="prediction")

# Root Mean Square Error
rmse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "rmse"})
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

The coefficient of the model is : DenseVector([3226.9572, 1144.0305, -0.0552, 0.8511, 0.0219])
The Intercept of the model is : 57525.071965
RMSE: 9710.927
MSE: 94302108.490
MAE: 7861.318
r2: 0.894
