# Installing prerequisites

In [1]:
from google.colab import drive
drive.mount('/content/drive')
!ls /content/drive/MyDrive/MMDS
!cp /content/drive/MyDrive//MMDS/data.csv .
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark as fs
fs.init()

Mounted at /content/drive
data.csv  data.full.csv


In [2]:
!cp /content/drive/MyDrive//MMDS/data.full.csv .

# Task 1: Basket
- All members

In [3]:
!rm -r /content/baskets.csv # Remove existing folder if there is one

rm: cannot remove '/content/baskets.csv': No such file or directory


In [4]:
# Import necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window


# Initialize Spark session
spark = SparkSession.builder.appName("ShoppingBaskets").getOrCreate()

# Load the data.csv file into a DataFrame
df = spark.read.csv("data.full.csv", header=True, inferSchema=True)

# Convert date column to the correct format
df = df.withColumn("Date", F.to_date(F.col("Date"), "dd/MM/yyyy"))

window_spec = Window.partitionBy("Member_number", "Date")

# Group by Member_number and Date, then collect unique itemDescriptions into a list
grouped_df = df.withColumn("Basket", F.concat_ws(',',F.collect_set("itemDescription").over(window_spec)))

# Drop duplicates to get unique rows for each customer on each day
grouped_df = grouped_df.dropDuplicates(["Member_number", "Date", "Basket"]).select(["Member_number", "Date", "Basket"])

# Show the result

grouped_df = grouped_df.withColumn("Date", F.date_format(F.col("Date"), "dd/MM/yyyy"))

# Show the resulting DataFrame
grouped_df.show(truncate=False)

# Save the result to baskets.csv file with specified separators
grouped_df.write.option("sep", ";") \
                .csv("baskets.csv", header=True)
# Stop the Spark session
spark.stop()


+-------------+----------+--------------------------------------+
|Member_number|Date      |Basket                                |
+-------------+----------+--------------------------------------+
|1014         |01/04/2015|canned beer,cookware                  |
|1086         |17/11/2015|beef,butter,curd,soda,other vegetables|
|1113         |01/07/2014|whole milk,yogurt,newspapers          |
|1122         |25/06/2015|root vegetables,frozen vegetables     |
|1146         |23/05/2014|yogurt,soda                           |
|1192         |27/12/2014|pastry,brown bread                    |
|1222         |08/07/2015|canned beer,coffee                    |
|1276         |23/11/2015|condensed milk,hard cheese            |
|1323         |30/04/2015|whole milk,beef,bottled beer,soups    |
|1377         |12/11/2014|yogurt,tea                            |
|1390         |09/04/2014|frozen meals,butter                   |
|1574         |02/12/2014|frozen fish,bottled beer              |
|1604     

In [14]:
!cat /content/baskets.csv/part-00000-5d7aacac-5815-4304-bbb0-dee2d5e21456-c000.csv

Member_number;Date;Basket
1014;01/04/2015;canned beer,cookware
1086;17/11/2015;beef,butter,curd,soda,other vegetables
1113;01/07/2014;whole milk,yogurt,newspapers
1122;25/06/2015;root vegetables,frozen vegetables
1146;23/05/2014;yogurt,soda
1192;27/12/2014;pastry,brown bread
1222;08/07/2015;canned beer,coffee
1276;23/11/2015;condensed milk,hard cheese
1323;30/04/2015;whole milk,beef,bottled beer,soups
1377;12/11/2014;yogurt,tea
1390;09/04/2014;frozen meals,butter
1574;02/12/2014;frozen fish,bottled beer
1604;02/03/2014;specialty bar,rolls/buns
1793;27/10/2014;brown bread,other vegetables
1886;12/02/2015;canned beer,rolls/buns
1924;09/07/2015;sausage,semi-finished bread
2001;18/10/2014;butter,butter milk
2050;20/10/2014;rolls/buns,UHT-milk
2158;23/06/2014;newspapers,pot plants,brown bread
2159;17/07/2015;white bread,ham
2242;12/01/2014;butter,frozen vegetables
2347;15/01/2014;sausage,canned beer
2360;07/09/2014;rolls/buns,napkins
2400;02/06/2015;whole milk,ice cream,margarine
2427;22/12

# Abstract class
- Tran Nguyen Duy Bao: 521H0493

In [6]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import DataFrame

# Initialize Spark session
from abc import ABC, abstractmethod

