In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Mllib").getOrCreate()

In [37]:
df = spark.read.csv("tips.csv", header=True, inferSchema=True)

In [38]:
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [39]:
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [40]:
### Handling Categorical Features
from pyspark.ml.feature import StringIndexer

categorical_features = ["sex", "smoker", "day", "time"]
indexer=StringIndexer(inputCols=categorical_features,
    outputCols=[col+"_indexed" for col in categorical_features])
df_r=indexer.fit(df).transform(df)

In [41]:
df_r[[col for col in df_r.columns if "index" in col]].show()

+-----------+--------------+-----------+------------+
|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+-----------+--------------+-----------+------------+
|        1.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        1.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        1.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|        1.0|         0.0|
|        1.0|           0.0|        1.0|         0.0|
|        0.0|           0.0|

In [43]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(
    inputCols=["tip", "size", "sex_indexed", "smoker_indexed", "day_indexed", "time_indexed"],
    outputCol="Indepented Features")

output = feature_assembler.transform(df_r)
finalized_output = output.select("Indepented Features", "total_bill")

finalized_output.show()

+--------------------+----------+
| Indepented Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [44]:
# Create model
from pyspark.ml.regression import LinearRegression
train_data, test_data = finalized_output.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol="Indepented Features",
                             labelCol="total_bill")
model = regressor.fit(train_data)

In [47]:
model.coefficients, model.intercept

(DenseVector([3.2302, 3.272, -0.3604, 0.9471, -0.8456, -0.1093]),
 2.0569124929073466)

In [49]:
# Predictions
pred_results = model.evaluate(test_data)
pred_results.predictions.show()

+--------------------+----------+------------------+
| Indepented Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.349327298244024|
|(6,[0,1],[1.75,2.0])|     17.82|14.253771423084864|
| (6,[0,1],[2.5,4.0])|     18.35|  23.2204727645451|
|(6,[0,1],[2.64,3.0])|     17.59| 20.40065325200438|
|(6,[0,1],[2.72,2.0])|     13.28|17.387024284140622|
|(6,[0,1],[3.76,2.0])|     18.24|20.746388176406587|
| (6,[0,1],[5.0,3.0])|     31.27|28.023825161377154|
|(6,[0,1],[5.92,3.0])|     29.03| 30.99557014299705|
|[1.17,2.0,0.0,1.0...|     32.83|13.327344165954912|
|[1.32,2.0,0.0,0.0...|      9.68|12.019178317996218|
|[1.44,2.0,0.0,0.0...|      7.56|11.451822472298854|
|[1.5,2.0,0.0,0.0,...|     12.46|10.909356000084898|
|[1.66,3.0,0.0,0.0...|     10.34| 16.38947347312123|
|[1.67,3.0,1.0,0.0...|     10.33|16.061412235467355|
|[1.68,2.0,1.0,0.0...|     13.42| 11.86669748004995|
|[1.73,2.0,0.0,0.0...|      9.78|12.3885681730

In [50]:
#Performance Metrics
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanAbsoluteError

(0.4016222001169789, 5.120302399746915, 5.120302399746915)