# ECC
## Train 不同精度模型

按照FP32、BF32、FP16、Int8、BF16进行Train 


Int8Train 在单独的cell中，使用了huggingface用于给LLM做Int8量化的包（对于resnet一样兼容）

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

# Training hyper-parameters (module-level; read by train() and the data loaders below).
batch_size = 128
epochs = 10
learning_rate = 2e-5
momentum = 0.9

# Pre-processing pipeline applied to every MNIST image.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # resize the image to 224x224
    transforms.Grayscale(num_output_channels=3),  # expand the single channel to three channels
    transforms.ToTensor(),
    # NOTE(review): single-value mean/std (presumably the usual MNIST statistics)
    # applied to a three-channel tensor - confirm it broadcasts as intended.
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST dataset (the training split is downloaded to ./data if missing).
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

# Build the data loaders; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# 定义Train 函数
def train(model, device, precision, ifint8=False):
    """Train ``model`` on the module-level ``train_loader``; evaluate and checkpoint every epoch.

    Args:
        model: Network to optimise. NOTE(review): presumably already moved to
            ``device`` and cast to the dtype matching ``precision`` by the caller.
        device: Device each batch is moved to before the forward pass.
        precision: Precision tag. 'bf16' / 'bf32' inputs are cast to bfloat16 and
            'fp16' inputs to half; the tag also names the log and checkpoint
            sub-directories.
        ifint8: Unused; kept so existing callers keep working.
    """
    # SGD with the module-level learning rate and momentum.
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    criterion = nn.CrossEntropyLoss()

    # TensorBoard log directory and checkpoint directory, one per precision tag.
    log_dir = f"/root/autodl-tmp/log/{precision}"
    os.makedirs(log_dir, exist_ok=True)
    checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
    os.makedirs(checkpoint_dir, exist_ok=True)
    writer = SummaryWriter(log_dir=log_dir)

    try:
        for epoch in range(epochs):
            # test() switches the model to eval mode at the end of each epoch, so
            # training mode must be re-enabled here; otherwise every epoch after
            # the first would train with BatchNorm in eval mode.
            model.train()

            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                # Match the input dtype to the model's parameter dtype.
                if precision in ['bf16', 'bf32']:
                    data = data.bfloat16()
                elif precision == 'fp16':
                    data = data.half()

                # Forward pass.
                output = model(data)
                loss = criterion(output, target)

                # Backward pass and parameter update.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Log the training loss against a global step counter.
                writer.add_scalar('Loss/train', loss.item(), epoch * len(train_loader) + batch_idx)

            # Evaluate on the test set, then save this epoch's weights.
            test(model, device, precision, epoch, writer)
            torch.save(model.state_dict(), f"{checkpoint_dir}/model_epoch_{epoch}.pth")
    finally:
        # Close the writer even if training is interrupted by an exception.
        writer.close()

# 定义Test 函数
def test(model, device, precision, epoch, writer):
    """Evaluate ``model`` on the module-level ``test_loader`` and log the results.

    The mean per-batch loss and the accuracy (in percent) are written to
    ``writer`` under 'Loss/test' and 'Accuracy/test' at step ``epoch`` and are
    also printed. The model is left in eval mode when the function returns.
    """
    model.eval()

    loss_fn = nn.CrossEntropyLoss()
    loss_sum = 0
    hits = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Cast the inputs to the dtype selected by the precision tag.
            if precision == 'fp16':
                inputs = inputs.half()
            elif precision in ('bf16', 'bf32'):
                inputs = inputs.bfloat16()

            logits = model(inputs)
            loss_sum += loss_fn(logits, labels).item()
            predicted = logits.argmax(dim=1, keepdim=True)
            hits += predicted.eq(labels.view_as(predicted)).sum().item()

    # Average over batches for the loss, over samples for the accuracy.
    test_loss = loss_sum / len(test_loader)
    accuracy = 100. * hits / len(test_loader.dataset)

    writer.add_scalar('Loss/test', test_loss, epoch)
    writer.add_scalar('Accuracy/test', accuracy, epoch)

    print(f"Epoch {epoch}, Precision {precision}, Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.2f}%")

# Train and evaluate one freshly initialised ResNet-18 per precision tag.
precisions = ['fp32', 'bf32', 'fp16','bf16']

for precision in precisions:
    # Untrained ResNet-18 with a 10-class head.
    model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)

    # Use the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Cast the weights to the requested precision; 'bf32' and 'bf16' both use bfloat16.
    if precision == 'fp16':
        model.half()
    elif precision in ('bf32', 'bf16'):
        model.bfloat16()
    elif precision == 'fp32':
        model.float()

    # Train the model (every precision except int8 is handled here).
    train(model, device, precision)


## 对网络参数进行随机扰动

随机扰动：遍历模型，对每个参数按照比特位进行扰动。

1. FP32使用32位来表示每个参数，其中1位用于符号位，8位用于指数位，23位用于尾数位。

2. BF32在本实验中同样使用32位来表示每个参数，其位划分与FP32相同：1位表示符号位，8位表示指数位，23位表示尾数位（原文称BF32由英特尔提出，且与FP32相比“减少了指数位的数量，增加了尾数位的数量”，这与此处给出的位划分不一致，有待核实）。注意：Train 代码中 'bf32' 模型实际是用 `.bfloat16()` 转换的。

3. FP16使用16位来表示每个参数，其中1位用于符号位，5位用于指数位，10位用于尾数位。

4. INT8使用8位来表示每个参数，INT8表示有符号整数，范围从-128到127。

5. BF16是Google提出的一种优化的浮点格式，使用16位来表示每个参数，与FP16相比，BF16增加了指数位的数量（8位对5位），减少了尾数位的数量（7位对10位），BF16使用1位表示符号位，8位表示指数位，7位表示尾数位。

-   FP32和BF32使用32位表示每个参数。
-   FP16和BF16使用16位表示每个参数。
-   INT8使用8位表示每个参数。

### FP32

FP32使用32位来表示每个参数，其中1位用于符号位，8位用于指数位，23位用于尾数位。

1. 引入四个变量，ERR_Rate表示引入错误的概率，从[1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]取值，剩余三个变量为布尔型：ERR_01、ERR_02、ERR_03，用于控制符号位是否发生错误（bit翻转）、指数位是否发生错误（bit翻转）、尾数位是否发生错误（bit翻转）。

2. 对模型进行随机错误注入，将注入错误的模型保存在/media/tangshi/AI0011/笔记/信工所/ECC校验_正式/ERR_checkpoint对应的子文件夹下，如果子文件夹不存在则新建。

3. ERR_01、ERR_02、ERR_03分别单独设为True进行实验，也就是说，最终注入错误后你需要保存3*9=27个不同错误率的、不同错误发生位置的模型。

目前存在待优化的问题：

1. 即便多进程进行错误注入还是很慢，跑了好几天才把fp32的注入完，因此后续几种精度暂时只跑了注入一个错误的Test 代码。

对于任何非INT8的数，我们采用以下新的基于奇偶的ECC鉴别纠错：
众所周知，所有的数据在展开到bit级别都只有0和1，我们假设这是一串二进制编码，将它转化成整数之后就可以判别奇偶性。
因此，我基于此思想设计了一种新的ECC显存校验机制：我们对参数按bit展开后的奇偶性进行分析，如果奇偶性正确，虽然这个参数仍有可能出错，但我们判断这个参数是正确的；如果奇偶性错误，那么这个参数必然出错，我们将这个参数设为零。
同样的，Int8数据类型的参数就更加适用了！
请你记住这个想法。

