In [1]:
from sklearn import datasets
from sklearn.model_selection import train_test_split

# Generate a synthetic, imbalanced two-class 2-D dataset: 625 samples around
# (9.5, 10) and 250 samples around (10, 9.4), with a different spread per class.
X, y = datasets.make_blobs(
    n_samples=[125 * 5, 125 * 2],
    n_features=2,
    centers=[(9.5, 10), (10, 9.4)],
    cluster_std=[[0.6, 0.6], [0.35, 0.3]],
    shuffle=True,
    # Seed the generator: without it every kernel restart draws different
    # blobs, so the risks printed below cannot be reproduced even though the
    # train/test split itself is seeded.
    random_state=42,
)

# Hold out 20% of the samples for testing; the seed keeps the split fixed.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

A first comparison between the plain (non-class-wise) K-means DMC and the class-wise K-means DMC.

In [2]:
from dbc import compute_conditional_risk
from dbc.main import KmeansDiscreteMinmaxClassifier
from scipy.spatial import Voronoi, voronoi_plot_2d
import numpy as np
from matplotlib import pyplot as plt

def _report_conditional_risk(model, X_split, y_split, label):
    """Predict one split, print its class-conditional risk, and return both.

    Parameters:
        model: fitted classifier exposing ``predict``.
        X_split: feature matrix of the split to evaluate.
        y_split: true labels of that split.
        label: text placed in front of " class condition risk" in the printout.

    Returns:
        (predictions, risk) where ``risk`` is the raw return value of
        ``compute_conditional_risk``; only its element 0 is printed.
    """
    predictions = model.predict(X_split)
    risk = compute_conditional_risk(y_split, predictions)
    print(f'{label} class condition risk: {risk[0]}')
    return predictions, risk


# --- Baseline: a single K-means (8 clusters) fitted on all training samples ---
DMC_Kmeans = KmeansDiscreteMinmaxClassifier(n_clusters=8)
DMC_Kmeans.fit(X_train, y_train)

y_pred_train, conditional_risk_train = _report_conditional_risk(
    DMC_Kmeans, X_train, y_train, 'No classes wise Kmeans DMC train'
)
y_pred_test, conditional_risk_test = _report_conditional_risk(
    DMC_Kmeans, X_test, y_test, 'No classes wise Kmeans DMC test'
)


# --- Class-wise variant: n_clusters is a {class label: cluster count} dict ---
# NOTE(review): the output recorded for this cell shows fit() raising
# InvalidParameterError ("n_clusters ... Got None instead") for this dict
# form; confirm the installed dbc version supports per-class cluster counts.
DMC_Kmeans_classes_wise = KmeansDiscreteMinmaxClassifier(n_clusters={0:6, 1:2}, classes_wise=True)
DMC_Kmeans_classes_wise.fit(X_train, y_train)

y_pred_train, conditional_risk_train = _report_conditional_risk(
    DMC_Kmeans_classes_wise, X_train, y_train, 'Classes wise Kmeans DMC train'
)
y_pred_test, conditional_risk_test = _report_conditional_risk(
    DMC_Kmeans_classes_wise, X_test, y_test, 'Classes wise Kmeans DMC test'
)

# Plot the decision regions of both classifiers side by side.
padding = 1

fig, ax = plt.subplots(1, 2, figsize=(12, 5))
x_min, x_max = X_train[:, 0].min() - padding, X_train[:, 0].max() + padding
y_min, y_max = X_train[:, 1].min() - padding, X_train[:, 1].max() + padding
xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.01),
                     np.arange(y_min, y_max, 0.01))
# Every grid node as one (x, y) row, shared by both models.
grid_points = np.c_[xx.ravel(), yy.ravel()]

# Draw only the classes that actually occur in the training labels; the
# colour/marker lists are an upper bound (zip stops at the shorter sequence),
# so no empty "Class 2" series is created for two-class data.
classes = np.unique(y_train)
colors = ['skyblue', 'firebrick', 'forestgreen']
markers = ['o', 's', '^']  # circle, square, triangle

