# Fault Injection on Quantized Neural Networks

This notebook tests fault injection on quantized neural networks.

The networks are inspired by VGG-16 and feature 6 convolutional layers, 3 max pool layers and 3 fully connected layers. They classify the CIFAR-10 test set. The 2 networks of different precision tested here are:

- CNVW1A1 using 1-bit weights and 1-bit activations,
- CNVW2A2 using 2-bit weights and 2-bit activations.

## 1. Import the packages

In [None]:
import bnn
import os
from copy import deepcopy
from json import dumps
from yapf.yapflib.yapf_api import FormatCode
import matplotlib.pyplot as plt
from pynq import Xlnk

## 2. The Cifar-10 testset

This notebook requires the test set from https://www.cs.toronto.edu/~kriz/cifar.html, which contains 10000 images that can be processed by the CNV network directly without preprocessing.

You can download the CIFAR-10 set from the given URL via wget and extract it to a folder on the Pynq as shown below.
This may take a while as the training set is included in the archive as well.
The first 1000 test images are then copied to a smaller file, which is the input used for all tests in this notebook. After that we need to read the labels from that binary file to be able to compare the results later:

In [None]:
# Fetch the CIFAR-10 binary archive unless it is already present.
# NOTE(review): the existence checks use absolute paths while wget/tar work
# relative to the current directory, so this presumably assumes the notebook
# runs from /home/xilinx/jupyter_notebooks/bnn - confirm.
if not os.path.exists("/home/xilinx/jupyter_notebooks/bnn/cifar-10-binary.tar.gz"):
    # Download the archive (IPython shell escape)
    !wget https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz

if not os.path.exists("/home/xilinx/jupyter_notebooks/bnn/cifar-10-batches-bin/"):
    # Extract the archive (IPython shell escape)
    !tar -xf cifar-10-binary.tar.gz

# Build a reduced test file holding only the first 1000 records of the test
# batch. Each record is copied as 3073 bytes, which the label reader further
# down treats as 1 label byte followed by 3072 image bytes.
if not os.path.exists("/home/xilinx/jupyter_notebooks/bnn/cifar-10-batches-bin/test_batch_1000.bin"):
    with open("/home/xilinx/jupyter_notebooks/bnn/cifar-10-batches-bin/test_batch.bin", "rb") as infile:
        with open("/home/xilinx/jupyter_notebooks/bnn/cifar-10-batches-bin/test_batch_1000.bin", "wb+") as outfile:
            for i in range(1000):
                outfile.write(infile.read(3073))

# Read the expected class label of every image in the reduced test file.
# `labels` and `input_file` are used by the test cells further down.
labels = []
input_file = "/home/xilinx/jupyter_notebooks/bnn/cifar-10-batches-bin/test_batch_1000.bin"
with open(input_file, "rb") as file:
    # The reduced test file holds 1000 records
    for i in range(1000):
        # First byte of a record -> label
        label_byte = file.read(1)
        if not label_byte:
            # The file ended early: stop instead of recording bogus 0 labels
            # (int.from_bytes(b"") evaluates to 0).
            break
        labels.append(int.from_bytes(label_byte, byteorder="big"))
        # Skip the image payload (3072 bytes); it is not needed here
        file.read(3072)
    # No explicit close(): the with-statement closes the file.

print("Read", len(labels), "labels")

## 3. Start inference

The inference can be performed with different precision for weights and activation. Creating a specific Classifier will automatically download the correct bitstream onto PL and load the weights and thresholds trained on the specific dataset. 

In [None]:
# Xlnk handle; its xlnk_reset() method is called after every classifier run
# in the cells below.
xlnk = Xlnk()

# Number of repetitions passed to the test functions in section 4.
num_runs = 100
# Output path template: the test functions fill the "{}" placeholder with the
# number of injected flips via output_folder.format(num_flips).
output_folder = "/home/xilinx/jupyter_notebooks/bnn/{}flips/cnv/"

### Utility Functions

In [None]:
def calculate_accuracy(results, labels):
    """Return the percentage of positions where ``results`` equals ``labels``.

    The comparison runs over every index of ``labels``; entries of
    ``results`` beyond that length are ignored.
    """
    total = len(labels)
    matches = sum(
        1 for position, expected in enumerate(labels)
        if expected == results[position]
    )
    return matches*100/total


