# HUOPM: High-Utility Occupancy Pattern Mining
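This notebook implements the HUOPM algorithm, which mines transactional databases for high-utility occupancy patterns: itemsets that are frequent and that account, on average, for a large share of the utility of the transactions containing them.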

<details>
<summary><strong>Table of Contents</strong></summary>

1. [Imports and Setup](#imports-and-setup)
   Libraries and shared utilities.
2. [UtilityOccupancyList Class](#utilityoccupancylist-class)
   UO-list data structure.
3. [HUOPM Mining Class](#huopm-mining-class)
   Core HUOPM algorithm.
4. [Example Run (Toy Data)](#example-run-toy-data)
   Sanity check run.
</details>


## Imports and Setup


In [1]:
from collections import defaultdict
import time
import random
from typing import Dict, List, Tuple, Set, Optional
import pandas as pd
import tracemalloc
from IPython.display import display
import matplotlib.pyplot as plt
import os
import glob
import seaborn as sns

## UtilityOccupancyList Class
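The `UtilityOccupancyList` class represents a UO-list: for one itemset, it stores the utility occupancy (UO) and remaining utility occupancy (RUO) per transaction, plus the aggregate support and averages.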
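
To make the two per-transaction quantities concrete, here is a minimal sketch that computes UO and RUO for each item of a single transaction. It is not part of the notebook's implementation: the helper name `uo_ruo_for_transaction`, the sample transaction, and the processing order are illustrative assumptions. RUO is taken here as the share of transaction utility held by items that come later in the given order, which mirrors how the mining class builds its 1-itemset lists.

```python
from typing import Dict, List, Tuple


def uo_ruo_for_transaction(
    transaction: List[Tuple[str, int]],
    profits: Dict[str, float],
    order: List[str],
) -> Dict[str, Tuple[float, float]]:
    """Return {item: (uo, ruo)} for one transaction (hypothetical helper)."""
    # Utility of each item = quantity * unit profit.
    utilities = {item: qty * profits[item] for item, qty in transaction}
    tu = sum(utilities.values())  # transaction utility (assumed > 0)
    # Process items in the assumed global order; every item must appear in `order`.
    ranked = sorted(utilities, key=order.index)
    result = {}
    for pos, item in enumerate(ranked):
        uo = utilities[item] / tu
        # Remaining occupancy: share of TU held by items later in the order.
        ruo = sum(utilities[other] for other in ranked[pos + 1:]) / tu
        result[item] = (uo, ruo)
    return result


# Illustrative data: utilities are A=6, B=6, C=1, so TU = 13.
shares = uo_ruo_for_transaction(
    [("A", 2), ("B", 6), ("C", 1)],
    {"A": 3.0, "B": 1.0, "C": 1.0},
    order=["C", "A", "B"],
)
for item, (uo, ruo) in shares.items():
    print(f"{item}: uo={uo:.4f}, ruo={ruo:.4f}")
```

For every item, `uo + ruo` is the share of the transaction still reachable by extending the itemset with later items, so the last item in the order always has `ruo = 0`.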

In [2]:
class UtilityOccupancyList:
    """
    Represents a Utility-Occupancy List (UO-list) for a specific itemset.
    
    This structure stores transaction-level utility occupancy information and
    maintains aggregate statistics (support, average UO, average RUO) that
    form the Frequency-Utility (FU) table.

    Attributes:
        itemset (tuple): The itemset this UO-list represents (e.g., ('a', 'c'))
        entries (dict): Transaction entries {tid: {'uo': float, 'ruo': float}}
        sup (int): Support count (number of transactions containing this itemset)
        sum_uo (float): Sum of utility occupancy values across all transactions
        sum_ruo (float): Sum of remaining utility occupancy values
        uo (float): Average utility occupancy (sum_uo / sup)
        ruo (float): Average remaining utility occupancy (sum_ruo / sup)

    Example:
        >>> uo_list = UtilityOccupancyList(('a', 'c'))
        >>> uo_list.add_entry('T1', uo=0.5, ruo=0.2)
        >>> uo_list.add_entry('T2', uo=0.3, ruo=0.7)
        >>> uo_list.finalize_metrics()
        >>> print(uo_list.get_sup())    # Output: 2
        >>> print(uo_list.get_uo())     # Output: 0.4
        >>> print(uo_list.get_ruo())    # Output: approximately 0.45 (subject to floating-point rounding)
        >>> print(repr(uo_list))        # Output: UO-List(('a', 'c'), sup=2, uo=0.4000, ruo=0.4500)
    """

    def __init__(self, itemset: Tuple[str, ...]):
        """
        Initialize a UO-list for a given itemset.

        Args:
            itemset (tuple of str): The itemset for this UO-list, such as ('a', 'c')

        This initializes an empty set of transaction entries and zeros for all
        aggregate statistics.
        """
        self.itemset = tuple(itemset)
        self.entries: Dict[str, Dict[str, float]] = {}
        self.sup: int = 0
        self.sum_uo: float = 0.0
        self.sum_ruo: float = 0.0
        self.uo: float = 0.0
        self.ruo: float = 0.0

    def add_entry(self, tid: str, uo: float, ruo: float) -> None:
        """
        Add a transaction record to the UO-list.

        Args:
            tid (str): Unique transaction identifier.
            uo (float): Utility occupancy value for the itemset in this transaction.
            ruo (float): Remaining utility occupancy (from other items) in the transaction.

        If the transaction is not already present, this records the entry and updates
        the support count and running totals for UO and RUO values.

        Example:
            >>> uo_list.add_entry('T3', uo=0.8, ruo=0.1)
            >>> print(uo_list.sup)
            # Output: incremented support count
        """
        if tid not in self.entries:
            self.entries[tid] = {'uo': uo, 'ruo': ruo}
            self.sup += 1
            self.sum_uo += uo
            self.sum_ruo += ruo

    def finalize_metrics(self) -> None:
        """
        Calculate average utility occupancy and remaining utility occupancy for this itemset.

        This method should be called after all entries are added, before querying UO or RUO.

        Updates:
            - self.uo: Average utility occupancy (sum_uo / sup)
            - self.ruo: Average remaining utility occupancy (sum_ruo / sup)

        Example:
            >>> uo_list.finalize_metrics()
            >>> print(uo_list.uo)   # Output: average UO
            >>> print(uo_list.ruo)  # Output: average RUO
        """
        if self.sup > 0:
            self.uo = self.sum_uo / self.sup
            self.ruo = self.sum_ruo / self.sup
        else:
            self.uo = 0.0
            self.ruo = 0.0

    def get_sup(self) -> int:
        """
        Return the support count for the itemset.

        Returns:
            int: Number of transactions containing this itemset.

        Example:
            >>> support = uo_list.get_sup()
            >>> print(support)
            # Output: support count as integer
        """
        return self.sup
        
    def get_uo(self) -> float:
        """
        Return the computed average utility occupancy for the itemset.

        Returns:
            float: The average utility occupancy (UO) value.

        Example:
            >>> avg_uo = uo_list.get_uo()
            >>> print(avg_uo)
            # Output: average UO as float
        """
        return self.uo
    
    def get_ruo(self) -> float:
        """
        Return the computed average remaining utility occupancy for the itemset.

        Returns:
            float: The average remaining utility occupancy (RUO) value.

        Example:
            >>> avg_ruo = uo_list.get_ruo()
            >>> print(avg_ruo)
            # Output: average RUO as float
        """
        return self.ruo

    def __repr__(self) -> str:
        """
        Return a readable string summary of the UO-list object.

        Displays:
            - The itemset tuple
            - Support count
            - Average UO (to 4 decimals)
            - Average RUO (to 4 decimals)

        Example:
            >>> print(repr(uo_list))
            # Output: UO-List(('a', 'c'), sup=2, uo=0.4000, ruo=0.4500)
        """
        return f"UO-List({self.itemset}, sup={self.sup}, uo={self.uo:.4f}, ruo={self.ruo:.4f})"


## HUOPM Mining Class
Implements the main HUOPM mining algorithm.
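
For reference, the two thresholds can be written out as follows. This is a sketch of the definitions as the class docstring describes them; the symbols $u$, $tu$, $\Gamma_X$, and $D$ are notation introduced here for illustration, not names taken from the code. $u(X, T_q)$ is the utility of itemset $X$ in transaction $T_q$, $tu(T_q)$ is the total utility of $T_q$, $\Gamma_X$ is the set of transactions containing $X$, and $sup(X) = |\Gamma_X|$.

$$
\begin{aligned}
uo(X, T_q) &= \frac{u(X, T_q)}{tu(T_q)}, \\
uo(X) &= \frac{1}{sup(X)} \sum_{T_q \in \Gamma_X} uo(X, T_q), \\
X \text{ is a HUOP} &\iff sup(X) \ge \alpha \cdot |D| \ \text{ and } \ uo(X) \ge \beta .
\end{aligned}
$$

Here $\alpha$ corresponds to `min_sup_ratio` and $\beta$ to `min_uo_ratio`.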

In [3]:
class HUOPM:
    """
    High-Utility Occupancy Pattern Mining (HUOPM) Algorithm.

    This class implements the HUOPM algorithm, which discovers patterns that satisfy both 
    minimum support (frequency) and minimum utility occupancy (average utility ratio) thresholds.

    Attributes:
        min_sup_ratio (float): Minimum support threshold (alpha), range [0, 1]
        min_uo_ratio (float): Minimum utility occupancy threshold (beta), range [0, 1]
        min_sup_count (int): Minimum support count (auto-calculated after data loaded)
        transactions_dict (dict): Transaction data {tid: [(item, qty), ...]}
        profit_table (dict): Profit/unit-utility per item {item: profit}
        tu_table (dict): Transaction utility table {tid: TU}
        total_order (list): Frequent items sorted by ascending support (ties broken by item name)
        item_to_order_index (dict): Index map for each item in total_order
        huops (list): Final output, list of (itemset, support, uo) patterns

    Example:
        >>> miner = HUOPM(0.02, 0.3)
        >>> results = miner.fit(transactions_dict, profit_table)
        >>> print(results)
        [(('a', 'c'), 10, 0.42), ...]
    """
    
    def __init__(self, min_sup_ratio: float, min_uo_ratio: float):
        """
        Initialize an HUOPM miner with given frequency and utility occupancy thresholds.

        Args:
            min_sup_ratio (float): Minimum support ratio (alpha), value in [0, 1].
            min_uo_ratio (float): Minimum utility occupancy ratio (beta), value in [0, 1].

        Raises:
            ValueError: If either threshold is not within allowed range [0, 1].

        Example:
            >>> miner = HUOPM(0.01, 0.25)
        """
        if not (0 <= min_sup_ratio <= 1):
            raise ValueError(f"min_sup_ratio must be in [0, 1], got {min_sup_ratio}")
        if not (0 <= min_uo_ratio <= 1):
            raise ValueError(f"min_uo_ratio must be in [0, 1], got {min_uo_ratio}")
            
        self.min_sup_ratio = min_sup_ratio
        self.min_uo_ratio = min_uo_ratio
        
        # Mining state variables
        self.min_sup_count: int = 0
        self.transactions_dict: Dict[str, List[Tuple[str, int]]] = {}
        self.profit_table: Dict[str, float] = {}
        self.tu_table: Dict[str, float] = {}
        self.total_order: List[str] = []
        self.item_to_order_index: Dict[str, int] = {}
        self.huops: List[Tuple[Tuple[str, ...], int, float]] = []

    def fit(self, transactions_dict: Dict[str, List[Tuple[str, int]]], profit_table: Dict[str, float]) -> List[Tuple[Tuple[str, ...], int, float]]:
        """
        Execute the HUOPM algorithm on transactional data.

        Args:
            transactions_dict (dict): Transaction DB {tid: [(item, qty), ...]}.
            profit_table (dict): Profit/unit-utility for each item {item: profit}.

        Returns:
            List[Tuple[Tuple[str, ...], int, float]]: List of patterns (itemset, support, uo).

        Raises:
            ValueError: If input dictionaries are empty.

        Example:
            >>> miner = HUOPM(0.01, 0.2)
            >>> miner.fit(tx_db, profits)
        """
        if not transactions_dict:
            raise ValueError("transactions_dict cannot be empty")
        if not profit_table:
            raise ValueError("profit_table cannot be empty")
        
        print("Starting HUOPM Algorithm...")
        start_time = time.time()
        
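        self.transactions_dict = transactions_dict
        self.profit_table = profit_table
        self.huops = []  # reset so repeated fit() calls do not accumulate patterns

        # Smallest count satisfying sup >= alpha * |D|; int() alone would truncate
        # (e.g. 0.3 * 5 = 1.5 would become 1). The epsilon absorbs float error.
        import math  # local import keeps this cell self-contained
        self.min_sup_count = max(1, math.ceil(self.min_sup_ratio * len(transactions_dict) - 1e-9))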
        
        print(f"  Total transactions: {len(transactions_dict)}")
        print(f"  min_sup_count: {self.min_sup_count} (alpha: {self.min_sup_ratio})")
        print(f"  min_uo_ratio (beta): {self.min_uo_ratio}")

        # Pass 1: compute TU, item supports, and the total order
        print("Phase 1: Scanning database for support and TU...")
        I_star, self.total_order = self._scan1()
        self.item_to_order_index = {item: i for i, item in enumerate(self.total_order)}
        print(f"  Frequent 1-itemsets (I*): {len(I_star)}")

        # Pass 2: build UO-lists for the frequent 1-itemsets
        print("Phase 2: Building initial UO-lists...")
        initial_extensions = self._scan2(I_star)
        print(f"  Initial UO-lists built: {len(initial_extensions)}")

        # Recursive search: find all HUOPs
        print("Phase 3: Starting recursive HUOP mining...")
        self._huop_search(None, initial_extensions)
        
        total_time = time.time() - start_time
        print(f"Mining completed in {total_time:.4f}s")
        print(f"Total HUOPs discovered: {len(self.huops)}")
        
        return self.huops

    def _scan1(self) -> Tuple[Set[str], List[str]]:
        """
        Pass 1 (first database scan): calculate transaction utilities, count item supports, and establish the total order.

        Returns:
            Tuple[Set[str], List[str]]: A tuple containing:
                - I_star (set of str): Set of frequent items (items with support >= min_sup_count)
                - total_order (list of str): Sorted list of frequent items ordered by 
                (support_count, item_name) for deterministic processing

        Example:
            >>> I_star, total_order = miner._scan1()
            >>> print(sorted(I_star))
            ['a', 'b', 'c', 'd']
            >>> print(total_order)
            ['a', 'c', 'b', 'd']
        """
        item_sup = defaultdict(int)
        
        # Reset TU table to ensure no stale data
        self.tu_table = {} 

        for tid, transaction in self.transactions_dict.items():
            current_tu = 0.0
            seen_items = set()
            for item, qty in transaction:
                if item in self.profit_table:
                    # Force float calculation for consistency
                    profit = float(self.profit_table[item])
                    current_tu += float(qty) * profit
                    seen_items.add(item)
            
            self.tu_table[tid] = current_tu
            for item in seen_items:
                item_sup[item] += 1
        
        I_star = {item for item, sup in item_sup.items() if sup >= self.min_sup_count}
        
        # Total order: ascending support, ties broken by item name
        self.total_order = sorted(I_star, key=lambda i: (item_sup[i], i))
        
        return I_star, self.total_order

    def _scan2(self, I_star: Set[str]) -> List["UtilityOccupancyList"]:
        """
        Pass 2: Build UO-lists for 1-itemsets using support, ordered items, and transaction utilities.

        Args:
            I_star (set of str): Set of frequent 1-itemsets.

        Returns:
            List[UtilityOccupancyList]: Initial list of UtilityOccupancyList objects, one per frequent item.

        Example:
            >>> initial_uols = miner._scan2(I_star)
        """
        uo_lists = {item: UtilityOccupancyList((item,)) for item in I_star}
        
        for tid, transaction in self.transactions_dict.items():
            tu = self.tu_table.get(tid, 0.0)
            
            if tu == 0:
                continue
            
            frequent_items_in_tx = []
            item_utilities = {}
            
            for item, qty in transaction:
                if item in I_star and item in self.profit_table:
                    profit = self.profit_table[item]
                    utility = qty * profit
                    if utility > 0:
                        frequent_items_in_tx.append(item)
                        item_utilities[item] = utility
            
            sorted_items = sorted(
                frequent_items_in_tx,
                key=lambda i: self.item_to_order_index.get(i, float('inf'))
            )
            
            for i, item in enumerate(sorted_items):
                item_uo = item_utilities[item] / tu
                item_ruo = 0.0
                for j in range(i + 1, len(sorted_items)):
                    item_ruo += item_utilities[sorted_items[j]] / tu
                
                uo_lists[item].add_entry(tid, item_uo, item_ruo)
        
        final_extensions = []
        for item in self.total_order:
            if item in uo_lists:
                uol = uo_lists[item]
                uol.finalize_metrics()
                final_extensions.append(uol)
        
        return final_extensions

    def _huop_search(self, prefix_UOL: Optional["UtilityOccupancyList"], extensions_list: List["UtilityOccupancyList"]) -> None:
        """
        Perform depth-first recursive search for high-utility occupancy patterns.
        
        Applies:
            - Frequency pruning
            - Pattern acceptance
            - Upper-bound pruning (ϕ̂)

        Args:
            prefix_UOL (UtilityOccupancyList | None): Current UO-list used as prefix, or None for root.
            extensions_list (list of UtilityOccupancyList): List of extensions to explore.

        Returns:
            None

        Example:
            >>> miner._huop_search(None, initial_uols)
        """
        for i in range(len(extensions_list)):
            Xa_UOL = extensions_list[i]
            
            # Strategy 1: Frequency pruning
            if Xa_UOL.get_sup() < self.min_sup_count:
                continue
            
            # Pattern acceptance: frequent and average UO at or above beta
            if Xa_UOL.get_uo() >= self.min_uo_ratio:
                self.huops.append((
                    Xa_UOL.itemset,
                    Xa_UOL.get_sup(),
                    Xa_UOL.get_uo()
                ))
            
            # Strategy 2: Upper-bound pruning
            phi_hat_Xa = self._calculate_upper_bound(Xa_UOL)
            
            if phi_hat_Xa >= self.min_uo_ratio:
                extenOfXa = []
                
                for j in range(i + 1, len(extensions_list)):
                    Xb_UOL = extensions_list[j]
                    
                    Xab_UOL = self._construct(prefix_UOL, Xa_UOL, Xb_UOL)
                    
                    # Strategy 3 & 4: Only keep if frequent
                    if Xab_UOL.get_sup() >= self.min_sup_count:
                        extenOfXa.append(Xab_UOL)
                
                if extenOfXa:
                    self._huop_search(Xa_UOL, extenOfXa)

    def _construct(self, X_UOL: Optional["UtilityOccupancyList"], Xa_UOL: "UtilityOccupancyList", Xb_UOL: "UtilityOccupancyList") -> "UtilityOccupancyList":
        """
        Construct a UO-list for the join of itemsets (Xa, Xb), adapting the construction algorithm for k=2 and k>2 cases.

        Args:
            X_UOL (UtilityOccupancyList | None): UO-list of the prefix (for k>2), or None for k=2.
            Xa_UOL (UtilityOccupancyList): UO-list for left itemset.
            Xb_UOL (UtilityOccupancyList): UO-list for right itemset.

        Returns:
            UtilityOccupancyList: The constructed UO-list for the joined itemset.

        Example:
            >>> Xab_UOL = miner._construct(X_UOL, Xa_UOL, Xb_UOL)
        """
        Xab_itemset = Xa_UOL.itemset + (Xb_UOL.itemset[-1],)
        Xab_UOL = UtilityOccupancyList(Xab_itemset)
        
        common_tids = Xa_UOL.entries.keys() & Xb_UOL.entries.keys()
        
        Xa_entries = Xa_UOL.entries
        Xb_entries = Xb_UOL.entries
        X_entries = X_UOL.entries if X_UOL else None
        
        for tid in common_tids:
            Ea = Xa_entries[tid]
            Eb = Xb_entries[tid]
            
            new_ruo = Eb['ruo']
            new_uo = 0.0
            
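            if X_entries is None:
                # Case k=2: Xa and Xb are single items, so their occupancies simply add
                new_uo = Ea['uo'] + Eb['uo']
            else:
                # Case k>2: Xa and Xb share the prefix X, whose occupancy is
                # counted in both entries and must be subtracted once
                E = X_entries.get(tid)
                if E is not None:
                    new_uo = Ea['uo'] + Eb['uo'] - E['uo']
                else:
                    new_uo = Ea['uo'] + Eb['uo']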
            
            Xab_UOL.add_entry(tid, new_uo, new_ruo)
        
        Xab_UOL.finalize_metrics()
        return Xab_UOL

    def _calculate_upper_bound(self, uol: "UtilityOccupancyList") -> float:
        """
        Compute the utility occupancy upper bound (ϕ̂) for a given UO-list.

        Args:
            uol (UtilityOccupancyList): The list for which the bound is calculated.

        Returns:
            float: The upper bound ϕ̂, used to prune the search space.

        Example:
            >>> phi_hat = miner._calculate_upper_bound(uol)
        """
        V_occu = []
        for entry in uol.entries.values():
            potential_uo = entry['uo'] + entry['ruo']
            V_occu.append(potential_uo)
        
        V_occu.sort(reverse=True)
        
        k = self.min_sup_count
        
        if len(V_occu) < k:
            return 0.0
        
        sum_top_k = sum(V_occu[:k])
        
        return sum_top_k / k if k > 0 else 0.0
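
The upper-bound rule used for pruning can be illustrated in isolation. The sketch below is a simplified stand-alone version, not the class method itself: the function name `top_k_upper_bound` and the (uo, ruo) pairs are made up for illustration. It averages the k largest values of uo + ruo, where k stands in for the minimum support count.

```python
from typing import List, Tuple


def top_k_upper_bound(entries: List[Tuple[float, float]], k: int) -> float:
    """Average of the k largest (uo + ruo) values; 0.0 if fewer than k entries."""
    if k <= 0 or len(entries) < k:
        return 0.0
    # Best occupancy any extension could reach in each transaction.
    potentials = sorted((uo + ruo for uo, ruo in entries), reverse=True)
    return sum(potentials[:k]) / k


# Made-up (uo, ruo) pairs for three transactions.
entries = [(0.25, 0.5), (0.125, 0.125), (0.5, 0.25)]
bound = top_k_upper_bound(entries, k=2)
print(bound)

# Hypothetical threshold: extensions are explored only if the bound reaches beta.
beta = 0.8
print("explore extensions" if bound >= beta else "prune")
```

The reasoning behind the bound: any frequent extension occurs in at least k of these transactions, its occupancy in each is at most uo + ruo, and the average of any k or more values from a list cannot exceed the average of the k largest.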

## Example Run (Toy Data)
Runs the miner on a five-transaction toy database as a sanity check.


In [6]:
# Small test with toy data
print("SMALL TEST: Verifying HUOPM implementation")

toy_transactions = {
    'T1': [('A', 2), ('B', 6), ('C', 1)],
    'T2': [('A', 1), ('D', 3), ('G', 2)],
    'T3': [('B', 1), ('D', 2), ('G', 3)],
    'T4': [('A', 1), ('C', 5), ('D', 2)],
    'T5': [('B', 4), ('D', 1), ('F', 1)],
}

toy_profits = {
    'A': 3.0, 'B': 1.0, 'C': 1.0,
    'D': 5.0, 'F': 3.0, 'G': 2.0,
}

miner = HUOPM(min_sup_ratio=0.4, min_uo_ratio=0.3)
results = miner.fit(toy_transactions, toy_profits)

print("\nResults:")
for pattern, support, uo in results[:10]:
    print(f"  {pattern}: sup={support}, uo={uo:.4f}")

print("\nSmall test completed successfully!")

SMALL TEST: Verifying HUOPM implementation
Starting HUOPM Algorithm...
  Total transactions: 5
  min_sup_count: 2 (alpha: 0.4)
  min_uo_ratio (beta): 0.3
Phase 1: Scanning database for support and TU...
  Frequent 1-itemsets (I*): 5
Phase 2: Building initial UO-lists...
  Initial UO-lists built: 5
Phase 3: Starting recursive HUOP mining...
Mining completed in 0.0010s
Total HUOPs discovered: 5

Results:
  ('C', 'A'): sup=2, uo=0.4915
  ('G', 'D'): sup=2, uo=0.9024
  ('A', 'D'): sup=2, uo=0.7702
  ('B', 'D'): sup=2, uo=0.6985
  ('D',): sup=4, uo=0.5606

Small test completed successfully!
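
One way to gain confidence in the output above is an independent brute-force check. The following sketch is not part of the `HUOPM` class and uses neither UO-lists nor pruning: it enumerates every itemset of the toy database, computes support and average utility occupancy directly, and keeps the itemsets that meet both thresholds. The function name `brute_force_huops` is hypothetical, and the approach is exponential in the number of items, so it only suits tiny inputs.

```python
from itertools import combinations
from typing import Dict, List, Tuple


def brute_force_huops(
    transactions: Dict[str, List[Tuple[str, int]]],
    profits: Dict[str, float],
    min_sup_count: int,
    min_uo: float,
) -> Dict[Tuple[str, ...], Tuple[int, float]]:
    """Enumerate all itemsets; return {itemset: (support, average uo)}."""
    # Per-transaction item utilities and transaction utility (TU).
    # Assumes every transaction has a positive TU.
    utils = {
        tid: {item: qty * profits[item] for item, qty in tx}
        for tid, tx in transactions.items()
    }
    tu = {tid: sum(u.values()) for tid, u in utils.items()}
    items = sorted({item for u in utils.values() for item in u})

    found = {}
    for size in range(1, len(items) + 1):
        for itemset in combinations(items, size):
            # Occupancy of the itemset in each transaction that contains it.
            occ = [
                sum(u[i] for i in itemset) / tu[tid]
                for tid, u in utils.items()
                if all(i in u for i in itemset)
            ]
            if len(occ) >= min_sup_count and sum(occ) / len(occ) >= min_uo:
                found[itemset] = (len(occ), sum(occ) / len(occ))
    return found


toy_transactions = {
    'T1': [('A', 2), ('B', 6), ('C', 1)],
    'T2': [('A', 1), ('D', 3), ('G', 2)],
    'T3': [('B', 1), ('D', 2), ('G', 3)],
    'T4': [('A', 1), ('C', 5), ('D', 2)],
    'T5': [('B', 4), ('D', 1), ('F', 1)],
}
toy_profits = {'A': 3.0, 'B': 1.0, 'C': 1.0, 'D': 5.0, 'F': 3.0, 'G': 2.0}

expected = brute_force_huops(toy_transactions, toy_profits, min_sup_count=2, min_uo=0.3)
for itemset, (sup, uo) in sorted(expected.items()):
    print(f"{itemset}: sup={sup}, uo={uo:.4f}")
```

This sketch lists items alphabetically inside each tuple, while the miner orders them by ascending support, so `('A', 'C')` here corresponds to `('C', 'A')` in the run above. Apart from that ordering, the five itemsets and their support and UO values should agree with the reported patterns.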
