In [210]:
from pyspark.sql import SparkSession

In [211]:
# Initialize PySpark
spark = SparkSession.builder.appName("Dataframe").getOrCreate()

### Explore Dataset

In [212]:
# Import data from PySpark
df_pyspark = spark.read.csv("./other_data/tips_dataset.csv", header=True, inferSchema=True)

# Show first 4 rows
df_pyspark.show(4)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 4 rows



In [213]:
# Check for data typers
df_pyspark.dtypes

# Check for schema
df_pyspark.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [214]:
# Descrie PySparks data
df_pyspark.describe().show()

+-------+-----------------+------------------+------+------+----+------+------------------+
|summary|       total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+-----------------+------------------+------+------+----+------+------------------+
|  count|              243|               242|   242|   243| 243|   243|               243|
|   mean|19.76617283950618| 3.001900826446281|  NULL|  NULL|NULL|  NULL|  2.57201646090535|
| stddev|8.915417545076668|1.3878517469281784|  NULL|  NULL|NULL|  NULL|0.9523561732459417|
|    min|             3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|            50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+-----------------+------------------+------+------+----+------+------------------+



### Modify Dataset

In [215]:
# Add new column
df_pyspark.withColumn("amount_spend", df_pyspark["total_bill"]+df_pyspark["tip"]).show(4)

+----------+----+------+------+---+------+----+------------+
|total_bill| tip|   sex|smoker|day|  time|size|amount_spend|
+----------+----+------+------+---+------+----+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        18.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        12.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|       24.51|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|       26.99|
+----------+----+------+------+---+------+----+------------+
only showing top 4 rows



In [216]:
# Rename column
df_pyspark.withColumnRenamed("total_bill", "total_spend").show(4)

+-----------+----+------+------+---+------+----+
|total_spend| tip|   sex|smoker|day|  time|size|
+-----------+----+------+------+---+------+----+
|      16.99|1.01|Female|    No|Sun|Dinner|   2|
|      10.34|1.66|  Male|    No|Sun|Dinner|   3|
|      21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|      23.68|3.31|  Male|    No|Sun|Dinner|   2|
+-----------+----+------+------+---+------+----+
only showing top 4 rows



In [217]:
# Delete column
df_pyspark.drop("size").show(4)

+----------+----+------+------+---+------+
|total_bill| tip|   sex|smoker|day|  time|
+----------+----+------+------+---+------+
|     16.99|1.01|Female|    No|Sun|Dinner|
|     10.34|1.66|  Male|    No|Sun|Dinner|
|     21.01| 3.5|  Male|    No|Sun|Dinner|
|     23.68|3.31|  Male|    No|Sun|Dinner|
+----------+----+------+------+---+------+
only showing top 4 rows



### Handle Null Value

In [218]:
# Show sample datasets with null values
df_pyspark.show(8)

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|
|      NULL|3.61|Female|    No| Sun|  NULL|   4|
|     25.29|4.71|  NULL|    No| Sun|Dinner|   4|
|      8.77|NULL|  NULL|    No|NULL|Dinner|NULL|
|     26.88|NULL|  Male|  NULL| Sun|Dinner|   4|
+----------+----+------+------+----+------+----+
only showing top 8 rows



In [219]:
from pyspark.sql.functions import col, isnan, when, count

# Count how many null values in each column
df_pyspark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_pyspark.columns]).show()

+----------+---+---+------+---+----+----+
|total_bill|tip|sex|smoker|day|time|size|
+----------+---+---+------+---+----+----+
|         1|  2|  2|     1|  1|   1|   1|
+----------+---+---+------+---+----+----+



In [220]:
from functools import reduce
from pyspark.sql import functions as F

# Show datasets that have null values
cols = [F.col(c) for c in df_pyspark.columns]
filter_null = reduce(lambda a, b: a | b.isNull(), cols[1:], cols[0].isNull())

df_pyspark.filter(filter_null).show()

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|      NULL|3.61|Female|    No| Sun|  NULL|   4|
|     25.29|4.71|  NULL|    No| Sun|Dinner|   4|
|      8.77|NULL|  NULL|    No|NULL|Dinner|NULL|
|     26.88|NULL|  Male|  NULL| Sun|Dinner|   4|
+----------+----+------+------+----+------+----+



In [221]:
# Drop datasets that have any null values
df_pyspark.na.drop(how="any").show(8)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 8 rows



In [222]:
# Drop datasets that doesn't have at least 6 non null values
df_pyspark.na.drop(how="any", thresh=6).show(8)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     25.29|4.71|  NULL|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 8 rows



