In [None]:
!git clone https://github.com/WaiNaat/PyTorch_CIFAR10.git
!git clone https://github.com/WaiNaat/pytorchfi.git
!pip install bitstring

fatal: destination path 'PyTorch_CIFAR10' already exists and is not an empty directory.
fatal: destination path 'pytorchfi' already exists and is not an empty directory.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import torch
import torchvision
import random
import copy
import numpy as np
import logging
from bitstring import BitArray

from torchvision import transforms
from tqdm import tqdm

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pytorchfi
from pytorchfi.core import FaultInjection
import pytorchfi.weight_error_models as weight_error_models
from pytorchfi.util import random_value

from PyTorch_CIFAR10.cifar10_models.vgg import vgg11_bn, vgg13_bn, vgg16_bn, vgg19_bn
from PyTorch_CIFAR10.cifar10_models.resnet import resnet18, resnet34, resnet50
from PyTorch_CIFAR10.cifar10_models.densenet import densenet121, densenet161, densenet169
from PyTorch_CIFAR10.cifar10_models.mobilenetv2 import mobilenet_v2
from PyTorch_CIFAR10.cifar10_models.googlenet import googlenet
from PyTorch_CIFAR10.cifar10_models.inception import inception_v3

In [None]:
# 실험 환경 설정
model_name = "resnet18"
model = resnet18()
save_dir = 'resnet18'

seed = 12345678

batch_size = 1024
img_size = 32
channels = 3

use_gpu = torch.cuda.is_available()

corrupt_input_images = True
save_detailed_results = True

custom_bit_flip_pos = None
layer_type = ['all']
layer_nums = ['all']

torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # if use multi-GPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)
random.seed(seed)

In [None]:
class add_input_layer(torch.nn.Module):

    def __init__(self, model, *args):
        super().__init__(*args)
        self.input_layer = torch.nn.Identity()
        self.model = model

    def forward(self, x):
        input = self.input_layer(x)
        output = self.model(input)
        return output

In [None]:
# 모델 설정
path = f"/content/drive/My Drive/소종/2학기/state_dicts/{model_name}.pt"
model.load_state_dict(torch.load(path))

if corrupt_input_images:
    model = add_input_layer(model)

if use_gpu: model.to(device='cuda')

# print(model)

In [None]:
# Normalization statics from https://github.com/huyvnphan/PyTorch_CIFAR10/blob/master/data.py
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0.4914, 0.4822, 0.4465], (0.2471, 0.2435, 0.2616))
    ]
)

data = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
dataset = torch.utils.data.DataLoader(data, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=True)

Files already downloaded and verified


In [None]:
# single bit flip을 일으킬 모델 만들기
base_fi_model = FaultInjection(
    model = copy.deepcopy(model),
    batch_size = batch_size, 
    input_shape = [channels, img_size, img_size], 
    use_gpu = use_gpu,
    layer_types = layer_type,
    flip_bit_pos = custom_bit_flip_pos,
    save_log_list = save_detailed_results
)
# print(base_fi_model.print_pytorchfi_layer_summary())

In [None]:
# single bit flip을 수행할 layer 번호 정리
if 'all' in layer_nums:
    layer_nums = range(base_fi_model.get_total_layers())
else:
    layer_nums.sort()
    while layer_nums and layer_nums[-1] >= base_fi_model.get_total_layers():
        layer_nums.pop()

In [None]:
con_out_array = []
def _Weight_single_bit_flip(weight, fault_position):
    global con_out_array
    bits = weight[fault_position].dtype
    if bits == torch.float32:
        bits = 32
    elif bits == torch.float64:
        bits = 64
    else:
        print(f'Unsupported data type {bits}')
        raise AssertionError(f'Unsupported data type {bits}')
    bit_pos = random.randint(0, bits - 1) # if self.flip_bit_pos is None else self.flip_bit_pos
    # single bit flip
    orig_arr = BitArray(float = weight[fault_position].item(), length = bits)
    error = list(map(int, orig_arr.bin))
    error[bit_pos] = (error[bit_pos] + 1) % 2
    error = ''.join(map(str, error))
    error = BitArray(bin=error)
    new_value = error.float
    log = [
        f'Layer: {layer_num}',
        f'Position: ({k}, {C}, {H}, {W})',
        f'Original value:  {weight[fault_position].item()}',
        f'Original binary: {orig_arr.bin}',
        f'Flip bit: {bit_pos}',
        f'Error value:     {error.float}',
        f'Error binary:    {error.bin}',
        # f'Model output: {corrupted_output}',
        '\n'
    ]
    con_out_array.append('\n'.join(log))
    return new_value
