In [1]:
import csv
import time
import os
import psutil
from typing import List, Dict, Any, Tuple, Generator
from collections import defaultdict
import matplotlib.pyplot as plt

In [2]:
class Transaction:
    """Lớp đại diện cho một giao dịch trong dữ liệu không chắc chắn.

    Examples
    --------
    >>> transaction = Transaction(
    ...     id=1,
    ...     items=[1, 2, 3],
    ...     utilities=[10, 20, 30],
    ...     probabilities=[0.5, 0.3, 0.2]
    ... )
    >>> print(transaction)
    Transaction(1, [1, 2, 3], [10, 20, 30], [0.5, 0.3, 0.2])
    >>> transaction.to_dict()
    {
        'id': 1,
        'items': [1, 2, 3],
        'utilities': [10, 20, 30],
        'probabilities': [0.5, 0.3, 0.2]
    }
    """

    def __init__(
        self,
        id: int,
        items: List[int],
        utilities: List[float],
        probabilities: List[float],
    ) -> None:
        """Khởi tạo một giao dịch với id, items, utilities và probabilities cho mỗi item.

        Parameters
        ----------
        id : int
            Id của giao dịch.
        items : List[int]
            Danh sách các item trong giao dịch.
        utilities : List[float]
            Danh sách các giá trị của các item trong giao dịch.
        probabilities : List[float]
            Danh sách xác suất của các item trong giao dịch.
        """
        self.id = id
        self.items = items
        self.utilities = utilities
        self.probabilities = probabilities

    def to_dict(self) -> Dict[str, Any]:
        """Trả về giao dịch dưới dạng Dictionary.

        Returns
        -------
        Dict[str, Any]
            Dictionary chứa thông tin của giao dịch.
        """
        return {
            "id": self.id,
            "items": self.items,
            "utilities": self.utilities,
            "probabilities": self.probabilities,
        }

    def __repr__(self) -> str:
        """Trả về một chuỗi biểu diễn của đối tượng Transaction."""
        return f"Transaction({self.id}, {self.items}, {self.utilities}, {self.probabilities})"

In [3]:
class UDB:
    """Lớp quản lý cơ sở dữ liệu không chắc chắn.

    Examples
    --------
    >>> udb = UDB("sample_data.csv")
    >>> transactions = udb.get_transactions()
    >>> for transaction in transactions:
    ...     print(transaction)
    [
        Transaction(1, [1, 2, 3], [10.0, 20.0, 30.0], [0.5, 0.3, 0.2]),
        Transaction(2, [1, 4], [10.0, 40.0], [0.6, 0.4]),
    ]
    """

    def __init__(self, file_path: str) -> None:
        """Khởi tạo một cơ sở dữ liệu không chắc chắn với đường dẫn file.

        Parameters
        ----------
        file_path : str
            Đường dẫn file chứa dữ liệu không chắc chắn.
        """
        self.file_path = file_path

    def read_file(self) -> Generator[Transaction, None, None]:
        """Đọc file và tạo Generator chứa các đối tượng Transaction.

        Returns
        -------
        Generator[Transaction, None, None]
            Generator chứa các đối tượng Transaction.
        """
        with open(self.file_path, "r") as file:
            csv_reader = csv.DictReader(file)
            for id, row in enumerate(csv_reader, start=1):
                try:
                    # Chuyển đổi các chuỗi thành danh sách
                    items = list(map(int, row["items"].split()))
                    utilities = list(map(float, row["item_utilities"].split()))
                    probabilities = list(map(float, row["item_probabilities"].split()))

                    # Kiểm tra tính hợp lệ
                    if len(items) != len(utilities) or len(items) != len(probabilities):
                        raise ValueError(
                            "The length of items, utilities, and probabilities must be the same."
                        )
                    if not all(0 <= p <= 1 for p in probabilities):
                        raise ValueError(
                            "The probabilities must be in the range [0, 1]."
                        )

                    # Trả về từng giao dịch dưới dạng Generator
                    yield Transaction(id, items, utilities, probabilities)
                except Exception as e:
                    print(f"Error at line {id}: {e}")

    def get_transactions(self) -> List[Transaction]:
        """Trả về một danh sách các giao dịch từ file.

        Returns
        -------
        List[Transaction]
            Danh sách các giao dịch.
        """
        return list(self.read_file())

