In [4]:
!pip install ace_tools




In [6]:
import random
import itertools
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
from multiprocessing import Manager, Process
from IPython.display import display

# -----------------------------
# Type Definitions
# -----------------------------
Interval = Tuple[int, int]
Itemset = Dict[str, Interval]

# -----------------------------
# Utility Functions
# -----------------------------
def point_in_interval(point: float, interval: Interval) -> bool:
    return interval[0] < point < interval[1]

def satisfies(itemset: Itemset, row: pd.Series) -> bool:
    return all(point_in_interval(row[attr], itemset[attr]) for attr in itemset)

def support(itemset: Itemset, data: pd.DataFrame, class_col: str) -> float:
    total_c = data[class_col].sum()
    matched = data[data.apply(lambda row: satisfies(itemset, row), axis=1)]
    return matched[class_col].sum() / total_c if total_c > 0 else 0

def shrink_difference(inner: Interval, outer: Interval) -> int:
    return (inner[0] - outer[0]) + (outer[1] - inner[1])

def delta(itemset: Itemset, max_vals: Dict[str, int]) -> int:
    return sum(shrink_difference(itemset[attr], (0, max_vals[attr])) for attr in itemset)

# -----------------------------
# Standard Apriori
# -----------------------------
def apriori_standard(data: pd.DataFrame, epsilon: float, class_col: str) -> Dict[str, float]:
    attributes = [col for col in data.columns if col != class_col]
    max_vals = {a: int(np.ceil(data[a].max())) for a in attributes}
    I0 = {a: (0, max_vals[a]) for a in attributes}
    R, SWk = {}, [I0]

    while SWk:
        Wk = []
        for I in SWk:
            for a in attributes:
                b, e = I[a]
                for mid in range(b + 1, e):
                    new_I = I.copy()
                    new_I[a] = (b, mid)
                    if delta(new_I, max_vals) == delta(I, max_vals) + 1:
                        Wk.append(new_I)
        SWk = []
        for I in Wk:
            s = support(I, data, class_col)
            if s >= epsilon:
                R[str(I)] = s
                SWk.append(I)
    return R

# -----------------------------
# Randomic Apriori
# -----------------------------
def apriori_randomic(data: pd.DataFrame, epsilon: float, class_col: str) -> Dict[str, float]:
    attributes = [c for c in data.columns if c != class_col]
    max_vals = {a: int(np.ceil(data[a].max())) for a in attributes}
    I0 = {a: (0, max_vals[a]) for a in attributes}
    R, LP, LS, LNS = {}, [I0], {}, {}

    while LP:
        I = LP.pop(random.randint(0, len(LP) - 1))
        key = str(I)
        if key in LS or key in LNS:
            continue
        s = support(I, data, class_col)
        if s >= epsilon:
            R[key] = s
            LS[key] = s
            for a in attributes:
                b, e = I[a]
                for mid in range(b + 1, e):
                    new_I = I.copy()
                    new_I[a] = (b, mid)
                    LP.append(new_I)
        else:
            LNS[key] = s
    return R

# -----------------------------
# Randomic Distributed Apriori
# -----------------------------
def distributed_worker(GP, GS, GNS, data, epsilon, class_col, max_vals, attributes):
    LS, LNS = {}, {}
    while True:
        if not GP:
            break
        try:
            I = GP.pop()
        except:
            break
        key = str(I)
        if key in LS or key in LNS:
            continue
        reject = False
        for a in attributes:
            b, e = I[a]
            for mid in range(b + 1, e):
                new_I = I.copy()
                new_I[a] = (b, mid)
                if str(new_I) in GS:
                    reject = True
                    break
            if reject:
                break
        if reject:
            continue
        s = support(I, data, class_col)
        if s >= epsilon:
            GS[key] = s
            LS[key] = s
            for a in attributes:
                b, e = I[a]
                for mid in range(b + 1, e):
                    new_I = I.copy()
                    new_I[a] = (b, mid)
                    GP.append(new_I)
        else:
            GNS[key] = s
            LNS[key] = s

