#FP-Growth


In [127]:
from __future__ import annotations
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, FrozenSet


# FP-tree node and tree classes
@dataclass
class FPNode:
    item: Optional[Any]
    count: int
    parent: Optional['FPNode'] = None
    children: Dict[Any, 'FPNode'] = field(default_factory=dict)
    node_link: Optional['FPNode'] = None

    def increment(self, n: int = 1) -> None:
        """Increase node count by n (used when merging counts)."""
        self.count += n


class FPTree:
    """Container for the FP-tree and its header table.
    header_table maps item -> (support_count, first_node)
    """

    def __init__(self) -> None:
        self.root = FPNode(None, 1)
        self.header_table: Dict[Any, Tuple[int, Optional[FPNode]]] = {}

    def add_transaction(self, transaction: List[Any], count: int = 1) -> None:
        """Insert a single (ordered) transaction into the tree with given count.
        Count is added to existing node counts instead of inserting multiple copies.
        """
        node = self.root
        for item in transaction:
            if item in node.children:
                # existing child: increment its count
                child = node.children[item]
                child.increment(count)
                node = child
            else:
                # create a new child node and link it in header table
                new_node = FPNode(item, count, parent=node)
                node.children[item] = new_node
                self._update_header(item, new_node, count)
                node = new_node

    def _update_header(self, item: Any, new_node: FPNode, count: int) -> None:
        """Update header table count and append node to node_link chain."""
        if item in self.header_table:
            total, first = self.header_table[item]
            # append new_node at end of node_link chain
            if first is None:
                self.header_table[item] = (total + count, new_node)
            else:
                last = first
                while last.node_link is not None:
                    last = last.node_link
                last.node_link = new_node
                self.header_table[item] = (total + count, first)
        else:
            self.header_table[item] = (count, new_node)

    def conditional_pattern_base(self, item: Any) -> List[Tuple[List[Any], int]]:
        """Return conditional pattern base as list of (prefix_path, count).
        Each prefix_path is the sequence of items from root (excluded) to
        the parent of a node with 'item'. Counts reflect how many times the
        path appears with 'item'.
        """
        base_patterns: List[Tuple[List[Any], int]] = []
        if item not in self.header_table:
            return base_patterns
        _, node = self.header_table[item]
        while node is not None:
            count = node.count
            path: List[Any] = []
            parent = node.parent
            # traverse to root, collecting items
            while parent is not None and parent.item is not None:
                path.append(parent.item)
                parent = parent.parent
            if path:
                base_patterns.append((list(reversed(path)), count))
            node = node.node_link

        return base_patterns

    def has_single_path(self) -> bool:
        """Check if the tree (excluding root) is a single path.
        If single path, frequent patterns can be generated combinatorially.
        """
        node = self.root
        return self._is_single_path_from(node)

    def _is_single_path_from(self, node: FPNode) -> bool:
        if len(node.children) > 1:
            return False
        if len(node.children) == 0:
            return True
        # continue down the single child
        child = next(iter(node.children.values()))
        return self._is_single_path_from(child)


# FP-Growth core functions
def _sort_items_by_frequency(items: Iterable[Any], freq: Dict[Any, int]) -> List[Any]:
    """Return items sorted by descending frequency, then by item for stability."""
    return sorted(items, key=lambda it: (-freq[it], it))


def build_fptree_from_paths(paths: List[Tuple[List[Any], int]], min_support: int) -> Tuple[FPTree, Dict[Any, int]]:
    """Build an FP-tree from conditional paths (each with a count).
    Paths are list of (ordered_items, count). This avoids expanding counts.
    Returns the built tree and the frequency dict (filtered by min_support).
    """
    # aggregate item counts from paths
    item_counts: Counter = Counter()
    for path, count in paths:
        for item in path:
            item_counts[item] += count
    # filter infrequent items
    freq_items = {item: cnt for item, cnt in item_counts.items() if cnt >= min_support}
    tree = FPTree()
    if not freq_items:
        return tree, {}
    # initialize header counts (node pointers set in add_transaction)
    for item, cnt in freq_items.items():
        tree.header_table[item] = (cnt, None)
    # insert filtered & ordered transactions
    for path, count in paths:
        ordered = [it for it in path if it in freq_items]
        if not ordered:
            continue
        ordered = _sort_items_by_frequency(ordered, freq_items)
        tree.add_transaction(ordered, count)
    return tree, freq_items