# Algorithm ITUFP


In [4]:
class UPList:
    """Lớp đại diện cho UP-List.

    Examples
    --------
    >>> uplist = UPList(item_name="1")
    >>> uplist.add_entry(tid=1, probability=0.5, utility=10)
    >>> uplist.add_entry(tid=2, probability=0.3, utility=20)
    >>> uplist.add_entry(tid=3, probability=0.2, utility=30)
    >>> print(uplist)
    UPList(1: [(1, 0.5, 10), (2, 0.3, 20), (3, 0.2, 30)], Total Utility: 60.0, Expected Support: 1.0)
    """

    def __init__(self, item_name: str) -> None:
        """Khởi tạo UPList

        Parameters
        ----------
        item_name : str
            Tên của mục.
        """
        self.item_name: str = item_name
        # List các entry (tid, probability, utility)
        self.entries: List[Tuple[int, float, float]] = []
        self.total_utility: float = 0.0
        self.exp_support: float = 0.0

    def add_entry(self, tid: int, probability: float, utility: float) -> None:
        """Thêm một entry vào UP-List.

        Parameters
        ----------
        tid : int
            ID của giao dịch.
        probability : float
            Xác suất của mục trong giao dịch.
        utility : float
            Giá trị hữu ích của mục trong giao dịch.
        """
        self.entries.append((tid, probability, utility))
        self.total_utility += utility
        self.exp_support += probability

    def __repr__(self) -> str:
        """Trả về chuỗi đại diện cho UP-List.

        Returns
        ----------
        str
            Chuỗi đại diện cho UP-List.
        """
        return (
            f"UPList({self.item_name}: {self.entries}, "
            f"Total Utility: {self.total_utility}, "
            f"Expected Support: {self.exp_support})"
        )

In [5]:
class UplistManager:
    """Lớp quản lý các UPLists, hỗ trợ truy xuất và xử lý."""

    def __init__(self, up_lists: List[UPList]) -> None:
        """Khởi tạo UplistManager với danh sách UPList."""
        self.up_lists = up_lists

    def get_uplist(self, item_name: int) -> UPList:
        """Lấy UPList theo tên mục.

        Parameters
        ----------
        item_name : int
            Tên của mục (item_name).

        Returns
        -------
        UPList
            UPList tương ứng với item_name.
        """
        for uplist in self.up_lists:
            if uplist.item_name == item_name:
                return uplist
        raise ValueError(f"UPList with item_name {item_name} not found.")

