In [1]:
from __future__ import annotations
from collections import OrderedDict
from ordered_set import OrderedSet
from functools import cmp_to_key
from typing import Optional
from scipy.stats import chi2_contingency
import numpy as np
import csv

In [2]:
MINSUP = 10
MINCONF = 0.65
MAXCOVERAGE = 5
CHISQTHRESHOLD = 0
rules = []

In [3]:
class DataEntry:
    def __init__(self, items: list[str], label: dict[str:int], count: int = 1):
        self.items = items
        self.label = label
        self.count = count

    def display(self):
        itemStr = ''
        for item in self.items:
            itemStr += str(item) + ' '
        print(itemStr,self.label)

class RuleEntry:
    def __init__(self, items: set[str], label: str, support: int, confidence: float):
        self.items = items
        self.label = label
        self.support = support
        self.confidence = confidence
    def display(self):
        itemStr = ''
        for item in self.items:
            itemStr += str(item) + ' '
        print(itemStr, '-> ', self.label, self.support, self.confidence)


class TreeNode:
    """
    The tree node class as the element
    """

    def __init__(self, name: str, occurence: int, parent: TreeNode | None, label=None):

        if label is None:
            label = {}
        self.name = name
        self.count = occurence
        self.next = None
        self.parent = parent
        # dict[str:TreeNode]
        self.children = {}
        # dict[str:int]
        self.labels = label

    def inc(self, occurence):
        self.count += occurence

    def display(self, depth=1):
        # print("node ", self.name, " . display:")
        if not self.labels:
            print('  ' * depth, self.name + ':' + str(self.count))
        else:
            print('  ' * depth, self.name + ':' + str(self.count) + ' ' + str(self.labels))
        for child in self.children.values():
            child.display(depth + 1)

    def _insertChild(self, child: TreeNode) -> None:
        """
        the method to insert a child for current tree node.
        may overwrite the existing child if the name of child already exists.
        :param child: a treenode object to be inserted as child.
        :return: None
        """
        if isinstance(child, TreeNode):
            self.children[child.name] = child
            # update parent also as find prefix path needs parent
            child.parent = self
        else:
            print("insertChild expects a child object, receive ", type(child), " instead")

    def removeChild(self, childName: str):
        if childName in self.children:
            self.children.pop(childName)
        else:
            print("unexpected child name get during _removeChild: ", childName)

    # this is purely for testing of my interface, can choose to delete or adopt later
    def insertRecord_t(self, record: DataEntry, header_table) -> None:
        """
        the method to insert a record to this tree node.
        NOTICE THAT this should be only called with root nodes, otherwise tree structure would be spoilt.
        :param header_table: the header table possibly to be updated.
        :param record: the dataentry object to be inserted.
        :return: None.
        """
        # print("> enter insert record: record: ", record.items, "-> ", record.label, 'count: ', record.count)
        if self.parent:
            raise AssertionError("the node to be called with this function must be a root node.")
        curNode = self
        deviateFlag = False
        """
        newly added to support root insertion
        """
        if not record.items:
            unifyTwoLabelDict(self.labels, record.label)
            print('updating root count:', self.count, record.count)
            self.count = self.count + record.count
            return

        for index, item in enumerate(record.items):
            if deviateFlag or item not in curNode.children:
                deviateFlag = True
                node = TreeNode(item, record.count, self)
                curNode._insertChild(node)
                if not header_table[item][1]:
                    header_table[item][1] = node
                else:
                    updateHeader(header_table[item][1], node)
            else:
                curNode.children[item].count += record.count
            curNode = curNode.children[item]
        # would be in place for curNode.labels.
        unifyTwoLabelDict(curNode.labels, record.label)
        self.count += record.count

    def _deleteSubTree(self, childName: str) -> None:
        """
        the method to delete a subtree rooted by given node.
        :param childName: the name to be deleted.
        :return: None
        """
        # since we delete a subtree, can safely remove a whole by deleting that child node
        if childName in self.children:
            self.children.pop(childName)
        else:
            print("unexpected childName: ", childName)

    def findRootPath(self) -> list[str]:
        """
        this function finds the path to root of FP tree given a tree node.
        :return: the path to root, including root but exluding given node.
        """
        if not self.parent:
            return []
        path = []
        cur_element = self
        while cur_element.parent.name and cur_element.parent.name != 'Root':
            cur_element = cur_element.parent
            path.append(cur_element.name)
        path.reverse()
        return path

    def findAllSucessorPath(self):
        """
        the method which find all the paths to subtree leaves.
        :return: list[list[TreeNode]]
        """
        paths = []
        if not self.children:
            return [[self]]
        for key in self.children.keys():
            tempResult = self.children[key].findAllSucessorPath()
            if tempResult is not None:
                for path in tempResult:
                    paths += (path + [self])
        return paths

    # def retrieveAllRecordsPassing(self):
    #     rootPath = self.findRootPath()
    #     allSucessorPaths = self.findAllSucessorPath()
    #     result = []
    #     fullPaths = []
    #     for path in allSucessorPaths:
    #         fullPaths.append(path + rootPath)
    #
    #     return

    def retrieveRecordingFromLeaf(self) -> DataEntry | None:
        """
        the method to retrieve a data record from the given leaf.
        must be called with a leaf node, otherwise throw runtime error.
        :return: a single record retrieved.
        """
        print("node ", self.name, " retrieved record: ", self.labels)
        rootPath = self.findRootPath()
        # if not rootPath:
        #     return None
        if self.labels:
            # tempRecord = DataEntry([self] + rootPath, self.labels, self.count)
            # no need self node
            tempRecord = DataEntry(rootPath, self.labels, self.count)
        else:
            raise RuntimeError("retrieveAllRecordingsFromLeaf should not be called with a non-storing node.")
        return tempRecord

    def retrieveRulesFromLeaf(self, prefix: set[str]) -> RuleEntry | None:
        """
        the method to retrieve all the rules from the dictionary of this node.
        must be called with a storage node, otherwise throw run time error.
        :param prefix: the prefix that produced from projections of the tree.
        used as convenience for tree manipulation and rule mining.
        :return: the full list of rules from this node, or None instead (for root node).
        """
        rootPath = self.findRootPath()
        print("node ", self.name, " retrieved rules: ", 'labels', self.labels, "with prefix: ", prefix, "and root path: ",
              rootPath)
        # if not rootPath:
        #     return None
        resultRule = None
        if self.labels:
            totSum = sum(self.labels.values())
            cur_label = None
            cur_maxsupport = 0
            cur_maxconf = 0
            for label in self.labels:
                confidence = self.labels[label] / totSum
                # print("debug: support, conf: ", self.labels[label], confidence)
                # print("maxcursup, maxcurconf: ", max(MINSUP, cur_maxsupport), max(MINCONF, cur_maxconf))
                if self.labels[label] >= max(MINSUP, cur_maxsupport) and confidence >= max(MINCONF, cur_maxconf):
                    # union 3 sets to get the full data entry that passes this node
                    # print("bug check: ", set(rootPath), {self.name}, prefix)
                    # resultRules.append(RuleEntry(set.union(set(rootPath), {self.name}, prefix), label,
                    #                              self.labels[label], confidence))
                    cur_maxsupport = self.labels[label]
                    cur_maxconf = confidence
                    cur_label = label
            if cur_label is not None and cur_maxsupport != 0 and cur_maxconf != 0:
                if self.name != 'Root':
                    resultRule = RuleEntry(set.union(set(rootPath), {self.name}, prefix), cur_label,
                                           cur_maxsupport, cur_maxconf)
                else:
                    resultRule = RuleEntry(set.union(set(rootPath), prefix), cur_label,
                                           cur_maxsupport, cur_maxconf)

        elif self.name != 'Root':
            raise RuntimeError("retrieveAllRulesFromLeaf should not be called with a non-storing node.")
        if resultRule is not None:
            print("returned: ", resultRule, "\n", resultRule.items, "->", resultRule.label,
                  "conf = ", resultRule.confidence, "sup = ", resultRule.support)
        else:
            print("retunned None")
        return resultRule


