In [18]:
import os

from pyspark.sql import SparkSession

# Start (or reuse) a Spark session named 'ML' on the local[3] master.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName('ML')
    .getOrCreate()
)

In [19]:
# Location of the tips CSV. Set the TIPS_CSV_PATH environment variable to run
# this notebook on another machine; the original author's path is the fallback.
tips_csv_path = os.environ.get("TIPS_CSV_PATH", "/home/vijendra/Work/Datasets/tips.csv")

# header=True: the first line holds the column names.
# inferSchema=True: Spark scans the data to pick column types instead of
# reading every column as a string.
pyspark_data = spark.read.csv(tips_csv_path, header=True, inferSchema=True)
pyspark_data.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [20]:
# Print each column's name, inferred type and nullability.
pyspark_data.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [21]:
# List the column names of the loaded DataFrame.
pyspark_data.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [22]:
# Handling categorical features

from pyspark.ml.feature import StringIndexer

# StringIndexer replaces each distinct string label in a column with a numeric
# index; every input column gets a matching "Mod_<name>" output column.
categorical_cols = ['sex', 'smoker', 'day', 'time']
indexed_cols = ["Mod_" + col_name for col_name in categorical_cols]

indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)

In [23]:
# Strip any Mod_* columns left by an earlier run of this cell so it can be
# re-executed safely; drop() ignores names that are not present.
source_df = pyspark_data.drop(*indexer.getOutputCols())

# fit() learns the label -> index mapping; transform() appends the Mod_* columns.
pyspark_data = indexer.fit(source_df).transform(source_df)
pyspark_data.show(3)

+----------+----+------+------+---+------+----+-------+----------+-------+--------+
|total_bill| tip|   sex|smoker|day|  time|size|Mod_sex|Mod_smoker|Mod_day|Mod_time|
+----------+----+------+------+---+------+----+-------+----------+-------+--------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|    1.0|       0.0|    1.0|     0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|    0.0|       0.0|    1.0|     0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|    0.0|       0.0|    1.0|     0.0|
+----------+----+------+------+---+------+----+-------+----------+-------+--------+
only showing top 3 rows



VectorAssembler combines all the independent variables (features) into a single vector column, which is the input format Spark ML models are trained on.

In [27]:
from pyspark.ml.feature import VectorAssembler

# Numeric and index-encoded columns that make up the model's feature vector.
feature_cols = ['total_bill', 'Mod_sex', 'Mod_smoker', 'Mod_day', 'Mod_time', 'size']

# Pack those columns into one vector column named 'independent features'.
feature_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='independent features',
)

# transform() keeps the existing columns and appends the assembled vector.
output = feature_assembler.transform(pyspark_data)

In [28]:
# Preview the assembled DataFrame, including the new 'independent features' column.
output.show()

+----------+----+------+------+---+------+----+-------+----------+-------+--------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|Mod_sex|Mod_smoker|Mod_day|Mod_time|independent features|
+----------+----+------+------+---+------+----+-------+----------+-------+--------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|    1.0|       0.0|    1.0|     0.0|[16.99,1.0,0.0,1....|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|    0.0|       0.0|    1.0|     0.0|[10.34,0.0,0.0,1....|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|    0.0|       0.0|    1.0|     0.0|[21.01,0.0,0.0,1....|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|    0.0|       0.0|    1.0|     0.0|[23.68,0.0,0.0,1....|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|    1.0|       0.0|    1.0|     0.0|[24.59,1.0,0.0,1....|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|    0.0|       0.0|    1.0|     0.0|[25.29,0.0,0.0,1....|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|    0.0|

In [29]:
# Show only the assembled feature vector for the first 3 rows.
output.select('independent features').show(3)

+--------------------+
|independent features|
+--------------------+
|[16.99,1.0,0.0,1....|
|[10.34,0.0,0.0,1....|
|[21.01,0.0,0.0,1....|
+--------------------+
only showing top 3 rows



In [30]:
# Keep just what the model needs: the feature vector and the label (tip).
model_columns = ["independent features", "tip"]
final_df = output.select(*model_columns)
final_df.show(3)

+--------------------+----+
|independent features| tip|
+--------------------+----+
|[16.99,1.0,0.0,1....|1.01|
|[10.34,0.0,0.0,1....|1.66|
|[21.01,0.0,0.0,1....| 3.5|
+--------------------+----+
only showing top 3 rows



In [31]:
from pyspark.ml.regression import LinearRegression

# Hold out roughly 25% of the rows for testing. The fixed seed makes the split,
# and therefore the fitted coefficients and metrics, reproducible between runs.
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=42)

# Unfitted estimator: reads the assembled vector column and predicts "tip".
lr_estimator = LinearRegression(featuresCol="independent features", labelCol="tip")

# `regressor` is the fitted model that the following cells inspect and evaluate.
regressor = lr_estimator.fit(train_data)

In [34]:
# Fitted weights, one per entry of the assembled feature vector.
regressor.coefficients

DenseVector([0.0935, -0.0147, -0.2373, 0.0948, -0.0793, 0.1397])

In [35]:
# Fitted intercept (bias) term of the linear model.
regressor.intercept

0.7793772662187829

In [38]:
# Prediction: score the held-out split. evaluate() returns a summary object
# that carries both the per-row predictions and the aggregate metrics.
pred_data = regressor.evaluate(test_data)

predictions_df = pred_data.predictions
predictions_df.show()

+--------------------+----+------------------+
|independent features| tip|        prediction|
+--------------------+----+------------------+
|(6,[0,5],[10.07,2...|1.25|2.0007517607046603|
|(6,[0,5],[12.02,2...|1.97| 2.183153239511322|
|(6,[0,5],[13.28,2...|2.72|2.3010126565863955|
|(6,[0,5],[17.59,3...|2.64|2.8438839694012166|
|(6,[0,5],[17.81,4...|2.34|3.0041798728215516|
|(6,[0,5],[18.35,4...| 2.5|3.0546910515680117|
|(6,[0,5],[21.7,2.0])| 4.3|3.0886128881515713|
|[3.07,1.0,1.0,0.0...| 1.0|0.9542676328837931|
|[10.09,1.0,1.0,3....| 2.0|1.9555502355762657|
|[10.27,0.0,0.0,1....|1.71|2.1142159166253425|
|[13.27,1.0,1.0,0....| 2.5|2.0480849509146335|
|[13.39,1.0,0.0,1....|2.61|2.3913198613764326|
|[13.42,1.0,1.0,3....|3.48|2.2670358378461035|
|[13.81,0.0,1.0,0....| 2.0|2.1133345510006625|
|[14.15,1.0,0.0,2....| 2.0|2.4778170485663984|
|[14.31,1.0,1.0,0....| 4.0|  2.14536573961152|
|[14.83,1.0,0.0,1....|3.02|  2.52601633803366|
|[15.01,0.0,1.0,0....|2.09| 2.225581614881685|
|[15.77,1.0,0

In [42]:
# Test-set metrics from the evaluation summary: R^2, mean absolute error, mean squared error.
pred_data.r2,pred_data.meanAbsoluteError,pred_data.meanSquaredError

(0.4905604286608517, 0.6867322889679136, 1.0053583344217607)