In [223]:
# Drop datasets that have any nukk value on column tip
df_pyspark.na.drop(how="any", subset=["tip"]).show(8)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|      NULL|3.61|Female|    No|Sun|  NULL|   4|
|     25.29|4.71|  NULL|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 8 rows



In [224]:
# Change null value to -
df_pyspark.na.fill(0.0, ["tip"]).show(8)

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|
|      NULL|3.61|Female|    No| Sun|  NULL|   4|
|     25.29|4.71|  NULL|    No| Sun|Dinner|   4|
|      8.77| 0.0|  NULL|    No|NULL|Dinner|NULL|
|     26.88| 0.0|  Male|  NULL| Sun|Dinner|   4|
+----------+----+------+------+----+------+----+
only showing top 8 rows



In [225]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import round

# Calculate the mean value for each column
mean_values = df_pyspark.select(*(round(mean(col), 2).alias(col) for col in ["total_bill", "tip", "size"])).collect()[0].asDict()
df_pyspark.na.fill(mean_values).show(8)

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|
|     19.77|3.61|Female|    No| Sun|  NULL|   4|
|     25.29|4.71|  NULL|    No| Sun|Dinner|   4|
|      8.77| 3.0|  NULL|    No|NULL|Dinner|   2|
|     26.88| 3.0|  Male|  NULL| Sun|Dinner|   4|
+----------+----+------+------+----+------+----+
only showing top 8 rows



In [226]:
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import round

# Using imputation to find mean or median
imputer = Imputer(
    inputCols=["total_bill", "tip", "size"], 
    outputCols=["{}_imputed".format(c) for c in ["total_bill", "tip", "size"]]).setStrategy("median")

imputer.fit(df_pyspark).transform(df_pyspark).show(8)

+----------+----+------+------+----+------+----+------------------+-----------+------------+
|total_bill| tip|   sex|smoker| day|  time|size|total_bill_imputed|tip_imputed|size_imputed|
+----------+----+------+------+----+------+----+------------------+-----------+------------+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|             16.99|       1.01|           2|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|             10.34|       1.66|           3|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|             21.01|        3.5|           3|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|             23.68|       3.31|           2|
|      NULL|3.61|Female|    No| Sun|  NULL|   4|             17.78|       3.61|           4|
|     25.29|4.71|  NULL|    No| Sun|Dinner|   4|             25.29|       4.71|           4|
|      8.77|NULL|  NULL|    No|NULL|Dinner|NULL|              8.77|       2.88|           2|
|     26.88|NULL|  Male|  NULL| Sun|Dinner|   4|             26.88|   

In [227]:
df_pyspark = df_pyspark.na.drop(how="any")

### Filter Operation

In [228]:
# Filter rows that have total_bill >= 40.0
df_pyspark.filter(df_pyspark["total_bill"] >= 40.0).show()

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     48.27|6.73|  Male|    No| Sat|Dinner|   4|
|     40.17|4.73|  Male|   Yes| Fri|Dinner|   4|
|      44.3| 2.5|Female|   Yes| Sat|Dinner|   3|
|     41.19| 5.0|  Male|    No|Thur| Lunch|   5|
|     48.17| 5.0|  Male|    No| Sun|Dinner|   6|
|     50.81|10.0|  Male|   Yes| Sat|Dinner|   3|
|     45.35| 3.5|  Male|   Yes| Sun|Dinner|   3|
|     40.55| 3.0|  Male|   Yes| Sun|Dinner|   2|
|     43.11| 5.0|Female|   Yes|Thur| Lunch|   4|
|     48.33| 9.0|  Male|    No| Sat|Dinner|   4|
+----------+----+------+------+----+------+----+



In [229]:
# Filter certain rows that have total_bill >= 40.0
df_pyspark.filter(df_pyspark["total_bill"] >= 40.0).select(["total_bill", "tip", "time"]).show()

+----------+----+------+
|total_bill| tip|  time|
+----------+----+------+
|     48.27|6.73|Dinner|
|     40.17|4.73|Dinner|
|      44.3| 2.5|Dinner|
|     41.19| 5.0| Lunch|
|     48.17| 5.0|Dinner|
|     50.81|10.0|Dinner|
|     45.35| 3.5|Dinner|
|     40.55| 3.0|Dinner|
|     43.11| 5.0| Lunch|
|     48.33| 9.0|Dinner|
+----------+----+------+