In [4]:
# just a hack to make use of type hinting
def _getNodeFromTable(item: str, head_table: dict[str:[int, TreeNode]]) -> TreeNode:
    if item in head_table:
        return head_table[item][1]
    else:
        print("_getNodeFromTable: unexpected item str: ", item)


# purely for testing, temp code
def _getNodeFromTable_t(item: str, head_table: dict[str:[int, TreeNode]]) -> TreeNode:
    if item in head_table:
        return head_table[item][1]
    else:
        print("_getNodeFromTable: unexpected item str: ", item)

def projection(item: str, header_table: dict[str:[int, TreeNode]], root: TreeNode):
    """
    the function to evaluate a projection for a given element in the head table.
    :param item: the item name to be projected.
    :param header_table: head table of the tree to be projected.
    :return: root of new fp tree and its head table.
    """
    curNode = _getNodeFromTable_t(item, header_table)
    # if root.labels:
    #     # records = [DataEntry([], root.labels, root.count)]
    #     records = [DataEntry([], root.labels, sum(root.labels.values()))]

    # else:
    #     records = []
    records = []

    while curNode is not None:
        print("in projection, retrieve curNode name:, its parent: ", curNode.name, curNode.parent.name,
              "curnode label: ", curNode.labels)
        tempRecord = None
        if curNode.labels:
            tempRecord = curNode.retrieveRecordingFromLeaf()
        if tempRecord is not None:
            records.append(tempRecord)
        curNode = curNode.next
    """
    may have this function modified to support dataEntry definition.
    """
    print("debug: temprecord: ")
    for record in records:
        print(record.items, " -> ", record.label)

    new_root, new_header_table = createFPtree(records, header_table, MINSUP)
    for item in new_header_table:
        new_header_table[item][0][1] = header_table[item][0][1]
    return new_root, new_header_table


def merge(item: str, head_table: dict[str:[int, TreeNode]], root: TreeNode):
    """
    the method to merge a given element in an fp tree.
    the merged element is assumed to be a leaf node as CMAR paper proposes.
    :param item: the item name to be merged.
    :param head_table: the head table of the tree to be merged.
    :return: None.
    """
    curNode = _getNodeFromTable(item, head_table)
    print(">> enter merge: merging node ", curNode.name)
    while curNode is not None:
        if curNode.children:
            raise AssertionError("the merged node must be a leaf for this implementation."
                                 "Please check either head table or tree to make sure the least support node"
                                 "is always merged first.")
        tempParent = curNode.parent
        print("parent: ", tempParent.name, "ready to merge dicts")
        unifyTwoLabelDict(tempParent.labels, curNode.labels)
        """
        let garbage handling to handle break off node!
        """
        tempParent.removeChild(curNode.name)
        curNode = curNode.next

    """
    newly added to remove root labels, since they're nonsence (void record) produced by merge()

    --------FALSE. They make sense. --------zpz
    """
    # root.count -= sum(root.labels.values())
    # root.labels = {}

    head_table.pop(item)


# traverse to the end of a node and update next to target node
def updateHeader(current, target):
    while current.next:
        current = current.next
    current.next = target


# add one itemset to fp tree
def updateFPtree(dataentry, root, header_table):
    root.insertRecord_t(dataentry, header_table)


