# Setup

Ensure that you have copied the "order_details.csv" file to the Files section

In [2]:
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install -q bitarray

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/288.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/288.3 kB[0m [31m2.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m288.3/288.3 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [6]:
import os
os.environ["SPARK_HOME"] = "spark-3.1.1-bin-hadoop3.2"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

FILE_PATH = "order_details.csv"

In [4]:
import findspark
findspark.init()

In [5]:
from pyspark import SparkContext

sc = SparkContext("local", "PCY")
sc.setLogLevel("WARN")

# Introduction to PCY Algorithm

The PCY (Park-Chen-Yu) algorithm is an efficient method for finding frequent itemsets in large datasets. It belongs to the family of Apriori-based algorithms, which are widely used for association rule mining.

## Algorithm Overview

1. **Pass 1: Counting Frequencies**
   - Count occurrences of each item in baskets.
   - Create hash buckets for pairs and count their occurrences.

2. **Pass 2: Filtering and Counting**
   - Filter frequent items based on a support threshold.
   - Create bit vectors for frequent pairs and filter candidate pairs based on frequent items and the bit vector.

3. **Generating Frequent Pairs**
   - Generate frequent pairs based on the support threshold.

4. **Generating Association Rules**
   - Calculate confidence for association rules based on frequent pairs and frequent item counts.

## Results

The PCY algorithm provides two main results:

1. **Frequent Pairs:**
   - Lists the pairs of items that occur frequently in the dataset along with their support count.

2. **Association Rules:**
   - Lists the association rules between itemsets with a confidence above a specified threshold. An association rule typically takes the form "If {A} then {B}", where A and B are itemsets, and the confidence represents the likelihood of B occurring given the occurrence of A.

## Example Usage

```python
pcy = PCY(FILE_PATH, 0.1, 0.3)
pcy.run()
```

In this example, FILE_PATH is the path to the dataset, 0.1 is the support threshold, and 0.3 is the confidence threshold.



In [7]:
from bitarray import bitarray
from pyspark.sql import SQLContext
from itertools import combinations

class PCY:
  def __init__(self, path, support_threshold, confidence_threshold):
    self.spark = SQLContext(sc)
    self.path = path
    self.support_threshold = support_threshold
    self.confidence_threshold = confidence_threshold

  def read_data(self):
    df = self.spark.read.csv(self.path, header=True, inferSchema=True)
    baskets = df.groupBy("order_id") \
                .agg({"product_name": "collect_list"}) \
                .withColumnRenamed("collect_list(product_name)", "Basket")

    self.baskets = baskets.select("Basket").rdd.flatMap(lambda x: x).map(lambda x: sorted(x))
    self.baskets_count = self.baskets.count()
    print('Number of baskets: %i\n' % self.baskets_count)

    self.pairs = self.baskets.flatMap(lambda x: list(combinations(x, 2)))

  def pass_1(self):
    # Counting occurrences of each item in baskets
    self.item_counts = self.baskets.flatMap(lambda x: [(item, 1) for item in x]) \
                                   .reduceByKey(lambda x, y: x + y)

    # Creating hash buckets for pairs and counting occurrences
    self.buckets = self.pairs.map(lambda x: (hash(tuple(x)) % 1000 , 1)) \
                             .reduceByKey(lambda x, y: x + y)

  def pass_2(self):
    support = self.support_threshold * self.baskets_count
    # Filtering frequent items based on support threshold
    self.frequent_items = self.item_counts.filter(lambda x: x[1] >= support).collect()

    # Extracting item IDs from frequent items
    frequent_items = list(map(lambda x: x[0],self.frequent_items))
    # Creating bit vectors for frequent pairs
    bitvector_list = self.buckets.map(lambda x: (x[0], 1 if x[1] >= support else 0)).collect()

    bitvector = bitarray(1000)
    bitvector.setall(0)

    for item in bitvector_list:
      bitvector[item[0]] = 1 if item[1] == 1 else 0

    bitvector_list.clear()

    # Filtering candidate pairs based on frequent items and bit vector
    self.candidate_pairs = self.pairs.filter(lambda pair: pair[0] in frequent_items and pair[1] in frequent_items) \
                                     .filter(lambda pair: bitvector[hash(tuple(pair)) % 1000] == 1) \
                                     .map(lambda pair: (pair, 1)) \
                                     .reduceByKey(lambda x, y: x + y)

  def generate_frequent_pairs(self):
    support = self.support_threshold * self.baskets_count
    self.frequent_pairs = self.candidate_pairs.filter(lambda x: x[1] >= support)

    if(self.frequent_pairs.count() > 0):
      result = self.frequent_pairs.map(lambda x: (", ".join(x[0]), x[1])) \
                                  .toDF(['frequent_pairs', 'freq'])
      print('\nFrequent Pairs: ')
      result.show(truncate=False)
    else:
      print("There are no frequent pairs with this support threshold")

  def generate_association_rules(self):
    frequent_items = self.frequent_items
    confidence_threshold = self.confidence_threshold
    def calculate_confidence(freq, freq_a):
        return freq / freq_a

    def find_item_freq(item):
      return next((v for k, v in frequent_items if k == item), 0)

    pairs = self.frequent_pairs

    # Calculating confidence for association rules
    association_rules = pairs.flatMap(lambda x: [((x[0][0], x[0][1]), (x[1],find_item_freq(x[0][0]))),
                                                 ((x[0][1], x[0][0]), (x[1], find_item_freq(x[0][1])))]) \
                             .map(lambda x: (x[0], calculate_confidence(x[1][0], x[1][1]))) \
                             .filter(lambda x: x[1] >= confidence_threshold) \
                             .map(lambda x: (x[0][0], x[0][1], x[1]))

    if(association_rules.count() > 0):
      result = association_rules.toDF(['antecedent', 'consequent', 'confidence'])
      print('\nAssociation Rules: ')
      result.show()
    else:
      print("There are no association rules with this confidence")

  def run(self):
    self.read_data()
    self.pass_1()
    self.pass_2()

    self.generate_frequent_pairs()
    self.generate_association_rules()

# Example usage:
pcy = PCY(FILE_PATH, 0.1, 0.3)
pcy.run()


Number of baskets: 77


Frequent Pairs: 
+---------------------------------------------------------------------------------------+----+
|frequent_pairs                                                                         |freq|
+---------------------------------------------------------------------------------------+----+
|Banana Fish - Tập 20, Đập Chắn Thái Bình Dương                                         |13  |
|Trump - Đừng Bao Giờ Bỏ Cuộc, Đường Xa Nắng Mới                                        |8   |
|Đường Xa Nắng Mới, Đập Chắn Thái Bình Dương                                            |8   |
|Viết Sao Cho Hay, Đập Chắn Thái Bình Dương                                             |10  |
|Trump - Đừng Bao Giờ Bỏ Cuộc, Đập Chắn Thái Bình Dương                                 |9   |
|Attack On Titan - Tập 5, Đập Chắn Thái Bình Dương                                      |9   |
|Attack On Titan - Tập 5, Banana Fish - Tập 20                                          |11  |
|Banana F