```
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

# Training hyper-parameters (module-level; read by train() and the data loaders below).
batch_size = 128
epochs = 10
learning_rate = 2e-5
momentum = 0.9

# Pre-processing pipeline applied to every MNIST image.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # resize the image to 224x224
    transforms.Grayscale(num_output_channels=3),  # expand the single channel to three channels
    transforms.ToTensor(),
    # NOTE(review): single-value mean/std (presumably the usual MNIST statistics)
    # applied to a three-channel tensor - confirm it broadcasts as intended.
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST dataset (the training split is downloaded to ./data if missing).
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

# Build the data loaders; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# 定义Train 函数
def train(model, device, precision, ifint8=False):
    """Train ``model`` on the module-level ``train_loader``; evaluate and checkpoint every epoch.

    Args:
        model: Network to optimise. NOTE(review): presumably already moved to
            ``device`` and cast to the dtype matching ``precision`` by the caller.
        device: Device each batch is moved to before the forward pass.
        precision: Precision tag. 'bf16' / 'bf32' inputs are cast to bfloat16 and
            'fp16' inputs to half; the tag also names the log and checkpoint
            sub-directories.
        ifint8: Unused; kept so existing callers keep working.
    """
    # SGD with the module-level learning rate and momentum.
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    criterion = nn.CrossEntropyLoss()

    # TensorBoard log directory and checkpoint directory, one per precision tag.
    log_dir = f"/root/autodl-tmp/log/{precision}"
    os.makedirs(log_dir, exist_ok=True)
    checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
    os.makedirs(checkpoint_dir, exist_ok=True)
    writer = SummaryWriter(log_dir=log_dir)

    try:
        for epoch in range(epochs):
            # test() switches the model to eval mode at the end of each epoch, so
            # training mode must be re-enabled here; otherwise every epoch after
            # the first would train with BatchNorm in eval mode.
            model.train()

            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                # Match the input dtype to the model's parameter dtype.
                if precision in ['bf16', 'bf32']:
                    data = data.bfloat16()
                elif precision == 'fp16':
                    data = data.half()

                # Forward pass.
                output = model(data)
                loss = criterion(output, target)

                # Backward pass and parameter update.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Log the training loss against a global step counter.
                writer.add_scalar('Loss/train', loss.item(), epoch * len(train_loader) + batch_idx)

            # Evaluate on the test set, then save this epoch's weights.
            test(model, device, precision, epoch, writer)
            torch.save(model.state_dict(), f"{checkpoint_dir}/model_epoch_{epoch}.pth")
    finally:
        # Close the writer even if training is interrupted by an exception.
        writer.close()

# 定义Test 函数
def test(model, device, precision, epoch, writer):
    """Evaluate ``model`` on the module-level ``test_loader`` and log the results.

    The mean per-batch loss and the accuracy (in percent) are written to
    ``writer`` under 'Loss/test' and 'Accuracy/test' at step ``epoch`` and are
    also printed. The model is left in eval mode when the function returns.
    """
    model.eval()

    loss_fn = nn.CrossEntropyLoss()
    loss_sum = 0
    hits = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Cast the inputs to the dtype selected by the precision tag.
            if precision == 'fp16':
                inputs = inputs.half()
            elif precision in ('bf16', 'bf32'):
                inputs = inputs.bfloat16()

            logits = model(inputs)
            loss_sum += loss_fn(logits, labels).item()
            predicted = logits.argmax(dim=1, keepdim=True)
            hits += predicted.eq(labels.view_as(predicted)).sum().item()

    # Average over batches for the loss, over samples for the accuracy.
    test_loss = loss_sum / len(test_loader)
    accuracy = 100. * hits / len(test_loader.dataset)

    writer.add_scalar('Loss/test', test_loss, epoch)
    writer.add_scalar('Accuracy/test', accuracy, epoch)

    print(f"Epoch {epoch}, Precision {precision}, Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.2f}%")

# Train and evaluate one freshly initialised ResNet-18 per precision tag.
precisions = ['fp32', 'bf32', 'fp16','bf16']

for precision in precisions:
    # Untrained ResNet-18 with a 10-class head.
    model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)

    # Use the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Cast the weights to the requested precision; 'bf32' and 'bf16' both use bfloat16.
    if precision == 'fp16':
        model.half()
    elif precision in ('bf32', 'bf16'):
        model.bfloat16()
    elif precision == 'fp32':
        model.float()

    # Train the model (every precision except int8 is handled here).
    train(model, device, precision)

```
以上为Train 代码
```
import torch
import os
import random
import numpy as np
import csv

# Which checkpoint to corrupt: the FP32 model saved after epoch index `epochs - 1`.
precision = 'fp32'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
model_path = f"{checkpoint_dir}/model_epoch_{epochs-1}.pth"

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Rebuild the ResNet-18 architecture, load the trained weights and keep them in float32.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device).float()

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

def inject_error(tensor, err_rate, err_01, err_02, err_03):
    """Flip randomly chosen bits of a float32 tensor, restricted to the enabled IEEE-754 fields.

    ``int(total_bits * err_rate)`` bit positions are drawn uniformly (with
    replacement, from NumPy's global RNG) over the whole tensor. A drawn
    position is flipped only when the field it falls in is enabled, so the
    effective flip rate is ``err_rate`` scaled by that field's share of the 32 bits.

    Args:
        tensor: Tensor to corrupt. Tensors whose dtype is not ``torch.float32``
            (for example integer counters) are returned as an unchanged copy.
        err_rate: Fraction of the tensor's bits drawn as candidate positions.
        err_01: Enable flips of the sign bit (bit 31 of each value).
        err_02: Enable flips of the exponent bits (bits 23-30).
        err_03: Enable flips of the mantissa bits (bits 0-22).

    Returns:
        ``(corrupted, error_indices)``: a new tensor on the same device as
        ``tensor`` and the list of flipped global bit positions
        (``32 * element_index + bit``) in draw order. A position that appears
        twice was flipped twice.
    """
    if tensor.dtype != torch.float32:
        # Sign/exponent/mantissa fields are only defined for float32 values.
        return tensor.clone(), []

    values = tensor.detach().cpu().contiguous().numpy()
    # Work on a private copy of the raw 32-bit words so the input is never modified.
    words = values.reshape(-1).view(np.uint32).copy()
    total_bits = words.size * 32
    num_errors = int(total_bits * err_rate)
    if num_errors == 0:
        return tensor.clone(), []

    positions = np.random.randint(0, total_bits, size=num_errors, dtype=np.int64)
    # Classify each draw by its bit index inside the 32-bit word, not inside a byte.
    bit_in_word = positions % 32
    selected = np.zeros(num_errors, dtype=bool)
    if err_01:
        selected |= bit_in_word == 31
    if err_02:
        selected |= (bit_in_word >= 23) & (bit_in_word <= 30)
    if err_03:
        selected |= bit_in_word < 23
    hits = positions[selected]

    # ufunc.at applies every XOR even when the same word is hit more than once.
    flips = np.left_shift(np.uint32(1), (hits % 32).astype(np.uint32))
    np.bitwise_xor.at(words, hits // 32, flips)

    corrupted = torch.from_numpy(words.view(np.float32).reshape(values.shape))
    return corrupted.to(tensor.device), hits.tolist()

def save_error_injected_model(model, err_rate, err_01, err_02, err_03):
    """Save an error-injected copy of ``model`` plus a CSV log of the flipped bits.

    ``model`` itself is left untouched, so every call starts from the same
    clean weights instead of stacking errors from earlier calls.

    Args:
        model: Clean model whose state dict is corrupted tensor by tensor.
        err_rate: Error rate forwarded to ``inject_error``; also names the output folder.
        err_01, err_02, err_03: Field switches forwarded to ``inject_error``;
            they are also embedded in the output file names.
    """
    import copy  # local import: only needed to isolate the corrupted weights

    err_checkpoint_dir = f"/root/autodl-tmp/ERR_checkpoint/FP32/ERR_Rate_{err_rate}"
    os.makedirs(err_checkpoint_dir, exist_ok=True)

    # state_dict() returns a fresh mapping, so replacing its values does not touch `model`.
    model_state_dict = model.state_dict()
    error_dict = {}
    for key, tensor in model_state_dict.items():
        model_state_dict[key], error_dict[key] = inject_error(tensor, err_rate, err_01, err_02, err_03)

    # Load the corrupted weights into a deep copy; loading them into `model`
    # would make errors accumulate across successive calls.
    corrupted_model = copy.deepcopy(model)
    corrupted_model.load_state_dict(model_state_dict)
    torch.save(corrupted_model.state_dict(), f"{err_checkpoint_dir}/model_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.pth")

    # One CSV row per state-dict entry: its key and the list of flipped bit positions.
    with open(f"{err_checkpoint_dir}/error_log_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parameter', 'Error Indices'])
        for key, indices in error_dict.items():
            writer.writerow([key, indices])

if __name__ == '__main__':
    # One-hot switch triples: sign only (ERR_01), exponent only (ERR_02), mantissa only (ERR_03).
    flag_sets = ((True, False, False), (False, True, False), (False, False, True))
    for err_rate in ERR_Rates:
        print(f"Processing error rate: {err_rate}")
        for flags in flag_sets:
            save_error_injected_model(model, err_rate, *flags)

    print("All experiments completed.")
```
FP32使用32位来表示每个参数，其中1位用于符号位，8位用于指数位，23位用于尾数位。

1. 引入四个变量，ERR_Rate表示引入错误的概率，从[1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]取值，剩余三个变量为布尔型：ERR_01、ERR_02、ERR_03，用于控制符号位是否发生错误（bit翻转）、指数位是否发生错误（bit翻转）、尾数位是否发生错误（bit翻转）。

2. 对模型进行随机错误注入，将注入错误的模型保存在/media/tangshi/AI0011/笔记/信工所/ECC校验_正式/ERR_checkpoint对应的子文件夹下，如果子文件夹不存在则新建。

3. ERR_01、ERR_02、ERR_03分别单独设为True进行实验，也就是说，最终注入错误后你需要保存3*9=27个不同错误率的、不同错误发生位置的模型。

目前存在待优化的问题：

1. 即便多进程进行错误注入还是很慢，跑了好几天才把fp32的注入完，因此后续几种精度暂时只跑了注入一个错误的Test 代码。

以上就是对于FP32数据的错误注入代码。

你需要做的是：
使用

In [None]:
import torch
import os
import random
import numpy as np
import csv

# Which checkpoint to corrupt: the FP32 model saved after epoch index `epochs - 1`.
precision = 'fp32'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
model_path = f"{checkpoint_dir}/model_epoch_{epochs-1}.pth"

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Rebuild the ResNet-18 architecture, load the trained weights and keep them in float32.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device).float()

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

