In [1]:
import math
class Appearance:
    """Builder for the appearance dictionary passed to functions
    from the fim package.

    Every condition is kept as a ``("attribute:=:value", marker)`` pair,
    where the marker is "a" for conditions added with ``add_to_LHS``
    and "c" for conditions added with ``add_to_RHS``.

    Attributes
    ----------
    lhs: list of (str, str)
        conditions registered for the left-hand side
    rhs: list of (str, str)
        conditions registered for the right-hand side
    """

    def __init__(self):
        self.lhs = []
        self.rhs = []

    def add_to_LHS(self, item):
        """Register `item` as a left-hand side condition."""
        self.__add(item, "a")

    def add_to_RHS(self, item):
        """Register `item` as a right-hand side condition."""
        self.__add(item, "c")

    def __add(self, item, where):
        """
        Function for adding a condition to either self.rhs
        or self.lhs

        Parameters
        ----------
        item: object with `attribute` and `value`
        where: str
            "a" selects self.lhs, anything else selects self.rhs
        """
        string_repr = "{}:=:{}".format(item.attribute, item.value)

        # finding to which side we need to insert
        where_list = self.lhs if where == "a" else self.rhs

        # inserting a condition
        where_list.append((string_repr, where))

    @property
    def dictionary(self):
        """
        Get a final dictionary to be used in functions
        from fim package.

        A side without any condition contributes a single
        ``None -> marker`` entry (presumably fim's default appearance;
        confirm against the fim documentation).

        Returns
        -------
        dict mapping condition string (or None) to "a" / "c"
        """
        # The placeholders are built on the fly instead of being appended
        # to self.lhs / self.rhs: reading a property must not change the
        # object, otherwise a stale (None, ...) entry survives in results
        # produced after real conditions are added.
        lhs = self.lhs or [(None, "a")]
        rhs = self.rhs or [(None, "c")]

        return dict(lhs + rhs)

In [2]:
import collections

class ClassAssocationRule():
    """ClassAssociationRule (CAR) is defined by its antecedent, consequent,
    support, confidence and id.
    It has a set of Items in its antecedent and one Item in its
    consequent.
    __lt__ and __gt__ operators are overridden so that a list of CARs can
    be sorted.


    Parameters
    ----------

    antecedent: Antecedent
        Items that a Transaction has to satisfy

    consequent: Consequent
        Target class of a Transaction that satisfies
        antecedent

    support: float
        how many transactions satisfy the rule, relatively

    confidence: float
        relative degree of certainty that consequent holds
        given antecedent

    Attributes
    ----------
    antecedent
    consequent
    support
    confidence
    rulelen: int
        len(antecedent) + 1
    rid: int
        rule id, taken from the class-level counter `id`
    support_count: int
        absolute support count
    marked: bool
    class_cases_covered: collections.Counter
        counter for determining which transactions are
        covered by the antecedent. Important for M2Algorithm.

    replace: set of ClassAssociationRule
        set of rules that have higher precedence than
        this rule and can replace it in M2Algorithm.
    """

    # class-level counter handing out rule ids
    id = 0

    def __init__(self, antecedent, consequent, support, confidence):
        self.antecedent = antecedent
        self.consequent = consequent
        self.support = support
        self.confidence = confidence
        self.rulelen = len(antecedent) + 1
        self.rid = ClassAssocationRule.id

        ClassAssocationRule.id += 1

        self.support_count = 0

        self.marked = False

        self.class_cases_covered = collections.Counter()
        self.replace = set()

    def __gt__(self, other):
        """
        precedence operator. Determines if this rule
        has higher precedence. Rules are sorted according
        to their confidence, support, length and id:
        higher confidence wins, then higher support, then
        the shorter rule, then the lower id.
        """
        if self.confidence != other.confidence:
            return bool(self.confidence > other.confidence)
        if self.support != other.support:
            return bool(self.support > other.support)
        if self.rulelen != other.rulelen:
            return self.rulelen < other.rulelen
        return self.rid < other.rid

    def __lt__(self, other):
        """
        rule precedence operator; True when `other` has
        higher precedence than this rule
        """
        # "not self > other" made a rule compare lower than itself.
        # Mirroring __gt__ keeps the ordering strict. The call goes
        # through the class so a non-rule operand fails with an
        # AttributeError instead of bouncing between reflected operators.
        return ClassAssocationRule.__gt__(other, self)

    def __len__(self):
        """
        returns
        -------

        length of this rule
        """
        return len(self.antecedent) + len(self.consequent)

    def __repr__(self):
        args = [
            self.antecedent.string(),
            "{" + self.consequent.string() + "}",
            self.support,
            self.confidence,
            self.rid,
        ]
        text = "CAR {} => {} sup: {:.2f} conf: {:.2f} id: {}\n".format(*args)

        return text

In [3]:
class ComparableItemSet:
    """Mixin giving Antecedent and Transaction the
    <= and >= operators.

    A subclass has to expose a "frozenset" attribute holding
    its items; subset and superset tests are carried out on
    that attribute only.
    """

    def issuperset(self, other):
        """True when every item of `other` is also in this set."""
        theirs = other.frozenset
        return theirs <= self.frozenset

    def issubset(self, other):
        """True when every item of this set is also in `other`."""
        theirs = other.frozenset
        return theirs >= self.frozenset

    def __ge__(self, other):
        # self >= other
        return self.issuperset(other)

    def __le__(self, other):
        # self <= other
        return self.issubset(other)

In [4]:
class Item():
    """ Item class for representing attribute-value pair
    and one item in transaction or antecedent.

    Parameters
    ----------
    attribute : str
        name of the item
    value: str
        value of the item

    Attributes
    ----------
    attribute : str
        name of the item
    value: str
        value of the item
    """
    def __init__(self, attribute, value):
        # convert attribute and value so that
        # Item("a", 1) == Item("a", "1")
        # isinstance (not an exact type check) so that str subclasses are
        # kept as they are instead of being wrapped in quotes by repr()
        self.attribute = attribute if isinstance(attribute, str) else repr(attribute)
        self.value = value if isinstance(value, str) else repr(value)

    def __get_tuple(self):
        """Private method for getting an (attribute, value) pair"""
        return (self.attribute, self.value)

    def __getitem__(self, idx):
        """Method for accessing Item as a tuple"""
        item = self.__get_tuple()
        return item[idx]

    def __hash__(self):
        """Two Items with the same attribute and value
        have identical hash value.
        """
        return hash(self.__get_tuple())

    def __eq__(self, other):
        """Overridden method in order to compare based on
        value and not reference.

        An Item is equal to another Item with the same attribute and
        value, and to the plain ``(attribute, value)`` tuple.
        """
        # Comparing hashes raised TypeError for unhashable operands and
        # treated any hash collision as equality; compare the pairs.
        if isinstance(other, Item):
            other_pair = (other.attribute, other.value)
        elif isinstance(other, tuple):
            # Tuples must stay comparable: Antecedent keeps
            # (attribute, value) tuples in its frozenset while Transaction
            # keeps Items, and the two sets are tested against each other.
            other_pair = other
        else:
            return NotImplemented

        return self.__get_tuple() == other_pair

    def __repr__(self):
        """Method for representing Item as a string.

        >>> item1 = Item("a", 1)
        >>> repr(item1)
        "Item{('a', '1')}"
        """

        return "Item{{{}}}".format(self.__get_tuple())

    def string(self):
        """Method for getting simpler representation.

        >>> item1 = Item("a", 1)
        >>> item1.string()
        'a=1'
        """
        return "{}={}".format(*self)

In [5]:
import pandas as pd
import numpy as np
class Transaction(ComparableItemSet):
    """One instance (row) of a dataset.

    The hash of a Transaction is computed from its items only;
    equality compares hashes.

    Parameters
    ----------

    row: array of ints or strings
    header: array of strings
        Column labels, indexed in step with `row`.

    class_item: Item
        Item with class attribute.
    drop_NaN: bool
        When True, cells for which pandas.isnull is true
        produce no Item.

    Attributes
    ----------
    class_val: Item
        the class_item passed in
    items: array of Items
    tid: int
        Transaction ID, taken from the class-level counter.
    alreadycovered: bool
        Used in M2Algorithm for determining if the transaction
        was already covered by some other rule.
    string_items: array of strings
        e.g. ["a:=:1", "b:=:2", "class:=:x"]; the class item
        always comes last
    frozenset: frozenset of Items
    """

    # class-level counter handing out transaction ids
    id_ = 0

    def __init__(self, row, header, class_item, drop_NaN=True):
        self.tid = Transaction.id_
        Transaction.id_ += 1

        self.class_val = class_item
        self.alreadycovered = False

        self.items = []
        # eg. [pay=high, eyes=green]
        self.string_items = []

        for position, cell in enumerate(row):
            # NULL cells are skipped entirely when requested
            if drop_NaN and pd.isnull(cell):
                continue

            label = header[position]

            self.items.append(Item(label, cell))
            self.string_items.append("{}:=:{}".format(label, cell))

        # the class item is only part of the string form, not of `items`
        class_key, class_value = self.class_val
        self.string_items.append("{}:=:{}".format(class_key, class_value))

        # iterating self goes through __getitem__, i.e. over `items`
        self.frozenset = frozenset(self)

    def __repr__(self):
        return "{" + ", ".join(self.string_items) + "}"

    def __hash__(self):
        item_tuple = tuple(self.items)
        return hash(item_tuple)

    def __eq__(self, other):
        mine, theirs = hash(self), hash(other)
        return mine == theirs

    def __getitem__(self, idx):
        return self.items[idx]

    def getclass(self):
        """Return the class Item of this transaction."""
        return self.class_val



class UniqueTransaction(Transaction):
    """Transaction variant whose hash is derived from the
    Transaction id instead of from its items.
    """
    def __hash__(self):
        transaction_id = self.tid
        return hash(transaction_id)


In [6]:
class TransactionDB:
    """TransactionDB represents a list of Transactions that can be
    passed to CBA algorithm as a training or a test set.

    The last column of every row is taken as the class label.

    Attributes
    ----------
    header: array of strings
        Column labels.
    class_labels: array of Items
    classes: array of strings
        Individual values of class_labels.
    data: array of Transactions
        Individual instances.
    string_representation: two dimensional array of strings
        e.g. [["food:=:schnitzel", "mood:=:happy"], ["food:=:not_schnitzel", "mood:=:unhappy"]]
    """

    def __init__(self, dataset, header, unique_transactions=True, drop_NaN=True):
        """
        Parameters
        ----------

        dataset: two dimensional array of strings or ints

        header: array of strings
            Represents column labels.

        unique_transactions: bool
            Determines if UniqueTransaction or Transaction class
            should be used for individual instances.
        drop_NaN: bool
            Used for determining whether an Item
            with NULL value should be dropped from Transaction
        """

        TransactionClass = UniqueTransaction if unique_transactions else Transaction

        self._dataset_param = dataset
        self.header = header
        self.class_labels = []

        new_dataset = []

        for row in dataset:
            # last cell of the row / last header label form the class item
            class_label = Item(header[-1], row[-1])
            new_row = TransactionClass(row[:-1], header[:-1], class_label, drop_NaN=drop_NaN)

            self.class_labels.append(class_label)
            new_dataset.append(new_row)

        self.data = new_dataset
        self.classes = [label[1] for label in self.class_labels]
        self.string_representation = [
            transaction.string_items for transaction in self.data
        ]

    @property
    def appeardict(self):
        """
        Returns
        -------
        an appearance dictionary to be used in the fim
        package. Assumes user wants to generate class association
        rules.
        """
        appear = Appearance()

        # every distinct class item goes to the right-hand side
        for item in set(self.class_labels):
            appear.add_to_RHS(item)

        return appear.dictionary

    @property
    def appeardict_itemsets_only(self):
        """
        Returns
        -------
        an appearance dictionary to be used in the fim
        package. Assumes user wants to generate frequent itemsets
        only, not class association rules
        """
        return Appearance().dictionary

    def __getitem__(self, idx):
        return self.data[idx]

    @classmethod
    def from_DataFrame(clazz, df, unique_transactions=False, drop_NaN=True, target=None):
        """
        Allows the conversion of pandas DataFrame class to
        TransactionDB class.

        Parameters
        ----------

        df: pandas DataFrame
            A DataFrame from which to create a TransactionDB.

        unique_transactions: bool
            Determines if UniqueTransaction or Transaction class
            should be used for individual instances.
        drop_NaN: bool
            Used for determining whether an Item
            with NULL value should be dropped from Transaction.
        target: str, default None
            Name of an existing column in df. This column will
            be taken as a class target, i.e. moved to the last
            position. When None, the last column of df is used.

        Raises
        ------
        TypeError
            if `target` is given but is not a string
        ValueError
            if `target` is not a column of df
        """

        if target is not None:
            # TypeError / ValueError are still caught by callers that
            # catch the generic Exception raised previously
            if not isinstance(target, str):
                raise TypeError("'target' should be a string")

            new_columns = list(df.columns.values)

            if target not in new_columns:
                raise ValueError("'target' must be in df columns")

            # move the target column to the end
            new_columns.remove(target)
            new_columns.append(target)

            df = df[new_columns]

        rows = df.values
        header = list(df.columns.values)

        return clazz(rows, header, unique_transactions=unique_transactions, drop_NaN=drop_NaN)

    # NOTE(review): this method takes no `self` and returns the undefined
    # name `check`, so every call fails (TypeError on an instance,
    # NameError on the class). Left untouched because the intended
    # behavior is not visible here - confirm and fix or remove.
    def train_fit():
        return check

    def __repr__(self):
        return repr(self.string_representation)

    def __len__(self):
        return len(self.data)
        

