In [1]:
import torch
import torch.nn as nn
import os
import shutil
import time
from models import * # 假设你的 QuantConv2d 等定义在 models.py 中

class VGG16_Part1(nn.Module):
    """VGG16-style classifier whose Block 5 holds an 8-in/8-out 3x3 conv without BatchNorm.

    The feature extractor is assembled layer by layer so that the squeezed stage
    sits at a fixed position inside ``self.features``:

    * Blocks 1-4: groups of 3x3 ``QuantConv2d`` -> ``BatchNorm2d`` -> ``ReLU``,
      each block closed by a 2x2 max-pool.
    * Block 5: one 512->512 group, a 1x1 adapter (512->8, with BN), the 8->8 3x3
      target conv (``bias=False``, followed directly by ReLU, no BN), a 1x1
      expansion (8->512, with BN), one more 512->512 group, the last 2x2 max-pool
      and an average pool with kernel 1 / stride 1.

    The classifier is a single ``Linear(512, num_classes)``, so the flattened
    feature map must contain exactly 512 values per sample. With five 2x2
    poolings that corresponds to 32x32 inputs (CIFAR-10-sized images).

    Args:
        num_classes: Number of output logits of the final linear layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        def conv_bn_relu(c_in, c_out):
            """Return one 3x3 quantized conv -> BatchNorm -> in-place ReLU group."""
            return [
                QuantConv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        layers = []

        # --- Blocks 1-4: regular VGG stages, each ending with a 2x2 max-pool ---
        channels = 3
        for stage_widths in ((64, 64), (128, 128), (256, 256, 256), (512, 512, 512)):
            for width in stage_widths:
                layers += conv_bn_relu(channels, width)
                channels = width
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))

        # --- Block 5 (modified region) ---
        # Original first layer of the block: 512 -> 512.
        layers += conv_bn_relu(512, 512)

        # Adapter: 1x1 conv squeezing 512 -> 8 channels; the BN is kept here.
        layers += [
            QuantConv2d(512, 8, kernel_size=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
        ]

        # Target layer for the FPGA: 8 -> 8, 3x3, no bias and NO BatchNorm;
        # the ReLU follows the convolution directly.
        layers += [
            QuantConv2d(8, 8, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
        ]

        # Expansion: 1x1 conv restoring 8 -> 512 channels to rejoin the network.
        layers += [
            QuantConv2d(8, 512, kernel_size=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
        ]

        # Remainder of Block 5.
        layers += conv_bn_relu(512, 512)
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        layers.append(nn.AvgPool2d(kernel_size=1, stride=1))

        self.features = nn.Sequential(*layers)
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, x):
        """Run the feature extractor, flatten per sample, and return class logits."""
        feats = self.features(x)
        flat = feats.view(feats.size(0), -1)
        return self.classifier(flat)

# Instantiate the modified network with its default number of classes and move
# it to the GPU (this call requires a CUDA device).
model_part1 = VGG16_Part1().cuda()
print("VGG16_Part1 模型构建完成。目标 8x8 层已插入且去除了 BN。")

VGG16_Part1 模型构建完成。目标 8x8 层已插入且去除了 BN。


In [2]:
def load_pretrained_weights(model, pretrained_path):
    """Copy every compatible tensor from a checkpoint file into ``model``.

    A checkpoint entry is used only when its key exists in ``model.state_dict()``
    and its shape matches; everything else keeps the model's current values
    (i.e. those layers are trained from scratch).

    Args:
        model: The ``nn.Module`` to update in place.
        pretrained_path: Path to a file written by ``torch.save``. Either a dict
            holding the weights under ``'state_dict'`` or a bare state dict.

    Returns:
        None. Progress and the number of model entries that were not loaded are
        printed. A missing file is reported and otherwise ignored.
    """
    if not os.path.isfile(pretrained_path):
        print(f"在 '{pretrained_path}' 未找到 checkpoint")
        return

    print(f"=> loading checkpoint '{pretrained_path}'")
    # Load onto the CPU: load_state_dict copies values to each parameter's own
    # device, so this works on CPU-only hosts and avoids a second GPU copy.
    checkpoint = torch.load(pretrained_path, map_location="cpu")
    # Accept both the {'state_dict': ...} wrapper and a bare state dict.
    pretrained_dict = checkpoint.get('state_dict', checkpoint)
    model_dict = model.state_dict()

    wrapper_prefix = "module."
    compatible = {}
    for key, value in pretrained_dict.items():
        # Checkpoints saved from a DataParallel-wrapped model prefix every key
        # with "module."; strip it only when that makes the key match.
        if (key not in model_dict and key.startswith(wrapper_prefix)
                and key[len(wrapper_prefix):] in model_dict):
            key = key[len(wrapper_prefix):]
        # Skip unknown names and tensors whose shape changed (the modified layers).
        if key in model_dict and value.shape == model_dict[key].shape:
            compatible[key] = value

    # Overlay the compatible tensors on the current weights and load the result.
    model_dict.update(compatible)
    model.load_state_dict(model_dict)

    print(f"已加载预训练权重。忽略了 {len(model_dict) - len(compatible)} 个不匹配的参数层（这些层将从头训练）。")

# Replace this path with the actual location of your >90%-accuracy checkpoint.
PRETRAINED_PATH = "result/vgg16/model_best.pth.tar" 
# Copy every name- and shape-compatible tensor from that checkpoint into model_part1.
load_pretrained_weights(model_part1, PRETRAINED_PATH)

=> loading checkpoint 'result/vgg16/model_best.pth.tar'
已加载预训练权重。忽略了 63 个不匹配的参数层（这些层将从头训练）。


In [3]:
import torch
import torchvision
import torchvision.transforms as transforms

# Per-channel normalization applied to both splits (the mean/std values the
# author uses for CIFAR-10).
normalize = transforms.Normalize(mean=[0.491, 0.482, 0.447], std=[0.247, 0.243, 0.262])

# Training split: padded random crop and horizontal flip as augmentation, then
# tensor conversion and normalization. Batches of 128, reshuffled every epoch.
train_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True,
    transform=transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]),
)
trainloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=128, shuffle=True, num_workers=2,
)

# Test split: no augmentation, fixed order.
test_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True,
    transform=transforms.Compose([transforms.ToTensor(), normalize]),
)
testloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=128, shuffle=False, num_workers=2,
)

print("数据加载器 (trainloader, testloader) 已准备就绪！")

数据加载器 (trainloader, testloader) 已准备就绪！


In [4]:
import torch.optim as optim

# These three notebook-level objects are used by fine_tune_advanced below.

# 1. Re-create the optimizer over all parameters of model_part1 (initial LR = 0.01).
optimizer = optim.SGD(model_part1.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

# 2. Key improvement: add a learning-rate scheduler.
# At milestones 20 and 40 the LR is multiplied by 0.1 (0.01 -> 0.001 -> 0.0001).
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[20, 40], gamma=0.1)

# 3. Loss function.
criterion = nn.CrossEntropyLoss().cuda()

def fine_tune_advanced(model, train_loader, test_loader, epochs=50,
                       optimizer=None, scheduler=None, criterion=None,
                       save_path="result/vgg_16_part1_best.pth.tar", device=None):
    """Fine-tune ``model`` and checkpoint the weights with the best evaluation accuracy.

    Each epoch trains over ``train_loader``, steps the LR scheduler once, then
    measures top-1 accuracy over ``test_loader``. Whenever the accuracy improves,
    ``{'state_dict': model.state_dict()}`` is written to ``save_path``.

    NOTE(review): the best checkpoint is selected on ``test_loader``; if that is
    the held-out test set, the reported best accuracy is optimistically biased.

    Args:
        model: Network to train; it is moved to ``device`` in place.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        test_loader: Iterable of ``(inputs, targets)`` evaluation batches.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer to step. ``None`` uses the notebook-level ``optimizer``.
        scheduler: LR scheduler stepped once per epoch. ``None`` uses the
            notebook-level ``scheduler``.
        criterion: Loss callable ``(output, target) -> loss``. ``None`` uses the
            notebook-level ``criterion``.
        save_path: Destination of the best checkpoint; its directory is created
            if missing.
        device: Device to run on. ``None`` selects CUDA when available, else CPU.

    Returns:
        The best evaluation accuracy in percent (0 if no epoch improved on 0).
    """
    # Backward compatibility: callers that pass only the original four arguments
    # keep using the objects defined at notebook level.
    notebook_ns = globals()
    if optimizer is None:
        optimizer = notebook_ns["optimizer"]
    if scheduler is None:
        scheduler = notebook_ns["scheduler"]
    if criterion is None:
        criterion = notebook_ns["criterion"]
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    model.to(device)

    # Create the checkpoint directory up front so the first save cannot fail
    # after a full epoch of training.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    best_acc = 0
    for epoch in range(epochs):
        # --- Training ---
        model.train()
        # Read the LR *before* stepping the scheduler so the log shows the rate
        # that was actually used during this epoch.
        current_lr = optimizer.param_groups[0]["lr"]
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        # Advance the LR schedule once per epoch.
        scheduler.step()

        # --- Validation ---
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        # Guard against an empty evaluation loader.
        acc = 100. * correct / total if total else 0.0

        print(f"Epoch [{epoch+1}/{epochs}] (LR: {current_lr:.5f}) -> Test Accuracy: {acc:.2f}%")

        # Keep only the best-performing weights on disk.
        if acc > best_acc:
            best_acc = acc
            torch.save({'state_dict': model.state_dict()}, save_path)
            print(f"   New Best found: {best_acc:.2f}% (Saved)")

    print(f" Advanced Fine-tuning 完成！最终最佳精度: {best_acc:.2f}%")
    return best_acc

# 4. Start training (50 epochs this time, so expect it to take a while).
print("启动增强版训练 (含学习率衰减)...")
fine_tune_advanced(model_part1, trainloader, testloader, epochs=50)

启动增强版训练 (含学习率衰减)...
Epoch [1/50] (LR: 0.01000) -> Test Accuracy: 83.58%
   New Best found: 83.58% (Saved)
Epoch [2/50] (LR: 0.01000) -> Test Accuracy: 81.42%
Epoch [3/50] (LR: 0.01000) -> Test Accuracy: 82.62%
Epoch [4/50] (LR: 0.01000) -> Test Accuracy: 83.05%
Epoch [5/50] (LR: 0.01000) -> Test Accuracy: 82.51%
Epoch [6/50] (LR: 0.01000) -> Test Accuracy: 85.77%
   New Best found: 85.77% (Saved)
Epoch [7/50] (LR: 0.01000) -> Test Accuracy: 86.35%
   New Best found: 86.35% (Saved)
Epoch [8/50] (LR: 0.01000) -> Test Accuracy: 86.62%
   New Best found: 86.62% (Saved)
Epoch [9/50] (LR: 0.01000) -> Test Accuracy: 84.95%
Epoch [10/50] (LR: 0.01000) -> Test Accuracy: 86.94%
   New Best found: 86.94% (Saved)
Epoch [11/50] (LR: 0.01000) -> Test Accuracy: 86.81%
Epoch [12/50] (LR: 0.01000) -> Test Accuracy: 87.04%
   New Best found: 87.04% (Saved)
Epoch [13/50] (LR: 0.01000) -> Test Accuracy: 88.09%
   New Best found: 88.09% (Saved)
Epoch [14/50] (LR: 0.01000) -> Test Accuracy: 88.34%
   New Be

91.89

In [5]:
import numpy as np
import os

def generate_part1_verification_files(model, test_loader, output_dir="part1_verification_data",
                                      w_bit=4, a_bit=4):
    """Dump integer input/weight/golden-output text files for the 8->8 conv layer.

    Steps:
      1. Find the first child of ``model.features`` with 8 input and 8 output channels.
      2. Run one batch through ``model`` and capture that layer's input via a
         forward hook (the hook is always removed afterwards).
      3. Quantize the first sample's activation to unsigned ``a_bit`` integers and
         the layer weight to signed ``w_bit`` integers.
      4. Compute the reference result as an integer 3x3 convolution (padding 1)
         followed by ReLU.
      5. Write ``weight.txt``, ``input.txt`` and ``output.txt`` (one flattened
         integer per line) into ``output_dir``.

    NOTE(review): the quantization formulas (step = alpha / max level, round,
    clamp) are this function's own model of the hardware; confirm they match
    the ``QuantConv2d`` implementation before trusting the files bit-for-bit.

    Args:
        model: Network exposing a ``features`` container; it is put in eval mode.
        test_loader: Iterable yielding ``(images, labels)``; only the first batch
            is used.
        output_dir: Directory receiving the three text files (created if missing).
        w_bit: Weight bit-width (signed range ``[-2**(w_bit-1), 2**(w_bit-1)-1]``).
        a_bit: Activation bit-width (unsigned range ``[0, 2**a_bit-1]``).

    Returns:
        None. Problems (no target layer, nothing captured) are printed and the
        function returns early without writing files.
    """
    print("正在准备生成验证文件...")
    model.eval()

    # 1. Locate the 8-in / 8-out layer among the direct children of model.features.
    target_layer = None
    print("正在模型中搜索 8x8 卷积层...")
    for name, module in model.features.named_children():
        if getattr(module, 'in_channels', None) == 8 and getattr(module, 'out_channels', None) == 8:
            target_layer = module
            print(f"找到目标层: features[{name}] (In:8 -> Out:8)")
            break

    if target_layer is None:
        print("错误：未在模型中找到 8x8 的目标层。请检查 VGG16_Part1 类的定义是否正确。")
        return

    # 2. Capture the activation entering the target layer during one forward pass.
    captured_data = {}

    def capture_input(_module, inputs, _output):
        # inputs[0] is the activation tensor fed into the hooked layer.
        captured_data["target_layer"] = inputs[0].detach()

    handle = target_layer.register_forward_hook(capture_input)
    print(" 正在运行一次前向传播以捕获数据...")
    try:
        images, _ = next(iter(test_loader))
        # Run on whatever device the model lives on instead of assuming CUDA.
        first_param = next(model.parameters(), None)
        if first_param is not None:
            images = images.to(first_param.device)
        with torch.no_grad():
            model(images)
    finally:
        # Always detach the hook, including on errors and on the early return
        # below, so repeated calls do not stack hooks on the layer.
        handle.remove()

    if "target_layer" not in captured_data:
        print("错误：Hook 未捕获到数据。")
        return

    # Activation of the first sample in the batch: (8, H, W).
    act_float = captured_data["target_layer"][0]
    print(f"捕获输入数据形状: {act_float.shape}")

    # 3. Read the quantization scales; each may be a tensor or a plain number.
    try:
        act_alpha = target_layer.act_alpha.item() if torch.is_tensor(target_layer.act_alpha) else target_layer.act_alpha
        wgt_alpha = target_layer.weight_quant.wgt_alpha.item() if torch.is_tensor(target_layer.weight_quant.wgt_alpha) else target_layer.weight_quant.wgt_alpha
    except AttributeError:
        print("警告：无法读取标准量化参数，尝试使用默认值或检查模型结构。")
        act_alpha = 10.0  # placeholder defaults, only to keep the dump running
        wgt_alpha = 1.0

    # --- Input: unsigned a_bit integers (post-ReLU activations are non-negative) ---
    act_max = 2 ** a_bit - 1
    act_step = act_alpha / act_max
    input_int = torch.round(act_float / act_step).clamp(0, act_max).cpu().numpy().astype(int)

    # --- Weight: signed w_bit integers ---
    wgt_max = 2 ** (w_bit - 1) - 1
    wgt_step = wgt_alpha / wgt_max
    weight_int = (torch.round(target_layer.weight / wgt_step)
                  .clamp(-wgt_max - 1, wgt_max)
                  .detach().cpu().numpy().astype(int))

    print(f"权重形状: {weight_int.shape}")

    # 4. Golden output: Psum = sum(input_int * weight_int), then ReLU.
    print("正在计算 Golden Output (Psum Recovered)...")
    input_tensor = torch.tensor(input_int).unsqueeze(0).float()  # add batch dim
    weight_tensor = torch.tensor(weight_int).float()
    with torch.no_grad():
        output_int_psum = torch.nn.functional.conv2d(input_tensor, weight_tensor, padding=1)

    # Round before the integer cast so a float result such as 12.999999 is not
    # truncated to 12.
    output_int_final = torch.relu(output_int_psum).round().numpy().astype(int)

    # 5. Save flattened integers, one per line, for the RTL testbench to read.
    os.makedirs(output_dir, exist_ok=True)
    np.savetxt(f"{output_dir}/weight.txt", weight_int.flatten(), fmt='%d')
    np.savetxt(f"{output_dir}/input.txt", input_int.flatten(), fmt='%d')
    np.savetxt(f"{output_dir}/output.txt", output_int_final.flatten(), fmt='%d')

    print("="*40)
    print(f"   成功！验证文件已保存在: {output_dir}/")
    print(f"   - {output_dir}/input.txt  (大小: {input_int.size})")
    print(f"   - {output_dir}/weight.txt (大小: {weight_int.size})")
    print(f"   - {output_dir}/output.txt (大小: {output_int_final.size})")
    print("="*40)

# Run the generator: write the verification files for model_part1 using the test loader.
generate_part1_verification_files(model_part1, testloader)

正在准备生成验证文件...
正在模型中搜索 8x8 卷积层...
找到目标层: features[40] (In:8 -> Out:8)
 正在运行一次前向传播以捕获数据...
捕获输入数据形状: torch.Size([8, 2, 2])
权重形状: (8, 8, 3, 3)
正在计算 Golden Output (Psum Recovered)...
   成功！验证文件已保存在: part1_verification_data/
   - part1_verification_data/input.txt  (大小: 32)
   - part1_verification_data/weight.txt (大小: 576)
   - part1_verification_data/output.txt (大小: 32)
