In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
spark

In [4]:
import pandas as pd
data = spark.read.csv('Restaurent_Profit_Data.txt',header=True,inferSchema=True)

In [5]:
data.printSchema()

root
 |-- Miscellaneous_Expenses: double (nullable = true)
 |-- Food_Innovation_Spend: double (nullable = true)
 |-- Advertising: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Profit: double (nullable = true)



In [6]:
type(data)

pyspark.sql.dataframe.DataFrame

In [7]:
# Display the data
data.show()

+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+----------------------+---------------------+-----------+-------+---------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|             147304.06|            132446.13|  328010.68|  Tokyo| 165934.6|
|             150492.95|            122690.52|  315747.29|Chicago|162393.77|
|             110453.17|            125482.88|  309115.62| Mumbai|159941.96|

In [8]:
data.dtypes

[('Miscellaneous_Expenses', 'double'),
 ('Food_Innovation_Spend', 'double'),
 ('Advertising', 'double'),
 ('City', 'string'),
 ('Profit', 'double')]

In [10]:
# create categorical features and numnerical variables , omitting the last column

categorical_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in data.dtypes if item[1].startswith('int')| item[1].startswith('double')][:-1]
print(numerical_cols)

['City']
['Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']


In [12]:
print(str(len(categorical_cols)) + ": categorical_cols")
print(str(len(numerical_cols)) + ": numerical_cols")

1: categorical_cols
3: numerical_cols


In [32]:
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator,VectorAssembler
stages = []
for cat_columns in categorical_cols:
    stringIndexer = StringIndexer(inputCol = cat_columns,outputCol=cat_columns+"index")
    onehotencoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],outputCols=[cat_columns + "_catVec"])
stages += [stringIndexer,onehotencoder]
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
vectorassembler = VectorAssembler(inputCols=assemblerInputs,outputCol="features")
stages += [vectorassembler]

## Spark MLlib pipeline to Apply all the stages of transformation 

In [33]:
from pyspark.ml import Pipeline

In [34]:
cols = data.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)
selectedCols = ["features"]+cols
data = data.select(selectedCols)
pd.DataFrame(data.take(5),columns=data.columns)

Unnamed: 0,features,Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit
0,"[1.0, 0.0, 138671.8, 167497.2, 475918.1]",138671.8,167497.2,475918.1,Chicago,202443.83
1,"[0.0, 1.0, 153151.59, 164745.7, 448032.53]",153151.59,164745.7,448032.53,Mumbai,201974.06
2,"[0.0, 0.0, 102919.55, 155589.51, 412068.54]",102919.55,155589.51,412068.54,Tokyo,201232.39
3,"[1.0, 0.0, 120445.85, 146520.41, 387333.62]",120445.85,146520.41,387333.62,Chicago,193083.99
4,"[0.0, 0.0, 93165.77, 144255.34, 370302.42]",93165.77,144255.34,370302.42,Tokyo,176369.94


In [35]:
data.show()