#Merge dictionaries that contain sub-dictionaries
def dict_of_dicts_merge(*dicts):
    """Merge any number of dictionaries, recursing into nested dictionaries.

    Rules, applied to the inputs from left to right:
      * a key seen for the first time is deep-copied into the result;
      * a key present on both sides whose two values are dictionaries is
        merged recursively;
      * for any other repeated key the value that was merged first is kept.

    Keys appear in the result in the order they are first encountered, so the
    output (and any file written from it) is ordered the same way on every
    run instead of depending on set iteration order.

    Returns a new dictionary; the inputs are not modified.
    """
    merged = {}
    for mapping in dicts:
        for key, value in mapping.items():
            if key not in merged:
                merged[key] = deepcopy(value)
            elif isinstance(merged[key], dict) and isinstance(value, dict):
                merged[key] = dict_of_dicts_merge(merged[key], value)
            # Otherwise: conflicting non-dict values, first one wins.
    return merged


#Make a dictionary for storing the results, times, or accuracies from a test
def make_results_dict(size, targeting_bits_and_words, targeting_weights_and_activations):
    """Build a container of ``None`` placeholders for per-run test data.

    Args:
        size: number of placeholder slots in every list.
        targeting_bits_and_words: when true, the result has "bit" and "word"
            keys.
        targeting_weights_and_activations: when true, the result has "weight"
            and "activation" keys.

    Returns:
        * both flags set: ``{"bit": {"weight": [...], "activation": [...]},
          "word": {...}}``
        * only the first flag set: ``{"bit": [...], "word": [...]}``
        * only the second flag set: ``{"weight": [...], "activation": [...]}``
        * neither flag set: ``None``

        Every list is a distinct object of length ``size``.
    """
    def empty_slots():
        # A fresh list per call so the returned lists never alias each other.
        return [None] * size

    if targeting_bits_and_words and targeting_weights_and_activations:
        return {
            "bit": {
                "weight": empty_slots(),
                "activation": empty_slots(),
            },
            "word": {
                "weight": empty_slots(),
                "activation": empty_slots(),
            },
        }
    if targeting_bits_and_words:
        return {
            "bit": empty_slots(),
            "word": empty_slots(),
        }
    if targeting_weights_and_activations:
        return {
            "weight": empty_slots(),
            "activation": empty_slots(),
        }
    # Nothing targeted: kept as None, matching the original implicit return.
    return None


#Output dict contains the combined raw results from a test. Used as input to calculate_stats
def make_output_dict(network_name, num_runs, num_flips, accuracy_control, accuracies_dict):
    """Bundle the raw data of one test into the layout calculate_stats reads.

    The returned dictionary has the keys "network", "run count", "flips" and
    "control" holding the corresponding arguments, plus "results", a new
    dictionary with the same key/value pairs as ``accuracies_dict`` (the
    values themselves are not copied).
    """
    return {
        "network": network_name,
        "run count": num_runs,
        "flips": num_flips,
        "control": accuracy_control,
        "results": {label: runs for label, runs in accuracies_dict.items()},
    }


#Calculates various stats given the results of a test (formatted with make_output_dict)
def calculate_stats(output_dict):
    """Derive per-test statistics from a dictionary built by make_output_dict.

    Args:
        output_dict: dictionary with a "control" accuracy and a "results"
            dictionary mapping a test name to its list of per-run accuracies.

    Returns:
        A new dictionary with the same top-level entries, in which every
        ``results[name]`` is replaced by a dictionary containing:
          * "runs": {"all": the original list, "effective": the runs whose
            accuracy differs from the control accuracy}
          * "effective count": number of effective runs
          * "avg accuracy", "avg effective accuracy" and
            "effective accuracy delta" (control minus average effective
            accuracy) - only when at least one run was effective
          * "min accuracy" / "max accuracy" - only when the run list is not
            empty

        ``output_dict`` and its "results" dictionary are left unmodified.
    """
    out = output_dict.copy()
    # .copy() is shallow, so give the result its own "results" dictionary;
    # otherwise the assignments below would overwrite the caller's data.
    out["results"] = {}
    for key, runs in output_dict["results"].items():
        control = out["control"]
        effective = [accuracy for accuracy in runs if accuracy != control]
        entry = {
            "runs": {"all": runs, "effective": effective},
            "effective count": len(effective),
        }
        if effective:
            # NOTE(review): "avg accuracy" is only emitted when at least one
            # run was effective, as in the original code - confirm intended.
            entry["avg accuracy"] = sum(runs) / len(runs)
            entry["avg effective accuracy"] = sum(effective) / len(effective)
            entry["effective accuracy delta"] = control - entry["avg effective accuracy"]
        if runs:
            # min()/max() raise ValueError on an empty sequence.
            entry["min accuracy"] = min(runs)
            entry["max accuracy"] = max(runs)
        out["results"][key] = entry
    return out


