# 玻尔兹曼机

玻尔兹曼机是一种基于能量的模型, 它的联合概率分布函数为
$$P(x) = \frac{\exp(-E(x))}{Z}$$

其中 $E(x)$ 是能量函数，$Z$ 是确保 $\sum_x P(x) = 1$ 的配分函数。


### 受限玻尔兹曼机（RBM）
受限玻尔兹曼机其联合概率分布:
$$
P(v = \mathbf{v}, h = \mathbf{h}) = \frac{1}{Z} \exp(-E(\mathbf{v}, \mathbf{h})).
$$

受限玻尔兹曼机的能量函数：

$$
E(\mathbf{v}, \mathbf{h}) = -\mathbf{b}^\top \mathbf{v} - \mathbf{c}^\top \mathbf{h} - \mathbf{v}^\top W \mathbf{h},
$$

其中 $Z$ 是被称为配分函数的归一化常数：

$$
Z = \sum_{\mathbf{v}} \sum_{\mathbf{h}} \exp\{-E(\mathbf{v}, \mathbf{h})\}
$$

下图为RBM的简单示意图：受限玻尔兹曼机是基于二分图的无向图模型

<img src="RBM示意图.png"  width="380" height="230">





In [19]:
# RBM代码实现
import torch
import torch.nn as nn
import torch.nn.functional as F

# 定义RBM类

class RBM(nn.Module):
    def __init__(self, n_vis, n_hin):
        super(RBM, self).__init__()
        # n_vis: 可见层单元数
        # n_hin: 隐藏层单元数
        self.n_vis = n_vis
        self.n_hin = n_hin

        # 权重矩阵，初始化为正态分布
        self.W = nn.Parameter(torch.randn(n_hin, n_vis) * 1e-2)
        self.v_bias = nn.Parameter(torch.zeros(n_vis))
        self.h_bias = nn.Parameter(torch.zeros(n_hin))

    def sample_h(self, v):
        # 计算隐藏层的概率
        wx = torch.matmul(v, self.W.t())
        activation = wx + self.h_bias
        p_h_given_v = torch.sigmoid(activation)
        return p_h_given_v, torch.bernoulli(p_h_given_v)

    def sample_v(self, h):
        # 计算可见层的概率
        wx = torch.matmul(h, self.W)
        activation = wx + self.v_bias
        p_v_given_h = torch.sigmoid(activation)
        return p_v_given_h, torch.bernoulli(p_v_given_h)

    def forward(self, v):
        # 单次 Gibbs 采样
        p_h, h = self.sample_h(v)
        p_v, v = self.sample_v(h)
        return v

    def contrastive_divergence(self, input_data, lr=0.01, k=1):
        # 正向传播
        p_h_positive, h_positive = self.sample_h(input_data)

        # 负向传播
        v_k = input_data
        for _ in range(k):
            p_h, h = self.sample_h(v_k)
            p_v, v_k = self.sample_v(h)

        p_h_negative, _ = self.sample_h(v_k)

        # 计算梯度
        positive_grad = torch.matmul(p_h_positive.t(), input_data)
        negative_grad = torch.matmul(p_h_negative.t(), v_k)

        # 更新权重和偏置
        self.W.data += lr * (positive_grad - negative_grad) / input_data.size(0)
        self.v_bias.data += lr * torch.mean(input_data - v_k, dim=0)
        self.h_bias.data += lr * torch.mean(p_h_positive - p_h_negative, dim=0)

        # 计算重构误差
        loss = torch.mean((input_data - v_k) ** 2)
        return loss


### 深度信念网络（DBN）

深度信念网络是具有若干潜变量层的生成模型。潜变量通常是二值的，而可见单元可以是二值或实数， DBN概率分布为下列公式

$$
P(h^{(l)}, h^{(l-1)}) \propto \exp \left( b^{(l)^\top} h^{(l)} + b^{(l-1)^\top} h^{(l-1)} + h^{(l-1)^\top} W^{(l)} h^{(l)} \right),
$$

$$
P(h^{(k)}_i = 1 \mid h^{(k+1)}) = \sigma \left( b^{(k)}_i + W^{(k+1)^\top}_{:, i} h^{(k+1)} \right) \quad \forall i, \forall k \in 1, \dots, l-2,
$$

$$
P(v_i = 1 \mid h^{(1)}) = \sigma \left( b^{(0)}_i + W^{(1)^\top}_{:, i} h^{(1)} \right) \quad \forall i.
$$