class FindRules(ABC):
  def __init__(self,path: str, S: float, C: float):
    self.spark = SparkSession.builder.appName("FindRules").getOrCreate()
    self.sc = self.spark.sparkContext
    self.path=path
    self.S = S
    self.C = C
    self.baskets_df = self.read_baskets()
    self.item = self.baskets_df.select(explode(self.baskets_df["Basket"]) \
                                    .alias("item"))
    self.total_baskets = self.baskets_df.count()

  @abstractmethod
  def run(self):
    pass

  def read_baskets(self):
    # Read baskets from CSV file
    baskets_df = self.spark.read.csv(self.path,
                                     header=True,
                                     inferSchema=True,
                                     sep=';')

    baskets_df = baskets_df.withColumn("Basket",
                                       F.split(baskets_df["Basket"], ","))
    return baskets_df

  def _save(self, df: DataFrame, destination: str):
        df.write.option("sep", ";").csv(destination, header=True)

  def __delete__(self):
    self.spark.stop()


# Task 2: A-Priori
- Tran Nguyen Duy Bao: 521H0493
- Le Tran Nhat Quang: 521H0413

In [7]:
class APriori(FindRules):
    def find_frequent_items(self):
        # Count the occurrences of itemsets in the baskets
        itemset_counts = self.item.groupBy('item') \
                                            .agg(count('*').alias('count'))
        # Calculate support by dividing the count of itemsets by the total number of baskets
        support_df = itemset_counts.withColumn('support', itemset_counts['count'] / self.total_baskets)

        frequent_items_df = support_df.filter(support_df['support'] >= self.S)

        return frequent_items_df

    def _generate_pairs(self, frequent_items_df):
        # Generate candidate pairs from frequent items
        combinations_df = frequent_items_df.crossJoin(frequent_items_df.withColumnRenamed('item', 'item2'))

        # Create two separate columns for item1 and item2
        pairs = combinations_df \
            .filter(combinations_df["item"] < combinations_df["item2"]) \
            .withColumnRenamed('item', 'item1') \
            .select("item1", "item2")
        return pairs

    def generate_frequent_pairs(self,frequent_items_df):
        pairs = self._generate_pairs(frequent_items_df)
        basket = self.baskets_df.select('Basket')

        tmp_tb = basket.crossJoin(pairs)

        frequent_pairs = tmp_tb.filter(array_contains(tmp_tb["Basket"], tmp_tb["item1"]) &
                                        array_contains(tmp_tb["Basket"], tmp_tb["item2"])) \
                                .groupBy(tmp_tb["item1"],tmp_tb["item2"]) \
                                .count() \
                                .withColumnRenamed("count", "freq") \
                                .selectExpr("array(item1, item2) as items", "freq")

        return frequent_pairs

    def _generate__rule(self, frequent_items_df, frequent_pairs, i,j):
      # Calculate support and confidence
        assoc_rules_df = frequent_pairs.select(
            frequent_pairs['items'][i].alias('antecedent'),
            frequent_pairs['items'][j].alias('consequent'),
            frequent_pairs['freq']
        ).withColumn(
            'support',
            col('freq') / self.total_baskets
        )

        # Join assoc_rules_df with frequent_items_df to get support for antecedents
        assoc_rules_df = assoc_rules_df.join(
            broadcast(frequent_items_df),
            on=(assoc_rules_df['antecedent'] == frequent_items_df['item']),
            how='inner'
        ).select(
            'antecedent',
            'consequent',
            'freq',
            assoc_rules_df['support'].alias('support'),
            frequent_items_df['support'].alias('antecedent_support')
        )

        # Calculate confidence
        assoc_rules_df = assoc_rules_df.withColumn(
            'confidence',
            col('support') / col('antecedent_support')
        )

        assoc_rules_df = assoc_rules_df.drop('antecedent_support', 'freq')

        # Convert antecedent and consequent columns to lists of strings
        # assoc_rules_df = assoc_rules_df.withColumn(
        #     'antecedent',
        #     split(col('antecedent'), ', ')
        # ).withColumn(
        #     'consequent',
        #     split(col('consequent'), ', ')
        # )

        return assoc_rules_df


    def generate_association_rules(self, frequent_items_df, frequent_pairs):
        assoc_rules_1 = self._generate__rule(frequent_items_df,frequent_pairs,0,1)
        assoc_rules_2 = self._generate__rule(frequent_items_df,frequent_pairs,1,0)
        unioned_df = assoc_rules_1.union(assoc_rules_2)

        return unioned_df.filter(unioned_df.support >= self.S) \
                          .filter(unioned_df.confidence >= self.C)

    def run(self):
        frequent_items_df = self.find_frequent_items()
        frequent_pairs = self.generate_frequent_pairs(frequent_items_df)
        association_rules = self.generate_association_rules(frequent_items_df, frequent_pairs)

        print('='*15 + 'Apriori' + '='*15)
        print("Frequent Itemsets:")
        frequent_pairs.show(truncate=False)
        print("\nFiltered Association Rules:")
        association_rules = association_rules.orderBy(col("antecedent"), col("consequent"))
        association_rules.show(truncate=False)

        # Prepare data to save
        frequent_pairs = frequent_pairs.withColumn("items",
                                      F.concat_ws(
                                          ",", col("items")
                                      ))

        # save to file
        self._save(frequent_pairs,'apriori_frequent_pairs.csv')
        self._save(association_rules,'apriori_association_rules.csv')
        # frequent_pairs.write.option("header",True) \
        #         .csv('apriori_frequent_pairs.csv')

        # association_rules.write.option("header",True) \
        #         .csv('apriori_association_rules.csv')


