# 下面的代码是用来学习残差量化的

In [11]:
import numpy as np
from sklearn.cluster import KMeans

## 读取item embedding

In [7]:
item_embeddings = np.load('/Users/althealam/Desktop/Code/RQ-VAE-Recommendation-System/data/embeddings/item_embeddings.npy')
print(f"加载物品嵌入: 数量={item_embeddings.shape[0]}, 维度={item_embeddings.shape[1]}")
# 每一行是一个样本，每一列是一个特征

加载物品嵌入: 数量=3706, 维度=128


## 设置量化层数和每层的码本大小

In [8]:
num_layers = 2 # 量化层数
codebook_size = 256  # 每层聚类中心数量

## 定义码本和码本索引
* 码本：存储可以近似原始嵌入的原型向量
* 码本索引：每个样本在每一层选择的码本索引

In [None]:
residual = item_embeddings.copy()

# 1. 用于存储可以近似原始嵌入的原型向量
all_codebooks = [] # codebooks里面的连续向量集合（浮点数矩阵），形状为(num_codebooks, codebook_size, embedding_dim_per_codebook)
# num_codebooks：残差量化层数 codebook_size：每个码书里面向量的数量 embedding_dim_per_codebook：每个向量的维度

# 2. 每个样本在每一层选择的码本索引（离散向量）
# 告诉你在第i层码书里，每个样本选择了哪一个向量
# 例如：all_codes[0, 2]=5表示样本0在第三层选中了码书里索引为5的向量
all_codes = [] # 整数索引矩阵，形状为(num_samples, num_codebooks)，其中num_samples为样本数量，num_codebooks为残差量化层数

## 残差量化
1. 步骤
* 假设我们有一个连续向量x（item embedding）
* 用多个码本来近似这个向量：x=c1[x1]+c2[x2]+...+cn[xn]，其中ci表示第i个码本（里面有K个连续向量），zi表示选择的第i个码本索引（离散值），表示用哪一个中心向量
2. KMeans
* 将连续残差向量映射到离散码字
* 降低信息损失：每一层量化只负责一小部分残差，这样总残差的损失最小
* 分层编码：多层码字组合，可以表示更高精度的向量

In [None]:
for layer in range(num_layers):
    # 对当前残差做K—Means聚类
    kmeans = KMeans(n_clusters = codebook_size, random_state=42).fit(residual)
    codebook = kmeans.cluster_centers_
    codes = kmeans.predict(residual) # 当前层码本
    all_codebooks.append(codebook) # 每个embedding的索引
    all_codes.append(codes)

    # 更新残差
    residual-=codebook[codes]

## 重建embedding

In [15]:
reconstructed = np.zeros_like(item_embeddings)
for layer in range(num_layers):
    reconstructed += all_codebooks[layer][all_codes[layer]]

## 打印重建损失

In [16]:
# 打印重建误差
mse = np.mean((item_embeddings - reconstructed) ** 2)
print(f"重建均方误差: {mse:.6f}")

重建均方误差: 0.009981
