In [1]:
import numpy as np
import pandas as pd
from sklearn.feature_selection import mutual_info_classif
import gmpy2

In [2]:
# Registry of known datasets, keyed by dataset name.
# Each value is a 3-tuple (csv_filename, target_column, columns_to_drop),
# unpacked in that order by load_dataset().
DATASETS = {
    'beans': ('beans_kmeans.csv', 'Class', []),
}

In [3]:
def safe_int(x):
    """Return ``x`` unchanged if it is already an ``int``, otherwise ``int(x)``.

    Values that are instances of ``int`` (including ``bool``) are passed
    through as-is; everything else is converted with the built-in ``int``.
    """
    if isinstance(x, int):
        return x
    return int(x)

def log2_safe(p: gmpy2.mpfr):
    """Return ``gmpy2.log2(p)`` when ``p > 0``; otherwise return ``mpfr(0)``.

    The zero fallback lets callers evaluate ``p * log2(p)`` terms without
    special-casing non-positive probabilities.
    """
    if p > 0:
        return gmpy2.log2(p)
    return gmpy2.mpfr(0)


In [4]:
def load_csv_data(filename, target_col, drop_cols=None):
    """Load a CSV file and split its columns into three contiguous parts.

    Parameters
    ----------
    filename : str or path-like
        Comma-delimited CSV file to read.
    target_col : str
        Name of the label column; it is left with whatever dtype
        ``pd.read_csv`` inferred.
    drop_cols : list of str, optional
        Columns to remove before splitting. Names that are not present in
        the file are ignored.

    Returns
    -------
    list of pandas.DataFrame
        Three column-wise slices of the frame, in original column order.
        When the column count is not divisible by three, the leading parts
        receive one extra column each (same partition rule as
        ``np.array_split``).

    Raises
    ------
    Whatever ``Series.astype(np.int64)`` raises if a non-target column
    cannot be converted (for example, a column containing NaN or text).
    """
    df = pd.read_csv(filename, delimiter=',')
    if drop_cols:
        df = df.drop(columns=drop_cols, errors='ignore')

    # Every column except the target is cast to int64.
    feature_dtypes = {col: np.int64 for col in df.columns if col != target_col}
    df = df.astype(feature_dtypes)

    # Split the column *positions* instead of the DataFrame itself:
    # np.array_split on a DataFrame goes through DataFrame.swapaxes
    # (deprecated in pandas 2.1), and for a mixed-dtype frame that transpose
    # round-trip hands back object-dtype columns, discarding the cast above.
    column_groups = np.array_split(np.arange(df.shape[1]), 3)
    return [df.iloc[:, positions] for positions in column_groups]

def load_dataset(name):
    """Load a registered dataset by name.

    Looks ``name`` up in ``DATASETS`` and forwards the stored filename,
    target column and drop list to ``load_csv_data``.

    Returns
    -------
    tuple
        ``(data_parts, target_col)`` where ``data_parts`` is whatever
        ``load_csv_data`` returns.

    Raises
    ------
    ValueError
        If ``name`` is not a key of ``DATASETS``.
    """
    if name in DATASETS:
        csv_name, target_col, cols_to_drop = DATASETS[name]
        parts = load_csv_data(csv_name, target_col, cols_to_drop)
        return parts, target_col
    raise ValueError(f"Unknown dataset: {name}")

