## 시작하기 전에

CIFAR-10 pretrained model 받아오기
1. 이 파일이 존재하는 디렉토리에 git bash를 열고 `git clone https://github.com/WaiNaat/PyTorch_CIFAR10.git` 실행


몇 가지 오류를 수정한 PytorchFI 라이브러리 받아오기
1. 이 파일이 존재하는 디렉토리에 git bash를 열고 `git clone https://github.com/WaiNaat/pytorchfi.git` 실행

CIFAR-10 pretrained weight 받아오기

1. https://github.com/huyvnphan/PyTorch_CIFAR10 페이지 중간에 있는 Google Drive 링크에서 zip 파일을 다운로드 (약 1GB)
2. 압축 해제 후 `state_dicts` 폴더를 `./PyTorch_CIFAR10/cifar10_models` 내부로 옮기기

In [None]:
import torch
import torchvision
import random
import copy
import numpy as np

from torchvision import transforms
from tqdm import tqdm
from bitstring import BitArray

In [None]:
import pytorchfi
from pytorchfi.core import FaultInjection
from pytorchfi.neuron_error_models import random_neuron_location
from pytorchfi.weight_error_models import random_weight_location

from PyTorch_CIFAR10.cifar10_models.vgg import vgg11_bn, vgg13_bn, vgg16_bn, vgg19_bn
from PyTorch_CIFAR10.cifar10_models.resnet import resnet18, resnet34, resnet50
from PyTorch_CIFAR10.cifar10_models.densenet import densenet121, densenet161, densenet169
from PyTorch_CIFAR10.cifar10_models.mobilenetv2 import mobilenet_v2
from PyTorch_CIFAR10.cifar10_models.googlenet import googlenet
from PyTorch_CIFAR10.cifar10_models.inception import inception_v3

## 설정

---

`model_name`, `model`: 위 셀의 `PyTorch_CIFAR10.cifar10_models` 에서 `import` 한 것들 중 하나      
`layer_type`: `['all']` 또는 `torch.nn.Module`을 상속하는 클래스명으로 구성된 iterable   
`layer_nums`: `['all']` 또는 0 이상의 정수로 구성된 배열    
`corrupt_input_images`: `True`로 설정 시 model inference 진행 전, 입력 이미지 자체에도 single bit flip 적용


In [None]:
# Experiment configuration.

# --- Run identity ---------------------------------------------------------
experiment_id = 3
model_name = "vgg11_bn"  # also used to build the state-dict file name
model = vgg11_bn()  # keep in sync with model_name
save_dir = f"{model_name}_{experiment_id}"  # prefix of the result file names

# Seed handed to Python's `random` module and to PyTorch.
seed = 12345678

# --- Input geometry and batching ------------------------------------------
channels = 3
img_size = 32
batch_size = 256

# Run on CUDA whenever PyTorch reports a usable device.
use_gpu = torch.cuda.is_available()

# --- Fault-injection options ----------------------------------------------
# Wrap the model with an identity input layer so the images can be corrupted.
corrupt_input_images = True
# Record every flip and write the per-sample log file at the end.
save_detailed_results = True

# Bit index to flip; None means a random bit is chosen for every injection.
custom_bit_flip_pos = None
# ['all'] or explicit selections of layer classes / layer indices.
layer_type = ['all']
layer_nums = ['all']

In [None]:
# Seed Python's `random` module and PyTorch with the configured seed.
for seed_fn in (random.seed, torch.manual_seed):
    seed_fn(seed)

## Classes

### add_input_layer

Identity layer를 맨 앞에 추가해서 input image 자체에 fault injection을 할 수 있도록 함

In [None]:
class add_input_layer(torch.nn.Module):
    """Wrap a model behind a leading ``torch.nn.Identity`` module.

    The identity module returns its input unchanged, so the wrapper computes
    exactly what ``model`` computes. Its purpose is to give the input image a
    module of its own in front of the network, so that faults can be injected
    into the image itself.

    Args:
        model: the network that receives the output of the identity layer.
        *args: forwarded unchanged to ``torch.nn.Module.__init__``.
    """

    def __init__(self, model, *args):
        super().__init__(*args)
        # Registration order is kept: the identity layer first, then the model.
        self.input_layer = torch.nn.Identity()
        self.model = model

    def forward(self, x):
        """Pass ``x`` through the identity layer and then through the model."""
        return self.model(self.input_layer(x))

### custom_single_bit_flip

