# Report on Reverse Engineering the Mechanism of Wechat Red Envelop

## 5. Generative Modeling
For this section, we modify the diffusion model so as to learn the latent distribution behind the data and hence generate new data. The code supports running with GPU and checkpoint saving. The code is partially borrowed from the open tutorial resource published on github, and partial debugging is under the assistance of ChatGPT. We run the code on GPU of NVIDIA GeForce RTX 4070 Laptop, with the CPU of 13th Gen Intel(R) Core(TM) i9-13980HX.

### 5.2 Code Implementaion

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

data = np.load('raw_data_1.npy')
dataset = torch.Tensor(data).float()
# 超参数设置
num_steps = 100  # 扩散过程的时间步数
batch_size = 256  # 每次训练的批量大小

# 创建 beta 调度，模拟噪声的逐步变化
betas = torch.linspace(-6, 6, num_steps)
betas = torch.sigmoid(betas) * (0.5e-2 - 1e-5) + 1e-5

# 计算 alpha 和相关变量
alphas = 1 - betas
alphas_prod = torch.cumprod(alphas, dim=0)
alphas_prod_p = torch.cat([torch.tensor([1]).float(), alphas_prod[:-1]], 0)
alphas_bar_sqrt = torch.sqrt(alphas_prod)
one_minus_alphas_bar_sqrt = torch.sqrt(1 - alphas_prod)

# 扩散过程中的加噪声函数
def q_x(x_0, t):
    noise = torch.randn_like(x_0)  # 从标准正态分布中生成噪声
    alphas_t = alphas_bar_sqrt[t]
    alphas_l_m_t = one_minus_alphas_bar_sqrt[t]
    return alphas_t * x_0 + alphas_l_m_t * noise

def save_checkpoint(model, optimizer, epoch, loss, filename):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }
    torch.save(checkpoint, filename)
    print(f"Checkpoint saved to {filename}")

def load_checkpoint(model, optimizer, filename):
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    print(f"Checkpoint loaded from {filename}")
    return epoch, loss


# 创建一个多层感知机模型，用于拟合噪声预测
class MLPDiffusion(nn.Module):
    def __init__(self, n_steps, num_units=128):
        super(MLPDiffusion, self).__init__()
        self.linears = nn.ModuleList([
            nn.Linear(3, num_units),
            nn.ReLU(),
            nn.Linear(num_units, num_units),
            nn.ReLU(),
            nn.Linear(num_units, num_units),
            nn.ReLU(),
            nn.Linear(num_units, 3),  # 输出3个维度 (x1, x2, x3)
        ])
        self.step_embeddings = nn.ModuleList([
            nn.Embedding(n_steps, num_units),
            nn.Embedding(n_steps, num_units),
            nn.Embedding(n_steps, num_units)
        ])

    def forward(self, x, t):
        for idx, embedding_layer in enumerate(self.step_embeddings):
            t_embedding = embedding_layer(t)
            x = self.linears[2 * idx](x)
            x += t_embedding
            x = self.linears[2 * idx + 1](x)
        return self.linears[-1](x)

# 损失函数：计算模型预测的噪声与真实噪声之间的均方误差
def diffusion_loss_fn(model, x_0, alphas_bar_sqrt, one_minus_alphas_bar_sqrt, n_steps):
    batch_size = x_0.shape[0]

    # 随机采样一个时刻 t
    t = torch.randint(0, n_steps, size=(batch_size,))
    t = t.unsqueeze(-1)

    # 计算扩散过程中的 x(t)
    a = alphas_bar_sqrt[t]
    e = torch.randn_like(x_0)
    aml = one_minus_alphas_bar_sqrt[t]

    # 加噪声的输入
    x = x_0 * a + e * aml

    # 送入模型得到预测的噪声
    t = t.to(x.device)
    output = model(x, t.squeeze(-1))

    # 计算损失：与真实噪声 e 的均方误差
    return (e - output).square().mean()

# 训练模型的函数
def train(model, dataset, optimizer, alphas_bar_sqrt, one_minus_alphas_bar_sqrt, num_epochs=4000):
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    for epoch in range(num_epochs):
        for batch_x in dataloader:
            # 清空梯度
            optimizer.zero_grad()

            # 计算损失
            loss = diffusion_loss_fn(model, batch_x, alphas_bar_sqrt, one_minus_alphas_bar_sqrt, num_steps)

            # 反向传播
            loss.backward()
            optimizer.step()

        if epoch % 100 == 0:
            print(f"Epoch {epoch}/{num_epochs}, Loss: {loss.item()}")
    return loss

# 采样函数：通过逆扩散生成数据
def p_sample_loop(model, shape, n_steps, betas, one_minus_alphas_bar_sqrt):
    cur_x = torch.randn(shape)
    x_seq = [cur_x]
    for i in reversed(range(n_steps)):
        cur_x = p_sample(model, cur_x, i, betas, one_minus_alphas_bar_sqrt)
        x_seq.append(cur_x)
    return x_seq

def p_sample(model, x, t, betas, one_minus_alphas_bar_sqrt):
    t = torch.tensor(t).cuda()
    x = torch.tensor(x).cuda()
    coeff = betas[t] / one_minus_alphas_bar_sqrt[t]
    eps_theta = model(x, t)
    mean = (1 / torch.sqrt(1 - betas[t])) * (x - (coeff * eps_theta))
    z = torch.randn_like(x)
    sigma_t = torch.sqrt(betas[t])
    sample = mean + sigma_t * z
    return sample

# 初始化模型、优化器和训练
model = MLPDiffusion(num_steps, num_units=128).cuda()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
num_steps = 100
filename = 'checkpoint0.ckpt'
# 训练模型
loss = train(model, dataset.cuda(), optimizer, alphas_bar_sqrt.cuda(), one_minus_alphas_bar_sqrt.cuda(), num_steps)

save_checkpoint(model, optimizer, num_steps, loss, filename)

# 生成样本
load_checkpoint(model, optimizer, 'checkpoint0.ckpt')
generated_samples = p_sample_loop(model, (1000, 3), num_steps, betas, one_minus_alphas_bar_sqrt)

# 可视化生成的样本
generated_samples = generated_samples[-1].cpu().detach().numpy()
np.save('toy.npy', generated_samples)