1. sklearn - AgglomerativeClustering

In [2]:
from sklearn.cluster import AgglomerativeClustering
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load data
iris = load_iris()
X, y = iris.data, iris.target

# Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

# Train & Predict
clf = AgglomerativeClustering(n_clusters=2)
clf.fit(X_train, y_train)
y_pred = clf.fit_predict(X_test)

# Accuracy
print("Accuracy:", accuracy_score(y_test, y_pred))

Accuracy: 0.0


2. scratch - AgglomerativeClustering

In [1]:
import numpy as np
from collections import Counter
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


# Euclidean distance
def euclidean_dist(a, b):
    return np.sqrt(np.sum((a - b) ** 2))

# Agglomerative clustering (single-link)
def agglomerative_clustering(X, n_clusters):
    clusters = [[i] for i in range(len(X))]
    distances = {}

    while len(clusters) > n_clusters:
        min_dist = float('inf')
        pair = None

        for i in range(len(clusters)):
            for j in range(i + 1, len(clusters)):
                key = (i, j)
                if key not in distances:
                    d = min(euclidean_dist(X[p1], X[p2]) for p1 in clusters[i] for p2 in clusters[j])
                    distances[key] = d
                else:
                    d = distances[key]

                if d < min_dist:
                    min_dist = d
                    pair = (i, j)

        i, j = pair
        clusters[i].extend(clusters[j])
        del clusters[j]

        # Remove distances with old index j
        distances = {k: v for k, v in distances.items() if j not in k}

    labels = np.zeros(len(X), dtype=int)
    for cluster_id, indices in enumerate(clusters):
        for idx in indices:
            labels[idx] = cluster_id
    return labels

# Assign test samples to nearest cluster center
def assign_to_cluster(test_data, train_data, train_labels, n_clusters):
    centers = []
    for k in range(n_clusters):
        cluster_points = train_data[train_labels == k]
        center = np.mean(cluster_points, axis=0)
        centers.append(center)

    test_labels = []
    for point in test_data:
        dists = [euclidean_dist(point, center) for center in centers]
        test_labels.append(np.argmin(dists))
    return np.array(test_labels)

# Map predicted cluster to majority true label
def map_cluster_to_label(cluster_pred, true_labels):
    mapping = {}
    for cluster in set(cluster_pred):
        true_vals = true_labels[cluster_pred == cluster]
        majority = Counter(true_vals).most_common(1)[0][0]
        mapping[cluster] = majority
    return np.array([mapping[c] for c in cluster_pred])


# Load data
iris = load_iris()
X, y = iris.data, iris.target

# Use only two classes (binary clustering)
X = X[y != 2]
y = y[y != 2]

# Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

# Train: Agglomerative clustering
train_labels = agglomerative_clustering(X_train, n_clusters=2)

# Test: Assign to nearest cluster center
test_pred = assign_to_cluster(X_test, X_train, train_labels, n_clusters=2)

# Map predicted cluster to true label
test_pred_mapped = map_cluster_to_label(test_pred, y_test)

# Evaluate
print("Accuracy:", accuracy_score(y_test, test_pred_mapped))


Accuracy: 1.0


3. scratch - Divisive Clustering

In [2]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from collections import Counter


# 计算欧几里得距离
def euclidean_distance(a, b):
    return np.sqrt(np.sum((a - b) ** 2))

# 简单手写 KMeans（k=2）
def custom_kmeans(X, max_iters=100):
    # 随机选两个初始中心
    np.random.seed(0)
    indices = np.random.choice(len(X), 2, replace=False)
    centroids = X[indices]

    for _ in range(max_iters):
        clusters = [[] for _ in range(2)]

        # 分配每个点到最近中心
        for x in X:
            distances = [euclidean_distance(x, c) for c in centroids]
            cluster_id = np.argmin(distances)
            clusters[cluster_id].append(x)

        # 更新中心
        new_centroids = []
        for cluster in clusters:
            if cluster:
                new_centroids.append(np.mean(cluster, axis=0))
            else:
                # 空簇随机初始化
                new_centroids.append(X[np.random.randint(len(X))])
        
        new_centroids = np.array(new_centroids)

        if np.allclose(centroids, new_centroids):
            break
        centroids = new_centroids

    # 输出标签
    labels = np.zeros(len(X), dtype=int)
    for i, x in enumerate(X):
        distances = [euclidean_distance(x, c) for c in centroids]
        labels[i] = np.argmin(distances)

    return labels, centroids

# Divisive Clustering 核心逻辑
def divisive_clustering(X, n_clusters=2):
    clusters = [np.arange(len(X))]  # 每个 cluster 是 index 的集合
    labels = np.zeros(len(X), dtype=int)

    while len(clusters) < n_clusters:
        # 选择最大簇分裂
        cluster_sizes = [len(c) for c in clusters]
        idx_to_split = np.argmax(cluster_sizes)
        indices = clusters.pop(idx_to_split)

        if len(indices) < 2:
            clusters.append(indices)
            continue

        subX = X[indices]
        sub_labels, _ = custom_kmeans(subX)

        # 生成两个新簇
        cluster0 = indices[sub_labels == 0]
        cluster1 = indices[sub_labels == 1]
        clusters.append(cluster0)
        clusters.append(cluster1)

    # 根据最终 clusters 设置标签
    for label_id, idx_list in enumerate(clusters):
        for i in idx_list:
            labels[i] = label_id

    return labels


# 使用最近中心点分配测试集
def assign_test_labels(X_train, train_labels, X_test, n_clusters=2):
    centroids = []
    for k in range(n_clusters):
        points = X_train[train_labels == k]
        center = np.mean(points, axis=0)
        centroids.append(center)

    pred_labels = []
    for x in X_test:
        distances = [euclidean_distance(x, c) for c in centroids]
        pred_labels.append(np.argmin(distances))
    return np.array(pred_labels)



# 映射 cluster label -> true label（用 majority voting）
def map_cluster_to_true(test_pred, y_test):
    mapping = {}
    for cluster in set(test_pred):
        majority = Counter(y_test[test_pred == cluster]).most_common(1)[0][0]
        mapping[cluster] = majority
    return np.array([mapping[p] for p in test_pred])

# Load dataset
iris = load_iris()
X, y = iris.data, iris.target

# 只用前两类，变成二分类问题
X = X[y != 2]
y = y[y != 2]

# Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

# 训练集做分裂
train_labels = divisive_clustering(X_train, n_clusters=2)

test_pred = assign_test_labels(X_train, train_labels, X_test)

mapped = map_cluster_to_true(test_pred, y_test)

# 输出准确率
print("Accuracy:", accuracy_score(y_test, mapped))


Accuracy: 1.0