def build_fptree(transactions: List[List[Any]], min_support: int) -> Tuple[FPTree, Dict[Any, int]]:
    """Build FP-tree from raw transactions (each considered count=1).
    This is a convenience wrapper that converts transactions into paths
    with counts and calls build_fptree_from_paths.
    """
    paths_with_counts: List[Tuple[List[Any], int]] = [(trans, 1) for trans in transactions]
    return build_fptree_from_paths(paths_with_counts, min_support)


def mine_fp_tree(tree: FPTree, min_support: int) -> Dict[FrozenSet[Any], int]:
    """Mine the FP-tree and return frequent itemsets with their supports.
    This function returns a mapping: frozenset(itemset) -> support_count.
    """
    patterns: Dict[FrozenSet[Any], int] = {}
    # items sorted by ascending support (least frequent first) for mining
    items = sorted(tree.header_table.items(), key=lambda x: (x[1][0], x[0]))

    for item, (support, _) in items:
        # single item pattern
        patterns[frozenset([item])] = support
        # build conditional pattern base for item
        cpb = tree.conditional_pattern_base(item)
        # build conditional tree from cpb (paths with counts)
        cond_tree, cond_freq = build_fptree_from_paths(cpb, min_support)
        if not cond_freq:
            continue
        # if conditional tree has single path, generate combinations directly
        if cond_tree.has_single_path():
            # collect path items and their counts
            single_path: List[Tuple[Any, int]] = []
            node = cond_tree.root
            while node and node.children:
                # exactly one child per level
                child = next(iter(node.children.values()))
                single_path.append((child.item, child.count))
                node = child
            # generate all combinations of items in path with min count as support
            n = len(single_path)
            from itertools import combinations

            for r in range(1, n + 1):
                for combo in combinations(single_path, r):
                    combo_items = frozenset([it for it, _ in combo])
                    combo_support = min(cnt for _, cnt in combo)
                    new_pattern = frozenset(set(combo_items) | {item})
                    patterns[new_pattern] = max(patterns.get(new_pattern, 0), combo_support)
        else:
            # recursively mine conditional tree
            sub_patterns = mine_fp_tree(cond_tree, min_support)
            for pset, psup in sub_patterns.items():
                new_pattern = frozenset(set(pset) | {item})
                patterns[new_pattern] = max(patterns.get(new_pattern, 0), psup)

    return patterns


def fp_growth(transactions: List[List[Any]], min_support: int) -> Dict[FrozenSet[Any], int]:
    """Top-level FP-Growth function.
    Returns frequent itemsets -> support counts.
    """
    tree, freq_items = build_fptree(transactions, min_support)
    if not freq_items:
        return {}
    patterns = mine_fp_tree(tree, min_support)
    # ensure single-item supports are present and correct
    for it, cnt in freq_items.items():
        patterns.setdefault(frozenset([it]), cnt)
    return patterns


# Association rule generation
def generate_association_rules(patterns: Dict[FrozenSet[Any], int], min_confidence: float) -> List[Tuple[set, set, float, int]]:
    """Generate association rules from frequent itemsets.
    Returns list of tuples: (antecedent_set, consequent_set, confidence, support_count_of_whole).
    """
    rules: List[Tuple[set, set, float, int]] = []
    # helper to generate non-empty proper subsets
    def _subsets(s: List[Any]) -> Iterator[Tuple[Any, ...]]:
        from itertools import chain, combinations

        for r in range(1, len(s)):
            for combo in combinations(s, r):
                yield combo

    for itemset, sup in patterns.items():
        if len(itemset) < 2:
            continue
        items_list = list(itemset)
        for subset in _subsets(items_list):
            A = frozenset(subset)
            B = itemset - A
            if A not in patterns:
                continue
            support_A = patterns[A]
            confidence = sup / support_A
            if confidence >= min_confidence:
                rules.append((set(A), set(B), confidence, sup))
    # sort by confidence then support

    rules.sort(key=lambda x: (-x[2], -x[3]))
    return rules