In [6]:
class IMCUPList:
    """Lớp đại diện cho IMCUP-List, lưu trữ các mẫu kết hợp và utilities của chúng.

    Examples
    --------
    >>> uplist_1 = UPList(item_name="1")
    >>> uplist_1.add_entry(tid=1, probability=0.5, utility=10)
    >>>
    >>> uplist_2 = UPList(item_name="2")
    >>> uplist_2.add_entry(tid=1, probability=0.6, utility=15)
    >>>
    >>> uplist_3 = UPList(item_name="3")
    >>> uplist_3.add_entry(tid=1, probability=0.7, utility=20)
    >>>
    >>> imcup_list_12 = IMCUPList(pattern_name="1,2")
    >>> imcup_list_12.construct(uplist_1, uplist_2, prefix_indices=[], uplist_manager=uplist_manager)
    >>>
    >>> imcup_list_13 = IMCUPList(pattern_name="1,3")
    >>> imcup_list_13.construct(uplist_1, uplist_3, prefix_indices=[], uplist_manager=uplist_manager)
    >>>
    >>> print(imcup_list_12)
    IMCUPList(1,2: [(1, 0.3, 25.0)], Total Utility: 25.0, Expected Support: 0.3)
    >>>
    >>> print(imcup_list_13)
    IMCUPList(1,3: [(1, 0.35, 30.0)], Total Utility: 30.0, Expected Support: 0.35)

    >>> imcup_list_123 = IMCUPList(pattern_name="1,2,3")
    >>> imcup_list_123.construct(imcup_list_12, imcup_list_13, prefix_indices=[1], uplist_manager=uplist_manager)
    >>> print(imcup_list_123)
    IMCUPList(1,2,3: [(1, 0.21, 45.0)], Total Utility: 45.0, Expected Support: 0.21)
    """

    def __init__(self, pattern_name: str) -> None:
        """Khởi tạo IMCUP-List với tên mẫu."""
        self.pattern_name: str = pattern_name
        # List các entry (tid, probability, utility)
        self.entries: List[Tuple[int, float, float]] = []
        self.total_utility: float = 0.0
        self.exp_support: float = 0.0

    def construct(
        self,
        list1: UPList or "IMCUPList",  # type: ignore
        list2: UPList or "IMCUPList",  # type: ignore
        prefix_indices: List[int],
        uplist_manager: List[UPList],
    ) -> None:
        """
        Tạo IMCUP-List bằng cách kết hợp hai danh sách (IMCUPList hoặc UPList).

        Parameters
        ----------
        list1 : IMCUPList or UPList
            Danh sách đầu tiên.
        list2 : IMCUPList or UPList
            Danh sách thứ hai.
        prefix_indices : List[int]
            Danh sách các chỉ số của các mục cần làm tiền tố.
        uplist_manager : UPListManager
            Lớp quản lý UPLists.
        """
        idx1, idx2 = 0, 0

        # Lấy danh sách UPLists của các tiền tố (nếu có)
        prefix_lists = [uplist_manager.get_uplist(idx) for idx in prefix_indices]

        while idx1 < len(list1.entries) and idx2 < len(list2.entries):
            tid1, prob1, util1 = list1.entries[idx1]
            tid2, prob2, util2 = list2.entries[idx2]

            if tid1 == tid2:  # Nếu cùng giao dịch (TID)
                combined_prob = prob1 * prob2
                combined_util = util1 + util2

                # Loại bỏ utility trùng lặp từ các tiền tố
                for prefix_list in prefix_lists:
                    for tid_prefix, prob_prefix, util_prefix in prefix_list.entries:
                        if tid_prefix == tid1:
                            combined_prob /= prob_prefix
                            combined_util -= util_prefix
                            break

                # Cập nhật vào IMCUPList
                self.entries.append((tid1, combined_prob, combined_util))
                self.total_utility += combined_util
                self.exp_support += combined_prob

                idx1 += 1
                idx2 += 1
            elif tid1 < tid2:
                idx1 += 1
            else:
                idx2 += 1

    def __repr__(self) -> str:
        """Trả về chuỗi đại diện cho IMCUP-List.

        Returns
        -------
        str
            Chuỗi đại diện cho IMCUP-List.
        """
        return (
            f"IMCUPList({self.pattern_name}: {self.entries}, "
            f"Total Utility: {self.total_utility}, "
            f"Expected Support: {self.exp_support})"
        )

In [14]:
# Ví dụ sử dụng
udb = UDB("./Uncertain_DB/sample_data.csv")
transactions = udb.get_transactions()

def generate_up_lists():
    up_lists = defaultdict(lambda: UPList(item_name=""))

    for transaction in transactions:
        for item, prob, util in zip(
            transaction.items, transaction.probabilities, transaction.utilities
        ):
            if not up_lists[item].item_name:
                up_lists[item].item_name = item
            up_lists[item].add_entry(transaction.id, prob, util)

    return UplistManager(
        sorted(up_lists.values(), key=lambda x: x.exp_support, reverse=True)
    )

uplist_manager = generate_up_lists()
for uplist in uplist_manager.up_lists[:5]:
    print(uplist)

up_list1 = uplist_manager.get_uplist(1)
up_list2 = uplist_manager.get_uplist(2)
up_list3 = uplist_manager.get_uplist(3)
up_list4 = uplist_manager.get_uplist(4)

imcup_list_12 = IMCUPList(pattern_name="1,2")
imcup_list_12.construct(
    up_list1, up_list2, prefix_indices=[], uplist_manager=uplist_manager
)

imcup_list_13 = IMCUPList(pattern_name="1,3")
imcup_list_13.construct(
    up_list1, up_list3, prefix_indices=[], uplist_manager=uplist_manager
)

imcup_list_14 = IMCUPList(pattern_name="1,4")
imcup_list_14.construct(
    up_list1, up_list4, prefix_indices=[], uplist_manager=uplist_manager
)

imcup_list_123 = IMCUPList(pattern_name="1,2,3")
imcup_list_123.construct(
    imcup_list_12, imcup_list_13, prefix_indices=[1], uplist_manager=uplist_manager
)

