In [None]:
import os
GPU_ID = "3"

In [None]:
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import importlib
from tensorflow.keras.datasets import cifar100
from tensorflow.keras import backend as K
from tensorflow import keras as keras
import json
import time
import datetime
from pathlib import Path
from tqdm.notebook import tqdm
import sys
import gc

In [None]:
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]=GPU_ID

<h3> Basic optuna usage </h3>

In [None]:
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

In [None]:
def objective(trial):
    x = trial.suggest_float("x", -10, 10)
    return (x - 2) ** 2

In [None]:
study = optuna.create_study()
study.optimize(objective, n_trials=100)

In [None]:
best_params = study.best_params
found_x = best_params["x"]
print("Found x: {}, (x - 2)^2: {}".format(found_x, (found_x - 2) ** 2))

In [None]:
study.best_value

<h3> Loading CIFAR100 data and model</h3>

In [None]:
CIFAR100_VGG_PATH = "/home/ailie/Repos/BBAttacks/models/cifar100vgg/"

import sys
sys.path.append(CIFAR100_VGG_PATH)
import cifar100vgg

In [None]:
import importlib

In [None]:
importlib.reload(cifar100vgg)
WEIGHTS_FILE_NAME = "cifar100vgg.h5"
model = cifar100vgg.cifar100vgg(train=False, weights_path=CIFAR100_VGG_PATH + WEIGHTS_FILE_NAME)

In [None]:
NUM_CLASSES = 100
LOAD_DATA = True
SAVE_DATA = False

In [None]:
if LOAD_DATA:
    (x_train, y_train), (x_test, y_test) = cifar100.load_data()

    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')

    y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
    y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)
    
    if SAVE_DATA:
        np.save("x_test.npy", x_test)
        np.save("y_test.npy", y_test)

In [None]:
x_train = x_train.astype('int')
x_test = x_test.astype('int')

In [None]:
preds_train = model.predict(x_train)
    
preds_train_labels = np.argmax(preds_train, axis=1)

In [None]:
true_train_labels = np.argmax(y_train, axis=1)

In [None]:
acc_train = np.mean(true_train_labels == preds_train_labels)
print(f"Training accuracy: {acc_train}")

In [None]:
preds_test = model.predict(x_test)

preds_test_labels = np.argmax(preds_test, axis=1)

In [None]:
true_test_labels = np.argmax(y_test, axis=1)

In [None]:
acc_test = np.mean(true_test_labels == preds_test_labels)
print(f"Test accuracy: {acc_test}")

In [None]:
x_test_correct = x_test[preds_test_labels == true_test_labels]
y_test_correct = y_test[preds_test_labels == true_test_labels]

In [None]:
true_test_labels_correct = np.argmax(y_test_correct, axis=1)

In [None]:
plt.imshow(x_test_correct[0])

<h3> Try to use Optuna to generate adversarial grid attacks </h3>

In [None]:
sys.path.append("/home/ailie/Repos/BBAttacks/attacks/")
sys.path.append("/home/ailie/Repos/BBAttacks/utils/")

import utils

In [None]:
pixel_groups = utils.get_grid_pixel_groups(patch_size=4, image_size=np.shape(x_test_correct[0])[0])

In [None]:
def apply_pixel_group_perturbation(image, pixel_group, delta):
    new_image = image.copy()
    for pixel in pixel_group:
        for ch in range(3):
            new_image[pixel[0]][pixel[1]][ch] += delta
            new_image[pixel[0]][pixel[1]][ch] = utils.cap(new_image[pixel[0]][pixel[1]][ch], 0, 255)
    
    return new_image

In [None]:
def restore_study_image(study, image, pixel_groups):
    deltas_map = study.best_params
    altered_image = image.copy()
    
    for x in deltas_map:
        delta = deltas_map[x]
        group_idx = int(x[1:])
        pixel_group = pixel_groups[group_idx]
        altered_image = apply_pixel_group_perturbation(altered_image, pixel_group, delta)
        
    return altered_image

In [None]:
def run_optuna_attack(model, image, true_label, pixel_groups, DELTA_RANGE=20, N_TRIALS=200, early_stop=False):
    QUERY_COUNT = 0
    PERTURBED = False
    STOP = False
    BEST_IMPROVEMENT = 0
    
    original_image = image.copy()    
    init_prob = model.predict(np.array([image]))[0][true_label]
    
    def objective(trial):
        nonlocal QUERY_COUNT
        nonlocal PERTURBED
        nonlocal STOP
        nonlocal BEST_IMPROVEMENT
        
        altered_image = original_image.copy()

        for group_idx in range(len(pixel_groups)):
            pixel_group = pixel_groups[group_idx]
            delta = trial.suggest_int(f"x{group_idx}", -DELTA_RANGE, DELTA_RANGE)
            altered_image = apply_pixel_group_perturbation(altered_image, pixel_group, delta)

        perturbed_output = model.predict(np.array([altered_image]))
        QUERY_COUNT += 1
        
        if (np.argmax(perturbed_output[0], axis=0) != true_label) or PERTURBED:
            PERTURBED = True
            trial.study.stop()
        
        
        if early_stop:
            improvement = (init_prob - perturbed_output[0][true_label]) * 100
            if improvement > BEST_IMPROVEMENT:
                BEST_IMPROVEMENT = improvement
                
            if BEST_IMPROVEMENT < 1 and QUERY_COUNT >= N_TRIALS/4:
                STOP = True
                trial.study.stop()

            if BEST_IMPROVEMENT < 5 and QUERY_COUNT >= N_TRIALS/2:
                STOP = True
                trial.study.stop()

            if STOP:
                trial.study.stop()
        
        return perturbed_output[0][true_label]
    
    study = optuna.create_study()
    study.optimize(objective, n_trials=N_TRIALS)
    
    if STOP:
        print("EARLY STOPPED")
        
    perturbed_image = restore_study_image(study, image, pixel_groups)
    init_prob = model.predict(np.array([image]))[0][true_label]

    perturbed_label = np.argmax(model.predict(np.array([perturbed_image]))[0])
    
    return {
        "original_image": original_image,
        "true_label": true_label,
        "perturbed_image": perturbed_image, 
        "perturbed_label": perturbed_label, 
        "query_count": QUERY_COUNT
    }