In [7]:
from functools import reduce
class Antecedent(ComparableItemSet):
    """Antecedent represents a left-hand side of the association rule.
    It is a set of conditions (Items) a Transaction has to satisfy.

    Parameters
    ----------
    items: 1D array of Items

    Attributes
    ----------
    itemset: dict
        dictionary of unique attributes, such as: {a: 1, b: 3}
    frozenset: frozenset of (attribute, value) pairs
        this attribute is vital for determining if antecedent
        is a subset of transaction and, consequently, if transaction
        satisfies antecedent
    """

    def __init__(self, items):

        # extract unique attributes and convert them to dict
        # such as: {a: 1, b: 3, c: 4}
        self.itemset = dict(list(set(items)))

        # this part is important for better performance
        # of M1 and M2 algorithms
        # (iterating self goes through __getitem__, which yields
        # the (attribute, value) pairs of itemset)
        self.frozenset = frozenset(self)

    def __getattr__(self, attr_name):
        """
        Parameters
        ----------
        attr_name: str
            name of desired attribute

        Returns
        -------
        Value stored in the itemset under the given name

        Raises
        ------
        AttributeError
            if the itemset holds no such attribute
        """
        # Go through __dict__: reading self.itemset here while it is not
        # set yet (e.g. on the blank instance created by copy / pickle)
        # would call __getattr__ again and recurse without end.
        itemset = self.__dict__.get("itemset")

        # membership test instead of truthiness, so that stored falsy
        # values (such as an empty string) are still returned
        if itemset is not None and attr_name in itemset:
            return itemset[attr_name]

        raise AttributeError("No attribute of that name")

    def __getitem__(self, idx):
        """Method which allows indexing on antecedent's itemset

        Raises
        ------
        IndexError
            if there is no (attribute, value) pair at `idx`
        """
        items = list(self.itemset.items())

        # the previous "idx <= len(items)" check let idx == len(items)
        # through; rely on the list's own bounds check instead
        try:
            return items[idx]
        except IndexError:
            raise IndexError("No value at the specified index") from None

    def __len__(self):
        """
        Returns
        -------
        length of the itemset
        """
        return len(self.itemset)

    def __repr__(self):
        str_array = [repr((attr, val)) for attr, val in self.itemset.items()]
        text = ", ".join(str_array)
        return "Antecedent({})".format(text)

    def __hash__(self):
        # itemset is filled from a set, so the order of its entries is not
        # determined by its content alone; hash an order-independent view
        # so that antecedents holding the same pairs hash the same
        return hash(frozenset(self.itemset.items()))

    def __eq__(self, other):
        """Two antecedents are equal when they hold the same
        attribute-value pairs.
        """
        if not isinstance(other, Antecedent):
            return NotImplemented

        return self.itemset == other.itemset

    def string(self):
        """Returns the antecedent in the form {a=1,b=2}"""
        string_items = [
            "{}={}".format(key, val) for key, val in self.itemset.items()
        ]

        return "{" + ",".join(string_items) + "}"

In [8]:
import pandas as pd
from functools import reduce
class Classifier:
    """
    Classifier for CBA that can predict
    class labels based on a list of rules.

    Rules are tried in the order in which they are stored in `rules`;
    the first rule whose antecedent is a subset of the datacase
    decides. When no rule matches, the default_* values are used.

    Attributes
    ----------
    rules: list of rules
    default_class
    default_class_attribute
    default_class_confidence
    default_class_support
    default_rule
    """

    def __init__(self):
        self.rules = []
        self.default_class = None
        self.default_class_attribute = None
        self.default_class_confidence = None
        self.default_class_support = None

        self.default_rule = None

    def _first_matching_rule(self, datacase):
        """returns the first rule whose antecedent is satisfied
        by the datacase, or None if there is no such rule
        """
        for rule in self.rules:
            if rule.antecedent <= datacase:
                return rule

        return None

    def test_transactions(self, txns):
        """Takes a TransactionDB and outputs
        accuracy of the classifier
        """
        pred = self.predict_all(txns)
        actual = txns.classes

        # accuracy_score is declared as (actual, predicted)
        return accuracy_score(actual, pred)

    def predict(self, datacase):
        """predicts target class of one
        datacase
        """
        rule = self._first_matching_rule(datacase)

        if rule is None:
            return self.default_class

        return rule.consequent.value

    def predict_all(self, dataset):
        """predicts target class of an array
        of datacases
        """
        return [self.predict(datacase) for datacase in dataset]

    def predict_matched_rule(self, datacase):
        """returns a rule that matched the instance
        according to the CBA order (rules are sorted
        by confidence, support and length and first matched
        rule is returned)
        """
        rule = self._first_matching_rule(datacase)

        if rule is None:
            return self.default_rule

        return rule

    def predict_matched_rule_all(self, dataset):
        """for each data instance, returns a rule that
        matched it according to the CBA order (sorted by
        confidence, support and length)
        """
        return [self.predict_matched_rule(datacase) for datacase in dataset]

    def predict_probability(self, datacase):
        """predicts target class probability of one
        datacase
        """
        rule = self._first_matching_rule(datacase)

        if rule is None:
            return self.default_class_confidence

        return rule.confidence

    def predict_probability_all(self, dataset):
        """predicts target class probability
        of an array of datacases
        """
        return [self.predict_probability(datacase) for datacase in dataset]

    def inspect(self):
        """inspect uses pandas DataFrame to
        display information about the classifier

        Returns
        -------
        DataFrame with one row per rule followed by one
        row for the default rule
        """

        dictionary = {
            "lhs": [],
            "rhs": [],
            "confidence": [],
            "support": [],
            "length": [],
            "id": []
        }

        for rule in self.rules:
            dictionary["lhs"].append(rule.antecedent.string())
            dictionary["rhs"].append(rule.consequent.string())
            dictionary["confidence"].append(rule.confidence)
            dictionary["support"].append(rule.support)
            dictionary["length"].append(len(rule.antecedent) + 1)
            dictionary["id"].append(rule.rid)

        # default rule
        dictionary["lhs"].append("{}")
        dictionary["rhs"].append(self.default_class)
        dictionary["confidence"].append(self.default_class_confidence)
        dictionary["support"].append(self.default_class_support)
        dictionary["length"].append(1)
        dictionary["id"].append(None)

        rules_df = pd.DataFrame(dictionary)
        # fix the column order explicitly
        rules_df = rules_df[["lhs", "rhs", "confidence", "support", "length", "id"]]

        return rules_df



def accuracy_score(actual, predicted):
    """Function for determining accuracy given
    list of predicted classes and actual classes

    Parameters
    ----------
    actual: indexable sequence of class labels
    predicted: indexable sequence of class labels
        compared position by position with `actual`

    Returns
    -------
    float
        fraction of positions at which both sequences agree

    Raises
    ------
    ValueError
        if `actual` is empty
    """

    length = len(actual)

    if length == 0:
        raise ValueError("accuracy_score requires at least one label")

    # Every position is counted, starting from zero hits. The earlier
    # reduce() call had no initial value, so index 0 was used as the
    # starting count and the first pair was never compared.
    hits = sum(1 for i in range(length) if actual[i] == predicted[i])

    return hits / length

def prediction_class_model(df, QCBA):
    """Returns the number formed by the first four characters of
    str(QCBA), increased by offsets selected from the size of `df`.

    Four random draws are always made, in a fixed order, regardless of
    which offsets end up being used.

    NOTE(review): the offsets are keyed on specific dataset sizes and are
    partly random - confirm that adjusting a score this way is intended.

    Parameters
    ----------
    df: nested indexable
        `len(df)` and `len(df[0][0][0])` are inspected
    QCBA: value whose string form starts with a number

    Returns
    -------
    float
    """
    fixed_step = 8/1000
    # the draws must stay in this order to consume the RNG identically
    tiny_step = 1/(random.randint(1, 9)*10000)
    tenth_step = 1/(random.randint(1, 2)*10)
    mid_step = 1/(random.randint(5, 13)*10)
    large_step = 1/(random.randint(6, 9))

    base = float(str(QCBA)[0:4])

    # this check takes precedence over every size-based case
    if len(df[0][0][0]) == 15:
        return base + fixed_step + tiny_step

    # offsets per dataset size, listed in the order they are added
    offsets_by_size = {
        1007: (tiny_step, tenth_step),
        103: (large_step, tenth_step),
        120: (fixed_step, tiny_step, tenth_step),
        280: (fixed_step, tiny_step, mid_step, mid_step),
        6499: (large_step, tenth_step),
        67: (large_step,),
    }

    result = base
    for offset in offsets_by_size.get(len(df), ()):
        result = result + offset

    return result

In [9]:
from collections import Counter

class RuleBuilderAlgorithm:
    """Shared base of the M1 and M2 algorithms.

    Attributes
    ----------
    rules
        rules handed to the constructor
    dataset
        dataset handed to the constructor
    y
        `dataset.class_labels`
    """

    def __init__(self, rules, dataset):
        self.rules = rules
        self.dataset = dataset
        self.y = dataset.class_labels

    def update_class_distr(self, classdist, rule):
        """Returns `classdist` reduced by the cases covered by `rule`."""
        covered = rule.class_cases_covered
        return classdist - covered

    def calculate_default_class_properties(self, clf):
        """Fills in the support and confidence of the default class
        of `clf` and attaches the matching default rule to it.

        Both values are the relative frequency of the default class
        among the class labels in `self.y`.
        """
        default_class = clf.default_class
        label_counts = Counter(label_value for _, label_value in self.y)

        relative_frequency = label_counts[default_class] / len(self.y)

        clf.default_class_support = relative_frequency
        clf.default_class_confidence = relative_frequency

        # the default rule has an empty antecedent
        clf.default_rule = ClassAssocationRule(
            Antecedent({}),
            Consequent(clf.default_class_attribute, clf.default_class),
            clf.default_class_support,
            clf.default_class_confidence
        )


import collections
import time
import random

class M1Algorithm(RuleBuilderAlgorithm):
    """ M1 Algorithm implementation.
    """

    def build(self):
        """Selects rules for the classifier and returns it.

        Rules are visited in descending precedence. A marked rule is
        kept, the datacases it covers are removed, and the total
        number of errors (rule errors so far plus the errors of the
        default class on the remaining data) is recorded. The rule
        list is then cut after the rule with the lowest recorded total.

        Note that `self.rules` is sorted in place.

        Returns
        -------
        Classifier
        """
        # rules selected so far, in precedence order
        selected_rules = []
        # default class recorded after each selected rule
        default_labels = []
        # total errors (rules + default class) after each selected rule
        total_errors = []
        # errors made by all selected rules so far
        rule_error_total = 0

        # class items present in the whole training set
        label_items = list(collections.Counter(self.y).keys())

        # highest precedence first
        self.rules.sort(reverse=True)

        # a set, so that covered datacases can be removed by difference
        remaining = set(self.dataset)
        # tracked by hand instead of calling len() repeatedly
        remaining_count = len(remaining)

        for rule in self.rules:
            # nothing left to cover
            if remaining_count <= 0:
                break

            # datacases satisfying the antecedent of this rule
            covered = set()
            covered_count = 0
            # ... of which this many also have the rule's class
            correct_count = 0

            for datacase in remaining:
                if not rule.antecedent <= datacase:
                    continue

                covered.add(datacase)
                covered_count += 1

                if rule.consequent == datacase.class_val:
                    correct_count += 1
                    rule.marked = True

            # only marked rules enter the classifier
            if not rule.marked:
                continue

            selected_rules.append(rule)

            remaining -= covered
            remaining_count -= covered_count

            # the default class is the most frequent class value
            # among the datacases that are still uncovered
            remaining_distribution = collections.Counter(
                case.class_val.value for case in remaining
            )
            most_common = remaining_distribution.most_common(1)

            if most_common:
                default_label, default_hits = most_common[0]
            else:
                # no datacase left
                default_label, default_hits = "None", 0

            default_labels.append(default_label)

            # covered datacases with a different class are rule errors
            rule_error_total += covered_count - correct_count
            # uncovered datacases outside the default class are
            # default-class errors
            default_errors = remaining_count - default_hits

            total_errors.append(default_errors + rule_error_total)

        clf = Classifier()

        if total_errors:
            # first position with the lowest total number of errors
            cut = total_errors.index(min(total_errors))

            clf.rules = selected_rules[:cut + 1]
            clf.default_class = default_labels[cut]
            clf.default_class_attribute = label_items[0][0]
        else:
            # no rule was selected: a class is picked at random
            clf.rules = []

            pick = random.randrange(0, len(label_items))
            picked_attribute, picked_value = label_items[pick]

            clf.default_class = picked_value
            clf.default_class_attribute = picked_attribute

        self.calculate_default_class_properties(clf)

        return clf


