In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
import tensorflow as tf
from keras.models import Sequential, Model
from tensorflow.keras import datasets
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

from IPython import display

In [None]:
# Load the MNIST train/test splits, then report the shape of each image array.
train_split, test_split = datasets.mnist.load_data()
training_images, training_labels = train_split
test_images, test_labels = test_split

print("training_images.shape:", training_images.shape)
print("test_images.shape:", test_images.shape)

In [None]:
# Flatten the trailing image axes of every sample into one feature axis,
# convert to float32 and normalize by dividing by 255.0.
# The sample count is read from each array and the feature count is inferred
# with -1, so nothing here depends on hard-coded 60000 / 10000 / 784 sizes.
t_training_images = training_images.reshape(len(training_images), -1).astype('float32') / 255.0
t_test_images = test_images.reshape(len(test_images), -1).astype('float32') / 255.0

# One-hot encode the integer labels into num_classes columns.
num_classes=10
t_training_labels = tf.keras.utils.to_categorical(training_labels, num_classes)
t_test_labels = tf.keras.utils.to_categorical(test_labels, num_classes)

In [None]:
# Define our own base model.
class myModule(tf.keras.Model):
    """Single softmax-layer classifier that can also report its loss gradient.

    Besides the regular Keras forward pass, the model offers ``gradient``,
    which returns the gradient of the categorical cross-entropy loss with
    respect to all of the model's weights, flattened into one row vector.
    """

    def __init__(self) :
        super(myModule,self).__init__()
        # Only one layer is used: with two layers (an extra 128-unit sigmoid
        # hidden layer was tried) the reconstruction result was poor.
        self.dense2 = Dense(10,activation=tf.nn.softmax)

    def call(self, inputs, training=True) :
        """Return the softmax output of the dense layer for ``inputs``.

        ``training`` is accepted but not used by this model.
        """
        return self.dense2(inputs)

    def gradient(self,x,y) :
        """Return the loss gradient for inputs ``x`` and labels ``y``.

        Args:
            x: batch of model inputs.
            y: batch of label distributions, one row per row of ``x``.

        Returns:
            A tensor of shape ``(1, N)`` holding the gradient of the mean
            categorical cross-entropy with respect to every entry of
            ``self.weights``, concatenated in the order of ``self.weights``.
        """
        with tf.GradientTape() as tape:
            # Watch the weights BEFORE the forward pass so that the ops using
            # them are recorded even if a weight is not watched automatically.
            tape.watch(self.weights)
            predict = self(x)
            # Keras losses take (y_true, y_pred): labels first, predictions second.
            loss = tf.reduce_mean(tf.losses.categorical_crossentropy(y, predict))
        # Differentiate outside the tape context; an enclosing tape (if any)
        # still records these ops, so gradients of this gradient remain available.
        g = tape.gradient(loss, self.weights)

        # Flatten every per-weight gradient and join them into one 1 x N row.
        return tf.concat([tf.reshape(grad, (1,-1)) for grad in g], axis=-1)
        

# Instantiate the base classifier; the cells below compile, fit, evaluate and save it.
my_model = myModule()

In [None]:
# Compile the model with the Adam optimizer and categorical cross-entropy;
# accuracy is tracked as an additional metric.
my_model.compile(
    loss='categorical_crossentropy',
    optimizer=tf.optimizers.Adam(),
    metrics=['accuracy'],
)

In [None]:
# Train the base classifier on the flattened, normalized training split.
my_model.fit(
    x=t_training_images,
    y=t_training_labels,
    batch_size=128,
    epochs=10,
)

In [None]:
# Evaluate on the test split and print the loss together with the accuracy in percent.
loss, acc = my_model.evaluate(x=t_test_images, y=t_test_labels, batch_size=128)
print("loss rate:", loss, " accuracy: ", 100.0 * acc)

In [None]:
# Note: this is a subclassed Model, so pay attention to the format it is
# saved in ('tf' here) and to that format's drawbacks.
my_model.save(filepath='mnist_basic_model', save_format='tf')

# Mark the classifier layer as non-trainable from here on.
my_model.dense2.trainable = False

In [None]:
# This sample is the reconstruction target: its input data and label are fed
# to the base model to obtain the gradient that the attack will try to match.
test_index = 0

plt.imshow(test_images[test_index], cmap='gray')
print(test_labels[test_index])

# Turn the single sample and its one-hot label into batches of one row each.
target_image = t_test_images[test_index].reshape(1, -1)
target_label = t_test_labels[test_index].reshape(1, -1)
target_gradient = my_model.gradient(target_image, target_label)

In [None]:
class DeepLeakage(tf.keras.Model) :
    """Model whose output is ``base_model.gradient`` of two learned tensors.

    Two bias-free Dense layers receive the same input. The 784-unit layer
    provides the image-like ``x`` argument and the 10-unit softmax layer
    provides the label-like ``y`` argument of ``base_model.gradient``.
    """

    def __init__(self,base_model) :
        super().__init__()
        uniform_init = tf.keras.initializers.RandomUniform(0, 1)
        ones_init = tf.keras.initializers.Ones()
        # One Dense layer stands in for the image input, the other for the label.
        self.dense1 = Dense(784, use_bias=False, kernel_initializer=uniform_init)
        self.dense2 = Dense(10, activation="softmax", use_bias=False, kernel_initializer=ones_init)
        self.base_model = base_model

    def call(self, inputs) :
        """Return the base model's gradient for the current image/label guess."""
        # The same input tensor is routed through both Dense layers.
        guessed_image = self.dense1(inputs)
        guessed_label = self.dense2(inputs)
        return self.base_model.gradient(guessed_image, guessed_label)

In [None]:
# Wrap the trained classifier in the attack model and compile it with an MSE
# loss, so fitting drives the wrapper's output toward the supplied target.
model = DeepLeakage(base_model=my_model)
model.compile(loss="mse", optimizer="adam")

In [None]:
def train_itertions(model, g, itertions=100000, interval=10):
    """Train ``model`` toward ``g`` batch by batch while previewing ``dense1``.

    Args:
        model: object exposing a ``dense1`` layer and ``train_on_batch``.
        g: target passed as ``y`` to every ``train_on_batch`` call.
        itertions: number of ``train_on_batch`` steps to run.
        interval: the preview is redrawn on every step whose index is a
            multiple of this value (including step 0).
    """
    def show_preview():
        # Render the first row of dense1's first weight as a 28 x 28 image.
        kernel = model.dense1.weights[0].numpy()
        plt.clf()
        plt.imshow(kernel[0].reshape(28, 28), cmap='gray')
        plt.axis('off')

        # Replace the previous cell output with the refreshed figure.
        display.clear_output(wait=True)
        display.display(plt.gcf())

    dummy_input = np.ones((1, 1))
    # Call the layer once up front so its weights exist before the first preview.
    model.dense1(dummy_input)

    for step in range(itertions):
        if step % interval == 0:
            show_preview()
        model.train_on_batch(dummy_input, y=g)

In [None]:
# Run 2000 training steps against the recorded target gradient, refreshing
# the preview every 10 steps.
train_itertions(
    model=model,
    g=target_gradient.numpy(),
    itertions=2000,
    interval=10,
)