#References

https://drive.google.com/drive/u/2/folders/1hn-FP6oeMfoVJ3hXeX3MOsfUHpoLXZ2Z

PCY page 39

https://www.geeksforgeeks.org/pcy-algorithm-in-big-data/

https://docs.google.com/document/d/1TPvw6hDHD95FTpEAoQv5RgwVLgjCMiP2FYnBPEqbjEs/edit?usp=sharing

• The algorithm is organized in form of OOP classes to support software deployment.

Refer to the FPGrowth class of PySpark for examples.

# Store data in Google Drive

##Connect into my drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
secret_path='/content/drive/MyDrive/baskets.csv'

#Using PySpark DataFrames to identify baskets (sets of items bought a customer in a date).

##Import neccessary library

In [None]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  lit,concat_ws,collect_set,col, when,explode, array, struct, sort_array,split,desc,expr,row_number
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import itertools
import numpy as np
from collections import defaultdict
from os import truncate
from pyspark.sql.window import Window

##Read file_csv

In [None]:
spark = SparkSession.builder.appName("MarketBasket").getOrCreate()
baskets_df=spark.read.csv('/content/drive/MyDrive/baskets.csv', header=True)

In [None]:
baskets_df.printSchema() #-> đều là kiểu String nên phải sử lí type

root
 |-- Member_number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- itemDescription: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- day_of_week: string (nullable = true)



In [None]:
row_count = baskets_df.count()  # Đếm số dòng

print(f"Số dòng trong DataFrame: {row_count}")

Số dòng trong DataFrame: 38765


##Show data

In [None]:
print(baskets_df.show(5))#Tốn Ram nhưng đối với dùng show nhiều khi show toàn bộ

+-------------+----------+----------------+----+-----+---+-----------+
|Member_number|      Date| itemDescription|year|month|day|day_of_week|
+-------------+----------+----------------+----+-----+---+-----------+
|         1249|01/01/2014|    citrus fruit|2014|    1|  1|          2|
|         1249|01/01/2014|          coffee|2014|    1|  1|          2|
|         1381|01/01/2014|            curd|2014|    1|  1|          2|
|         1381|01/01/2014|            soda|2014|    1|  1|          2|
|         1440|01/01/2014|other vegetables|2014|    1|  1|          2|
+-------------+----------+----------------+----+-----+---+-----------+
only showing top 5 rows

None


In [None]:
print(baskets_df.printSchema)

<bound method DataFrame.printSchema of DataFrame[Member_number: string, Date: string, itemDescription: string, year: string, month: string, day: string, day_of_week: string]>


##Group data

* Đã kiểm tra thì một khách hàng có sự trùng lặp mua đồ cùng 1 ngày

* Đã kiểm tra dữ liệu không có null

In [None]:
Custom_oneday_df = baskets_df.groupBy("Date", "Member_number") \
               .agg(collect_set("itemDescription").alias("Basket"))

In [None]:
from pyspark.sql.functions import size, avg

# Đếm số lượng khách hàng trong mỗi ngày
Custom_oneday_df = Custom_oneday_df.withColumn("num_baskets", size("Basket"))

# Tính trung bình số khách hàng mỗi ngày
avg_customers_per_day = Custom_oneday_df.select(avg("num_baskets")).collect()[0][0]
#Trung bình mỗi ngày có 2.54 sản phẩm. -> sparse (chưa phản ánh tốc độ chạy)
print(f"📊 Trung bình mỗi ngày có {avg_customers_per_day:.2f} sản phẩm .")

📊 Trung bình mỗi ngày có 2.54 sản phẩm .


In [None]:
Custom_oneday_df.show(5,truncate=False)

+----------+-------------+----------------------------------------+
|Date      |Member_number|Basket                                  |
+----------+-------------+----------------------------------------+
|01/01/2014|1249         |[citrus fruit, coffee]                  |
|01/01/2014|1381         |[curd, soda]                            |
|01/01/2014|1440         |[yogurt, other vegetables]              |
|01/01/2014|1659         |[specialty chocolate, frozen vegetables]|
|01/01/2014|1789         |[candles, hamburger meat]               |
+----------+-------------+----------------------------------------+
only showing top 5 rows



#Build dataset

In [4]:
class Dataset:
    '''

         Lớp Dataset chịu trách nhiệm đọc dữ liệu từ file CSV và xử lý dữ liệu giao dịch.

    '''

    def __init__(self, path: str):
        '''

            Khởi tạo Dataset với đường dẫn đến file CSV.

            Args:
                path (str): Đường dẫn đến file dữ liệu giao dịch.
                spark (SparkSession): SparkSession để tạo DataFrame.
                df (DataFrame): DataFrame chứa dữ liệu giao dịch.

        '''
        self.spark: SparkSession = SparkSession.builder.appName("MarketBasket").getOrCreate()
        self.path: str = path
        self.df: DataFrame | None = None  # Biến lưu trữ DataFrame của dữ liệu giao dịch

    def read_file(self) -> DataFrame:
        '''

            Đọc dữ liệu từ file CSV và lưu vào biến self.df.

            Returns:
                DataFrame: Dữ liệu giao dịch dưới dạng Spark DataFrame.

        '''
        # print("📥 Đang đọc dữ liệu từ CSV...")
        self.df = self.spark.read.csv(self.path, header=True)
        return self.df

    def get_row_count(self,custom_df) -> int:
        '''

            Đếm số dòng (số lượng giao dịch) trong DataFrame.

            Returns:
                int: Số lượng giao dịch.

            Raises:
                ValueError: Nếu dữ liệu chưa được đọc.

        '''
        if custom_df is not None:
            return custom_df.count()
        raise ValueError("❌ Data chưa được load. Hãy gọi `read_file()` trước.")

    def group_data(self) -> DataFrame:
        '''

              Nhóm dữ liệu theo ngày và khách hàng để tạo các giỏ hàng (baskets).

              Returns:
                  DataFrame: DataFrame chứa danh sách sản phẩm mà mỗi khách hàng đã mua trong từng ngày.

              Raises:
                  ValueError: Nếu dữ liệu chưa được đọc.

        '''
        if self.df is None:
            raise ValueError("❌ Data chưa được load. Hãy gọi `read_file()` trước.")

        # print("🔄 Đang nhóm dữ liệu theo ngày và khách hàng...")
        return self.df.groupBy("Date", "Member_number") \
                      .agg(collect_set("itemDescription").alias("Basket"))

