In [4]:
from src.aq import AQClassifier
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
from copy import deepcopy
import pandas as pd


def remove_duplicate_dicts(dict_list):
    """Return ``dict_list`` without repeated entries, keeping first occurrences.

    Each entry is a dict mapping a key to a collection of hashable values.
    Two entries are duplicates when they have the same keys and, for every
    key, the same set of values (order and repetition of values are ignored).

    Args:
        dict_list: iterable of dicts whose values are iterables of hashables.

    Returns:
        A new list holding the first occurrence of each distinct entry, in
        the original order. The dict objects themselves are not copied.
    """
    seen = set()
    result = []
    for d in dict_list:
        # frozenset compares by content. Building the key from tuple(set)
        # would depend on set iteration order, which may differ between two
        # equal sets and let duplicates slip through.
        fingerprint = frozenset((key, frozenset(values)) for key, values in d.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            result.append(d)
    return result


def partial_star_score(partial_star, df):
    """Count the rows of ``df`` that match every constrained column.

    ``partial_star`` maps column names to collections of values. A column
    with an empty collection places no constraint; otherwise the row's value
    in that column must be one of the listed values.

    Args:
        partial_star: dict of column name -> collection of values.
        df: DataFrame containing every constrained column.

    Returns:
        Number of rows satisfying all non-empty constraints.
    """
    matches = pd.Series(True, index=df.index)
    for column in partial_star:
        allowed = partial_star[column]
        if len(allowed) == 0:
            # Unconstrained column: every row passes.
            continue
        matches = matches & df[column].isin(allowed)
    return matches.sum()


def generate_stars(seed, negatives, max_cpx=10):
    """Build candidate stars for ``seed`` by scanning the negative examples.

    A star is a dict mapping every column name to a set of values. The
    values are taken from negative examples, in columns where the negative
    differs from the seed.

    For each negative example, one single-value star is created per
    differing column, and every star kept so far is extended with each of
    those values. The pool is then de-duplicated, ranked with
    ``partial_star_score`` against the negatives processed so far, and cut
    to the ``max_cpx`` best.

    Args:
        seed: one-row DataFrame; only its first row is used.
        negatives: DataFrame of negative examples with the same columns.
        max_cpx: maximum number of stars kept after each negative example
            (expected to be a positive integer).

    Returns:
        List of stars. Empty when ``negatives`` is empty or no negative
        differs from the seed in any column.
    """
    seed_row = seed.iloc[0]
    partial_stars = []
    for position, (_, neg_ex) in enumerate(negatives.iterrows()):
        diff_columns = neg_ex[seed_row != neg_ex]

        # One star per column in which this negative differs from the seed.
        singletons = []
        for col_idx, val in diff_columns.items():
            star = {name: set() for name in neg_ex.index}
            star[col_idx].add(val)
            singletons.append(star)

        if not partial_stars:
            # Nothing to extend yet: the singletons form the initial pool.
            partial_stars = singletons
            continue

        # Extend every existing star with each new differing value.
        candidates = []
        for ps in partial_stars:
            for col_idx, val in diff_columns.items():
                if val not in ps[col_idx]:
                    new_ps = deepcopy(ps)
                    new_ps[col_idx].add(val)
                    candidates.append(new_ps)

        partial_stars.extend(singletons)
        partial_stars.extend(candidates)
        partial_stars = remove_duplicate_dicts(partial_stars)

        # Score against the negatives processed so far. Slice by position:
        # index labels are not ordered once the frame has been shuffled
        # (e.g. by train_test_split), so comparing labels to the current
        # label would pick an unrelated subset of rows.
        seen_negatives = negatives.iloc[:position + 1]
        partial_stars.sort(
            key=lambda candidate: partial_star_score(candidate, seen_negatives),
            reverse=True,
        )
        partial_stars = partial_stars[:max_cpx]
    return partial_stars


def covers(example, star):
    """Tell whether ``star`` covers ``example``.

    ``star`` maps column names to collections of values. The example is
    covered unless, for some column present in the example, the example's
    value appears in the star's collection for that column. Columns of the
    star that the example does not have are ignored.

    Args:
        example: mapping-like object supporting ``keys()`` and ``[]``.
        star: dict of column name -> collection of values.

    Returns:
        ``True`` if no column of the example hits a listed value,
        ``False`` otherwise.
    """
    return not any(
        column in example.keys() and example[column] in excluded
        for column, excluded in star.items()
    )


def find_best_star(positives, stars):
    """Pick the star that covers the most rows of ``positives``.

    Coverage is decided row by row with ``covers``. When several stars tie
    for the highest count, the one appearing last in ``stars`` is returned.

    Args:
        positives: DataFrame of positive examples.
        stars: non-empty list of stars; an empty list raises ``IndexError``.

    Returns:
        The selected star (the same object that was passed in).
    """
    winner = stars[0]
    top_count = -1
    for candidate in stars:
        hits = positives.apply(lambda row: covers(row, candidate), axis=1).sum()
        if hits < top_count:
            continue
        # Equal counts replace the current winner too, so later stars win ties.
        winner, top_count = candidate, hits
    return winner


def aq_algorithm(positives, negatives):
    """Learn a list of stars (rules) that together cover the positives.

    Repeatedly takes the first still-uncovered positive example as the seed,
    generates candidate stars for it against ``negatives``, keeps the star
    covering the most positives, and removes the positives that star covers.
    The loop ends when no uncovered positives remain.

    Args:
        positives: DataFrame of positive examples.
        negatives: DataFrame of negative examples with the same columns.

    Returns:
        List of stars, ordered from the one covering the most rows of
        ``positives`` to the one covering the fewest. Empty when there are
        no positives.
    """
    work_df = positives
    cover = []
    while len(work_df) > 0:
        seed = work_df.head(1)
        stars = generate_stars(seed, negatives)
        if not stars:
            # No negative differs from this seed (or there are no negatives),
            # so no star can be built for it. Drop the seed so the loop still
            # terminates instead of failing inside find_best_star.
            work_df = work_df.iloc[1:]
            continue
        star = find_best_star(positives, stars)
        still_uncovered = work_df.apply(lambda row: not covers(row, star), axis=1)
        work_df = work_df[still_uncovered]
        cover.append(star)

    # Rank once, after the loop, and keep the result a list: every iteration
    # appends to it and the caller iterates over it rule by rule. Coverage is
    # computed row-wise (apply with axis=1); DataFrame.map works element-wise
    # and would hand single cell values to covers().
    cover.sort(
        key=lambda rule: positives.apply(lambda row: covers(row, rule), axis=1).sum(),
        reverse=True,
    )
    return cover


class AQClassifier:
    """Binary rule-based classifier built on ``aq_algorithm``.

    After fitting, ``rules`` holds the learned stars and ``classes_`` the
    distinct labels seen in training. An example is predicted ``True`` when
    at least one rule covers it.
    """

    def __init__(self):
        # Learned stars; empty until fit() is called.
        self.rules = []
        # Distinct training labels, sorted; empty until fit() is called.
        self.classes_ = []

    def fit(self, x_train, y_train):
        """Learn rules from the training data.

        Args:
            x_train: DataFrame of examples.
            y_train: boolean Series used as a mask over ``x_train``;
                ``True`` marks positive examples.

        Returns:
            ``self``, so calls can be chained (``AQClassifier().fit(X, y)``).
        """
        pos = x_train[y_train]
        neg = x_train[~y_train]
        self.rules = aq_algorithm(pos, neg)
        # Sorted so the attribute does not depend on set iteration order.
        self.classes_ = sorted(set(y_train))
        return self

    @staticmethod
    def _rule_cover(rules, example):
        """Return ``True`` if any rule in ``rules`` covers ``example``."""
        return any(covers(example, rule) for rule in rules)

    def predict(self, x_test):
        """Classify each row of ``x_test``.

        Args:
            x_test: DataFrame of examples.

        Returns:
            The row-wise result of applying the rule check, ``True`` where
            at least one learned rule covers the row.
        """
        return x_test.apply(lambda x: self._rule_cover(self.rules, x), axis=1)


# Keep only the non-numeric columns of the first 30 rows of the data set.
df = (
    pd.read_csv("data/bank.csv", sep=';')
    .select_dtypes(exclude=["number"])
    .head(30)
)
# Turn the 'y' column into booleans: True where the value is 'yes'.
df['y'] = df['y'].map(lambda label: label == 'yes')

# Hold out 20% of the rows for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    df.drop(columns='y'),
    df['y'],
    test_size=0.2,
)

cls = AQClassifier()
cls.fit(X_train, y_train)
y_pred = cls.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")

Accuracy: 0.8333333333333334