"""
would be inplace for dict 1. Please notice the arg order when you pass two dicts.
"""


def unifyTwoLabelDict(dict1: dict[str:int], dict2: dict[str:int]):
    """
    the naive approach to unify two dicts.
    embedded one cannot be used since it just replace value with dict 2, but in our case we need addition.
    :param dict1: the dict to be unioned as a BASIS.
    :param dict2: the dict to be ADDED.
    :return: None
    """

    for key in dict2.keys():
        if key in dict1:
            dict1[key] += dict2[key]
        else:
            dict1[key] = dict2[key]


# frequent item finding
def create_header_table111(dataset: list[DataEntry], old_headertable, minSup=MINSUP):
    """
    the function creating and returning a header table from scratch.
    this is the only origin of all header tables.
    :param dataset: the dataset without a head table.
    :param minSup: minimum support for an element to be included.
    :return:
    """
    header_table = {}
    for dataentry in dataset:
        for index, item in enumerate(dataentry.items):
            count = header_table.get(item, 0)
            if count:
                header_table[item][0] = count[0] + dataentry.count
            else:
                if old_headertable:
                    print(old_headertable)
                    header_table[item] = [dataentry.count, old_headertable[item][0][1]]
                else:
                    header_table[item] = [dataentry.count, index]
    header_table = {k: v for k, v in header_table.items() if v[0] >= minSup}
    # for k in header_table:
    #     header_table[k] = [header_table[k], None]  # element: [count, node]
    """
    a quick patch to turn not ordered list to ordered.
    """
    ordered_header_table = OrderedDict()
    # order according to frequency, then by index
    ordered_items = [v for v in sorted(header_table.items(), key=lambda p: (p[1][0], -p[1][1]))]
    # ordered_items = [v for v in sorted(header_table.items(), key=lambda p: ( -p[1][1], p[1][0]))]
    for entry in ordered_items:
        ordered_header_table[entry[0]] = [entry[1], None]
    return ordered_header_table


def createFPtree(dataset, old_headertable, minSup=MINSUP):
    print('>>>>enter create FP tree:')
    # store headers of each item
    # freq_itemset = set(header_table.keys())
    # if len(freq_itemset) == 0:
    #     print("No frequent itemset")
    #     return None, None
    #  这里有个111区别create_header_table函数
    header_table = create_header_table111(dataset, old_headertable, minSup)
    root = TreeNode('Root', 0, None)
    for dataentry in dataset:
        # print("current dataentry: ", dataentry.items, " -> ", dataentry.label)
        counter_dict = {}
        itemset = dataentry.items
        """
        do not handle empty item case 
        in this case, the item should be stored in root
        """
        # newly added to fix this bug
        if not itemset:
            print("debug: root insertion case")
            # unifyTwoLabelDict(root.labels, dataentry.label)
            updateFPtree(dataentry, root, header_table)

        for item in itemset:
            if item in header_table:  # filtering, only get frequent items
                counter_dict[item] = header_table[item][0]
        if counter_dict:
            # update fp tree
            ordered_items = [v[0] for v in
                             sorted(counter_dict.items(), key=lambda p: (p[1][0], -p[1][1]), reverse=True)]
            dataentry.items = ordered_items
            updateFPtree(dataentry, root, header_table)
    return root, header_table


# backtrack
def ascendFPtree(leaf, prefixPath):
    if leaf.parent is not None:
        prefixPath.append(leaf.name)
        ascendFPtree(leaf.parent, prefixPath)


def findPrefixPath(item, header_table: dict[str, int | list[TreeNode | None]]):
    """
    find all the data entry items (path) for a given item in the header_table, by tree backtracking.
    since header_table already points to an FP tree, no need to provide FP tree for this function.
    :param item: the item in the header_table to be find in a fp tree.
    :param header_table: the header table for the tree to be mined.
    :return: all the data records (paths to root) found in the given FP tree denoted by header_table.
    """
    node = header_table[item][1]  # first node of the item
    paths = []
    while node:
        prefixPath = []
        ascendFPtree(node, prefixPath)  # from node to root
        if len(prefixPath) > 1:
            # path_counter[frozenset(prefixPath[1:])] = node.count  # mapping of path and node (item) count
            dataentry = DataEntry(prefixPath, node.labels, 1)
            paths.append(dataentry)
        node = node.next  # scan next node
    return paths