#Build class PCY

**PCY algorithm**

• Implement the PCY algorithm to identify frequent pairs and generate association rules based on a given support threshold s and a confidence threshold c. Describe, in details, the hashing function and bucket management.

In [7]:
class PCY:
    '''

         Lớp PCY triển khai thuật toán PCY để tìm tập phổ biến trong dữ liệu giao dịch.

    '''

    def __init__(self, dataset: Dataset, num_buckets: int, support_threshold: float, confidence_threshold: float):

        '''

            Khởi tạo thuật toán PCY với dữ liệu đầu vào từ Dataset.

            Args:
                dataset (Dataset): Đối tượng Dataset chứa dữ liệu giao dịch.
                num_buckets (int): Số lượng bucket trong thuật toán.
                support_threshold (float): Ngưỡng phổ biến.
                num_baskets (int): Tổng số lượng giao dịch.
                frequent_1_item (DataFrame): DataFrame chứa các mục phổ biến có chứa 1 phần tử.
                buckets (DataFrame): Biến lưu trữ các bucket.
                min_support (float): Ngưỡng hỗ trợ tối thiểu.
                confidence_threshold (float): Ngưỡng tin cậy.
                frequent_pairs (DataFrame): Tập các cặp phổ biến.

        '''

        self.dataset: Dataset = dataset  # Đối tượng Dataset quản lý dữ liệu giao dịch
        self.num_buckets: int = num_buckets  # Số lượng  bucket
        self.num_baskets: int | None = None  # Tổng số lượng giao dịch
        self.support_threshold: float = support_threshold  # Ngưỡng phổ biến
        self.min_support: float = 0.0  # Ngưỡng hỗ trợ tối thiểu
        self.buckets: DataFrame | None = None  # Biến lưu trữ các bucket
        self.frequent_1_item: DataFrame | None = None # Tập item đơn lẻ phổ biến
        self.confidence_threshold: float = confidence_threshold  # Ngưỡng tin cậy
        self.frequent_pairs: DataFrame | None = None  # Tập các cặp phổ biến
        self.association_rules: DataFrame | None = None  # Tập các luật kết hợp

    def filter_item_list(self, one_item_df: DataFrame) -> DataFrame:

        '''

            Lọc các mục phổ biến dựa trên ngưỡng hỗ trợ (support_threshold).

            Args:
                one_item_df (DataFrame): DataFrame chứa các mục và số lần xuất hiện (count).

            Returns:
                DataFrame: DataFrame chứa các mục phổ biến với cột probability.

        '''

        return (one_item_df
                .withColumn("support count", F.col("count") )
                .withColumn("probability", F.col("count") / self.num_baskets)
                .filter(F.col("probability") >= self.support_threshold))


    def preprocess_and_hash(self, Custom_oneday_df: DataFrame) -> DataFrame:

        '''

            Kết hợp đếm item đơn lẻ và tạo hash table cho buckets.

            Args:
                Custom_oneday_df (DataFrame): DataFrame giỏ hàng sau khi nhóm theo ngày, khách hàng.

            Returns:
                DataFrame: DataFrame item đơn lẻ và số lần xuất hiện >= 2

        '''

        if self.num_baskets is None:
            raise ValueError("❌ num_baskets chưa được tính. Hãy gọi run() trước.")

        # Đếm item đơn lẻ
        one_item_df = (
            Custom_oneday_df
            .select(F.explode("basket").alias("item"))
            .groupBy("item")
            .agg(F.count("*").alias("count"))
            .filter(F.col("count") >= 2)
        )

        # Tạo hash table (bucket)
        hash_pairs = (
            Custom_oneday_df
            .select(F.expr(f"""
                flatten(
                    transform(basket, x ->
                        transform(filter(basket, y -> y > x), y ->
                            array(x, y, abs(hash(x) + hash(y)) % {self.num_buckets})
                        )
                    )
                ) as pairs
            """))
        )

        self.buckets = (
            hash_pairs
            .select(F.explode("pairs").alias("pair"))
            .select(
                F.col("pair")[0].alias("item1"),
                F.col("pair")[1].alias("item2"),
                F.col("pair")[2].alias("bucket_index")
            )
            .groupBy("bucket_index")
            .agg(
                F.collect_set(F.concat(F.col("item1"), F.lit("|"), F.col("item2"))).alias("pairs"),
                F.count("*").alias("support")
            )
        )

        return one_item_df

    def bit_map(self) -> None:

          '''

               Bit vector - Nếu support của bucket lớn hơn bằng ngưỡng thì bit vector là 1 nếu không 0. (chủ yếu là 1)

               Updates:

                         self.buckets (DataFrame): Thêm số 0 và 1.

          '''

          self.buckets = self.buckets.withColumn(  "bit_vector",when(col("support") >= self.min_support, 1).otherwise(0))

    def linked_filter(self, one_item_df: DataFrame) -> None:

          '''

              Trỏ đến chức năng lọc mà không được trực tiếp dùng đến hàm lọc.

              Args:
                  one_item_df (DataFrame): DataFrame chứa các mục và số lần xuất hiện (count).

          '''

          # print("✅ Đang lọc các mục không phổ biến.")

          one_item_df = self.filter_item_list(one_item_df)
          # print("✅ Đã lọc item đơn lẻ bởi support_threshold.")

          # Tạo frequent_items_df với item và probability
          self.frequent_1_item = one_item_df.select("item", "support count" ,"probability")
          self.frequent_1_item = self.frequent_1_item.orderBy(desc("probability"))

    def counts_of_candidate_pairs(self) -> None:

          '''

                  Đếm những cặp phổ biến trong tập dữ liệu    **


                  Updates:

                            self.frequent_pair (DataFrame): Thêm cột support count.
          '''

          # Bước 1: Duyệt qua baskets
          basket_pairs = (
              self.dataset.group_data()
              .withColumn("pairs", expr("""
                  flatten(
                      transform(Basket, x ->
                          transform(Basket, y ->
                              IF(x < y, array(x, y), NULL)
                          )
                      )
                  )
              """))  # Tạo danh sách cặp (x, y) và đảm bảo cặp luôn có thứ tự (x < y)
              .withColumn("pairs", expr("FILTER(pairs, x -> x[0] != x[1])"))  # Loại bỏ cặp (x, x)
              .withColumn("pair", explode(col("pairs")))  # Phân rã danh sách cặp
              .select("pair")  # Chỉ lấy cột chứa cặp
          )

          # Bước 2: Lọc các cặp phổ biến từ các cặp sinh ra trong giỏ hàng
          filtered_pairs = (
              basket_pairs
              .join(self.frequent_pairs, col("pair") == expr("array(item1, item2)"), "inner")
          )

          # Bước 3: Đếm số lần xuất hiện của từng cặp
          self.frequent_pairs = (
              filtered_pairs
              .groupBy("pair")
              .count()
              .withColumnRenamed("count", "support_count")
          )

          # Bước 4: Tính probability

          self.frequent_pairs = self.frequent_pairs.withColumn(
              "probability", col("support_count") / self.num_baskets
          ).filter(col("probability") >= self.support_threshold)

          # Bước 5: Giữ lại ba cột: pairs, support_count, probability
          self.frequent_pairs = self.frequent_pairs.select("pair", "support_count", "probability")

          # Bước 6: Sắp xếp theo support_count giảm dần
          self.frequent_pairs = self.frequent_pairs.orderBy(desc("support_count"))

    def create_frequent_pair(self) -> None:

          '''

              Tạo ra một tập các cặp phổ biến và độ hỗ trợ ứng với cặp đó

              Updates:

                      self.frequent_pairs (DataFrame): Tập các cặp phổ biến

          '''

          # Tìm các pairs (i,j) từ frequent buckets trong không gian bitmap (số nguyên 4 bytes đếm được thay thế bằng các bit vectơ bit -> giảm xuống 1/32 bộ nhớ)

          pairs_frequent_buckets = (
              self.buckets
              .filter(col("bit_vector") == 1)
              .select(explode(col("pairs")).alias("pair"))
              .select(split(col("pair"), "\\|").alias("items"))
              .select(sort_array(array(col("items")[0], col("items")[1])).alias("frequent_pairs"))
          )

          # pair_frequent_buckets.show(truncate=False)
          # print(f"Số lượng pairs trong bucket phổ biến : {pairs_frequent_buckets.count()}") -> 5370

          # Lọc các pairs (i,j) phổ biến bằng cách xét cả hai i và j đều phổ biến -> candidate_pairs

          self.frequent_pairs = (
            pairs_frequent_buckets
            .join(self.frequent_1_item.alias("f1"), pairs_frequent_buckets.frequent_pairs[0] == col("f1.item"))
            .join(self.frequent_1_item.alias("f2"), pairs_frequent_buckets.frequent_pairs[1] == col("f2.item"))
            .select("frequent_pairs")
            .distinct()
            )

          # self.frequent_pairs.show(truncate=False)
          # print(f"Số lượng pairs ứng viên : {self.frequent_pairs.count()}") -> 5217

          # Tách pair thành hai cột item1 và item2

          self.frequent_pairs = self.frequent_pairs.withColumn("item1", col("frequent_pairs")[0]) \
                                .withColumn("item2", col("frequent_pairs")[1]) \

          # Đếm các candidate pairs

          self.counts_of_candidate_pairs()

          # self.frequent_pairs.show(truncate=False)

    def generate_association_rules(self) -> None:

          '''

              Tạo ra các luật kết hợp (association rules) từ các tập hợp phổ biến.

          '''

          # print("🔄 Đang tạo luật kết hợp...")

          # Tách các cặp `pair` thành hai cột `antecedent` và `consequent`
          self.association_rules = self.frequent_pairs.withColumn("antecedent", col("pair")[0]) \
                                                  .withColumn("consequent", col("pair")[1]) \
                                                  .withColumn("support_count_pair", col("support_count")) \
                                                  .drop("pair", "support_count", "probability")

          self.association_rules_rev = self.frequent_pairs.withColumn("antecedent", col("pair")[1]) \
                                                            .withColumn("consequent", col("pair")[0]) \
                                                            .withColumn("support_count_pair", col("support_count")) \
                                                            .drop("pair", "support_count", "probability")

          self.association_rules = self.association_rules.union(self.association_rules_rev)

          self.association_rules = self.association_rules.join(
              self.frequent_1_item.selectExpr("item as antecedent", "`support count` as antecedent_count", "probability as antecedent_support"),
              on="antecedent",
              how="left"
          )

          self.association_rules = self.association_rules.join(
              self.frequent_1_item.selectExpr("item as consequent", "`support count` as consequent_count", "probability as consequent_support"),
              on="consequent",
              how="left"
          )

          self.association_rules = (
              self.association_rules
              .select("antecedent", "consequent", "support_count_pair", "antecedent_count","antecedent_support", "consequent_support")
              .withColumn("support", col("support_count_pair") / self.num_baskets)
              .withColumn("confidence", col("support_count_pair") / col("antecedent_count"))
              .filter(col("confidence") >= self.confidence_threshold)
              .orderBy(desc("confidence"))
          )

          # print(self.association_rules.count())
          # print("✅ Đã tạo luật kết hợp.")

    def save_result(self) -> None:

          '''

              Lưu các item đơn lẻ phổ biến và các cặp phổ biển vào file .txt

          '''

          window_spec = Window.orderBy(desc("support_count"))

          # Lưu frequent_1_item (chỉ lấy cột item và support count)
          (self.frequent_1_item
              .select("item", "support count")
              .orderBy(desc("support count"))
              .coalesce(1)  # Lưu vào một file duy nhất
              .write
              .option("header", "true")
              .option("sep", ",")  # Phân tách bằng dấu phẩy
              .mode("overwrite")
              .csv("frequent_1_item.csv")
          )

          # Lưu frequent_pairs (giữ nguyên pair dưới dạng [])
          (self.frequent_pairs
              .withColumn("index", row_number().over(window_spec))
              .select("index",concat_ws(", ", self.frequent_pairs.pair).alias("pair"),"support_count")
              .coalesce(1)
              .write
              .option("header", "true")
              .option("sep", ",")
              .mode("overwrite")
              .csv("frequent_pairs.csv")
          )

          (self.association_rules
            .select("antecedent", "consequent", "support", "antecedent_support", "consequent_support","confidence")
            .orderBy(desc("confidence"))
            .coalesce(1)
            .write
            .option("header", "true")
            .option("sep", ",")
            .mode("overwrite")
            .csv("association_rules.csv")
           )
          (self.association_rules
          .selectExpr("concat('{',antecedent,'}', ' -> ', '{',consequent,'}') as rule")
          .write
          .mode("overwrite")
          .text("association_rules.txt")
          )
          # # print("✅ Đã lưu kết quả vào frequent_1_item.csv, frequent_pairs.csv và association_rules.csv")

    def fit(self) -> None:

          '''

              Chạy thuật toán PCY để tìm các mục phổ biến dựa trên dữ liệu giao dịch.

          '''

          # print("🚀 Bắt đầu thuật toán PCY...")

          # 1️⃣ Đọc dữ liệu từ file CSV
          self.dataset.read_file()

          # 2️⃣ Nhóm dữ liệu theo ngày và khách hàng để tạo giỏ hàng
          Custom_oneday_df = self.dataset.group_data()

          # 3️⃣ Tính tổng số lượng giao dịch (baskets) và ngưỡng phổ biến (support threshold)
          self.num_baskets = self.dataset.get_row_count(Custom_oneday_df)
          # print(f" Số lượng giao dịch: {self.num_baskets}") -> 14963
          self.min_support = self.support_threshold * self.num_baskets
          # print(f "Độ hỗ trợ tối thiểu: {self.min_support}") -> 14.963

          # ✅ Pass 1:

          # Đếm lượng item đơn lẻ
          # #Tạo hash table
          one_item_df = self.preprocess_and_hash(Custom_oneday_df)
          # print("📌 Một số bucket sau khi băm....")


          # Sắp xếp bucket tăng dần theo index
          (self.buckets
              .orderBy("bucket_index", ascending=True)
          )

          # print("\n📊 Thông tin buckets:")
          # self.buckets.show(10,truncate=False)
          # print(f"🔢 Tổng số bucket được tạo: {self.buckets.count()}")

          # ✅ Pass 2:

          # Frequent items
          self.linked_filter(one_item_df)
          # print("\nCác mục phổ biến (frequent_1_item) với probability:")
          # self.frequent_1_item.show()

          #Tạo bitmap
          self.bit_map()
          # print("✅ Đã tạo bitmap.")
          # Hiển thị vài buckets đầu tiên
          # self.buckets.show(10,truncate=False)

          #Đếm các cặp ứng cử viên
          self.create_frequent_pair()

          # Tạo association rules
          self.generate_association_rules()


          #Save result
          # self.save_result()

          # print("🎉 Hoàn thành thuật toán PCY!")