def inject_error(tensor, err_rate, err_01, err_02, err_03):
    """Flip randomly chosen bits of a float32 tensor, restricted to the enabled IEEE-754 fields.

    ``int(total_bits * err_rate)`` bit positions are drawn uniformly (with
    replacement, from NumPy's global RNG) over the whole tensor. A drawn
    position is flipped only when the field it falls in is enabled, so the
    effective flip rate is ``err_rate`` scaled by that field's share of the 32 bits.

    Args:
        tensor: Tensor to corrupt. Tensors whose dtype is not ``torch.float32``
            (for example integer counters) are returned as an unchanged copy.
        err_rate: Fraction of the tensor's bits drawn as candidate positions.
        err_01: Enable flips of the sign bit (bit 31 of each value).
        err_02: Enable flips of the exponent bits (bits 23-30).
        err_03: Enable flips of the mantissa bits (bits 0-22).

    Returns:
        ``(corrupted, error_indices)``: a new tensor on the same device as
        ``tensor`` and the list of flipped global bit positions
        (``32 * element_index + bit``) in draw order. A position that appears
        twice was flipped twice.
    """
    if tensor.dtype != torch.float32:
        # Sign/exponent/mantissa fields are only defined for float32 values.
        return tensor.clone(), []

    values = tensor.detach().cpu().contiguous().numpy()
    # Work on a private copy of the raw 32-bit words so the input is never modified.
    words = values.reshape(-1).view(np.uint32).copy()
    total_bits = words.size * 32
    num_errors = int(total_bits * err_rate)
    if num_errors == 0:
        return tensor.clone(), []

    positions = np.random.randint(0, total_bits, size=num_errors, dtype=np.int64)
    # Classify each draw by its bit index inside the 32-bit word, not inside a byte.
    bit_in_word = positions % 32
    selected = np.zeros(num_errors, dtype=bool)
    if err_01:
        selected |= bit_in_word == 31
    if err_02:
        selected |= (bit_in_word >= 23) & (bit_in_word <= 30)
    if err_03:
        selected |= bit_in_word < 23
    hits = positions[selected]

    # ufunc.at applies every XOR even when the same word is hit more than once.
    flips = np.left_shift(np.uint32(1), (hits % 32).astype(np.uint32))
    np.bitwise_xor.at(words, hits // 32, flips)

    corrupted = torch.from_numpy(words.view(np.float32).reshape(values.shape))
    return corrupted.to(tensor.device), hits.tolist()

def save_error_injected_model(model, err_rate, err_01, err_02, err_03):
    """Save an error-injected copy of ``model`` plus a CSV log of the flipped bits.

    ``model`` itself is left untouched, so every call starts from the same
    clean weights instead of stacking errors from earlier calls.

    Args:
        model: Clean model whose state dict is corrupted tensor by tensor.
        err_rate: Error rate forwarded to ``inject_error``; also names the output folder.
        err_01, err_02, err_03: Field switches forwarded to ``inject_error``;
            they are also embedded in the output file names.
    """
    import copy  # local import: only needed to isolate the corrupted weights

    err_checkpoint_dir = f"/root/autodl-tmp/ERR_checkpoint/FP32/ERR_Rate_{err_rate}"
    os.makedirs(err_checkpoint_dir, exist_ok=True)

    # state_dict() returns a fresh mapping, so replacing its values does not touch `model`.
    model_state_dict = model.state_dict()
    error_dict = {}
    for key, tensor in model_state_dict.items():
        model_state_dict[key], error_dict[key] = inject_error(tensor, err_rate, err_01, err_02, err_03)

    # Load the corrupted weights into a deep copy; loading them into `model`
    # would make errors accumulate across successive calls.
    corrupted_model = copy.deepcopy(model)
    corrupted_model.load_state_dict(model_state_dict)
    torch.save(corrupted_model.state_dict(), f"{err_checkpoint_dir}/model_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.pth")

    # One CSV row per state-dict entry: its key and the list of flipped bit positions.
    with open(f"{err_checkpoint_dir}/error_log_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parameter', 'Error Indices'])
        for key, indices in error_dict.items():
            writer.writerow([key, indices])

if __name__ == '__main__':
    # One-hot switch triples: sign only (ERR_01), exponent only (ERR_02), mantissa only (ERR_03).
    flag_sets = ((True, False, False), (False, True, False), (False, False, True))
    for err_rate in ERR_Rates:
        print(f"Processing error rate: {err_rate}")
        for flags in flag_sets:
            save_error_injected_model(model, err_rate, *flags)

    print("All experiments completed.")

### BF32

同FP32

In [None]:
import torch
import os
import random
import numpy as np
import csv

# Which checkpoint to corrupt: the 'bf32' model saved after epoch index `epochs - 1`.
precision = 'bf32'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
model_path = f"{checkpoint_dir}/model_epoch_{epochs-1}.pth"

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Rebuild the ResNet-18 architecture, load the trained weights and cast them to bfloat16.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device).bfloat16()

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

def inject_error(tensor, err_rate, err_01, err_02, err_03):
    """Flip random bits of a tensor's float32 expansion and return the result as bfloat16.

    The tensor is widened to float32, ``int(total_bits * err_rate)`` bit
    positions are drawn uniformly (with replacement, from NumPy's global RNG),
    and a drawn position is flipped only when its float32 field is enabled.
    The corrupted values are then cast back to bfloat16.

    NOTE(review): the final bfloat16 cast keeps only 16 bits per value, so
    flips in the low mantissa bits are presumably rounded away - confirm this
    is the intended "BF32" model.

    Args:
        tensor: Tensor to corrupt. Non-floating-point tensors (for example
            integer counters) are returned as an unchanged copy.
        err_rate: Fraction of the float32 bits drawn as candidate positions.
        err_01: Enable flips of the sign bit (bit 31 of each value).
        err_02: Enable flips of the exponent bits (bits 23-30).
        err_03: Enable flips of the mantissa bits (bits 0-22).

    Returns:
        ``(corrupted, error_indices)``: a bfloat16 tensor on the same device as
        ``tensor`` and the list of flipped global bit positions
        (``32 * element_index + bit``) in draw order.
    """
    if not tensor.is_floating_point():
        # Sign/exponent/mantissa fields are only defined for floating-point values.
        return tensor.clone(), []

    values = tensor.detach().float().cpu().contiguous().numpy()
    # Work on a private copy of the raw 32-bit words so the input is never modified.
    words = values.reshape(-1).view(np.uint32).copy()
    total_bits = words.size * 32
    num_errors = int(total_bits * err_rate)
    if num_errors == 0:
        return tensor.detach().clone().bfloat16(), []

    positions = np.random.randint(0, total_bits, size=num_errors, dtype=np.int64)
    # Classify each draw by its bit index inside the 32-bit word, not inside a byte.
    bit_in_word = positions % 32
    selected = np.zeros(num_errors, dtype=bool)
    if err_01:
        selected |= bit_in_word == 31
    if err_02:
        selected |= (bit_in_word >= 23) & (bit_in_word <= 30)
    if err_03:
        selected |= bit_in_word < 23
    hits = positions[selected]

    # ufunc.at applies every XOR even when the same word is hit more than once.
    flips = np.left_shift(np.uint32(1), (hits % 32).astype(np.uint32))
    np.bitwise_xor.at(words, hits // 32, flips)

    corrupted = torch.from_numpy(words.view(np.float32).reshape(values.shape))
    return corrupted.to(tensor.device).bfloat16(), hits.tolist()

def save_error_injected_model(model, err_rate, err_01, err_02, err_03):
    """Save an error-injected copy of ``model`` plus a CSV log of the flipped bits.

    ``model`` itself is left untouched, so every call starts from the same
    clean weights instead of stacking errors from earlier calls.

    Args:
        model: Clean model whose state dict is corrupted tensor by tensor.
        err_rate: Error rate forwarded to ``inject_error``; also names the output folder.
        err_01, err_02, err_03: Field switches forwarded to ``inject_error``;
            they are also embedded in the output file names.
    """
    import copy  # local import: only needed to isolate the corrupted weights

    err_checkpoint_dir = f"/root/autodl-tmp/ERR_checkpoint/BF32/ERR_Rate_{err_rate}"
    os.makedirs(err_checkpoint_dir, exist_ok=True)

    # state_dict() returns a fresh mapping, so replacing its values does not touch `model`.
    model_state_dict = model.state_dict()
    error_dict = {}
    for key, tensor in model_state_dict.items():
        model_state_dict[key], error_dict[key] = inject_error(tensor, err_rate, err_01, err_02, err_03)

    # Load the corrupted weights into a deep copy; loading them into `model`
    # would make errors accumulate across successive calls.
    corrupted_model = copy.deepcopy(model)
    corrupted_model.load_state_dict(model_state_dict)
    torch.save(corrupted_model.state_dict(), f"{err_checkpoint_dir}/model_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.pth")

    # One CSV row per state-dict entry: its key and the list of flipped bit positions.
    with open(f"{err_checkpoint_dir}/error_log_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parameter', 'Error Indices'])
        for key, indices in error_dict.items():
            writer.writerow([key, indices])

if __name__ == '__main__':
    # One-hot switch triples: sign only (ERR_01), exponent only (ERR_02), mantissa only (ERR_03).
    flag_sets = ((True, False, False), (False, True, False), (False, False, True))
    for err_rate in ERR_Rates:
        print(f"Processing error rate: {err_rate}")
        for flags in flag_sets:
            save_error_injected_model(model, err_rate, *flags)

    print("All experiments completed.")

### FP16

类似FP32

In [None]:
import torch
import os
import random
import numpy as np
import csv

# Which checkpoint to corrupt: the FP16 model saved after epoch index `epochs - 1`.
precision = 'fp16'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
model_path = f"{checkpoint_dir}/model_epoch_{epochs-1}.pth"

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Rebuild the ResNet-18 architecture, load the trained weights and cast them to half precision.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device).half()

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

def inject_error(tensor, err_rate, err_01, err_02, err_03):
    """Flip randomly chosen bits of a float16 tensor, restricted to the enabled fields.

    ``int(total_bits * err_rate)`` bit positions are drawn uniformly (with
    replacement, from NumPy's global RNG) over the whole tensor. A drawn
    position is flipped only when the field it falls in is enabled.

    Args:
        tensor: Tensor to corrupt. Tensors whose dtype is not ``torch.float16``
            (for example the integer ``num_batches_tracked`` counters) are
            returned as an unchanged copy instead of being reinterpreted as
            half-precision values.
        err_rate: Fraction of the tensor's bits drawn as candidate positions.
        err_01: Enable flips of the sign bit (bit 15 of each value).
        err_02: Enable flips of the exponent bits (bits 10-14).
        err_03: Enable flips of the mantissa bits (bits 0-9).

    Returns:
        ``(corrupted, error_indices)``: a new tensor on the same device as
        ``tensor`` and the list of flipped global bit positions
        (``16 * element_index + bit``) in draw order. A position that appears
        twice was flipped twice.
    """
    if tensor.dtype != torch.float16:
        return tensor.clone(), []

    values = tensor.detach().cpu().contiguous().numpy()
    # Work on a private copy of the raw 16-bit words so the input is never modified.
    words = values.reshape(-1).view(np.uint16).copy()
    total_bits = words.size * 16
    num_errors = int(total_bits * err_rate)
    if num_errors == 0:
        return tensor.clone(), []

    positions = np.random.randint(0, total_bits, size=num_errors, dtype=np.int64)
    bit_in_word = positions % 16
    selected = np.zeros(num_errors, dtype=bool)
    if err_01:
        selected |= bit_in_word == 15
    if err_02:
        selected |= (bit_in_word >= 10) & (bit_in_word <= 14)
    if err_03:
        selected |= bit_in_word < 10
    hits = positions[selected]

    # ufunc.at applies every XOR even when the same word is hit more than once.
    flips = np.left_shift(np.uint16(1), (hits % 16).astype(np.uint16))
    np.bitwise_xor.at(words, hits // 16, flips)

    corrupted = torch.from_numpy(words.view(np.float16).reshape(values.shape))
    return corrupted.to(tensor.device), hits.tolist()

def save_error_injected_model(model, err_rate, err_01, err_02, err_03):
    """Save an error-injected copy of ``model`` plus a CSV log of the flipped bits.

    ``model`` itself is left untouched, so every call starts from the same
    clean weights instead of stacking errors from earlier calls.

    Args:
        model: Clean model whose state dict is corrupted tensor by tensor.
        err_rate: Error rate forwarded to ``inject_error``; also names the output folder.
        err_01, err_02, err_03: Field switches forwarded to ``inject_error``;
            they are also embedded in the output file names.
    """
    import copy  # local import: only needed to isolate the corrupted weights

    err_checkpoint_dir = f"/root/autodl-tmp/ERR_checkpoint/FP16/ERR_Rate_{err_rate}"
    os.makedirs(err_checkpoint_dir, exist_ok=True)

    # state_dict() returns a fresh mapping, so replacing its values does not touch `model`.
    model_state_dict = model.state_dict()
    error_dict = {}
    for key, tensor in model_state_dict.items():
        model_state_dict[key], error_dict[key] = inject_error(tensor, err_rate, err_01, err_02, err_03)

    # Load the corrupted weights into a deep copy; loading them into `model`
    # would make errors accumulate across successive calls.
    corrupted_model = copy.deepcopy(model)
    corrupted_model.load_state_dict(model_state_dict)
    torch.save(corrupted_model.state_dict(), f"{err_checkpoint_dir}/model_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.pth")

    # One CSV row per state-dict entry: its key and the list of flipped bit positions.
    with open(f"{err_checkpoint_dir}/error_log_ERR_01_{err_01}_ERR_02_{err_02}_ERR_03_{err_03}.csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parameter', 'Error Indices'])
        for key, indices in error_dict.items():
            writer.writerow([key, indices])

if __name__ == '__main__':
    # One-hot switch triples: sign only (ERR_01), exponent only (ERR_02), mantissa only (ERR_03).
    flag_sets = ((True, False, False), (False, True, False), (False, False, True))
    for err_rate in ERR_Rates:
        print(f"Processing error rate: {err_rate}")
        for flags in flag_sets:
            save_error_injected_model(model, err_rate, *flags)

    print("All experiments completed.")

### BF16

In [None]:
import torch
import os
import random
import numpy as np
import csv

# Which checkpoint to corrupt: the BF16 model saved after epoch index `epochs - 1`.
precision = 'bf16'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
model_path = f"{checkpoint_dir}/model_epoch_{epochs-1}.pth"

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Rebuild the ResNet-18 architecture, load the trained weights and cast them to bfloat16.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device).bfloat16()

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

def inject_error(tensor, err_rate, err_01, err_02):
    """Flip randomly chosen bits of the 16-bit bfloat16 pattern of ``tensor``.

    The bfloat16 pattern of a value is the upper 16 bits of its float32
    expansion, so flips are applied there and the result is cast back to
    bfloat16. ``int(total_bits * err_rate)`` bit positions are drawn uniformly
    (with replacement, from NumPy's global RNG) over the 16 bits of every
    value. A drawn position is flipped only when its group is enabled.

    Args:
        tensor: Tensor to corrupt. Non-floating-point tensors (for example
            integer counters) are returned as an unchanged copy.
        err_rate: Fraction of the tensor's 16-bit words' bits drawn as
            candidate positions.
        err_01: Enable flips of the sign bit (bit 15 of each value).
        err_02: Enable flips of every non-sign bit (bits 0-14, i.e. exponent
            and mantissa together).

    Returns:
        ``(corrupted, error_indices)``: a bfloat16 tensor on the same device as
        ``tensor`` and the list of flipped global bit positions
        (``16 * element_index + bit``) in draw order.
    """
    if not tensor.is_floating_point():
        return tensor.clone(), []

    # bfloat16 -> float32 is exact, which lets NumPy reach the raw bits.
    as_f32 = tensor.detach().bfloat16().float().cpu().contiguous().numpy()
    words = (as_f32.reshape(-1).view(np.uint32) >> 16).astype(np.uint16)
    total_bits = words.size * 16
    num_errors = int(total_bits * err_rate)
    if num_errors == 0:
        return tensor.detach().clone().bfloat16(), []

    positions = np.random.randint(0, total_bits, size=num_errors, dtype=np.int64)
    # Classify each draw by its bit index inside the 16-bit word, not inside a byte.
    bit_in_word = positions % 16
    selected = np.zeros(num_errors, dtype=bool)
    if err_01:
        selected |= bit_in_word == 15
    if err_02:
        selected |= bit_in_word < 15
    hits = positions[selected]

    # ufunc.at applies every XOR even when the same word is hit more than once.
    flips = np.left_shift(np.uint16(1), (hits % 16).astype(np.uint16))
    np.bitwise_xor.at(words, hits // 16, flips)

    # Put the 16-bit patterns back into the upper half of float32 words.
    restored = (words.astype(np.uint32) << 16).view(np.float32).reshape(as_f32.shape)
    # NOTE(review): the cast back to bfloat16 presumably canonicalises NaN bit patterns.
    return torch.from_numpy(restored).to(tensor.device).bfloat16(), hits.tolist()

def save_error_injected_model(model, err_rate, err_01, err_02):
    """Save an error-injected copy of ``model`` plus a CSV log of the flipped bits.

    ``model`` itself is left untouched, so every call starts from the same
    clean weights instead of stacking errors from earlier calls.

    Args:
        model: Clean model whose state dict is corrupted tensor by tensor.
        err_rate: Error rate forwarded to ``inject_error``; also names the output folder.
        err_01, err_02: Switches forwarded to ``inject_error``; they are also
            embedded in the output file names.
    """
    import copy  # local import: only needed to isolate the corrupted weights

    err_checkpoint_dir = f"/root/autodl-tmp/ERR_checkpoint/BF16/ERR_Rate_{err_rate}"
    os.makedirs(err_checkpoint_dir, exist_ok=True)

    # state_dict() returns a fresh mapping, so replacing its values does not touch `model`.
    model_state_dict = model.state_dict()
    error_dict = {}
    for key, tensor in model_state_dict.items():
        model_state_dict[key], error_dict[key] = inject_error(tensor, err_rate, err_01, err_02)

    # Load the corrupted weights into a deep copy; loading them into `model`
    # would make errors accumulate across successive calls.
    corrupted_model = copy.deepcopy(model)
    corrupted_model.load_state_dict(model_state_dict)
    torch.save(corrupted_model.state_dict(), f"{err_checkpoint_dir}/model_ERR_01_{err_01}_ERR_02_{err_02}.pth")

    # One CSV row per state-dict entry: its key and the list of flipped bit positions.
    with open(f"{err_checkpoint_dir}/error_log_ERR_01_{err_01}_ERR_02_{err_02}.csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parameter', 'Error Indices'])
        for key, indices in error_dict.items():
            writer.writerow([key, indices])

if __name__ == '__main__':
    # One-hot switch pairs: ERR_01 only, then ERR_02 only.
    flag_sets = ((True, False), (False, True))
    for err_rate in ERR_Rates:
        print(f"Processing error rate: {err_rate}")
        for flags in flag_sets:
            save_error_injected_model(model, err_rate, *flags)

    print("All experiments completed.")

### Int8

第一个cell是hf的量化包，理论上这是最高效的int8量化，但是没法直接用python操作底层存储，只能当成fp32进行错误注入，和实际向int8进行错误注入存在偏差

第二个cell是模拟float32转成int8之后再进行错误注入的代码，实际底层还是float32

In [None]:
import torch
import torchvision
import numpy as np
import os

# Quantisation table: one (lower edge, upper edge, mid-point, code) tuple per
# 8-bit code; func2 maps a code in [0, 256] linearly onto [-0.6, 0.6].
func2 = lambda x: (x / 128 - 1) * 0.6
adc2 = [(func2(code), func2(code + 1), func2(code + 0.5), code) for code in range(256)]
# Widen the outermost bins so out-of-range values fall into code 0 or code 255.
adc2[0] = (-999, *adc2[0][1:])
adc2[-1] = (adc2[-1][0], 999, *adc2[-1][2:])

def encode1(origin):
    """Swap the two nibbles of an 8-bit value: the low nibble moves up, the rest moves down."""
    low_nibble_up = (origin & 0xf) << 4
    upper_bits_down = origin >> 4
    return low_nibble_up | upper_bits_down

def encode2(c):
    """Reverse the bit order of an 8-bit value by swapping neighbours, then pairs, then nibbles."""
    for low_mask, high_mask, shift in ((0x55, 0xAA, 1), (0x33, 0xCC, 2), (0x0F, 0xF0, 4)):
        c = ((c & low_mask) << shift) | ((c & high_mask) >> shift)
    return c

def encode3(c):
    """Apply encode2 to ``c`` and then encode1 to the result."""
    bit_reversed = encode2(c)
    return encode1(bit_reversed)

def encode(origin, err_mask, stuck_at_state):
    """Pick the stuck-at corruption of ``origin`` that deviates least from it.

    Four candidates are built by applying the error mask and stuck-at state
    as given, and after passing both through encode2, encode1 and encode3.
    In each candidate the masked bits of ``origin`` are cleared and replaced
    by the stuck-at bits. The candidate with the smallest summed absolute
    difference from ``origin`` is returned (the first one on ties).
    """
    candidates = []
    # Same candidate order as the labels 00, 01, 10, 11: identity, encode2, encode1, encode3.
    for permute in (None, encode2, encode1, encode3):
        if permute is None:
            mask, stuck = err_mask, stuck_at_state
        else:
            mask, stuck = permute(err_mask), permute(stuck_at_state)
        candidates.append((~mask & origin) | stuck)

    result = torch.stack(candidates)
    deviation = torch.sum(torch.abs(result - origin), dim=1)
    return result[torch.argmin(deviation)]

# 注入错误
def inject_errors(params, width, err_rate):
    """Simulate 8-bit stuck-at errors on every large parameter, in place.

    Each parameter with at least 1000 elements is quantised to the 8-bit codes
    of the module-level ``adc2`` table, corrupted chunk by chunk through
    ``encode``, mapped back to the mid-point of each code's bin and written to
    ``param.data`` as float32. Smaller parameters are left untouched.

    Args:
        params: Iterable of parameters to corrupt.
        width: Number of consecutive values passed to ``encode`` at a time.
        err_rate: Probability that any single bit position is marked faulty.
    """
    for param in params:
        if param.numel() < 1000:
            continue

        device = param.device  # device the current parameter lives on

        # Quantise: add up the code of whichever bin [s, e) contains each value.
        t = (0 * param.data).int().to(device)
        for s, e, _, dig in adc2:
            t += ((param.data >= s) * (param.data < e)).int() * dig

        # Build the per-value error mask, one independent draw per bit position.
        # NOTE(review): only bits 0-6 can be marked faulty; bit 7 never is -
        # confirm this is intentional.
        mask = torch.zeros(t.shape, device=device, dtype=torch.int32)
        for jj in range(7):
            bitmask = (torch.rand(mask.shape, device=device) < err_rate).int()
            mask += bitmask * (1 << jj)

        # Random stuck-at values, restricted to the faulty bit positions.
        stuckat = ((torch.rand(t.shape, device=device) * 256).int()) & mask

        # Flatten and corrupt the codes `width` values at a time.
        t = t.view(-1)
        mask = mask.view(-1)
        stuckat = stuckat.view(-1)
        for i0 in range(0, t.numel(), width):
            t[i0:i0+width] = encode(t[i0:i0+width], mask[i0:i0+width], stuckat[i0:i0+width])

        # De-quantise: replace every code by the mid-point value of its bin.
        t = t.reshape(param.shape)
        t2 = torch.zeros_like(param.data)
        for _, _, ana, dig in adc2:
            t2 += (t == dig).float() * ana

        param.data = t2.float()

# Main program: corrupt the FP32 checkpoints with simulated int8 stuck-at errors.
precision = 'fp32'
epochs = 1
checkpoint_dir = f"/root/autodl-tmp/checkpoint/{precision}"
output_dir = "/root/autodl-tmp/ERR_checkpoint/int8"

# Error rates swept by the experiment.
ERR_Rates = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]

# Make sure the output directory exists.
os.makedirs(output_dir, exist_ok=True)

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

for epoch in range(1, epochs + 1):
    # Checkpoints are numbered from 0, hence `epoch - 1`.
    model_path = f"{checkpoint_dir}/model_epoch_{epoch-1}.pth"

    # Load the clean model.
    model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.float()

    # Process every error rate.
    for err_rate in ERR_Rates:
        # Build a fresh model and copy the clean weights into it, so every
        # error rate starts from the original model.
        model_copy = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False, num_classes=10)
        model_copy.load_state_dict(model.state_dict())
        model_copy.float().to(device)

        # Collect the parameters and inject errors into them in place.
        params = [param for param in model_copy.parameters()]
        inject_errors(params, width=1024, err_rate=err_rate)

        # Save the corrupted model.
        output_path = f"{output_dir}/model_epoch_{epoch-1}_err_rate_{err_rate:.1e}_int8.pth"
        torch.save(model_copy.state_dict(), output_path)

        print(f"Processed and saved model for epoch {epoch}, error rate {err_rate:.1e}")

print("All models have been processed and saved.")

## 传统汉明码方法ECC检错修正

使用了Hamming(72,64)码校验。

1. 编码 (hamming_encode 函数):
   - 输入64位数据，输出72位编码。
   - 在72位编码中，从1开始编号的位置 1, 2, 4, 8, 16, 32, 64（即 2^n，对应从0开始的下标 2^n - 1）被保留为校验位。
   - 其他位置填充原始数据。
   - 每个校验位通过异或运算计算，检查特定的数据位。
   - 校验位的计算利用了位操作，每个校验位负责检查二进制表示中对应位为1的所有位置。

2. 解码和纠错 (hamming_decode 函数):
   - 输入72位编码数据。
   - 计算syndrome（错误指示器）：
     - 对每个校验位，重新计算其值并与接收到的值比较。
     - 如果不匹配，将对应的位在syndrome中置1。
   - 如果syndrome不为0：
     - 如果syndrome值小于等于72，表示单比特错误，直接纠正对应位置。
     - 如果syndrome值大于72，表示检测到双比特错误，无法纠正。
   - 最后，提取原始64位数据。

3. 错误注入和纠正过程 (在radom_err_resnet 函数中):
   - 将模型参数转换为位字符串。
   - 对每64位进行汉明编码。
   - 随机注入错误，翻转一些比特。
   - 使用汉明解码尝试恢复原始数据。
   - 将恢复的数据重新加载到模型中。


In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import os
import sys
import random
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import os
import sys
import multiprocessing as mp
import torchvision.models as models

class ResNet18(nn.Module):
    """ResNet-18 wrapper whose final fully connected layer emits ``num_classes`` outputs."""

    def __init__(self, num_classes=10):
        super().__init__()
        # Build torchvision's ResNet-18, requesting its pretrained weights
        backbone = models.resnet18(pretrained=True)

        # Replace the classification head so its output size matches num_classes
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        return self.model(x)

# Pick the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"使用设备: {device}")

# Hamming(72,64) SECDED codec.
#
# Bit positions are 1-based, so position p is stored at string index p - 1.
# Positions 1, 2, 4, ..., 64 hold the seven Hamming check bits, position 72
# holds an overall parity bit, and the remaining 64 positions carry the data.
_HAMMING_CHECK_INDICES = (0, 1, 3, 7, 15, 31, 63)
_HAMMING_OVERALL_INDEX = 71
_HAMMING_DATA_INDICES = tuple(
    i for i in range(_HAMMING_OVERALL_INDEX) if i not in _HAMMING_CHECK_INDICES
)


def _hamming_syndrome(bits):
    """Return the XOR of the 1-based positions (1..71) whose bit is '1'."""
    syndrome = 0
    for index in range(_HAMMING_OVERALL_INDEX):
        if bits[index] == '1':
            syndrome ^= index + 1
    return syndrome


def hamming_encode(data):
    """Encode up to 64 data bits into a 72-bit SECDED Hamming codeword.

    Args:
        data: String of '0'/'1' characters, at most 64 long. Shorter input is
            left-padded with '0' to 64 bits.

    Returns:
        A 72-character bit string. A valid codeword has a zero syndrome and an
        even number of '1' bits.

    Raises:
        ValueError: If ``data`` is longer than 64 bits or contains characters
            other than '0' and '1'.
    """
    data = data.zfill(64)  # left-pad short input with zeros
    if len(data) > 64:
        raise ValueError("hamming_encode expects at most 64 data bits")
    if set(data) - {'0', '1'}:
        raise ValueError("hamming_encode expects a string of '0'/'1' characters")

    encoded = ['0'] * 72
    for index, bit in zip(_HAMMING_DATA_INDICES, data):
        encoded[index] = bit

    # With the check bits still zero, the syndrome is exactly the value the
    # check bits must take: check bit i sits at position 2**i, so setting it
    # cancels bit i of the syndrome.
    syndrome = _hamming_syndrome(encoded)
    for i, index in enumerate(_HAMMING_CHECK_INDICES):
        encoded[index] = str((syndrome >> i) & 1)

    # Overall parity makes the total number of '1' bits even, which is what
    # lets the decoder tell single errors from double errors.
    encoded[_HAMMING_OVERALL_INDEX] = str(encoded.count('1') & 1)

    return ''.join(encoded)


def hamming_decode(encoded_data):
    """Decode a 72-bit codeword, correcting one flipped bit if present.

    A single-bit error (in data, check or overall-parity bits) is corrected.
    An even number of errors with a non-zero syndrome is reported as
    uncorrectable and the data bits are returned as received.

    Args:
        encoded_data: 72-character string of '0'/'1' produced by
            ``hamming_encode``, possibly with flipped bits.

    Returns:
        The 64 data bits as a string.
    """
    assert len(encoded_data) == 72
    bits = list(encoded_data)
    syndrome = _hamming_syndrome(bits)
    odd_parity = bits.count('1') & 1

    if odd_parity:
        # An odd number of flips: treat it as a single-bit error.
        if syndrome == 0:
            error_index = _HAMMING_OVERALL_INDEX  # the overall parity bit itself
        elif syndrome <= _HAMMING_OVERALL_INDEX:
            error_index = syndrome - 1
        else:
            error_index = None  # syndrome points outside the codeword

        if error_index is None:
            print("检测到双比特错误，无法纠正。")
        else:
            bits[error_index] = '0' if bits[error_index] == '1' else '1'
            print(f"检测到单比特错误，已纠正。错误位置: {error_index}")
    elif syndrome:
        # Even parity with a non-zero syndrome: two bits flipped.
        print("检测到双比特错误，无法纠正。")

    return ''.join(bits[i] for i in _HAMMING_DATA_INDICES)

def radom_err_resnet(model, dataset_name, r=1e-4):
    """Corrupt the model's Hamming-protected weights at bit error rate ``r`` and evaluate.

    All parameters are flattened into one bit string, split into 64-bit
    blocks, encoded with ``hamming_encode``, hit with independent random bit
    flips, decoded with ``hamming_decode`` and written back into ``model`` in
    place. The model is then evaluated on the test split and the loss and
    accuracy are printed.

    Args:
        model: Network whose parameters are overwritten in place. The decoded
            bytes are reinterpreted as float32, so float32 parameters are
            assumed.
        dataset_name: Dataset to evaluate on; only "mnist" can be loaded.
        r: Probability that any single encoded bit is flipped.

    Raises:
        ValueError: If ``dataset_name`` is not a supported dataset.
    """
    print(f"\n开始处理数据集: {dataset_name}, 错误率: {r:.1e}")

    if dataset_name in ["mnist"]:
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
            # Repeat the single channel three times for the 3-channel network input
            transforms.Lambda(lambda x: x.repeat(3, 1, 1))
        ])
    else:
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

    data_path = "./data"
    if dataset_name == "mnist":
        test_dataset = MNIST(root=data_path, train=False, download=True, transform=transform)
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"Unsupported dataset: {dataset_name!r}")
    test_loader = DataLoader(test_dataset, batch_size=128)
    print(f"数据加载完成，Test 集大小: {len(test_dataset)}")

    print("开始处理模型参数...")
    with torch.no_grad():
        all_params = torch.cat([param.view(-1) for param in model.parameters()])
        param_bits = ''.join(format(b, '08b') for b in all_params.cpu().numpy().tobytes())
        total_bits = len(param_bits)

        # Pad on the right to a whole number of 64-bit blocks. Left-padding a
        # short final block would shift its payload and misalign the floats.
        padded_bits = param_bits + '0' * (-total_bits % 64)
        encoded_params = [hamming_encode(padded_bits[i:i+64]) for i in range(0, len(padded_bits), 64)]
        print(f"ECC编码完成，编码后参数数量: {len(encoded_params)}")

        error_count = 0
        for i, codeword in enumerate(encoded_params):
            # One random draw per encoded bit, in bit order.
            flipped = [j for j in range(len(codeword)) if random.random() < r]
            if not flipped:
                continue
            bits = list(codeword)
            for j in flipped:
                bits[j] = '1' if bits[j] == '0' else '0'
            encoded_params[i] = ''.join(bits)
            error_count += len(flipped)
        print(f"模拟错误完成，总计引入错误: {error_count}")

        # Keep exactly 64 payload bits per block so blocks re-align with
        # param_bits, then drop the right padding added above.
        decoded_params = ''.join(hamming_decode(p)[:64] for p in encoded_params)[:total_bits]
        print("ECC解码和纠错完成")

        # bytearray gives NumPy a writable buffer, which torch.from_numpy expects.
        raw = bytearray(int(decoded_params[i:i+8], 2) for i in range(0, len(decoded_params), 8))
        param_np = np.frombuffer(raw, dtype=np.float32)
        param_tensor = torch.from_numpy(param_np).to(device)

        # Write the decoded values back into the model, parameter by parameter
        start = 0
        for param in model.parameters():
            num_param = param.numel()
            param.data = param_tensor[start:start+num_param].view(param.size())
            start += num_param
        print(f"参数重新加载到模型完成")

    print("开始模型评估...")
    model.eval()
    criterion = nn.CrossEntropyLoss()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            if batch_idx % 10 == 0:
                print(f"已评估 {batch_idx * len(data)} / {len(test_loader.dataset)} 个样本")

    # Loss is averaged over batches, accuracy over samples
    test_loss /= len(test_loader)
    accuracy = correct / len(test_loader.dataset)

    print(f"评估完成。数据集: {dataset_name}, 错误率: {r:.1e}")
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.4f}")
    print("---")