def mineFPtree(root, header_table: dict[str, int | list[TreeNode | None]],
               minSup: int, prefix: set[str], fre_itemlist: list[set], rule_list: list[RuleEntry]):
    print("in mine tree: ordered item: ", header_table)

    """
    newly added to support mining from single root node.
    otherwise merge or projection should be performed on the tree.
    """
    if not root.children:
        rule = root.retrieveRulesFromLeaf(prefix)
        if rule:
            print("RRRRRRRRRRRRRRRRRRRRRRnew rule appended: ", rule.items, "->", rule.label, "sup=", rule.support,
                  "conf=", rule.confidence)

            rule_list.append(root.retrieveRulesFromLeaf(prefix))

    copy_head_table = header_table.copy()
    """
        since each item in header_table is used only once, safe to get a deep copy and loop with original one
        and since unvisited node is always valid, safe to ignore danging reference check
    """
    for fre_item in header_table.keys():  # for every frequent item
        print(">>>>>>>>>>iteration for fre_item: ", fre_item, ", current tree: ")
        print('header table is', header_table)
        root.display()
        cur_node = _getNodeFromTable_t(fre_item, header_table)
        # newly added to mine rules
        while cur_node is not None:
            print("when retrieving record, curNode: ", cur_node.name, cur_node.count)
            cur_rules = cur_node.retrieveRulesFromLeaf(prefix)

            print("cur rules: ", cur_rules)
            if cur_rules:
                print("%%%%%%%%%%%%%%%%%%%%%%new rule appended: ", cur_rules.items, "->", cur_rules.label)
                rule_list.append(cur_rules)
            cur_node = cur_node.next
        new_freq_set = prefix.copy()
        new_freq_set.add(fre_item)
        fre_itemlist.append(new_freq_set)
        # paths = findPrefixPath(fre_item, header_table)  # find all the paths of the current item
        # new_root, new_header_table = createFPtree(paths, minSup)  # construct FP tree of the current item
        new_root, new_header_table = projection(fre_item, copy_head_table, root)
        if new_root:
            print("after projection: ")
            new_root.display()
            mineFPtree(new_root, new_header_table, minSup, new_freq_set, fre_itemlist,
                       rule_list)  # recursively construct FP tree
        merge(fre_item, copy_head_table, root)

    """
    remember to mine the root here!!!!!
    """
    # the case where there's a single root remaining
    # check whether the root is a single node, if not so, there must be some problem with tree structure
    if not root.parent and not root.children and root.labels:
        cur_rules = root.retrieveRulesFromLeaf(prefix)

        print("cur rules: ", cur_rules)
        if cur_rules:
            print("%%%%%%%%%%%%%%%%%%%%%%new rule appended: ", cur_rules.items, "->", cur_rules.label)
            rule_list.append(cur_rules)


def printRules(rules: list[RuleEntry] | RuleEntry):
    print("whole view of rule object: ", rules)
    if rules is None:
        return
    if isinstance(rules, RuleEntry):
        print("rule: ", rules.items, "->", rules.label, ": ", "sup: ", rules.support, "conf: ", rules.confidence)
        return
    for rule in rules:
        # if rule is None:
        #     continue
        print("rule object: ", rule)
        print("rule: ", rule.items, "->", rule.label, ": ", "sup: ", rule.support, "conf: ", rule.confidence)


def load_test_data():
    # data = [['x', 'y', 'z'], ['x', 'y', 'z'], ['x', 'y', 'z'], ['x', 'y', 'z', 'a'], ['x', 'y', 'a'], ['x', 'z', 'a'],
    #         ['y', 'a', 'abaaba', 'QAQ'], ['ABA', 'QAQ'], ['QAQ', 'a', 'z'], ['y', 'z', 'a'], ['y', 'z', 'a', 'QAQ']]
    # data = [['x', 'y', 'z'], ['x', 'y', 'z'], ['x', 'y', 'z'], ['x', 'y', 'z', 'a'], ['x', 'y', 'a'], ['x', 'z', 'a']]

    # labels = ['A', 'A', 'A', 'B', 'A', 'B', 'C', 'D', 'A', 'A', 'A']
    # data = [['a1', 'b1', 'c1', 'd1'], ['a1', 'b1', 'c2', 'd2'], ['a1', 'b1', 'c1', 'd1'],
    #         ['a2', 'b1', 'c2', 'd1'], ['a3', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd1'],
    #
    #         ['a3', 'b2', 'c3', 'd3'], ['a1', 'b2', 'c3', 'd3'], ['a3', 'b2', 'c2', 'd3'],
    #         ['a2', 'b3', 'c2', 'd3'], ['a3', 'b1', 'c1', 'd3'], ['a1', 'b2', 'c2', 'd3'],
    #
    #         ['a1', 'b1', 'c1', 'd3'], ['a3', 'b2', 'c1', 'd1'], ['a2', 'b1', 'c3', 'd1'],
    #         ['a3', 'b3', 'c3', 'd3'], ['a1', 'b3', 'c1', 'd3'], ['a2', 'b2', 'c2', 'd2']]
    # labels = ['A', 'A', 'A', 'A', 'A', 'B',
    #           'C', 'C', 'B', 'B', 'A', 'B',
    #           'A', 'A', 'A', 'C', 'A', 'B']
    # data = [['a1', 'b1', 'c1', 'd1'], ['a1', 'b1', 'c2', 'd2'], ['a1', 'b1', 'c1', 'd1']]
    # labels = ['A', 'A', 'A']
    # data = [['a1', 'b1', 'c1']] * 100 + [['a1', 'b1', 'c2']] * 150 + [['a1', 'b1', 'c3']] * 60 + \
    #        [['a2', 'b1', 'c1']] * 100 + [['a1', 'b2', 'c1']] * 50
    # labels = ['A'] * 80 + ['B'] * 20 + ['A'] * 150 + ['C'] * 60 + ['B'] * 100 + ['B'] * 50
    data = [['x1', 'y1', 'z1', 'n1'], ['x1', 'y2', 'z1', 'n2'], ['x2', 'y3', 'z2', 'n3'], ['x1', 'y2', 'z3', 'n3'],
            ['x1', 'y2', 'z1', 'n3']]
    labels = ['A', 'B', 'A', 'B', 'A']
    print("data: ", data)
    print("labels: ", labels)
    print("both var's length: ", len(data), len(labels))
    # data = [['x1', 'y1', 'z1', 'n1'], ['x1', 'y2', 'z1', 'n2'], ['x2', 'y3', 'z2', 'n3'], ['x1', 'y2', 'z3', 'n3'],
    #         ['x1', 'y2', 'z1', 'n3']]
    # labels = ['A', 'B', 'A', 'B', 'A']
    dataset = []
    for i in range(len(data)):
        dataset.append(DataEntry(data[i], {labels[i]: 1}, 1))
    return dataset