In [6]:
dataset = Dataset(secret_path)

In [8]:
pcy= PCY(dataset,support_threshold=0.001,num_buckets=1000,confidence_threshold=0.1)
pcy.fit()

In [None]:
pcy.frequent_pairs.show(truncate=False)

+----------------------------------+-------------+--------------------+
|pair                              |support_count|probability         |
+----------------------------------+-------------+--------------------+
|[other vegetables, whole milk]    |222          |0.014836596939116486|
|[rolls/buns, whole milk]          |209          |0.013967787208447505|
|[soda, whole milk]                |174          |0.011628684087415625|
|[whole milk, yogurt]              |167          |0.011160863463209249|
|[other vegetables, rolls/buns]    |158          |0.010559379803515338|
|[other vegetables, soda]          |145          |0.009690570072846355|
|[sausage, whole milk]             |134          |0.008955423377664907|
|[tropical fruit, whole milk]      |123          |0.008220276682483459|
|[other vegetables, yogurt]        |121          |0.008086613646995923|
|[rolls/buns, soda]                |121          |0.008086613646995923|
|[rolls/buns, yogurt]              |117          |0.007819287576

In [None]:
pcy.association_rules.show(truncate=False)

+-------------------------+----------+--------------------+-------------------+---------------------+-------------------+
|antecedent               |consequent|antecedent_support  |consequent_support |support              |confidence         |
+-------------------------+----------+--------------------+-------------------+---------------------+-------------------+
|semi-finished bread      |whole milk|0.009490075519615051|0.15792287642852368|0.001670787943594199 |0.176056338028169  |
|detergent                |whole milk|0.008621265788946068|0.15792287642852368|0.001403461872619127 |0.1627906976744186 |
|ham                      |whole milk|0.0171088685424046  |0.15792287642852368|0.0027400922274944863|0.16015624999999997|
|bottled beer             |whole milk|0.04531176903027468 |0.15792287642852368|0.007150972398583172 |0.15781710914454278|
|frozen fish              |whole milk|0.006816814809864332|0.15792287642852368|0.0010693042839002875|0.15686274509803924|
|candy                  

