In [6]:
import plotly.express as px
import numpy as np
from sklearn.decomposition import PCA

In [7]:
def load_npz_vis(path="Duin_vit_embeddings.npz"):
    data = np.load(path, allow_pickle=True)
    return data["chars"], data["embeddings"], dict(data["meta"])

def load_npz_semantic(path):
    data = np.load(path, allow_pickle=True)
    words = data['words']
    emb_cls = data['emb_cls']
    emb_mean = data['emb_mean']
    emb_max = data['emb_max']
    emb_weighted = data['emb_weighted']
    emb_mixed = data['emb_mixed']
    return words, emb_cls, emb_mean, emb_max, emb_weighted, emb_mixed

In [8]:
import numpy as np
from collections import defaultdict
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import pairwise_distances
from sklearn.model_selection import KFold, GroupKFold, cross_val_score
from sklearn.linear_model import Ridge
from sklearn.kernel_ridge import KernelRidge
from sklearn.cross_decomposition import CCA
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
from scipy.stats import spearmanr

# ---------- 工具函数 ----------
def zscore(X):
    return StandardScaler(with_mean=True, with_std=True).fit_transform(X)

def label_means(X, labels):
    buckets = defaultdict(list)
    for x, lb in zip(X, labels):
        buckets[lb].append(x)
    uniq = sorted(buckets.keys())
    M = np.stack([np.mean(buckets[lb], axis=0) for lb in uniq], axis=0)
    return M, np.array(uniq)

def rdm_cosine(M):
    # RDM: 1 - cosine_similarity
    return pairwise_distances(M, metric='cosine')

def rsa_spearman(D1, D2):
    # 上三角向量化后 Spearman
    iu = np.triu_indices_from(D1, k=1)
    return spearmanr(D1[iu], D2[iu]).correlation

def center_gram(K):
    n = K.shape[0]
    H = np.eye(n) - np.ones((n, n))/n
    return H.dot(K).dot(H)

def linear_cka(X, Y):
    # X:(n,d1), Y:(n,d2)
    Xc = X - X.mean(0, keepdims=True)
    Yc = Y - Y.mean(0, keepdims=True)
    Kx = Xc.dot(Xc.T)
    Ky = Yc.dot(Yc.T)
    Kxc = center_gram(Kx)
    Kyc = center_gram(Ky)
    hsic = np.trace(Kxc.dot(Kyc))
    norm_x = np.sqrt(np.trace(Kxc.dot(Kxc)))
    norm_y = np.sqrt(np.trace(Kyc.dot(Kyc)))
    return hsic / (norm_x * norm_y + 1e-12)

def kernel_cka(X, Y, kernel='rbf', gamma=None):
    # 简单 RBF-CKA（可补：自适应 gamma）
    def rbf(X, g):
        sq = pairwise_distances(X, metric='sqeuclidean')
        return np.exp(-g * sq)
    if gamma is None:
        # 中位数启发式
        med = np.median(pairwise_distances(X, metric='euclidean'))
        gamma = 1.0 / (2*(med**2 + 1e-12))
    Kx = rbf(X, gamma)
    Ky = rbf(Y, gamma)
    Kxc = center_gram(Kx)
    Kyc = center_gram(Ky)
    hsic = np.trace(Kxc.dot(Kyc))
    norm_x = np.sqrt(np.trace(Kxc.dot(Kxc)))
    norm_y = np.sqrt(np.trace(Kyc.dot(Kyc)))
    return hsic / (norm_x * norm_y + 1e-12)

def cross_pred_r2(X_src, Y_tgt, labels=None, linear=True, alphas=None, n_splits=5):
    if alphas is None:
        alphas = np.logspace(-4, 4, 13)
    if labels is None:
        cv = KFold(n_splits=n_splits, shuffle=True, random_state=0)
    else:
        # 若同一 label 多样本，用 GroupKFold 按 label 分组避免泄漏
        cv = GroupKFold(n_splits=min(n_splits, len(np.unique(labels))))

    # 调 alpha：简单CV选最优
    best_r2 = -np.inf
    best_alpha = None
    for a in alphas:
        if linear:
            model = Ridge(alpha=a)
        else:
            model = KernelRidge(alpha=a, kernel='rbf')
        scores = cross_val_score(model, X_src, Y_tgt, cv=cv, scoring='r2', groups=labels, n_jobs=None)
        mean_r2 = scores.mean()
        if mean_r2 > best_r2:
            best_r2, best_alpha = mean_r2, a

    # 用最佳alpha重评一次并返回
    if linear:
        model = Ridge(alpha=best_alpha)
    else:
        model = KernelRidge(alpha=best_alpha, kernel='rbf')
    scores = cross_val_score(model, X_src, Y_tgt, cv=cv, scoring='r2', groups=labels)
    return scores.mean(), best_alpha

def fit_linear_map_and_residuals(X_src, Y_tgt):
    reg = Ridge(alpha=1.0)
    reg.fit(X_src, Y_tgt)
    Y_hat = reg.predict(X_src)
    resid = np.linalg.norm(Y_tgt - Y_hat, axis=1)
    return reg, resid, Y_hat

