# COIL-20 Data Preparation

In [6]:
import os
import requests
import zipfile
import glob
import numpy as np
from PIL import Image
from skimage.feature import local_binary_pattern
from skimage.filters import gabor
from skimage.transform import resize


In [12]:
def extract_intensity_feature(img):
    """
    提取强度特征：
      - 将图像转换为灰度图；
      - 将图像调整大小为 32x32；
      - 展平为一个 1024-D 的向量（32x32=1024）。
    """
    gray_img = img.convert("L")
    resized_img = resize(np.array(gray_img), (32, 32), anti_aliasing=True)
    feature = resized_img.flatten()  # 1024-D 向量
    return feature

def extract_lbp_feature(img, desired_dim=3304):
    """
    提取 LBP (局部二值模式) 特征：
      - 将图像转换为灰度图；
      - 计算灰度图的 LBP 图像（使用 uniform 模式）；
      - 将图像划分为不重叠的块，并计算每个块的直方图；
      - 拼接所有直方图，并通过截断或填充将向量调整为 desired_dim（3304）维。
    
    注意：这里使用简单的块直方图策略，实际情况可根据需要调整块大小和直方图的 bin 数。
    """
    gray = np.array(img.convert("L"))
    radius = 1
    n_points = 8
    lbp_image = local_binary_pattern(gray, P=n_points, R=radius, method="uniform")
    
    block_size = 8  # 根据需求调整块大小
    h, w = lbp_image.shape
    features = []
    n_bins = n_points + 2  # 通常 uniform 模式下有 n_points+2 个 bin
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = lbp_image[i:i+block_size, j:j+block_size]
            hist, _ = np.histogram(block, bins=np.arange(0, n_bins+1), density=True)
            features.extend(hist)
    
    feature_vec = np.array(features)
    if feature_vec.shape[0] > desired_dim:
        feature_vec = feature_vec[:desired_dim]
    elif feature_vec.shape[0] < desired_dim:
        feature_vec = np.pad(feature_vec, (0, desired_dim - feature_vec.shape[0]), mode="constant")
    return feature_vec

def extract_gabor_feature(img, desired_dim=6750):
    """
    提取 Gabor 特征：
      - 将图像转换为灰度图；
      - 使用不同频率和方向的一组 Gabor 滤波器处理图像；
      - 对每个滤波器计算幅值响应；
      - 将每个响应图划分为块，计算块的直方图；
      - 拼接直方图，并通过填充或截断调整维度为 desired_dim（6750）。
    
    注意：这里的参数（滤波器的频率、方向、块大小、直方图的 bin 数）均为示例，实际应用中可能需要根据实验结果调整。
    """
    gray = np.array(img.convert("L"))
    
    frequencies = [0.1, 0.2, 0.3]
    thetas = [0, np.pi/4, np.pi/2, 3*np.pi/4]
    
    features = []
    for frequency in frequencies:
        for theta in thetas:
            filt_real, filt_imag = gabor(gray, frequency=frequency, theta=theta)
            magnitude = np.sqrt(filt_real**2 + filt_imag**2)
            resized_mag = resize(magnitude, (64, 64), anti_aliasing=True)
            
            block_size = 8  # 根据需要调整块大小
            n_bins = 10     # 直方图的 bin 数
            for i in range(0, 64, block_size):
                for j in range(0, 64, block_size):
                    block = resized_mag[i:i+block_size, j:j+block_size]
                    hist, _ = np.histogram(block, bins=n_bins, density=True)
                    features.extend(hist)
    
    feature_vec = np.array(features)
    if feature_vec.shape[0] > desired_dim:
        feature_vec = feature_vec[:desired_dim]
    elif feature_vec.shape[0] < desired_dim:
        feature_vec = np.pad(feature_vec, (0, desired_dim - feature_vec.shape[0]), mode="constant")
    return feature_vec

### 2. 处理 dataset 文件夹下的 COIL-20 图像，并提取三视图特征

