In [None]:
import numpy as np
import matplotlib.pyplot as plt

def visualize_user_embeddings(x_dict, user_gender_dict, save_path=None,
                              color_map=None, marker_map=None):
    """Project user embeddings to 2-D with t-SNE and scatter-plot them per label.

    Args:
        x_dict: Mapping whose "user" entry is a tensor of user embeddings.
            It is detached, moved to CPU and converted to a NumPy array.
        user_gender_dict: Mapping from user id to a group label. The keys are
            used directly as row indices into the user embedding matrix.
        save_path: If truthy, the figure is written there (dpi=300) before
            being shown.
        color_map: Optional mapping label -> matplotlib colour. When omitted,
            colours are assigned to the sorted unique labels from a default
            palette.
        marker_map: Optional mapping label -> matplotlib marker. When omitted,
            markers are assigned to the sorted unique labels from a default
            list.

    Raises:
        ValueError: If fewer than two users are given (t-SNE cannot be fitted).
    """
    # Imported locally so the function works even when the notebook cell that
    # imports TSNE at module level has not been executed yet.
    from sklearn.manifold import TSNE

    user_ids = list(user_gender_dict.keys())
    n_users = len(user_ids)
    if n_users < 2:
        raise ValueError(
            "visualize_user_embeddings needs at least two users, "
            f"got {n_users}"
        )

    all_embeddings = x_dict["user"].detach().cpu().numpy()
    gender_labels = np.array([user_gender_dict[uid] for uid in user_ids])
    # Keep only the rows of users that have a label, in the same order as
    # gender_labels.
    user_embeddings = all_embeddings[user_ids]

    # scikit-learn requires perplexity < n_samples; keep the usual value of 30
    # whenever there are enough users and shrink it otherwise.
    perplexity = min(30, n_users - 1)
    tsne = TSNE(n_components=2, perplexity=perplexity, init='pca',
                random_state=42)
    emb_2d = tsne.fit_transform(user_embeddings)

    # Computed once and reused for the default maps and the plotting loop.
    unique_labels = np.unique(gender_labels)

    if color_map is None:
        default_colors = ['#F0AA3C', '#F0AA3C', '#5F4BA0']
        color_map = {
            label: default_colors[i % len(default_colors)]
            for i, label in enumerate(unique_labels)
        }

    if marker_map is None:
        default_markers = ['o', 'o', 'o', 'D']
        marker_map = {
            label: default_markers[i % len(default_markers)]
            for i, label in enumerate(unique_labels)
        }

    plt.figure(figsize=(6, 5))
    for label in unique_labels:
        idx = gender_labels == label
        plt.scatter(
            emb_2d[idx, 0], emb_2d[idx, 1],
            c=color_map[label],
            marker=marker_map[label],
            label=f'Gender {label}',
            alpha=0.8,
            s=8
        )
    # The per-series labels above are only rendered when a legend is drawn.
    plt.legend()

    if save_path:
        plt.savefig(save_path, dpi=300)
    plt.show()

In [None]:
import torch
from sklearn.manifold import TSNE
import pandas as pd
import numpy as np

def generate_rec_items(user_emb, item_emb, topk=30):
    """Return the indices of the highest-scoring items for every user.

    Scores are the dot products between each row of ``user_emb`` and each row
    of ``item_emb``.

    Args:
        user_emb: Tensor of user embeddings, one row per user.
        item_emb: Tensor of item embeddings, one row per item.
        topk: Maximum number of items to return per user. It is capped at the
            number of items so small catalogues do not raise.

    Returns:
        NumPy array of shape [n_users, min(topk, n_items)] holding item row
        indices ordered by descending score.
    """
    scores = torch.matmul(user_emb, item_emb.T)  # [n_users, n_items]
    # torch.topk raises when k exceeds the size of the ranked dimension.
    k = min(topk, scores.shape[1])
    return torch.topk(scores, k=k, dim=1).indices.cpu().numpy()


def exposure_diff_from_recitems(rec_items, user2group, groups=(0, 1), discount=True):
    """
    Measure how differently items are exposed to two user groups.

    rec_items: np.array [n_users, K], the recommended items (internal_id);
        row index u is looked up in user2group as the user id.
    user2group: dict {user_id: group_value}; users missing from it (or mapped
        to None) are skipped, as are users in neither of the two groups.
    groups: (groupA, groupB), e.g., (0, 1).
    discount: if True, rank r (0-based) is weighted by 1 / log2(r + 2)
        (DCG-style); otherwise every rank has weight 1.

    Returns a dict with "TVD", "L1" and "KL(A||B)" computed on the two
    normalized exposure distributions, plus "delta", a pandas Series of
    pA - pB indexed by item.
    """
    _, K = rec_items.shape
    if discount:
        weights = 1.0 / np.log2(np.arange(2, K + 2))
    else:
        weights = np.ones(K)

    # Accumulated (optionally rank-discounted) exposure per item, per group.
    exposure_A, exposure_B = {}, {}
    for u, recommended in enumerate(rec_items):
        g = user2group.get(u, None)
        if g is None:  # no group information for this user
            continue
        if g == groups[0]:
            bucket = exposure_A
        elif g == groups[1]:
            bucket = exposure_B
        else:
            continue
        for item, w in zip(recommended, weights):
            bucket[item] = bucket.get(item, 0) + w

    # Put both groups on a common item axis.
    all_items = set(exposure_A) | set(exposure_B)

    def _as_vector(exposure):
        return np.array([exposure.get(i, 0) for i in all_items])

    def _normalize(vec):
        # An all-zero vector is returned unchanged instead of dividing by 0.
        total = vec.sum()
        return vec / total if total > 0 else vec

    pA = _normalize(_as_vector(exposure_A))
    pB = _normalize(_as_vector(exposure_B))

    # Metrics.
    diff = pA - pB
    l1 = np.abs(diff).sum()
    tvd = 0.5 * l1
    eps = 1e-12  # keeps the log finite where an item has zero exposure
    klAB = np.sum(pA * np.log((pA + eps) / (pB + eps)))

    # Per-item difference vector.
    delta = pd.Series(diff, index=list(all_items))

    return {"TVD": tvd, "L1": l1, "KL(A||B)": klAB, "delta": delta}