# Deploy ideas for each function

## Pass 1

In [None]:
def count_items(Custom_oneday_df) -> DataFrame:
  '''

      Function count item or items
      Input:
              Custom_oneday_df: DatFrame
      Output:
              Return DataFrame

  '''
  return  (Custom_oneday_df
      .select(F.explode("basket").alias("item"))
      .groupBy("item")
      .agg(F.count("*").alias("count"))
      .filter(F.col("count") >= 15)
      .withColumn("support", F.round(F.col("count") / 14963,4))
  )

In [None]:
df=count_items(Custom_oneday_df)

In [None]:
df.show()

+------------------+-----+-------+
|              item|count|support|
+------------------+-----+-------+
|pickled vegetables|  134|  0.009|
|         beverages|  248| 0.0166|
|    snack products|   27| 0.0018|
|           vinegar|   51| 0.0034|
|            dishes|  135|  0.009|
|          cat food|  177| 0.0118|
|    red/blush wine|  157| 0.0105|
|          cake bar|   92| 0.0061|
|   organic sausage|   22| 0.0015|
|           napkins|  331| 0.0221|
|              meat|  252| 0.0168|
| frozen vegetables|  419|  0.028|
|        white wine|  175| 0.0117|
|       frozen fish|  102| 0.0068|
|       frankfurter|  565| 0.0378|
|           mustard|   92| 0.0061|
| canned vegetables|   82| 0.0055|
|     cream cheese |  354| 0.0237|
|    kitchen towels|   30|  0.002|
|      canned fruit|   21| 0.0014|
+------------------+-----+-------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import min, max