def test():
    n = MINSUP
    data = load_test_data()
    # initSet = fpgrowth.createInitSet(data)
    myFPtree, myHeaderTab = createFPtree(data, minSup=MINSUP)
    myFPtree.display()
    freqItems = []
    rules = []
    print('start')
    mineFPtree(myFPtree, myHeaderTab, n, set([]), freqItems, rules)
    print('end')
    for x in freqItems:
        print(x)
    printRules(rules)

In [5]:
class CRTreeNode:
    """
    The CR tree node class as the element, notice that only one label should be stored in one node if confidence > 50%
    """

    def __init__(self, name: str, support: int, confidence: float, parent: CRTreeNode | None, label=None):
        self.name = name
        self.confidence = confidence
        self.support = support
        self.next = None
        self.parent = parent
        # dict[str:TreeNode]
        self.children = {}
        self.label = label

    def display(self, depth=1):
        # # print("node ", self.name, " . display:")
        if not self.label:
            # # print('  ' * depth, self.name + ':' + str(str(self.support)) + ' ' + str(self.confidence))
            # print('  ' * depth, self.name)
            pass
        else:
            # print('  ' * depth, self.name + ':' + str(str(self.support)) + ' ' + str(self.confidence) + ' ' + str(self.label))
            pass
        for child in self.children.values():
            child.display(depth + 1)

    def _checkSelf_nothighRankThanRule(self, rule: RuleEntry):
        # self.label != rule.label may need to change, not proven by analysis that it's correct to prune rules
        if self.label is None or self.support is None or self.confidence is None or self.label != rule.label:
            return 1
        selfRule = RuleEntry(OrderedSet(self.findRootPath() + [self.name]), self.label,
                             self.support, self.confidence)
        return compare_rules(rule, selfRule)

    def insertRule(self, rule: RuleEntry, header_table) -> None:
        """
        the method to insert a rule to this tree node.
        NOTICE THAT this should be only called with root nodes, otherwise tree structure would be spoilt.
        :param header_table: the header table possibly to be updated.
        :param rule: the rule entry object to be inserted.
        :return: None.
        """
        if self.parent:
            raise AssertionError("the node to be called with this function must be a root node.")
        curNode = self
        deviateFlag = False
        # item_num = len(rule.items)
        for index, item in enumerate(rule.items):
            # if a passing node has higher rank, just prune the inserted rule
            if not deviateFlag and curNode._checkSelf_nothighRankThanRule(rule) == -1:
                # print("rule insertion pruned by predecessor: ")
                rule.display()
                return
            if deviateFlag or item not in curNode.children:
                deviateFlag = True
                """
                the old code does not handle the case where the rule ends up at an existing storage node.
                """
                # if index + 1 == item_num:
                #     node = CRTreeNode(item, rule.support, rule.confidence, self, rule.label)
                # else:
                #     node = CRTreeNode(item, 0, 0, self)
                node = CRTreeNode(item, 0, 0, None)
                curNode._insertChild(node)
                if not header_table[item]:
                    header_table[item] = node
                else:
                    updateHeader(header_table[item], node)
            curNode = curNode.children[item]
        """
        remember to prune sucessors here!!!!!!!!!!!!!!!!!!!!!!!!!
        """
        self.pruneSucessor(rule)

        curNode.label = rule.label
        curNode.support = rule.support
        curNode.confidence = rule.confidence

    def pruneSucessor(self, rule: RuleEntry):
        """
        the function called to prune successor nodes from this nade
        recursively call this function and call _pruneThis for each node itself
        use bottom-up recursion to also eliminate empty branches of nodes
        :param rule: the rule used to prune sucessors
        :return: None
        """
        for name in self.children:
            self.children[name].pruneSucessor(rule)
        if self.label is None or self.support is None or self.confidence is None:
            self._pruneThis(rule)
        if self.children is None:
            return

    def _pruneThis(self, rule: RuleEntry):
        """
        the function is called with a predecessor node.
        if self rule has lower rank, prune rule stored in this node.
        :param rule: the rule to be compared from a predecessor.
        :return: None
        """
        if self._checkSelf_nothighRankThanRule(rule):
            self.label = None
            self.support = None
            self.confidence = None
            self.label = None
        # empty this node if it is a leaf and pruned
        if self.children is None:
            self.parent.children.pop(self.name)

    def _insertChild(self, child: CRTreeNode) -> None:
        """
        the method to insert a child for current tree node.
        may overwrite the existing child if the name of child already exists.
        :param child: a treenode object to be inserted as child.
        :return: None
        """
        if isinstance(child, CRTreeNode):
            self.children[child.name] = child
            # update parent also as find prefix path needs parent
            child.parent = self
        else:
            pass
            # print("insertChild expects a child object, receive ", type(child), " instead")

    def getRule(self) -> RuleEntry | None:
        # # print("node ", self.name, " retrieved record: ", self.label)
        rootPath = self.findRootPath()
        if not rootPath:
            return None
        if self.label:
            # tempRecord = DataEntry([self] + rootPath, self.labels, self.count)
            # no need self node
            rule = RuleEntry(set(rootPath), self.label, self.support, self.confidence)
        else:
            raise RuntimeError("getRule should not be called with a non-storing node.")
        return rule

    def findRootPath(self) -> list[str]:
        """
        this function finds the path to root of FP tree given a tree node.
        :return: the path to root, including root but exluding given node.
        """
        path = [self.name]
        cur_element = self
        while cur_element.parent.name and cur_element.parent.name != 'CR Root':
            cur_element = cur_element.parent
            path.append(cur_element.name)
        return path[::-1]

    def getAllRules(self, header_table):
        rule_list = []
        for item in header_table:
            node = header_table[item]
            while node:
                if node.label:
                    rule_list.append(node.getRule())
                node = node.next
        return rule_list