def dict_to_str(dict):
    """Return ``dict`` as JSON text after reformatting it with FormatCode.

    The argument is serialized with ``json.dumps``; the resulting string is
    handed to yapf's ``FormatCode`` and the first element of the returned
    pair is given back to the caller.
    """
    serialized = dumps(dict)
    # FormatCode returns a pair; only the formatted text is of interest.
    pretty_text, _ = FormatCode(serialized)
    return pretty_text


def write_stats_file(file_name, stats_dict):
    """Write ``stats_dict``, formatted by dict_to_str, to ``file_name``.

    Missing parent directories are created. An existing file is overwritten.

    Args:
        file_name: path of the file to write.
        stats_dict: dictionary to serialize.
    """
    # Format first, so a serialization error does not leave behind a
    # truncated or empty file.
    text = dict_to_str(stats_dict)

    parent_dir = os.path.dirname(file_name)
    if parent_dir:
        # dirname is "" for a bare file name, and os.makedirs("") raises.
        os.makedirs(parent_dir, exist_ok=True)

    # The with-statement closes the file even if the write fails.
    with open(file_name, "w") as f:
        f.write(text)

### Fault Test Function

In [None]:
def cnv_cifar_fault_test(title, network, input_file, num_runs, num_flips, flip_word, weight_or_activation, target_layers=None):
    """Classify ``input_file`` ``num_runs`` times with injected faults.

    Every run creates a new ``bnn.CnvClassifier`` for ``network``, calls its
    ``classify_cifars_with_faults`` method, scores the returned
    classifications against the module-level ``labels`` and finally calls
    ``xlnk.xlnk_reset()``.

    Args:
        title: label used in the progress output.
        network: network identifier passed to ``bnn.CnvClassifier``.
        input_file: path of the CIFAR-10 binary file to classify.
        num_runs: number of repetitions.
        num_flips: number of faults, forwarded to the classifier.
        flip_word: forwarded to the classifier; true is reported as "word"
            flips in the progress output, false as "bit" flips.
        weight_or_activation: selector forwarded to the classifier. The
            callers in this notebook pass 0 for weight tests, 1 for
            activation tests and -1 for layer-targeted tests.
        target_layers: list of layers forwarded to the classifier; ``None``
            (the default) is forwarded as an empty list.

    Returns:
        Tuple ``(results, times, accuracies)`` of three lists with one entry
        per run: the classifier output, its ``usecPerImage`` value and the
        accuracy in percent.
    """
    # A None default avoids sharing one mutable list between all calls.
    if target_layers is None:
        target_layers = []

    results = []
    times = []
    accuracies = []

    for run in range(num_runs):
        hw_classifier = bnn.CnvClassifier(network, 'cifar10', bnn.RUNTIME_HW)
        print("{} run {} of {} (flipping {} {}(s) at random times)".format(title, run+1, num_runs, num_flips, "word" if flip_word else "bit"))

        run_results = hw_classifier.classify_cifars_with_faults(input_file, num_flips, flip_word, weight_or_activation, target_layers)
        run_accuracy = calculate_accuracy(run_results, labels)

        results.append(run_results)
        times.append(hw_classifier.usecPerImage)
        accuracies.append(run_accuracy)

        print("Accuracy:", run_accuracy)

        # Reset the device before the next run
        xlnk.xlnk_reset()
        print()

    return (results, times, accuracies)

### W1A1 - 1 bit weight and 1 bit activation
Run the control test for cnvW1A1

In [None]:
# Control run: classify the test file with cnvW1A1 without fault injection.
# accuracy_W1A1_control is the reference value the W1A1 test functions below
# store as "control" in their output.

# Placeholders, overwritten below once the classification has run
accuracy_W1A1_control = 0
result_W1A1_control = None
time_W1A1_control = 0

# Control test: create the classifier for the W1A1 network
hw_classifier = bnn.CnvClassifier(bnn.NETWORK_CNVW1A1, 'cifar10', bnn.RUNTIME_HW)

print("Control classification")
result_W1A1_control   = hw_classifier.classify_cifars(input_file)
time_W1A1_control     = hw_classifier.usecPerImage
# Percentage of classifications that match the labels read earlier
accuracy_W1A1_control = calculate_accuracy(result_W1A1_control, labels)
print("Accuracy:", accuracy_W1A1_control)

# Reset the device
xlnk.xlnk_reset()
print()

### W1A1 Test Functions
For each combination of weight/activation and SEU/MBU, the test will be repeated __num_runs__ times and __num_flips__ faults will be injected. The results will be written to files located in __output_folder__.

