In [0]:
import unittest
from copy import deepcopy

In [0]:
dataset =  [
    [["a"], ["a", "b", "c"], ["a", "c"], ["c"]],
    [["a"], ["c"], ["b", "c"]],
    [["a", "b"], ["d"], ["c"], ["b"], ["c"]],
    [["a"], ["c"], ["b"], ["c"]]
]

In [0]:
n = 0
for sequence in dataset:
    n += len(sequence)
n

16

In [0]:
# Apriori algorithm
def generate_itemsets_lvl1(dataset):
    single_itemsets_list = []
    for sequence in dataset:
        for event in sequence:
            for item in event:
                if single_itemsets_list.count(frozenset(item)) == 0:
                    single_itemsets_list.append(frozenset(item))
    single_itemsets_list.sort()
    return single_itemsets_list


def count_support(dataset, itemset):
    # need to rename to calculate support
    n = 0
    for sequence in dataset:
        n += len(sequence)

    support = 0
    for sequence in dataset:
        for event in sequence:
            if itemset.issubset(frozenset(event)):
                support += 1
    return support/n


def prune_itemsets(dataset, itemsets, minsup):
    frequent_itemsets = {}
    for itemset in itemsets:
        support = count_support(dataset, itemset)
        if support >= minsup:
            frequent_itemsets[itemset] = support
    return frequent_itemsets


def generate_superset_itemsets(itemsets):
    superset_itemlists_list = []
    for i, itemset in enumerate(itemsets[:-1]):
        for j, itemset2 in enumerate(itemsets[i+1:]):
            superset_itemset = itemset.union(itemset2)
            if superset_itemlists_list.count(superset_itemset) == 0:
                superset_itemlists_list.append(superset_itemset)
    superset_itemlists_list.sort()
    return superset_itemlists_list


def generate_all_frequent_itemsets(dataset, minsup):
    itemsets_lvl1 = generate_itemsets_lvl1(dataset)
    frequent_itemsets = frequent_itemsets_previous_lvl = prune_itemsets(dataset, itemsets_lvl1, minsup)

    while True:
        itemsets_next_lvl = generate_superset_itemsets(list(frequent_itemsets_previous_lvl.keys()))
        frequent_itemsets_next_lvl = prune_itemsets(dataset, itemsets_next_lvl, minsup)
        frequent_itemsets.update(frequent_itemsets_next_lvl)

        frequent_itemsets_previous_lvl = frequent_itemsets_next_lvl
        if len(frequent_itemsets_next_lvl) <= 1: 
            break
    
    return frequent_itemsets



class TestAprioriAlgorithm(unittest.TestCase):
    dataset =  [
        [["a"], ["a", "b", "c"], ["a", "c"], ["c"]],
        [["a"], ["c"], ["b", "c"]],
        [["a", "b"], ["d"], ["c"], ["b"], ["c"]],
        [["a"], ["c"], ["b"], ["c"]]
    ]

    def test_generate_itemsets_lvl1(self):
        expected_single_itemsets_list = [{'a'}, {'b'}, {'c'}, {'d'}]
        self.assertEqual(generate_itemsets_lvl1(dataset), expected_single_itemsets_list)

    def test_count_support(self):
        expected_count_support_list = [
            ({'a'}, 6/16), 
            ({'b'}, 5/16), 
            ({'c'}, 9/16),
            ({'d'}, 1/16)
        ]
        for itemset, expected_support in expected_count_support_list:
            self.assertEqual(count_support(dataset, itemset), expected_support)

    def test_prune_frequent_itemsets(self):
        single_itemsets = generate_itemsets_lvl1(dataset)
        case_1 = {frozenset({'a'}): 6/16, frozenset({'b'}): 5/16, frozenset({'c'}): 9/16}
        self.assertEqual(prune_itemsets(dataset, single_itemsets, 2/16), case_1)
        case_2 = {frozenset({'a'}): 6/16, frozenset({'c'}): 9/16}
        self.assertEqual(prune_itemsets(dataset, single_itemsets, 6/16), case_2)
        case_3 = {frozenset({'c'}): 9/16}
        self.assertEqual(prune_itemsets(dataset, single_itemsets, 9/16), case_3)
    
    def test_generate_superset_itemsets(self):
        single_itemsets = generate_itemsets_lvl1(dataset)
        frequent_itemsets_lvl1 = prune_itemsets(dataset, single_itemsets, 2/16)
        itemsets_lvl1 = list(frequent_itemsets_lvl1.keys())
        case_1 = [{'a', 'b'}, {'a', 'c'}, {'b', 'c'}]
        self.assertEqual(generate_superset_itemsets(itemsets_lvl1), case_1)

    def test_generate_all_frequent_itemsets(self):
        case1 = {
            frozenset({'a'}): 6/16, 
            frozenset({'b'}): 5/16, 
            frozenset({'c'}): 9/16, 
            frozenset({'a', 'b'}): 2/16, 
            frozenset({'a', 'c'}): 2/16, 
            frozenset({'b', 'c'}): 2/16
        }
        self.assertEqual(generate_all_frequent_itemsets(dataset, 2/16), case1)
        case2 = {
            frozenset({'a'}): 6/16, 
            frozenset({'b'}): 5/16, 
            frozenset({'c'}): 9/16
        }
        self.assertEqual(generate_all_frequent_itemsets(dataset, 3/16), case2)