panels = (
    (ax[0], DMC_Kmeans, 'No classes wise Kmeans DMC'),
    (ax[1], DMC_Kmeans_classes_wise, 'Classes wise Kmeans DMC'),
)
for axis, model, title in panels:
    # Voronoi cells of the fitted cluster centres (the discretisation grid).
    voronoi_plot_2d(Voronoi(model.cluster_centers), show_points=False, show_vertices=False, ax=axis)

    # Predicted class on every grid node = column index of the largest probability.
    predicted = np.argmax(model.predict_prob(grid_points), axis=1).reshape(xx.shape)
    contour = axis.contourf(xx, yy, predicted, cmap='tab10', alpha=0.6)

    for cls, c, m in zip(classes, colors, markers):
        members = y_train == cls
        axis.scatter(
            X_train[members, 0],
            X_train[members, 1],
            color=c,
            edgecolor='k',
            marker=m,
            s=40,
            label=f'Class {cls}'
        )
    axis.set_title(title)
    # The scatter labels are only visible once a legend is requested.
    axis.legend()

# One colourbar for both panels, taken from the last (class-wise) contour set;
# one tick per class index instead of a hard-coded [0, 1, 2].
plt.colorbar(contour, ax=ax, ticks=np.arange(len(classes)), label='Predicted class')
plt.show()

No classes wise Kmeans DMC train class condition risk: [0.07645875 0.45320197]
No classes wise Kmeans DMC test class condition risk: [0.0546875  0.44680851]
{0: 6, 1: 2}
None
None


InvalidParameterError: The 'n_clusters' parameter of KMeans must be an int in the range [1, inf). Got None instead.

In [None]:
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.metrics import make_scorer
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from ucimlrepo import fetch_ucirepo
from sklearn.preprocessing import LabelEncoder
from dbc.utils import compute_conditional_risk


def make_preprocessor(X):
    """Build an unfitted ColumnTransformer matched to the columns of ``X``.

    Numeric columns are mean-imputed and standardised; object/category
    columns are imputed with the most frequent value and one-hot encoded
    (dense output, unknown categories ignored at transform time).

    Parameters:
        X: pandas DataFrame whose dtypes decide which columns go where.

    Returns:
        sklearn.compose.ColumnTransformer, not yet fitted. Columns that are
        neither numeric nor object/category are listed in no transformer.
    """
    # "number" covers every numeric width (int32, float32, nullable Int64, ...),
    # not just int64/float64, so such columns are no longer left out silently.
    num_features = X.select_dtypes(include="number").columns.tolist()
    cat_features = X.select_dtypes(include=["object", "category"]).columns.tolist()

    num_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    cat_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", num_transformer, num_features),
            ("cat", cat_transformer, cat_features)
        ]
    )
    return preprocessor