已下为DBN示意图：深度信念网络是涉及有向和无向连接的混合图模型

<img src="DBN示意图.png"  width="380" height="300">








In [27]:
# DBN代码
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

# 定义DBN类
class DBN(nn.Module):
    def __init__(self, layers):
        super(DBN, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(len(layers)-1):
            self.layers.append(RBM(layers[i], layers[i+1]))

    def pretrain(self, train_loader, lr=0.01, k=1, epochs=10):
        input_data = None
        for i, rbm in enumerate(self.layers):
            print(f"Pretraining RBM layer {i+1}/{len(self.layers)}")
            for epoch in range(epochs):
                epoch_loss = 0
                for batch, _ in train_loader:
                    batch = batch.view(batch.size(0), -1)
                    if input_data is not None:
                        with torch.no_grad():
                            batch = input_data(batch)
                    # 确保概率值在 [0,1] 之间
                    batch = torch.clamp(batch, 0, 1)
                    batch = batch.bernoulli()  # 二值化
                    loss = rbm.contrastive_divergence(batch, lr=lr, k=k)
                    epoch_loss += loss.item()
                print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(train_loader)}")
            # 设置下一层的输入为当前 RBM 的隐藏层输出
            input_data = lambda x, rbm=rbm: torch.sigmoid(torch.matmul(x, rbm.W.t()) + rbm.h_bias)

    def forward(self, x):
        for rbm in self.layers:
            p_h, h = rbm.sample_h(x)
            x = h
        return x

def train_dbn():
    batch_size = 64
    lr = 0.01
    k = 1
    epochs = 5
    layers = [784, 500, 200]  # 输入层和两层隐藏层

    # 数据预处理
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # 初始化 DBN
    dbn = DBN(layers)

    # 预训练
    dbn.pretrain(train_loader, lr=lr, k=k, epochs=epochs)
    print("Pretraining completed.")

if __name__ == "__main__":
    train_dbn()





Pretraining RBM layer 1/2
Epoch 1, Loss: 0.147926887914316
Epoch 2, Loss: 0.10845467882719376
Epoch 3, Loss: 0.09534132804697765
Epoch 4, Loss: 0.08743825155312318
Epoch 5, Loss: 0.0820729368801183
Pretraining RBM layer 2/2
Epoch 1, Loss: 0.3485538379342825
Epoch 2, Loss: 0.28871215371561965
Epoch 3, Loss: 0.27034404956455677
Epoch 4, Loss: 0.259960687522695
Epoch 5, Loss: 0.25401915649551826
Pretraining completed.


### 深度玻尔兹曼机（DBM）

深度玻尔兹曼机包含一个可见层 $v$ 和三个隐藏层 $h^{(1)}, h^{(2)}$ 和 $h^{(3)}$ 的情况下，联合概率为下面公式：

$$
P(v, h^{(1)}, h^{(2)}, h^{(3)}) = \frac{1}{Z(\theta)} \exp \left( -E(v, h^{(1)}, h^{(2)}, h^{(3)}; \theta) \right).
$$

深度玻尔兹曼机的能量函数为：
$$
E(v, h^{(1)}, h^{(2)}, h^{(3)}; \theta) = -v^\top W^{(1)} h^{(1)} - h^{(1)\top} W^{(2)} h^{(2)} - h^{(2)\top} W^{(3)} h^{(3)}.
$$

下图为深度玻尔兹曼机示意图：深度玻尔兹曼机是具有几层潜变量的无向图模型

<img src="DBM示意图.png"  width="380" height="300">

In [26]:

# DBM代码
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np