def process_error_rate(model, dataset_name, err_r):
    """Run the Hamming error-injection experiment for one error rate.

    Thin adapter with a purely positional signature so it can be used as a
    ``starmap`` target; it forwards ``err_r`` as the bit error rate.
    """
    radom_err_resnet(model, dataset_name, r=err_r)

def process_dataset(dataset_name):
    """Load the logged checkpoint for ``dataset_name`` and sweep the error rates.

    Reads ``test_log.txt`` for the dataset, which is expected to contain two
    comma-separated ``label: value`` fields (loss, then accuracy). It then
    loads the epoch-10 checkpoint and runs ``process_error_rate`` once per
    error rate.

    Args:
        dataset_name: Sub-directory name under the checkpoint and log roots.
    """
    import copy  # local import: only needed for the sequential fallback below

    checkpoint_dir = "/media/tangshi/AI0011/笔记/信工所/ECC校验/checkpoint/resnet18"
    log_dir = "/media/tangshi/AI0011/笔记/信工所/ECC校验/log/resnet18"

    best_epoch = None
    best_loss = float('inf')
    best_accuracy = 0.0

    log_file = os.path.join(log_dir, dataset_name, "test_log.txt")
    with open(log_file, "r") as f:
        content = f.read().strip()
        if content:
            loss, accuracy = content.split(',')
            loss = float(loss.split(':')[1].strip())
            accuracy = float(accuracy.split(':')[1].strip())

            print(f"Dataset: {dataset_name}")
            print(f"Test Loss: {loss:.4f}, Test Accuracy: {accuracy:.4f}")

            best_loss = loss
            best_accuracy = accuracy
            best_epoch = 10  # the best epoch is assumed to be 10; adjust as needed
        else:
            print(f"Empty log file for {dataset_name} dataset.")

        print("---")

    if best_epoch is not None:
        checkpoint_file = os.path.join(checkpoint_dir, dataset_name, f"checkpoint_epoch_{best_epoch}.pth")
        if os.path.exists(checkpoint_file):
            model = ResNet18().to(device)
            # map_location lets a checkpoint saved on GPU load on a CPU-only host
            model.load_state_dict(torch.load(checkpoint_file, map_location=device))
            model.eval()

            error_rates = [1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]
            jobs = [(model, dataset_name, err_r) for err_r in error_rates]

            if mp.current_process().daemon:
                # Pool workers are daemonic and may not start child processes,
                # so when this function itself runs inside a pool the sweep is
                # done in-process. Each run gets its own copy because the
                # experiment overwrites the model's parameters in place.
                for job_model, job_dataset, job_rate in jobs:
                    process_error_rate(copy.deepcopy(job_model), job_dataset, job_rate)
            else:
                # Each worker receives its own pickled copy of the model.
                # NOTE(review): sending a CUDA-resident model to forked workers
                # may fail to re-initialise CUDA — confirm on the target host.
                with mp.Pool(processes=min(len(jobs), mp.cpu_count())) as pool:
                    pool.starmap(process_error_rate, jobs)

            print(f"Best Model for {dataset_name} Dataset (Epoch {best_epoch}):")
            print(f"Test Loss: {best_loss:.4f}, Test Accuracy: {best_accuracy:.4f}")
        else:
            print(f"Checkpoint file not found for {dataset_name} dataset.")
    else:
        print(f"No valid model found for {dataset_name} dataset.")

    print("===")