imcup_list_124 = IMCUPList(pattern_name="1,2,4")
imcup_list_124.construct(
    imcup_list_12, imcup_list_14, prefix_indices=[1], uplist_manager=uplist_manager
)

print(imcup_list_12)
print(imcup_list_13)
print(imcup_list_14)
print(imcup_list_123)
print(imcup_list_124)

UPList(1: [(1, 0.7, 10.0), (2, 0.4, 40.0), (5, 1.0, 50.0), (7, 0.9, 30.0)], Total Utility: 130.0, Expected Support: 3.0)
UPList(2: [(2, 0.4, 20.0), (3, 0.6, 20.0), (5, 0.7, 30.0), (7, 0.9, 40.0)], Total Utility: 110.0, Expected Support: 2.6)
UPList(3: [(1, 0.1, 30.0), (2, 0.5, 30.0), (3, 0.3, 30.0), (4, 0.4, 30.0), (6, 0.2, 20.0), (7, 0.9, 50.0)], Total Utility: 190.0, Expected Support: 2.4)
UPList(4: [(2, 0.9, 50.0), (3, 0.2, 10.0), (4, 0.4, 40.0), (6, 0.8, 60.0)], Total Utility: 160.0, Expected Support: 2.3)
IMCUPList(1,2: [(2, 0.16000000000000003, 60.0), (5, 0.7, 80.0), (7, 0.81, 70.0)], Total Utility: 210.0, Expected Support: 1.67)
IMCUPList(1,3: [(1, 0.06999999999999999, 40.0), (2, 0.2, 70.0), (7, 0.81, 80.0)], Total Utility: 190.0, Expected Support: 1.08)
IMCUPList(1,4: [(2, 0.36000000000000004, 90.0)], Total Utility: 90.0, Expected Support: 0.36000000000000004)
IMCUPList(1,2,3: [(2, 0.08000000000000002, 90.0), (7, 0.7290000000000001, 120.0)], Total Utility: 210.0, Expected Suppo

In [None]:
class ITUFP:
    """Thuật toán ITUFP để khai thác Top-K High-Utility Itemsets từ cơ sở dữ liệu không chắc chắn.

    Examples
    --------
    >>> udb = UDB("sample_data.csv")
    >>> itufp = ITUFP(udb, k=2)
    >>> top_k_itemsets = itufp.run()
    >>> print(top_k_itemsets)
    [
        ('1', 60.0, 1.5),
        ('1,2', 25.0, 0.3)
    ]
    """

    def __init__(self, udb: UDB, k: int):
        """Khởi tạo đối tượng ITUFP.

        Parameters
        -----------
        udb : UDB
            Cơ sở dữ liệu không chắc chắn.
        k : int
            Số lượng Top-K High-Utility Itemsets cần tìm.
        """

        self.udb: UDB = udb
        self.k: int = k
        self.top_k: List[Tuple[str, float, float]] = []
        self.min_sup: float = 0.0

        # Tạo danh sách UPLists
        self.uplist_manager: UplistManager = self.generate_up_lists()
        self._pattern_indices = {}

    def generate_up_lists(self) -> UplistManager:
        """Tạo danh sách UPLists từ cơ sở dữ liệu UDB và sắp xếp theo expSupport (giảm dần).

        Returns
        -------
        UplistManager
            Lớp quản lý UPLists.
        """

        up_lists = defaultdict(lambda: UPList(item_name=""))

        for transaction in self.udb.get_transactions():
            for item, prob, util in zip(
                transaction.items, transaction.probabilities, transaction.utilities
            ):
                if not up_lists[item].item_name:
                    up_lists[item].item_name = item
                up_lists[item].add_entry(transaction.id, prob, util)

        return UplistManager(
            sorted(up_lists.values(), key=lambda x: x.exp_support, reverse=True)
        )

    def find_prefix_indices(self, pattern1: str, pattern2: str) -> List[int]:
        """Tìm danh sách các mục tiền tố trùng lặp giữa hai mẫu.

        Parameters
        ----------
        pattern1 : str
            Tên của mẫu thứ nhất.
        pattern2 : str
            Tên của mẫu thứ hai.

        Returns
        -------
        List[int]
            Danh sách các mục trùng lặp theo thứ tự xuất hiện.
        """
        key = f"{pattern1}_{pattern2}"
        if key not in self._pattern_indices:
            items1 = tuple(map(int, pattern1.split(",")))
            items2 = tuple(map(int, pattern2.split(",")))
            self._pattern_indices[key] = [item for item in items1 if item in items2]
        return self._pattern_indices[key]

    def mine_patterns(self, uplist_manager: UplistManager) -> None:
        """Đệ quy khai thác các tổ hợp từ UPLists và IMCUPLists.

        Parameters
        ----------
        uplist_manager : UplistManager
            Lớp quản lý UPLists.
        """

        imcup_lists = []
        up_lists = uplist_manager.up_lists

        # Lọc các UPLists có expSupport > minSup
        valid_indices = [
            i for i, up in enumerate(up_lists) if up.exp_support > self.min_sup
        ]

        # Tạo IMCUPLists từ các cặp UPLists
        for i in valid_indices:
            for j in range(i + 1, len(valid_indices)):
                idx_j = valid_indices[j]

                # Early pruning cho cặp items
                if (
                    up_lists[i].exp_support * up_lists[idx_j].exp_support
                    <= self.min_sup
                ):
                    continue

                imcup = IMCUPList(
                    f"{uplist_manager.up_lists[i].item_name},{uplist_manager.up_lists[j].item_name}"
                )

                imcup.construct(
                    uplist_manager.up_lists[i],
                    uplist_manager.up_lists[j],
                    [],
                    uplist_manager,
                )

                if imcup.exp_support <= self.min_sup:
                    continue

                imcup_lists.append(imcup)

                self._update_top_k(imcup)

        if imcup_lists:
            self.itufp_growth(imcup_lists, uplist_manager)

    def _update_top_k(self, item):
        pattern_name = item.item_name if isinstance(item, UPList) else item.pattern_name
        entry = (pattern_name, item.total_utility, item.exp_support)

        self.top_k.append(entry)
        self.top_k.sort(key=lambda x: x[1], reverse=True)

        if len(self.top_k) > self.k:
            self.top_k.pop()
            self.min_sup = self.top_k[-1][2]

    def itufp_growth(
        self,
        imcup_lists: List[IMCUPList],
        uplist_manager: UplistManager,
    ) -> None:
        """Đệ quy mở rộng khai thác các tổ hợp dài hơn từ IMCUPLists.

        Parameters
        ----------
        imcup_lists : List[IMCUPList]
            Danh sách các IMCUPLists cần mở rộng.
        uplist_manager : UplistManager
            Lớp quản lý UPLists.
        """

        next_level = []

        # Early pruning với min_sup
        valid_imcups = [
            imcup for imcup in imcup_lists if imcup.exp_support > self.min_sup
        ]

        for i, imcup1 in enumerate(valid_imcups):
            prefix1 = tuple(imcup1.pattern_name.split(",")[:-1])

            for imcup2 in valid_imcups[i + 1 :]:
                if tuple(imcup2.pattern_name.split(",")[:-1]) != prefix1:
                    continue

                new_pattern_name = (
                    f"{imcup1.pattern_name},{imcup2.pattern_name.split(',')[-1]}"
                )

                new_imcup = IMCUPList(new_pattern_name)

                # Tìm prefix indices
                prefix_indices = self.find_prefix_indices(
                    imcup1.pattern_name, imcup2.pattern_name
                )

                new_imcup.construct(imcup1, imcup2, prefix_indices, uplist_manager)

                if new_imcup.exp_support <= self.min_sup:
                    continue

                next_level.append(new_imcup)

                self._update_top_k(new_imcup)

        if next_level:
            self.itufp_growth(next_level, uplist_manager)

    def run(self) -> List[Tuple[str, float, float, float]]:
        """Chạy thuật toán ITUFP để tìm Top-K High-Utility Itemsets.

        Returns
        -------
        List[Tuple[str, float, float, float]]
            Danh sách các High-Utility Itemsets với tên, tổng utility, expSupport và minSup.
        """

        # Khai thác 1-itemsets
        valid_uplists = [
            up for up in self.uplist_manager.up_lists if up.exp_support > self.min_sup
        ]

        for uplist in valid_uplists:
            self._update_top_k(uplist)

        # Khai thác các tổ hợp dài hơn
        self.mine_patterns(self.uplist_manager)

        return self.top_k