# Evaluation

Tabular 데이터에 대해 AbPC 구하기

In [1]:
import os; os.chdir("../")

In [None]:
import pickle
import functools
from math import comb

import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
import xgboost as xgb

import matplotlib.pyplot as plt

Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.


In [None]:
def _transform(X, feature_metadata):
    input_data = []
    for k, v in feature_metadata.items():
        preprocessed = v['encoder'].transform(X[[k]].values)
        if v['type'] == 'categorical':
            preprocessed = preprocessed.toarray()
        input_data.append(preprocessed)
    
    input_array = np.concatenate(input_data, axis=1)
    return input_array

def _invert_input_array(input_array, feature_metadata):
    inverted_data = {}
    
    for col, meta in feature_metadata.items():
        if meta['type'] == 'categorical':
            # One-hot encoded 된 부분 추출
            start_idx, end_idx = meta['index'][0], meta['index'][-1] + 1
            cat_data = input_array[:, start_idx:end_idx]
            # OneHotEncoder로 복원
            inverted_col = meta['encoder'].inverse_transform(cat_data)
            inverted_data[col] = inverted_col.flatten()
        else:
            # 수치형 데이터 복원
            idx = meta['index']
            num_data = input_array[:, idx].reshape(-1, 1)
            inverted_col = meta['encoder'].inverse_transform(num_data)
            inverted_data[col] = inverted_col.flatten()
    
    # 복원된 데이터를 DataFrame으로 변환
    inverted_df = pd.DataFrame(inverted_data)
    
    return inverted_df


def find_closest_data_with_center(X_train, cluster_centers):
    closest_data = []
    
    for center in cluster_centers:
        # 각 중심에 대해 유클리드 거리 계산
        distances = np.linalg.norm(X_train - center, axis=1)
        # 가장 가까운 데이터의 인덱스 찾기
        closest_index = np.argmin(distances)
        # 가장 가까운 데이터 추가
        closest_data.append(X_train[closest_index])
    
    return np.array(closest_data)

def shapley_kernel(N):
    kernel = np.zeros(N)
    for s in range(1, N):  # subset 크기 s는 1부터 N-1까지
        kernel[s] = (N - 1) / (comb(N, s) * s * (N - s))
    return kernel / kernel.sum()

In [4]:
dataset_nm = "Adult"
X_train = np.load(f"data/{dataset_nm}/X_train.npy")
y_train = np.load(f"data/{dataset_nm}/y_train.npy")

X_test = np.load(f"data/{dataset_nm}/X_test.npy")
y_test = np.load(f"data/{dataset_nm}/y_test.npy")

feature_metadata = pickle.load(open(f"data/{dataset_nm}/feature_metadata.pkl", "rb"))
invert_input_array = functools.partial(_invert_input_array, feature_metadata=feature_metadata)
transform = functools.partial(_transform, feature_metadata=feature_metadata)

In [5]:
num_cols = []
cat_cols = []

for i, (col_nm, info) in enumerate(feature_metadata.items()):
    if info['type'] == 'numerical':
        num_cols.append(col_nm)
    else:
        cat_cols.append(col_nm)
        encoder = info['encoder']
        weight = info['cat_dist']

In [7]:
kmeans = KMeans(n_clusters=50, random_state=42)
kmeans.fit(X_train)

res = kmeans.predict(X_train)
_, counts = np.unique(res, return_counts=True)
weight = counts / counts.sum()

bg_data = find_closest_data_with_center(X_train, kmeans.cluster_centers_)

In [None]:
bg_data = invert_input_array(bg_data)
train_data = invert_input_array(X_train)

In [12]:
model = xgb.XGBClassifier()
model.load_model(f"data/{dataset_nm}/xgb_model.json")

In [77]:
target_size = 10
n_coalition = 100
orig_feature = bg_data.columns.copy()

target = invert_input_array(X_test[:target_size])
base_y = model.predict_proba(transform(target))[:, 1]

record = {"bin_coal" : [], "perturbed" : []}
for _ in range(n_coalition):
    coalition_sz = np.random.choice(np.arange(len(feature_metadata)), replace=False, p=shapley_kernel(len(feature_metadata)))
    coalition = np.random.permutation(len(feature_metadata))[:coalition_sz]

    coal_feature = orig_feature[coalition]
    non_coal_feature = [col for col in orig_feature if col not in coal_feature]

    coal_data = target.loc[target.index.repeat(len(bg_data)), coal_feature].reset_index(drop=True)
    non_coal_data = bg_data.loc[np.tile(bg_data.index, len(target))].reset_index(drop=True)[non_coal_feature]
    new_data = pd.concat([coal_data, non_coal_data], axis=1)[orig_feature]

    input_data = transform(new_data)
    pred = model.predict_proba(input_data)[:, 1]

    perturbed = pred.reshape(-1, bg_data.shape[0]) @ weight
    
    bin_coal = np.zeros(len(feature_metadata), dtype=int)
    bin_coal[coalition] = 1
    
    record["perturbed"].append(perturbed)
    record["bin_coal"].append(bin_coal)

