# Setup

In [1]:
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [2]:
!pwd

/content


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [5]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

In [6]:
# !cp /content/spark-3.1.1-bin-hadoop3.2.tgz .
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [8]:
import findspark
findspark.init()

In [9]:
import pyspark as spark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, \
                ArrayType, StringType, IntegerType, FloatType
from itertools import combinations

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Load Dataset

In [26]:
"""Adjust as need"""
path = "/content/drive/MyDrive/MMDS/Midterm/baskets.csv"

df = spark.read.csv(path, header=True)

In [27]:
df = df.groupBy(["Member_number", "Date"]) \
        .agg(F.sort_array(F.collect_set("itemDescription")).alias("Basket")) \
        .select("Basket")

Data for PCY should be formated like this

In [33]:
df.show(5, truncate=False)

+----------------------------------------------------------+
|Basket                                                    |
+----------------------------------------------------------+
|[butter, fruit/vegetable juice, pork, sausage, whole milk]|
|[butter, sweet spreads]                                   |
|[oil, shopping bags]                                      |
|[frankfurter, white bread]                                |
|[curd, spices]                                            |
+----------------------------------------------------------+
only showing top 5 rows



In [40]:
tmp = df.rdd

PCY Class

In [35]:
class PCY:

    def __init__(self, min_support=0.2, min_confidence=0.5, num_buckets=100):
        """
        Params and Attributes:
            min_support: float, mimimum support for frequent itemsets
            min_confidence: float, minimum confidence for association rules
            num_buckets: int, number of buckets for PCY hash function

            spark: SparkSession, SparkSession object
            df: DataFrame, DataFrame of baskets
            fq_pair_df: DataFrame, DataFrame of frequent pairs
            association_rules_df: DataFrame, DataFrame of association rules
            fq_item_count_dict: dict, dictionary of frequent items
                                                        and their counts
            fq_bucket_count_dict: dict, dictionary of frequent buckets
                                                        and their counts
        """
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.num_buckets = num_buckets

        self.spark = SparkSession.builder.getOrCreate()
        self.df = None
        self.fq_pair_df = None
        self.association_rules_df = None
        self.min_count = 1
        self.fq_item_count_dict = dict()
        self.fq_bucket_count_dict = dict()

    def fit(self, df):
        """
        Fit the model to the data.
        Finding frequent items in Pass 1
        Finding frequent pairs in Pass 2
        Finding association rules

        Params:
            df: DataFrame, DataFrame of baskets
        """

        self.df = df

        """Miminum count for frequent itemsets"""
        self.min_count = int(self.min_support * self.df.count())

        self.pass1()
        self.pass2()
        self.association_rules()


    def pass1(self):
        """
        Finding frequent items and frequent buckets in Pass 1
        """
        self.item_count()
        self.bucket_count()

    def item_count(self):

        """
        Item Dataframe contains item name and its count.
        This step is as know as finding candidate items, C1
        """
        item_df = self.df.select(F.explode(self.df.columns[0]).alias("Item"))

        """Finding frequent items, L1"""
        item_count_df = (item_df.groupBy("Item").count()
                    .filter(F.col("count") >= self.min_count))

        """
        Handle cases where the frequent item DataFrame is empty,
        indicating that no items meet the minimum frequency threshold.
        """
        if item_count_df.rdd.isEmpty():
            raise ValueError("No frequent items found with the " +
                                "given minimum support.")
        self.fq_item_count_dict = (item_count_df.rdd.collectAsMap())


    @staticmethod
    def hash_pair(item1: str, item2: str, num_buckets: int) -> int:
        """Hash function for pairs"""

        def hash_item(item):
            """
            Hash function for an item.

            Each character in item string will convert to ASCII value,
            then multiples it by 31 ^ its position in the string.

            This function ensures hash values would intend to distribute
            into buckets uniformly as possible and amplifies the position of
            each character.
            """
            return sum(ord(ch) * 31**i for i, ch in enumerate(item))

        return (hash_item(item1) ^ hash_item(item2)) % num_buckets


    def bucket_count(self):
        """
        Finding frequent buckets in Pass 1

        Because we have already found frequent items by PySpark DataFrame.
        We just consider pairs of those frequent items and hash these pairs
        to the buckets. This helps us use memory efficiently.

        Note that original PCY hash all pairs of a basket to buckets
        without considering whether they are frequent or not.
        """

        """
            Could not pass "self" into transformation and operation function.
            We need some variable to capture those attributes.
        """
        fq_items = list(self.fq_item_count_dict.keys())
        num_buckets = self.num_buckets
        min_count = self.min_count
        hash_pair = PCY.hash_pair

        def hash_pair_to_bucket(items):
            """
            Create RDD, its key=bucket, value=1.

            This is for counting the number of pairs in
            each bucket for transformation function in later.
            """
            buckets = []
            n = len(items)
            for i in range(n - 1):
                for j in range(i + 1, n):
                    if items[i] in fq_items and items[j] in fq_items:
                        bucket = hash_pair(items[i], items[j], num_buckets)
                        buckets.append((bucket, 1))
            return buckets


        """Finding frequent buckets, B1"""
        self.fq_bucket_count_dict = (self.df.rdd
                    .flatMap(lambda basket: hash_pair_to_bucket(basket[0]))
                    .reduceByKey(lambda x, y: x + y)
                    .filter(lambda x: x[1] >= min_count)
                    .collectAsMap())


    def pass2(self):

        hash_pair = PCY.hash_pair
        num_buckets = self.num_buckets
        fq_items = list(self.fq_item_count_dict.keys())
        fq_buckets = list(self.fq_bucket_count_dict.keys())
        min_count = self.min_count

        def find_fq_pairs(items):
            """
            Finding frequent pairs of each basket
            """

            pairs = []
            n = len(items)

            """
            Iterate through all distinct pairs of items in the basket.
            Check if the hash value of each pair exists in the hash table.
            """
            for i in range(n - 1):
                for j in range(i + 1, n):
                    if items[i] in fq_items and items[j] in fq_items:
                        bucket = hash_pair(items[i], items[j], num_buckets)
                        if bucket in fq_buckets:
                            pairs.append((items[i], items[j]))
            return pairs

        fq_pair_count = (self.df.rdd
                    .flatMap(lambda basket: find_fq_pairs(basket[0]))
                    .map(lambda x: (tuple(sorted(x)), 1))
                    .reduceByKey(lambda x, y: x + y)
                    .filter(lambda x: x[1] >= min_count))

        # xem lại chưa khai báo fq_pair_count_(dict)
        self.fq_pair_count = fq_pair_count.collectAsMap()

        self.create_fq_pair_df(fq_pair_count)

    def create_fq_pair_df(self, rdd):
        """
        Create DataFrame of frequent pairs

        Params:
            rdd: RDD, RDD of frequent pairs
        """

        schema = StructType([
            StructField("Pair", ArrayType(StringType())),
            StructField("Support", IntegerType())
        ])

        self.fq_pair_df = (spark.createDataFrame(rdd
                            .map(lambda x: Row(Pair=list(x[0]), Support=x[1])),
                                                 schema))

    def association_rules(self):
        """
        Finding association rules from frequent pairs
        """

        fq_item_count_dict = self.fq_item_count_dict
        min_confidence = self.min_confidence

        def generate_association_rules(pair, pair_support):
            """
            Generate association rules from frequent pairs
            """

            rules = []

            item1, item2 = pair
            item1_support = fq_item_count_dict[str(item1)]
            item2_support = fq_item_count_dict[str(item2)]

            """Case 1: item1 -> item2"""
            if item1_support > 0:
                confidence = pair_support / item1_support
                if confidence >= min_confidence:
                    rules.append((item1, item2, confidence))

            """Case 2: item2 -> item1"""
            if item2_support > 0:
                confidence = pair_support / item2_support
                if confidence >= min_confidence:
                    rules.append((item2, item1, confidence))

            return rules

        """
        Create Association rules user-defined function in PySpark DataFrame
        """
        generate_association_rules_udf = F.udf(
            lambda pair, pair_support:
            generate_association_rules(pair, pair_support),

            ArrayType(
                StructType([
                    StructField("Antecedent", StringType(), False),
                    StructField("Consequence", StringType(), False),
                    StructField("Confidence", FloatType(), False)
                ])
            )
        )

        """
        Create Association rules DataFrame
        """
        self.association_rules_df = (self.fq_pair_df
                    .withColumn("Rules",
                    generate_association_rules_udf("Pair", "Support"))
                    .select(F.explode("Rules").alias("Rule"))
                    .select("Rule.*")
                    .dropna())