# 定义DBM类
class DBM(nn.Module):
    def __init__(self, layers):
        super(DBM, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(len(layers)-1):
            self.layers.append(RBM(layers[i], layers[i+1]))
        self.n_layers = len(self.layers)

    def pretrain(self, train_loader, lr=0.01, k=1, epochs=5):
        # 逐层预训练每个RBM
        input_data = None
        for i, rbm in enumerate(self.layers):
            print(f"Pretraining RBM layer {i+1}/{self.n_layers}")
            for epoch in range(epochs):
                epoch_loss = 0
                for batch, _ in train_loader:
                    batch = batch.view(batch.size(0), -1)
                    if input_data is not None:
                        with torch.no_grad():
                            batch = input_data(batch)
                    batch = torch.clamp(batch, 0, 1)
                    batch = batch.bernoulli()  # 二值化
                    loss = rbm.contrastive_divergence(batch, lr=lr, k=k)
                    epoch_loss += loss.item()
                print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(train_loader)}")
            # 设置下一层的输入为当前 RBM 的隐藏层输出
            def new_input(x, rbm=rbm):
                p_h, h = rbm.sample_h(x)
                return p_h
            input_data = new_input

    def forward(self, x):
        for rbm in self.layers:
            p_h, h = rbm.sample_h(x)
            x = h
        return x

    def gibbs_sampling(self, v0, steps=1):
        # 在DBM中执行多步Gibbs采样
        vk = v0
        for step in range(steps):
            for rbm in self.layers:
                p_h, h = rbm.sample_h(vk)
                p_v, vk = rbm.sample_v(h)
        return vk

def train_dbm():
    batch_size = 64
    lr = 0.01
    k = 1
    epochs = 5
    layers = [784, 500, 200]

    # 数据预处理
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # 初始化 DBM
    dbm = DBM(layers)

    # 预训练
    dbm.pretrain(train_loader, lr=lr, k=k, epochs=epochs)
    print("Pretraining completed.")

if __name__ == "__main__":
    train_dbm()


Pretraining RBM layer 1/2
Epoch 1, Loss: 0.14769503898394387
Epoch 2, Loss: 0.1083591301510456
Epoch 3, Loss: 0.0952612899783959
Epoch 4, Loss: 0.087439653795284
Epoch 5, Loss: 0.08202889150203165
Pretraining RBM layer 2/2
Epoch 1, Loss: 0.348730943763434
Epoch 2, Loss: 0.289149253511988
Epoch 3, Loss: 0.27082059551467264
Epoch 4, Loss: 0.2607529981812434
Epoch 5, Loss: 0.25463259614098555
Pretraining completed.
Pretraining completed.


## 实值数据上的玻尔兹曼机
### Gaussian-Bernoulli RBM

Gaussian-Bernoulli RBM 上定义能量函数：
$$
E(v, h) = \frac{1}{2} v^\top (\beta \odot v) - (v \odot \beta)^\top W h - b^\top h,
$$





In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class GRBM(nn.Module):
    def __init__(self, n_vis, n_hin, sigma=1.0):
        super(GRBM, self).__init__()
        # n_vis: 可见层单元数
        # n_hin: 隐藏层单元数
        # sigma: 高斯分布的标准差
        self.n_vis = n_vis
        self.n_hin = n_hin
        self.sigma = sigma  # 标准差

        # 权重矩阵，初始化为正态分布
        self.W = nn.Parameter(torch.randn(n_hin, n_vis) * 0.01)
        self.v_bias = nn.Parameter(torch.zeros(n_vis))  
        self.h_bias = nn.Parameter(torch.zeros(n_hin))  

    def sample_h(self, v):
        activation = torch.matmul(v, self.W.t()) + self.h_bias
        p_h_given_v = torch.sigmoid(activation)
        h_sample = torch.bernoulli(p_h_given_v)
        return p_h_given_v, h_sample

    def sample_v(self, h):
        # 对于GRBM，可见层激活为高斯分布
        activation = torch.matmul(h, self.W) + self.v_bias
        p_v_given_h = activation  # 均值为线性激活值
        v_sample = p_v_given_h + torch.randn_like(p_v_given_h) * self.sigma  # 添加高斯噪声
        return p_v_given_h, v_sample

    def forward(self, v):
        # 单次 Gibbs 采样
        p_h, h = self.sample_h(v)
        p_v, v = self.sample_v(h)
        return v

    def contrastive_divergence(self, input_data, lr=0.01, k=1):
        # 正向传播
        p_h_positive, h_positive = self.sample_h(input_data)

        # 负向传播
        v_k = input_data
        for _ in range(k):
            p_h, h = self.sample_h(v_k)
            p_v, v_k = self.sample_v(h)

        # 隐藏层激活概率
        p_h_negative, _ = self.sample_h(v_k)

        # 计算梯度
        positive_grad = torch.matmul(p_h_positive.t(), input_data)
        negative_grad = torch.matmul(p_h_negative.t(), v_k)

        # 更新权重和偏置
        self.W.data += lr * (positive_grad - negative_grad) / input_data.size(0)
        self.v_bias.data += lr * torch.mean(input_data - v_k, dim=0)
        self.h_bias.data += lr * torch.mean(p_h_positive - p_h_negative, dim=0)

        # 计算重构误差
        loss = torch.mean((input_data - v_k) ** 2)
        return loss