test_apriori_algorithm = TestAprioriAlgorithm()
test_apriori_algorithm.test_generate_itemsets_lvl1()
test_apriori_algorithm.test_count_support()
test_apriori_algorithm.test_prune_frequent_itemsets()
test_apriori_algorithm.test_generate_superset_itemsets()
test_apriori_algorithm.test_generate_all_frequent_itemsets()

In [0]:
frequent_itemsets = generate_all_frequent_itemsets(dataset, 1/16)
frequent_itemsets

{frozenset({'a'}): 0.375,
 frozenset({'b'}): 0.3125,
 frozenset({'c'}): 0.5625,
 frozenset({'d'}): 0.0625,
 frozenset({'a', 'b'}): 0.125,
 frozenset({'a', 'c'}): 0.125,
 frozenset({'b', 'c'}): 0.125,
 frozenset({'a', 'b', 'c'}): 0.0625}

In [0]:
def generate_rules_with_single_rightset(frequent_itemsets):
    rules_with_single_rightset = []
    for itemset in frequent_itemsets.keys():
        if len(itemset) == 1:
            continue

        for singe_rightset in itemset:
            leftset = set(itemset)
            leftset.remove(singe_rightset)

            rules_with_single_rightset.append([frozenset(leftset), frozenset(singe_rightset)])
    return rules_with_single_rightset

In [0]:
def generate_subrules(leftset, rightset):
    if len(leftset) <= 1:
        return []

    subrules = []
    for item in leftset:
        new_leftset = deepcopy(leftset)
        new_leftset = set(new_leftset)
        new_leftset.remove(item)
        new_rightset = deepcopy(rightset)
        new_rightset = set(new_rightset)
        new_rightset = new_rightset.union(set([item]))
        subrules.append([frozenset(new_leftset), frozenset(new_rightset)])
    return subrules

In [0]:
def prune_rules(frequent_itemsets, minconf, rules):
    association_rules = {}
    for leftset, rightset in rules:
        union_set = frozenset(set().union(leftset, rightset))
        support = frequent_itemsets[union_set]
        confidence = support/frequent_itemsets[leftset]
        lift = (support/frequent_itemsets[leftset]) / frequent_itemsets[rightset]
        if confidence >= minconf:
            association_rules[(leftset, rightset)] = {
                'support': support, 
                'confidence': confidence, 
                'lift': lift
            }
    return association_rules

In [0]:
# def generate_all_association_rules(frequent_itemsets, minconf):
frequent_itemsets = generate_all_frequent_itemsets(dataset, 1/16)
minconf = 0.01
rules_with_single_rightset = generate_rules_with_single_rightset(frequent_itemsets)
association_rules = association_rules_previous_step = prune_rules(frequent_itemsets, minconf, rules_with_single_rightset)

for leftset, rightset in association_rules_previous_step.keys():
    print(leftset, rightset)
    subrules = generate_subrules(leftset, rightset)
    association_rules_next_step = prune_rules(frequent_itemsets, minconf, subrules)
#     itemsets_next_lvl = generate_subrules(list(frequent_itemsets_previous_lvl.keys()))
#     frequent_itemsets_next_lvl = prune_itemsets(dataset, itemsets_next_lvl, minsup)
#     frequent_itemsets.update(frequent_itemsets_next_lvl)

#     frequent_itemsets_previous_lvl = frequent_itemsets_next_lvl
#     if len(frequent_itemsets_next_lvl) <= 1: 

# for i in association_rules:
#     print(i)