In [5]:
def get_min_mutual_info_feature(data_parts, target_col):
    """Return the feature with the lowest mutual information with the target.

    Only the first part that contains ``target_col`` is examined; features
    living in other parts are not scored.

    Parameters
    ----------
    data_parts : iterable of pandas.DataFrame
        Column-wise slices of a dataset.
    target_col : str
        Name of the label column.

    Returns
    -------
    The column label of the lowest-scoring feature in the target-containing
    part. Ties resolve to the earliest column.

    Raises
    ------
    ValueError
        If no part contains ``target_col``, or if the part that does has no
        other columns.
    """
    for part in data_parts:
        if target_col not in part.columns:
            continue
        feature_cols = [c for c in part.columns if c != target_col]
        if not feature_cols:
            raise ValueError("No feature columns in target-containing part.")
        X = part[feature_cols].values
        y = part[target_col].values
        # All features are treated as discrete.
        mi_scores = mutual_info_classif(X, y, discrete_features=True)
        # Scores are positionally aligned with feature_cols, so the index of
        # the smallest score is the index of the wanted column. argmin keeps
        # the "first minimum wins" tie-break without a per-key list.index scan.
        return feature_cols[int(np.argmin(mi_scores))]
    raise ValueError(f"Target column '{target_col}' not found.")

In [6]:
# NOTE(review): this repeats the safe_int definition from an earlier cell;
# consider keeping a single copy.
def safe_int(x):
    """Pass ``int`` instances through untouched; convert anything else with ``int()``."""
    return x if isinstance(x, int) else int(x)

# NOTE(review): this repeats the log2_safe definition from an earlier cell;
# consider keeping a single copy.
def log2_safe(p: gmpy2.mpfr):
    """Base-2 logarithm of ``p`` via gmpy2, or ``mpfr(0)`` when ``p`` is not > 0."""
    if not p > 0:
        return gmpy2.mpfr(0)
    return gmpy2.log2(p)

# =====================================================================
# Mutual Information (Plain)
# =====================================================================

def compute_mutual_information(data_parts, target_col):
    """Compute the mutual information (in bits) of every feature with the target.

    For each non-target column X across all parts, the score is
    ``I(X; Y) = H(Y) - H(Y | X)``, with probabilities estimated from value
    counts and accumulated in gmpy2 ``mpfr`` arithmetic.

    Parameters
    ----------
    data_parts : iterable of pandas.DataFrame
        Column-wise slices of a dataset. The target column is taken from the
        first part that contains it; all parts are expected to have the same
        number of rows as that column.
    target_col : str
        Name of the label column.

    Returns
    -------
    dict
        Maps each feature column label to its mutual information as a Python
        ``float``. An empty dict is returned when no part contains
        ``target_col``.
    """
    # Locate the target column.
    target_series = next(
        (part[target_col] for part in data_parts if target_col in part.columns),
        None,
    )
    if target_series is None:
        return {}

    n = len(target_series)
    total = gmpy2.mpfr(n)

    # Precompute one 0/1 indicator array per target value, then H(Y).
    Y_ind = {
        y: (target_series == y).astype(int).to_numpy()
        for y in pd.unique(target_series)
    }
    H_y = gmpy2.mpfr(0)
    for y_ind in Y_ind.values():
        py = gmpy2.mpfr(int(y_ind.sum())) / total
        if py > 0:
            H_y -= py * log2_safe(py)

    mi = {}
    for part in data_parts:
        for col in part.columns:
            if col == target_col:
                continue
            X = part[col]
            H_y_given_x = gmpy2.mpfr(0)
            for xv in pd.unique(X):
                # Build the indicator per value so that only one feature-side
                # array is alive at a time.
                x_ind = (X == xv).astype(int).to_numpy()
                # Cast to a plain int before it reaches gmpy2, the same way
                # the other counts in this function are handled; the raw
                # numpy scalar would otherwise be mixed into mpfr division.
                cx = int(x_ind.sum())
                if cx == 0:
                    # e.g. NaN never compares equal to itself
                    continue
                cx_mp = gmpy2.mpfr(cx)
                for y_ind in Y_ind.values():
                    c_xy = int((x_ind & y_ind).sum())
                    if c_xy == 0:
                        continue
                    joint = gmpy2.mpfr(c_xy)
                    p_xy = joint / total
                    p_y_given_x = joint / cx_mp
                    H_y_given_x -= p_xy * log2_safe(p_y_given_x)
            mi[col] = float(H_y - H_y_given_x)
    return mi