In [18]:

# Import SparkSession, the entry point used below to create the Spark session
from pyspark.sql import SparkSession

In [19]:
# Obtain the SparkSession used by every later cell.
# appName() sets the application name shown in the Spark UI; getOrCreate()
# returns the existing session if there is one, otherwise creates a new one.
spark = (
    SparkSession.builder
    .appName('Linear Regression')
    .getOrCreate()
)


In [20]:
# Read tips.csv into a DataFrame. The first line of the file is used as the
# column names (header=True) and column types are inferred from the data
# (inferSchema=True) rather than every column being loaded as a string.
df = spark.read.csv('tips.csv', inferSchema=True, header=True)

In [21]:
# Print each column's name, inferred data type, and nullability
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [72]:
# Show the first 5 rows (returned as a list of Row objects)
df.head(5)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=23.68, tip=3.31, sex='Male', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=24.59, tip=3.61, sex='Female', smoker='No', day='Sun', time='Dinner', size=4)]

In [47]:
# Summary statistics (count, mean, stddev, min, max) for every column
df.describe().show()

+-------+------------------+------------------+------+------+----+------+------------------+
|summary|        total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+------------------+------------------+------+------+----+------+------------------+
|  count|               244|               244|   244|   244| 244|   244|               244|
|   mean|19.785942622950824|2.9982786885245902|  NULL|  NULL|NULL|  NULL| 2.569672131147541|
| stddev| 8.902411954856857|1.3836381890011815|  NULL|  NULL|NULL|  NULL|0.9510998047322347|
|    min|              3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+------------------+------+------+----+------+------------------+



In [73]:
# List the names of all columns
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [24]:
# List (column name, data type) pairs for every column
df.dtypes

[('total_bill', 'double'),
 ('tip', 'double'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'int')]

In [25]:
### Handling Categorical Features
# Every string-typed column is encoded as a numeric index so it can be fed to
# the model; the encoded values are written to a new "<column>_indexed" column.
from pyspark.ml.feature import StringIndexer

cat_columns = [name for name, kind in df.dtypes if kind == 'string']
index = StringIndexer(
    inputCols=cat_columns,
    outputCols=[f"{name}_indexed" for name in cat_columns],
)
df_i = index.fit(df).transform(df)

In [26]:
# Preview the result: the original columns plus the new *_indexed columns
df_i.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [30]:
# Keep only the numeric columns (int or double) for the regression model.
# The original string columns are dropped; their *_indexed versions are
# doubles and therefore stay.
num_col = [name for name, kind in df_i.dtypes if kind in ('int', 'double')]
df_t = df_i.select(num_col)

In [31]:
# Preview the numeric-only DataFrame
df_t.show()

+----------+----+----+-----------+--------------+-----------+------------+
|total_bill| tip|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+----+-----------+--------------+-----------+------------+
|     16.99|1.01|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|   2|        0.0|           0.0|        1.0|         0.0|
|     26.88|3.12|   4|        0.0|           0.0|        1.0|         0.0|
|     15.04|1.96|   2|        0.0|           0.0|        1.0|         0.0|
|     14.78|3.23|   2|        0.0|           0.0|        1.0|         0.0|
|     10.27|1.71|   2|   

In [34]:
 # Combine every column except the label ('total_bill') into one vector column
# named 'independent_features', the single-column input format used for model
# training below.
from pyspark.ml.feature import VectorAssembler

featureAssempler = VectorAssembler(
    inputCols=[name for name in df_t.columns if name != 'total_bill'],
    outputCol='independent_features',
)

df_new = featureAssempler.transform(df_t)
df_new.show()


+----------+----+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|independent_features|
+----------+----+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|   4|        0.0|           0.0|        1.0|         0.0|[4.71,4.0,0.0,0.0...|
|      8.77| 2.0|   2|        0.0|           0.0|        1.0|         0.0|[2.0,2.0,0.0,0.0,...|
|     26.88|3.12|   4|        0.0|      

In [38]:
# Keep only what the model needs: the assembled feature vector and the
# label column ('total_bill'), then preview the result.
df_final=df_new.select('independent_features','total_bill')
df_final.show()

+--------------------+----------+
|independent_features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [51]:
# Split the data into roughly 75% training and 25% test rows.
# A fixed seed keeps the split -- and therefore the fitted coefficients and
# the metrics reported below -- repeatable when the notebook is re-run on the
# same input data.
train, test = df_final.randomSplit([0.75, 0.25], seed=42)


In [None]:
# Fit a linear regression on the training split, predicting 'total_bill'
# from the assembled 'independent_features' vector.
from pyspark.ml.regression import LinearRegression

Linear_model = LinearRegression(
    featuresCol='independent_features',
    labelCol='total_bill',
)
regressor = Linear_model.fit(train)

In [61]:
# Intercept of the fitted model (the constant term, not a slope)
regressor.intercept

1.902539692317333

In [65]:
# Regression coefficients (slopes), one per entry of the feature vector
regressor.coefficients

DenseVector([3.1125, 3.3017, -0.8311, 3.2457, -0.5886, -0.3923])

In [67]:
# Evaluate the fitted model on the test split; the returned summary exposes
# the predictions and the error metrics used in the following cells
pred_results=regressor.evaluate(test)

In [68]:
# Show the actual total_bill next to the model's prediction for test rows
pred_results.predictions.show()

+--------------------+----------+------------------+
|independent_features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.07|12.396596170148081|
|(6,[0,1],[1.75,2.0])|     17.82|13.952851747578995|
| (6,[0,1],[2.0,2.0])|     13.37|14.730979536294452|
|(6,[0,1],[2.72,2.0])|     13.28| 16.97198756779497|
|(6,[0,1],[7.58,4.0])|     39.42|38.702209314676914|
|[1.0,2.0,1.0,1.0,...|      5.75|12.267348234132216|
|[1.44,2.0,0.0,1.0...|      7.74|16.233692204078118|
|[1.48,2.0,0.0,0.0...|      8.52|11.543030224297251|
|[1.5,2.0,0.0,1.0,...|     15.69|15.831873786414267|
|[1.5,2.0,1.0,0.0,...|      8.35|10.774148646454472|
|[1.5,2.0,1.0,0.0,...|     11.17|10.774148646454472|
|[1.56,2.0,0.0,0.0...|      9.94|12.772905541199687|
|[1.71,2.0,0.0,0.0...|     10.27|13.239782214428962|
|[1.76,2.0,0.0,1.0...|     11.24|17.229695773633903|
|[1.96,2.0,0.0,0.0...|     15.04|14.017910003144417|
|[2.0,2.0,0.0,1.0,...|     13.81| 17.976698450

In [69]:
# Root mean squared error (RMSE) and R² of the model on the test split
pred_results.rootMeanSquaredError, pred_results.r2

(5.648458914448821, 0.6189716005019112)

In [74]:
# Save the trained model to the "model" directory.
# A plain save() raises an error when the path already exists, which breaks
# re-running the notebook; going through the writer with overwrite() replaces
# any previously saved copy instead.
regressor.write().overwrite().save("model")


In [None]:

# Load the saved model back from the "model" directory.
# LinearRegressionModel (the fitted-model class) is imported here because no
# earlier cell imports it; without this the cell fails with a NameError.
from pyspark.ml.regression import LinearRegressionModel

loaded_model = LinearRegressionModel.load("model")