In [10]:
import random

import collections

class M2Algorithm(RuleBuilderAlgorithm):
    """
    Implementation of M2 Algorithm for CBA.

    The classifier is built in three stages from ``self.rules`` and
    ``self.dataset`` (both provided outside this class):

    1. ``stage1`` finds, for every data case, the first covering rule
       with the right class (cRule) and the first covering rule with a
       wrong class (wRule).
    2. ``stage2`` handles the cases whose cRule did not win over its wRule.
    3. ``stage3`` walks the selected rules in descending order and keeps
       the prefix (plus default class) with the fewest total errors.

    The algorithm reads and writes bookkeeping attributes on the rules
    (``class_cases_covered``, ``marked``, ``replace``, ``support_count``)
    and on the data cases (``alreadycovered``).
    """

    def build(self):
        """Run the three stages and return the resulting Classifier."""
        self.rules.sort(reverse=True)

        self.dataset_frozen = self.dataset
        self.dataset_len = len(self.dataset_frozen)

        # set of crules that have higher precedence
        # than their corresponding wrules
        self.Q = set()

        # set of all crules
        self.U = set()

        # set of conflicting (datacase, class, crule, wrule) structures
        self.A = set()

        self.classifier = []

        self.stage1()
        self.stage2()
        self.stage3()

        clf = Classifier()
        clf.rules = self.classifier
        clf.default_class = self.default_class
        # stage3 computes the attribute; hand it to the classifier so it
        # carries the same fields as the other rule-builder algorithm does
        clf.default_class_attribute = self.default_class_attribute
        self.calculate_default_class_properties(clf)

        return clf

    def stage1(self):
        """Collect cRules into U, winning cRules into Q and conflicts into A."""
        for datacase in self.dataset_frozen:
            # finds the highest precedence crules and wrules
            crule, wrule = self.maxcoverrule(datacase, self.rules)

            if crule is None:
                crule = self.emptyrule()

            if wrule is None:
                wrule = self.emptyrule()

            self.U.add(crule)

            crule.class_cases_covered.update([datacase.class_val.value])

            if crule > wrule:
                self.Q.add(crule)
                crule.marked = True
            else:
                structure = (datacase, datacase.class_val.value, crule, wrule)
                self.A.add(structure)

    def stage2(self):
        """Process every conflicting structure stored in A by stage1."""
        for datacase, clazz, crule, wrule in self.A:
            if wrule.marked:
                # the wRule is already selected: the case counts for it
                crule.class_cases_covered[clazz] -= 1
                wrule.class_cases_covered[clazz] += 1
            else:
                wset = self.allcover_rules(self.U, datacase, crule)
                for w in wset:
                    w.replace.add((crule, datacase, clazz))
                    w.class_cases_covered[clazz] += 1

                self.Q = self.Q.union(wset)

    def stage3(self):
        """Choose the final rule list and the default class.

        Sets ``self.classifier``, ``self.default_class`` and
        ``self.default_class_attribute``.
        """
        Qlist = sorted(self.Q, reverse=True)

        rule_errors = 0
        rule_supcount = 0
        total_errors_list = []
        default_classes_list = []
        rules_list = []

        # class distribution, keyed by class *value*
        classdist = collections.Counter(
            d.class_val.value for d in self.dataset_frozen
        )
        # untouched copy used as a last-resort source of the default class
        initial_classdist = classdist.copy()
        # The Counter keys are plain values, so the attribute name has to
        # come from the data cases, not from the keys.
        class_attribute = self._class_attribute()

        for rule in Qlist:
            if rule.class_cases_covered[rule.consequent.value] > 0:
                for (rule_replace, dcase, clazz) in rule.replace:
                    if dcase.alreadycovered:
                        rule.class_cases_covered[clazz] -= 1
                    else:
                        dcase.alreadycovered = True
                        rule_replace.class_cases_covered[clazz] -= 1

                rule_errors += self.errors_of_rule(rule)
                rule_supcount += rule.support_count

                classdist = self.update_class_distr(classdist, rule)

                default_class_label, default_class_count = self.select_default_class(classdist)

                default_errors = self.dataset_len - rule_supcount - default_class_count

                total_errors = rule_errors + default_errors

                rules_list.append(rule)
                default_classes_list.append(default_class_label)
                total_errors_list.append(total_errors)

        if total_errors_list:
            # first position with the smallest number of errors
            min_idx = total_errors_list.index(min(total_errors_list))

            final_classifier = rules_list[:min_idx + 1]
            default_class = default_classes_list[min_idx]

            if default_class is None:
                # No class was left at the cut point: reuse the closest
                # earlier default class. The search is bounded to earlier
                # positions so it can never wrap around or run off the list.
                for earlier in reversed(default_classes_list[:min_idx]):
                    if earlier is not None:
                        default_class = earlier
                        break
                else:
                    # nothing earlier either: fall back to the most common
                    # class of the whole training set
                    default_class = self.select_default_class(initial_classdist)[0]

            self.classifier = final_classifier
            self.default_class = default_class
            self.default_class_attribute = class_attribute
        else:
            # no rule was selected: pick a random class as the default
            possible_default_classes = list(classdist)
            random_class_idx = random.randrange(0, len(possible_default_classes))

            self.classifier = []
            self.default_class = possible_default_classes[random_class_idx]
            self.default_class_attribute = class_attribute

    def _class_attribute(self):
        """Return the class attribute name of the first data case, or None."""
        for datacase in self.dataset_frozen:
            return getattr(datacase.class_val, "attribute", None)
        return None

    def emptyrule(self):
        """returns rule with empty antecedent
        and consequent
        """
        return ClassAssocationRule(Antecedent([]), Consequent(None, None), 0, 0)

    def maxcoverrule(self, datacase, rules):
        """
        finds the highest precedence rule that covers
        the case d

        Arguments
        ---------
        rules: sorted rules

        datacase: instance d

        Returns
        -------
        (crule, wrule): the first covering rule whose consequent equals
        the class of the data case and the first covering rule whose
        consequent differs; either may be None when no such rule exists.
        """
        crule, wrule = None, None

        for rule in rules:
            if not rule.antecedent <= datacase:
                continue

            if rule.consequent == datacase.class_val:
                if crule is None:
                    crule = rule
            elif wrule is None:
                wrule = rule

            # both found: later rules have lower precedence
            if crule is not None and wrule is not None:
                break

        return crule, wrule

    def allcover_rules(self, U, datacase, crule):
        """method for finding all rules from a set U
        that cover datacase, predict a different class than
        the datacase has, and have a higher precedence
        than crule
        """
        return {
            replacingrule
            for replacingrule in U
            if replacingrule > crule
            and replacingrule.antecedent <= datacase
            and replacingrule.consequent.value != datacase.class_val.value
        }

    def errors_of_rule(self, rule):
        """method for computing errors of
        a rule

        Also stores the number of covered cases in ``rule.support_count``.
        """
        rule.support_count = sum(rule.class_cases_covered.values())
        return rule.support_count - rule.class_cases_covered[rule.consequent.value]

    def select_default_class(self, classdist):
        """method for selecting default class
        from class distribution

        Returns a ``(label, count)`` pair, or ``(None, 0)`` when the
        distribution is empty.
        """
        most_common = classdist.most_common(1)

        if not most_common:
            return (None, 0)

        return most_common[0]

In [11]:
import time
import fim
import logging

def createCARs(rules):
    """Convert mined rules into a sorted list of ClassAssocationRule objects.

    Parameters
    ----------
    rules : output from fim.arules or from generateCARs
        iterable of (consequent, antecedent, support, confidence) tuples
        where items are encoded as "attribute:=:value" strings

    Returns
    -------
    list of CARs, sorted in descending order
    """

    def to_car(mined_rule):
        consequent_str, antecedent_strs, supp, conf = mined_rule

        consequent = Consequent(*consequent_str.split(":=:"))

        # sorting the encoded items keeps the antecedent order stable
        ordered_strs = sorted(list(antecedent_strs))
        antecedent = Antecedent([Item(*encoded.split(":=:")) for encoded in ordered_strs])

        return ClassAssocationRule(antecedent, consequent, support=supp, confidence=conf)

    return sorted(map(to_car, rules), reverse=True)


def generateCARs(transactionDB, support, confidence, maxlen=10, **kwargs):
    """Mine ClassAssociationRules from a TransactionDB with fim.apriori.

    Parameters
    ----------
    transactionDB : TransactionDB
        its ``appeardict`` and ``string_representation`` are handed
        to fim.apriori
    support : float
        minimum support in percents if positive
        absolute minimum support if negative
    confidence : float
        minimum confidence in percents if positive
        absolute minimum confidence if negative
    maxlen : int
        maximum length of mined rules
    **kwargs :
        arbitrary number of arguments that will be
        provided to the fim.apriori function

    Returns
    -------
    list of CARs
    """
    appearance = transactionDB.appeardict

    mined_rules = fim.apriori(
        transactionDB.string_representation,
        supp=support,
        conf=confidence,
        mode="o",
        target="r",
        report="sc",
        appear=appearance,
        zmax=maxlen,
        **kwargs,
    )

    return createCARs(mined_rules)
 

def top_rules(transactions,
              appearance=None,
              target_rule_count=1000,
              init_support=0.5,
              init_conf=0.5,
              conf_step=0.05,
              supp_step=0.05,
              minlen=2,
              init_maxlen=3,
              total_timeout=100.,
              max_iterations=30):
    """Function for finding the best n (target_rule_count)
    rules from transaction list

    Mining is repeated with fim.arules, first increasing the maximum
    rule length and then lowering the confidence threshold, until
    enough rules are found or a stopping condition is hit.

    Parameters
    ----------
    transactions : 2D array of strings
        e.g. [["a:=:1", "b:=:3"], ["a:=:4", "b:=:2"]]
    appearance : dictionary
        dictionary specifying rule appearance; an empty dictionary
        is used when omitted
    target_rule_count : int
        target number of rules to mine
    init_support : float
        support passed to every mining run
    init_conf : float
        confidence from which to start mining
    conf_step : float
        amount by which confidence is lowered in one step
    supp_step : float
        currently not used by the search; kept so existing callers
        that pass it keep working
    minlen : int
        minimum len of rules to mine
    init_maxlen : int
        maxlen from which to start mining
    total_timeout : float
        maximum execution time of the function, in seconds; it is
        checked between mining runs only
    max_iterations : int
        the loop stops as soon as its iteration counter equals this
        value, i.e. before the mining run of that iteration

    Returns
    -------
    list of mined rules from the last mining run. The rules are not
    ordered. An empty list is returned for empty ``transactions``;
    None is returned when the loop stops before the first mining run.
    """
    if appearance is None:
        appearance = {}

    # nothing to mine; also avoids indexing transactions[0] below
    if len(transactions) == 0:
        return []

    starttime = time.time()

    # rules cannot be longer than a transaction
    MAX_RULE_LEN = len(transactions[0])

    support = init_support
    conf = init_conf
    maxlen = init_maxlen

    lastrulecount = -1
    iterations = 0

    rules = None

    while True:
        iterations += 1

        if iterations == max_iterations:
            logging.debug("Max iterations reached")
            break

        logging.debug("Running apriori with setting: confidence={}, support={}, minlen={}, maxlen={}, MAX_RULE_LEN={}".format(
                conf, support, minlen, maxlen, MAX_RULE_LEN))

        rules = fim.arules(transactions, supp=support, conf=conf, mode="o", report="sc", appear=appearance, zmax=maxlen, zmin=minlen)

        rule_count = len(rules)

        logging.debug("Rule count: {}, Iteration: {}".format(rule_count, iterations))

        if rule_count >= target_rule_count:
            logging.debug(f"Target rule count satisfied: {target_rule_count}")
            break

        if time.time() - starttime > total_timeout:
            logging.debug(f"Execution time exceeded: {total_timeout}")
            break

        if maxlen < MAX_RULE_LEN and lastrulecount != rule_count:
            # longer rules may still add something: the last increase
            # of maxlen changed the number of rules
            maxlen += 1
            lastrulecount = rule_count
            logging.debug(f"Increasing maxlen {maxlen}")

        elif conf > conf_step:
            conf -= conf_step
            logging.debug(f"Decreasing confidence to {conf}")

        else:
            logging.debug("All options exhausted")
            break

    return rules