def compare_rules(r1, r2):
    if r1.confidence > r2.confidence:
        return 1
    elif r1.confidence < r2.confidence:
        return -1
    else:
        if r1.support > r2.support:
            return 1
        elif r1.support < r2.support:
            return -1
        else:
            if len(r1.items) < len(r2.items):
                return 1
            elif len(r1.items) > len(r2.items):
                return -1
            else:
                # print("actually, the two rules are identical")
                return 0

def create_header_table(rule_list: list[RuleEntry]):
    s = set()
    count_dict = {}
    for rule in rule_list:
        for item in rule.items:
            if item in count_dict:
                count_dict[item] += 1
            else:
                count_dict[item] = 1
        s = s.union(rule.items)
    s_list = list(s)
    # ordered_items = sorted(s_list, key=lambda p: (count_dict[p], p), reverse=True)
    ordered_items = sorted(s_list) # sort just based on element alphabetical order

    header_table = OrderedDict.fromkeys(ordered_items, None)
    # print("debug: in create header table, header table returned: ", header_table)
    return header_table



def createCRtree(rule_list: [RuleEntry]):
    header_table = create_header_table(rule_list)
    # print("createCRTree: head table: ", header_table.keys())
    root = CRTreeNode('CR Root', 0, 0, None)
    for rule in rule_list:
        root.insertRule(rule, header_table)
    return root, header_table


# traverse to the end of a node and update next to target node
def updateHeader(current, target):
    while current.next:
        current = current.next
    current.next = target


# for test data conversion
"""
changed to ordered set for this function. may cause further problem,
remember to go back and take care of this guy!
"""


def convert_to_rule(rule):
    return RuleEntry(OrderedSet(rule[0]), rule[1], rule[2], rule[3])


def pruneByCoverage(dataset: list[DataEntry], rules: list[RuleEntry]) -> (list[RuleEntry], str):
    # the rule traversal is performed based on CBA ranking
    ordered_rules = sorted(rules, key=cmp_to_key(compare_rules), reverse=True)
    mark_count_dict = {}
    retained_rules = []
    """
    since a rule can take middle few elements of a record, i cannot come up a way
    to make use of FP tree to simplify matching process
    hence here i just use the very naive approach
    """
    uncovered = {i for i in range(len(dataset))}

    for rule in ordered_rules:
        retain_this_rule = False
        marked = []
        for record_idx in range(len(dataset)):
            record = dataset[record_idx]
            # check whether the rule correctly classify any label
            if rule.items.issubset(record.items) and record_idx in mark_count_dict \
                    and mark_count_dict[record_idx] <= MAXCOVERAGE:
                marked.append(record_idx)
                if rule.label == record.label and not retain_this_rule:
                    retain_this_rule = True
        # update dataset and remove fully k-marked entries only if the rule is retained
        if retain_this_rule:
            retained_rules.append(rule)
            for record_idx in marked:
                if record_idx in mark_count_dict and mark_count_dict[record_idx] <= MAXCOVERAGE:
                    mark_count_dict[record_idx] += 1
                else:
                    mark_count_dict[record_idx] = 1
                    # added to compute default label
                if mark_count_dict[record_idx] > MAXCOVERAGE:
                    uncovered.remove(record_idx)

    """ newly added to perform default class computation """
    label_counts = {}
    for record_idx in uncovered:
        record = dataset[record_idx]
        label = [key for key in record.label][0]
        if label in label_counts:
            label_counts[label] += 1
        else:
            label_counts[label] = 1
    """ 反手一个擂台算法摁干 """
    default_label = None
    max_label_count = 0
    for label, count in label_counts.items():
        if count > max_label_count:
            max_label_count = count
            default_label = label
    return retained_rules, default_label

In [6]:
class CMARClassifier:
    def __init__(self, rules: [RuleEntry], default_label: str, dataset: Optional[DataEntry], dataset_size: int):
        self.rules = rules
        self.default_label = default_label
        # used for max X2 square
        self.dataset = dataset
        self.dataset_size = dataset_size

        self.dataset_label_count = {}
        if dataset is None:
            return
        for record in dataset:
            label_dic = record.label
            for label in label_dic:
                if label in self.dataset_label_count:
                    self.dataset_label_count[label] += 1
                else:
                    self.dataset_label_count[label] = 1

    def classify(self, record: DataEntry) -> str:
        precondition = record.items
        valid_rules = {}
        for rule in self.rules:
            # if the rule matches the data
            if rule.items.issubset(precondition):
                # add the classified label count to the dictionary
                if rule.label in valid_rules:
                    valid_rules[rule.label].append(rule)
                else:
                    valid_rules[rule.label] = [rule]
        # processing label and counts to find the last label
        final_label = self.default_label
        for key, ruleList in valid_rules.items():
            print(key, " rules : ")
            for rule in ruleList:
                rule.display()
        max_wchisq = 0
        for label in valid_rules:
            weighted_chisq = self.weighted_chi_square(valid_rules[label])
            if weighted_chisq >= CHISQTHRESHOLD and weighted_chisq > max_wchisq:
                final_label = label
                max_wchisq = weighted_chisq
        return final_label

    @staticmethod
    def chi_squared(precondition_count: int, label_count: int, support: int, datasize: int) -> (float, float):
        # print('support (nAB) = ', support, " precond count = ", precondition_count)
        # print('label count = ', label_count, " datasize = ", datasize)
        # print('------------------------------------------------------')

        nAB = support
        n_notA_notB = datasize - (precondition_count + label_count) + support
        n_A_notB = precondition_count - support
        n_notA_B = label_count - support
        # print("nAB = ", nAB, "nA'B' = ", n_notA_notB)
        # print("nAB' = ", n_A_notB, " nA'B = ", n_notA_B)
        g, p, _, _ = chi2_contingency(np.array([[nAB, n_A_notB], [n_notA_B, n_notA_notB]]))
        # not sure about what does these two mean: not checked documentation yet
        return g, p

    @staticmethod
    def max_chi_squared(precondition_count: int, label_count: int, datasize: int):
        n_notleft = datasize - precondition_count
        n_notright = datasize - label_count

        e = 1.0 / (precondition_count * label_count) + 1.0 / (precondition_count * n_notright) + \
            1.0 / (n_notleft * label_count) + 1.0 / (n_notleft * n_notright)
        return ((min(precondition_count,
                     label_count) - precondition_count * label_count / datasize) ** 2) * datasize * e

    def weighted_chi_square(self, rules: [RuleEntry]):
        sum = 0
        for rule in rules:
            precondition_count = int(rule.support / rule.confidence)
            label_count = self.dataset_label_count[rule.label]
            support = rule.support

            chisq, _ = CMARClassifier.chi_squared(precondition_count, label_count, support, self.dataset_size)
            max_chisq = CMARClassifier.max_chi_squared(precondition_count, label_count, self.dataset_size)
            sum += (chisq * chisq) / max_chisq
        return sum