`_single_bit_flip`: IEEE-754 standard를 따르는 부동소수점 값에 대해 `bitstring` 라이브러리의 `BitArray`를 이용해서 single bit flip 수행

`reset_log`: 로그는 bit flip이 일어날 때마다 누적되므로, `save_log_list=True`로 설정한 경우 corrupted model로 inference를 수행하기 전에 매번 실행해야 함.

`neuron_single_bit_flip`: `declare_neuron_fault_injection`의 `function`인자로 넘기는 함수.

`weight_single_bit_flip`: `declare_weight_fault_injection`의 `function`인자로 넘기는 함수.

In [None]:
class custom_single_bit_flip(FaultInjection):
    """``FaultInjection`` subclass that flips one bit of a float32/float64 value.

    ``neuron_single_bit_flip`` and ``weight_single_bit_flip`` are meant to be
    passed as the ``function`` argument of the neuron / weight fault-injection
    declarations of the base class.

    Args:
        model, batch_size, **kwargs: forwarded unchanged to
            ``FaultInjection.__init__``.
        flip_bit_pos: index of the bit to flip, where index 0 is the first
            character of the value's ``BitArray.bin`` string. ``None`` picks a
            random bit for every injection.
        save_log_list: when true, every flip is appended to the ``log_*``
            lists (original/corrupted value, their bit strings, bit index).
    """

    # Bit widths of the floating-point dtypes that can be corrupted.
    _FLOAT_BIT_WIDTHS = {torch.float32: 32, torch.float64: 64}

    def __init__(self, model, batch_size, flip_bit_pos=None, save_log_list=False, **kwargs):
        super().__init__(model, batch_size, **kwargs)
        self.flip_bit_pos = flip_bit_pos
        self.save_log_list = save_log_list
        self.reset_log()

    def reset_log(self):
        """Discard every recorded flip by replacing the log lists with empty ones."""
        self.log_original_value = []
        self.log_original_value_bin = []
        self.log_error_value = []
        self.log_error_value_bin = []
        self.log_bit_pos = []

    @classmethod
    def _bit_width(cls, dtype):
        """Return 32 or 64 for float32/float64; raise AssertionError otherwise."""
        width = cls._FLOAT_BIT_WIDTHS.get(dtype)
        if width is None:
            raise AssertionError(f'Unsupported data type {dtype}')
        return width

    def _pick_bit(self, bits):
        """Return the configured bit index, or a random one in ``[0, bits - 1]``."""
        if self.flip_bit_pos is None:
            return random.randint(0, bits - 1)
        return self.flip_bit_pos

    def _single_bit_flip(self, orig_value, bit_pos):
        """Return ``orig_value`` with the bit at ``bit_pos`` inverted.

        Args:
            orig_value: single-element float32 or float64 tensor.
            bit_pos: index into the value's bit string (see class docstring).

        Returns:
            A new 0-dim tensor of the same dtype holding the corrupted value.

        Raises:
            AssertionError: if the dtype is neither float32 nor float64.
        """
        save_type = orig_value.dtype
        orig_value = orig_value.cpu().item()
        length = self._FLOAT_BIT_WIDTHS.get(save_type)
        if length is None:
            raise AssertionError(f'Unsupported Data Type: {save_type}')

        # Flip the chosen character of the bit string and decode it again.
        orig_arr = BitArray(float=orig_value, length=length)
        bit_chars = list(orig_arr.bin)
        bit_chars[bit_pos] = '1' if bit_chars[bit_pos] == '0' else '0'
        error = BitArray(bin=''.join(bit_chars))
        new_value = error.float

        if self.save_log_list:
            self.log_original_value.append(orig_value)
            self.log_original_value_bin.append(orig_arr.bin)
            self.log_error_value.append(new_value)
            self.log_error_value_bin.append(error.bin)
            self.log_bit_pos.append(bit_pos)

        return torch.tensor(new_value, dtype=save_type)

    def _flip_output_element(self, output, batch, dim1, dim2, dim3, bits):
        """Flip one bit of ``output[batch][dim1][dim2][dim3]`` in place."""
        # Chained indexing is kept on purpose: every step yields a view, so
        # the final assignment writes through to ``output``.
        target = output[batch][dim1][dim2]
        target[dim3] = self._single_bit_flip(target[dim3], self._pick_bit(bits))

    # structure from pytorchfi/neuron_error_models/single_bit_flip_func/single_bit_flip_signed_across_batch
    def neuron_single_bit_flip(self, module, input_val, output):
        """Corrupt the configured elements of ``output`` for the current layer.

        The signature looks like a PyTorch forward hook; ``module`` and
        ``input_val`` are not used. NOTE(review): the hook registration lives
        in the base class, confirm there.

        ``self.corrupt_layer`` may be a list (one injection per entry, applied
        when the entry equals ``self.current_layer``) or a single layer index.
        Both cases honour ``flip_bit_pos``.

        Raises:
            AssertionError: if ``output`` is neither float32 nor float64.
        """
        corrupt_conv_set = self.corrupt_layer
        bits = self._bit_width(output.dtype)

        if isinstance(corrupt_conv_set, list):
            for i, layer in enumerate(corrupt_conv_set):
                if layer != self.current_layer:
                    continue
                self.assert_injection_bounds(index=i)
                self._flip_output_element(
                    output,
                    self.corrupt_batch[i],
                    self.corrupt_dim[0][i],
                    self.corrupt_dim[1][i],
                    self.corrupt_dim[2][i],
                    bits,
                )
        elif self.current_layer == corrupt_conv_set:
            self._flip_output_element(
                output,
                self.corrupt_batch,
                self.corrupt_dim[0],
                self.corrupt_dim[1],
                self.corrupt_dim[2],
                bits,
            )

        # Advance the layer counter and wrap around after the last layer.
        self.update_layer()
        if self.current_layer >= len(self.output_size):
            self.reset_current_layer()

    def weight_single_bit_flip(self, weight, corrupt_idx):
        """Return ``weight[corrupt_idx]`` with one bit flipped.

        Args:
            weight: float32 or float64 weight tensor.
            corrupt_idx: index selecting a single element of ``weight``.

        Returns:
            A 0-dim tensor holding the corrupted value; ``weight`` itself is
            not modified here.

        Raises:
            AssertionError: if ``weight`` is neither float32 nor float64.
        """
        bits = self._bit_width(weight.dtype)
        return self._single_bit_flip(weight[corrupt_idx], self._pick_bit(bits))