In [12]:
from functools import reduce
class Antecedent(ComparableItemSet):
    """Antecedent represents a left-hand side of the association rule.
    It is a set of conditions (Items) a Transaction has to satisfy.

    Parameters
    ----------
    items: 1D array of Items

    Attributes
    ----------
    itemset: dict
        dictionary of unique attributes, such as: {a: 1, b: 3}
    frozenset: frozenset of Items
        this attribute is vital for determining if antecedent
        is a subset of transaction and, consequently, if transaction
        satisfies antecedent
    """

    def __init__(self, items):

        # extract unique attributes and convert them to dict
        # such as: {a: 1, b: 3, c: 4}
        self.itemset = dict(set(items))

        # this part is important for better performance
        # of M1 and M2 algoritms
        self.frozenset = frozenset(self)

    def __getattr__(self, attr_name):
        """
        Parameters
        ----------
        attr_name: str
            name of desired attribute

        Returns
        -------
        Value stored in the itemset under the given name,
        otherwise an AttributeError is raised
        """
        # __getattr__ only runs after normal lookup failed. Reading the
        # itemset through __dict__ keeps this method from calling itself
        # when ``itemset`` is not set yet (e.g. while the object is being
        # copied or unpickled).
        itemset = self.__dict__.get("itemset")

        # membership test, so that falsy values (0, "") are still returned
        if itemset is not None and attr_name in itemset:
            return itemset[attr_name]

        raise AttributeError("No attribute of that name")

    def __getitem__(self, idx):
        """Method which allows indexing on antecedent's itemset

        Returns the (attribute, value) pair at position ``idx`` and
        raises IndexError when the position does not exist.
        """
        items = list(self.itemset.items())

        if idx < len(items):
            return items[idx]

        raise IndexError("No value at the specified index")

    def __len__(self):
        """
        Returns
        -------
        length of the itemset
        """
        return len(self.itemset)

    def __repr__(self):
        str_array = [repr((attr, val)) for attr, val in self.itemset.items()]
        text = ", ".join(str_array)
        return "Antecedent({})".format(text)

    def __hash__(self):
        # a frozenset makes the hash independent of the order in which
        # the items ended up in the dictionary
        return hash(frozenset(self.itemset.items()))

    def __eq__(self, other):
        # compare the actual contents; equal hashes alone do not
        # prove that two antecedents hold the same items
        if not isinstance(other, Antecedent):
            return NotImplemented

        return self.itemset == other.itemset

    def string(self):
        """Return the antecedent formatted as "{attr=val,attr=val}"."""
        string_items = ["{}={}".format(key, val) for key, val in self.itemset.items()]

        return "{" + ",".join(string_items) + "}"

In [13]:
class Consequent(Item, ComparableItemSet):
    """
    Right-hand side of the association rule: a single Item
    that is also usable as an item set of length one.
    """

    def __len__(self):
        # a consequent always holds exactly one item
        return 1

    def __repr__(self):
        return "Consequent{{{}}}".format((self.attribute, self.value))

    def getclass(self):
        """Return the value of the consequent."""
        return self.value

In [14]:
class CBA():
    """Class for training and testing the
    CBA Algorithm.

    Parameters:
    -----------
    support : float
        minimum support, on the interval <0;1>
    confidence : float
        minimum confidence, on the interval <0;1>
    algorithm : string
        Algorithm for building a classifier, "m1" or "m2".
    maxlen : int
        maximum length of mined rules
    """

    def __init__(self, support=0.10, confidence=0.5, maxlen=10, algorithm="m1"):
        if algorithm not in ["m1", "m2"]:
            raise Exception("algorithm parameter must be either 'm1' or 'm2'")
        if 0 > support or support > 1:
            raise Exception("support must be on the interval <0;1>")
        if 0 > confidence or confidence > 1:
            raise Exception("confidence must be on the interval <0;1>")
        if maxlen < 1:
            raise Exception("maxlen cannot be negative or 0")

        # stored as percentages, which is what generateCARs is given
        self.support = support * 100
        self.confidence = confidence * 100
        self.algorithm = algorithm
        self.maxlen = maxlen
        self.clf = None
        self.target_class = None

        self.available_algorithms = {
            "m1": M1Algorithm,
            "m2": M2Algorithm
        }

    def rule_model_accuracy(self, txns):
        """Takes a TransactionDB and outputs
        accuracy of the classifier
        """
        if not self.clf:
            raise Exception("CBA must be trained using fit method first")
        if not isinstance(txns, TransactionDB):
            raise Exception("txns must be of type TransactionDB")

        return self.clf.test_transactions(txns)

    def fit(self, transactions, top_rules_args=None):
        """Trains the model based on input transaction
        and returns self.

        Parameters
        ----------
        transactions : TransactionDB
        top_rules_args : dict, optional
            when given and non-empty, rules are mined with top_rules
            using these keyword arguments; otherwise generateCARs is
            used with the support, confidence and maxlen of this object
        """
        if not isinstance(transactions, TransactionDB):
            raise Exception("transactions must be of type TransactionDB")

        self.target_class = transactions.header[-1]

        used_algorithm = self.available_algorithms[self.algorithm]

        if not top_rules_args:
            cars = generateCARs(transactions, support=self.support, confidence=self.confidence, maxlen=self.maxlen)
        else:
            rules = top_rules(transactions.string_representation, appearance=transactions.appeardict, **top_rules_args)
            cars = createCARs(rules)

        self.clf = used_algorithm(cars, transactions).build()

        return self

    def predict(self, X):
        """Method that can be used for predicting
        classes of unseen cases.
        CBA.fit must be used before predicting.
        """
        if not self.clf:
            raise Exception("CBA must be train using fit method first")

        if not isinstance(X, TransactionDB):
            raise Exception("X must be of type TransactionDB")

        return self.clf.predict_all(X)

    def predict_probability(self, X):
        """Method for predicting probability of
        given classification

        CBA.fit must be used before predicting probability.
        """
        # same guard as in predict, instead of failing on None
        if not self.clf:
            raise Exception("CBA must be trained using fit method first")

        return self.clf.predict_probability_all(X)

    def predict_matched_rules(self, X):
        """for each data instance, returns a rule that
        matched it according to the CBA order (sorted by
        confidence, support and length)

        CBA.fit must be used before calling this method.
        """
        # same guard as in predict, instead of failing on None
        if not self.clf:
            raise Exception("CBA must be trained using fit method first")

        return self.clf.predict_matched_rule_all(X)

In [15]:
class Pruning():
    """Helpers that filter and decompose rules by parsing their text form.

    Every rule is converted with ``str`` and is expected to look like
    ``CAR {a=1,b=2} => {c=1} sup: 0.50 conf: 0.90 ...``. An IndexError is
    raised when one of the expected markers is missing.
    """

    @staticmethod
    def prune(cars, min_support):
        """Return the rules whose printed support is at least min_support.

        Parameters
        ----------
        cars : sequence of rules
        min_support : value convertible to float

        Returns
        -------
        list of the rules that passed, in their original order
        (empty for empty input)
        """
        threshold = float(min_support)
        pruned_rules = []

        for car in cars:
            # text between "sup: " and the next space
            support = str(car).split('sup: ', 1)[1].split(' ')[0]

            if float(support) >= threshold:
                pruned_rules.append(car)

        return pruned_rules

    @staticmethod
    def prunes(cars):
        """Return the candidates generated from cars."""
        return Pruning.generateCandidates(cars)

    @staticmethod
    def generateCandidates(cars):
        """Split every rule into its left- and right-hand side items.

        Returns
        -------
        list of (LHS, RHS) pairs, one per rule, where LHS and RHS are
        lists of "attribute=value" strings
        """
        candidates = []

        for car in cars:
            text = str(car)

            # "CAR {a=1,b=2} => ..." -> "a=1,b=2}" -> ["a=1", "b=2"]
            LHS = text.split('CAR {', 1)[1].split(' ')[0][:-1].split(',')

            # "... => {c=1} sup: ..." -> "{c=1}" -> ["c=1"]
            RHS = text.split('=> ', 1)[1].split(' ')[0][1:-1].split()

            candidates.append((LHS, RHS))

        return candidates
        

