In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.applications.vgg19 import VGG19
from keras.preprocessing import image
from keras.applications.vgg19 import preprocess_input
from keras.models import Model, Sequential
from keras.datasets import mnist, fashion_mnist, cifar100
from keras.layers import Input, Conv2D, Dense, MaxPooling2D, Flatten
from tensorflow.keras.optimizers import Adam
from skimage import color
import matplotlib.pyplot as plt
import cv2
import gc

## 利用PGD嵌入資訊

In [1]:
import tensorflow as tf

# Use the PGD idea to modify an image so the model outputs the label we want.
def pgd_attack_to_target(input_image, target_label, loss_object, model, epsilon, alpha, iterations):
    """Run a targeted PGD attack that pulls `model`'s output towards `target_label`.

    Every step moves the image against the sign of the loss gradient (a
    descent step on the loss measured against the target), then projects the
    result back into the L-infinity ball of radius `epsilon` around the
    ORIGINAL input and finally into the [0, 1] pixel range.

    Args:
        input_image: Batch of images to perturb. It is never modified.
            Results are clipped to [0, 1], so inputs are presumably already
            scaled to that range.
        target_label: Labels handed to `loss_object` as ``y_true``.
        loss_object: Callable ``loss_object(y_true, y_pred)`` returning the loss.
        model: Callable mapping the image tensor to predictions.
        epsilon: Maximum allowed per-pixel deviation from `input_image`.
        alpha: Step size of a single PGD iteration.
        iterations: Number of PGD iterations.

    Returns:
        The perturbed images as a NumPy array. When `iterations` is not
        positive no step is taken and `input_image` is returned as-is.

    Raises:
        ValueError: If the loss has no gradient with respect to the image.
    """
    if iterations <= 0:
        return input_image

    x_orig = tf.convert_to_tensor(input_image)
    # The projection bounds are anchored to the original image. Anchoring them
    # to the current iterate (x_adv +/- epsilon) would never constrain the
    # total perturbation, because every step is already smaller than epsilon.
    lower_bound = x_orig - epsilon
    upper_bound = x_orig + epsilon

    x_adv = x_orig
    for _ in range(iterations):
        with tf.GradientTape() as tape:
            tape.watch(x_adv)  # x_adv is a plain tensor, so it must be watched explicitly
            prediction = model(x_adv)
            loss = loss_object(target_label, prediction)  # distance from the target
        grad = tape.gradient(loss, x_adv)  # d_loss / d_x_adv
        if grad is None:
            raise ValueError("loss has no gradient with respect to the input image")
        x_adv = x_adv - alpha * tf.sign(grad)  # descend: move towards the target label
        x_adv = tf.clip_by_value(x_adv, lower_bound, upper_bound)  # stay within epsilon of the input
        x_adv = tf.clip_by_value(x_adv, 0, 1)  # stay a valid image
    # Convert once at the end instead of round-tripping through NumPy each step.
    return x_adv.numpy()

## 設定訓練資料集和訓練參數

In [None]:
# --- Experiment configuration -------------------------------------------------
DATA_LENGTH = 1000   # number of images used per class
BIT_COUNT = 16       # placeholder; rebound by the `for BIT_COUNT in range(START_BIT, ...)` sweep
iterations = 7       # PGD steps per training epoch
alpha = 0.0001       # PGD step size
epsilon = 8.0 / 255  # PGD perturbation budget
EPOCH = 50           # training epochs
START_BIT = 256      # first BIT_COUNT of the sweep
END_BIT = 256        # last BIT_COUNT of the sweep (inclusive, step 8)
SUBSET_SIZE = 2000   # images kept from each CIFAR-100 split
IMAGE_SIZE = (28, 28)  # spatial size the images are resized to

# Every per-class batch is `images[:DATA_LENGTH]`; a larger DATA_LENGTH would
# silently yield fewer images than labels.
assert DATA_LENGTH <= SUBSET_SIZE, "DATA_LENGTH must not exceed SUBSET_SIZE"


def _prepare_gray_images(rgb_images, count, size):
    """Return the first `count` images as float32 grayscale of shape (count, *size, 1).

    The images are sliced BEFORE any conversion so only the part that is kept
    is scaled, converted to grayscale and resized.
    """
    scaled = rgb_images[:count].astype('float32') / 255.0
    gray = color.rgb2gray(scaled)
    resized = np.stack([cv2.resize(img, size) for img in gray]).astype(np.float32)
    return resized.reshape((-1,) + tuple(size) + (1,))


(raw_train, _), (raw_test, _) = cifar100.load_data()
cifar100_train = _prepare_gray_images(raw_train, SUBSET_SIZE, IMAGE_SIZE)
cifar100_test = _prepare_gray_images(raw_test, SUBSET_SIZE, IMAGE_SIZE)
del raw_train, raw_test  # the raw RGB arrays are not needed any more

## 設定資料擴增

In [None]:
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomZoom

# Augmentation pipeline: a small random rotation (constant fill) followed by a
# random zoom. Layers are applied in the order they are added.
data_augmentation = tf.keras.Sequential()
data_augmentation.add(RandomRotation(0.02, fill_mode='constant'))
data_augmentation.add(RandomZoom(.2, .2))