if __name__ == '__main__':
    datasets = ["mnist"]

    # Process the datasets one after another in the main process.
    # process_dataset starts its own worker pool, and a pool cannot be created
    # from inside another pool's (daemonic) worker, so no outer pool is used.
    for dataset_name in datasets:
        process_dataset(dataset_name)

## 新ECC验证机制


1. 奇偶校验位的计算：
   代码中的 `ecc_encode` 函数使用了8个校验位。每个校验位是通过对数据位的特定子集进行异或（XOR）运算得到的。这实际上是一种并行的奇偶校验。

2. 校验位的使用：
   每个校验位负责检查数据位中的特定位置。这种方法允许不仅检测错误，还能定位错误。

3. 错误检测：
   在 `ecc_decode` 函数中，通过比较存储的校验位和重新计算的校验位来生成syndrome。如果syndrome不为零，就表示检测到了错误。

4. 错误纠正：
   如果检测到错误，代码会尝试通过syndrome的值来定位错误位置。这允许纠正单比特错误。

5. 多位错误处理：
   虽然这个方法主要用于纠正单比特错误，但它也能检测（但不能纠正）双比特错误。

In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import os
import sys
import multiprocessing as mp
import torchvision.models as models

class ResNet18(nn.Module):
    """ResNet-18 wrapper whose final fully connected layer emits ``num_classes`` outputs."""

    def __init__(self, num_classes=10):
        super().__init__()
        # Build torchvision's ResNet-18, requesting its pretrained weights
        backbone = models.resnet18(pretrained=True)

        # Replace the classification head so its output size matches num_classes
        head_inputs = backbone.fc.in_features
        backbone.fc = nn.Linear(head_inputs, num_classes)
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        return self.model(x)


