# CIFAR-10 Differential Attack

## Configuration

In [1]:
import os

import numpy as np
import tensorflow as tf

In [2]:
FILE_PATH = os.getcwd()
MODEL_PATH = os.path.join(FILE_PATH, "../models/my_vgg.h5")
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"

## Dataset preparation

We work with categorical (binary class matrix) instead of class vectors (integers).

In [3]:
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical

def __prepare_datasets():
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()

    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)
    return (x_train, y_train), (x_test[:500], y_test[:500])

In [4]:
data_train, data_test = __prepare_datasets()
x_train, y_train = data_train
x_test, y_test = data_test
print(f"x_train.shape = {x_train.shape} y_train.shape = {y_train.shape}")
print(f"x_test.shape = {x_test.shape} y_test.shape = {y_test.shape}")

x_train.shape = (50000, 32, 32, 3) y_train.shape = (50000, 10)
x_test.shape = (500, 32, 32, 3) y_test.shape = (500, 10)


## Model preparation

We use our own VGG model.

In [5]:
from integration_tests.models.my_vgg import my_vgg

def __prepare_model(data_train, data_test):
    if os.path.exists(MODEL_PATH):
        print("---Using Existing Model---")
        model: tf.keras.Model = tf.keras.models.load_model(MODEL_PATH)
    else:
        print("---Training Model---")
        print(f"GPU IS AVAILABLE: {tf.config.list_physical_devices('GPU')}")
        model: tf.keras.Model = my_vgg()
        model.fit(
            *data_train,
            epochs=100,
            batch_size=64,
            validation_data=data_test,
        )
        model.save(MODEL_PATH)

    model.summary()
    return model

In [6]:
model = __prepare_model(data_train, data_test)

---Using Existing Model---
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 32, 32, 32)        896       
_________________________________________________________________
dropout (Dropout)            (None, 32, 32, 32)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 32, 32, 32)        9248      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 16, 16, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 16, 16, 64)        18496     
_________________________________________________________________
dropout_1 (Dropout)          (None, 16, 16, 64)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            

## Prior Prediction

In [7]:
y_preds = model.predict(x_test)
y_preds.shape

(500, 10)

## Main Algorithm

Original algorithm:

```txt
CornerSearch:
-  One pixel attack : Choose N pixels with differential evolution
-  For each pixel (x, y, r, g, b), create 8 images (x,y,0,0,0), (x,y,255,0,0), (x,y,0,255,0)... (x,y,255,255,255)
-  Test the 8 images and save the pertubations where M(x_fake) != y_true

Assuming Y_fake, a group of prediction 
For each pair (image, label) x_test, y_true in X (group of images) and Y (group of true prediction):
-  Create a fragile pixel pool P of x based on the algorithm above
-  For each pixel p=(x, y, r, g, b) in the fragile pixel pool P:
   -  x_fake = InjectFault(p, x_test) (or InjectFaultByBit(p.x, p.y, msb_index, channel, bit_flip/set/reset))
   -  M(x_fake) = y_fake, and we store y_fake in a temporary group called Y_temp
   -  If one of Y_temp has y_fake != y_true:
      -  Y_fake append y_fake
      Else:
      -  Y_fake append y_pred (normal prediction W/O FI)
- Measure the accuracy of Y_fake using mean
```

We do a little modification :

```txt
CornerSearch:
-  One pixel attack : Choose N pixels with differential evolution
-  For each pixel (x, y, r, g, b), create 8 images (x,y,0,0,0), (x,y,255,0,0), (x,y,0,255,0)... (x,y,255,255,255)
-  Test the 8 images and save the pertubations where M(x_fake) != y_true

Assuming Y_fake, a group of prediction 
For each pair (image, label) x_test, y_true in X (group of images) and Y (group of true prediction):
-  If already missclassified:
   -  Y_fake append y_pred (normal prediction W/O FI)
-  Create a fragile pixel pool P of x based on the algorithm above
-  For each pixel p=(x, y, r, g, b) in the fragile pixel pool P:
   -  x_fake = InjectFault(p, x_test) (or InjectFaultByBit(p.x, p.y, msb_index, channel, bit_flip/set/reset))
   -  M(x_fake) = y_fake, and we store y_fake in a temporary group called Y_temp
   -  If one of Y_temp has y_fake != y_true:
      -  Y_fake append y_fake
      Else:
      -  Y_fake append y_pred (normal prediction W/O FI)
- Measure the accuracy of Y_fake using mean
```

```python
CORNERS = (
    (0, 0, 0),
    (255, 255, 255),
    (0, 0, 255),
    (0, 255, 0),
    (0, 255, 255),
    (255, 0, 0),
    (255, 0, 255),
    (255, 255, 0),
)


def corner_search(
    image_id: int,
    pixels: np.ndarray,
    data_test: np.ndarray,
    model: tf.keras.Model,
) -> Iterable[Tuple[np.ndarray, np.ndarray, PixelFault]]:
    x_test, y_test = data_test

    y_true = y_test[image_id]
    y_true_index = np.argmax(y_true)

    for pixel in pixels:
        corner_pixels = [PixelFault(pixel.x, pixel.y, r, g, b) for r, g, b in CORNERS]

        x_fakes = np.array(
            [
                build_perturb_image([corner_pixel])(x_test[image_id])
                for corner_pixel in corner_pixels
            ]
        )
        y_preds = model.predict(x_fakes)

        for x_fake, y_pred, corner_pixel in zip(
            x_fakes,
            y_preds,
            corner_pixels,
        ):
            y_pred_index = np.argmax(y_pred)
            if y_true_index != y_pred_index:
                yield x_fake, y_pred, corner_pixel

```