## 設定訓練模型

In [None]:
def plot_loss(losses):
    """Plot the decoder's loss curve over epochs.

    Args:
        losses: Mapping that holds the loss history (one entry per epoch)
            under ``"decoder"`` or ``"train_loss"``. ``"decoder"`` is checked
            first; ``"train_loss"`` is the key the training loop in this
            notebook records into.

    Raises:
        KeyError: If `losses` contains neither key.
    """
    for key in ("decoder", "train_loss"):
        if key in losses:
            dec_loss = losses[key]
            break
    else:
        raise KeyError("losses must contain a 'decoder' or 'train_loss' entry")

    plt.figure(figsize=(10, 8))
    plt.plot(dec_loss, label="decoder loss")
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
def plot_accuracy(accuracy):
    """Plot the accuracy history stored under ``accuracy["train_accuracy"]``.

    Only the training curve is drawn; no validation curve is plotted.
    """
    plt.figure(figsize=(10, 8))
    train_curve = accuracy["train_accuracy"]
    plt.plot(train_curve, label="train_accuracy")
    for set_axis_label, text in ((plt.xlabel, 'Epochs'), (plt.ylabel, 'Accuracy')):
        set_axis_label(text)
    plt.legend()
    plt.show()

In [None]:
# Optimizer settings for the decoder: Adam with learning rate 2e-4 and beta_1 = 0.5.
adam = Adam(beta_1=0.5, learning_rate=2e-4)

In [None]:
# Results of the sweep: one list entry is appended per BIT_COUNT configuration.
# "test_adv" receives the per-class metrics measured on PGD-embedded test
# images. "test_adv_att" belongs to the augmented variant, which is disabled
# below, so it only ever receives empty lists.
accuracy_list = {"test_adv": [], "test_adv_att": []}
losses_list = {"test_adv": [], "test_adv_att": []}

TEST_ATTACK_ITERATIONS = 180  # PGD steps used when embedding codes into the test images
attack_loss = tf.keras.losses.CategoricalCrossentropy()  # one loss object shared by every PGD call