### ssRBM

ssRBM的能量函数：

$$
E_{SS}(x, s, h) = -\sum_i x^\top W_{:,i} s_i h_i + \frac{1}{2} x^\top \left( \Lambda + \sum_i \Phi_i h_i \right) x
+ \frac{1}{2} \sum_i \alpha_i s_i^2 - \sum_i \alpha_i \mu_i s_i h_i - \sum_i b_i h_i + \sum_i \alpha_i \mu_i^2 h_i,
$$

其中 $b_i$ 是尖峰 $h_i$ 的偏置，$\Lambda$ 是观测值 $x$ 上的对角精度矩阵。参数 $\alpha_i > 0$ 是实值平板变量 $s_i$ 的标准精度参数。参数 $\Phi_i$ 是定义 $x$ 上的 $h$ 调制二次惩罚的非负对角矩阵。每个 $\mu_i$ 是平板变量 $s_i$ 的均值参数。


In [29]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class SpikeAndSlabRBM(nn.Module):
    def __init__(self, n_vis, n_hin, sigma=1.0):
        super(SpikeAndSlabRBM, self).__init__()
        self.n_vis = n_vis
        self.n_hin = n_hin
        self.sigma = sigma  

        # 权重矩阵，初始化为正态分布
        self.W = nn.Parameter(torch.randn(n_hin, n_vis) * 0.01)
        # 可学习的偏置
        self.v_bias = nn.Parameter(torch.zeros(n_vis))   # 可见层偏置
        self.s_bias = nn.Parameter(torch.zeros(n_hin))   # 尖峰偏置
        self.z_bias = nn.Parameter(torch.zeros(n_hin))   # 平板偏置

    def sample_s(self, v):
        activation = torch.matmul(v, self.W.t()) + self.s_bias
        p_s = torch.sigmoid(activation)
        s_sample = torch.bernoulli(p_s)
        return p_s, s_sample

    def sample_z(self, v, s):
        # 平板的均值
        mu_z = torch.matmul(v, self.W.t()) + self.z_bias
        # 平板的采样：仅在尖峰s=1时采样，否则为0
        mu_z = mu_z * s
        z_sample = mu_z + torch.randn_like(mu_z) * self.sigma
        return mu_z, z_sample

    def sample_h(self, v):
        p_s, s_sample = self.sample_s(v)
        mu_z, z_sample = self.sample_z(v, s_sample)
        h_sample = z_sample  # 隐藏层的最终采样结果
        return p_s, s_sample, h_sample

    def sample_v(self, h):
        activation = torch.matmul(h, self.W) + self.v_bias
        p_v = activation  # 可见层的均值
        v_sample = p_v + torch.randn_like(p_v) * self.sigma  # 添加高斯噪声
        return p_v, v_sample

    def forward(self, v):
        #单次 Gibbs 采样
        p_s, s, h = self.sample_h(v)
        p_v, v = self.sample_v(h)
        return v

    def contrastive_divergence(self, input_data, lr=0.01, k=1):
        # 正向传播（Positive Phase）
        p_s_pos, s_pos, h_pos = self.sample_h(input_data)

        # 负向传播（Negative Phase）
        v_neg = input_data
        for _ in range(k):
            p_s_neg, s_neg, h_neg = self.sample_h(v_neg)
            p_v_neg, v_neg = self.sample_v(h_neg)

        # 隐藏层激活概率（负向传播后）
        p_s_neg, s_neg, h_neg = self.sample_h(v_neg)

        # 计算梯度
        positive_grad = torch.matmul(p_s_pos.t(), input_data)
        negative_grad = torch.matmul(p_s_neg.t(), v_neg)

        # 更新权重和偏置
        self.W.data += lr * (positive_grad - negative_grad) / input_data.size(0)
        self.v_bias.data += lr * torch.mean(input_data - v_neg, dim=0)
        self.s_bias.data += lr * torch.mean(p_s_pos - p_s_neg, dim=0)
        self.z_bias.data += lr * torch.mean(torch.matmul(p_s_pos.t(), input_data) - torch.matmul(p_s_neg.t(), v_neg), dim=0)

        # 计算重构误差
        loss = torch.mean((input_data - v_neg) ** 2)
        return loss