def cca_spectrum(X, Y, n_components=20):
    k = min(n_components, X.shape[1], Y.shape[1])
    cca = CCA(n_components=k, max_iter=5000)
    Xc, Yc = zscore(X), zscore(Y)
    U, V = cca.fit_transform(Xc, Yc)
    # 每个维度的皮尔逊相关
    cs = [np.corrcoef(U[:,i], V[:,i])[0,1] for i in range(U.shape[1])]
    return np.array(cs)

def clustering_agreement(M1, M2, k_list=(3,4,5,6,8,10), random_state=0):
    # 对同一组标签的两个表示分别聚类，再比较ARI/NMI
    best = {}
    for k in k_list:
        km1 = KMeans(n_clusters=k, n_init='auto', random_state=random_state).fit(M1)
        km2 = KMeans(n_clusters=k, n_init='auto', random_state=random_state).fit(M2)
        ari = adjusted_rand_score(km1.labels_, km2.labels_)
        nmi = normalized_mutual_info_score(km1.labels_, km2.labels_)
        best[k] = (ari, nmi)
    return best

# ---------- 一键分析主函数 ----------
def compare_embed_spaces(semantic_embeddings, visual_embeddings, words):
    # 1) 标准化
    S = zscore(semantic_embeddings)
    V = zscore(visual_embeddings)

    # 2) 按 label 汇聚（若每词仅1样本，此步等价于原数据）
    S_lbl, uniq = label_means(S, words)
    V_lbl, _ = label_means(V, words)

    # 3) 结构一致性
    D_s = rdm_cosine(S_lbl)
    D_v = rdm_cosine(V_lbl)
    rsa_r = rsa_spearman(D_s, D_v)
    cka_lin = linear_cka(S_lbl, V_lbl)
    cka_rbf = kernel_cka(S_lbl, V_lbl)

    # 4) 相互预测（标签层面）
    r2_s2v_lin, a1 = cross_pred_r2(S_lbl, V_lbl, labels=uniq, linear=True)
    r2_v2s_lin, a2 = cross_pred_r2(V_lbl, S_lbl, labels=uniq, linear=True)
    r2_s2v_krr, a3 = cross_pred_r2(S_lbl, V_lbl, labels=uniq, linear=False)
    r2_v2s_krr, a4 = cross_pred_r2(V_lbl, S_lbl, labels=uniq, linear=False)

    # 5) CCA 光谱
    cca_corrs = cca_spectrum(S_lbl, V_lbl, n_components=20)

    # 6) 残差与“私有性”（线性映射视角）
    reg_s2v, resid_per_label_s2v, _ = fit_linear_map_and_residuals(S_lbl, V_lbl)
    reg_v2s, resid_per_label_v2s, _ = fit_linear_map_and_residuals(V_lbl, S_lbl)
    top_priv_vis = uniq[np.argsort(-resid_per_label_s2v)[:10]]  # 在visual中更“私有”的词
    top_priv_sem = uniq[np.argsort(-resid_per_label_v2s)[:10]]  # 在semantic中更“私有”的词

    # 7) 聚类一致性
    cluster_scores = clustering_agreement(S_lbl, V_lbl)

    return {
        "RSA_spearman": rsa_r,
        "CKA_linear": cka_lin,
        "CKA_rbf": cka_rbf,
        "R2_sem_to_vis_linear": r2_s2v_lin,
        "R2_vis_to_sem_linear": r2_v2s_lin,
        "R2_sem_to_vis_kernel": r2_s2v_krr,
        "R2_vis_to_sem_kernel": r2_v2s_krr,
        "CCA_correlations": cca_corrs,
        "Top_private_in_visual": list(top_priv_vis),
        "Top_private_in_semantic": list(top_priv_sem),
        "Clustering_ARI_NMI_by_k": cluster_scores,
    }


In [None]:
words, visual_embeddings, _ = load_npz_vis("embeddings/Duin_vit_embeddings_vit_per_char.npz")

In [10]:
words, emb_cls, emb_mean, emb_max, emb_weighted, emb_mixed = load_npz_semantic("embeddings/Duin_bert_embeddings.npz")
semantic_embeddings = emb_mean

In [11]:
results = compare_embed_spaces(semantic_embeddings, visual_embeddings, words)
print(results)

  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos",

{'RSA_spearman': 0.00416708033128866, 'CKA_linear': 0.3500815329814558, 'CKA_rbf': 0.47150376005560896, 'R2_sem_to_vis_linear': -0.13421159721397344, 'R2_vis_to_sem_linear': -0.1840309021837571, 'R2_sem_to_vis_kernel': -0.08707818467328332, 'R2_vis_to_sem_kernel': -0.11875667075734304, 'CCA_correlations': array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1.]), 'Top_private_in_visual': ['把', '找', '吃', '喝', '拿', '你', '看', '是', '有', '我'], 'Top_private_in_semantic': ['换药', '预约', '篮球', '猪肉', '蒜泥', '软糖', '脸盆', '平静', '凳子', '钢琴'], 'Clustering_ARI_NMI_by_k': {3: (0.015799522322967368, 0.06908022021380614), 4: (0.010948526427636098, 0.09052604321797145), 5: (-0.013331597229514325, 0.14279868743763754), 6: (0.031832977887529465, 0.23159614500819886), 8: (-0.014989065127626145, 0.1785034142141446), 10: (-0.001354942936057116, 0.2531318162103996)}}