# NOTE: BIT_COUNT, decoder and user_code are left in the notebook namespace
# after this loop on purpose; later cells read them.
for BIT_COUNT in range(START_BIT, END_BIT + 8, 8):
    num_classes = BIT_COUNT + 1  # class 0 keeps the untouched images, classes 1..BIT_COUNT carry a code
    test_accuracy = []
    test_losses = []
    test_accuracy_att = []
    test_losses_att = []

    # Build the decoder network.
    inputs = Input((28, 28, 1))
    x = Conv2D(64, (3, 3), padding="same", activation='relu')(inputs)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(32, (3, 3), padding="same", activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(16, (3, 3), padding="same", activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Flatten()(x)
    x = Dense(512, activation='relu')(x)
    x = Dense(num_classes, activation="softmax")(x)
    decoder = Model(inputs=inputs, outputs=x)
    # Give every model its own optimizer, cloned from the configured `adam`.
    # An optimizer instance keeps state tied to the variables of the first
    # model it trains, so sharing one across sweep steps (or cell re-runs)
    # fails or carries stale state into the next model.
    # NOTE(review): 'binary_crossentropy' is paired with a softmax output and
    # one-hot labels, while the PGD step uses categorical cross-entropy;
    # confirm this mismatch is intended.
    decoder.compile(optimizer=Adam.from_config(adam.get_config()),
                    loss='binary_crossentropy',
                    metrics=['categorical_accuracy'])
    decoder.summary()

    # One-hot user codes: row i is the target for class i.
    user_code = np.eye(num_classes)

    losses = {"train_loss": []}
    accuracy = {"train_accuracy": []}

    clean_train = cifar100_train[:DATA_LENGTH]
    train_lower = clean_train - epsilon
    train_upper = clean_train + epsilon
    cifar100_train_adv = [clean_train for _ in range(num_classes)]

    # The labels never change between epochs, so build them once: DATA_LENGTH
    # copies of code 0, then DATA_LENGTH copies of code 1, and so on.
    y_train = np.repeat(user_code, DATA_LENGTH, axis=0)
    # View of y_train grouped per class: class_targets[i] are the labels of class i.
    class_targets = y_train.reshape(num_classes, DATA_LENGTH, num_classes)
    x_train = None  # defined up front so the cleanup below also works when EPOCH == 0

    # Training
    for epoch in range(EPOCH):
        # Step 1: adjust the images so the decoder reads out the right user_code.
        for i in range(1, num_classes):
            print(i, end=' ')
            adv_batch = pgd_attack_to_target(cifar100_train_adv[i], class_targets[i], attack_loss,
                                             decoder, epsilon, alpha, iterations)
            # The attack is re-applied every epoch to the previous epoch's
            # output, so clip the accumulated perturbation back to within
            # epsilon of the clean images.
            cifar100_train_adv[i] = np.clip(adv_batch, train_lower, train_upper)
        # Step 2 (data augmentation: rotation, zoom, ...) is currently disabled.

        # Step 3: train the decoder on the clean and embedded images.
        x_train = np.concatenate(cifar100_train_adv)
        decoder_loss = decoder.fit(x_train, y_train, epochs=1)

        # Record the loss and accuracy history.
        losses["train_loss"].append(decoder_loss.history["loss"])
        accuracy["train_accuracy"].append(decoder_loss.history["categorical_accuracy"])
        print("epoch:{}  train_loss:{}".format(epoch, decoder_loss.history["loss"]))

    # plot_loss reads its series from the "decoder" key.
    plot_loss({"decoder": losses["train_loss"]})
    plot_accuracy(accuracy)

    # Embed the codes into the test images.
    cifar100_test_adv = [cifar100_test[:DATA_LENGTH] for _ in range(num_classes)]
    for i in range(1, num_classes):
        cifar100_test_adv[i] = pgd_attack_to_target(cifar100_test_adv[i], class_targets[i], attack_loss,
                                                    decoder, epsilon, alpha, TEST_ATTACK_ITERATIONS)
    # Evaluate the decoder on every class; evaluate() returns [loss, categorical_accuracy].
    for i in range(num_classes):
        result_temp = decoder.evaluate(cifar100_test_adv[i], class_targets[i])
        test_losses.append(result_temp[0])
        test_accuracy.append(result_temp[1])

    # PSNR between the first clean test image and its class-1 embedded version.
    print(tf.image.psnr(cifar100_test_adv[0][0], cifar100_test_adv[1][0], max_val=1.0))

    plt.subplot(1, 2, 1)
    plt.title('origin image')
    plt.imshow(cifar100_test_adv[0][0].reshape(28, 28), cmap='gray')
    plt.subplot(1, 2, 2)
    plt.title('adv image')
    plt.imshow(cifar100_test_adv[1][0].reshape(28, 28), cmap='gray')
    plt.show()

    accuracy_list["test_adv"].append(test_accuracy)
    losses_list["test_adv"].append(test_losses)
    accuracy_list["test_adv_att"].append(test_accuracy_att)
    losses_list["test_adv_att"].append(test_losses_att)

    # Release the large per-configuration arrays before the next sweep step.
    del x_train
    del y_train, class_targets
    del cifar100_train_adv
    del cifar100_test_adv
    gc.collect()

In [None]:
# Re-embed the codes into the test images with a different PGD step count and
# evaluate the decoder class by class. This cell relies on `decoder`,
# `user_code` and `BIT_COUNT` left behind by the training cell.
FINAL_ATTACK_ITERATIONS = 150
final_attack_loss = tf.keras.losses.CategoricalCrossentropy()
final_test_results = []  # one [loss, categorical_accuracy] pair per class

# Sizes come from BIT_COUNT / DATA_LENGTH throughout, so the image batches, the
# label batches and the loops below always agree with each other.
cifar100_test_adv = [cifar100_test[:DATA_LENGTH] for _ in range(BIT_COUNT + 1)]
for i in range(1, BIT_COUNT + 1):
    class_target = np.repeat(user_code[i:i + 1], DATA_LENGTH, axis=0)
    cifar100_test_adv[i] = pgd_attack_to_target(cifar100_test_adv[i], class_target, final_attack_loss,
                                                decoder, epsilon, alpha, FINAL_ATTACK_ITERATIONS)
# Evaluate on the embedded test images and keep the results instead of discarding them.
for i in range(BIT_COUNT + 1):
    class_target = np.repeat(user_code[i:i + 1], DATA_LENGTH, axis=0)
    result_temp = decoder.evaluate(cifar100_test_adv[i], class_target)
    final_test_results.append(result_temp)

In [None]:
# One row per sweep configuration, in the order the sweep appended them:
# [mean, sample standard deviation (ddof=1), minimum] of the per-class test
# accuracies. Iterating over the recorded results directly keeps this in step
# with whatever START_BIT / END_BIT range was actually run.
test_accuracy_final = [
    [np.mean(class_accuracy), np.std(class_accuracy, ddof=1), min(class_accuracy)]
    for class_accuracy in accuracy_list['test_adv']
]

In [None]:
# BIT_COUNT values covered by the sweep (same range expression as the training loop).
x = list(range(START_BIT, END_BIT + 8, 8))

In [None]:
# Summary of the first sweep configuration; each row is stored as [mean, std, min].
avg_acc, std_acc, min_acc = test_accuracy_final[0]
print("avg_accuracy:{}  min_accuracy:{}   std:{}".format(avg_acc, min_acc, std_acc))

In [None]:
# Plot column 2 of test_accuracy_final against x and save the figure.
min_accuracy = np.array(test_accuracy_final)[:, 2]
plt.plot(x, min_accuracy, label='min')
for set_axis_label, text in ((plt.xlabel, 'Number of categories'), (plt.ylabel, 'Accuracy')):
    set_axis_label(text)
plt.legend()
# NOTE(review): dpi=3000 produces an extremely large image; confirm it is not meant to be 300.
plt.savefig('min.png', dpi=3000)
plt.show()