+--------------------+----------------------+---------------------+-----------+-------+---------+
|            features|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+--------------------+----------------------+---------------------+-----------+-------+---------+
|[1.0,0.0,138671.8...|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|[0.0,1.0,153151.5...|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|[0.0,0.0,102919.5...|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|[1.0,0.0,120445.8...|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|[0.0,0.0,93165.77...|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|[1.0,0.0,101588.7...|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|[0.0,1.0,148972.8...|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|[0.0,0.0,147304.0..

In [36]:
final_data = data.select("features","profit")
final_data.show()

+--------------------+---------+
|            features|   profit|
+--------------------+---------+
|[1.0,0.0,138671.8...|202443.83|
|[0.0,1.0,153151.5...|201974.06|
|[0.0,0.0,102919.5...|201232.39|
|[1.0,0.0,120445.8...|193083.99|
|[0.0,0.0,93165.77...|176369.94|
|[1.0,0.0,101588.7...|167173.12|
|[0.0,1.0,148972.8...|166304.51|
|[0.0,0.0,147304.0...| 165934.6|
|[1.0,0.0,150492.9...|162393.77|
|[0.0,1.0,110453.1...|159941.96|
|[0.0,0.0,112368.1...|156303.95|
|[0.0,1.0,93564.61...| 154441.4|
|[0.0,0.0,129094.3...|151767.52|
|[0.0,1.0,137269.0...|144489.35|
|[0.0,0.0,158321.4...|142784.65|
|[1.0,0.0,124390.8...|140099.04|
|[0.0,1.0,123371.5...|137174.93|
|[1.0,0.0,146851.5...|135552.37|
|[0.0,0.0,115949.7...| 134448.9|
|[1.0,0.0,155288.1...|132958.86|
+--------------------+---------+
only showing top 20 rows



In [38]:
train_dataset,test_dataset = final_data.randomSplit([0.7,0.3])

In [44]:
from pyspark.ml.regression import LinearRegression
MLR = LinearRegression(featuresCol = "features",labelCol = "profit")

In [45]:
model = MLR.fit(train_dataset)

In [46]:
pred = model.evaluate(test_dataset)

In [47]:
pred.predictions.show()

+--------------------+---------+------------------+
|            features|   profit|        prediction|
+--------------------+---------+------------------+
|[0.0,0.0,86484.77...| 87980.83| 85065.55365563772|
|[0.0,0.0,93165.77...|176369.94| 185129.8325300921|
|[0.0,0.0,145909.9...|115915.54|121360.33325914326|
|[0.0,0.0,184419.5...|113464.38|111432.02785916431|
|[0.0,1.0,84756.09...| 91187.76| 93377.74708858396|
|[0.0,1.0,115641.3...|128656.03|126390.94123157437|
|[0.0,1.0,137269.0...|144489.35| 137515.2997775195|
|[1.0,0.0,86821.44...|106661.51| 99300.84187939132|
|[1.0,0.0,101055.3...|118734.04|123145.77096041071|
|[1.0,0.0,120445.8...|193083.99| 184272.7246786328|
|[1.0,0.0,124390.8...|140099.04|156271.46705197188|
|[1.0,0.0,125927.0...| 75108.08| 55061.96877825327|
|[1.0,0.0,138671.8...|202443.83|203052.18796167916|
|[1.0,0.0,150492.9...|162393.77| 161255.5305817661|
|[1.0,0.0,154475.9...|107665.56|106127.61954295603|
+--------------------+---------+------------------+



In [48]:
coefficient = model.coefficients
print ("The coefficients of the model are : %a" %coefficient)

The coefficients of the model are : DenseVector([-1883.1807, -949.3654, -0.0464, 0.838, 0.0231])


In [49]:
intercept = model.intercept
print ("The Intercept of the model is : %f" %intercept)

The Intercept of the model is : 60008.544286


In [51]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="profit", predictionCol="prediction")

# r2 - coefficient of determination
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.952


In [52]:
unlabeled_dataset = test_dataset.select('features')

In [53]:
unlabeled_dataset.show()

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,86484.77...|
|[0.0,0.0,93165.77...|
|[0.0,0.0,145909.9...|
|[0.0,0.0,184419.5...|
|[0.0,1.0,84756.09...|
|[0.0,1.0,115641.3...|
|[0.0,1.0,137269.0...|
|[1.0,0.0,86821.44...|
|[1.0,0.0,101055.3...|
|[1.0,0.0,120445.8...|
|[1.0,0.0,124390.8...|
|[1.0,0.0,125927.0...|
|[1.0,0.0,138671.8...|
|[1.0,0.0,150492.9...|
|[1.0,0.0,154475.9...|
+--------------------+



In [55]:
new_predictions = model.transform(unlabeled_dataset)
new_predictions.show()


+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,0.0,86484.77...| 85065.55365563772|
|[0.0,0.0,93165.77...| 185129.8325300921|
|[0.0,0.0,145909.9...|121360.33325914326|
|[0.0,0.0,184419.5...|111432.02785916431|
|[0.0,1.0,84756.09...| 93377.74708858396|
|[0.0,1.0,115641.3...|126390.94123157437|
|[0.0,1.0,137269.0...| 137515.2997775195|
|[1.0,0.0,86821.44...| 99300.84187939132|
|[1.0,0.0,101055.3...|123145.77096041071|
|[1.0,0.0,120445.8...| 184272.7246786328|
|[1.0,0.0,124390.8...|156271.46705197188|
|[1.0,0.0,125927.0...| 55061.96877825327|
|[1.0,0.0,138671.8...|203052.18796167916|
|[1.0,0.0,150492.9...| 161255.5305817661|
|[1.0,0.0,154475.9...|106127.61954295603|
+--------------------+------------------+