# Simple demo when run as script
if __name__ == '__main__':
    # example transactions
    transactions = [
        ['f', 'a', 'c', 'd', 'g', 'i', 'm', 'p'],
        ['a', 'b', 'c', 'f', 'l', 'm', 'o'],
        ['b', 'f', 'h', 'j', 'o'],
        ['b', 'c', 'k', 's', 'p'],
        ['a', 'f', 'c', 'e', 'l', 'p', 'm', 'n'],
    ]

    min_support = 2
    patterns = fp_growth(transactions, min_support)
    print('Frequent patterns (itemset -> support):')
    for itset, sup in sorted(patterns.items(), key=lambda x: (-len(x[0]), -x[1], sorted(list(x[0])))):
        print(set(itset), '->', sup)

    rules = generate_association_rules(patterns, min_confidence=0.6)
    print('\nGenerated rules (A => B, confidence, support):')
    for A, B, conf, sup in rules:
        print(f'{A} => {B}, conf={conf:.2f}, sup={sup}')

Frequent patterns (itemset -> support):
{'f', 'm', 'l', 'a', 'c'} -> 2
{'f', 'm', 'p', 'a', 'c'} -> 2
{'a', 'f', 'm', 'c'} -> 3
{'a', 'f', 'l', 'c'} -> 2
{'a', 'f', 'c', 'p'} -> 2
{'a', 'm', 'c', 'l'} -> 2
{'a', 'm', 'c', 'p'} -> 2
{'a', 'f', 'm', 'l'} -> 2
{'a', 'f', 'm', 'p'} -> 2
{'f', 'm', 'c', 'l'} -> 2
{'f', 'm', 'c', 'p'} -> 2
{'a', 'f', 'c'} -> 3
{'a', 'm', 'c'} -> 3
{'a', 'f', 'm'} -> 3
{'f', 'm', 'c'} -> 3
{'a', 'l', 'c'} -> 2
{'a', 'c', 'p'} -> 2
{'a', 'f', 'l'} -> 2
{'a', 'f', 'p'} -> 2
{'a', 'm', 'l'} -> 2
{'a', 'm', 'p'} -> 2
{'o', 'f', 'b'} -> 2
{'f', 'l', 'c'} -> 2
{'f', 'c', 'p'} -> 2
{'m', 'c', 'l'} -> 2
{'m', 'c', 'p'} -> 2
{'f', 'm', 'l'} -> 2
{'f', 'm', 'p'} -> 2
{'f', 'b'} -> 4
{'a', 'c'} -> 3
{'a', 'f'} -> 3
{'a', 'm'} -> 3
{'b', 'c'} -> 3
{'f', 'c'} -> 3
{'m', 'c'} -> 3
{'c', 'p'} -> 3
{'f', 'm'} -> 3
{'a', 'l'} -> 2
{'a', 'p'} -> 2
{'o', 'b'} -> 2
{'l', 'c'} -> 2
{'f', 'l'} -> 2
{'o', 'f'} -> 2
{'f', 'p'} -> 2
{'m', 'l'} -> 2
{'m', 'p'} -> 2
{'b'} -> 6
{'f'} ->

# K-fold cross-validation

In [128]:
import numpy as np
from typing import Callable, Optional, Any, List


def k_fold_indices(n_samples: int, k: int = 5, shuffle: bool = True, random_state: Optional[int] = None) -> List[np.ndarray]:
    """Return list of k arrays with indices for each fold."""
    if k <= 1:
        raise ValueError("k must be >= 2")
    if n_samples < k:
        raise ValueError("n_samples must be >= k")

    rng = np.random.default_rng(random_state)
    indices = np.arange(n_samples)
    if shuffle:
        rng.shuffle(indices)

    base, remainder = divmod(n_samples, k)
    folds, start = [], 0
    for i in range(k):
        size = base + (1 if i < remainder else 0)
        folds.append(indices[start:start+size])
        start += size
    return folds


def _as_predictable(model_or_fn: Any) -> Any:
    """Wrap callable into object with .predict(X)."""
    if callable(model_or_fn):
        class _Wrapper:
            def __init__(self, fn):
                self._fn = fn
            def predict(self, X):
                return self._fn(X)
        return _Wrapper(model_or_fn)
    if hasattr(model_or_fn, 'predict'):
        return model_or_fn
    raise ValueError("Learner must return callable or object with .predict(X)")


