In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd


In [21]:
spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()
spark

In [44]:
df = spark.read.csv("tips.csv", header = True, inferSchema = True)
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [45]:
for i in ['sex','day','time','size']:
    df.groupBy(i).count().show()

+------+-----+
|   sex|count|
+------+-----+
|Female|   87|
|  Male|  157|
+------+-----+

+----+-----+
| day|count|
+----+-----+
|Thur|   62|
| Sun|   76|
| Sat|   87|
| Fri|   19|
+----+-----+

+------+-----+
|  time|count|
+------+-----+
| Lunch|   68|
|Dinner|  176|
+------+-----+

+----+-----+
|size|count|
+----+-----+
|   1|    4|
|   6|    4|
|   3|   38|
|   5|    5|
|   4|   37|
|   2|  156|
+----+-----+



In [46]:
from pyspark.ml.feature import StringIndexer

In [47]:
indexer=StringIndexer(inputCols=["sex","smoker","time","day"],outputCols=["sex_indexed","smoker_indexed","time_indexed","day_indexed"])
df_r=indexer.fit(df).transform(df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+------------+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|time_indexed|day_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+------------+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|         0.0|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|         0.0|        1.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|         0.0|        1.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|         0.0|        1.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|         0.0|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|         0.0|        1.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|         0.0|        1.0|


In [48]:
df_r.columns

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'time_indexed',
 'day_indexed']

In [50]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['tip','size','sex_indexed','smoker_indexed','day_indexed','time_indexed'],outputCol="Independent Features")
output=featureassembler.transform(df_r)

In [52]:
output.show(3)

+----------+----+------+------+---+------+----+-----------+--------------+------------+-----------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|time_indexed|day_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+------------+-----------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|         0.0|        1.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|         0.0|        1.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|         0.0|        1.0|[3.5,3.0,0.0,0.0,...|
+----------+----+------+------+---+------+----+-----------+--------------+------------+-----------+--------------------+
only showing top 3 rows



In [53]:
finalized_data=output.select("Independent Features","total_bill")

## LR Model

In [54]:
from pyspark.ml.regression import LinearRegression
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='total_bill')
regressor=regressor.fit(train_data)

In [55]:
regressor.coefficients

DenseVector([2.7754, 3.5859, -0.667, 2.5551, 0.0796, -1.5355])

In [56]:
regressor.intercept

2.26213245743256

In [57]:
### Predictions
pred_results=regressor.evaluate(test_data)

In [58]:
## Final comparison
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.07|12.903275642130609|
|(6,[0,1],[1.75,2.0])|     17.82|14.290998596021975|
|(6,[0,1],[2.24,3.0])|     16.04|19.236884990820332|
|(6,[0,1],[2.31,3.0])|     18.69|19.431166204365123|
|(6,[0,1],[2.34,4.0])|     17.81|23.100347481583416|
|(6,[0,1],[2.64,3.0])|     17.59|20.347063353933425|
| (6,[0,1],[3.6,3.0])|     24.06|23.011491425404852|
|[1.0,1.0,1.0,1.0,...|      3.07| 10.51154316010556|
|[1.0,2.0,1.0,1.0,...|      5.75|14.336399501168057|
|[1.1,2.0,1.0,1.0,...|      12.9|14.375005650868648|
|[1.32,2.0,0.0,0.0...|      9.68|13.177203002701294|
|[1.44,2.0,0.0,0.0...|      7.56|12.054389719644174|
|[1.5,2.0,0.0,0.0,...|     12.46|13.836075560153972|
|[1.5,2.0,0.0,1.0,...|     11.59|  16.1521898447604|
|[1.5,2.0,0.0,1.0,...|     12.03|16.391128285838082|
|[1.5,2.0,1.0,0.0,...|      8.35|11.5539106433

In [61]:
### PErformance Metrics
print(f"r2: ",{pred_results.r2},"\nMEA: ",pred_results.meanAbsoluteError,"\nMSE: ",pred_results.meanSquaredError)

r2:  {0.6300619118682482} 
MEA:  4.0743914104522325 
MSE:  30.747883840353932