# Pick the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"使用设备: {device}")
# ECC codec: 64 data bits followed by 8 check bits (SECDED).
#
# Data bit i is tagged with the i-th integer in 1..71 that is not a power of
# two (3, 5, 6, 7, 9, ...). Check bit j (j = 0..6) is the XOR of the data bits
# whose tag has bit j set; check bit 7 is an overall parity bit. Because no tag
# is zero or a power of two, a data-bit error, a check-bit error and an
# overall-parity error each produce a distinguishable syndrome.
_ECC_DATA_TAGS = tuple(p for p in range(1, 72) if p & (p - 1))
_ECC_TAG_TO_INDEX = {tag: index for index, tag in enumerate(_ECC_DATA_TAGS)}


def _ecc_tag_xor(data):
    """Return the XOR of the tags of all data bits that are '1'."""
    acc = 0
    for tag, bit in zip(_ECC_DATA_TAGS, data):
        if bit == '1':
            acc ^= tag
    return acc


# ECC encoding function
def ecc_encode(data):
    """Append 8 check bits to a 64-bit data string.

    Args:
        data: String of exactly 64 '0'/'1' characters.

    Returns:
        A 72-character string: the unchanged data followed by 7 Hamming check
        bits and 1 overall parity bit. The result always contains an even
        number of '1' bits.
    """
    assert len(data) == 64  # the code is defined for 64-bit blocks only
    tag_xor = _ecc_tag_xor(data)
    parity = [(tag_xor >> j) & 1 for j in range(7)]
    # Overall parity covers the data and the seven check bits
    parity.append((data.count('1') + sum(parity)) & 1)
    return data + ''.join(map(str, parity))


