In [None]:
import math
import torch
import torch.nn as nn

In [None]:
class VectorQuantizer(nn.Module):
    """Some Information about VectorQuantizer"""
    def __init__(self, num_embedding, embedding_dim, commitment_cost):
        super().__init__()
        self.num_embedding = num_embedding
        self.embedding_dim = embedding_dim
        self.commitment_cost = commitment_cost # * 表示约束的力度
        
        # * 生成embedding space: [num_embedding, embedding_dim], 并赋予权重初值（均匀分布）
        self.embedding_space = nn.Embedding(self.num_embedding, self.embedding_dim)
        self.embedding_space.weight.data.uniform_(-1 / self.num_embedding, 1 / self.num_embedding)
        self._mse_loss = nn.MSELoss()

    def forward(self, inputs):
        
        # * (BCHW) -> (BHWC)
        inputs = inputs.permute(0, 2, 3, 1).contiguous()
        inputs_shape = inputs.shape
        
        # * (BHWC) -> (B*H*W, C) = (B*H*W, embedding_dim)
        flaten_input = inputs.view(-1, self.embedding_dim)
        
        
        # ! 计算quantized
        # * torch.sum(flaten_input ** 2, dim=1, keepdim=True) -> (B*H*W, 1)，在通道（channel）维度上做加法，将每个像素的所有通道值加起来
        # * torch.sum(self.embedding_space.weight ** 2, dim=1) -> (num_embedding,)，同样也在通道（channel）维度上做加法，将每个嵌入向量的所有维度值加起来
        # * distances -> (B*H*W, num_embedding)，相当于得到了一个距离矩阵，每行表示每个输入的像素与所有嵌入向量之间的欧式距离
        distances = (torch.sum(flaten_input ** 2, dim=1, keepdim=True) + torch.sum(self.embedding_space.weight ** 2, dim=1) - 2 * flaten_input @ self.embedding_space.weight.transpose(1, 0))
        
        # * encoder_indices -> (B*H*W, 1)
        # * encoder -> (B*H*W, num_embedding)
        # * 利用scatter_生成一个one-hot的encoder
        encoder_indices = torch.argmin(distances, dim=1, keepdim=True)
        encoder = torch.zeros((encoder_indices.shape[0], self.num_embedding))
        encoder.scatter_(dim=1, index=encoder_indices, src=1)
        
        # * (B*H*W, embedding_dim) -> (B, H, W, C=embedding_dim)
        quantized = encoder @ self.embedding_space.weight
        quantized = quantized.view(inputs_shape)
        
        
        
        # * e_latent_loss在反向传播的过程中，影响inputs
        # * q_latent_loss在反向传播的过程中，影响quantized
        e_latent_loss = self._mse_loss(quantized.detach(), inputs) # * 实际上就commitment loss
        q_latent_loss = self._mse_loss(quantized, inputs.detach()) # * 这个部分的loss会直接作用于inputs的梯度，用于更新codebook的嵌入向量
        loss = q_latent_loss + self.commitment_cost * e_latent_loss
        
        # * 让quantized与inputs发生计算关系，但是保持了quantized数值不变(前向传播)，这样梯度就保留到了quantized
        # * (quantized - inputs).detach()相当于常数，通过常数让编码器(encoder)和解码器(decoder)可导
        # * 让BP过程只更新inputs的梯度，quantized通过STE的方式更新
        # * BP过程中，quantized梯度就等于inputs的梯度， quantized = inputs + C，C是常数
        quantized = inputs + (quantized - inputs).detach() # * 伪梯度传递技巧
        
        # * av_prob -> (num_embedding,), 每个嵌入向量的平均使用概率
        avg_probs = torch.mean(encoder, dim=0)
        # * 困惑度: 度量嵌入空间中使用的不同嵌入向量的多样性，值越大表示更多嵌入向量被使用
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))
        
        
        # * quantized (BHWC) -> (BCHW)
        return loss, quantized.permute(0, 2, 3, 1), avg_probs, perplexity