In [230]:
# Filter rows that have total_bill >= 40 and tip >= 5.0
df_pyspark.filter((df_pyspark["total_bill"] >= 40.0) & (df_pyspark["tip"] >= 5.0)).show()

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     48.27|6.73|  Male|    No| Sat|Dinner|   4|
|     41.19| 5.0|  Male|    No|Thur| Lunch|   5|
|     48.17| 5.0|  Male|    No| Sun|Dinner|   6|
|     50.81|10.0|  Male|   Yes| Sat|Dinner|   3|
|     43.11| 5.0|Female|   Yes|Thur| Lunch|   4|
|     48.33| 9.0|  Male|    No| Sat|Dinner|   4|
+----------+----+------+------+----+------+----+



In [231]:
# Filter rows that doesn't tip >= 5.0
df_pyspark.filter(~(df_pyspark["tip"] >= 5.0)).show(4)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 4 rows



### Aggregate Value

In [232]:
# Sum function group by day
df_pyspark.groupBy("Day").sum().show()

+----+------------------+------------------+---------+
| Day|   sum(total_bill)|          sum(tip)|sum(size)|
+----+------------------+------------------+---------+
|Thur|1096.3299999999997|            171.83|      152|
| Sun|1541.6300000000003|233.95000000000007|      202|
| Sat|1778.3999999999996|             260.4|      219|
| Fri|325.87999999999994|             51.96|       40|
+----+------------------+------------------+---------+



In [233]:
# Mean function group by day
df_pyspark.groupBy("Day").mean().show()

+----+------------------+-----------------+------------------+
| Day|   avg(total_bill)|         avg(tip)|         avg(size)|
+----+------------------+-----------------+------------------+
|Thur|17.682741935483865|2.771451612903226|2.4516129032258065|
| Sun| 21.41152777777778|3.249305555555557|2.8055555555555554|
| Sat|20.441379310344825|2.993103448275862|2.5172413793103448|
| Fri|17.151578947368417|2.734736842105263|2.1052631578947367|
+----+------------------+-----------------+------------------+



In [234]:
# Count function group by day
df_pyspark.groupBy("Day").count().show()

+----+-----+
| Day|count|
+----+-----+
|Thur|   62|
| Sun|   72|
| Sat|   87|
| Fri|   19|
+----+-----+



In [235]:
df_pyspark.agg({"total_bill":"sum", "size":"average", "day": "count"}).show()

+------------------+----------+-----------------+
|         avg(size)|count(day)|  sum(total_bill)|
+------------------+----------+-----------------+
|2.5541666666666667|       240|4742.240000000001|
+------------------+----------+-----------------+



### Linear Regression

In [236]:
from pyspark.ml.feature import StringIndexer

# Indexing the data
indexer = StringIndexer(inputCols=["sex", "smoker", "day", "time"],
                        outputCols=["sex_indexed", "smoker_indexed", "day_indexed", "time_index"]) 

df_regression = indexer.fit(df_pyspark).transform(df_pyspark) 
df_regression.show(5)

+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_index|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|       0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+
only showing top 5 rows



In [237]:
from pyspark.ml.feature import VectorAssembler 

# Group all independent feature values into one array called Independet Features
feature = VectorAssembler(inputCols=["tip","size","sex_indexed","smoker_indexed","day_indexed", "time_index"],
                                outputCol="Independent Features") 

df_regression = feature.transform(df_regression)

df_regression.show(5)

+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_index|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|       0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|[3.31,2.0,0.0,0.0...|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|[1.96,2.0,0.0,0.0...|
+----------+----+------+------+---+------+----+-

In [238]:
# Final data include independet features and dependet feature (total_bill)
final_data = output.select("Independent Features","total_bill")
final_data.show(5)

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[1.96,2.0,0.0,0.0...|     15.04|
+--------------------+----------+
only showing top 5 rows



In [239]:
from pyspark.ml.regression import LinearRegression 

# Train and test split 
train_data, test_data = final_data.randomSplit([0.75,0.25]) 
regressor = LinearRegression(featuresCol='Independent Features', labelCol='total_bill') 
regressor = regressor.fit(train_data)

regressor.coefficients, regressor.intercept

(DenseVector([2.909, 3.5168, -0.9058, 2.1246, -0.2534, -1.1765]),
 2.125411627178693)

In [240]:
# Prediction result
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show(5)

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.45,2.0])|      9.55| 13.37700008281707|
|(6,[0,1],[1.47,2.0])|     10.77|13.435180109218228|
| (6,[0,1],[2.0,3.0])|     16.31|18.493719079626118|
|(6,[0,1],[2.64,3.0])|     17.59|20.355479924463207|
|(6,[0,1],[2.72,2.0])|     13.28|17.071431759290668|
+--------------------+----------+------------------+
only showing top 5 rows



In [241]:
# Performance result metric
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError

(0.5927212319671727, 4.555080156179097, 42.71725247682671)