In [7]:
def read(data_path, scheme_path):
    data = read_data(data_path)
    attributes, value_type = read_scheme(scheme_path)
    data = str2numerical(data, value_type)
    return data, attributes, value_type
def str2numerical(data, value_type):
    size = len(data)
    columns = len(data[0])
    for i in range(size):
        for j in range(columns - 1):
            if value_type[j] == 'numerical' and data[i][j] != '?':
                data[i][j] = float(data[i][j])
    return data
def pre_process(data, attribute, value_type):
    column_num = len(data[0])
    size = len(data)
    class_column = [x[-1] for x in data]
    discard_list = []
    for i in range(0, column_num - 1):
        data_column = [x[i] for x in data]

        # process missing values
        missing_values_ratio = data_column.count('?') / size
        if missing_values_ratio > 0.5:
            discard_list.append(i)
            continue
        elif missing_values_ratio > 0:
            data = fill_missing_values(data, i)
            data_column = [x[i] for x in data]

        # discretization
        if value_type[i] == 'numerical':
            discretization_data = get_discretization_data(data_column, class_column)
            block = rmep.Block(discretization_data)
            walls = rmep.partition(block)
            if len(walls) == 0:
                max_value = max(data_column)
                min_value = min(data_column)
                step = (max_value - min_value) / 3
                walls.append(min_value + step)
                walls.append(min_value + 2 * step)
            print(attribute[i] + ":", walls)        # print out split points
            data = replace_numerical(data, i, walls)
        elif value_type[i] == 'categorical':
            data, classes_no = replace_categorical(data, i)
            print(attribute[i] + ":", classes_no)   # print out replacement list

    # discard
    if len(discard_list) > 0:
        data = discard(data, discard_list)
        print("discard:", discard_list)             # print out discard list
    return data
def replace_categorical(data, column_no):
    size = len(data)
    classes = set([x[column_no] for x in data])
    classes_no = dict([(label, 0) for label in classes])
    j = 1
    for i in classes:
        classes_no[i] = j
        j += 1
    for i in range(size):
        data[i][column_no] = classes_no[data[i][column_no]]
    return data, classes_no


In [8]:

def read_data(path):
    data = []
    with open(path, 'r') as csv_file:
        reader = csv.reader(csv_file, delimiter=',')
        for line in reader:
            data.append(line)
        while [] in data:
            data.remove([])
    return data

def read_scheme(path):
    with open(path, 'r') as csv_file:
        reader = csv.reader(csv_file, delimiter=',')
        attributes = next(reader)
        value_type = next(reader)
    return attributes, value_type

def convert_to_dataentry_and_build_tree(data, scheme):
    dataentries = []
    items = []
    for index, char in enumerate(data[:-1]):
        items.append(str(scheme[index]) + '' + char)
    dataentry = DataEntry(items, {data[-1]: 1}, 1)
    dataentries.append(dataentry)
    root, head_table = createFPtree(dataentries, minSup=MINSUP)
    print(head_table)
    root.display()


def convert_ruleitem_to_ours(l_ruleitem:RuleItem, scheme):
    support = l_ruleitem.rule_sup_count
    label = l_ruleitem.class_label
    confidence = l_ruleitem.confidence
    items = OrderedSet()
    for cond in l_ruleitem.cond_set:
        attribute = scheme[0][cond]
        value = l_ruleitem.cond_set[cond]
        items.add(attribute+str(value))
    return RuleEntry(items, label, support, confidence)

def convert_data_to_dataentry(l_data, scheme):
    attributes = scheme[0]
    items = []
    for index, i in enumerate(l_data[:-1]):
        item = attributes[index] + str(i)
        items.append(item)
    label = l_data[-1]
    return DataEntry(items, {label:1}, 1)

def get_error(classifier, dataentries: [DataEntry]):
    error_count = 0
    result_counter = {}
    for dataentry in dataentries:
        label = [key for key in dataentry.label][0]
        result = classifier.classify(dataentry)
        if result in result_counter:
            result_counter[result] += 1
        else:
            result_counter[result] = 1
        result_counter[result] += 1
        if result != label:
            error_count += 1
    print(result_counter)
    print("error count is", error_count)
    return error_count/len(dataentries)
# get rules from dataset
def get_rules(data, MINSUP):
    myFPtree, myHeaderTab = createFPtree(data, {}, minSup=MINSUP)
    print("my header_table is ", myHeaderTab)
    myFPtree.display()
    freqItems = []
    rules = []
    mineFPtree(myFPtree, myHeaderTab, MINSUP, set([]), freqItems, rules)
    return rules