In [None]:
import math

def get_optuna_attack_stats(optuna_attack_result):
    is_perturbed = not(optuna_attack_result["true_label"] == optuna_attack_result["perturbed_label"])
    
    original_image = optuna_attack_result["original_image"]
    perturbed_image = optuna_attack_result["perturbed_image"]

    perturbation = original_image - perturbed_image

    l2_distance = math.sqrt(np.sum(perturbation**2)) / 255
    # linf_distance = np.max(np.abs(perturbation)) / 255
    # TODO: uncomment above and delete below for normalized Linf, keeping it now like this for debugging
    linf_distance = np.max(np.abs(perturbation))
    l1_distance = np.sum(np.abs(perturbation)) / 255
    query_count = optuna_attack_result["query_count"]
    return {
        "is_perturbed": is_perturbed,
        "l1_distance": l1_distance,
        "l2_distance": l2_distance,
        "linf_distance": linf_distance,
        "query_count": query_count
    }

In [None]:
def run_adaptive_optuna_attack(model, image, true_label, pixel_groups, delta_range_list, num_trials_list, early_stop=False):
    # TODO: add early stopping. if e.g > half iterations and <1% decrease in prob, then stop
    num_queries = 0
    for (delta_range, n_trials) in list(zip(delta_range_list, num_trials_list)):        
        optuna_attack_result = run_optuna_attack(
            model=model, 
            image=x_test_correct[image_index], 
            true_label=true_test_labels_correct[image_index], 
            pixel_groups=pixel_groups,
            DELTA_RANGE=delta_range,
            N_TRIALS=n_trials,
            early_stop=early_stop
        )
        
        current_stats = get_optuna_attack_stats(optuna_attack_result)
        num_queries += current_stats["query_count"]
        
        if current_stats["is_perturbed"]:
            break
    
    final_stats = current_stats.copy()
    final_stats["query_count"] = num_queries
    return final_stats

In [None]:
plt.figure(figsize=(3,3))
plt.imshow(x_test_correct[5])
print(np.argmax(y_test_correct[5]))

In [None]:
image_index = 5
adaptive_optuna_stats = run_adaptive_optuna_attack(
    model=model, 
    image=x_test_correct[image_index], 
    true_label=true_test_labels_correct[image_index], 
    pixel_groups=pixel_groups, 
    delta_range_list=[30], 
    num_trials_list=[120],
    early_stop=True
)
print(adaptive_optuna_stats)

In [None]:
for trial in range(20):
    for image_index in range(5,6):
        adaptive_optuna_stats = run_adaptive_optuna_attack(
            model=model, 
            image=x_test_correct[image_index], 
            true_label=true_test_labels_correct[image_index], 
            pixel_groups=pixel_groups, 
            delta_range_list=[30], 
            num_trials_list=[1200]
        )
        print(adaptive_optuna_stats)

In [None]:
perturbed_count = 0
total_l1_dist = 0
total_l2_dist = 0
total_linf_dist = 0
total_query_count = 0
for image_index in tqdm(range(100)):
    adaptive_optuna_stats = run_adaptive_optuna_attack(
        model=model, 
        image=x_test_correct[image_index], 
        true_label=true_test_labels_correct[image_index], 
        pixel_groups=pixel_groups, 
        delta_range_list=[10, 20, 30], 
        num_trials_list=[200, 400, 800]
    )
    print(adaptive_optuna_stats)
    try:
        if(adaptive_optuna_stats["is_perturbed"]):
            perturbed_count += 1
            total_l1_dist += adaptive_optuna_stats['l1_distance']
            total_l2_dist += adaptive_optuna_stats['l2_distance']
            total_linf_dist += adaptive_optuna_stats['linf_distance']
            total_query_count += adaptive_optuna_stats['query_count']
    except Exception as e:
        print(e)

In [None]:
perturbed_count

In [None]:
total_l2_dist/perturbed_count / (32*32*3)

In [None]:
adaptive_optuna_stats

In [None]:
total_query_count/perturbed_count

In [None]:
np.shape(x_test[0])

In [None]:
optuna_stats = {}
for image_index in tqdm(range(30)):
    optuna_attack_result = run_optuna_attack(
        model=model, 
        image=x_test_correct[image_index], 
        true_label=true_test_labels_correct[image_index], 
        pixel_groups=pixel_groups,
        DELTA_RANGE=12,
        N_TRIALS=20000
    )

    optuna_stats[image_index] = get_optuna_attack_stats(optuna_attack_result)
    print(optuna_stats[image_index])

In [None]:
fig = plt.figure(figsize=(5,2))

ax1 = fig.add_subplot(1,2,1)
ax1.set_title("Initial prob:{:.2f}".format(init_prob))
ax1.imshow(x_test_correct[IMG_IDX])

ax2 = fig.add_subplot(1,2,2)
ax2.set_title("Perturbed prob:{:.2f}".format(study.best_value))
ax2.imshow(best_img)
print(f"queries: {QUERY_COUNT}")
print(f"L_inf: {Linf_dist}")

In [None]:
# TODO: implement dynamic strategy, where Linf changes adaptively with the number of queries