# Shape Clustering and Persistence Analysis

이 노트북은 기본 도형(원, 정사각형, 별, 삼각형)의 경계를 1차원 극좌표 곡선으로 변환하고, 퍼시스턴스 이미지를 활용해 군집화를 수행한 뒤 결과를 시각화/해석한다.

In [None]:
import os

import numpy as np
import pandas as pd
import matplotlib.cm as cm
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
from matplotlib import image as mpimg

import gudhi as gd
from gudhi.representations.vector_methods import PersistenceImage
from sklearn.cluster import KMeans

from scipy.fft import fft
from scipy.interpolate import PchipInterpolator
from scipy.stats import ttest_ind

from skimage import measure
from skimage.color import rgb2gray
from skimage.filters import threshold_otsu
from skimage.measure import label as sk_label, regionprops, regionprops_table


In [None]:
plt.style.use("seaborn-v0_8")
np.random.seed(0)

SUBJECTS = ["circle", "square", "star", "triangle"]
SAMPLES_PER_CLASS = 100
FOURIER_SAMPLES = 360
FREQUENCIES = np.array([1, 2, 3, 4, 5])
N_CLUSTERS = 5
PERSISTENCE_DIRECTION = np.array([1.0, 1.0, 1.0, 1.0, 0.2], dtype=float)
PERSISTENCE_ENDPOINT = np.zeros_like(PERSISTENCE_DIRECTION)
PERSISTENCE_RESOLUTION = 50
SHAPES_ROOT = "shapes"
STAR_LABEL = SUBJECTS.index("star")


In [None]:
def convert_to_1d_polar(file_path: str, K: int = FOURIER_SAMPLES) -> np.ndarray:
    """이미지를 각도-반지름 곡선 r(θ)으로 변환한다."""
    img = plt.imread(file_path).astype(float)

    if img.ndim == 3:
        img = rgb2gray(img)
    if img.max() > 1:
        img /= 255.0

    threshold = threshold_otsu(img)
    mask = img < threshold

    lab = sk_label(mask)
    regions = regionprops(lab)
    if not regions:
        raise ValueError(f"No valid region found in {file_path!r}.")
    region = max(regions, key=lambda r: r.area)
    cy, cx = region.centroid

    contours = measure.find_contours(mask.astype(float), 0.5)
    if not contours:
        raise ValueError(f"No contour detected in {file_path!r}.")
    contour = max(contours, key=len)

    dy = contour[:, 0] - cy
    dx = contour[:, 1] - cx
    theta = np.mod(np.arctan2(dy, dx), 2 * np.pi)
    radius = np.hypot(dx, dy)

    order = np.argsort(theta)
    theta = theta[order]
    radius = radius[order]

    if len(theta) > 1:
        keep = [0]
        for idx in range(1, len(theta)):
            if theta[idx] - theta[keep[-1]] > 1e-8:
                keep.append(idx)
        theta = theta[keep]
        radius = radius[keep]

    theta_ext = np.concatenate([theta - 2 * np.pi, theta, theta + 2 * np.pi])
    radius_ext = np.concatenate([radius, radius, radius])
    interpolator = PchipInterpolator(theta_ext, radius_ext, extrapolate=False)

    theta_new = np.linspace(0, 2 * np.pi, K, endpoint=False)
    theta_query = ((theta_new - theta.min()) % (2 * np.pi)) + theta.min()
    r_theta = interpolator(theta_query)
    return np.clip(r_theta, 0.0, None)


In [None]:
def load_polar_dataset(subjects, root_dir, samples_per_class, K=FOURIER_SAMPLES):
    total = len(subjects) * samples_per_class
    data = np.zeros((total, K), dtype=float)
    labels = np.zeros(total, dtype=int)
    sample_ids = np.zeros(total, dtype=int)

    for subject_idx, subject in enumerate(subjects):
        for sample_idx in range(samples_per_class):
            row = subject_idx * samples_per_class + sample_idx
            file_path = os.path.join(root_dir, subject, f"{sample_idx}.png")
            data[row] = convert_to_1d_polar(file_path, K=K)
            labels[row] = subject_idx
            sample_ids[row] = sample_idx
    return data, labels, sample_ids


data, labels, sample_ids = load_polar_dataset(SUBJECTS, SHAPES_ROOT, SAMPLES_PER_CLASS)
print(f"signals: {data.shape}, labels: {labels.shape}")


