In [6]:
from examples import models
from torchvision import datasets, transforms
# from examples import DataLoader
from examples import data  # 这是示例代码中的一个配置文件
from pytorch_hebbian import config
import torchvision
from torch.utils.data import DataLoader
from pytorch_hebbian.learning_rules import KrotovsRule

from torch.utils.data import DataLoader
from torchvision import transforms, datasets
from pytorch_hebbian import config
from pytorch_hebbian.learning_rules import KrotovsRule
from pytorch_hebbian.optimizers import Local
from pytorch_hebbian.trainers import HebbianTrainer


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, RandomSampler # 导入 RandomSampler

# 假设这些是你项目中的模块
from pytorch_hebbian import config
from pytorch_hebbian.learning_rules import KrotovsRule
from pytorch_hebbian.optimizers import Local
from pytorch_hebbian.trainers import HebbianTrainer
from pytorch_hebbian import utils # 导入 utils 来使用 get_device (可选但推荐)

# --- 确定设备 ---
if torch.cuda.is_available():
    device = 'cuda'
    seed = 42 # 设定种子
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
else:
    device = 'cpu'
    seed = 42 # 设定种子
    torch.manual_seed(seed)
print(f"Using device: {device}")

# --- 修改这里：创建与设备匹配的生成器 ---
# 直接在创建时指定 device
g = torch.Generator(device=device)
g.manual_seed(seed) # 使用和全局一样的种子

# Creating the model
model = models.create_fc1_model([28 ** 2, 400])
model.to(device)

# Creating the dataset
dataset = datasets.MNIST(root=config.DATASETS_DIR, download=True,
                         transform=transforms.ToTensor())

# 在 DataLoader 中使用 generator
sampler = RandomSampler(dataset, generator=g)
train_loader = DataLoader(dataset, batch_size=1024, sampler=sampler, shuffle=False) # shuffle=False 因为 sampler 已经随机了


# Creating the learning rule, optimizer and trainer
learning_rule = KrotovsRule()
optimizer = Local(named_params=model.named_parameters(), lr=0.01)
trainer = HebbianTrainer(model=model, learning_rule=learning_rule,
                         optimizer=optimizer, device=device)

# Running the trainer
print("Starting training...")
trainer.run(train_loader=train_loader, epochs=1)
print("Training finished.")


Using device: cuda
Starting training...
Training finished.


In [12]:
# ... (你之前的代码保持不变) ...

# Running the trainer
print("Starting training...")
trainer.run(train_loader=train_loader, epochs=1) # 假设 Hebbian 训练已完成
print("Hebbian training finished.")

# --- 开始添加评估部分 ---
import torch.nn as nn
import torch.optim as optim
from ignite.metrics import Accuracy, Loss
from pytorch_hebbian.evaluators import SupervisedEvaluator # 导入评估器

print("\nStarting supervised training for the classifier layer...")

# 1. 冻结 Hebbian 训练过的层 (假设是 model 中的第一个线性层 linear1)
# 首先确认层的名字，根据 models.py 里的 create_fc1_model，它叫 'linear1'
for name, param in model.named_parameters():
    if 'linear1' in name: # 或者更精确地 if name.startswith('linear1.'):
        param.requires_grad = False
    else:
        # 确保其他层（特别是 linear2）是可训练的
        param.requires_grad = True
print("Froze parameters for layer 'linear1'.")

# 2. 定义损失函数和优化器 (只优化未冻结的参数，即 linear2)
criterion = nn.CrossEntropyLoss()
# 只把需要训练的参数（linear2 的参数）传给优化器
optimizer_sup = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
# 或者更明确地指定:
# optimizer_sup = optim.Adam(model.linear2.parameters(), lr=1e-3) # 假设分类层叫 linear2

# 3. 进行监督训练 (训练分类层 linear2)
# 定义训练轮数
supervised_epochs = 5 # 可以根据需要调整

model.train() # 确保模型在训练模式
for epoch in range(supervised_epochs):
    epoch_loss = 0.0
    for batch_idx, (data, target) in enumerate(train_loader): # 使用训练集 train_loader
        data, target = data.to(device), target.to(device)

        optimizer_sup.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer_sup.step()
        epoch_loss += loss.item()

    print(f"Supervised Epoch {epoch+1}/{supervised_epochs}, Loss: {epoch_loss / len(train_loader):.4f}")

print("Supervised training finished.")

# 4. 创建测试数据加载器 (你已经创建了 test_loader，但要确保数据集正确)
# 加载 MNIST 测试集
test_dataset = datasets.MNIST(root=config.DATASETS_DIR, train=False, download=True,
                              transform=transforms.ToTensor())
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False) # 测试时通常不打乱

# 5. 创建评估器并进行评估
print("\nEvaluating on test set...")
model.eval() # 切换到评估模式

# 定义评估指标
metrics = {
    'accuracy': Accuracy(),
    'loss': Loss(criterion) # 使用和监督训练一样的损失函数
}



Starting training...
Hebbian training finished.

Starting supervised training for the classifier layer...
Froze parameters for layer 'linear1'.
Supervised Epoch 1/5, Loss: 0.5305
Supervised Epoch 2/5, Loss: 0.4913
Supervised Epoch 3/5, Loss: 0.4781
Supervised Epoch 4/5, Loss: 0.4666
Supervised Epoch 5/5, Loss: 0.4628
Supervised training finished.

Evaluating on test set...


In [None]:
# 把之前定义的 criterion 作为第二个参数传进去
evaluator = SupervisedEvaluator(model, criterion, metrics=metrics, device=device)

# 在测试集上运行评估
evaluator.run(test_loader)

# 6. 打印结果
# 通过 evaluator.engine 访问内部引擎的状态
results = evaluator.engine.state.metrics
accuracy = results['accuracy']
avg_loss = results['loss']
print(f"Test Results - Loss: {avg_loss:.4f} Accuracy: {accuracy:.4f}")
# --- 评估部分结束 ---

Test Results - Loss: 0.9308 Accuracy: 0.7220