def test_read_data(data_path, scheme_path, minsup=MINSUP, minconf=MINCONF):
    data, attributes, value_type = read(data_path, scheme_path)
    # data example ['vhigh', 'vhigh', '2', '2', 'small', 'low', 'unacc']
    # random.shuffle(data)
    dataset = pre_process(data, attributes, value_type)
    # dataset example [4, 1, 4, 2, 1, 1, 'acc']

    scheme = read_scheme(scheme_path)
    print(scheme)


    block_size = int(len(dataset) / 10)
    split_point = [k * block_size for k in range(0, 10)]
    split_point.append(len(dataset))
    running_times = len(split_point) - 1
    running_times = 1
    for k in range(running_times):
        print("\nRound %d:" % k)
    #
        training_dataset = dataset[:split_point[k]] + dataset[split_point[k+1]:]

        train_dataset_to_feed = []
        for data in training_dataset:
            train_dataset_to_feed.append(convert_data_to_dataentry(data, scheme))
        print("dataentry example is")
        train_dataset_to_feed[0].display()
        test_dataset = dataset[split_point[k]:split_point[k+1]]
        test_dataset_to_feed = []
        for data in test_dataset:
            dataentry = convert_data_to_dataentry(data, scheme)
            test_dataset_to_feed.append(dataentry)
        rules = get_rules(train_dataset_to_feed, MINSUP)
        cr_rule_list = rules
        print('rule example is')
        cr_rule_list[0].display()
        """
            newly added pruning functions
            """
        # prune by establishing the cr tree

    root, header_table = createCRtree(cr_rule_list)
    # may figure out a way to perform pruning by coverage in-place of a tree
    # but in this way we cannot order rules in the tree
    # so just pull 'em out and order
    tree_pruned_rules = root.getAllRules(header_table)
    retained_rules, default_label = pruneByCoverage(train_dataset_to_feed, tree_pruned_rules)
    classifier = CMARClassifier(tree_pruned_rules, default_label, train_dataset_to_feed, len(train_dataset_to_feed))
    # classifier = CMARClassifier(cr_rule_list, default_class, train_dataset_to_feed, len(train_dataset_to_feed))
    print(get_error(classifier, test_dataset_to_feed))
    print(len(test_dataset_to_feed), len(train_dataset_to_feed))
    print('default class is', default_label)
    print("rule number is ", len(tree_pruned_rules))
def test():
    data_name = 'car'
    test_data_path = './dataset/{data_name}.data'.format(data_name=data_name)
    test_scheme_path = './dataset/{data_name}.names'.format(data_name=data_name)
    test_read_data(test_data_path, test_scheme_path)

test()

buying: {'high': 1, 'med': 2, 'vhigh': 3, 'low': 4}
maint: {'high': 1, 'med': 2, 'vhigh': 3, 'low': 4}
doors: {'3': 1, '5more': 2, '4': 3, '2': 4}
persons: {'more': 1, '4': 2, '2': 3}
lug_boot: {'big': 1, 'med': 2, 'small': 3}
safety: {'high': 1, 'med': 2, 'low': 3}
(['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'class'], ['categorical', 'categorical', 'categorical', 'categorical', 'categorical', 'categorical', 'label'])

Round 0:
dataentry example is
buying3 maint1 doors3 persons2 lug_boot3 safety2  {'unacc': 1}
>>>>enter create FP tree:
my header_table is  OrderedDict([('buying3', [[260, 0], <__main__.TreeNode object at 0x00000291C36480A0>]), ('maint3', [[324, 1], <__main__.TreeNode object at 0x00000291C3690FA0>]), ('maint1', [[368, 1], <__main__.TreeNode object at 0x00000291C3648070>]), ('doors4', [[378, 2], <__main__.TreeNode object at 0x00000291C365C310>]), ('doors1', [[378, 2], <__main__.TreeNode object at 0x00000291C3667250>]), ('doors3', [[395, 2], <__main__.Tre

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



safety2 doors3 lug_boot2  ->  unacc 14 1.0
safety2 doors3 lug_boot1  ->  unacc 14 1.0
safety2 doors3 lug_boot2  ->  unacc 14 1.0
safety2 doors3 lug_boot1  ->  unacc 14 1.0
safety1 doors3 lug_boot2  ->  unacc 14 1.0
safety1 doors3 lug_boot1  ->  unacc 14 1.0
safety1 doors3 lug_boot2  ->  unacc 14 1.0
safety1 doors3 lug_boot1  ->  unacc 14 1.0
safety2 lug_boot2  ->  unacc 57 1.0
safety2 lug_boot1  ->  unacc 57 1.0
safety2 lug_boot2  ->  unacc 57 1.0
safety2 lug_boot1  ->  unacc 57 1.0
safety1 lug_boot2  ->  unacc 57 1.0
safety1 lug_boot1  ->  unacc 57 1.0
safety1 lug_boot2  ->  unacc 57 1.0
safety1 lug_boot1  ->  unacc 57 1.0
unacc  rules : 
buying3  ->  unacc 188 0.7230769230769231
safety3 buying3  ->  unacc 86 1.0
buying3 doors4  ->  unacc 40 0.7407407407407407
doors4  ->  unacc 272 0.7195767195767195
safety3 doors4  ->  unacc 126 1.0
lug_boot3  ->  unacc 392 0.7567567567567568
buying3 persons3 lug_boot3  ->  unacc 24 1.0
safety3 buying3 lug_boot3  ->  unacc 27 1.0
buying3 lug_boot3  -