In [None]:
def test_W1A1(num_runs, num_flips, output_folder):
    """Run the four cnvW1A1 fault tests and write their results to disk.

    The tests run in this order: weight bit, activation bit, weight word,
    activation word. The raw output of each test is written to its own file
    below "temp/" as soon as the test finishes; afterwards the four outputs
    are merged, passed through calculate_stats and written to
    "cnvW1A1_stats.json".

    Args:
        num_runs: repetitions per test.
        num_flips: number of faults per run; also fills the "{}" placeholder
            of ``output_folder``.
        output_folder: output path template containing one "{}" placeholder.
    """
    # (title, results key, per-test file, flip_word, weight_or_activation)
    test_cases = (
        ("Weight Bit", "weight bit", "temp/cnvW1A1_results_bit_weight.json", False, 0),
        ("Activation Bit", "activation bit", "temp/cnvW1A1_results_bit_activation.json", False, 1),
        ("Weight Word", "weight word", "temp/cnvW1A1_results_word_weight.json", True, 0),
        ("Activation Word", "activation word", "temp/cnvW1A1_results_word_activation.json", True, 1),
    )

    per_test_outputs = []
    for title, results_key, temp_file, flip_word, selector in test_cases:
        _, _, accuracies = cnv_cifar_fault_test(title, bnn.NETWORK_CNVW1A1, input_file, num_runs, num_flips, flip_word, selector)
        test_output = make_output_dict("cnvW1A1", num_runs, num_flips, accuracy_W1A1_control, {results_key: accuracies})

        # Store the raw data of this test before starting the next one
        write_stats_file(output_folder.format(num_flips) + temp_file, test_output)
        per_test_outputs.append(test_output)

    # Combine the per-test outputs and write the final statistics
    combined = dict_of_dicts_merge(*per_test_outputs)
    write_stats_file(output_folder.format(num_flips) + "cnvW1A1_stats.json", calculate_stats(combined))

In [None]:
def test_W1A1_layers(num_runs, num_flips, target_layers, output_folder):
    """Run the cnvW1A1 bit and word fault tests restricted to target_layers.

    Both tests pass -1 as the weight/activation selector. Their outputs are
    merged, passed through calculate_stats and written to
    "layers/cnvW1A1_stats_layer<target_layers>.json" below the formatted
    output folder.

    Args:
        num_runs: repetitions per test.
        num_flips: number of faults per run; also fills the "{}" placeholder
            of ``output_folder``.
        target_layers: list of layers forwarded to cnv_cifar_fault_test; its
            string form is part of the title and of the file name.
        output_folder: output path template containing one "{}" placeholder.
    """
    # (title template, results key, flip_word) - bit flips first, then words
    test_cases = (
        ("Bit{}", "bit", False),
        ("Word{}", "word", True),
    )

    per_test_outputs = []
    for title_template, results_key, flip_word in test_cases:
        run_data = cnv_cifar_fault_test(title_template.format(target_layers), bnn.NETWORK_CNVW1A1, input_file, num_runs, num_flips, flip_word, -1, target_layers)
        per_test_outputs.append(
            make_output_dict("cnvW1A1", num_runs, num_flips, accuracy_W1A1_control, {results_key: run_data[2]})
        )

    # Write all stats to file
    stats = calculate_stats(dict_of_dicts_merge(*per_test_outputs))
    destination = output_folder.format(num_flips) + "layers/cnvW1A1_stats_layer{}.json".format(target_layers)
    write_stats_file(destination, stats)

### W2A2 - 2 bit weight and 2 bit activation
Run the control test for cnvW2A2

In [None]:
# Control run: classify the test file with cnvW2A2 without fault injection.
# accuracy_W2A2_control is the reference value the W2A2 test functions below
# store as "control" in their output.

# Placeholders, overwritten below once the classification has run
accuracy_W2A2_control = 0
result_W2A2_control = None
time_W2A2_control = 0

# Control test: create the classifier for the W2A2 network
hw_classifier = bnn.CnvClassifier(bnn.NETWORK_CNVW2A2,'cifar10',bnn.RUNTIME_HW)
print("Control classification")
result_W2A2_control = hw_classifier.classify_cifars(input_file)
time_W2A2_control = hw_classifier.usecPerImage
# Percentage of classifications that match the labels read earlier
accuracy_W2A2_control = calculate_accuracy(result_W2A2_control, labels)
print("Accuracy:", accuracy_W2A2_control)

# Reset the device
xlnk.xlnk_reset()
print()

### W2A2 Test Functions
For each combination of weight/activation and SEU/MBU, the test will be repeated __num_runs__ times and __num_flips__ faults will be injected. The results will be written to files located in __output_folder__.

