In [163]:
import pandas as pd
from itertools import combinations

from functools import cmp_to_key

In [170]:
@cmp_to_key
def _itemset_str_comparator(a:str, b: str):
        if len(a) < len(b):
            return -1
        if len(a) > len(b):
            return 1

        if a < b:
            return -1
        if a > b:
            return 1
        return 0

In [180]:
class AprioriSolver:
    """Frequent-itemset miner implementing the Apriori algorithm.

    Every transaction is converted with ``str()`` and split into its
    characters, so each item is a single character (``'ABDE'`` becomes the
    transaction ``{'A', 'B', 'D', 'E'}``).

    Typical use: construct, call :meth:`apriori`, then call
    :meth:`print_report`, :meth:`get_maximal_frequent` or
    :meth:`get_closed_frequent`. Those three read ``self.Ls`` / ``self.Cs``,
    which only exist after :meth:`apriori` has run.
    """

    def __init__(self, data: list, minsup: int):
        """Store the transactions and the support threshold.

        Args:
            data: Iterable of transactions; each element is passed through
                ``str()`` and its characters become the transaction's items.
            minsup: Minimum absolute support, i.e. the number of
                transactions an itemset must be contained in to be frequent.
        """
        self.data = [set(str(x)) for x in data]
        self.minsup = minsup
        # ``set().union(*xs)`` instead of ``set.union(*xs)``: the latter
        # raises TypeError when ``data`` is empty.
        self._unique_attributes = set().union(*self.data)

    def _support(self, itemset: set):
        """Return the number of transactions that contain every item of ``itemset``."""
        return sum(1 for transaction in self.data if itemset.issubset(transaction))

    def _freq_itemsets_bruteforce(self, size=1):
        """Return all frequent itemsets of exactly ``size`` items by enumeration.

        Every ``size``-combination of the known items is tested against
        ``minsup``. Items are enumerated in sorted order so the result order
        does not depend on set iteration order.

        Returns:
            List of sets.
        """
        res = []
        for combo in combinations(sorted(self._unique_attributes), size):
            itemset = set(combo)
            if self._support(itemset) >= self.minsup:
                res.append(itemset)
        return res

    def apriori_gen(self, L_k1, k):
        """Generate the size-``k`` candidates from the frequent (k-1)-itemsets.

        Join step: two (k-1)-itemsets are merged when, written as sorted
        sequences, they share their first ``k-2`` items and differ in the
        last one. Prune step: a candidate is kept only if every one of its
        (k-1)-subsets is present in ``L_k1``.

        Args:
            L_k1: Iterable of sets, each expected to hold ``k-1`` items.
            k: Size of the candidates to generate (``k >= 2``).

        Returns:
            List of sets, without duplicates, in lexicographic order of
            their sorted items.
        """
        # Sets have no defined iteration order, so the prefix comparison is
        # done on sorted tuples; the level itself is sorted and de-duplicated.
        prev_level = sorted({tuple(sorted(itemset)) for itemset in L_k1})
        known = set(prev_level)

        C_k = []
        for p, q in combinations(prev_level, 2):
            # p precedes q in sorted order, so an equal (k-2)-prefix implies
            # that p's last item is strictly smaller than q's last item.
            if p[:k - 2] != q[:k - 2]:
                continue
            candidate = p + q[k - 2:]
            # Each candidate is examined exactly once, so a candidate with
            # several infrequent subsets is simply not emitted.
            if all(candidate[:i] + candidate[i + 1:] in known
                   for i in range(len(candidate))):
                C_k.append(set(candidate))
        return C_k

    def apriori(self, verbose=False):
        """Run the level-wise Apriori search and store the results.

        After the call, ``self.Ls[k]`` is the list of frequent k-itemsets and
        ``self.Cs[k]`` the list of candidate k-itemsets (both lists of sets).
        Index 0 of each is ``None`` so that indices equal itemset sizes, and
        the last entry of each is the empty level that stopped the search.

        Args:
            verbose: Currently unused; kept so existing callers keep working.
        """
        L = [None, self._freq_itemsets_bruteforce()]
        C = [None, [{x} for x in sorted(self._unique_attributes)]]
        k = 2
        while L[k - 1]:
            C_k = self.apriori_gen(L[k - 1], k)
            L_k = [c for c in C_k if self._support(c) >= self.minsup]

            L.append(L_k)
            C.append(C_k)
            k += 1

        self.Ls = L
        self.Cs = C

    def print_report(self):
        """Print the candidate and frequent itemsets of every level with their supports.

        Requires :meth:`apriori` to have been called.
        """
        print("Main Apriori Process:")
        for i in range(1, len(self.Ls)):
            # All keys within one level have the same length, so the
            # length-then-alphabetical order of the helper is plain
            # alphabetical order here.
            C_dict = self._format_itemset_list_as_dict(self.Cs[i])
            L_dict = self._format_itemset_list_as_dict(self.Ls[i])

            print(f"C{i}: {C_dict}")
            print(f"L{i}: {L_dict}")
            print("=============")

    def _format_itemset_list_as_dict(self, l: list):
        """Map each itemset in ``l`` to its support, keyed by its sorted items joined into a string.

        The returned dict is ordered by key length first, then alphabetically.
        """
        supports = {"".join(sorted(c)): self._support(c) for c in l}
        ordered_keys = sorted(supports, key=_itemset_str_comparator)
        return {key: supports[key] for key in ordered_keys}

    def _is_superset_in_list(self, s: set, l: list):
        """Return True if any set in ``l`` is a superset of ``s``."""
        return any(elem.issuperset(s) for elem in l)

    def get_maximal_frequent(self):
        """Return the maximal frequent itemsets with their supports.

        A frequent itemset is reported when no frequent itemset one item
        larger contains it. Checking only the next level is enough because
        every subset of a frequent itemset is itself frequent.

        Requires :meth:`apriori` to have been called.

        Returns:
            Dict ``{joined_items: support}`` ordered by length, then
            alphabetically.
        """
        result = []
        for i in range(1, len(self.Ls) - 1):
            for itemset in self.Ls[i]:
                if not self._is_superset_in_list(itemset, self.Ls[i + 1]):
                    result.append(itemset)
        # The final level has no level above it, so all of its itemsets qualify.
        result.extend(self.Ls[-1])
        return self._format_itemset_list_as_dict(result)

    def get_closed_frequent(self):
        """Return the closed frequent itemsets with their supports.

        A frequent itemset is reported when no frequent itemset one item
        larger both contains it and has the same support.

        Requires :meth:`apriori` to have been called.

        Returns:
            Dict ``{joined_items: support}`` ordered by length, then
            alphabetically.
        """
        result = []
        for i in range(1, len(self.Ls) - 1):
            for itemset in self.Ls[i]:
                # Computed once per itemset instead of once per comparison.
                own_support = self._support(itemset)
                absorbed = any(
                    upper.issuperset(itemset) and self._support(upper) == own_support
                    for upper in self.Ls[i + 1]
                )
                if not absorbed:
                    result.append(itemset)
        # The final level has no level above it, so all of its itemsets qualify.
        result.extend(self.Ls[-1])
        return self._format_itemset_list_as_dict(result)