# Task 3: PCY
- Nguyen Hai Tien Phat: 521H0126
- Nguyen Lam Duy: 521H0499
- Le Thanh Nhan: 521H0409

In [8]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

class PCY(APriori):
    def __init__(self,path: str, S: float, C: float, bucket_size: int = 1000):
        super().__init__(path,S,C)
        self.bucket_size = bucket_size

    def generate_frequent_pairs(self, frequent_items_df):
        pairs = self._generate_pairs(frequent_items_df)
        pairs = pairs.withColumn('hash_value', abs(hash(concat_ws('\t',*pairs.columns)) % self.bucket_size))

        hash_df = pairs.select(pairs['hash_value']) \
                      .groupBy(pairs['hash_value']) \
                        .count()

        basket = self.baskets_df.select('Basket')


        hash_df = hash_df.filter((col("count") / self.total_baskets) >= self.S)
        hash_dict = hash_df.rdd.collectAsMap()
        if not hash_dict:
          schema = StructType([
                                  StructField("items", ArrayType(StringType(), True), True),
                                  StructField("freq", IntegerType(), True)
                              ])
          return self.spark.createDataFrame([], schema)
        tmp_tb = basket.crossJoin(pairs)

        frequent_pairs = tmp_tb.filter(expr('hash_value IN ({})'.format(','.join(map(str, hash_dict))))) \
                                .filter(array_contains(tmp_tb["Basket"], tmp_tb["item1"]) &
                                        array_contains(tmp_tb["Basket"], tmp_tb["item2"])) \
                                .groupBy(tmp_tb["item1"], tmp_tb["item2"]) \
                                .count() \
                                .withColumnRenamed("count", "freq") \
                                .selectExpr("array(item1, item2) as items", "freq")

        return frequent_pairs

    def run(self):
        frequent_items_df = self.find_frequent_items()
        frequent_pairs = self.generate_frequent_pairs(frequent_items_df)
        association_rules = self.generate_association_rules(frequent_items_df, frequent_pairs)

        print('=' * 15 + 'PCY' + '=' * 15)
        print("Frequent Itemsets:")
        frequent_pairs.show(truncate=False)
        print("\nFiltered Association Rules:")
        association_rules = association_rules.orderBy(col("antecedent"), col("consequent"))
        association_rules.show(truncate=False)

        # Prepare data to save
        frequent_pairs = frequent_pairs.withColumn("items",
                                                  F.concat_ws(
                                                      ",", col("items")
                                                  ))

        # save to file
        self._save(frequent_pairs, 'pcy_frequent_pairs.csv')
        self._save(association_rules, 'pcy_association_rules.csv')



# Task 4: FPGrowth
- Nguyen Hai Tien Phat: 521H0126

In [9]:
from pyspark.ml.fpm import FPGrowth as FPG
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

class FPGrowth(FindRules):

    def run(self):
        fp = FPG(minSupport=self.S, minConfidence=self.C, itemsCol='Basket', predictionCol='prediction')
        self.model = fp.fit(self.baskets_df)
        self.display_results()

    def display_results(self):
        print('='*15 + 'FPGrowth' + '='*15)

        print("Frequent Itemsets:")
        self.model.freqItemsets.filter(size(self.model.freqItemsets.items) == 2) \
                                .show(truncate=False)

        print("\nFiltered Association Rules:")
        assoc_rules = self.model.associationRules.filter((size(self.model.associationRules.antecedent) == 1) &
                                                        (size(self.model.associationRules.consequent) == 1))
        assoc_rules = assoc_rules.withColumn("antecedent", explode(assoc_rules.antecedent)) \
                   .withColumn("consequent", explode(assoc_rules.consequent))
        assoc_rules = assoc_rules.orderBy(col("antecedent"), col("consequent"))
        assoc_rules.show(truncate=False)