In [None]:
def test_W2A2(num_runs, num_flips, output_folder):
    """Run the four cnvW2A2 fault tests and write their results to disk.

    The tests run in this order: weight bit, activation bit, weight word,
    activation word. The raw output of each test is written to its own file
    below "temp/" as soon as the test finishes; afterwards the four outputs
    are merged, passed through calculate_stats and written to
    "cnvW2A2_stats.json".

    Args:
        num_runs: repetitions per test.
        num_flips: number of faults per run; also fills the "{}" placeholder
            of ``output_folder``.
        output_folder: output path template containing one "{}" placeholder.
    """
    # (title, results key, per-test file, flip_word, weight_or_activation)
    test_cases = (
        ("Weight Bit", "weight bit", "temp/cnvW2A2_results_bit_weight.json", False, 0),
        ("Activation Bit", "activation bit", "temp/cnvW2A2_results_bit_activation.json", False, 1),
        ("Weight Word", "weight word", "temp/cnvW2A2_results_word_weight.json", True, 0),
        ("Activation Word", "activation word", "temp/cnvW2A2_results_word_activation.json", True, 1),
    )

    per_test_outputs = []
    for title, results_key, temp_file, flip_word, selector in test_cases:
        _, _, accuracies = cnv_cifar_fault_test(title, bnn.NETWORK_CNVW2A2, input_file, num_runs, num_flips, flip_word, selector)
        test_output = make_output_dict("cnvW2A2", num_runs, num_flips, accuracy_W2A2_control, {results_key: accuracies})

        # Store the raw data of this test before starting the next one
        write_stats_file(output_folder.format(num_flips) + temp_file, test_output)
        per_test_outputs.append(test_output)

    # Combine the per-test outputs and write the final statistics
    combined = dict_of_dicts_merge(*per_test_outputs)
    write_stats_file(output_folder.format(num_flips) + "cnvW2A2_stats.json", calculate_stats(combined))

In [None]:
def test_W2A2_layers(num_runs, num_flips, target_layers, output_folder):
    """Run the cnvW2A2 bit and word fault tests restricted to target_layers.

    Both tests pass -1 as the weight/activation selector. Their outputs are
    merged, passed through calculate_stats and written to
    "layers/cnvW2A2_stats_layer<target_layers>.json" below the formatted
    output folder.

    Args:
        num_runs: repetitions per test.
        num_flips: number of faults per run; also fills the "{}" placeholder
            of ``output_folder``.
        target_layers: list of layers forwarded to cnv_cifar_fault_test; its
            string form is part of the title and of the file name.
        output_folder: output path template containing one "{}" placeholder.
    """
    # (title template, results key, flip_word) - bit flips first, then words
    test_cases = (
        ("Bit{}", "bit", False),
        ("Word{}", "word", True),
    )

    per_test_outputs = []
    for title_template, results_key, flip_word in test_cases:
        run_data = cnv_cifar_fault_test(title_template.format(target_layers), bnn.NETWORK_CNVW2A2, input_file, num_runs, num_flips, flip_word, -1, target_layers)
        per_test_outputs.append(
            make_output_dict("cnvW2A2", num_runs, num_flips, accuracy_W2A2_control, {results_key: run_data[2]})
        )

    # Write all stats to file
    stats = calculate_stats(dict_of_dicts_merge(*per_test_outputs))
    destination = output_folder.format(num_flips) + "layers/cnvW2A2_stats_layer{}.json".format(target_layers)
    write_stats_file(destination, stats)

## 4. Run the tests

In [None]:
# Whole-network tests with a single flip.
# NOTE(review): this pair uses 1 run instead of num_runs - confirm intended.
test_W1A1(1, 1, output_folder)
test_W2A2(1, 1, output_folder)

# Whole-network tests for increasing flip counts, W1A1 before W2A2 each time.
for flip_count in (2, 5, 10, 20, 50, 100):
    test_W1A1(num_runs, flip_count, output_folder)
    test_W2A2(num_runs, flip_count, output_folder)

# Layer-targeted tests: (runs, flips) pairs executed for every layer index.
# NOTE(review): the 5-flip entry uses 1 run instead of num_runs - confirm
# intended.
layer_test_plan = (
    (1, 5),
    (num_runs, 10),
    (num_runs, 50),
    (num_runs, 100),
)
for i in range(9):
    for run_count, flip_count in layer_test_plan:
        test_W1A1_layers(run_count, flip_count, [i], output_folder)
        test_W2A2_layers(run_count, flip_count, [i], output_folder)