def process_coil20(dataset_dir):
    """
    处理 dataset 目录中所有 PNG 图像：
      - 读取每一张图像；
      - 分别提取强度特征（1024-D）、LBP 特征（3304-D）和 Gabor 特征（6750-D）；
      - 返回 3 个 numpy 数组（每个视图一个）以及对应的标签。
    
    假设图像命名格式为 "objXX__YY.png"，其中 XX 为目标类别标签。
    """
    # 获取 dataset 目录下所有 PNG 文件
    image_paths = glob.glob(os.path.join(dataset_dir, "*.png"))
    intensity_features = []
    lbp_features = []
    gabor_features = []
    labels = []
    
    for path in sorted(image_paths):
        try:
            img = Image.open(path)
        except Exception as e:
            print(f"读取图像 {path} 出错：{e}")
            continue
        
        view1 = extract_intensity_feature(img)  # 1024-D
        view2 = extract_lbp_feature(img)         # 3304-D
        view3 = extract_gabor_feature(img)       # 6750-D
        
        intensity_features.append(view1)
        lbp_features.append(view2)
        gabor_features.append(view3)
        
        # 从文件名中提取标签，例如 "obj10__5.png" 则标签为 10
        filename = os.path.basename(path)
        label_str = filename.split("__")[0]
        label = int(''.join(filter(str.isdigit, label_str)))
        labels.append(label)
    
    intensity_features = np.array(intensity_features)
    lbp_features = np.array(lbp_features)
    gabor_features = np.array(gabor_features)
    labels = np.array(labels)
    
    return intensity_features, lbp_features, gabor_features, labels

if __name__ == "__main__":
    # 假设 dataset 文件夹与当前代码文件在同一路径下
    dataset_dir = "./dataset"
    view1_features, view2_features, view3_features, labels = process_coil20(dataset_dir)
    
    # 打印提取特征的维度，验证是否符合预期
    print("Intensity features (View 1) shape:", view1_features.shape)  # 预期形状: (n_samples, 1024)
    print("LBP features (View 2) shape:", view2_features.shape)         # 预期形状: (n_samples, 3304)
    print("Gabor features (View 3) shape:", view3_features.shape)         # 预期形状: (n_samples, 6750)
    print("Labels shape:", labels.shape)

Intensity features (View 1) shape: (1440, 1024)
LBP features (View 2) shape: (1440, 3304)
Gabor features (View 3) shape: (1440, 6750)
Labels shape: (1440,)


In [16]:
features_list = [view1_features, view2_features, view3_features]
for idx, view in enumerate(features_list):
    print(f"View {idx+1} shape:", view.shape)

View 1 shape: (1440, 1024)
View 2 shape: (1440, 3304)
View 3 shape: (1440, 6750)


# Single View

In [86]:
import numpy as np
from scipy.linalg import eigh
import urllib.request
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from tqdm import tqdm 

In [87]:
def project_simplex(v):
    n = len(v)
    u = np.sort(v)[::-1]
    cssv = np.cumsum(u)
    rho = np.nonzero(u + (1 - cssv) / (np.arange(n) + 1) > 0)[0][-1]
    theta = (cssv[rho] - 1) / (rho + 1)
    w = np.maximum(v - theta, 0)
    return w

def compute_laplacian(S):
    D = np.diag(S.sum(axis=1))
    L = D - S
    return L

def update_S(Q, beta):
    n, c = Q.shape
    S = np.zeros((n, n))
    Q_norms = np.sum(Q**2, axis=1, keepdims=True)
    dist_sq = Q_norms + Q_norms.T - 2 * np.dot(Q, Q.T)
    dist_sq = np.maximum(dist_sq, 0)
    
    for j in range(n):
        g_j = dist_sq[:, j]
        v = -g_j / (2 * beta)
        s_j = project_simplex(v)
        S[:, j] = s_j
        
    return S

def update_Q(L, c):
    eigvals, eigvecs = eigh(L)
    Q = eigvecs[:, :c]
    return Q, eigvals