# Comparision

In [10]:
# Delete all folders if exist:
!rm -r /content/apriori_frequent_pairs.csv
!rm -r /content/pcy_frequent_pairs.csv
!rm -r /content/apriori_association_rules.csv
!rm -r /content/pcy_association_rules.csv

rm: cannot remove '/content/apriori_frequent_pairs.csv': No such file or directory
rm: cannot remove '/content/pcy_frequent_pairs.csv': No such file or directory
rm: cannot remove '/content/apriori_association_rules.csv': No such file or directory
rm: cannot remove '/content/pcy_association_rules.csv': No such file or directory


> Calculating Apriori algorithm:

In [11]:
apriori = APriori('/content/baskets.csv',S = 2E-4, C = .5)
apriori.run()

Frequent Itemsets:
+---------------------------------------+----+
|items                                  |freq|
+---------------------------------------+----+
|[meat, white wine]                     |1   |
|[meat, other vegetables]               |32  |
|[frankfurter, pudding powder]          |1   |
|[frankfurter, waffles]                 |7   |
|[canned vegetables, turkey]            |1   |
|[cream cheese , white wine]            |2   |
|[potato products, specialty bar]       |1   |
|[other vegetables, specialty cheese]   |4   |
|[beef, meat spreads]                   |2   |
|[beef, salt]                           |2   |
|[flower soil/fertilizer, soda]         |1   |
|[hard cheese, instant coffee]          |1   |
|[hard cheese, pip fruit]               |16  |
|[roll products, seasonal products]     |2   |
|[cocoa drinks, whole milk]             |2   |
|[packaged fruit/vegetables, popcorn]   |1   |
|[packaged fruit/vegetables, pot plants]|1   |
|[packaged fruit/vegetables, pip fruit] |

> Calculating PCY algorithm:

In [12]:
pcy = PCY('/content/baskets.csv',S = 2E-4, C = .5)
pcy.run()

Frequent Itemsets:
+---------------------------------------+----+
|items                                  |freq|
+---------------------------------------+----+
|[meat, white wine]                     |1   |
|[meat, other vegetables]               |32  |
|[frankfurter, pudding powder]          |1   |
|[frankfurter, waffles]                 |7   |
|[canned vegetables, turkey]            |1   |
|[cream cheese , white wine]            |2   |
|[potato products, specialty bar]       |1   |
|[other vegetables, specialty cheese]   |4   |
|[beef, meat spreads]                   |2   |
|[beef, salt]                           |2   |
|[flower soil/fertilizer, soda]         |1   |
|[hard cheese, instant coffee]          |1   |
|[hard cheese, pip fruit]               |16  |
|[roll products, seasonal products]     |2   |
|[cocoa drinks, whole milk]             |2   |
|[packaged fruit/vegetables, popcorn]   |1   |
|[packaged fruit/vegetables, pot plants]|1   |
|[packaged fruit/vegetables, pip fruit] |

> Calculating FPGrowth algorithm:

In [13]:
fpg = FPGrowth('/content/baskets.csv', S = 2E-4, C = .5)
fpg.run()

Frequent Itemsets:
+------------------------------------+----+
|items                               |freq|
+------------------------------------+----+
|[meat spreads, sausage]             |5   |
|[meat spreads, other vegetables]    |4   |
|[meat spreads, domestic eggs]       |3   |
|[meat spreads, whipped/sour cream]  |3   |
|[meat spreads, whole milk]          |3   |
|[spread cheese, beef]               |3   |
|[spread cheese, sugar]              |6   |
|[spread cheese, sausage]            |8   |
|[spread cheese, pip fruit]          |5   |
|[spread cheese, specialty chocolate]|3   |
|[spread cheese, pork]               |4   |
|[spread cheese, rolls/buns]         |8   |
|[spread cheese, root vegetables]    |9   |
|[spread cheese, specialty bar]      |4   |
|[spread cheese, yogurt]             |7   |
|[spread cheese, newspapers]         |3   |
|[spread cheese, bottled water]      |7   |
|[spread cheese, pastry]             |3   |
|[spread cheese, onions]             |3   |
|[spread chee