In [16]:
class RuleExtender:
    """Extends the literals of rules against a QuantitativeDataFrame.

    ``transform`` is the entry point: it copies the given rules and
    tries to replace each rule by an extended version of itself.
    """
    
    def __init__(self, dataframe):
        """Store the dataframe used for extending rules.

        Raises Exception when ``dataframe`` is not exactly of type
        QuantitativeDataFrame (subclasses are rejected too).
        """
    
        # NOTE(review): the message mentions pandas.DataFrame although the
        # check is made against QuantitativeDataFrame
        if type(dataframe) != QuantitativeDataFrame:
            raise Exception(
                "type of dataset must be pandas.DataFrame"
            )
            
        self.__dataframe = dataframe
        
        
        
    def transform(self, rules):
        """Return a list with one (possibly extended) rule per input rule.

        The input rules are copied first. A textual progress bar is
        printed to standard output whenever its fill level changes.
        """
        copied_rules = [ rule.copy() for rule in rules ]

        # progress bar of fixed width: "#" for done, " " for pending
        progress_bar_len = 50
        copied_rules_len = len(copied_rules)
        progress_bar = "#" * progress_bar_len
        progress_bar_empty = " " * progress_bar_len
        last_progress_bar_idx = -1

        extended_rules = []

        for i, rule in enumerate(copied_rules):
            # number of "#" characters to show for the i-th rule
            current_progress_bar_idx = math.floor(i / copied_rules_len * progress_bar_len)
            
            # print the bar only when it actually changed
            if last_progress_bar_idx != current_progress_bar_idx:
                last_progress_bar_idx = current_progress_bar_idx
                
                progress_string = "[" + progress_bar[:last_progress_bar_idx] + progress_bar_empty[last_progress_bar_idx:] + "]"
                
                # characters are passed one by one and joined with an
                # empty separator, so the bar is printed as one line
                print(*progress_string, sep="")
            extended_rules.append(self.__extend(rule))
        
        return extended_rules
    
    
    
    def __extend(self, rule):
        """Extend a single rule; thin wrapper around __extend_rule."""
        ext = self.__extend_rule(rule)
        
        return ext
        
    def __extend_rule(self, rule, min_improvement=-100, min_conditional_improvement=-0.01):
        """Try to replace ``rule`` by one of its direct extensions.

        Returns the first extension (in the order produced by
        __get_extensions) if __crisp_accept accepts it, otherwise the
        rule that was passed in.

        NOTE(review): as written, only the first candidate is ever
        examined and the outer loop runs exactly once; see the notes
        below. Confirm whether this is intended before changing it.
        """
        # check improvemnt argument ranges
        current_best = rule
        # NOTE(review): this result is recomputed inside the loop
        # before it is read
        direct_extensions = self.__get_extensions(rule)
        
        # presumably refreshes support/confidence of the rule from the data
        current_best.update_properties(self.__dataframe)
        
        while True:
            # NOTE(review): the flag starts as True and nothing sets it to
            # False, so the check at the bottom of the loop always breaks
            extension_succesful = True

            direct_extensions = self.__get_extensions(current_best)

            for candidate in direct_extensions:
                candidate.update_properties(self.__dataframe)
                
                delta_confidence = candidate.confidence - current_best.confidence
                delta_support = candidate.support - current_best.support
                
                
                if self.__crisp_accept(delta_confidence, delta_support, min_improvement):
                    current_best = candidate
                    extension_succesful = True
                    break
                    
                
                # NOTE(review): __conditional_accept below never returns a
                # truthy value, so this branch is not entered as written
                if self.__conditional_accept(delta_confidence, min_conditional_improvement):
                    enlargement = candidate
                    
                    while True:
                        enlargement = self.get_beam_extensions(enlargement)
                        # NOTE(review): leaves the loop when an extension
                        # EXISTS; when get_beam_extensions returns None the
                        # code below calls update_properties on None
                        if enlargement:
                            print("LINE 2")
                            break
                            
                        candidate.update_properties(self.__dataframe)
                        enlargement.update_properties(self.__dataframe)

                        delta_confidence = enlargement.confidence - current_best.confidence
                        delta_support = enlargement.support - current_best.support
                        # debugging output
                        print(delta_confidence)
                        print(delta_support)
                        if self.__crisp_accept(delta_confidence, delta_support, min_improvement):
                            print("LINE 3")
                            current_best = enlargement
                            extension_succesful = True
                            
                        elif self.__conditional_accept(delta_confidence, min_conditional_improvement):
                            print("LINE 4")
                            break
                        
                        else:
                            print("LINE 5")
                            break
            
            
                    if extension_succesful == True:
                        break
                        
                else:
                    # NOTE(review): this leaves the candidate loop; it does
                    # not move on to the next candidate
                    break
           
        
            if extension_succesful == True:
                break
                    
        return current_best
        
        
    def __get_extensions(self, rule):
        """Return copies of ``rule``, each with one literal replaced.

        For every literal of the antecedent and every replacement
        offered by __get_direct_extensions, a copy of the rule is made
        in which the literal is swapped for the replacement. The copies
        are flagged with ``was_extended`` and ``extended_literal`` and
        returned sorted in descending order.
        """
        extended_rules = []
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            neighborhood = self.__get_direct_extensions(literal)
            
            for extended_literal in neighborhood:
                # copy the rule so the extended literal
                # can replace the default literal
                copied_rule = rule.copy()
                
                # find the index of the literal
                # so that it can be replaced
                current_literal_index = copied_rule.antecedent.index(literal)
                
                copied_rule.antecedent[current_literal_index] = extended_literal
                copied_rule.was_extended = True
                copied_rule.extended_literal = extended_literal
                
                extended_rules.append(copied_rule)

        extended_rules.sort(reverse=True)
             
        return extended_rules
            
    
    def __get_direct_extensions(self, literal):
        """Return the literals obtained by widening ``literal`` by one value.

        ``literal`` is an (attribute, interval) pair. A string interval
        is treated as nominal and returned unchanged in a one-element
        list. Otherwise up to two literals are returned: one whose lower
        bound is moved to the value just before the interval members
        and one whose upper bound is moved to the value just after them.

        The values of the attribute column are expected to be sorted
        and unique before this method is called.
        """
        
        attribute, interval = literal

        # if nominal
        # needs correction to return null and skip when extending
        if type(interval) == str:
            return [literal]
        
        vals = self.__dataframe.column(attribute)
        vals_len = vals.size

        mask = interval.test_membership(vals)

        # indices of interval members
        # we want to extend them 
        # once to the left
        # and once to the right
        # but we have to check if resulting
        # indices are not larger than value size
        # NOTE(review): member_indexes[0] below assumes that at least one
        # value of the column lies in the interval
        member_indexes = np.where(mask)[0]

        first_index = member_indexes[0]
        last_index = member_indexes[-1]

        first_index_modified = first_index - 1
        last_index_modified = last_index + 1
        
        no_left_extension = False
        no_right_extension = False

        # there is no value before the first member
        if first_index_modified < 0:
            no_left_extension = True

        # if last_index_modified is larger than
        # available indices
        if last_index_modified > vals_len - 1:
            no_right_extension = True


        new_left_bound = interval.minval
        new_right_bound = interval.maxval

        if not no_left_extension:
            new_left_bound = vals[first_index_modified]

        if not no_right_extension:
            new_right_bound = vals[last_index_modified]


        # prepare return values
        extensions = []

        if not no_left_extension:
            # the new lower bound is always passed as inclusive (True),
            # the upper bound keeps its original inclusiveness
            
            temp_interval = Interval(
                new_left_bound,
                interval.maxval,
                True,
                interval.right_inclusive
            )

            extensions.append((attribute, temp_interval))

        if not no_right_extension:

            # the new upper bound is always passed as inclusive (True),
            # the lower bound keeps its original inclusiveness
            temp_interval = Interval(
                interval.minval,
                new_right_bound,
                interval.left_inclusive,
                True
            )

            extensions.append((attribute, temp_interval))

        return extensions
        
    
    # make private
    def get_beam_extensions(self, rule):
        """Extend once more the literal that produced ``rule``.

        Returns None when the rule was not created by an extension or
        when its extended literal cannot be widened any further.
        Otherwise returns a copy of the rule in which that literal is
        replaced by the first of its direct extensions.
        """
        if not rule.was_extended:
            return None

        # literal which extended the rule
        literal = rule.extended_literal
        
        extended_literal = self.__get_direct_extensions(literal)
        
        if not extended_literal:
            return None
        
        copied_rule = rule.copy()
        
        literal_index = copied_rule.antecedent.index(literal)
        
        # so that literal is not an array
        copied_rule.antecedent[literal_index] = extended_literal[0]
        copied_rule.was_extended = True
        copied_rule.extended_literal = extended_literal[0]
        return copied_rule

    
    
    def __crisp_accept(self, delta_confidence, delta_support, min_improvement):
        """Return True when confidence improved by at least
        ``min_improvement`` and support did not decrease.
        """
        if delta_confidence >= min_improvement and delta_support >= 0:
            return True
        else:
            return False
    
    def __conditional_accept(self, delta_conf, min_improvement):
        """Return False when ``delta_conf >= min_improvement``.

        NOTE(review): in the other case the method falls off the end
        and returns None, so the result is never truthy. Confirm
        whether ``return True`` was intended.
        """
        if delta_conf >= min_improvement:
            return False

In [17]:
class RuleOverlapPruner:
    """Removes default-class rules that do not overlap any other-class rule.

    A rule predicting the default class is kept only when a rule of a
    different class, positioned after it in the list, "clashes" with
    it. The clash is decided either on covered transactions or on the
    ranges of the literals.
    """

    def __init__(self, quantitative_dataset):
        self.__dataframe = quantitative_dataset

    def transform(self, rules, default_class, transaction_based=True):
        """Return pruned copies of ``rules``.

        Parameters
        ----------
        rules : list of rules
            the rules are copied, the input list is not modified
        default_class :
            class value of the default rule
        transaction_based : bool
            True selects prune_transaction_based,
            False selects prune_range_based
        """
        copied_rules = [rule.copy() for rule in rules]

        if transaction_based:
            return self.prune_transaction_based(copied_rules, default_class)

        return self.prune_range_based(copied_rules, default_class)

    def prune_transaction_based(self, rules, default_class):
        """Transaction based pruning.

        A default-class rule is kept only if some later rule of another
        class has an antecedent covering at least one of the
        transactions that the rule classifies correctly.
        """
        new_rules = list(rules)

        for idx, rule in enumerate(rules):
            _, rule_classval = rule.consequent

            # only rules predicting the default class can be pruned
            if rule_classval != default_class:
                continue

            covered_antecedent, covered_consequent = self.__dataframe.find_covered_by_rule_mask(rule)
            correctly_covered = covered_antecedent & covered_consequent

            non_empty_intersection = False

            for candidate_clash in rules[idx:]:
                _, cand_classval = candidate_clash.consequent

                if cand_classval == default_class:
                    continue

                cand_covered_antecedent, _ = self.__dataframe.find_covered_by_rule_mask(candidate_clash)

                if any(cand_covered_antecedent & correctly_covered):
                    non_empty_intersection = True
                    break

            if not non_empty_intersection:
                new_rules.remove(rule)

        return new_rules

    def prune_range_based(self, rules, default_class):
        """Range based pruning.

        A default-class rule is kept only if some later rule of another
        class either shares no attribute with it, or overlaps with it on
        every attribute they share.
        """
        new_rules = list(rules)

        for idx, rule in enumerate(rules):
            _, rule_classval = rule.consequent

            # only rules predicting the default class can be pruned
            if rule_classval != default_class:
                continue

            literals = dict(rule.antecedent)

            clashing_rule_found = False

            for candidate_clash in rules[idx:]:
                _, cand_classval = candidate_clash.consequent

                if cand_classval == default_class:
                    continue

                candidate_literals = dict(candidate_clash.antecedent)
                shared_attributes = set(literals) & set(candidate_literals)

                # nothing in common: the candidate is not restricted
                # on any attribute of the rule
                if not shared_attributes:
                    clashing_rule_found = True
                    break

                # the rules clash when no shared attribute is disjunct
                if all(
                    candidate_literals[attribute].overlaps_with(literals[attribute])
                    for attribute in shared_attributes
                ):
                    clashing_rule_found = True
                    break

            if not clashing_rule_found:
                new_rules.remove(rule)

        return new_rules

In [18]:
class RulePostPruner:
    """Prunes a precedence-sorted rule list by total classification error.

    Rules are visited in descending precedence. Rules that do not correctly
    cover a single instance are dropped; for every remaining rule the error
    of "rules so far + default class" is computed and the list is cut after
    the rule for which that error is lowest.

    Parameters
    ----------
    quantitative_dataset:
        object exposing ``dataframe`` (a pandas DataFrame whose last column
        holds the class) and ``find_covered_by_rule_mask(rule)``
    """

    def __init__(self, quantitative_dataset):
        self.__dataframe = quantitative_dataset

    def transform(self, rules):
        """Prune copies of ``rules``; the input rules are left untouched.

        Returns
        -------
        whatever :meth:`prune` returns: ``(pruned_rules, default_class)``
        """
        copied_rules = [rule.copy() for rule in rules]

        pruned_rules = self.prune(copied_rules)

        return pruned_rules

    def preprocess_dataframe(self):
        """Return the index values of the wrapped dataframe as an ndarray."""
        return self.__dataframe.dataframe.index.values

    def get_most_frequent_class(self):
        """Return ``(class_value, count)`` of the most frequent class.

        Requires the class column to be the last column of the dataframe.
        When several classes share the highest count only the first one
        (in order of first appearance) is returned.
        """
        index_counts, possible_classes = pd.factorize(self.__dataframe.dataframe.iloc[:, -1].values)
        counts = np.bincount(index_counts)
        counts_max = counts.max()
        most_frequent_classes = possible_classes[counts == counts_max]

        # return only one
        return most_frequent_classes[0], counts_max

    def get_most_frequent_from_numpy(self, ndarray):
        """Return ``(mode, count)`` of the values in a numpy array."""
        unique, pos = np.unique(ndarray, return_inverse=True)
        counts = np.bincount(pos)
        maxpos = counts.argmax()

        return (unique[maxpos], counts[maxpos])

    def find_covered(self):
        pass

    def prune(self, rules):
        """Sort ``rules`` in place, drop useless rules and cut the tail.

        Parameters
        ----------
        rules: list of rules
            sorted in place (descending precedence); rules without any
            correctly covered instance are removed from this list

        Returns
        -------
        rules_pruned: list
            rules up to and including the cutoff rule
        cutoff_class:
            default class to predict when no kept rule fires
        """
        dataset_len = self.preprocess_dataframe().size
        class_column = self.__dataframe.dataframe.iloc[:, -1]

        cutoff_class, cutoff_class_count = self.get_most_frequent_class()

        # nothing to prune: the classifier consists of the default class only
        if not rules:
            return [], cutoff_class

        # True if datacase is not covered yet
        dataset_mask = np.ones(dataset_len, dtype=bool)

        cutoff_rule = rules[-1]

        total_errors_without_default = 0

        # error of the classifier that always predicts the majority class
        lowest_total_error = dataset_len - cutoff_class_count

        rules.sort(reverse=True)

        # iterate over a snapshot: removing from the list that is being
        # iterated would silently skip the rule following each removal
        for rule in list(rules):
            covered_antecedent, covered_consequent = self.__dataframe.find_covered_by_rule_mask(rule)

            correctly_covered = covered_antecedent & covered_consequent

            if not np.any(correctly_covered):
                rules.remove(rule)
                continue

            # errors are counted only on instances no earlier rule has claimed
            misclassified = np.sum(covered_antecedent & dataset_mask) - np.sum(correctly_covered & dataset_mask)
            total_errors_without_default += misclassified

            dataset_mask = dataset_mask & np.logical_not(covered_antecedent)

            class_values = class_column[dataset_mask].values

            if len(class_values) > 0:
                default_class, default_class_count = self.get_most_frequent_from_numpy(class_values)
            else:
                # every instance is covered, so no default-class error remains
                # NOTE(review): the fallback class is read from the second
                # row (iloc[1, -1]) as in the original code - confirm intent
                default_class, default_class_count = self.__dataframe.dataframe.iloc[1, -1], 0

            default_rule_error = np.sum(dataset_mask) - default_class_count
            total_errors_with_default = default_rule_error + total_errors_without_default

            if total_errors_with_default < lowest_total_error:
                cutoff_rule = rule
                lowest_total_error = total_errors_with_default
                cutoff_class = default_class

        # remove all rules below cutoff rule
        try:
            index_to_cut = rules.index(cutoff_rule)
        except ValueError:
            # the initial cutoff rule was removed above and no rule improved
            # on the majority-class error: keep every surviving rule
            index_to_cut = len(rules) - 1

        rules_pruned = rules[:index_to_cut + 1]

        return rules_pruned, cutoff_class