# Tính giá trị lớn nhất và nhỏ nhất của cột "count"
df_stats = df.agg(
    max("count").alias("max_count"),
    min("count").alias("min_count")
)

# Hiển thị kết quả
df_stats.show()


+---------+---------+
|max_count|min_count|
+---------+---------+
|     2363|        3|
+---------+---------+



## Pass 2

In [None]:
Custom_oneday_df.select('Basket')

+--------------------+
|              Basket|
+--------------------+
|[citrus fruit, co...|
|        [curd, soda]|
|[yogurt, other ve...|
|[specialty chocol...|
|[candles, hamburg...|
+--------------------+
only showing top 5 rows



In [None]:
from mlxtend.frequent_patterns import apriori
from mlxtend.frequent_patterns import association_rules
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder

# Giả sử Custom_oneday_df là PySpark DataFrame
# Chuyển cột 'Basket' từ PySpark DataFrame sang danh sách Python
transactions = [row['Basket'] for row in Custom_oneday_df.select('Basket').collect()]

# Chuyển dữ liệu thành định dạng One-Hot Encoding
te = TransactionEncoder()
te_ary = te.fit(transactions).transform(transactions)
df = pd.DataFrame(te_ary, columns=te.columns_)

# In dữ liệu đã mã hóa
# print("Dữ liệu sau khi mã hóa:")
# print(df)
num_rows = len(df)  # hoặc df.shape[0]
print(f"Số dòng của df: {num_rows}")
# Áp dụng thuật toán Apriori
min_support = 0.001  # Ngưỡng hỗ trợ tối thiểu (40%)
frequent_itemsets = apriori(df, min_support=min_support, use_colnames=True)
frequent_itemsets = frequent_itemsets.sort_values(by='support', ascending=False)# In các tập hợp mục phổ biến
print(frequent_itemsets.count())
print("\nCác tập hợp mục phổ biến:")

# print(frequent_itemsets)
# Lưu frequent_itemsets vào file result_apriori.txt
with open('result_apriori_min_0.001.txt', 'w') as file:
    # Ghi tiêu đề
    file.write("Frequent Itemsets (min_support = 0.001):\n")
    file.write("Support\tItemsets\n")
    file.write("-" * 50 + "\n")

    # Ghi từng dòng của frequent_itemsets
    for index, row in frequent_itemsets.iterrows():
        support = row['support']
        itemsets = ', '.join(row['itemsets'])  # Chuyển frozenset thành chuỗi
        file.write(f"{support:.4f}\t{itemsets}\n")

print("\nĐã lưu kết quả vào file 'result_apriori_min_0.001.txt'")
# # Tạo luật kết hợp (association rules) từ các tập hợp phổ biến
# rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.7)

# # In các luật kết hợp
# print("\nCác luật kết hợp:")
# print(rules)

Số dòng của df: 14963
support     750
itemsets    750
dtype: int64

Các tập hợp mục phổ biến:

Đã lưu kết quả vào file 'result_apriori_min_0.001.txt'


In [None]:
# Tạo luật kết hợp (association rules) từ các tập hợp phổ biến
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.001)

rules = rules.sort_values(by='support', ascending=False)
# In các luật kết hợp
print("\nCác luật kết hợp:")
rules


Các luật kết hợp:


Unnamed: 0,antecedents,consequents,antecedent support,consequent support,support,confidence,lift,representativity,leverage,conviction,zhangs_metric,jaccard,certainty,kulczynski
0,(whole milk),(other vegetables),0.157923,0.122101,0.014837,0.093948,0.769430,1.0,-0.004446,0.968928,-0.262461,0.055948,-0.032068,0.107730
1,(other vegetables),(whole milk),0.122101,0.157923,0.014837,0.121511,0.769430,1.0,-0.004446,0.958551,-0.254477,0.055948,-0.043241,0.107730
2,(rolls/buns),(whole milk),0.110005,0.157923,0.013968,0.126974,0.804028,1.0,-0.003404,0.964550,-0.214986,0.055000,-0.036752,0.107711
3,(whole milk),(rolls/buns),0.157923,0.110005,0.013968,0.088447,0.804028,1.0,-0.003404,0.976350,-0.224474,0.055000,-0.024222,0.107711
4,(soda),(whole milk),0.097106,0.157923,0.011629,0.119752,0.758296,1.0,-0.003707,0.956636,-0.260917,0.047776,-0.045329,0.096694
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1233,"(soda, whole milk)",(rolls/buns),0.011629,0.110005,0.001002,0.086207,0.783666,1.0,-0.000277,0.973957,-0.218324,0.008310,-0.026739,0.047660
1234,"(rolls/buns, whole milk)",(soda),0.013968,0.097106,0.001002,0.071770,0.739091,1.0,-0.000354,0.972705,-0.263630,0.009107,-0.028061,0.041047
1235,(soda),"(rolls/buns, whole milk)",0.097106,0.013968,0.001002,0.010323,0.739091,1.0,-0.000354,0.996318,-0.281082,0.009107,-0.003696,0.041047
1236,(rolls/buns),"(soda, whole milk)",0.110005,0.011629,0.001002,0.009113,0.783666,1.0,-0.000277,0.997461,-0.236743,0.008310,-0.002545,0.047660


## Example FPGrowth

In [None]:
from pyspark.ml.fpm import FPGrowth
from pyspark import SparkContext
from pyspark.sql import SQLContext
fpGrowth = FPGrowth(itemsCol="Basket",
                    minSupport=0.001,
                    minConfidence=0.1)
model = fpGrowth.fit(Custom_oneday_df)

In [None]:
model.freqItemsets.orderBy("freq", ascending=False).show()

+--------------------+----+
|               items|freq|
+--------------------+----+
|        [whole milk]|2363|
|  [other vegetables]|1827|
|        [rolls/buns]|1646|
|              [soda]|1453|
|            [yogurt]|1285|
|   [root vegetables]|1041|
|    [tropical fruit]|1014|
|     [bottled water]| 908|
|           [sausage]| 903|
|      [citrus fruit]| 795|
|            [pastry]| 774|
|         [pip fruit]| 734|
|     [shopping bags]| 712|
|       [canned beer]| 702|
|      [bottled beer]| 678|
|[whipped/sour cream]| 654|
|        [newspapers]| 582|
|       [frankfurter]| 565|
|       [brown bread]| 563|
|              [pork]| 555|
+--------------------+----+
only showing top 20 rows



In [None]:
model.freqItemsets.orderBy("freq", ascending=False).count()

750

In [None]:
df = model.freqItemsets.withColumn("items", concat_ws(",", col("items")))

# Ghi ra CSV sau khi sắp xếp
df.orderBy(col("freq").desc()).write.csv("freqItemsets_sorted.csv", header=True)

In [None]:
model.associationRules.orderBy("confidence", ascending=False).show()

+--------------------+------------+-------------------+------------------+--------------------+
|          antecedent|  consequent|         confidence|              lift|             support|
+--------------------+------------+-------------------+------------------+--------------------+
|   [sausage, yogurt]|[whole milk]| 0.2558139534883721|1.6198663504217148|0.001470293390362...|
|[sausage, rolls/b...|[whole milk]|             0.2125| 1.345593525179856|0.001136135801644...|
|     [sausage, soda]|[whole milk]| 0.1797752808988764|1.1383739010113787|0.001069304283900...|
|[semi-finished br...|[whole milk]|  0.176056338028169|1.1148247930239072|0.001670787943594199|
|[yogurt, rolls/buns]|[whole milk]|0.17094017094017094|1.0824281751069733|0.001336630354875...|
|[sausage, whole m...|    [yogurt]|0.16417910447761194|1.9117602648237413|0.001470293390362...|
|         [detergent]|[whole milk]|0.16279069767441862| 1.030824041177455|0.001403461872619127|
|               [ham]|[whole milk]|     

In [None]:
from pyspark.sql.functions import concat_ws

# Biến đổi các mảng thành chuỗi
rules_fixed = rules_sorted.select(
    concat_ws(",", "antecedent").alias("antecedent"),
    concat_ws(",", "consequent").alias("consequent"),
    "confidence",
    "lift"
)

# Lưu vào CSV (coalesce nếu muốn một file duy nhất)
rules_fixed.coalesce(1).write.csv("association_rules_sorted.csv", header=True, mode="overwrite")


In [None]:
from pyspark.sql.functions import concat_ws

# Chuyển đổi các cột ARRAY<STRING> thành STRING
association_rules_df = model.associationRules \
    .withColumn("antecedent", concat_ws(", ", "antecedent")) \
    .withColumn("consequent", concat_ws(", ", "consequent"))

# Lưu vào file CSV (sắp xếp theo confidence giảm dần)
association_rules_df \
    .orderBy("confidence", ascending=False) \
    .coalesce(1) \
    .write \
    .option("header", "true") \
    .option("sep", ",") \
    .mode("overwrite") \
    .csv("1")


In [None]:
model.associationRules.orderBy("confidence", ascending=False).count()

131

In [None]:
# Chọn các cột cần lưu
columns_to_save = ["antecedents", "consequents", "antecedent support",
                   "consequent support", "support", "confidence"]

# Ghi ra file text (tab-separated)
rules[columns_to_save].to_csv("association_rules.txt", sep="\t", index=False)

print("Đã lưu luật kết hợp vào 'association_rules.txt'")


Đã lưu luật kết hợp vào 'association_rules.txt'


In [None]:
pairs_df = (Custom_oneday_df
        .select(F.expr("""
            flatten(
                transform(basket, x ->
                    transform(filter(basket, y -> y > x), y -> array(x, y))
                )
            ) as pairs
        """))
        .select(F.explode("pairs").alias("pair"))
        .select(
            F.col("pair")[0].alias("item1"),
            F.col("pair")[1].alias("item2")
        )
    )

In [None]:
pairs_df.show()

+--------------------+-------------------+
|               item1|              item2|
+--------------------+-------------------+
|        citrus fruit|             coffee|
|                curd|               soda|
|    other vegetables|             yogurt|
|   frozen vegetables|specialty chocolate|
|             candles|     hamburger meat|
|    other vegetables|     tropical fruit|
|       bottled water|            sausage|
|Instant food prod...|      bottled water|
|             cleaner|      shopping bags|
|       bottled water|      sliced cheese|
|       domestic eggs|     hamburger meat|
|        bottled beer|      domestic eggs|
|        bottled beer|     hamburger meat|
|   frozen vegetables|             yogurt|
|frozen potato pro...|     hamburger meat|
|      flower (seeds)|         whole milk|
|       bottled water| whipped/sour cream|
|             berries|      bottled water|
|             berries| whipped/sour cream|
|              dishes|             onions|
+----------

In [None]:
buckets = (Custom_oneday_df
        .select(F.expr("""
            flatten(
                transform(basket, x ->
                    transform(filter(basket, y -> y > x), y ->
                        array(x, y, abs(hash(x) + hash(y)) % 1000)
                    )
                )
            ) as pairs
        """))
        .select(F.explode("pairs").alias("pair"))
        .select(
            F.col("pair")[0].alias("item1"),
            F.col("pair")[1].alias("item2"),
            F.col("pair")[2].alias("bucket_index")
        )
        .groupBy("bucket_index")
        .agg(
            F.collect_set(F.concat(F.col("item1"), F.lit("|"), F.col("item2"))).alias("pairs"),
            F.count("*").alias("support")
        )
    )

In [None]:
print("\n📊 Thông tin bucket:")
(buckets
    .orderBy("bucket_index", ascending=True)
    .show(10, truncate=False)
)

print(f"🔢 Tổng số bucket được tạo: {buckets.count()}")


📊 Thông tin bucket:
+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|bucket_index|pairs                                                                                                                                                                                                                                                         |support|
+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|0           |[butter|decalcifier, UHT-milk|berries, bottled beer|kitchen towels, canned fruit|sausage]                                          

In [None]:
buckets_with_bit = buckets.withColumn(
    "bit_vector",
    when(col("support") >= 15, 1).otherwise(0)
)
buckets_with_bit= buckets_with_bit.orderBy(desc("support"))
# Hiển thị kết quả
buckets_with_bit.show()

+------------+--------------------+-------+----------+
|bucket_index|               pairs|support|bit_vector|
+------------+--------------------+-------+----------+
|         350|[dessert|soft che...|    333|         1|
|         961|[candles|seasonal...|    268|         1|
|         755|[rolls/buns|whole...|    238|         1|
|         613|[beef|curd cheese...|    218|         1|
|         898|[whole milk|yogur...|    199|         1|
|         578|[soda|whole milk,...|    190|         1|
|         252|[canned beer|fran...|    189|         1|
|         373|[margarine|season...|    186|         1|
|          87|[bottled water|fr...|    166|         1|
|         612|[cream cheese |pa...|    165|         1|
|         101|[cream cheese |wh...|    156|         1|
|         786|[citrus fruit|ice...|    155|         1|
|         801|[semi-finished br...|    149|         1|
|         500|[newspapers|whole...|    148|         1|
|         586|[sausage|specialt...|    143|         1|
|         

In [None]:
one_item_df=(Custom_oneday_df
            .select(F.explode("basket").alias("item"))
            .groupBy("item")
            .agg(F.count("*").alias("count"))
            .filter(F.col("count") >= 2)
        )
one_item_df = (one_item_df
                .withColumn("probability", F.round(F.col("count") / 14693,4))
                .filter(F.col("probability") >= 0.001))
print("✅ Đã lọc item đơn lẻ bởi support_threshold.")

# Tạo frequent_items_df với item và probability
frequent_itemsets = one_item_df.select("item", "probability")

✅ Đã lọc item đơn lẻ bởi support_threshold.


In [None]:
pairs_exploded = (
    buckets_with_bit
    .filter(col("bit_vector") == 1)
    .select(explode(col("pairs")).alias("pair"))
    .select(split(col("pair"), "\\|").alias("items"))
    .select(sort_array(array(col("items")[0], col("items")[1])).alias("sorted_pair"))
)

pairs_exploded.show(truncate=False)

+-------------------------------------------+
|sorted_pair                                |
+-------------------------------------------+
|[dessert, soft cheese]                     |
|[other vegetables, rolls/buns]             |
|[red/blush wine, tropical fruit]           |
|[cream cheese , newspapers]                |
|[beef, citrus fruit]                       |
|[bottled beer, whole milk]                 |
|[frankfurter, packaged fruit/vegetables]   |
|[hamburger meat, packaged fruit/vegetables]|
|[pickled vegetables, syrup]                |
|[curd, specialty bar]                      |
|[chocolate, packaged fruit/vegetables]     |
|[candles, seasonal products]               |
|[other vegetables, whole milk]             |
|[domestic eggs, root vegetables]           |
|[long life bakery product, newspapers]     |
|[dishes, mayonnaise]                       |
|[nuts/prunes, sausage]                     |
|[rolls/buns, whole milk]                   |
|[domestic eggs, photo/film]      

In [None]:
pairs_exploded.count()

5370

In [None]:
candidate_pairs = (
    pairs_exploded
    .join(frequent_itemsets.alias("f1"), pairs_exploded.sorted_pair[0] == col("f1.item"))
    .join(frequent_itemsets.alias("f2"), pairs_exploded.sorted_pair[1] == col("f2.item"))
    .select("sorted_pair")
    .distinct()
)

candidate_pairs.show(truncate=False)

+---------------------------------------+
|sorted_pair                            |
+---------------------------------------+
|[pastry, soda]                         |
|[hard cheese, pip fruit]               |
|[meat, other vegetables]               |
|[meat, white wine]                     |
|[house keeping products, newspapers]   |
|[onions, prosecco]                     |
|[cocoa drinks, whole milk]             |
|[frozen meals, spread cheese]          |
|[packaged fruit/vegetables, popcorn]   |
|[beef, meat spreads]                   |
|[butter, ketchup]                      |
|[house keeping products, spread cheese]|
|[female sanitary products, frankfurter]|
|[hard cheese, instant coffee]          |
|[flower soil/fertilizer, soda]         |
|[frankfurter, pudding powder]          |
|[citrus fruit, pudding powder]         |
|[frankfurter, waffles]                 |
|[beef, salt]                           |
|[packaged fruit/vegetables, pip fruit] |
+---------------------------------

In [None]:
candidate_pairs.count()

5217

In [None]:
candidate_pairs = candidate_pairs.withColumn("item1", col("sorted_pair")[0]) \
                                 .withColumn("item2", col("sorted_pair")[1]) \
                                 .drop("sorted_pair")

In [None]:
candidate_pairs.show(truncate=False)

+-------------------------+----------------+
|item1                    |item2           |
+-------------------------+----------------+
|butter                   |ketchup         |
|house keeping products   |spread cheese   |
|female sanitary products |frankfurter     |
|pastry                   |soda            |
|onions                   |prosecco        |
|packaged fruit/vegetables|pot plants      |
|packaged fruit/vegetables|popcorn         |
|hard cheese              |instant coffee  |
|house keeping products   |newspapers      |
|flower soil/fertilizer   |soda            |
|frankfurter              |pudding powder  |
|beef                     |salt            |
|jam                      |whole milk      |
|meat                     |other vegetables|
|hard cheese              |pip fruit       |
|cocoa drinks             |whole milk      |
|frankfurter              |waffles         |
|beef                     |meat spreads    |
|frozen meals             |spread cheese   |
|packaged 

In [None]:
from pyspark.sql.functions import col, expr, explode

# Bước 1: Sinh tất cả cặp sản phẩm từ Basket
basket_pairs = (
    Custom_oneday_df
    .withColumn("pairs", expr("""
        flatten(
            transform(Basket, x ->
                transform(Basket, y ->
                    IF(x < y, array(x, y), NULL)
                )
            )
        )
    """))  # Tạo danh sách cặp (x, y) và đảm bảo cặp luôn có thứ tự (x < y)
    .withColumn("pairs", expr("FILTER(pairs, x -> x[0] != x[1])"))  # Loại bỏ cặp (x, x)
    .withColumn("pair", explode(col("pairs")))  # Phân rã danh sách cặp
    .select("pair")  # Chỉ lấy cột chứa cặp
)

filtered_pairs = (
    basket_pairs
    .join(candidate_pairs, col("pair") == expr("array(item1, item2)"), "inner")
)

# Bước 3: Đếm số lần xuất hiện của từng cặp
pair_support = (
    filtered_pairs
    .groupBy("item1", "item2")
    .count()
    .filter(col("count") >= 15)

)

# 👉 **Thêm bản sao hoán đổi item1 ↔ item2**
pair_support = pair_support.union(
    pair_support.selectExpr("item2 as item1", "item1 as item2", "count")
)


# Hiển thị kết quả
pair_support.show(truncate=False)

+----------------+---------------------+-----+
|item1           |item2                |count|
+----------------+---------------------+-----+
|pastry          |soda                 |61   |
|hard cheese     |pip fruit            |16   |
|meat            |other vegetables     |32   |
|soda            |tropical fruit       |81   |
|bottled water   |butter               |27   |
|domestic eggs   |fruit/vegetable juice|16   |
|citrus fruit    |whole milk           |107  |
|domestic eggs   |pork                 |18   |
|domestic eggs   |tropical fruit       |32   |
|cream cheese    |frankfurter          |15   |
|other vegetables|whole milk           |222  |
|margarine       |whipped/sour cream   |17   |
|berries         |soda                 |22   |
|onions          |root vegetables      |20   |
|hamburger meat  |tropical fruit       |20   |
|soda            |specialty bar        |19   |
|processed cheese|root vegetables      |16   |
|other vegetables|sausage              |90   |
|bottled wate

In [None]:
pair_support.count()

1184

In [None]:

# 👉 **Bước 1: Join để thêm item1_support**
pair_support = pair_support.join(
    df.selectExpr("item as item1", "support as item1_support"),
    on="item1",
    how="left"
)

# 👉 **Bước 2: Join để thêm item2_support**
pair_support = pair_support.join(
    df.selectExpr("item as item2", "support as item2_support"),
    on="item2",
    how="left"
)
pair_support = (
    pair_support
    .select("item1", "item2", "count", "item1_support", "item2_support")  # Chỉ giữ cần thiết
    .withColumn("support", F.round(col("count") / 14963, 4))  # Tạo cột support
    .drop("count")  # Xóa cột count
    .withColumn("confidence", F.round(col("support") / col("item1_support"), 4))  # Tính confidence
)



In [None]:
pair_support.count()

1184

## Association rules

In [None]:
pair_support = pair_support.filter(
    expr("confidence >= 0.1")
)

In [None]:

pair_support = pair_support.orderBy(desc("confidence"))

# 👉 **Hiển thị kết quả**
pair_support.show(truncate=False)

+-------------------------+----------+-------------+-------------+-------+----------+
|item1                    |item2     |item1_support|item2_support|support|confidence|
+-------------------------+----------+-------------+-------------+-------+----------+
|semi-finished bread      |whole milk|0.0095       |0.1579       |0.0017 |0.1789    |
|detergent                |whole milk|0.0086       |0.1579       |0.0014 |0.1628    |
|frozen fish              |whole milk|0.0068       |0.1579       |0.0011 |0.1618    |
|bottled beer             |whole milk|0.0453       |0.1579       |0.0072 |0.1589    |
|ham                      |whole milk|0.0171       |0.1579       |0.0027 |0.1579    |
|sausage                  |whole milk|0.0603       |0.1579       |0.009  |0.1493    |
|processed cheese         |rolls/buns|0.0102       |0.11         |0.0015 |0.1471    |
|processed cheese         |whole milk|0.0102       |0.1579       |0.0015 |0.1471    |
|candy                    |whole milk|0.0144       |0.

In [None]:
pair_support.count()

111

In [None]:
from pyspark.sql.functions import concat_ws, col, lit

pair_support = pair_support.withColumn(
    "rule",
    concat_ws(" -> ", concat_ws("", lit("{"), col("item1"), lit("}")), concat_ws("", lit("{"), col("item2"), lit("}")))
)

# Lưu vào file TXT
pair_support.select("rule").coalesce(1).write.mode("overwrite").text("rules.txt")

In [None]:
spark.stop()