In [None]:
def compute_persistence_diagram(signal, direction, endpoint):
    max_freq = len(direction)
    norm = np.sqrt(max_freq)
    birth = (-endpoint) / (norm * direction)
    fft_coeff = np.abs(fft(signal))[1:max_freq + 1] / signal.shape[0]
    death = (2 * np.sqrt(3) * fft_coeff - endpoint) / (norm * direction)
    return np.vstack([birth, death]).T


def build_persistence_features(signals, direction, endpoint, resolution):
    diagrams = [compute_persistence_diagram(signal, direction, endpoint) for signal in signals]
    max_value = max(diag[:, 1].max() for diag in diagrams)

    imager = PersistenceImage(
        bandwidth=0.05,
        weight=lambda x: x[1],
        resolution=[resolution, resolution],
        im_range=[0, max_value, 0, max_value],
    )

    features = np.array([imager(diagram) for diagram in diagrams])
    min_vals = features.min(axis=1, keepdims=True)
    max_vals = features.max(axis=1, keepdims=True)
    denom = np.clip(max_vals - min_vals, 1e-12, None)
    normalized = (features - min_vals) / denom
    return diagrams, normalized


diagrams, persistence_features = build_persistence_features(
    data,
    PERSISTENCE_DIRECTION,
    PERSISTENCE_ENDPOINT,
    PERSISTENCE_RESOLUTION,
)
print(persistence_features.shape)


In [None]:
kmeans = KMeans(
    n_clusters=N_CLUSTERS,
    init="random",
    n_init=100,
    max_iter=300,
    tol=1e-4,
    random_state=0,
)
cluster_labels = kmeans.fit_predict(persistence_features)
cluster_labels[:10]


In [None]:
def plot_cluster_confusion(y_true, y_pred, subject_names):
    n_classes = len(subject_names)
    n_clusters = len(np.unique(y_pred))
    matrix = np.zeros((n_classes, n_clusters), dtype=int)

    for class_idx in range(n_classes):
        for cluster_idx in range(n_clusters):
            matrix[class_idx, cluster_idx] = np.sum((y_true == class_idx) & (y_pred == cluster_idx))

    fig, ax = plt.subplots(figsize=(6, 4))
    im = ax.imshow(matrix, cmap="summer")
    ax.set_xticks(range(n_clusters))
    ax.set_xlabel("Cluster")
    ax.set_yticks(range(n_classes))
    ax.set_yticklabels(subject_names)
    ax.set_ylabel("True label")
    ax.set_title("Cluster vs. true label")

    for i in range(n_classes):
        for j in range(n_clusters):
            ax.text(j, i, matrix[i, j], ha="center", va="center", color="black")

    fig.colorbar(im, ax=ax, shrink=0.8, pad=0.02)
    plt.tight_layout()
    return matrix


confusion = plot_cluster_confusion(labels, cluster_labels, SUBJECTS)
confusion


In [None]:
star_indices = np.where(labels == STAR_LABEL)[0]
star_cluster_counts = np.bincount(cluster_labels[star_indices], minlength=N_CLUSTERS)
top_clusters = np.argsort(star_cluster_counts)[-2:][::-1]

star_group_a = np.where((labels == STAR_LABEL) & (cluster_labels == top_clusters[0]))[0]
star_group_b = np.where((labels == STAR_LABEL) & (cluster_labels == top_clusters[1]))[0]

print("Top clusters for 'star':", top_clusters)
print("Group A size:", len(star_group_a))
print("Group B size:", len(star_group_b))


In [None]:
def plot_signal_group(ax, dataset, indices, title, color):
    if len(indices) == 0:
        ax.set_title(f"{title} (empty)")
        ax.axis("off")
        return
    subset = dataset[indices[: min(len(indices), 9)]]
    ax.plot(subset.T, color=color, alpha=0.3)
    ax.set_title(f"{title} (n={len(indices)})")
    ax.set_xlabel("Sample index (θ)")
    ax.set_ylabel("Radius")


fig, axes = plt.subplots(1, 2, figsize=(10, 4), sharey=True)
plot_signal_group(axes[0], data, star_group_a, "Star cluster A", "tomato")
plot_signal_group(axes[1], data, star_group_b, "Star cluster B", "royalblue")
plt.tight_layout()
plt.show()