frozenset({'a'}) frozenset({'b'})
frozenset({'b'}) frozenset({'a'})
frozenset({'c'}) frozenset({'a'})
frozenset({'a'}) frozenset({'c'})
frozenset({'c'}) frozenset({'b'})
frozenset({'b'}) frozenset({'c'})
frozenset({'a', 'c'}) frozenset({'b'})
frozenset({'b', 'c'}) frozenset({'a'})
frozenset({'b', 'a'}) frozenset({'c'})


In [0]:
association_rules

In [0]:
generate_rules_with_single_rightset(frequent_itemsets)

In [0]:
import itertools
import unittest
from copy import deepcopy
from collections import Counter

In [0]:
# https://www-users.cs.umn.edu/~kumar001/dmbook/ch6.pdf
class AprioriAlgorithm(object):
    def __init__(self, dataset, minsup = None, minconf = None):
        # Attributes
        self.all_frequent_itemsets = None
        self.all_rules = None
        self.dataset = dataset
        self.minconf = None
        self.minsup = None
        self.n = None # number of transaction

        # Call methods
        self._calculate_number_of_transactions()
        self.set_minsup(minsup)
        self.set_minconf(minconf)


    def set_minconf(self, minconf):
        self.minconf = minconf


    def set_minsup(self, minsup):
        self.minsup = minsup


    def _calculate_number_of_transactions(self):
        self.n = 0
        for sequence in self.dataset:
            self.n += len(sequence)
    
    
    def _calculate_support(self, itemset):
        itemset_cloned = frozenset(itemset)
        support = 0.0
        for sequence in self.dataset:
            for event in sequence:
                if itemset_cloned.issubset(frozenset(event)):
                    support += 1
        return support/self.n


    def generate_all_frequent_itemsets(self):
        self.all_frequent_itemsets = dict()

        frequent_single_itemsets = previous_frequent_itemsets = self._generate_frequent_single_itemsets()
        self.all_frequent_itemsets.update(frequent_single_itemsets)

        while True:
            previous_itemsets = list(previous_frequent_itemsets.keys())
            candidate_itemsets = self._generate_candidate_itemsets(previous_itemsets)
            frequent_itemsets = self._prune_frequent_itemsets(candidate_itemsets)

            if frequent_itemsets:
                self.all_frequent_itemsets.update(frequent_itemsets)
                previous_frequent_itemsets = frequent_itemsets
            else:
                break
        return self


    def generate_all_rules(self):
        self.all_rules = dict()

        if not self.all_frequent_itemsets:
            self.generate_all_frequent_itemsets()

        for frequent_itemset, support in self.all_frequent_itemsets.items():
            if len(frequent_itemset) <= 1:
                continue
            
            self._generate_rules(itemset=frequent_itemset)
        return self

    
    @staticmethod
    def _generate_candidate_itemsets(itemsets):
        candidate_itemsets = list()
        for i, itemset in enumerate(itemsets[:-1]):
            for itemset2 in itemsets[i+1:]:
                itemset_cloned = list(deepcopy(itemset))
                itemset_cloned.sort()
                itemset2_cloned = list(deepcopy(itemset2))
                itemset2_cloned.sort()
                if itemset_cloned[:-1] == itemset2_cloned[:-1]:
                    candidate_itemset = set().union(itemset_cloned, itemset2_cloned)
                    candidate_itemsets.append(frozenset(candidate_itemset))
        return candidate_itemsets


    def _generate_frequent_single_itemsets(self):
        single_itemsets = set()
        for sequence in self.dataset:
            for event in sequence:
                for item in event:
                    single_itemsets.add(frozenset(item))
        frequent_single_itemsets = self._prune_frequent_itemsets(single_itemsets)
        return frequent_single_itemsets


    def _generate_rules(self, itemset, previous_consequents=None):
        if previous_consequents:
            consequents = self._generate_candidate_itemsets(previous_consequents)
        else:
            consequents = list(deepcopy(itemset))

        if consequents and len(itemset) == len(consequents[0]):
            return

        for consequent in deepcopy(consequents):
            antecedent =  itemset.difference(consequent)
            confident = self.all_frequent_itemsets[frozenset(itemset)] / self.all_frequent_itemsets[frozenset(antecedent)]
            if confident >= self.minconf:
                self.all_rules[(frozenset(antecedent), frozenset(consequent))] = confident
            else:
                consequents.remove(consequent)
        
        if consequents and len(itemset) > len(consequents[0]) + 1:
            self._generate_rules(itemset, previous_consequents = consequents)
        

    def _prune_frequent_itemsets(self, itemsets):
        frequent_itemsets = dict()
        for itemset in itemsets:
            itemset_cloned = frozenset(itemset)
            support = self._calculate_support(itemset_cloned)
            if support >= self.minsup:
                frequent_itemsets[itemset_cloned] = support
        return frequent_itemsets
        