# ECC decoding and correction function
def ecc_decode(encoded_data):
    """Recover the 64 data bits, correcting a single flipped bit if present.

    Args:
        encoded_data: 72-character string produced by ``ecc_encode``, possibly
            with flipped bits.

    Returns:
        The 64 data bits. A single error anywhere in the codeword is
        corrected. If a double error is detected, the data is returned as
        received.
    """
    data = encoded_data[:64]
    stored_parity = encoded_data[64:]

    recalculated_parity = ecc_encode(data)[64:]

    # Syndrome: bit j is set when stored and recomputed check bit j disagree
    syndrome = 0
    for j, (a, b) in enumerate(zip(stored_parity[:7], recalculated_parity[:7])):
        syndrome |= (int(a) ^ int(b)) << j
    odd_parity = encoded_data.count('1') & 1

    if syndrome == 0 and not odd_parity:
        return data  # no error

    if odd_parity:
        # An odd number of flips: treat it as a single-bit error.
        if syndrome == 0:
            # Only the overall parity bit (codeword index 71) is wrong
            print(f"检测到单比特错误，已纠正。错误位置: {71}")
            return data
        if syndrome & (syndrome - 1) == 0:
            # A power-of-two syndrome points at check bit j, not at the data
            error_position = 64 + syndrome.bit_length() - 1
            print(f"检测到单比特错误，已纠正。错误位置: {error_position}")
            return data
        if syndrome in _ECC_TAG_TO_INDEX:
            error_position = _ECC_TAG_TO_INDEX[syndrome]
            corrected_data = list(data)
            corrected_data[error_position] = str(1 - int(data[error_position]))
            print(f"检测到单比特错误，已纠正。错误位置: {error_position}")
            return ''.join(corrected_data)

    print("检测到双比特错误，无法纠正。")
    return data  # uncorrectable: return the data as received

def random_err_resnet_parallel(model, dataset_name, r=1e-4):
    """Corrupt each ECC-protected parameter at bit error rate ``r`` and evaluate.

    Every parameter tensor is turned into a bit string, split into 64-bit
    blocks, encoded with ``ecc_encode``, hit with independent random bit
    flips, decoded with ``ecc_decode`` and written back into ``model`` in
    place. The model is then evaluated on the test split and the loss and
    accuracy are printed.

    Parameters are processed one after another in the calling process. A
    worker pool cannot be used here because the per-parameter helper is a
    local closure, which cannot be pickled, and because parameter updates made
    in a worker process would not reach this process's model.

    Args:
        model: Network whose parameters are overwritten in place. The decoded
            bytes are reinterpreted as float32, so float32 parameters are
            assumed.
        dataset_name: Dataset to evaluate on; only "mnist" can be loaded.
        r: Probability that any single encoded bit is flipped.

    Raises:
        ValueError: If ``dataset_name`` is not a supported dataset.
    """
    print(f"\n开始处理数据集: {dataset_name}, 错误率: {r:.1e}")

    # Choose the input transform for the dataset
    if dataset_name in ["mnist"]:
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
            transforms.Lambda(lambda x: x.repeat(3, 1, 1))  # single channel -> 3 channels
        ])
    else:
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

    # Load the dataset
    data_path = "./data"
    if dataset_name == "mnist":
        test_dataset = MNIST(root=data_path, train=False, download=True, transform=transform)
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"Unsupported dataset: {dataset_name!r}")
    test_loader = DataLoader(test_dataset, batch_size=128)
    print(f"数据加载完成，Test 集大小: {len(test_dataset)}")

    def process_param(name, param):
        """Encode, corrupt, decode and reload a single parameter tensor."""
        print(f"处理参数: {name}, 形状: {param.shape}")
        # detach() first: numpy() refuses tensors that require grad
        param_bits = ''.join(format(b, '08b') for b in param.detach().cpu().numpy().tobytes())
        total_bits = len(param_bits)

        # Pad on the right to whole 64-bit blocks; ecc_encode accepts nothing else
        padded_bits = param_bits + '0' * (-total_bits % 64)
        encoded_params = [ecc_encode(padded_bits[i:i+64]) for i in range(0, len(padded_bits), 64)]
        print(f"ECC编码完成，编码后参数数量: {len(encoded_params)}")

        # Simulate random errors: one random draw per encoded bit, in bit order
        error_count = 0
        for i, codeword in enumerate(encoded_params):
            flipped = [j for j in range(len(codeword)) if random.random() < r]
            if not flipped:
                continue
            bits = list(codeword)
            for j in flipped:
                bits[j] = '1' if bits[j] == '0' else '0'
            encoded_params[i] = ''.join(bits)
            error_count += len(flipped)
        print(f"模拟错误完成，总计引入错误: {error_count}")

        # Decode and correct, then drop the right padding added above
        decoded_params = ''.join(ecc_decode(p) for p in encoded_params)[:total_bits]
        print("ECC解码和纠错完成")

        # bytearray gives NumPy a writable buffer, which torch.from_numpy expects
        raw = bytearray(int(decoded_params[i:i+8], 2) for i in range(0, len(decoded_params), 8))
        param_np = np.frombuffer(raw, dtype=np.float32)
        param.data = torch.from_numpy(param_np).view(param.size()).to(device)
        print(f"参数重新加载到模型完成")

    print("开始处理模型参数...")
    with torch.no_grad():
        for name, param in model.named_parameters():
            process_param(name, param)

    print("开始模型评估...")
    model.eval()
    criterion = nn.CrossEntropyLoss()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            if batch_idx % 10 == 0:
                print(f"已评估 {batch_idx * len(data)} / {len(test_loader.dataset)} 个样本")

    # Loss is averaged over batches, accuracy over samples
    test_loss /= len(test_loader)
    accuracy = correct / len(test_loader.dataset)

    print(f"评估完成。数据集: {dataset_name}, 错误率: {r:.1e}")
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.4f}")
    print("---")

def process_error_rate(model, dataset_name, err_r):
    """Run the ECC error-injection experiment for one error rate.

    Thin adapter with a purely positional signature so it can be used as a
    ``starmap`` target; it forwards ``err_r`` as the bit error rate.
    """
    random_err_resnet_parallel(model, dataset_name, r=err_r)

def process_dataset(dataset_name):
    """Load the logged checkpoint for ``dataset_name`` and sweep the error rates.

    Reads ``test_log.txt`` for the dataset, which is expected to contain two
    comma-separated ``label: value`` fields (loss, then accuracy). It then
    loads the epoch-10 checkpoint and runs ``process_error_rate`` once per
    error rate.

    Args:
        dataset_name: Sub-directory name under the checkpoint and log roots.
    """
    import copy  # local import: only needed for the sequential fallback below

    checkpoint_dir = "/media/tangshi/AI0011/笔记/信工所/ECC校验/checkpoint/resnet18"
    log_dir = "/media/tangshi/AI0011/笔记/信工所/ECC校验/log/resnet18"

    best_epoch = None
    best_loss = float('inf')
    best_accuracy = 0.0

    log_file = os.path.join(log_dir, dataset_name, "test_log.txt")
    with open(log_file, "r") as f:
        content = f.read().strip()
        if content:
            loss, accuracy = content.split(',')
            loss = float(loss.split(':')[1].strip())
            accuracy = float(accuracy.split(':')[1].strip())

            print(f"Dataset: {dataset_name}")
            print(f"Test Loss: {loss:.4f}, Test Accuracy: {accuracy:.4f}")

            best_loss = loss
            best_accuracy = accuracy
            best_epoch = 10  # the best epoch is assumed to be 10; adjust as needed
        else:
            print(f"Empty log file for {dataset_name} dataset.")

        print("---")

    if best_epoch is not None:
        checkpoint_file = os.path.join(checkpoint_dir, dataset_name, f"checkpoint_epoch_{best_epoch}.pth")
        if os.path.exists(checkpoint_file):
            model = ResNet18().to(device)
            # map_location lets a checkpoint saved on GPU load on a CPU-only host
            model.load_state_dict(torch.load(checkpoint_file, map_location=device))
            model.eval()

            error_rates = [1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]
            jobs = [(model, dataset_name, err_r) for err_r in error_rates]

            if mp.current_process().daemon:
                # Pool workers are daemonic and may not start child processes,
                # so when this function itself runs inside a pool the sweep is
                # done in-process. Each run gets its own copy because the
                # experiment overwrites the model's parameters in place.
                for job_model, job_dataset, job_rate in jobs:
                    process_error_rate(copy.deepcopy(job_model), job_dataset, job_rate)
            else:
                # Each worker receives its own pickled copy of the model.
                # NOTE(review): sending a CUDA-resident model to forked workers
                # may fail to re-initialise CUDA — confirm on the target host.
                with mp.Pool(processes=min(len(jobs), mp.cpu_count())) as pool:
                    pool.starmap(process_error_rate, jobs)

            print(f"Best Model for {dataset_name} Dataset (Epoch {best_epoch}):")
            print(f"Test Loss: {best_loss:.4f}, Test Accuracy: {best_accuracy:.4f}")
        else:
            print(f"Checkpoint file not found for {dataset_name} dataset.")
    else:
        print(f"No valid model found for {dataset_name} dataset.")

    print("===")

if __name__ == '__main__':
    datasets = ["mnist"]

    # Process the datasets one after another in the main process.
    # process_dataset starts its own worker pool, and a pool cannot be created
    # from inside another pool's (daemonic) worker, so no outer pool is used.
    for dataset_name in datasets:
        process_dataset(dataset_name)

# ECCECCECCEECECCCEEEC

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader

# Select the compute device: CUDA when available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize to 224x224, convert to a tensor, then normalize the
# single channel with mean 0.1307 and std 0.3081
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST train and test splits (downloaded into ./data when missing)
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Batches of 64; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Define the model: an untrained ResNet-18 whose first convolution takes
# 1 input channel and whose final layer produces 10 outputs
model = resnet18(pretrained=False)
model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
model.fc = nn.Linear(512, 10)
model = model.to(device)

# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def train(model, train_loader, criterion, optimizer, device):
    """Run one pass over ``train_loader``, updating ``model`` after every batch.

    Each batch is moved to ``device``; the loss from ``criterion`` is
    back-propagated and ``optimizer`` takes one step. The current loss is
    printed every 100 batches, starting with the first.
    """
    model.train()
    for batch_idx, batch in enumerate(train_loader):
        inputs, labels = (t.to(device) for t in batch)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()

        if batch_idx % 100:
            continue
        print(f'Train Batch: {batch_idx}/{len(train_loader)} Loss: {loss.item():.6f}')

# Evaluation function
def test(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and return the accuracy in percent.

    The model is switched to eval mode and run without gradient tracking. The
    predicted class of each sample is the index of its largest output. The
    accuracy is printed with two decimals and returned as a float.
    """
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch in test_loader:
            data, target = (t.to(device) for t in batch)
            predicted = torch.max(model(data).data, 1).indices
            seen += target.size(0)
            hits += (predicted == target).sum().item()
    accuracy = 100 * hits / seen
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy

# Train the model: each epoch runs one training pass followed by an
# evaluation on the test set
epochs = 1
for epoch in range(epochs):
    print(f'Epoch {epoch+1}/{epochs}')
    train(model, train_loader, criterion, optimizer, device)
    test(model, test_loader, device)

# Save the trained (original, error-free) model weights
torch.save(model.state_dict(), 'original_model.pth')

Epoch 1/1
Train Batch: 0/938 Loss: 2.340935
Train Batch: 100/938 Loss: 0.034722
Train Batch: 200/938 Loss: 0.249568
Train Batch: 300/938 Loss: 0.154008
Train Batch: 400/938 Loss: 0.037314
Train Batch: 500/938 Loss: 0.008300
Train Batch: 600/938 Loss: 0.020768
Train Batch: 700/938 Loss: 0.038694
Train Batch: 800/938 Loss: 0.110554
Train Batch: 900/938 Loss: 0.123319
Test Accuracy: 98.78%


In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader
import numpy as np
# Select the compute device: CUDA when available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize to 224x224, convert to a tensor, then normalize the
# single channel with mean 0.1307 and std 0.3081
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST test split (downloaded into ./data when missing)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

def ERR(q_tensor, ERR_TYPE, ERR_RATE):
    """Inject bit errors into the raw bytes of ``q_tensor``.

    Each byte is selected independently with probability ``ERR_RATE``. In a
    selected byte, every targeted bit that is currently 1 is cleared; bits
    that are 0 are left untouched, so errors are 1 -> 0 only. Bit 7 is the most
    significant bit of a byte.

    Args:
        q_tensor: Tensor to corrupt; it is not modified. Its memory is treated
            as a flat sequence of bytes.
        ERR_TYPE: Which bits are targeted:
            "0" - bit 7 (the 1st bit, counting from the most significant);
            "1" - bits 6, 5 and 4 (the 2nd to 4th bits);
            "2" - bits 3 to 0 (the 5th to 8th bits).
        ERR_RATE: Per-byte selection probability.

    Returns:
        A new tensor with the same shape and dtype as ``q_tensor``, on the same
        device, holding the corrupted values.

    Raises:
        ValueError: If ``ERR_TYPE`` is not "0", "1" or "2".
    """
    if ERR_TYPE == "0":
        bit_indices = (7,)
    elif ERR_TYPE == "1":
        # Bits 6, 5, 4: the three bits directly below the most significant one
        bit_indices = (4, 5, 6)
    elif ERR_TYPE == "2":
        bit_indices = (0, 1, 2, 3)
    else:
        raise ValueError("Invalid ERR_TYPE. Must be '0', '1', or '2'.")

    # Work on whatever device the input lives on instead of requiring CUDA
    target_device = q_tensor.device

    # Reinterpret the tensor's memory as a flat byte array
    np_arr = q_tensor.detach().cpu().contiguous().numpy()
    byte_arr = np.frombuffer(np_arr.data, dtype=np.uint8)
    byte_tensor = torch.from_numpy(byte_arr).to(target_device)

    # One selection decision per byte, shared by all targeted bits of that byte
    error_mask = torch.rand(byte_tensor.shape, device=byte_tensor.device) < ERR_RATE

    for i in bit_indices:
        # Only bits that are set take part, so the XOR below clears them.
        # The XOR is out-of-place, which keeps the caller's tensor untouched.
        flip_mask = error_mask & ((byte_tensor & (1 << i)) != 0)
        byte_tensor = byte_tensor ^ (flip_mask.to(torch.uint8) << i)

    # Reinterpret the bytes as the original dtype and restore the shape
    byte_arr_with_error = byte_tensor.cpu().numpy()
    np_arr_with_error = np.frombuffer(byte_arr_with_error.data, dtype=np_arr.dtype)
    q_tensor_with_error = torch.from_numpy(np_arr_with_error).to(target_device).view(q_tensor.shape)

    return q_tensor_with_error

# quantize, adapted to inject errors through the ERR function
def quantize(tensor, ERR_TYPE, ERR_RATE, num_bits=8):
    """Affine-quantize ``tensor`` to unsigned integers and inject bit errors.

    Values are mapped linearly from [tensor.min(), tensor.max()] onto
    [0, 2**num_bits - 1], rounded, stored as uint8 and passed through ``ERR``.

    Args:
        tensor: Floating-point tensor to quantize.
        ERR_TYPE: Error type forwarded to ``ERR``.
        ERR_RATE: Error rate forwarded to ``ERR``.
        num_bits: Number of quantization levels is 2**num_bits. The quantized
            values are stored as uint8, so values above 8 do not fit.

    Returns:
        A tuple ``(q_tensor, scale, zero_point)``: the corrupted quantized
        values cast back to ``tensor``'s dtype, and the two parameters needed
        by ``dequantize``.
    """
    qmin = 0.
    qmax = 2.**num_bits - 1.
    scale = (tensor.max() - tensor.min()) / (qmax - qmin)
    # A constant tensor has zero range; dividing by a zero scale would give
    # NaN/inf. With scale 1 every value quantizes to 0 and dequantizes back to
    # the original constant.
    scale = torch.where(scale == 0, torch.ones_like(scale), scale)
    zero_point = qmin - tensor.min() / scale
    q_tensor = torch.round(tensor / scale + zero_point)
    q_tensor.clamp_(qmin, qmax)
    q_tensor = ERR(q_tensor.to(torch.uint8), ERR_TYPE, ERR_RATE)  # ERR works on the uint8 bytes
    return q_tensor.to(tensor.dtype), scale, zero_point  # back to the original dtype

# Dequantization function
def dequantize(q_tensor, scale, zero_point):
    """Map quantized values back to real values: ``scale * (q - zero_point)``."""
    shifted = q_tensor - zero_point
    return scale * shifted

# Evaluation function
def test(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and return the accuracy in percent.

    The model is switched to eval mode and run without gradient tracking. The
    predicted class of each sample is the index of its largest output. The
    accuracy is printed with two decimals and returned as a float.
    """
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch in test_loader:
            data, target = (t.to(device) for t in batch)
            predicted = torch.max(model(data).data, 1).indices
            seen += target.size(0)
            hits += (predicted == target).sum().item()
    accuracy = 100 * hits / seen
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy

# Load the original (error-free) model. map_location lets weights that were
# saved from a GPU model load on a CPU-only host as well.
original_model = resnet18(pretrained=False)
original_model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
original_model.fc = nn.Linear(512, 10)
original_model.load_state_dict(torch.load('original_model.pth', map_location=device))
original_model = original_model.to(device)

print("Original Model Performance:")
original_accuracy = test(original_model, test_loader, device)

# Quantize the model with injected errors and evaluate it, for every
# combination of error type and error rate
for ERR_TYPE in ["0", "1", "2"]:
    # for ERR_RATE in [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9]:
    for ERR_RATE in [1e-1, 1e-2, 1e-3, 1e-4]:
        print(f"ERR_TYPE: {ERR_TYPE}, ERR_RATE: {ERR_RATE}")

        # Start every run from a fresh copy of the saved weights
        quantized_model = resnet18(pretrained=False)
        quantized_model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        quantized_model.fc = nn.Linear(512, 10)
        quantized_model.load_state_dict(torch.load('original_model.pth', map_location=device))
        quantized_model = quantized_model.to(device)

        # Replace each parameter by its quantized, corrupted and dequantized version
        for name, param in quantized_model.named_parameters():
            q_param, scale, zero_point = quantize(param.data, ERR_TYPE, ERR_RATE)
            param.data = dequantize(q_param, scale, zero_point)

        print("Quantized Model Performance:")
        quantized_accuracy = test(quantized_model, test_loader, device)

        print(f"Accuracy difference: {abs(original_accuracy - quantized_accuracy):.2f}%")
        print()



Original Model Performance:
Test Accuracy: 98.78%
ERR_TYPE: 0, ERR_RATE: 0.1
Quantized Model Performance:
Test Accuracy: 9.82%
Accuracy difference: 88.96%

ERR_TYPE: 0, ERR_RATE: 0.01
Quantized Model Performance:
Test Accuracy: 21.26%
Accuracy difference: 77.52%

ERR_TYPE: 0, ERR_RATE: 0.001
Quantized Model Performance:
Test Accuracy: 98.87%
Accuracy difference: 0.09%

ERR_TYPE: 0, ERR_RATE: 0.0001
Quantized Model Performance:
Test Accuracy: 98.80%
Accuracy difference: 0.02%

ERR_TYPE: 0, ERR_RATE: 1e-05
Quantized Model Performance:
Test Accuracy: 98.79%
Accuracy difference: 0.01%

ERR_TYPE: 0, ERR_RATE: 1e-06
Quantized Model Performance:
Test Accuracy: 98.79%
Accuracy difference: 0.01%

ERR_TYPE: 0, ERR_RATE: 1e-07
Quantized Model Performance:
Test Accuracy: 98.79%
Accuracy difference: 0.01%

ERR_TYPE: 0, ERR_RATE: 1e-08
Quantized Model Performance:
Test Accuracy: 98.79%
Accuracy difference: 0.01%

ERR_TYPE: 0, ERR_RATE: 1e-09
Quantized Model Performance:
Test Accuracy: 98.79%
Accurac