# Test PCY

In [37]:
data = df.select(F.col("Basket").cast(ArrayType(StringType())))

In [39]:

pcy = PCY(min_support=0.005, min_confidence=0.05, num_buckets=13)
pcy.fit(data)

Traceback (most recent call last):
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 72, in dumps
    cp.dump(obj)
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 630, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 503, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/

PicklingError: Could not serialize object: IndexError: tuple index out of range

# Association rules that generated by PCY

In [None]:
pcy.association_rules_df.show(5, truncate=False)

+-----------+-----------+----------+
|Antecedent |Consequence|Confidence|
+-----------+-----------+----------+
|rolls/buns |whole milk |0.12697448|
|whole milk |rolls/buns |0.08844689|
|canned beer|whole milk |0.12820514|
|frankfurter|whole milk |0.139823  |
|sausage    |yogurt     |0.0952381 |
+-----------+-----------+----------+
only showing top 5 rows



# Frequent pairs that generated by PCY

In [None]:
pcy.fq_pair_df.show(5, truncate=False)

+-------------------------+-------+
|Pair                     |Support|
+-------------------------+-------+
|[rolls/buns, whole milk] |209    |
|[canned beer, whole milk]|90     |
|[frankfurter, whole milk]|79     |
|[sausage, yogurt]        |86     |
|[pip fruit, rolls/buns]  |74     |
+-------------------------+-------+
only showing top 5 rows



# Validate by FPGrowth

In [23]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="Basket", minSupport=0.2, minConfidence=0.15)
model = fpGrowth.fit(data)

In [None]:
model.freqItemsets.filter(F.size("items") >= 2).show(5, truncate=False)

+------------------------------+----+
|items                         |freq|
+------------------------------+----+
|[other vegetables, whole milk]|222 |
|[rolls/buns, other vegetables]|158 |
|[rolls/buns, whole milk]      |209 |
|[soda, rolls/buns]            |121 |
|[soda, other vegetables]      |145 |
+------------------------------+----+
only showing top 5 rows



In [24]:
model.associationRules.show(5, truncate=False)

+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+