class TestAprioriAlgorithm(unittest.TestCase):
    dataset =  [
        [["a"], ["a", "b", "c"], ["a", "c"], ["c"]],
        [["a"], ["c"], ["b", "c"]],
        [["a", "b"], ["d"], ["c"], ["b"], ["c"]],
        [["a"], ["c"], ["b"], ["c"]]
    ]

    apriori_algorithm_case_1 = AprioriAlgorithm(dataset = dataset, minsup = 2/16, minconf = 0.1)
    apriori_algorithm_case_2 = AprioriAlgorithm(dataset = dataset, minsup = 3/16, minconf = 0.2)
    apriori_algorithm_case_3 = AprioriAlgorithm(dataset = dataset, minsup = 6/16, minconf = 0.3)


    def test__calculate_number_of_transactions(self):
        expected_n = 16
        self.assertEqual(expected_n, self.apriori_algorithm_case_1.n)


    def test__calculate_support(self):
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['a']), 6/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['b']), 5/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['c']), 9/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['d']), 1/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['a','b']), 2/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['a','c']), 2/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['a','b','c']), 1/16)
        self.assertEqual(self.apriori_algorithm_case_1._calculate_support(['a','b','c','d']), 0/16)


    def test__prune_frequent_itemsets(self):
        self.assertEqual(
            self.apriori_algorithm_case_1._prune_frequent_itemsets([['a'],['b'],['c'],['d']]), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'b'}): 0.3125,
                frozenset({'c'}): 0.5625
            }
        )
        self.assertEqual(
            self.apriori_algorithm_case_2._prune_frequent_itemsets([['a'],['b'],['c'],['d']]), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'b'}): 0.3125,
                frozenset({'c'}): 0.5625
            }
        )
        self.assertEqual(
            self.apriori_algorithm_case_3._prune_frequent_itemsets([['a'],['b'],['c'],['d']]), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'c'}): 0.5625
            }
        )       


    def test__generate_frequent_single_itemsets(self):
        self.assertEqual(
            self.apriori_algorithm_case_1._generate_frequent_single_itemsets(), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'b'}): 0.3125,
                frozenset({'c'}): 0.5625
            }
        )
        self.assertEqual(
            self.apriori_algorithm_case_2._generate_frequent_single_itemsets(), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'b'}): 0.3125,
                frozenset({'c'}): 0.5625
            }
        )
        self.assertEqual(
            self.apriori_algorithm_case_3._generate_frequent_single_itemsets(), 
            {
                frozenset({'a'}): 0.375,
                frozenset({'c'}): 0.5625
            }
        )


    def test__generate_candidate_itemsets(self):
        candidate_itemsets = self.apriori_algorithm_case_1._generate_candidate_itemsets([frozenset({'a'}), frozenset({'c'}), frozenset({'b'})])
        self.assertEqual(
            Counter(candidate_itemsets), 
            Counter([frozenset({'a', 'b'}), frozenset({'a', 'c'}), frozenset({'c', 'b'})])
        )

        candidate_itemsets = self.apriori_algorithm_case_1._generate_candidate_itemsets(
            [frozenset({'a', 'b'}), frozenset({'a', 'c'}), frozenset({'c', 'b'})]
        )
        self.assertEqual(
            Counter(candidate_itemsets), 
            Counter([frozenset({'c', 'b', 'a'})])
        )

        candidate_itemsets = self.apriori_algorithm_case_1._generate_candidate_itemsets([['a'], ['b'], ['c']])
        self.assertEqual(
            Counter(candidate_itemsets), 
            Counter([frozenset({'a', 'b'}), frozenset({'a', 'c'}), frozenset({'c', 'b'})])
        )

        candidate_itemsets = self.apriori_algorithm_case_1._generate_candidate_itemsets(
            [['a','b'], ['b','c'], ['a','c']]
        )
        self.assertEqual(
            Counter(candidate_itemsets), 
            Counter([frozenset({'c', 'b', 'a'})])
        )

        
    def test_generate_all_frequent_itemsets(self):
        self.apriori_algorithm_case_1.generate_all_frequent_itemsets()
        self.assertEqual(
            self.apriori_algorithm_case_1.all_frequent_itemsets,
            {
                frozenset({'c'}): 0.5625,
                frozenset({'b'}): 0.3125,
                frozenset({'a'}): 0.375,
                frozenset({'b', 'c'}): 0.125,
                frozenset({'a', 'c'}): 0.125,
                frozenset({'a', 'b'}): 0.125
            }
        )

        self.apriori_algorithm_case_2.generate_all_frequent_itemsets()
        self.assertEqual(
            self.apriori_algorithm_case_2.all_frequent_itemsets,
            {
                frozenset({'c'}): 0.5625, 
                frozenset({'b'}): 0.3125, 
                frozenset({'a'}): 0.375
            }
        )

        self.apriori_algorithm_case_3.generate_all_frequent_itemsets()
        self.assertEqual(
            self.apriori_algorithm_case_3.all_frequent_itemsets,
            {
                frozenset({'c'}): 0.5625, 
                frozenset({'a'}): 0.375
            }
        )


    def test_generate_all_rules(self):
        self.apriori_algorithm_case_1.generate_all_rules()
        self.assertEqual(
            self.apriori_algorithm_case_1.all_rules,
            {
                (frozenset({'c'}), frozenset({'b'})): 2/9,
                (frozenset({'b'}), frozenset({'c'})): 0.4,
                (frozenset({'a'}), frozenset({'b'})): 3/9,
                (frozenset({'b'}), frozenset({'a'})): 0.4,
                (frozenset({'c'}), frozenset({'a'})): 2/9,
                (frozenset({'a'}), frozenset({'c'})): 3/9
            }
        )
        
        self.apriori_algorithm_case_1a = deepcopy(self.apriori_algorithm_case_1)
        self.apriori_algorithm_case_1a.set_minconf(0.3)
        self.apriori_algorithm_case_1a.generate_all_frequent_itemsets().generate_all_rules()
        self.assertEqual(
            self.apriori_algorithm_case_1a.all_rules,
            {
                (frozenset({'b'}), frozenset({'c'})): 0.4,
                (frozenset({'a'}), frozenset({'b'})): 3/9,
                (frozenset({'b'}), frozenset({'a'})): 0.4,
                (frozenset({'a'}), frozenset({'c'})): 3/9
            }
        )
        
        self.apriori_algorithm_case_1b = deepcopy(self.apriori_algorithm_case_1)
        self.apriori_algorithm_case_1b.set_minconf(0.4)
        self.apriori_algorithm_case_1b.generate_all_frequent_itemsets().generate_all_rules()
        self.assertEqual(
            self.apriori_algorithm_case_1b.all_rules,
            {
                (frozenset({'b'}), frozenset({'c'})): 0.4,
                (frozenset({'b'}), frozenset({'a'})): 0.4
            }
        )