In [8]:
from inputtensorfi.attacks.utils import attack
from inputtensorfi.manipulation.img.faults import PixelFault

def _look_for_pixels(
    image_id: int,
    data_test: np.ndarray,
    model: tf.keras.Model,
    pixel_count=1,
):
    x_test, y_test = data_test
    x = x_test[image_id]
    y_true = y_test[image_id]
    y_true_index = np.argmax(y_true)
    pixels = attack(
        x,
        y_true_index,
        model,
        pixel_count=pixel_count,
        maxiter=10,
        verbose=False,
    ).astype(np.uint8)

    # Convert [x_0, y_0, r_0, g_0, b_0, x_1, ...]
    # to [pixel_fault_0, pixel_fault_1, ...]
    return np.array([PixelFault(*pixels[0:5]) for i in range(len(pixels) // 5)])

In [9]:
from typing import Dict

from inputtensorfi.attacks.corner_search import corner_search

length = len(y_preds)
y_fake = y_preds.copy()
total_faults: Dict[int, PixelFault] = dict()
for image_id, _ in enumerate(y_test):
    if np.argmax(y_preds[image_id]) != np.argmax(y_test[image_id]):
        print(f"MISPREDICTED {image_id}/{length}")
        continue
    pixels = _look_for_pixels(image_id, data_test, model, pixel_count=10)

    try:
        first_pred = next(corner_search(image_id, pixels, data_test, model))
        _, y_pred, pixel = first_pred
        y_fake[image_id] = y_pred
        total_faults[image_id] = pixel
        print(
            f"FAULT {image_id}/{length}, {pixel}, original={data_test[0][image_id, pixel.x, pixel.y]}"
        )
    except StopIteration:
        # print(f"NO FAULT image_id={image_id}")
        pass

FAULT 3/500, PixelFault(x=15, y=9, r=0, g=255, b=0), original=[84 83 88]
FAULT 7/500, PixelFault(x=15, y=8, r=0, g=0, b=0), original=[158 151 132]
MISPREDICTED 15/500
FAULT 22/500, PixelFault(x=17, y=12, r=255, g=255, b=255), original=[36 42 65]
MISPREDICTED 24/500
FAULT 26/500, PixelFault(x=24, y=17, r=255, g=255, b=255), original=[44 28 23]
MISPREDICTED 32/500
FAULT 33/500, PixelFault(x=19, y=4, r=0, g=0, b=0), original=[255 255 252]
MISPREDICTED 35/500
MISPREDICTED 37/500
FAULT 42/500, PixelFault(x=22, y=23, r=255, g=255, b=255), original=[7 7 7]
FAULT 49/500, PixelFault(x=14, y=9, r=0, g=0, b=255), original=[85 79 15]
MISPREDICTED 52/500
FAULT 57/500, PixelFault(x=27, y=22, r=255, g=255, b=255), original=[0 1 3]
MISPREDICTED 58/500
MISPREDICTED 59/500
MISPREDICTED 61/500
MISPREDICTED 63/500
FAULT 65/500, PixelFault(x=4, y=12, r=0, g=255, b=255), original=[ 67 113  65]
FAULT 70/500, PixelFault(x=25, y=12, r=0, g=0, b=0), original=[180 169 150]
MISPREDICTED 78/500
MISPREDICTED 81/500

In [12]:
import json

dict_data = {key: fault.to_dict() for key, fault in total_faults.items()}
print(f"total_faults={json.dumps(dict_data, indent=2)}")

total_faults={
  "3": {
    "x": 15,
    "y": 9,
    "r": 0,
    "g": 255,
    "b": 0
  },
  "7": {
    "x": 15,
    "y": 8,
    "r": 0,
    "g": 0,
    "b": 0
  },
  "22": {
    "x": 17,
    "y": 12,
    "r": 255,
    "g": 255,
    "b": 255
  },
  "26": {
    "x": 24,
    "y": 17,
    "r": 255,
    "g": 255,
    "b": 255
  },
  "33": {
    "x": 19,
    "y": 4,
    "r": 0,
    "g": 0,
    "b": 0
  },
  "42": {
    "x": 22,
    "y": 23,
    "r": 255,
    "g": 255,
    "b": 255
  },
  "49": {
    "x": 14,
    "y": 9,
    "r": 0,
    "g": 0,
    "b": 255
  },
  "57": {
    "x": 27,
    "y": 22,
    "r": 255,
    "g": 255,
    "b": 255
  },
  "65": {
    "x": 4,
    "y": 12,
    "r": 0,
    "g": 255,
    "b": 255
  },
  "70": {
    "x": 25,
    "y": 12,
    "r": 0,
    "g": 0,
    "b": 0
  },
  "87": {
    "x": 17,
    "y": 16,
    "r": 0,
    "g": 0,
    "b": 0
  },
  "97": {
    "x": 19,
    "y": 11,
    "r": 0,
    "g": 255,
    "b": 0
  },
  "143": {
    "x": 16,
    "y": 19,
    "r": 

## Accuracies

In [11]:
y_true_acc = np.array([np.max(y) for y in y_test])
y_preds_acc = np.array([y[np.argmax(y_true)] for y, y_true in zip(y_preds, y_test)])
y_fake_acc = np.array([y[np.argmax(y_true)] for y, y_true in zip(y_fake, y_test)])
print(f"y_true_acc={np.mean(y_true_acc)}")
print(f"y_prior_acc={np.mean(y_preds_acc)}")
print(f"y_fake_acc={np.mean(y_fake_acc)}")

y_true_acc=1.0
y_prior_acc=0.7911568284034729
y_fake_acc=0.7529138326644897