## 모델 불러오기

In [None]:
# Load the pretrained weights for the configured model.
path = f"./PyTorch_CIFAR10/cifar10_models/state_dicts/{model_name}.pt"
# map_location='cpu' lets the checkpoint load on a machine without the device
# it was saved from; the model is moved to the GPU below when one is available.
model.load_state_dict(torch.load(path, map_location='cpu'))

if corrupt_input_images:
    model = add_input_layer(model)

# Switch to inference mode before the model is deep-copied into the fault
# injector, so the copy starts in eval mode too.
# NOTE(review): if the pytorchfi fork runs a profiling forward pass in its
# constructor, a model left in training mode would update BatchNorm running
# statistics there and diverge from the reference model - confirm.
model = model.eval()

if use_gpu:
    model.to(device='cuda')

#print(model)

## 데이터 전처리

In [None]:
# Normalization statistics from
# https://github.com/huyvnphan/PyTorch_CIFAR10/blob/master/data.py
cifar10_mean = [0.4914, 0.4822, 0.4465]
cifar10_std = (0.2471, 0.2435, 0.2616)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar10_mean, cifar10_std),
])

# CIFAR-10 split selected by train=False, downloaded to ./data when missing.
data = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# drop_last=True: every batch delivered has exactly batch_size samples.
dataset = torch.utils.data.DataLoader(
    data,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0,
    drop_last=True,
)

## Main

In [None]:
# Build the fault injector that performs the single bit flips. It receives a
# deep copy of the model, so the reference `model` object is not shared with it.
fi_options = {
    "input_shape": [channels, img_size, img_size],
    "use_gpu": use_gpu,
    "layer_types": layer_type,
    "flip_bit_pos": custom_bit_flip_pos,
    "save_log_list": save_detailed_results,
}
base_fi_model = custom_single_bit_flip(copy.deepcopy(model), batch_size, **fi_options)

#print(base_fi_model.print_pytorchfi_layer_summary())

In [None]:
# Resolve the layer indices that will receive single bit flips.
total_layers = base_fi_model.get_total_layers()

if 'all' in layer_nums:
    layer_nums = range(total_layers)
else:
    # Build a new sorted list instead of sorting/popping in place: this also
    # accepts tuples and ranges, keeps the cell safe to re-run, and drops
    # every index outside [0, total_layers).
    layer_nums = sorted(n for n in layer_nums if 0 <= n < total_layers)