In [182]:
# Six example transactions; AprioriSolver splits each string into characters,
# so every letter is one item.
data = ['ABDE', 'BCE', 'ABDE', 'ABCE', 'ABCDE', 'BCD']
# minsup is an absolute count: an itemset is frequent when at least 2
# transactions contain it.
apriorisolver = AprioriSolver(data, minsup=2)
# apriori() stores the per-level candidate / frequent itemsets on the solver;
# print_report() reads them back and prints them with their supports.
apriorisolver.apriori()
apriorisolver.print_report()

Main Apriori Process:
C1: {'A': 4, 'B': 6, 'C': 4, 'D': 4, 'E': 5}
L1: {'A': 4, 'B': 6, 'C': 4, 'D': 4, 'E': 5}
C2: {'AB': 4, 'AC': 2, 'AD': 3, 'AE': 4, 'BC': 4, 'BD': 4, 'BE': 5, 'CD': 2, 'CE': 3, 'DE': 3}
L2: {'AB': 4, 'AC': 2, 'AD': 3, 'AE': 4, 'BC': 4, 'BD': 4, 'BE': 5, 'CD': 2, 'CE': 3, 'DE': 3}
C3: {'ABC': 2, 'ABD': 3, 'ABE': 4, 'ACD': 1, 'ACE': 2, 'ADE': 3, 'BCD': 2, 'BCE': 3, 'BDE': 3, 'CDE': 1}
L3: {'ABC': 2, 'ABD': 3, 'ABE': 4, 'ACE': 2, 'ADE': 3, 'BCD': 2, 'BCE': 3, 'BDE': 3}
C4: {'ABCE': 2, 'ABDE': 3}
L4: {'ABCE': 2, 'ABDE': 3}
C5: {}
L5: {}


In [179]:
# Closed frequent itemsets: no frequent itemset one item larger contains them with the same support.
apriorisolver.get_closed_frequent()

{'B': 6,
 'BC': 4,
 'BD': 4,
 'BE': 5,
 'ABE': 4,
 'BCD': 2,
 'BCE': 3,
 'ABCE': 2,
 'ABDE': 3}

In [178]:
# Maximal frequent itemsets: no frequent itemset one item larger contains them.
apriorisolver.get_maximal_frequent()

{'BCD': 2, 'ABCE': 2, 'ABDE': 3}