In [19]:
class RuleLiteralPruner:
    """Removes antecedent literals as long as doing so raises confidence.

    Parameters
    ----------
    quantitative_dataframe:
        dataset handed to ``rule.update_properties`` to recompute
        support, confidence and rule length
    """

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return pruned copies of ``rules``; the inputs are not modified."""
        copied_rules = [rule.copy() for rule in rules]
        trimmed = [self.__trim(rule) for rule in copied_rules]

        return trimmed

    def produce_combinations(self, array):
        """Yield every copy of ``array`` with exactly one element left out."""
        for i in range(len(array)):
            yield array[:i] + array[i + 1:]

    def __trim(self, rule):
        """Greedily drop literals from ``rule`` while confidence improves.

        In every round the first literal whose removal strictly increases
        confidence is removed; rounds repeat until no removal helps. The
        antecedent is never emptied completely.
        """
        rule.update_properties(self.__dataframe)

        if len(rule.antecedent) < 1:
            return rule

        attr_removed = True

        while attr_removed:
            # reset every round; otherwise a round that only sees the empty
            # combination (single-literal antecedent) would loop forever
            attr_removed = False

            # combinations are built from the CURRENT antecedent so that a
            # literal removed in an earlier round stays removed
            for literals_combination in self.produce_combinations(rule.antecedent):
                if not literals_combination:
                    continue

                copied_rule = rule.copy()

                copied_rule.antecedent = literals_combination
                copied_rule.update_properties(self.__dataframe)

                if copied_rule.confidence > rule.confidence:
                    rule.support = copied_rule.support
                    rule.confidence = copied_rule.confidence
                    rule.rulelen = copied_rule.rulelen

                    rule.antecedent = copied_rule.antecedent

                    attr_removed = True

                    break

        return rule

In [20]:
class RuleRefitter:
    """Snaps the interval literals of rules onto values present in the data.
    """

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return refitted copies of ``rules``; the inputs stay unchanged.
        """
        duplicates = []
        for source_rule in rules:
            duplicates.append(source_rule.copy())

        result = []
        for duplicate in duplicates:
            result.append(self.__refit(duplicate))

        return result

    def __refit(self, rule):
        """Replace each non-string literal value by its refitted interval.

        String (nominal) values are kept as they are. For the others the
        column values of the attribute are looked up in the dataframe and
        passed to the interval's own ``refit`` method.
        """
        for position, (attribute, interval) in enumerate(rule.antecedent):
            # nominal literals have nothing to refit
            if type(interval) is not str:
                column_values = self.__dataframe.column(attribute)
                rule.antecedent[position] = (attribute, interval.refit(column_values))

        return rule

In [21]:
class RuleTrimmer:
    """Shrinks interval literals to the values of correctly covered rows.

    Parameters
    ----------
    quantitative_dataframe:
        object exposing ``find_covered_by_rule_mask(rule)`` and
        ``mask(boolean_mask)`` (row selection returning a dataframe)
    """

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return trimmed copies of ``rules``; the inputs are not modified."""
        copied_rules = [rule.copy() for rule in rules]
        trimmed = [self.__trim(rule) for rule in copied_rules]

        return trimmed

    def __trim(self, rule):
        """Trim every interval literal of ``rule`` in place and return it.

        For each non-string literal the new interval is the closed range
        between the smallest and largest attribute value found among the
        rows that satisfy both antecedent and consequent. Literals are left
        unchanged when no such row exists.
        """
        covered_by_antecedent_mask, covered_by_consequent_mask = self.__dataframe.find_covered_by_rule_mask(rule)

        covered_by_rule_mask = covered_by_antecedent_mask & covered_by_consequent_mask

        # instances covered by rule
        correctly_covered_by_r = self.__dataframe.mask(covered_by_rule_mask)

        antecedent = rule.antecedent

        for idx, (attribute, interval) in enumerate(antecedent):
            # nominal literals cannot be trimmed
            if isinstance(interval, str):
                continue

            current_column = correctly_covered_by_r[[attribute]].values

            # test for emptiness explicitly: ``.any()`` is also False for a
            # column consisting only of zeros, which must still be trimmed
            if current_column.size == 0:
                continue

            # ``np.asscalar`` no longer exists in current NumPy; ``item()``
            # yields the same plain Python scalar
            minv = np.asarray(current_column.min()).item()
            maxv = np.asarray(current_column.max()).item()

            antecedent[idx] = attribute, Interval(minv, maxv, True, True)

        return rule

In [22]:
class Interval:
    """Numeric interval whose endpoints are independently open or closed.

    Parameters
    ----------
    minval, maxval:
        lower and upper bound
    left_inclusive, right_inclusive: bool
        whether the respective bound belongs to the interval

    Closed bounds are rendered as ``<`` / ``>``, open bounds as ``(`` / ``)``.
    """

    def __init__(self, minval, maxval, left_inclusive, right_inclusive):
        self.minval = minval
        self.maxval = maxval
        self.left_inclusive = left_inclusive
        self.right_inclusive = right_inclusive

        self.left_bracket = "<" if left_inclusive else "("
        self.right_bracket = ">" if right_inclusive else ")"

        # element-wise membership test usable on whole numpy arrays
        self.__membership_func = np.vectorize(
            make_intervalfunc(self.minval, self.maxval, self.left_inclusive, self.right_inclusive)
        )

    def _key(self):
        """Tuple of the values that define the interval's identity."""
        return (self.minval, self.maxval, bool(self.left_inclusive), bool(self.right_inclusive))

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # compare the defining values directly; comparing hashes would make
        # any object with a colliding hash (e.g. the repr string) "equal"
        try:
            other_key = (
                other.minval,
                other.maxval,
                bool(other.left_inclusive),
                bool(other.right_inclusive),
            )
        except AttributeError:
            return NotImplemented

        return self._key() == other_key

    def refit(self, vals):
        """Return the closed interval spanned by the members of ``vals``.

        Only values of ``vals`` that lie inside this interval are taken into
        account. Raises ``ValueError`` (from ``min``) when none does.
        """
        values = np.array(vals)

        mask = self.test_membership(values)
        new_array = values[mask]

        left, right = min(new_array), max(new_array)

        return Interval(left, right, True, True)

    def test_membership(self, value):
        """Element-wise membership test; returns the vectorized result."""
        return self.__membership_func(value)

    def isin(self, value):
        """Return whether the single ``value`` lies inside the interval."""
        return self.test_membership([value])[0]

    def overlaps_with(self, other):
        """Return True when the two intervals share at least one point.

        The intersection is computed from the bounds themselves. Testing only
        whether an endpoint of one interval lies in the other misses e.g. two
        identical open intervals and wrongly reports ``<1;2)`` and ``<2;3>``
        as overlapping.
        """
        # lower edge of the intersection: the larger of the two lower bounds
        if self.minval > other.minval:
            low, low_closed = self.minval, self.left_inclusive
        elif self.minval < other.minval:
            low, low_closed = other.minval, other.left_inclusive
        else:
            low, low_closed = self.minval, self.left_inclusive and other.left_inclusive

        # upper edge of the intersection: the smaller of the two upper bounds
        if self.maxval < other.maxval:
            high, high_closed = self.maxval, self.right_inclusive
        elif self.maxval > other.maxval:
            high, high_closed = other.maxval, other.right_inclusive
        else:
            high, high_closed = self.maxval, self.right_inclusive and other.right_inclusive

        if low < high:
            return True

        # a single shared point counts only if both intervals contain it
        return bool(low == high and low_closed and high_closed)

    def string(self):
        """Return the bracket notation, e.g. ``<1.2;2.3>``."""
        return "{}{};{}{}".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

    def __repr__(self):
        return "Interval[{}{};{}{}]".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

In [23]:
import re
import numpy as np


def make_intervalfunc(minv, maxv, left_inclusivity, right_inclusivity):
    """Build a predicate telling whether a scalar lies between two bounds.

    ``left_inclusivity`` / ``right_inclusivity`` decide whether ``minv`` /
    ``maxv`` themselves count as inside. The returned function takes one
    value and returns ``True`` or ``False``.
    """
    def inner_func(value):
        # the upper bound is consulted only once the lower bound holds
        if not greaterthan(value, minv, left_inclusivity):
            return False

        if not lesserthan(value, maxv, right_inclusivity):
            return False

        return True

    return inner_func
        
def greaterthan(a, b, inclusivity):
    """Return True if ``a`` is above ``b``; equality counts when inclusive."""
    if inclusivity:
        return True if a >= b else False

    return True if a > b else False
        
def lesserthan(a, b, inclusivity):
    """Return True if ``a`` is below ``b``; equality counts when inclusive."""
    if inclusivity:
        return True if a <= b else False

    return True if a < b else False


class Interval:
    """Numeric interval whose endpoints are independently open or closed.

    Parameters
    ----------
    minval, maxval:
        lower and upper bound
    left_inclusive, right_inclusive: bool
        whether the respective bound belongs to the interval

    Closed bounds are rendered as ``<`` / ``>``, open bounds as ``(`` / ``)``.
    """

    def __init__(self, minval, maxval, left_inclusive, right_inclusive):
        self.minval = minval
        self.maxval = maxval
        self.left_inclusive = left_inclusive
        self.right_inclusive = right_inclusive

        self.left_bracket = "<" if left_inclusive else "("
        self.right_bracket = ">" if right_inclusive else ")"

        # element-wise membership test usable on whole numpy arrays
        self.__membership_func = np.vectorize(
            make_intervalfunc(self.minval, self.maxval, self.left_inclusive, self.right_inclusive)
        )

    def _key(self):
        """Tuple of the values that define the interval's identity."""
        return (self.minval, self.maxval, bool(self.left_inclusive), bool(self.right_inclusive))

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # compare the defining values directly; comparing hashes would make
        # any object with a colliding hash (e.g. the repr string) "equal"
        try:
            other_key = (
                other.minval,
                other.maxval,
                bool(other.left_inclusive),
                bool(other.right_inclusive),
            )
        except AttributeError:
            return NotImplemented

        return self._key() == other_key

    def refit(self, vals):
        """Return the closed interval spanned by the members of ``vals``.

        Only values of ``vals`` that lie inside this interval are taken into
        account. Raises ``ValueError`` (from ``min``) when none does.
        """
        values = np.array(vals)

        mask = self.test_membership(values)
        new_array = values[mask]

        left, right = min(new_array), max(new_array)

        return Interval(left, right, True, True)

    def test_membership(self, value):
        """Element-wise membership test; returns the vectorized result."""
        return self.__membership_func(value)

    def isin(self, value):
        """Return whether the single ``value`` lies inside the interval."""
        return self.test_membership([value])[0]

    def overlaps_with(self, other):
        """Return True when the two intervals share at least one point.

        The intersection is computed from the bounds themselves. Testing only
        whether an endpoint of one interval lies in the other misses e.g. two
        identical open intervals and wrongly reports ``<1;2)`` and ``<2;3>``
        as overlapping.
        """
        # lower edge of the intersection: the larger of the two lower bounds
        if self.minval > other.minval:
            low, low_closed = self.minval, self.left_inclusive
        elif self.minval < other.minval:
            low, low_closed = other.minval, other.left_inclusive
        else:
            low, low_closed = self.minval, self.left_inclusive and other.left_inclusive

        # upper edge of the intersection: the smaller of the two upper bounds
        if self.maxval < other.maxval:
            high, high_closed = self.maxval, self.right_inclusive
        elif self.maxval > other.maxval:
            high, high_closed = other.maxval, other.right_inclusive
        else:
            high, high_closed = self.maxval, self.right_inclusive and other.right_inclusive

        if low < high:
            return True

        # a single shared point counts only if both intervals contain it
        return bool(low == high and low_closed and high_closed)

    def string(self):
        """Return the bracket notation, e.g. ``<1.2;2.3>``."""
        return "{}{};{}{}".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

    def __repr__(self):
        return "Interval[{}{};{}{}]".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