test_apriori_algorithm = TestAprioriAlgorithm()
test_apriori_algorithm.test__calculate_number_of_transactions()
test_apriori_algorithm.test__calculate_support()
test_apriori_algorithm.test__prune_frequent_itemsets()
test_apriori_algorithm.test__generate_frequent_single_itemsets()
test_apriori_algorithm.test__generate_candidate_itemsets()
test_apriori_algorithm.test_generate_all_frequent_itemsets()
test_apriori_algorithm.test_generate_all_rules()

In [65]:
dataset =  [
    [["a"], ["a", "b", "c"], ["a", "c"], ["c"]],
    [["a"], ["c"], ["b", "c"]],
    [["a", "b"], ["d"], ["c"], ["b"], ["c"]],
    [["a"], ["c"], ["b"], ["c"]]
]
apriori_algorithm = AprioriAlgorithm(dataset, minsup = 2/16, minconf = 0.2)
apriori_algorithm.generate_all_frequent_itemsets()
apriori_algorithm.all_frequent_itemsets
apriori_algorithm.generate_all_rules()
apriori_algorithm.all_rules

{(frozenset({'c'}), frozenset({'b'})): 0.2222222222222222,
 (frozenset({'b'}), frozenset({'c'})): 0.4,
 (frozenset({'a'}), frozenset({'b'})): 0.3333333333333333,
 (frozenset({'b'}), frozenset({'a'})): 0.4,
 (frozenset({'c'}), frozenset({'a'})): 0.2222222222222222,
 (frozenset({'a'}), frozenset({'c'})): 0.3333333333333333}