In [None]:
# Run the experiment: for every selected layer, go through the whole dataset,
# inject one single bit flip per sample into that layer's output and count how
# often a prediction that was correct on the reference model changes.
results = []
error_logs = []

# The reference model is only used for inference; setting eval mode once is
# enough because nothing in the loop switches it back.
model.eval()

for layer_num in tqdm(layer_nums):

    orig_correct_cnt = 0
    orig_corrupt_diff_cnt = 0

    for batch_idx, (images, labels) in enumerate(dataset):

        if use_gpu:
            images = images.to(device='cuda')

        # Reference inference. It runs before the corrupted inference, so it
        # sees `images` before any injection has been applied.
        with torch.no_grad():
            orig_output = model(images)

        # Draw one injection location per sample of the batch.
        locations = [
            random_neuron_location(base_fi_model, layer=layer_num)
            for _ in range(batch_size)
        ]
        layer_num_list, dim1, dim2, dim3 = (list(column) for column in zip(*locations))

        # Build the corrupted model; the flip log is cleared first so that log
        # entry i belongs to sample i of this batch.
        base_fi_model.reset_log()
        corrupted_model = base_fi_model.declare_neuron_fault_injection(
            batch=list(range(batch_size)),
            layer_num=layer_num_list,
            dim1=dim1,
            dim2=dim2,
            dim3=dim3,
            function=base_fi_model.neuron_single_bit_flip,
        )

        # Inference on the corrupted model.
        corrupted_model.eval()
        with torch.no_grad():
            corrupted_output = corrupted_model(images)

        # Reduce both outputs to predicted class indices.
        original_output = torch.argmax(orig_output, dim=1).cpu().numpy()
        corrupted_output = torch.argmax(corrupted_output, dim=1).cpu().numpy()
        labels = labels.numpy()

        # Among the samples the reference model classified correctly, count
        # those whose prediction changed on the corrupted model.
        correct_mask = labels == original_output
        changed_mask = correct_mask & (original_output != corrupted_output)
        orig_correct_cnt += int(correct_mask.sum())
        orig_corrupt_diff_cnt += int(changed_mask.sum())

        if save_detailed_results:
            for i in np.flatnonzero(changed_mask).tolist():
                log = [
                    f'Layer: {layer_num}',
                    f'Batch: {batch_idx}',
                    f'Position: ({i}, {dim1[i]}, {dim2[i]}, {dim3[i]})',
                    f'Original value:  {base_fi_model.log_original_value[i]}',
                    f'Original binary: {base_fi_model.log_original_value_bin[i]}',
                    f'Flip bit: {base_fi_model.log_bit_pos[i]}',
                    f'Error value:     {base_fi_model.log_error_value[i]}',
                    f'Error binary:    {base_fi_model.log_error_value_bin[i]}',
                    f'Label:        {labels[i]}',
                    f'Model output: {corrupted_output[i]}',
                    '\n'
                ]

                error_logs.append('\n'.join(log))

    # Store the per-layer summary. The rate is reported as nan when no sample
    # was classified correctly, instead of raising ZeroDivisionError.
    if orig_correct_cnt:
        error_rate = orig_corrupt_diff_cnt / orig_correct_cnt * 100
    else:
        error_rate = float('nan')
    layer_type_name = str(base_fi_model.layers_type[layer_num]).split(".")[-1].split("'")[0]
    result = f'Layer #{layer_num}: {orig_corrupt_diff_cnt} / {orig_correct_cnt} = {error_rate:.4f}%, ' + layer_type_name
    #print(result)
    results.append(result)

In [None]:
# Print one summary line per processed layer; nothing is printed when there
# are no results.
if results:
    print('\n'.join(results))

## 결과 파일 저장

In [None]:
# Write the layer summary followed by the per-layer results. The `with` block
# closes the file even if one of the writes raises.
with open(save_dir + '.txt', 'w') as f:
    f.write(base_fi_model.print_pytorchfi_layer_summary())
    f.write(f'\n\n===== Result =====\nSeed: {seed}\n')
    f.writelines(result + '\n' for result in results)

In [None]:
# Write the per-sample flip logs, one entry after another. The `with` block
# closes the file even if a write raises.
if save_detailed_results:
    with open(save_dir + '_detailed.txt', 'w') as f:
        f.writelines(error_log + '\n' for error_log in error_logs)