In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=e55095d936983948e4ae87163be760df7c228ef743ea1a43fe43abde96e3d36e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [17]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Groceries Analysis")
    .getOrCreate()
)

# Load the raw CSV: the first row supplies the column names, and column types
# are inferred from the data instead of defaulting to strings.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('Groceries.csv')


In [18]:
# Quick look at the first few rows to confirm the columns loaded as expected.
df.show(n=5)


+-------------+----------+----------------+
|Member_number|      Date| itemDescription|
+-------------+----------+----------------+
|         1808|21-07-2015|  tropical fruit|
|         2552|05-01-2015|      whole milk|
|         2300|19-09-2015|       pip fruit|
|         1187|12-12-2015|other vegetables|
|         3037|01-02-2015|      whole milk|
+-------------+----------+----------------+
only showing top 5 rows



In [19]:
# Total number of rows in the loaded dataset.
row_total = df.count()
print(row_total)

38765


In [20]:
# The purchase date is not used by the later per-member grouping, so remove it.
columns_to_discard = ['Date']
df = df.drop(*columns_to_discard)

In [21]:
from pyspark.sql import functions as F

# Build one row per member holding the distinct items that member bought.
# collect_set de-duplicates, so each product appears at most once per member.
distinct_items = F.collect_set(F.col("itemDescription")).alias("products")
df_grouped = df.groupBy("Member_number").agg(distinct_items)

# truncate=False prints the full item lists instead of cutting them short.
df_grouped.show(truncate=False)



+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Member_number|products                                                                                                                                                                                                                                      |
+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1000         |[pickled vegetables, whole milk, misc. beverages, pastry, salty snack, sausage, canned beer, semi-finished bread, hygiene articles, yogurt, soda]                                                                           

In [22]:
# Attach the number of distinct products in each member's item list.
df_grouped_with_counts = df_grouped.withColumn(
    "product_count",
    F.size(F.col("products")),
)

# Show the members whose list contains more than 10 distinct products.
members_over_threshold = (
    df_grouped_with_counts
    .where("product_count > 10")
    .select("Member_number")
)
members_over_threshold.show()

+-------------+
|Member_number|
+-------------+
|         1000|
|         1004|
|         1006|
|         1008|
|         1011|
|         1012|
|         1013|
|         1023|
|         1026|
|         1028|
|         1032|
|         1033|
|         1038|
|         1050|
|         1051|
|         1052|
|         1061|
|         1062|
|         1065|
|         1077|
+-------------+
only showing top 20 rows



In [23]:
from pyspark.ml.fpm import FPGrowth

# Configure FP-Growth over the per-member item lists, using the same 0.15
# threshold for both minimum support and minimum confidence.
fpGrowth = (
    FPGrowth()
    .setItemsCol("products")
    .setMinSupport(0.15)
    .setMinConfidence(0.15)
)

# Fit on the grouped data, then display the frequent itemsets with their
# occurrence counts.
model = fpGrowth.fit(df_grouped)
frequent_itemsets = model.freqItemsets
frequent_itemsets.show()



+--------------------+----+
|               items|freq|
+--------------------+----+
|           [sausage]| 803|
|         [pip fruit]| 665|
|        [rolls/buns]|1363|
|[rolls/buns, whol...| 696|
|    [tropical fruit]| 911|
|      [bottled beer]| 619|
|            [yogurt]|1103|
|[yogurt, whole milk]| 587|
|       [canned beer]| 644|
|     [bottled water]| 833|
|  [other vegetables]|1468|
|[other vegetables...| 746|
|            [pastry]| 692|
|      [citrus fruit]| 723|
|     [shopping bags]| 656|
|              [soda]|1222|
|  [soda, whole milk]| 589|
|   [root vegetables]| 899|
|[whipped/sour cream]| 603|
|        [whole milk]|1786|
+--------------------+----+



In [24]:
# Association rules derived from the fitted model's frequent itemsets.
rules = model.associationRules

# Display only the rules whose confidence is at least 0.4.
confident_rules = rules.where(rules["confidence"] >= 0.4)
confident_rules.show()

+------------------+------------------+-------------------+------------------+-------------------+
|        antecedent|        consequent|         confidence|              lift|            support|
+------------------+------------------+-------------------+------------------+-------------------+
|[other vegetables]|      [whole milk]| 0.5081743869209809|1.1091062487222754| 0.1913801949717804|
|          [yogurt]|      [whole milk]| 0.5321849501359928|1.1615100423460805|0.15059004617752694|
|      [rolls/buns]|      [whole milk]| 0.5106382978723404|1.1144838102499344|0.17855310415597742|
|      [whole milk]|[other vegetables]| 0.4176931690929451|1.1091062487222754| 0.1913801949717804|
|            [soda]|      [whole milk]|0.48199672667757776|1.0519726990980953|0.15110312981015905|
+------------------+------------------+-------------------+------------------+-------------------+