def random_weight_location_mod(pfi, layer: int = -1):
    if layer == -1:
        layer = random.randint(0, pfi.get_total_layers() - 1)

    dim = pfi.get_weights_dim(layer)
    shape = pfi.get_weights_size(layer)

    dim0_shape = shape[0]
    k = random.randint(0, dim0_shape - 1)
    if dim > 1:
        dim1_shape = shape[1]
        dim1_rand = random.randint(0, dim1_shape - 1)
    else:
        dim1_rand = None
    if dim > 2:
        dim2_shape = shape[2]
        dim2_rand = random.randint(0, dim2_shape - 1)
    else:
        dim2_rand = None
    if dim > 3:
        dim3_shape = shape[3]
        dim3_rand = random.randint(0, dim3_shape - 1)
    else:
        dim3_rand = None

    return ([layer], [k], [dim1_rand], [dim2_rand], [dim3_rand])

# 실험 진행
results = []

for layer_num in tqdm(layer_nums):
    con_out_array.append(f"Processing layer # {layer_num}")
    if type(base_fi_model.get_weights_size(layer_num)[0]) == str:
        con_out_array.append(f"Layer # {layer_num} has no weight")
        continue
    orig_correct_cnt = 0
    orig_corrupt_diff_cnt = 0
    for images, labels in dataset:
        if use_gpu:
            images = images.to(device='cuda')
        # 원본에 inference 진행
        model.eval()
        with torch.no_grad():
            orig_output = model(images)
        # Kernel 1개에 대한 single bit flip 위치 지정
        layer, k, C, H, W = random_weight_location_mod(base_fi_model, layer=layer_num)
        # corrupted model 만들기
        # base_fi_model.reset_log()
        corrupted_model = base_fi_model.declare_weight_fault_injection(
            function = _Weight_single_bit_flip,
            layer_num = [layer],
            k = [k],
            dim1 = [C],
            dim2 = [H],
            dim3 = [W]
        )
        
        # corrupted model에 inference 진행
        corrupted_model.eval()
        with torch.no_grad():
            corrupted_output = corrupted_model(images)
        # 결과 정리
        original_output = torch.argmax(orig_output, dim=1).cpu().numpy()
        corrupted_output = torch.argmax(corrupted_output, dim=1).cpu().numpy()
        # 결과 비교: 원본이 정답을 맞춘 경우 중 망가진 모델이 틀린 경우를 셈
        for i in range(batch_size):
            if labels[i] == original_output[i]:
                orig_correct_cnt += 1
                if original_output[i] != corrupted_output[i]:
                        orig_corrupt_diff_cnt += 1
    # 결과 저장
    result = f'Layer #{layer_num}: {orig_corrupt_diff_cnt} / {orig_correct_cnt} = {orig_corrupt_diff_cnt / orig_correct_cnt * 100:.4f}%'
    results.append(result)

 20%|█▉        | 12/61 [16:07<1:06:41, 81.67s/it]

In [None]:
for result in results:
    print(result)

In [None]:
f = open('/content/drive/MyDrive/' + model_name+ "_" + str(batch_size) + ".txt", 'w')

f.write(base_fi_model.print_pytorchfi_layer_summary())
f.write(f'\n\n===== Result =====\nSeed: {seed}\n')
for result in results:
    f.write(result + '\n')

f.close()
f2 = open('/content/drive/MyDrive/' + model_name+ "_" + str(batch_size) + "_ConsoleOut.txt", 'w')

# f2.write(base_fi_model.print_pytorchfi_layer_summary())
f2.write(f'\n\n===== Result =====\nSeed: {seed}\n')
for result in con_out_array:
    f2.write(result + '\n')

f2.close()