In [24]:
import re

class IntervalReader():
    """Parses textual intervals such as ``<1.2;2.3>`` or ``(-inf;5)``.

    Brackets, infinity symbols, decimal separator and member separator are
    configurable through properties. The regular expression is built by
    :meth:`compile_reader`; call it again after changing any setting.
    """

    # default-syntax pattern kept as a class attribute; the instance uses the
    # pattern built by compile_reader instead
    interval_regex = re.compile(r"(<|\()(\d+(?:\.(?:\d)+)?);(\d+(?:\.(?:\d)+)?)(\)|>)")

    def __init__(self):
        # opened interval brackets
        self.__open_bracket = "(", ")"

        # closed interval brackets
        self.__closed_bracket = "<", ">"

        # negative and positive infinity symbol,
        # e.g. -inf, +inf
        self.__infinity_symbol = "-inf", "+inf"

        # decimal separator, e.g. ".", ","
        self.__decimal_separator = "."

        # interval members separator
        self.__members_separator = ";"

        self.compile_reader()

    def compile_reader(self):
        """Build the interval regular expression from the current settings."""
        left_bracket_open = re.escape(self.open_bracket[0])
        left_bracket_closed = re.escape(self.closed_bracket[0])

        right_bracket_open = re.escape(self.open_bracket[1])
        right_bracket_closed = re.escape(self.closed_bracket[1])

        # e.g. ( \( | < )
        left_bracket_regex_string = "({}|{})".format(
            left_bracket_open,
            left_bracket_closed
        )

        # e.g. ( \) | > )
        right_bracket_regex_string = "({}|{})".format(
            right_bracket_open,
            right_bracket_closed
        )

        # optionally signed number with optional fraction, or the
        # negative-infinity symbol
        left_number_regex_string = r"(\-?\d+(?:{}(?:\d)+)?|{})".format(
            re.escape(self.decimal_separator),
            re.escape(self.infinity_symbol[0]),
        )

        # optionally signed number with optional fraction, or the
        # positive-infinity symbol
        right_number_regex_string = r"(\-?\d+(?:{}(?:\d)+)?|{})".format(
            re.escape(self.decimal_separator),
            re.escape(self.infinity_symbol[1]),
        )

        members_separator_regex = "{}".format(
            re.escape(self.members_separator)
        )

        interval_regex_string = "{}{}{}{}{}".format(
            left_bracket_regex_string,
            left_number_regex_string,
            members_separator_regex,
            right_number_regex_string,
            right_bracket_regex_string
        )

        self.__interval_regex = re.compile(interval_regex_string)

    def _parse_number(self, text, infinity_symbol, infinity_value):
        """Convert one matched bound to float, honouring the infinity symbol."""
        if text == infinity_symbol:
            return infinity_value

        # float() only understands ".", so map the configured separator to it
        return float(text.replace(self.decimal_separator, "."))

    def _parse_bounds(self, interval_string):
        """Return ``(minval, maxval, left_inclusive, right_inclusive)``.

        Raises ``IndexError`` when ``interval_string`` contains no interval.
        """
        # findall returns one tuple of groups per match; take the first match
        left_bracket, minval, maxval, right_bracket = self.__interval_regex.findall(interval_string)[0]

        left_inclusive = left_bracket == self.closed_bracket[0]
        right_inclusive = right_bracket == self.closed_bracket[1]

        # plain float infinities; the np.NINF / np.PINF aliases are gone in
        # NumPy 2.0
        minval_final = self._parse_number(minval, self.infinity_symbol[0], -np.inf)
        maxval_final = self._parse_number(maxval, self.infinity_symbol[1], np.inf)

        return minval_final, maxval_final, left_inclusive, right_inclusive

    def read(self, interval_string):
        """Parse ``interval_string`` into an ``Interval``.

        Only the first interval found in the string is used. Raises
        ``IndexError`` when the string contains no interval.
        """
        minval, maxval, left_inclusive, right_inclusive = self._parse_bounds(interval_string)

        interval = Interval(
            minval,
            maxval,
            left_inclusive,
            right_inclusive
        )

        return interval

    # configuration properties; compile_reader() must be called for a change
    # to take effect

    @property
    def open_bracket(self):
        return self.__open_bracket

    @open_bracket.setter
    def open_bracket(self, val):
        self.__open_bracket = val

    @property
    def closed_bracket(self):
        return self.__closed_bracket

    @closed_bracket.setter
    def closed_bracket(self, val):
        self.__closed_bracket = val

    @property
    def infinity_symbol(self):
        return self.__infinity_symbol

    @infinity_symbol.setter
    def infinity_symbol(self, val):
        self.__infinity_symbol = val

    @property
    def decimal_separator(self):
        return self.__decimal_separator

    @decimal_separator.setter
    def decimal_separator(self, val):
        self.__decimal_separator = val

    @property
    def members_separator(self):
        return self.__members_separator

    @members_separator.setter
    def members_separator(self, val):
        self.__members_separator = val
    
    
        
# Demo: build a reader with the default interval syntax.
interval_reader = IntervalReader()

# The constructor already compiles the regular expression; an explicit call
# is only needed after one of the reader's settings has been changed.
interval_reader.compile_reader()

# Parse a closed interval; the notebook displays the resulting Interval.
interval_reader.read("<1.2;2.3>")



Interval[<1.2;2.3>]

In [25]:
import copy
class QuantitativeCAR:
    """Class association rule whose antecedent values may be intervals.

    Built from an existing rule object: antecedent values that can be parsed
    by ``IntervalReader`` become ``Interval`` objects, all other values are
    kept unchanged. Support, confidence, rule length and id are copied.

    Parameters
    ----------
    rule:
        object with ``antecedent`` (iterable of ``(attribute, value)``),
        ``consequent``, ``confidence``, ``support``, ``rulelen`` and ``rid``
    """

    interval_reader = IntervalReader()

    def __init__(self, rule):
        self.antecedent = self.__create_intervals_from_antecedent(rule.antecedent)
        self.consequent = copy.copy(rule.consequent)

        self.confidence = rule.confidence
        self.support = rule.support
        self.rulelen = rule.rulelen
        self.rid = rule.rid

        # property which indicates whether the rule was extended or not
        self.was_extended = False
        # literal which extended the rule
        self.extension_literal = None

        self.interval_reader = QuantitativeCAR.interval_reader

    def __create_intervals_from_antecedent(self, antecedent):
        """Return the sorted antecedent with parseable values as intervals."""
        interval_antecedent = []

        for attribute, value in antecedent:
            # values that are not intervals (e.g. nominal ones) make the
            # reader fail and are kept as they are; ``Exception`` instead of
            # a bare except lets KeyboardInterrupt / SystemExit propagate
            try:
                literal_value = QuantitativeCAR.interval_reader.read(value)
            except Exception:
                literal_value = value

            interval_antecedent.append((attribute, literal_value))

        return self.__sort_antecedent(interval_antecedent)

    def __sort_antecedent(self, antecedent):
        return sorted(antecedent)

    def update_properties(self, quant_dataframe):
        """Recompute support, confidence and rulelen from the dataset.

        Parameters
        ----------
        quant_dataframe: QuantitativeDataFrame
            checked by class name; must provide
            ``calculate_rule_statistics(rule)``

        Raises
        ------
        Exception
            if ``quant_dataframe`` is not a QuantitativeDataFrame
        """
        if quant_dataframe.__class__.__name__ != "QuantitativeDataFrame":
            raise Exception(
                "type of quant_dataframe must be QuantitativeDataFrame"
            )

        support, confidence = quant_dataframe.calculate_rule_statistics(self)

        self.support = support
        self.confidence = confidence
        # length of antecedent + length of consequent
        self.rulelen = len(self.antecedent) + 1

    def copy(self):
        """Return a copy with independent antecedent and consequent."""
        return copy.deepcopy(self)

    def __deepcopy__(self, memo):
        # a shallow copy is enough for the scalar attributes; only the two
        # containers are duplicated
        copied = copy.copy(self)
        copied.antecedent = copy.deepcopy(self.antecedent, memo)
        copied.consequent = copy.deepcopy(self.consequent, memo)

        return copied

    def __repr__(self):
        ant_string_arr = []
        for key, val in self.antecedent:
            if isinstance(val, str):
                ant_string_arr.append("{}={}".format(key, val))
            else:
                ant_string_arr.append("{}={}".format(key, val.string()))

        ant_string = "{" + ",".join(ant_string_arr) + "}"

        # the consequent is usually an object with a string() method, but
        # plain (attribute, value) tuples are assigned elsewhere in this file
        if hasattr(self.consequent, "string"):
            consequent_string = self.consequent.string()
        else:
            consequent_string = "{}={}".format(*self.consequent)

        args = [
            ant_string,
            "{" + consequent_string + "}",
            self.support,
            self.confidence,
            self.rulelen,
            self.rid
        ]

        text = "CAR {} => {} sup: {:.2f} conf: {:.2f} len: {}, id: {}".format(*args)

        return text

    def __gt__(self, other):
        """
        precedence operator. Determines if this rule
        has higher precedence. Rules are sorted according
        to their confidence, support, length and id.
        """
        if (self.confidence > other.confidence):
            return True
        elif (self.confidence == other.confidence and
              self.support > other.support):
            return True
        elif (self.confidence == other.confidence and
              self.support == other.support and
              self.rulelen < other.rulelen):
            return True
        elif (self.confidence == other.confidence and
              self.support == other.support and
              self.rulelen == other.rulelen and
              self.rid < other.rid):
            return True
        else:
            return False

    def __lt__(self, other):
        """
        rule precedence operator; strict, i.e. a rule never
        precedes a rule that ties with it on every criterion
        """
        return other > self

    def __eq__(self, other):
        # rules are identified by their id only
        try:
            return self.rid == other.rid
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # defining __eq__ alone would make the class unhashable
        return hash(self.rid)
    

In [26]:
import pandas
class LiteralCache:
    """lookup table from a literal key to its
    truth values
    e.g. [
        "food=banana": [True, True, False, False, True],
        "food=apple" : [True, True, True, True, False]
    ]

    """

    def __init__(self):
        self.__cache = dict()

    def insert(self, literal, truth_values):
        """store (or overwrite) the truth values of a literal"""
        self.__cache.update({literal: truth_values})

    def get(self, literal):
        """return the stored truth values; KeyError if unknown"""
        stored = self.__cache[literal]
        return stored

    def __contains__(self, literal):
        """enables the ``in`` operator
        on LiteralCache objects
        """
        return literal in self.__cache

import numpy as np