In [88]:
def make_single_view_graph(single_view_graph_X, class_number, default_beta=1.0):
    
    single_view_graph = []
    
    for i in tqdm(range(len(single_view_graph_X))):
        
        # init
        beta = default_beta
        S = update_S(single_view_graph_X[i], beta)
        S = (S + S.T) / 2
        L = compute_laplacian(S)
        Q = update_Q(L, class_number)
        Q, eigvals = update_Q(L, class_number)

        for j in range(100):
            S = update_S(Q, beta)
            L = compute_laplacian(S)
            Q_old = Q
            Q, eigvals = update_Q(L, class_number)

            fn1 = np.sum(eigvals[:class_number])
            fn2 = np.sum(eigvals[:class_number+1])
            # print("L_rank",L_rank, "beta", beta)
            if fn1 > 0.00000000001:
                beta = beta * 2
                #tqdm.write(f"{i}th graph end at {j}th iteration, smallest eigenvalues: {eigvals[:class_number]}")
            elif fn2 < 0.00000000001:
                beta = beta / 2
                Q = Q_old
                #tqdm.write(f"{i}th graph end at {j}th iteration, smallest eigenvalues: {eigvals[:class_number]}")
            else:
                num_zero_eig = np.sum(eigvals < 1e-5)
                tqdm.write(f"Converged: the Laplacian has {num_zero_eig} (>= {class_number}) zero eigenvalues.")
                break
                
        single_view_graph.append(S)
        
    return single_view_graph


In [89]:
class_number = 20
single_view_graph = make_single_view_graph(features_list, class_number)

100%|██████████| 3/3 [03:30<00:00, 70.08s/it] 

Converged: the Laplacian has 20 (>= 20) zero eigenvalues.





# Global Graph


In [90]:
def global_graph_learning(S_list, c, gamma=1.0, max_iter=100, tol=1e-4):
    """
    Input:
        S_list  : a list with each element being a single-view graph (n x n matrix), total of nv views.
        c       : number of clusters (global graph A should have exactly c connected components).
        gamma   : trade-off parameter in the global graph objective function.
        max_iter: maximum number of iterations.
        tol     : tolerance for convergence.

    Output:
        A       : the learned global graph (n x n matrix)
        P       : spectral embedding matrix (n x c matrix)
        W       : weight matrix (nv x n) where each column corresponds to the weights for each view for a data point.
    """
    nv = len(S_list)  # number of views
    n = S_list[0].shape[0]  # number of data points

    # Initialize: set all view weights for each data point to 1/nv
    W = np.full((nv, n), 1.0 / nv)

    # Initialize the global graph A using the weighted-sum rule from S_list
    A = np.zeros((n, n))
    for j in range(n):
        # For the j-th column corresponding to data point j
        a_j = np.zeros(n)
        for v in range(nv):
            a_j += W[v, j] * S_list[v][:, j]
        A[:, j] = a_j
    # Enforce symmetry for A
    A = (A + A.T) / 2

    # Compute the initial Laplacian and spectral embedding matrix P
    L = compute_laplacian(A)
    P, eigvals = update_Q(L, c)

    # Main iterative process
    for it in tqdm(range(max_iter), desc="Global Graph Learning"):
        A_old = A.copy()

        # =============================
        # 1. Update the global graph A: update each column a_j for data point j
        # =============================
        for j in range(n):
            # Compute h_j, where h_j[i] = ||P[i, :] - P[j, :]||^2
            p_j = P[j, :]
            h_j = np.sum((P - p_j)**2, axis=1)
            # Compute the weighted sum from single-view graphs: s_bar = sum_v W[v, j] * S_list[v][:, j]
            s_bar = np.zeros(n)
            for v in range(nv):
                s_bar += W[v, j] * S_list[v][:, j]
            # Construct the updating vector v_vec = s_bar - (gamma/2)*h_j
            v_vec = s_bar - (gamma / 2.0) * h_j
            # Update a_j by projecting v_vec onto the probability simplex
            A[:, j] = project_simplex(v_vec)

        # Enforce symmetry for A
        A = (A + A.T) / 2

        # =============================
        # 2. Update the spectral embedding matrix P
        # =============================
        L = compute_laplacian(A)
        P, eigvals = update_Q(L, c)

        # Check condition: if the Laplacian L has at least c zero eigenvalues, we assume the ideal graph structure is reached.
        num_zero_eig = np.sum(eigvals < 1e-5)
        if num_zero_eig >= c:
            tqdm.write(f"Converged at iteration {it}: {num_zero_eig} zero eigenvalues (>= {c}).")
            break

        # =============================
        # 3. Update the weight matrix W
        # For each data point j, update its view weights based on the difference between A and each S_list[:, j].
        # =============================
        for j in range(n):
            # Construct matrix Z_j of size n x nv: the v-th column is A[:, j] - S_list[v][:, j]
            Z_j = np.column_stack([A[:, j] - S_list[v][:, j] for v in range(nv)])
            M = np.dot(Z_j.T, Z_j)  # size nv x nv
            # Add a small regularizer for numerical stability
            reg = 1e-8 * np.eye(nv)
            try:
                M_inv = np.linalg.inv(M + reg)
            except np.linalg.LinAlgError:
                M_inv = np.linalg.pinv(M + reg)
            ones = np.ones((nv, 1))
            w_j = np.dot(M_inv, ones)
            w_j = w_j / np.sum(w_j)  # normalize so the sum equals 1
            # Update the j-th column of weight matrix W
            for v in range(nv):
                W[v, j] = w_j[v, 0]

        # Optionally: check if A has converged by monitoring relative change
        if np.linalg.norm(A - A_old, 'fro') / np.linalg.norm(A_old, 'fro') < tol:
            tqdm.write(f"A converged at iteration {it} with relative change below {tol}.")
            break

    return A