def apriori_randomic_distributed(data: pd.DataFrame, epsilon: float, class_col: str, num_workers=2) -> Dict[str, float]:
    attributes = [c for c in data.columns if c != class_col]
    max_vals = {a: int(np.ceil(data[a].max())) for a in attributes}
    I0 = {a: (0, max_vals[a]) for a in attributes}
    manager = Manager()
    GP, GS, GNS = manager.list([I0]), manager.dict(), manager.dict()
    processes = [Process(target=distributed_worker, args=(GP, GS, GNS, data, epsilon, class_col, max_vals, attributes)) for _ in range(num_workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return dict(GS)

# -----------------------------
# Run All Algorithms
# -----------------------------
data = pd.DataFrame({'A1': [1.2, 2.5, 3.1, 2.9, 1.5], 'A2': [5.0, 5.5, 6.0, 5.2, 5.1], 'C': [1, 1, 1, 1, 1]})
epsilon = 0.4
std_result = apriori_standard(data, epsilon, 'C')
rnd_result = apriori_randomic(data, epsilon, 'C')
dist_result = apriori_randomic_distributed(data, epsilon, 'C')

# Display All Results
df = pd.DataFrame(
    [{"algorithm": "standard", "itemset": k, "support": v} for k, v in std_result.items()] +
    [{"algorithm": "randomic", "itemset": k, "support": v} for k, v in rnd_result.items()] +
    [{"algorithm": "distributed", "itemset": k, "support": v} for k, v in dist_result.items()]
)

display(df)


Unnamed: 0,algorithm,itemset,support
0,standard,"{'A1': (0, 3), 'A2': (0, 6)}",0.8
1,standard,"{'A1': (0, 2), 'A2': (0, 6)}",0.4
2,randomic,"{'A1': (0, 4), 'A2': (0, 6)}",0.8
3,randomic,"{'A1': (0, 2), 'A2': (0, 6)}",0.4
4,randomic,"{'A1': (0, 3), 'A2': (0, 6)}",0.8
5,distributed,"{'A1': (0, 4), 'A2': (0, 6)}",0.8
6,distributed,"{'A1': (0, 3), 'A2': (0, 6)}",0.8
7,distributed,"{'A1': (0, 2), 'A2': (0, 6)}",0.4


In [7]:
import itertools
from scipy.stats import fisher_exact
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
from IPython.display import display

# -----------------------------
# Type Definitions
# -----------------------------
Interval = Tuple[int, int]
Itemset = Dict[str, Interval]

# -----------------------------
# Satisfiability and Support
# -----------------------------
def point_in_interval(point: float, interval: Interval) -> bool:
    return interval[0] < point < interval[1]

def satisfies(itemset: Itemset, row: pd.Series) -> bool:
    return all(point_in_interval(row[attr], itemset[attr]) for attr in itemset)

def support(itemset: Itemset, data: pd.DataFrame, class_col: str) -> float:
    total_c = data[class_col].sum()
    matched = data[data.apply(lambda row: satisfies(itemset, row), axis=1)]
    return matched[class_col].sum() / total_c if total_c > 0 else 0

# -----------------------------
# Parse Apriori Output
# -----------------------------
def parse_itemsets(itemset_dict: Dict[str, float]) -> List[Tuple[Itemset, float]]:
    return [(eval(k), v) for k, v in itemset_dict.items()]

# -----------------------------
# Extract Association Rules
# -----------------------------
def extract_rules_from_frontier(itemsets: List[Tuple[Itemset, float]], data: pd.DataFrame, class_col='C', min_conf=0.8):
    rules = []
    for full_itemset, full_support in itemsets:
        attrs = list(full_itemset.keys())
        for i in range(1, len(attrs)):
            for antecedent_keys in itertools.combinations(attrs, i):
                antecedent = {a: full_itemset[a] for a in antecedent_keys}
                consequent = {a: full_itemset[a] for a in attrs if a not in antecedent_keys}

                ant_support = support(antecedent, data, class_col)
                cons_support = support(consequent, data, class_col)
                conf = full_support / ant_support if ant_support > 0 else 0
                lift_val = full_support / (ant_support * cons_support + 1e-6)

                if conf >= min_conf:
                    rules.append({
                        "antecedent": antecedent,
                        "consequent": consequent,
                        "support": round(full_support, 3),
                        "confidence": round(conf, 3),
                        "lift": round(lift_val, 3)
                    })
    return rules

# -----------------------------
# Compute P-values
# -----------------------------
def add_p_values(rules, data, class_col='C'):
    for rule in rules:
        A = data.apply(lambda row: satisfies(rule['antecedent'], row), axis=1)
        B = data.apply(lambda row: satisfies(rule['consequent'], row), axis=1)

        a = sum(A & B)
        b = sum(A & ~B)
        c = sum(~A & B)
        d = sum(~A & ~B)

        contingency = [[a, b], [c, d]]
        try:
            _, p = fisher_exact(contingency, alternative='greater')
        except:
            p = 1.0
        rule["p_value"] = round(p, 4)
    return rules

# -----------------------------
# Example Dataset and Results
# -----------------------------
data = pd.DataFrame({
    'A1': [1.2, 2.5, 3.1, 2.9, 1.5],
    'A2': [5.0, 5.5, 6.0, 5.2, 5.1],
    'C': [1, 1, 1, 1, 1]
})

# Sample frontier from previous Apriori execution
frontier = {
    "{'A1': (0, 3), 'A2': (0, 6)}": 0.8,
    "{'A1': (0, 2), 'A2': (0, 6)}": 0.4,
    "{'A1': (0, 4), 'A2': (0, 6)}": 0.8
}

parsed_itemsets = parse_itemsets(frontier)
rules = extract_rules_from_frontier(parsed_itemsets, data, min_conf=0.8)
rules_with_p = add_p_values(rules, data)

# -----------------------------
# Display Resulting Rules
# -----------------------------
rules_df = pd.DataFrame(rules_with_p)
display(rules_df)


Unnamed: 0,antecedent,consequent,support,confidence,lift,p_value
0,"{'A1': (0, 3)}","{'A2': (0, 6)}",0.8,1.0,1.25,0.2
1,"{'A2': (0, 6)}","{'A1': (0, 3)}",0.8,1.0,1.25,0.2
2,"{'A1': (0, 2)}","{'A2': (0, 6)}",0.4,1.0,1.25,0.6
3,"{'A1': (0, 4)}","{'A2': (0, 6)}",0.8,0.8,1.0,1.0
4,"{'A2': (0, 6)}","{'A1': (0, 4)}",0.8,1.0,1.0,1.0


In [9]:

from itertools import combinations
from collections import defaultdict

# -----------------------------
# Filter Final Rules (Relaxed Thresholds for Small Dataset)
# -----------------------------
def filter_final_rules(rules, p_thresh=0.5, lift_thresh=1.2):
    return [
        rule for rule in rules
        if rule['p_value'] < p_thresh and rule['lift'] > lift_thresh
    ]

# -----------------------------
# Build Ant and Cons Sets
# -----------------------------
def build_ant_cons(final_rules):
    Ant = set()
    Cons = set()
    for rule in final_rules:
        for i, iv in rule['antecedent'].items():
            Ant.add((i, iv))
        for j, jv in rule['consequent'].items():
            Cons.add((j, jv))
    return Ant, Cons

# -----------------------------
# Support for Single Attribute-Interval Pair
# -----------------------------
def sup_single(pair, data, class_col='C'):
    attr, interval = pair
    total_c = data[class_col].sum()
    matched = data[(data[attr] > interval[0]) & (data[attr] < interval[1])]
    return matched[class_col].sum() / total_c if total_c > 0 else 0

# -----------------------------
# Closure of Antecedent Subset
# -----------------------------
def Cl(Ant_subset, data, class_col='C'):
    best = {}
    grouped = defaultdict(list)
    for attr, interval in Ant_subset:
        grouped[attr].append(interval)
    for attr, intervals in grouped.items():
        best_interval = max(intervals, key=lambda iv: sup_single((attr, iv), data, class_col))
        best[attr] = best_interval
    return best

# -----------------------------
# J-Measure (approximated with lift proxy)
# -----------------------------
def j_measure(antecedent, consequent, data, class_col='C'):
    full = {**antecedent, **consequent}
    s_xy = support(full, data, class_col)
    s_x = support(antecedent, data, class_col)
    s_y = support(consequent, data, class_col)
    if s_x == 0 or s_y == 0:
        return 0
    return s_xy / (s_x * s_y)

# -----------------------------
# Shapley Value Approximation
# -----------------------------
def shapley_values(final_rules, Ant, Cons, data, class_col='C', num_samples=30):
    shapley_scores = defaultdict(lambda: defaultdict(float))

    for j, interval in Cons:
        Antj = [(i, iv) for (i, iv) in Ant if i != j]
        for attr_iv in Antj:
            score = 0
            for _ in range(num_samples):
                subset = [ai for ai in Antj if ai != attr_iv]
                coalition = subset + [attr_iv]

                cl_with = Cl(coalition, data, class_col)
                cl_without = Cl(subset, data, class_col)

                cons_dict = {j: interval}
                j_with = j_measure(cl_with, cons_dict, data, class_col)
                j_without = j_measure(cl_without, cons_dict, data, class_col)

                score += j_with - j_without
            shapley_scores[(j, interval)][attr_iv] = round(score / num_samples, 4)
    return shapley_scores

# -----------------------------
# Run Exercise 3 (Fixed with Relaxed Thresholds)
# -----------------------------
final_rules = filter_final_rules(rules_with_p, p_thresh=0.5, lift_thresh=1.2)
print(f"✅ Found {len(final_rules)} final rules")

if not final_rules:
    print("❌ No rules passed the relaxed filter conditions. Try on a larger dataset.")
else:
    Ant, Cons = build_ant_cons(final_rules)
    shapley = shapley_values(final_rules, Ant, Cons, data, num_samples=30)

    formatted_shapley = []
    for cons, contributors in shapley.items():
        for ant, val in contributors.items():
            formatted_shapley.append({
                "consequent": cons,
                "antecedent": ant,
                "shapley_value": val
            })

    shapley_df = pd.DataFrame(formatted_shapley)
    display(shapley_df)


✅ Found 2 final rules


Unnamed: 0,consequent,antecedent,shapley_value
0,"(A1, (0, 3))","(A2, (0, 6))",0.25
1,"(A2, (0, 6))","(A1, (0, 3))",0.25