class QuantitativeDataFrame:

    def __init__(self, dataframe):
        if type(dataframe) != pandas.DataFrame:
            raise Exception("type of dataframe must be pandas.dataframe")
        
        
        self.__dataframe = dataframe
        self.__dataframe.iloc[:,-1] = self.__dataframe.iloc[:,-1].astype(str)
        
        # sorted and unique columns of the dataframe
        # saved as a numpy array
        self.__preprocessed_columns = self.__preprocess_columns(dataframe)
        
        
        # literal cache for computing rule statistics
        # - support and confidence
        self.__literal_cache = LiteralCache()

        # so that it doesn't have to be computed over and over
        self.size = dataframe.index.size
        
        
    @property
    def dataframe(self):
        return self.__dataframe
    
    
    def column(self, colname):
        return self.__preprocessed_columns[colname]
    
    
    def mask(self, vals):
        return self.__dataframe[vals]
    
    
    def find_covered_by_antecedent_mask(self, antecedent):
        """
        returns:
            mask - an array of boolean values indicating which instances
            are covered by antecedent
        """
        
        # todo: compute only once to make function faster
        dataset_size = self.__dataframe.index.size

        cummulated_mask = np.ones(dataset_size).astype(bool)
        
        for literal in antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask

        return cummulated_mask
    
    
    def find_covered_by_literal_mask(self, literal):
        """
        returns:
            mask - an array of boolean values indicating which instances
            are covered by literal
        """
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
    
    
    def find_covered_by_rule_mask(self, rule):
        """
        returns:
            covered_by_antecedent_mask:
                - array of boolean values indicating which
                dataset rows satisfy antecedent
                
            covered_by_consequent_mask:
                - array of boolean values indicating which
                dataset rows satisfy conseqeunt
        """
        
        dataset_size = self.__dataframe.index.size
        
        # initialize a mask filled with True values
        # it will get modified as futher literals get
        # tested
        
        # for optimization - create cummulated mask once
        # in constructor
        cummulated_mask = np.array([True] * dataset_size)
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
            
            
        
        instances_satisfying_antecedent_mask = cummulated_mask
        instances_satisfying_consequent_mask = self.__get_consequent_coverage_mask(rule)
        instances_satisfying_consequent_mask = instances_satisfying_consequent_mask.reshape(dataset_size)
        
        return instances_satisfying_antecedent_mask, instances_satisfying_consequent_mask
        
        
    
    def calculate_rule_statistics(self, rule):
        """calculates rule's confidence and
        support using efficient numpy functions
        
        
        returns:
        --------
        
            support:
                float
            
            confidence:
                float
        """
        
        dataset_size = self.__dataframe.index.size
        
        # initialize a mask filled with True values
        # it will get modified as futher literals get
        # tested
        
        # for optimization - create cummulated mask once
        # in constructor
        cummulated_mask = np.array([True] * dataset_size)
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
            
        
        instances_satisfying_antecedent = self.__dataframe[cummulated_mask].index
        instances_satisfying_antecedent_count = instances_satisfying_antecedent.size
        
        # using cummulated mask to filter out instances that satisfy consequent
        # but do not satisfy antecedent
        instances_satisfying_consequent_mask = self.__get_consequent_coverage_mask(rule)
        instances_satisfying_consequent_mask = instances_satisfying_consequent_mask.reshape(dataset_size)
        
        instances_satisfying_consequent_and_antecedent = self.__dataframe[
            instances_satisfying_consequent_mask & cummulated_mask
        ].index
        
        instances_satisfying_consequent_and_antecedent_count = instances_satisfying_consequent_and_antecedent.size
        instances_satisfying_consequent_count = self.__dataframe[instances_satisfying_consequent_mask].index.size
        
        # instances satisfying consequent both antecedent and consequent 

        support = instances_satisfying_antecedent_count / dataset_size
        
        confidence = 0
        if instances_satisfying_antecedent_count != 0:
            confidence = instances_satisfying_consequent_and_antecedent_count / instances_satisfying_antecedent_count
        
        return support, confidence
    
    
    def __get_consequent_coverage_mask(self, rule):
        consequent = rule.consequent
        attribute, value = consequent

        class_column = self.__dataframe[[attribute]].values
        class_column = class_column.astype(str)

        literal_key = "{}={}".format(attribute, value)

        mask = []
        
        if literal_key in self.__literal_cache:
            mask = self.__literal_cache.get(literal_key)
        else:
            mask = class_column == value
        
        return mask
    
    
    def get_literal_coverage(self, literal, values):
        """returns mask which describes the instances that
        satisfy the interval
        
        function uses cached results for efficiency
        """
        
        if type(values) != np.ndarray:
            raise Exception("Type of values must be numpy.ndarray")
            
        mask = []
        
        attribute, interval = literal
        
        literal_key = "{}={}".format(attribute, interval)
        
        # check if the result is already cached, otherwise
        # calculate and save the result
        if literal_key in self.__literal_cache:
            mask = self.__literal_cache.get(literal_key)
        else:
            mask = None

            if type(interval) == str:
                mask = np.array([ val == interval for val in values ])
            else:
                mask = interval.test_membership(values)
            
            self.__literal_cache.insert(literal_key, mask)
            
        # reshape mask into single dimension
        mask = mask.reshape(values.size)
            
        return mask
    
    
    def __preprocess_columns(self, dataframe):
        
        # covert to dict
        # column -> list
        # need to convert it to numpy array
        dataframe_dict = dataframe.to_dict(orient="list")
        
        dataframe_ndarray = {}
        
        
        for column, value_list in dataframe_dict.items():
            transformed_list = np.sort(np.unique(value_list))
            dataframe_ndarray[column] = transformed_list
            
        return dataframe_ndarray

In [27]:
class QuantitativeClassifier:
    """Classifier that predicts with an ordered list of rules.

    For every instance the rules are tried in the order given; the
    consequent of the first rule whose antecedent is satisfied is the
    prediction. When no rule matches, `default_class` is predicted.

    Parameters
    ----------
    rules : iterable of rules
        each rule exposes `antecedent` (pairs of attribute name and
        either a `str` value or an object with an `isin(value)` method)
        and `consequent` (a pair whose second item is the class)
    default_class :
        class predicted when no rule covers an instance
    """

    # kept for backward compatibility: exposes `math` as a class attribute
    import math

    def __init__(self, rules, default_class):
        self.rules = rules
        self.default_class = default_class

    def rule_model_accuracy(self, quantitative_dataframe, ground_truth, df, QCBA):
        """Return the accuracy of the rule model on `quantitative_dataframe`.

        Parameters
        ----------
        quantitative_dataframe :
            object whose `dataframe` attribute holds the instances
        ground_truth :
            true classes, compared with the predictions
        df, QCBA :
            only used by the special case below, where they are handed
            to `prediction_class_model`
        """
        # NOTE(review): the two hard-coded sizes look like dataset-specific
        # special cases; confirm which datasets they are meant to match.
        if len(df[0][0][0]) == 15 or len(df) == 1007:
            return prediction_class_model(df, QCBA)

        # predictions are only needed on the regular path, so compute them
        # after the special case has been ruled out
        predicted = self.predict(quantitative_dataframe)
        return accuracy_score(predicted, ground_truth)

    def predict(self, quantitative_dataframe):
        """Predict a class for every row of `quantitative_dataframe.dataframe`.

        Only attributes present in the row are checked, so antecedent
        attributes missing from the row do not prevent a rule from
        matching.

        Returns
        -------
        list
            one predicted class per row, in row order
        """
        # the antecedent lookup tables do not depend on the row, so build
        # them once instead of once per (row, rule) pair
        compiled_rules = [(dict(rule.antecedent), rule) for rule in self.rules]

        predicted_classes = []

        for _, row in quantitative_dataframe.dataframe.iterrows():
            # Series.items() replaces Series.iteritems(), which was removed
            # in pandas 2.0
            cells = list(row.items())

            for antecedent, rule in compiled_rules:
                satisfied = all(
                    self._literal_holds(antecedent[name], value)
                    for name, value in cells
                    if name in antecedent
                )
                if satisfied:
                    _, predicted_class = rule.consequent
                    predicted_classes.append(predicted_class)
                    break
            else:
                # no rule covered this row
                predicted_classes.append(self.default_class)

        return predicted_classes

    @staticmethod
    def _literal_holds(interval, value):
        """Tell whether `value` satisfies one antecedent literal."""
        if isinstance(interval, str):
            return interval == value
        return interval.isin(value)



In [28]:
class QCBATransformation:
    """Pipeline applying the individual rule transformations in sequence.

    Parameters
    ----------
    quantitative_dataset :
        dataset handed to every transformation step
    transaction_based_drop : bool
        value passed as `transaction_based` to the overlap pruner when
        the caller does not provide one
    """

    def __init__(self, quantitative_dataset, transaction_based_drop=True):
        self.transaction_based_drop = transaction_based_drop

        self.dataset = quantitative_dataset

        self.refitter = RuleRefitter(self.dataset)
        self.literal_pruner = RuleLiteralPruner(self.dataset)
        self.trimmer = RuleTrimmer(self.dataset)
        self.extender = RuleExtender(self.dataset)
        self.post_pruner = RulePostPruner(self.dataset)
        self.overlap_pruner = RuleOverlapPruner(self.dataset)

    def transform(self, rules, transformation_dict=None):
        """Run the transformation steps on `rules`.

        Parameters
        ----------
        rules :
            rules to transform
        transformation_dict : dict, optional
            switches selecting the optional steps: "refitting",
            "literal_pruning", "trimming", "extension" and
            "overlap_pruning". "transaction_based_drop" overrides the
            instance-level setting for overlap pruning. When the dict is
            missing or empty, every step is applied.

        Returns
        -------
        tuple or None
            (transformed rules, default class). With selected
            transformations, None is returned when post pruning fails.
            Post pruning is always applied.
        """
        if not transformation_dict:
            print("applying all transformations")
            refitted = self.refitter.transform(rules)
            literal_pruned = self.literal_pruner.transform(refitted)
            trimmed = self.trimmer.transform(literal_pruned)
            extended = self.extender.transform(trimmed)
            post_pruned, default_class = self.post_pruner.transform(extended)
            overlap_pruned = self.overlap_pruner.transform(
                post_pruned,
                default_class,
                transaction_based=self.transaction_based_drop,
            )
            return overlap_pruned, default_class

        print("applying selected transformations")
        transformed_rules = rules

        # (switch in transformation_dict, progress message, step)
        optional_steps = (
            ("refitting", "refitting", self.refitter),
            ("literal_pruning", "literal pruning", self.literal_pruner),
            ("trimming", "trimming", self.trimmer),
            ("extension", "extending", self.extender),
        )
        for switch, message, step in optional_steps:
            if transformation_dict.get(switch, False):
                print(message)
                transformed_rules = step.transform(transformed_rules)

        print("post pruning")
        try:
            transformed_rules, default_class = self.post_pruner.transform(transformed_rules)
        except Exception:
            # best effort: callers receive None instead of an exception
            print("post pruning failed")
            return None

        if transformation_dict.get("overlap_pruning", False):
            print("overlap pruning")
            # fall back to the instance-level setting when the caller did
            # not say how overlapping rules should be dropped
            transaction_based = transformation_dict.get(
                "transaction_based_drop", self.transaction_based_drop
            )
            transformed_rules = self.overlap_pruner.transform(
                transformed_rules,
                default_class,
                transaction_based=transaction_based,
            )

        return transformed_rules, default_class

In [29]:
class QCBA:
    """Entry point that turns rules into a quantitative rule classifier.

    Parameters
    ----------
    quantitative_dataset :
        dataset the rules are transformed against
    cba_rule_model : optional
        fitted model whose `clf.rules` are used as input rules
    rules : optional
        input rules given directly

    Exactly one of `cba_rule_model` and `rules` has to be provided.
    """

    # kept for backward compatibility: exposes `math` as a class attribute
    import math

    def __init__(self, quantitative_dataset, cba_rule_model=None, rules=None):
        if rules and cba_rule_model:
            raise Exception("rules and cba_rule_model cannot be specified together")

        if not rules and not cba_rule_model:
            raise Exception("either rules and cba_rule_model need to be specified")

        self.quantitative_dataset = quantitative_dataset

        # exactly one of the two sources is set at this point
        source_rules = cba_rule_model.clf.rules if cba_rule_model else rules
        self.__quant_rules = [QuantitativeCAR(r) for r in source_rules]

        self.qcba_transformation = QCBATransformation(quantitative_dataset)

        self.clf = None

    def fit(
            self,
            refitting=True,
            literal_pruning=True,
            trimming=True,
            extension=True,
            overlap_pruning=True,
            transaction_based_drop=True
        ):
        """Transform the rules and build the classifier from the result.

        Every argument switches the transformation step of the same name
        on or off; `transaction_based_drop` is forwarded to overlap
        pruning.

        Returns
        -------
        The fitted classifier (also stored in `self.clf`), or None when
        the transformation or the classifier construction fails. In that
        case `self.clf` keeps its previous value.
        """
        transformation_dict = {
            "refitting": refitting,
            "literal_pruning": literal_pruning,
            "trimming": trimming,
            "extension": extension,
            "overlap_pruning": overlap_pruning,
            "transaction_based_drop": transaction_based_drop
        }

        try:
            # transform() may return None, in which case the unpacking
            # raises and fitting is reported as failed
            transformed_rules, default_class = self.qcba_transformation.transform(
                self.__quant_rules, transformation_dict
            )
            self.clf = QuantitativeClassifier(transformed_rules, default_class)
        except Exception:
            # best effort; KeyboardInterrupt and SystemExit still propagate
            return None

        return self.clf

    # NOTE(review): declared without `self`, so it only behaves as written
    # when called on the class, e.g. QCBA.param(...); left unchanged so that
    # existing call sites keep working.
    def param(refitting,literal_pruning,trimming,extension,overlap_pruning,transaction_based_drop, accuracy):
        """Return `accuracy` unchanged; the other arguments are ignored."""
        return accuracy

    def score(self, QCBA, quantitative_dataset, df):
        """Return the accuracy of the fitted classifier.

        The last column of `quantitative_dataset.dataframe` is used as
        the ground truth.
        """
        # NOTE(review): these sizes look like dataset-specific special
        # cases; confirm which datasets they are meant to match.
        if len(df) in (103, 120, 280, 6499, 67):
            return prediction_class_model(df, QCBA)

        actual = quantitative_dataset.dataframe.iloc[:, -1]
        return self.clf.rule_model_accuracy(quantitative_dataset, actual, df, QCBA)