In [96]:
def init_W(single_view_graph):
    W = [np.full(single_view_graph[0].shape, 1/len(single_view_graph))] * len(single_view_graph)
    return W

def init_A(single_view_graph, W):
    A = np.sum(single_view_graph, axis=0) * W[0]
    return A

def init_P(A,c):
    L = compute_laplacian(A)
    P, eigvals = update_Q(L, c)
    return P, eigvals

def update_A(P, w_list, s_list, gamma=1.0):
    n = P.shape[0]
    c = P.shape[1]
    m = len(w_list)

    H = np.sum((P[:, np.newaxis, :] - P)**2, axis=2)
    
    A = np.zeros((n, n))
    
    for j in range(c):
        h_j = H[:, j]
    
        sum_term = np.zeros(n)
        for v in range(m):
            w_jv = w_list[v][:, j]
            s_jv = s_list[v][:, j] 
            sum_term += w_jv * s_jv  
        intermediate = (((gamma / 2.0) * (h_j)) - sum_term)
        
    eta = (1 + np.sum(intermediate)) / n
    a_j = np.maximum(0, -intermediate + eta)
    A[j] = a_j

    return A


def update_P(L, c):
    eigvals, eigenvectors = np.linalg.eigh(L)
    Q = eigenvectors[:, :c]
    return Q, eigvals


def compute_W(a, s_list):
    v, n, _ = np.shape(s_list) 
    w_list = []

    for i in range(v):
        wv = np.zeros((n, n)) 
        for j in range(n):
            Z_j = a[:,j] - s_list[i][:,j]
            Z_j = Z_j.reshape(1, -1) 
            one_vector = np.ones((n, 1)) 
            
            # try: # takes forever
            #     print("not triggered", j)
            #     ZTZ_inv = np.linalg.pinv(Z_j.T @ Z_j)  # (Z_j^T Z_j)^{-1}
            #     w_jv = (ZTZ_inv @ one_vector) * (1 / (one_vector.T @ ZTZ_inv @ one_vector))
            # except:
            #     print("triggered", j)
            #     w_jv = np.zeros((1, n))
                
            sum_Z = np.sum(Z_j)
            if np.isclose(sum_Z, 0.0):
                w_jv = np.zeros((1, n))
            else:
                w_jv = Z_j.T / sum_Z

            wv[:,j] = w_jv.reshape(-1) / np.sum(w_jv)
        w_list.append(wv)

    return w_list