record["perturbed"] = np.array(record["perturbed"])
record["bin_coal"] = np.array(record["bin_coal"])


In [31]:
def maximize_correlation(X, y, epsilon=1e-6):
    n, m = X.shape
    
    # 평균 중심화
    X_centered = X - X.mean(axis=0)
    y_centered = y - y.mean()
    
    # 공분산 행렬 생성
    cov_matrix = X_centered.T @ X_centered / n
    
    # 정규화 항 추가 (ridge regression 방식)
    regularized_cov = cov_matrix + epsilon * np.eye(m)
    
    # beta 추정값 계산
    beta = np.linalg.solve(regularized_cov, X_centered.T @ y_centered / n)
    
    # 상관계수 최대화를 위해 beta의 크기를 스케일링
    beta /= np.linalg.norm(beta)
    
    return beta


res = maximize_correlation(record['bin_coal'], record['perturbed'])

In [32]:
record['bin_coal'].shape, record['perturbed'].shape

((100, 14), (100, 10))

In [79]:
base_y, base_y > 0.5

(array([1.8035243e-03, 4.8375152e-02, 3.5502409e-04, 7.0616817e-03,
        6.7211664e-01, 1.8720534e-02, 9.6442008e-01, 2.4355875e-02,
        2.2555871e-01, 2.1727468e-01], dtype=float32),
 array([False, False, False, False,  True, False,  True, False, False,
        False]))

In [None]:
import numpy as np
import pandas as pd

def create_masked_vector(explanation, d):
    """
    Create a masked binary vector for MORF or LERF calculation.
    """
    sorted_idx = np.argsort(-explanation)  # 내림차순 정렬
    bin_vec = np.ones((d + 1, d), dtype=int)
    row_indices = np.broadcast_to(np.arange(d + 1)[:, None], (d + 1, d))
    sorted_indices = np.broadcast_to(sorted_idx, (d + 1, d))
    mask = (np.arange(d)[None, :] < np.arange(d + 1)[:, None])
    bin_vec[row_indices[mask], sorted_indices[mask]] = 0
    return bin_vec

def compute_curve(bin_vec, target, bg_data, model, transform, pred_label, weight):
    """
    Compute MORF or LERF curve based on the masked vector.
    """
    orig_feature = bg_data.columns.copy()
    curve = np.zeros(bin_vec.shape[0])
    
    for i, bin_coal in enumerate(bin_vec):
        coalition = np.where(bin_coal == 1)[0]
        coal_feature = orig_feature[coalition]
        non_coal_feature = [col for col in orig_feature if col not in coal_feature]

        coal_data = target.loc[target.index.repeat(len(bg_data)), coal_feature].reset_index(drop=True)
        non_coal_data = bg_data.loc[np.tile(bg_data.index, len(target))].reset_index(drop=True)[non_coal_feature]
        new_data = pd.concat([coal_data, non_coal_data], axis=1)[orig_feature]

        input_data = transform(new_data)
        pred = model.predict_proba(input_data)[:, pred_label]

        # 곡선 업데이트
        curve[i] = pred.reshape(-1, bg_data.shape[0]) @ weight

    return curve

def compute_abpc(X_test, bg_data, model, transform, explanation, weight):
    """
    Compute ABPC based on MORF and LERF curves.
    """
    target = invert_input_array(X_test)
    proba = model.predict_proba(transform(target))
    pred_label = proba.argmax()
    print(pred_label)

    # MORF 곡선 생성
    morf_bin_vec = create_masked_vector(explanation, len(bg_data.columns))
    morf_curve = compute_curve(morf_bin_vec, target, bg_data, model, transform, pred_label, weight)

    # LERF 곡선 생성
    lerf_bin_vec = create_masked_vector(-explanation, len(bg_data.columns))
    lerf_curve = compute_curve(lerf_bin_vec, target, bg_data, model, transform, pred_label, weight)

    if pred_label == 0:
        morf_curve, lerf_curve = lerf_curve, morf_curve

    # ABPC 계산
    abpc = (lerf_curve - morf_curve).mean()
    return abpc

# 실행 예제



In [None]:
for i in range(res.shape[1]):
    explanation = res[:,i]
    abpc = compute_abpc(X_test[[i]], bg_data, model, transform, explanation, weight)
    print(f"ABPC: {abpc:.4f}")

0
ABPC: 0.1735
0
ABPC: 0.2419
0
ABPC: 0.1706
0
ABPC: 0.2825
1
ABPC: 0.3220
0
ABPC: 0.1822
1
ABPC: 0.4171
0
ABPC: 0.1921
0
ABPC: 0.2898
0
ABPC: 0.2680


In [73]:
res.shape

(14, 10)