In [1]:
from __future__ import annotations
import re
from dataclasses import dataclass, field
from itertools import combinations
from typing import TypeVar, Dict, FrozenSet, Optional, List, Set
from copy import deepcopy
import matplotlib.pyplot as plt
from config import DATA, MIN_SUP, MIN_CONF

In [4]:
# Define some type.
TID = TypeVar(int)
Item = TypeVar(int)
ItemSet = TypeVar(Set[Item])
ItemSets = TypeVar(List[ItemSet])

def get_data(path: Path = DATA.ibm_test):
    r"""Get data.

    Parameter
    =========
    path: Path
        The path to data file.

    Returns
    =======
    ori_itemsets: ItemSets
        The original itemsets.
    transactions: List[ItemSet]
        The transactions.
    """
    t_dict: Dict[TID, ItemSets] = dict()
    ori_itemsets: ItemSets = set()

    # Formated data.
    for line in open(path):
        _, _, tid, item, _ = re.split('[ \t\n]+', line)
        if tid in t_dict:
            t_dict[tid].add(item)
        else:
            t_dict[tid] = set(item)
        ori_itemsets.add(frozenset({item}))
    return ori_itemsets, t_dict.values()


def support(itemset: FrozenSet[Item], transactions: List[FrozenSet[Item]]):
    r"""Compute the target itemset's support value.

    Parameters
    ==========
    itemset: ItemSet
        The target itemset.
    transactions: List[ItemSet]
        The transaction lists.
    
    Return
    ======
    float
        The support value.
    """
    count = 0
    for transaction in transactions:
        if itemset.issubset(transaction): count += 1
    return count / len(transactions)


def compute_supports(itemsets: ItemSets, transactions: List[ItemSet]):
    r"""Compute the itemset's support value in transactions.
    
    Parameters
    ==========
    itemsets: ItemSets
        The target itemsets.
    transactions: List[ItemSet]
        The transaction lists.
    
    Return
    ======
    Dict[ItemSet, float]
        A dict of itemsets map to there support value.
    """
    t_size = len(transactions)
    return {itemset: support(itemset, transactions) for itemset in itemsets}


def apriori(itemsets: Set[Item],  transactions: List[Set[Item]], k: int):
    r"""Apriori Algorithm.
    
    Parameters
    ==========
    itemsets: ItemSets
        The target itemsets.
    transactions: List[ItemSet]
        The transaction lists.
    
    Returns
    =======
    L_1: dict
        The itemset occur in transctions times mapping.
    I_2: map
        The combinations of next itemsets.
    """
    t_size = len(transactions)
    # Count itemset occur in all transaction times.
    C = compute_supports(itemsets, transactions)                         # (Itemset, count)
    # Filter out less than MIN_SUP.
    L = dict(filter(lambda x: x[1] >= MIN_SUP, C.items()))  # (Itemset, count)

    try:
        L_union = frozenset(frozenset.union(*deepcopy(L).keys()))
    except TypeError as e:
        L_union = set()
    # Generate combinations in forzenset type.
    I = map(frozenset, combinations(L_union, k))                # Itemsets
    return L, I


def get_all_itemsets(ori_itemsets: ItemSet, transactions: List[ItemSet]):
    r"""Get all itemsets.
    
    Parameters
    ==========
    ori_itemsets: ItemSet
        The original itemset.
    transactions: List[ItemSet]
        The transaction lists.
    
    Returns
    =======
    result: Dict[ItemSet, float]
        A dict of all itemsets mapping to there support value.
    """
    result = dict()
    for k in range(2, 10):
        I = ori_itemsets if k == 2 else I
        L, I = apriori(I, transactions, k)
        result.update(L)
        if 0 == len(list(L)) <= 1:
            return result
    return result

ori_itemsets, transactions = get_data(DATA.test)
all_itemsets = get_all_itemsets(ori_itemsets, transactions)
all_itemsets