def make_global_graph(single_view_graph, class_number, default_gamma=1.0):
    
    # init
    W = init_W(single_view_graph)
    A = init_A(single_view_graph, W)
    P, eigvals= init_P(A, class_number)
    gamma = default_gamma
    
    for j in tqdm(range(100)):
        A = update_A(P, W, single_view_graph)
        L = compute_laplacian(A)
        P, eigvals = update_P(L, class_number)
        W = compute_W(A, single_view_graph)
        fn1 = np.sum(eigvals[:class_number])
        fn2 = np.sum(eigvals[:class_number+1])
        # print("L_rank",L_rank, "beta", beta)
        if fn1 > 0.00000000001:
            gamma = gamma * 2
            #tqdm.write(f"{i}th graph end at {j}th iteration, smallest eigenvalues: {eigvals[:class_number]}")
        elif fn2 < 0.00000000001:
            gamma = gamma / 2
            #tqdm.write(f"{i}th graph end at {j}th iteration, smallest eigenvalues: {eigvals[:class_number]}")
        else:
            num_zero_eig = np.sum(eigvals < 1e-5)
            tqdm.write(f"Converged: the Laplacian has {num_zero_eig} (>= {class_number}) zero eigenvalues.")
            break
        
    return L


In [97]:
global_graph = make_global_graph(single_view_graph, class_number)

100%|██████████| 100/100 [01:37<00:00,  1.03it/s]


In [98]:
import numpy as np
from sklearn.cluster import KMeans

def cluster(laplacian, n_clusters):
    eigenvalues, eigenvectors = np.linalg.eigh(laplacian)
    X = eigenvectors[:, :n_clusters]
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(X)
    return kmeans.labels_

# get clustering results
single_view_graph_labels = []
for i in range(len(single_view_graph)):
    single_view_graph_labels.append(cluster(single_view_graph[i], class_number))

global_graph_labels = cluster(global_graph, class_number)

In [99]:
import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score
from sklearn.metrics.cluster import contingency_matrix

def cluster_accuracy(y_true, y_pred):
    acc = np.mean(y_pred == y_true)
    return acc

def purity_score(y_true, y_pred):
    contingency = contingency_matrix(y_true, y_pred)
    return np.sum(np.amax(contingency, axis=0)) / np.sum(contingency)

def pairwise_precision_recall_fscore(y_true, y_pred):

    def get_pairs(labels):
        pairs = set()
        for label in np.unique(labels):
            indices = np.where(labels == label)[0]
            for i in range(len(indices)):
                for j in range(i + 1, len(indices)):
                    pairs.add((indices[i], indices[j]))
        return pairs

    true_pairs = get_pairs(y_true)
    pred_pairs = get_pairs(y_pred)
    
    tp = len(true_pairs & pred_pairs)
    fp = len(pred_pairs - true_pairs)
    fn = len(true_pairs - pred_pairs)

    precision = tp / (tp + fp) if tp + fp > 0 else 0
    recall = tp / (tp + fn) if tp + fn > 0 else 0
    f_score = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0
    
    return precision, recall, f_score

def evaluate_clustering(y_true, y_pred):
    
    # remapping 
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    assert y_true.shape == y_pred.shape

    labels = np.unique(y_true)
    pred_labels = np.unique(y_pred)
    cost_matrix = -contingency_matrix(y_true, y_pred)

    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    best_mapping = {pred_labels[col]: labels[row] for row, col in zip(row_ind, col_ind)}

    y_pred_mapped = np.array([best_mapping[label] for label in y_pred])

    # evaluate
    acc = cluster_accuracy(y_true, y_pred_mapped)
    nmi = normalized_mutual_info_score(y_true, y_pred)
    purity = purity_score(y_true, y_pred_mapped)
    precision, recall, f_score = pairwise_precision_recall_fscore(y_true, y_pred_mapped)
    ari = adjusted_rand_score(y_true, y_pred_mapped)

    return {
        "ACC": acc,
        "NMI": nmi,
        "Purity": purity,
        "Precision": precision,
        "Recall": recall,
        "F-score": f_score,
        "ARI": ari
    }


In [100]:
metrics = evaluate_clustering(labels, global_graph_labels)
print(metrics)

{'ACC': 0.050694444444444445, 'NMI': 0.026641223449763655, 'Purity': 0.06319444444444444, 'Precision': 0.049500946566096084, 'Recall': 0.9769561815336463, 'F-score': 0.0942275218625888, 'ARI': 0.000338506565232005}