In [None]:
def harmonic_amplitude(dataset, indices, harmonic):
    if len(indices) == 0:
        return np.array([])
    coeff = np.abs(fft(dataset[indices], axis=1))[:, harmonic]
    return coeff / dataset.shape[1]


harmonic_index = 5
amp_a = harmonic_amplitude(data, star_group_a, harmonic_index)
amp_b = harmonic_amplitude(data, star_group_b, harmonic_index)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].scatter(
    np.ones_like(amp_a) + 0.05 * np.random.randn(len(amp_a)),
    amp_a,
    color="tomato",
    alpha=0.6,
    label="Cluster A",
)
axes[0].scatter(
    2 * np.ones_like(amp_b) + 0.05 * np.random.randn(len(amp_b)),
    amp_b,
    color="royalblue",
    alpha=0.6,
    label="Cluster B",
)
axes[0].set_xticks([1, 2])
axes[0].set_xticklabels(["Cluster A", "Cluster B"])
axes[0].set_ylabel(f"Harmonic {harmonic_index} amplitude")
axes[0].set_title("Per-sample amplitudes")
axes[0].legend()

axes[1].hist(amp_a, bins=15, alpha=0.6, color="tomato", density=True, label="Cluster A")
axes[1].hist(amp_b, bins=15, alpha=0.6, color="royalblue", density=True, label="Cluster B")
axes[1].set_xlabel(f"Harmonic {harmonic_index} amplitude")
axes[1].set_ylabel("Density")
axes[1].set_title("Amplitude distribution")
axes[1].legend()

plt.tight_layout()
plt.show()


In [None]:
def recolor_star_for_display(img_rgb, *, cmap_name="Set3", color_idx=0, alpha=0.9, edge=False, ax=None):
    img = img_rgb.astype(np.float32).copy()
    if img.max() > 1.0:
        img /= 255.0

    gray = rgb2gray(img)
    thr = threshold_otsu(gray)
    mask = gray < thr

    cmap = plt.colormaps[cmap_name]
    color = np.array(cmap(color_idx % cmap.N)[:3], dtype=np.float32).reshape(1, 1, 3)
    img[mask] = (1 - alpha) * img[mask] + alpha * color

    if edge and ax is not None:
        contours = measure.find_contours(mask.astype(float), 0.5)
        for cnt in contours:
            ax.plot(cnt[:, 1], cnt[:, 0], lw=1.2, color="k", alpha=0.8)

    return img


def to_rgb(arr):
    arr = np.asarray(arr)
    if arr.ndim == 2:
        arr = np.stack([arr] * 3, axis=-1)
    if arr.ndim == 3 and arr.shape[2] == 4:
        arr = arr[..., :3]
    return arr


def draw_leaf(path):
    img = mpimg.imread(path)
    if img.dtype not in (np.float32, np.float64):
        img = img.astype(np.float32) / 255.0
    h, w = img.shape[:2]
    return img, (w / 2.0, h / 2.0)


def cohens_d(x, y):
    x = np.asarray(x)
    y = np.asarray(y)
    if len(x) < 2 or len(y) < 2:
        return np.nan
    sx = np.var(x, ddof=1)
    sy = np.var(y, ddof=1)
    sp = np.sqrt(((len(x) - 1) * sx + (len(y) - 1) * sy) / (len(x) + len(y) - 2))
    return (np.mean(y) - np.mean(x)) / sp if sp > 0 else np.nan


def measure_with_regionprops(img_rgb, *, spacing=None):
    gray = rgb2gray(img_rgb)
    thr = threshold_otsu(gray)
    mask = gray < thr
    lab = sk_label(mask)
    if lab.max() == 0:
        return None

    props = [
        "label",
        "area",
        "area_bbox",
        "area_convex",
        "area_filled",
        "axis_major_length",
        "axis_minor_length",
        "eccentricity",
        "equivalent_diameter_area",
        "euler_number",
        "extent",
        "feret_diameter_max",
        "orientation",
        "perimeter_crofton",
        "perimeter",
        "solidity",
        "centroid",
        "centroid_local",
    ]

    table = regionprops_table(lab, intensity_image=gray, spacing=spacing, properties=props)
    df_props = pd.DataFrame(table)
    if df_props.empty:
        return None

    row = df_props.loc[df_props["area"].idxmax()].to_dict()

    result = {k: (float(v) if np.isscalar(v) else v) for k, v in row.items()}
    result["eq_radius"] = float(np.sqrt(result["area"] / np.pi))
    orientation = result.get("orientation", 0.0)
    result["angle_deg"] = float(np.degrees(orientation) % 180.0)

    h, w = img_rgb.shape[:2]
    tcx, tcy = w / 2.0, h / 2.0
    cx = result.get("centroid-1", np.nan)
    cy = result.get("centroid-0", np.nan)
    off_x = cx - tcx
    off_y = cy - tcy
    result["off_x"] = float(off_x)
    result["off_y"] = float(off_y)
    result["r_off"] = float(np.hypot(off_x, off_y))
    return result


