# 项目说明
该项目复现 local error（局部误差）训练：为深度网络的每一层接上一个固定的随机读出头，用该层自身的分类误差逐层单独训练，最终把各层拼接成完整的多层网络。

这是一种新的网络训练方式，可以调控网络每一层的流形，从而实现可解释性的分析。

文献参考：
* [Deep Supervised Learning Using Local Errors](https://www.frontiersin.org/journals/neuroscience/articles/10.3389/fnins.2018.00608/full)
* [Relationship between manifold smoothness and adversarial vulnerability in deep learning with local errors](https://cpb.iphy.ac.cn/EN/10.1088/1674-1056/abd68e)

# 准备

In [None]:
import torch
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

#  数据输入

In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Preprocessing pipeline: convert each image to a tensor, then normalize it
# with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Download (if needed) and open the MNIST splits: training first, then test.
trainset, testset = (
    datasets.MNIST(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Batch the data; only the training split is shuffled.
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
testloader = DataLoader(testset, batch_size=64, shuffle=False)

# Peek at one training batch as a sanity check.
dataiter = iter(trainloader)
images, labels = next(dataiter)
print(images.shape)  # expected [64, 1, 28, 28]: 64 images of 28x28 pixels
print(labels.shape)  # expected [64]


# 网络构建

单层网络，以及把已训练好的单层网络逐层粘合起来的多层网络

In [None]:
import torch.nn as nn

class SingleLayerNetwork(nn.Module):
    """One fully connected layer followed by a ReLU activation.

    Args:
        input_size: Number of input features of the linear layer.
        output_size: Number of output features of the linear layer.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Affine map; its weight and bias are this module's only parameters.
        self.fc = nn.Linear(input_size, output_size)
        # Element-wise non-linearity applied to the affine output.
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(fc(x))``."""
        return self.relu(self.fc(x))

In [None]:
import copy

class MultiLayerNetwork(nn.Module):
    """Sequential stack of layers that is grown one layer at a time.

    Layers are stored in an ``nn.ModuleList`` and applied in insertion order.
    With no layers added, ``forward`` returns its input unchanged.
    """

    def __init__(self):
        super(MultiLayerNetwork, self).__init__()
        # Holds the layers appended through ``add``.
        self.layers = nn.ModuleList()

    def add(self, layer):
        """Append a deep copy of ``layer`` to the stack.

        The copy decouples the stored layer from the caller's object, so
        later changes to the caller's ``layer`` do not affect this network.
        """
        self.layers.append(copy.deepcopy(layer))

    def forward(self, x, return_intermediate=False, n_layers=None):
        """Apply the stored layers in order.

        Args:
            x: Input passed to the first layer.
            return_intermediate: When True, return the list of per-layer
                outputs instead of only the final output.
            n_layers: Only used when ``return_intermediate`` is True; limits
                the returned list to the outputs of the first ``n_layers``
                layers. ``None`` means all layers.

        Returns:
            The output of the last layer, or (when ``return_intermediate`` is
            True) a list with one output per collected layer.
        """
        outputs = []

        for i, layer in enumerate(self.layers):
            # Once every requested activation has been collected, deeper
            # layers cannot change the returned list, so skip computing them.
            if return_intermediate and n_layers is not None and i >= n_layers:
                break
            x = layer(x)
            if return_intermediate:
                outputs.append(x)

        return outputs if return_intermediate else x


# 训练方法
通过读出头训练目标网络

In [None]:
import torch.optim as optim
import torch.nn.functional as F

# 定义读出头网络
class ReadoutHead(nn.Module):
    """Frozen linear readout computing ``x @ W + b``.

    ``W`` is drawn from a standard normal distribution scaled by 0.01 and
    ``b`` is all zeros. Both are registered as parameters with
    ``requires_grad=False``, so they receive no gradient updates.

    Args:
        input_size: Number of features of the incoming activations.
        output_size: Number of outputs produced per sample.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        initial_weight = 0.01 * torch.randn(input_size, output_size)
        initial_bias = torch.zeros(output_size)
        self.weight = nn.Parameter(initial_weight, requires_grad=False)
        self.bias = nn.Parameter(initial_bias, requires_grad=False)

    def forward(self, x):
        """Return the affine projection ``x @ weight + bias``."""
        projected = x @ self.weight
        return projected + self.bias

In [None]:
# 定义训练流程
def train_with_readout(fixed_network, target_network, readout_head, data_loader, optimizer, criterion, device):
    """Run one pass over ``data_loader``, training ``target_network``.

    Each batch is flattened to ``(batch, -1)``, optionally passed through
    ``fixed_network`` without gradient tracking, then through
    ``target_network`` and ``readout_head``. ``criterion`` is applied to the
    resulting logits and the labels, and ``optimizer`` takes one step per
    batch.

    Args:
        fixed_network: Network applied before the target network, or ``None``
            to feed the flattened inputs directly. Put in eval mode and run
            under ``torch.no_grad()``.
        target_network: Network being trained; put in train mode.
        readout_head: Module mapping the target network's output to logits.
        data_loader: Iterable of ``(inputs, labels)`` batches. It does not
            need to support ``len()``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss function called as ``criterion(logits, labels)``.
        device: Device the inputs and labels are moved to.

    Returns:
        The mean per-batch loss as a float, or ``0.0`` when the loader
        yields no batches.
    """
    if fixed_network is not None:
        fixed_network.eval()
    target_network.train()

    total_loss = 0.0
    # Counted in the loop rather than via len(data_loader), so empty loaders
    # and loaders without __len__ are both handled.
    num_batches = 0

    for inputs, labels in data_loader:
        # reshape (unlike view) also accepts non-contiguous batches.
        inputs = inputs.reshape(inputs.shape[0], -1).to(device)
        labels = labels.to(device)

        # The fixed network only produces features; no gradients flow into it.
        outputs = inputs
        if fixed_network is not None:
            with torch.no_grad():
                outputs = fixed_network(inputs)

        target_outputs = target_network(outputs)
        logits = readout_head(target_outputs)

        loss = criterion(logits, labels)
        total_loss += loss.item()
        num_batches += 1

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if num_batches == 0:
        return 0.0
    return total_loss / num_batches

In [None]:
# Train one 784 -> 1000 layer through a frozen 1000 -> 10 readout head.
tot_NN = MultiLayerNetwork()
Single_NN = SingleLayerNetwork(28*28, 1000).to(device)
readout_head = ReadoutHead(1000, 10).to(device)

# Loss function and optimizer; only the single layer's parameters are optimized.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(Single_NN.parameters(), lr=0.001)

for epoch in range(10):
    # No fixed network in front: the layer sees the flattened images directly.
    loss = train_with_readout(
        None, Single_NN, readout_head, trainloader, optimizer, criterion, device
    )
    print(loss)

# 测试正确率

In [8]:
def evaluate_accuracy(target_network, readout_head, data_loader, device):
    """Compute classification accuracy of ``readout_head(target_network(x))``.

    Both modules are switched to eval mode and left in it. Each batch is
    flattened to ``(batch, -1)`` before being fed to ``target_network``; the
    predicted class is the index of the largest logit along dimension 1.

    Args:
        target_network: Network producing features from the flattened inputs.
        readout_head: Module mapping those features to per-class logits.
        data_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the inputs and labels are moved to.

    Returns:
        The fraction of correctly classified samples as a float, or ``0.0``
        when the loader yields no samples.
    """
    target_network.eval()
    readout_head.eval()

    correct = 0
    total = 0

    with torch.no_grad():  # evaluation only; no gradients needed
        for inputs, labels in data_loader:
            # reshape (unlike view) also accepts non-contiguous batches.
            inputs = inputs.reshape(inputs.shape[0], -1).to(device)
            labels = labels.to(device)

            logits = readout_head(target_network(inputs))

            # Index of the largest logit is the predicted class.
            predicted = logits.max(dim=1).indices

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Avoid dividing by zero when the loader produced no samples.
    if total == 0:
        return 0.0
    return correct / total

# Wrap the trained layer in the stacked network, then report its test
# accuracy (the bare call's value is shown as the cell output).
tot_NN.add(Single_NN)
evaluate_accuracy(tot_NN, readout_head, testloader, device)

0.923

# 构建K层的神经网络

In [9]:
# Greedy layer-wise training: each new layer is trained on the output of the
# layers trained so far, using its own frozen readout head.
tot_NN = MultiLayerNetwork()
input_size = 28*28
size_range = [1000] * 5
for k, output_size in enumerate(size_range):
    # Fresh layer and readout head for this depth.
    Single_NN = SingleLayerNetwork(input_size, output_size).to(device)
    readout_head = ReadoutHead(output_size, 10).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(Single_NN.parameters(), lr=0.001)

    # The stack trained so far acts as the fixed feature extractor.
    for epoch in range(30):
        loss = train_with_readout(
            tot_NN, Single_NN, readout_head, trainloader, optimizer, criterion, device
        )
        print(loss)

    # Append the trained layer; the next layer consumes its output.
    tot_NN.add(Single_NN)
    input_size = output_size

    eva_value = evaluate_accuracy(tot_NN, readout_head, testloader, device)
    print("evsl", eva_value)

# Accuracy of the full stack with the last layer's readout head.
final_eval = evaluate_accuracy(tot_NN, readout_head, testloader, device)
print(final_eval)

0.39017897546450214
0.240037655334737
0.18679260970877687
0.1542889989420041
0.13292533516296065
0.11369331996403396
0.09988672153821695
0.09081291814130157
0.0819651381656734
0.07396197762749375
0.06720978218013607
0.061354180634109134
0.05681880087311715
0.053389444656824984
0.047691436567858084
0.04489018160677048
0.04091853250278783
0.03896126953506671
0.03703258924239845
0.03283419119475135
0.031541944070131396
0.029273753394750055
0.027342400601459368
0.025680544145002183
0.023831345072176966
0.0222530340716771
0.020689605092351782
0.019981677201750286
0.018256018783570502
0.017523002866129362
evsl 0.9805
0.08407890707497541
0.03865458690187298
0.03232587335308224
0.02949823846217144
0.02302456708152606
0.025489770301875665
0.01771544928868135
0.021898657428793683
0.017370043202093212
0.01815270947103568
0.019246937517409652
0.013795591634961987
0.01341516855989248
0.014731566840778648
0.012769270338041767
0.00964233705460523
0.011626428903832033
0.014774192648094384
0.0071430674

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats

def estimate_alpha_mle(data, x_min):
    """Estimate the power-law exponent alpha by maximum likelihood.

    Uses ``alpha = 1 + n / sum(log(x_i / x_min))`` over the ``n``
    observations with ``x_i >= x_min``.

    :param data: observations; any array-like accepted by ``np.asarray``
    :param x_min: lower cutoff from which the power law is assumed to hold;
        must be positive
    :return: the estimated alpha (``inf`` if every retained observation
        equals ``x_min``, since the log-sum is then zero)
    :raises ValueError: if ``x_min`` is not positive, or if no observation is
        greater than or equal to ``x_min``
    """
    # `not x_min > 0` also rejects NaN, which `x_min <= 0` would let through.
    if not x_min > 0:
        raise ValueError("x_min must be positive")

    # Accept lists/tuples too; boolean masking below requires an ndarray.
    values = np.asarray(data, dtype=float)
    tail = values[values >= x_min]  # keep only the observations >= x_min
    n = tail.size
    if n == 0:
        raise ValueError("no observations are greater than or equal to x_min")

    log_sum = np.sum(np.log(tail / x_min))
    return 1 + n / log_sum

# Generate synthetic power-law data.
np.random.seed(42)
n_samples = 1000
alpha_true = 2.5  # true power-law exponent
x_min = 1  # lower cutoff of the distribution

# Inverse-transform sampling: map uniform samples to power-law samples.
random_values = np.random.uniform(size=n_samples)
data = x_min * (1 - random_values) ** (-1 / (alpha_true - 1))

# Estimate the exponent alpha.
alpha_estimated = estimate_alpha_mle(data, x_min)
print(f"估计的幂律指数 α: {alpha_estimated:.4f}")

# Histogram of the data, later shown on log-log axes.
plt.figure(figsize=(8, 6))
hist, bins, _ = plt.hist(data, bins=50, density=True, alpha=0.6, color='b', label='数据直方图')
bin_centers = (bins[:-1] + bins[1:]) / 2

# Fit a straight line in log-log space to check for power-law behaviour.
# Empty bins have density 0 and log(0) = -inf, which would turn the fitted
# slope and intercept into NaN, so only occupied bins enter the regression.
occupied = hist > 0
slope, intercept, _, _, _ = stats.linregress(np.log(bin_centers[occupied]), np.log(hist[occupied]))
plt.plot(bin_centers, np.exp(intercept) * bin_centers**slope, 'r--', label=f'拟合: slope={slope:.2f}')

plt.xscale('log')
plt.yscale('log')
plt.xlabel('x')
plt.ylabel('P(x)')
plt.title('幂律分布的对数-对数图')
plt.legend()
plt.show()


ImportError: cannot import name 'VisibleDeprecationWarning' from 'numpy' (unknown location)