def global_risk(y_true, y_pred):
    """Return the overall misclassification rate (fraction of wrong labels).

    Parameters:
        y_true: array-like of true labels.
        y_pred: array-like of predicted labels, same shape as ``y_true``.

    Returns:
        Mean of the element-wise mismatch indicator, a float in [0, 1].

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    # Coerce first: comparing two plain lists with != yields a single bool,
    # which would make the "risk" either 0 or 1 regardless of the content.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {y_true.shape} and {y_pred.shape}"
        )
    return np.mean(y_true != y_pred)

def max_gap(y_true, y_pred):
    """Return the spread (largest minus smallest) of the class-conditional risks.

    The risks are element 0 of ``compute_conditional_risk(y_true, y_pred)``
    (presumably one value per class -- confirm against dbc).
    """
    per_class_risk = compute_conditional_risk(y_true, y_pred)[0]
    worst = np.max(per_class_risk)
    best = np.min(per_class_risk)
    return worst - best

def variances_Rk(y_true, y_pred):
    """Return the variance of the class-conditional risks.

    The risks are element 0 of ``compute_conditional_risk(y_true, y_pred)``
    (presumably one value per class -- confirm against dbc).
    """
    per_class_risk = compute_conditional_risk(y_true, y_pred)[0]
    spread = np.var(per_class_risk)
    return spread

def max_Rk(y_true, y_pred):
    """Return the largest class-conditional risk.

    The risks are element 0 of ``compute_conditional_risk(y_true, y_pred)``
    (presumably one value per class -- confirm against dbc).
    """
    per_class_risk = compute_conditional_risk(y_true, y_pred)[0]
    worst = np.max(per_class_risk)
    return worst

def print_results(results):
    """Print mean (± std) of every train and test metric in ``results``.

    Parameters:
        results: mapping with the keys ``"<split>_<metric>"`` for split in
            {train, test} and metric in {global_risk, max_Rk, max_gap,
            variances_Rk}; each value must provide ``.mean()`` and ``.std()``.

    The mean is negated before printing: the scorers in this notebook are
    built with ``greater_is_better=False``, so the stored scores are the
    negated risks.
    """
    metric_names = ("global_risk", "max_Rk", "max_gap", "variances_Rk")
    splits = (("Train", "train"), ("Test", "test"))

    for heading, prefix in splits:
        for metric in metric_names:
            scores = results[f"{prefix}_{metric}"]
            print("%s %s: %.3f (± %.3f)" % (heading, metric, -scores.mean(), scores.std()))

# Scorers keyed by metric name. All four metrics are risks (lower is better),
# hence greater_is_better=False: scikit-learn then stores the negated value.
_risk_metrics = (global_risk, max_Rk, max_gap, variances_Rk)
scoring = {
    metric.__name__: make_scorer(metric, greater_is_better=False)
    for metric in _risk_metrics
}

# 5-fold stratified cross-validation repeated 20 times with a fixed seed.
rskf = RepeatedStratifiedKFold(random_state=42, n_repeats=20, n_splits=5)


# Alternative datasets (uncomment exactly one fetch_ucirepo line):
# df = fetch_ucirepo(id=17)  # Breast Cancer Wisconsin (Diagnostic)
# df = fetch_ucirepo(id=15)  # Breast Cancer Wisconsin (Original)
# df = fetch_ucirepo(id=53)  # IRIS
# df = fetch_ucirepo(id=186)  # Wine quality
# df = fetch_ucirepo(id=2)  # Adult
# df = fetch_ucirepo(id=222)  # Bank Marketing
# df = fetch_ucirepo(id=19)  # Car Evaluation (performs very poorly)

# SPDMC parameters seem to work better the larger they are (70, 1.4); higher values were not tested.
# df = fetch_ucirepo(id=59)  # Letter Recognition (unclear why DMC and SPDMC do poorly here -- too many classes?)
df = fetch_ucirepo(id=149)  # Statlog (Vehicle Silhouettes) (if SPDMC does not converge, remove the class that has a single sample)

# Map the "?" placeholder (used by Adult) to NaN so the imputers can handle it.
# Previously this result was overwritten by a second `X = df.data.features`
# assignment and therefore never took effect; on datasets without "?" the
# replacement is a no-op.
X = df.data.features.replace("?", np.nan)
# NOTE(review): the preprocessor is fitted on the full dataset before
# cross-validation, so imputation/scaling statistics include the test folds.
X = make_preprocessor(X).fit_transform(X)
y = df.data.targets.values.ravel()

# Only for Statlog: drop the samples labelled '204'
# (presumably the single-sample class mentioned above -- confirm).
mask = y != '204'
X = X.iloc[mask] if hasattr(X, 'iloc') else X[mask]
y = y[mask]

# Encode the labels as integers 0..n_classes-1 and display the class counts.
y = LabelEncoder().fit_transform(y)
np.bincount(y)


In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import numpy as np
import matplotlib.pyplot as plt

def optimal_kmeans_by_silhouette(X, k_range=(2, 10), random_state=0, plot=False):
    """
    Fit KMeans for a range of cluster counts and keep the one with the
    highest silhouette score.

    Parameters:
        X : ndarray, shape (n_samples, n_features)
            Data matrix.
        k_range : tuple(int, int)
            Inclusive range (min_k, max_k) of cluster counts to try. The range
            is clipped to [2, n_samples - 1], the only values for which the
            silhouette score is defined.
        random_state : int
            Random seed, for reproducible results.
        plot : bool
            Whether to print the score for every K and plot the silhouette
            score as a function of K.

    Returns:
        best_k : int
            Cluster count with the highest silhouette score (the smallest
            such K on ties).
        best_model : sklearn.cluster.KMeans
            The fitted KMeans model for ``best_k``.
        silhouette_scores : dict
            Silhouette score for every K that was evaluated.

    Raises:
        ValueError: if no cluster count in ``k_range`` can be evaluated
            (for example when X has fewer than 3 samples).
    """
    min_k, max_k = k_range
    n_samples = np.shape(X)[0]

    # silhouette_score needs 2 <= n_clusters <= n_samples - 1; clip the
    # requested range instead of failing halfway through the search.
    first_k = max(2, min_k)
    last_k = min(max_k, n_samples - 1)
    if first_k > last_k:
        raise ValueError(
            f"No valid number of clusters in k_range={k_range} for {n_samples} samples; "
            "need 2 <= k <= n_samples - 1."
        )

    silhouette_scores = {}
    fitted_models = {}

    for k in range(first_k, last_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=random_state)
        labels = kmeans.fit_predict(X)
        score = silhouette_score(X, labels)
        silhouette_scores[k] = score
        fitted_models[k] = kmeans
        if plot:
            print(f"K={k}: Silhouette score = {score:.4f}")

    # Pick the K with the highest score; reuse the model already fitted for it
    # (same data and seed) instead of fitting it a second time.
    best_k = max(silhouette_scores, key=silhouette_scores.get)
    best_model = fitted_models[best_k]

    if plot:
        plt.figure(figsize=(6,4))
        plt.plot(list(silhouette_scores.keys()), list(silhouette_scores.values()), 'o-', color='navy')
        plt.axvline(best_k, color='red', linestyle='--', label=f'Best K = {best_k}')
        plt.xlabel("Number of clusters (K)")
        plt.ylabel("Silhouette Score")
        plt.title("Optimal K Selection by Silhouette Coefficient")
        plt.legend()
        plt.tight_layout()
        plt.show()

    return best_k, best_model, silhouette_scores

def optimal_kmeans_by_silhouette_all_class(X, y, k_range=(2, 10), random_state=0, plot=False):
    """Choose a silhouette-optimal cluster count separately for every class.

    Parameters:
        X : ndarray, shape (n_samples, n_features)
            Data matrix.
        y : array-like, shape (n_samples,)
            Class label of every sample.
        k_range : tuple(int, int)
            Inclusive (min_k, max_k) range searched for each class.
        random_state : int
            Seed forwarded to every KMeans fit.
        plot : bool
            Forwarded to ``optimal_kmeans_by_silhouette``.

    Returns:
        dict mapping each class label found in ``y`` to its best cluster count.
    """
    y = np.asarray(y)
    clusters_dict = {}
    # Iterate over the labels that actually occur rather than assuming they
    # are exactly 0..n_classes-1.
    for label in np.unique(y):
        # Honour the caller's arguments; they used to be ignored in favour of
        # a hard-coded k_range=(2, 12) and the callee's defaults.
        best_k = optimal_kmeans_by_silhouette(
            X[y == label],
            k_range=k_range,
            random_state=random_state,
            plot=plot,
        )[0]
        # .item() turns the numpy scalar into a plain Python key.
        clusters_dict[label.item()] = best_k
    return clusters_dict

# Per-class cluster counts selected by silhouette score.
# NOTE(review): this binds the name `dict`, shadowing the builtin for the rest
# of the kernel session; later cells read this name, so renaming it requires
# updating them together.
dict = optimal_kmeans_by_silhouette_all_class(X,y)

In [None]:
from sklearn.model_selection import cross_validate

# Class-wise K-means DMC, using the per-class cluster counts stored in `dict`.
classes_wise_estimator = KmeansDiscreteMinmaxClassifier(classes_wise=True, n_clusters=dict)

# Repeated stratified CV on all cores; train scores are kept as well so that
# both the train and the test metrics are available afterwards.
results_DMC_classes_wise = cross_validate(
    classes_wise_estimator,
    X,
    y,
    scoring=scoring,
    cv=rskf,
    n_jobs=-1,
    return_train_score=True,
)

In [None]:
from sklearn.utils import estimator_html_repr
from dbc.main import KmeansDiscreteMinmaxClassifier

# Inspect which parameters the estimator reports when it is given the
# per-class cluster-count mapping stored in `dict`.
est = KmeansDiscreteMinmaxClassifier(n_clusters=dict)
est_params = est.get_params()
print(est_params)