def summarize_and_test(df, metric, *, group_col="group", groups=("cluster_a", "cluster_b")):
    left = df.loc[df[group_col] == groups[0], metric].dropna().values
    right = df.loc[df[group_col] == groups[1], metric].dropna().values
    if len(left) < 2 or len(right) < 2:
        return pd.Series({
            "left_mean": np.nan,
            "left_std": np.nan,
            "right_mean": np.nan,
            "right_std": np.nan,
            "diff_right_minus_left": np.nan,
            "p_value": np.nan,
            "cohens_d": np.nan,
        })

    t_stat, p_val = ttest_ind(right, left, equal_var=False)
    series = pd.Series({
        "left_mean": left.mean(),
        "left_std": left.std(ddof=1),
        "right_mean": right.mean(),
        "right_std": right.std(ddof=1),
        "diff_right_minus_left": right.mean() - left.mean(),
        "p_value": p_val,
        "cohens_d": cohens_d(left, right),
    })
    return series


In [None]:
STAR_DIR = os.path.join(SHAPES_ROOT, "star")
star_samples_a = np.sort(sample_ids[star_group_a])[:9]
star_samples_b = np.sort(sample_ids[star_group_b])[:9]

fig = plt.figure(figsize=(10, 10))
outer = gridspec.GridSpec(2, 1, height_ratios=[1, 1])

records = []

inner_a = gridspec.GridSpecFromSubplotSpec(3, 3, subplot_spec=outer[0])
for ax_idx, sample in enumerate(star_samples_a):
    ax = fig.add_subplot(inner_a[ax_idx])
    path = os.path.join(STAR_DIR, f"{int(sample)}.png")
    img, _ = draw_leaf(path)
    img = to_rgb(img)
    disp = recolor_star_for_display(img, cmap_name="Set3", color_idx=0, alpha=0.95, edge=True, ax=ax)
    ax.imshow(disp)
    ax.set_title(f"A-{int(sample)}")
    ax.axis("off")

    metrics = measure_with_regionprops(img)
    if metrics:
        metrics.update(group="cluster_a", sample=int(sample))
        records.append(metrics)

inner_b = gridspec.GridSpecFromSubplotSpec(3, 3, subplot_spec=outer[1])
for ax_idx, sample in enumerate(star_samples_b):
    ax = fig.add_subplot(inner_b[ax_idx])
    path = os.path.join(STAR_DIR, f"{int(sample)}.png")
    img, _ = draw_leaf(path)
    img = to_rgb(img)
    disp = recolor_star_for_display(img, cmap_name="Set3", color_idx=4, alpha=0.95, edge=True, ax=ax)
    ax.imshow(disp)
    ax.set_title(f"B-{int(sample)}")
    ax.axis("off")

    metrics = measure_with_regionprops(img)
    if metrics:
        metrics.update(group="cluster_b", sample=int(sample))
        records.append(metrics)

plt.tight_layout()
plt.show()

metrics_df = pd.DataFrame(records)
metrics_df.head()


In [None]:
metrics_to_report = [
    "eq_radius",
    "axis_major_length",
    "axis_minor_length",
    "eccentricity",
    "solidity",
    "extent",
    "perimeter",
    "perimeter_crofton",
    "feret_diameter_max",
    "equivalent_diameter_area",
    "euler_number",
    "r_off",
    "angle_deg",
]

summary = pd.concat(
    [summarize_and_test(metrics_df, metric) for metric in metrics_to_report],
    axis=1,
).T
summary = summary.sort_values("p_value")
summary.round(3)