{frozenset({'628'}): 0.21618357487922704,
 frozenset({'523'}): 0.41183574879227053,
 frozenset({'451'}): 0.4033816425120773,
 frozenset({'3'}): 0.5531400966183575,
 frozenset({'487'}): 0.40458937198067635,
 frozenset({'111'}): 0.39371980676328505,
 frozenset({'470'}): 0.21014492753623187,
 frozenset({'488'}): 0.4033816425120773,
 frozenset({'111', '523'}): 0.3852657004830918,
 frozenset({'488', '523'}): 0.39371980676328505,
 frozenset({'3', '523'}): 0.4033816425120773,
 frozenset({'451', '523'}): 0.39371980676328505,
 frozenset({'487', '523'}): 0.391304347826087,
 frozenset({'111', '488'}): 0.3780193236714976,
 frozenset({'111', '3'}): 0.392512077294686,
 frozenset({'111', '451'}): 0.3804347826086957,
 frozenset({'111', '487'}): 0.37922705314009664,
 frozenset({'3', '488'}): 0.39492753623188404,
 frozenset({'451', '488'}): 0.3852657004830918,
 frozenset({'487', '488'}): 0.38405797101449274,
 frozenset({'3', '451'}): 0.39371980676328505,
 frozenset({'3', '487'}): 0.392512077294686,
 fro

In [5]:
def rules_from_item(itemset: ItemSet):
    r"""Generate associations.

    Parameter
    =========
    itemset: ItemSet
        The target itemset.

    Return
    ======
    List[Tuple[ItemSet]]
        The associations.
    """
    #定义规则左侧的列表
    left = []
    for i in range(1, len(itemset)):
        left.extend(combinations(itemset, i))
    return [(frozenset(l), frozenset(itemset.difference(l))) for l in left]


def get_association_rules(itemsets: ItemSets, min_conf: float):
    r"""Get all association rules for itemsets.

    Parameters
    ==========
    itemsets: ItemSets
        The traget itemsets.
    min_conf: float
        The minimum confidence threshole.

    Return
    ======
    rules: List
        All the association rules.
    """
    rules = []
    for itemset in itemsets:
        if len(itemset) > 1:
            rules.extend(rules_from_item(itemset))

    result = []
    for left, right in rules:
        sup  = itemsets[left | right]
        conf = sup  / itemsets[left]
        lift = conf / itemsets[right]
        if conf >= min_conf:
            result.append(f"| {str(set(left)):10s}->{str(set(right)):10s} | sup: {sup:.3f} | conf: {conf:.3f} | lift: {lift:.3f} |")
    return result

get_association_rules(all_itemsets, MIN_CONF)

["| {'523'}   ->{'111'}    | sup: 0.385 | conf: 0.935 | lift: 2.376 |",
 "| {'111'}   ->{'523'}    | sup: 0.385 | conf: 0.979 | lift: 2.376 |",
 "| {'523'}   ->{'488'}    | sup: 0.394 | conf: 0.956 | lift: 2.370 |",
 "| {'488'}   ->{'523'}    | sup: 0.394 | conf: 0.976 | lift: 2.370 |",
 "| {'523'}   ->{'3'}      | sup: 0.403 | conf: 0.979 | lift: 1.771 |",
 "| {'3'}     ->{'523'}    | sup: 0.403 | conf: 0.729 | lift: 1.771 |",
 "| {'523'}   ->{'451'}    | sup: 0.394 | conf: 0.956 | lift: 2.370 |",
 "| {'451'}   ->{'523'}    | sup: 0.394 | conf: 0.976 | lift: 2.370 |",
 "| {'523'}   ->{'487'}    | sup: 0.391 | conf: 0.950 | lift: 2.348 |",
 "| {'487'}   ->{'523'}    | sup: 0.391 | conf: 0.967 | lift: 2.348 |",
 "| {'488'}   ->{'111'}    | sup: 0.378 | conf: 0.937 | lift: 2.380 |",
 "| {'111'}   ->{'488'}    | sup: 0.378 | conf: 0.960 | lift: 2.380 |",
 "| {'3'}     ->{'111'}    | sup: 0.393 | conf: 0.710 | lift: 1.802 |",
 "| {'111'}   ->{'3'}      | sup: 0.393 | conf: 0.997 | lift: 1.

In [None]:
# c = Counter(original_iset, transactions)
# c = sorted(c.items(), key=lambda x: x[1])
# k, v = zip(*c)
# k = list(frozenset().union(*k))
# main()

# import numpy as np
# std = np.std(v)
# mean = np.mean(v)

# plt.style.use('fast')
# fig, ax = plt.subplots(figsize=(20, 10))
# plt.bar(k, v)
# plt.ylabel('frequence (times)')
# plt.xlabel('item tag')
# plt.title('item - frequence')

# ax.text(0, mean+12, f"μ\n{mean:.0f}", color="red")
# ax.axhline(y=mean, color='r')

# ax.text(0, mean+std+12, f"μ+σ\n{mean+std:.0f}", color="blue")
# ax.axhline(y=mean+std, color='b', marker='.')