def cross_validate(X: np.ndarray,
                   y: np.ndarray,
                   learner: Callable[[np.ndarray, np.ndarray], Any],
                   k: int = 5,
                   metric: Optional[Callable[[np.ndarray, np.ndarray], float]] = None,
                   shuffle: bool = True,
                   random_state: Optional[int] = None) -> dict:
    """Perform k-fold cross validation."""
    X, y = np.asarray(X), np.asarray(y)
    if X.shape[0] != y.shape[0]:
        raise ValueError("X and y must match in rows")

    folds = k_fold_indices(len(y), k, shuffle, random_state)

    if metric is None:
        metric = accuracy_score if np.issubdtype(y.dtype, np.integer) else mse_score

    scores = []
    for i in range(k):
        test_idx, train_idx = folds[i], np.hstack([folds[j] for j in range(k) if j != i])
        model = _as_predictable(learner(X[train_idx], y[train_idx]))
        preds = np.asarray(model.predict(X[test_idx]))
        scores.append(float(metric(y[test_idx], preds)))

    scores = np.array(scores)
    return {
        'fold_scores': scores,
        'mean_score': float(scores.mean()),
        'std_score': float(scores.std(ddof=0)),
        'n_folds': k
    }


# ---------- METRICS ----------

def accuracy_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return (np.asarray(y_true) == np.asarray(y_pred)).mean()


def mse_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    yt, yp = np.asarray(y_true, float), np.asarray(y_pred, float)
    return ((yt - yp) ** 2).mean()


# ---------- LEARNERS ----------

def nearest_mean_classifier(X: np.ndarray, y: np.ndarray):
    X, y, classes = np.asarray(X), np.asarray(y), np.unique(y)
    means = {c: X[y == c].mean(axis=0) for c in classes}

    def predict(X_new):
        return np.array([min(classes, key=lambda c: np.linalg.norm(x - means[c])) for x in np.asarray(X_new)])

    return predict


def linear_regression_closed_form(X: np.ndarray, y: np.ndarray, intercept: bool = True):
    X, y = np.asarray(X, float), np.asarray(y, float)
    Xd = np.hstack([np.ones((X.shape[0], 1)), X]) if intercept else X
    theta = np.linalg.pinv(Xd).dot(y)

    def predict(X_new):
        Xn = np.asarray(X_new, float)
        if intercept:
            Xn = np.hstack([np.ones((Xn.shape[0], 1)), Xn])
        return Xn.dot(theta)

    return predict


# ---------- DEMO ----------
if __name__ == '__main__':
    rng = np.random.default_rng(0)

    # Classification
    Xc = np.vstack([
        rng.normal(0.0, 0.8, (50, 2)),
        rng.normal(3.0, 0.5, (50, 2)),
        rng.normal([0.0, 4.0], 0.6, (50, 2))
    ])
    yc = np.array([0]*50 + [1]*50 + [2]*50)
    print('Nearest-mean CV:', cross_validate(Xc, yc, nearest_mean_classifier, k=5, random_state=1))

    # Regression
    Xr = rng.normal(size=(200, 3))
    true_theta = np.array([1.5, -2.0, 0.5, 3.0])
    yr = np.column_stack([np.ones(Xr.shape[0]), Xr]).dot(true_theta) + rng.normal(0, 0.5, 200)
    print('Linear regression CV:', cross_validate(Xr, yr, lambda X, y: linear_regression_closed_form(X, y), k=4))

    # Edge case
    try:
        k_fold_indices(3, k=5)
    except ValueError as e:
        print('Expected error:', e)

    # Dummy sklearn-like model
    class DummyModel:
        def __init__(self, val): self.val = val
        def predict(self, X): return np.full((X.shape[0],), self.val)

    def learner_dummy(X, y): return DummyModel(int(np.round(y.mean())))
    Xs, ys = np.arange(20).reshape(10, 2), np.array([0, 1]*5)
    print('Dummy model CV:', cross_validate(Xs, ys, learner_dummy, k=5))

Nearest-mean CV: {'fold_scores': array([1., 1., 1., 1., 1.]), 'mean_score': 1.0, 'std_score': 0.0, 'n_folds': 5}
Linear regression CV: {'fold_scores': array([0.30688093, 0.33546426, 0.21518432, 0.25073411]), 'mean_score': 0.2770659051104247, 'std_score': 0.04696274412433855, 'n_folds': 4}
Expected error: n_samples must be >= k
Dummy model CV: {'fold_scores': array([0. , 0.5, 0.5, 0.5, 0. ]), 'mean_score': 0.3, 'std_score': 0.24494897427831783, 'n_folds': 5}
