# tensorflow model optimization for weight quantization

In [None]:
!pip install -q tensorflow-model-optimization

# Imports

In [None]:
from pathlib import Path
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
import hashlib
import math
import tensorflow as tf
from tensorflow.keras.datasets import mnist, fashion_mnist, cifar10
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import confusion_matrix
import seaborn as sns
import pathlib
import numpy as np
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping, LearningRateScheduler
import tempfile
import zipfile
import os
import h5py
import glob
from numpy import linalg as LA
from scipy.stats import rankdata
from shutil import copyfile, move
from PIL import Image
import random
import cv2
import tensorflow_model_optimization as tfmot
import tensorboard
import shutil
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications.vgg19 import VGG19

# Simulation Parameters

In [None]:
dataset = 'cifar10' #the dataset for the experiment, OPTIONS: mnist, fashionmnist, cifar10 and imagenet

FT_portion = 0.1 # how much from the original dataset we preserve for fine-tuning

nb_classes = 10

if dataset == 'mnist' or dataset == 'fashionmnist':
    input_shape = (28, 28, 1) # the input volume shape for the CNN
    epochs = 100
else:
    input_shape = (32, 32, 3)
    epochs = 200

batch_size = 64 #training batch size

batch_size_FT = 64 # fine tuning batch size

epochs_FT = 30 # epochs FT

pruning_trials = 50 #how many experiments on random weights pruning for each k

lr_FT = 1e-5 # learning rate for finetuning/pruning

val_split_training = 0.1 # validation set for training

val_split_FT = 0.1 # validation set for finetuning

Freeze_CNN = False # to freeze or not the feature extractors at fine-tuning

base_dir = './' #base directory where to create dirs and save results

hash_output_size = 160 # 160 for SHA1, 256 for SHA256, 512 for SHA512

ks = [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.97, 0.99] #weight pruning rates

JPEG_QF = [55, 60, 65, 70, 75, 80, 85, 90, 95, 100] #JPEG QUALITY FACTORS USED

learn_rate = 0.001 #used in case of cifar10 and imagenet

# Functions

**copy folder and its content to a destination**

In [None]:
def copy_and_overwrite(from_path, to_path):
    if os.path.exists(to_path):
        shutil.rmtree(to_path)
    shutil.copytree(from_path, to_path)

**Confusion Matrix**

In [None]:
def make_confusion_matrix(cf, savePath, saveName,
                          group_names=None,
                          categories='auto',
                          count=True,
                          percent=True,
                          cbar=True,
                          xyticks=True,
                          xyplotlabels=True,
                          sum_stats=True,
                          figsize=None,
                          cmap='Blues',
                          title=None):
    '''
    This function will make a pretty plot of an sklearn Confusion Matrix cm using a Seaborn heatmap visualization.

    Arguments
    ---------
    cf:            confusion matrix to be passed in

    group_names:   List of strings that represent the labels row by row to be shown in each square.

    categories:    List of strings containing the categories to be displayed on the x,y axis. Default is 'auto'

    count:         If True, show the raw number in the confusion matrix. Default is True.

    normalize:     If True, show the proportions for each category. Default is True.

    cbar:          If True, show the color bar. The cbar values are based off the values in the confusion matrix.
                   Default is True.

    xyticks:       If True, show x and y ticks. Default is True.

    xyplotlabels:  If True, show 'True Label' and 'Predicted Label' on the figure. Default is True.

    sum_stats:     If True, display summary statistics below the figure. Default is True.

    figsize:       Tuple representing the figure size. Default will be the matplotlib rcParams value.

    cmap:          Colormap of the values displayed from matplotlib.pyplot.cm. Default is 'Blues'
                   See http://matplotlib.org/examples/color/colormaps_reference.html

    title:         Title for the heatmap. Default is None.

    savePath:      where to save the plot

    '''

    # CODE TO GENERATE TEXT INSIDE EACH SQUARE
    blanks = ['' for i in range(cf.size)]

    if group_names and len(group_names) == cf.size:
        group_labels = ["{}\n".format(value) for value in group_names]
    else:
        group_labels = blanks

    if count:
        group_counts = ["{0:0.0f}\n".format(value) for value in cf.flatten()]
    else:
        group_counts = blanks

    if percent:
        group_percentages = ["{0:.2%}".format(value) for value in cf.flatten() / np.sum(cf)]
    else:
        group_percentages = blanks

    box_labels = [f"{v1}{v2}{v3}".strip() for v1, v2, v3 in zip(group_labels, group_counts, group_percentages)]
    box_labels = np.asarray(box_labels).reshape(cf.shape[0], cf.shape[1])

    # CODE TO GENERATE SUMMARY STATISTICS & TEXT FOR SUMMARY STATS
    if sum_stats:
        # Accuracy is sum of diagonal divided by total observations
        accuracy = np.trace(cf) / float(np.sum(cf))

        # if it is a binary confusion matrix, show some more stats
        if len(cf) == 2:
            # Metrics for Binary Confusion Matrices
            precision = cf[1, 1] / sum(cf[:, 1])
            recall = cf[1, 1] / sum(cf[1, :])
            f1_score = 2 * precision * recall / (precision + recall)
            stats_text = "\n\nAccuracy={:0.3f}\nPrecision={:0.3f}\nRecall={:0.3f}\nF1 Score={:0.3f}".format(
                accuracy, precision, recall, f1_score)
        else:
            stats_text = "\n\nAccuracy={:0.3f}".format(accuracy)
    else:
        stats_text = ""

    # SET FIGURE PARAMETERS ACCORDING TO OTHER ARGUMENTS
    if figsize == None:
        # Get default figure size if not set
        figsize = plt.rcParams.get('figure.figsize')

    if xyticks == False:
        # Do not show categories if xyticks is False
        categories = False

    # MAKE THE HEATMAP VISUALIZATION
    plt.figure(figsize=figsize)
    sns.heatmap(cf, annot=box_labels, fmt="", cmap=cmap, cbar=cbar, xticklabels=categories, yticklabels=categories)

    if xyplotlabels:
        plt.ylabel('True label')
        plt.xlabel('Predicted label' + stats_text)
    else:
        plt.xlabel(stats_text)

    if title:
        plt.title(title)

    plt.savefig(Path(savePath) / (str(saveName) + '.jpg'))
    plt.close()


def plot_confusion_matrix(cm, save_path, save_name, categories_digits):
    make_confusion_matrix(cf=cm, savePath=save_path, saveName=save_name,
                          group_names=None,
                          categories=categories_digits,
                          count=True,
                          percent=True,
                          cbar=True,
                          xyticks=True,
                          xyplotlabels=True,
                          sum_stats=True,
                          figsize=(12, 8),
                          cmap='Blues',
                          title=None)

**Function bool2int(x): convert binary to integer**

In [None]:
def bool2int(x):
    y = 0
    for i, j in enumerate(x):
        y += j << i
    return y

**flat_N_images(trainX, index_imgs): flat N images and return them plus their binary vector**

In [None]:
def flat_N_images(trainX, index_imgs):
    all_images_flat_array = trainX[index_imgs].flatten()
    all_images_flat_binary_array = np.array(all_images_flat_array).clip(max=1)

    return all_images_flat_array, all_images_flat_binary_array

**Hash Functions: SHA1, SHA256, SHA512**

In [None]:
def hash_functionSHA1(s):
    hashcode = hashlib.sha1(
        s.encode('utf-8')).hexdigest()
    padded_binary = bin(int(hashcode, 16))[2:].zfill(len(hashcode) * 4)
    if len(padded_binary) < 160:
        print(len(padded_binary))

    binary_array_result = np.array([int(padded_binary[x]) for x in range(len(padded_binary))])
    return binary_array_result, hashcode

def hash_functionSHA256(s):
    hashcode = hashlib.sha256(
        s.encode('utf-8')).hexdigest()
    padded_binary = bin(int(hashcode, 16))[2:].zfill(len(hashcode) * 4)
    if len(padded_binary) < 256:
        print(len(padded_binary))

    binary_array_result = np.array([int(padded_binary[x]) for x in range(len(padded_binary))])
    return binary_array_result, hashcode

def hash_functionSHA512(s):
    hashcode = hashlib.sha512(
        s.encode('utf-8')).hexdigest()
    padded_binary = bin(int(hashcode, 16))[2:].zfill(len(hashcode) * 4)
    if len(padded_binary) < 512:
        print(len(padded_binary))

    binary_array_result = np.array([int(padded_binary[x]) for x in range(len(padded_binary))])
    return binary_array_result, hashcode

**MAP(): take the hash output and return the key labels**

In [None]:
def MAP(hash_output, nb_classes):
  nb_bits_per_class = int(math.ceil(math.log2(nb_classes)))
  arr_dim = len(hash_output)//nb_bits_per_class
  labels = np.zeros((arr_dim,))
  for i in range(hash_output.shape[0]//nb_bits_per_class):
    tmp = int(bool2int(hash_output[i*nb_bits_per_class:(i+1)*nb_bits_per_class]))
    if tmp >= nb_classes:
      labels[i] = tmp%nb_classes
    else:
      labels[i] = tmp
  return labels.astype("int")

**GenerateKeyLabels(): Encapsulates all the functions above. Take the secret key, training set and image indeces and return the key labels**

In [None]:
def GenerateKeyLabels(trainX, img_index, sec_key, nb_classes, hash_output_size): 
    res_flat_str, res_flat_binary = flat_N_images(trainX, img_index)

    res_flat_str = str(trainX[img_index]).lstrip('[').rstrip(']')
    sec_key_str = str(sec_key).lstrip('[').rstrip(']')

    res_img_key = [chr(ord(a) ^ ord(b)) for a,b in zip(res_flat_str, sec_key_str)]
    res_img_key = "".join(res_img_key)
    res_str = str(res_img_key)


    if hash_output_size == 160:
        binary_hashout, hashout = hash_functionSHA1(res_str)
    elif hash_output_size == 256:
        binary_hashout, hashout = hash_functionSHA256(res_str)
    else:
        binary_hashout, hashout = hash_functionSHA512(res_str)
    labels = MAP(binary_hashout, nb_classes)

    return labels

**run_tflite_model(): Helper function to run inference on a TFLite model when applying weight quantization methods**

In [None]:
def run_tflite_model(tflite_file, dsX, dsY, index_imgs):

    # Initialize the interpreter
    interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    predictions = np.zeros((len(index_imgs),), dtype=int)
    for i, test_image_index in enumerate(index_imgs):
        test_image = dsX[test_image_index,]
        test_label = dsY[test_image_index,]

        if input_details['dtype'] == np.uint8:
            input_scale, input_zero_point = input_details["quantization"]
            test_image = test_image / input_scale + input_zero_point

        test_image = np.expand_dims(test_image, axis=0).astype(input_details["dtype"])
        interpreter.set_tensor(input_details["index"], test_image)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details["index"])[0]

        predictions[i] = output.argmax()

    return predictions

**Normalize Data and Generate One hot label version**

In [None]:
def Normalize_and_OneHot(x,y, nb_classes):
    x = x.astype("float32") / 255
    if x.shape.__len__() == 3: # expand greyscale, not RGB
        x = np.expand_dims(x, -1)
    y = to_categorical(y, num_classes = nb_classes)
    return x,y

**DNN Model**

In [None]:
def model_mnist():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(64, kernel_size=5, activation='relu', input_shape=input_shape))
    model.add(tf.keras.layers.MaxPool2D())
    model.add(tf.keras.layers.Conv2D(128, kernel_size=5, activation='relu'))
    model.add(tf.keras.layers.MaxPool2D())
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(256, activation='relu'))
    model.add(tf.keras.layers.Dense(nb_classes, activation='softmax'))
    model.compile(optimizer=tf.keras.optimizers.Adam(), loss="categorical_crossentropy", metrics=["accuracy"])
    return model

def model_fashionmnist():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(64,kernel_size=5,activation='relu',input_shape=input_shape))
    model.add(tf.keras.layers.MaxPool2D())
    model.add(tf.keras.layers.Conv2D(128,kernel_size=5,activation='relu'))
    model.add(tf.keras.layers.MaxPool2D())
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(256, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(nb_classes, activation='softmax'))
    model.compile(optimizer=tf.keras.optimizers.Adam(), loss="categorical_crossentropy", metrics=["accuracy"])
    return model

def model_cifar10():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same', input_shape=(32, 32, 3)))
    model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu', kernel_initializer='he_uniform'))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(256, activation='relu', kernel_initializer='he_uniform'))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(nb_classes, activation='softmax'))
    opt = SGD(lr=learn_rate, momentum=0.9)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    return model


def model_imagenet():
    base_model = VGG19(include_top=False,weights='imagenet',input_shape=input_shape,classes=nb_classes)
    model = tf.keras.Sequential()
    model.add(base_model)
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(1024,activation=('relu')))
    model.add(tf.keras.layers.Dense(512,activation=('relu')))
    model.add(tf.keras.layers.Dense(nb_classes,activation=('softmax'))) #This is the classification layer
    sgd=SGD(learning_rate=learn_rate,momentum=.9,nesterov=False)
    model.compile(optimizer=sgd,loss='categorical_crossentropy',metrics=['accuracy'])
    return model

**Plot and Save Training History**

In [None]:
def PlotTrainingHistory(res_dir, history): 
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='lower right')
    plt.grid()
    plt.savefig(os.path.join(res_dir, 'accuracy_plot_model.jpg'))
    plt.close()


    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper right')
    plt.grid()
    plt.savefig(os.path.join(res_dir, 'loss_plot_model.jpg'))
    plt.close()

**Weights pruning**

In [None]:
def weight_pruning(modelPruning, pruning_rate):
    for idx, layer in enumerate(modelPruning.layers):
        #print(layer.name, layer.trainable)

        layer_weights = modelPruning.get_layer(name=layer.name).get_weights()

        if np.array(layer_weights).shape[0] == 0:
            weight = np.array(layer_weights)
        else:
            weight = np.array(layer_weights[0])

        indices = np.random.choice(np.arange(weight.size), replace=False, size=int(weight.size * k))

        weight[np.unravel_index(indices, weight.shape)] = 0

        if np.array(layer_weights).shape[0] == 0:
            layer_weights = weight
        else:
            layer_weights[0] = weight
        
        modelPruning.get_layer(name=layer.name).set_weights(layer_weights)
    return modelPruning

**Fully connected layers pruning function**



In [None]:
def pruning_FC(model_FC_prune, k):
    for layer in filter(lambda x: 'dense' in x.name, model_FC_prune.layers):
        weights_shape, bias_shape = map(lambda x: x.shape, layer.get_weights())
        pruned_weights = np.copy(np.array(layer.get_weights()[0]))
        pruned_bias = np.copy(np.array(layer.get_weights()[1]))

        indices = np.random.choice(pruned_weights.shape[1]*pruned_weights.shape[0],
                                    replace=False, size=int(pruned_weights.shape[1]*pruned_weights.shape[0]*k))
        pruned_weights[np.unravel_index(indices, pruned_weights.shape)] = 0

        layer.set_weights([pruned_weights, pruned_bias])
    return model_FC_prune

# Training the model

**Data Preparation**

In [None]:
random.seed(1)

tf.keras.backend.clear_session()

res_dir = os.path.join(base_dir, str(hash_output_size))
isExist = os.path.exists(res_dir)
if not isExist:
    os.mkdir(res_dir)


#Create text file to write all results and info needed
txt_file_results_Path = os.path.join(res_dir, 'results_Hash_' + str(hash_output_size) +'.txt')
txt_file_results = open(txt_file_results_Path, 'w')


#################################################################################################################
###################### Data preparation #########################################################################
#################################################################################################################

#Download the dataset
if dataset == 'mnist':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.mnist.load_data()
elif dataset == 'fashionmnist':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.fashion_mnist.load_data()
elif dataset == 'cifar10':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.cifar10.load_data()
else:
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.cifar10.load_data()

#Copy the labels to avoid variable update in numpy
y_train_all = np.copy(y_train_all_base)
y_test_all = np.copy(y_test_all_base)


#SPLIT the dataset for finetuning and training
x_train, x_train_FT, y_train, y_train_FT = train_test_split(x_train_all, y_train_all, 
                                                            test_size=FT_portion, shuffle=True, random_state=42)

x_test, x_test_FT, y_test, y_test_FT = train_test_split(x_test_all, y_test_all, 
                                                            test_size=FT_portion, shuffle=True, random_state=42)

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz



**Key image-label pairs injection: DNN watermark generation**

In [None]:
#################################################################################################################
###################### Key image-label pairs injection ##########################################################
#################################################################################################################
#Generate Key Image-Label Pairs and Save the key indeces into a text file
bits_per_class = int(math.ceil(math.log2(nb_classes)))
nb_images = hash_output_size // bits_per_class

#Defender secret info
sec_key = np.random.randint(2, size=(hash_output_size,)).astype("uint8") #secret key k_{sec}
index_imgs = [random.randint(0, x_train.shape[0] - 1) for p in range(0, nb_images)]  # indeces of Key Images

#Generate the key labels and inject them back into the dataset labels
Y_h = GenerateKeyLabels(x_train, index_imgs, sec_key, nb_classes, hash_output_size)
Y_h = Y_h.astype(np.uint8)

if dataset == 'mnist' or dataset == 'fashionmnist':
    y_train[index_imgs] = Y_h[:]
else:
    y_train[index_imgs] = Y_h[:].reshape((Y_h.shape[0],1))

txt_file_results.write("The Key Images indices from the Training Dataset" + "\n")
txt_file_results.write(str(index_imgs) + '\n')

267

**Normalize dataset and One hot encoding the labels**

In [None]:
#Normalize and OneHot Encoding
x_train_all, y_train_all = Normalize_and_OneHot(x_train_all,y_train_all, nb_classes)
x_test_all, y_test_all = Normalize_and_OneHot(x_test_all,y_test_all, nb_classes)
x_train, y_train = Normalize_and_OneHot(x_train,y_train, nb_classes)
x_test, y_test = Normalize_and_OneHot(x_test,y_test, nb_classes)
x_test_FT, y_test_FT = Normalize_and_OneHot(x_test_FT,y_test_FT, nb_classes)
x_train_FT, y_train_FT = Normalize_and_OneHot(x_train_FT,y_train_FT, nb_classes)
x_test_FT, y_test_FT = Normalize_and_OneHot(x_test_FT,y_test_FT, nb_classes)

**Training**

In [None]:
#Create a temporary log directory
logdir = tempfile.mkdtemp()

#Create a new model instance
if dataset == 'mnist':
    model = model_mnist()
elif dataset == 'fashionmnist':
    model = model_fashionmnist()
elif dataset == 'cifar10':
    model = model_cifar10()
else:
    model = model_imagenet()
model.summary()

#create a copy of the initial model before training
_, model_file_init = tempfile.mkstemp('.h5')
tf.keras.models.save_model(model, model_file_init, include_optimizer=False)

#Training Part to Inject the key Image-Label Pairs
model_name_call_back = os.path.join(res_dir, str(dataset) + "-model-at_{epoch}.h5")

csv_name = os.path.join(res_dir, str(dataset) + "-model-SHA" + str(hash_output_size) + ".csv")
csv_logger = CSVLogger(csv_name)

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(model_name_call_back, save_best_only=False, save_freq='epoch',
                                                        period=10), tf.keras.callbacks.TensorBoard(log_dir=logdir, profile_batch=0),

callbacks_training = [model_checkpoint, csv_logger]

#Perform Training
history = model.fit(x_train, y_train, batch_size=batch_size, callbacks = callbacks_training, epochs=epochs,
                                validation_split=val_split_training)


#Calculate performance metrics on Trained host DNN
score_ts = model.evaluate(x_test, y_test, verbose=0)
txt_file_results.write('Test accuracy of the DNN model is'+'\n')
txt_file_results.write(str(score_ts[1])+'\n')

score_tr = model.evaluate(x_train, y_train, verbose=0)
txt_file_results.write('Training accuracy of the DNN model is'+'\n')
txt_file_results.write(str(score_tr[1])+'\n')

predictions = model.predict(x_test)
y_pred = np.argmax(predictions, axis=-1)
y_true = np.argmax(y_test, axis=-1)

cm = confusion_matrix(y_true, y_pred)#, labels=[1,0])
plot_confusion_matrix(cm, res_dir, 'confusion_matrix_model_' + str(dataset),
                        categories_digits=["0","1", "2", "3", "4", "5", "6", "7", "8", "9"])

#Plot the history of Training: Accuracy and Loss
PlotTrainingHistory(res_dir, history)

#Compute the watermark recovery rate on the trained host DNN model
predictions = model.predict(x_train[index_imgs])
y_pred = np.argmax(predictions, axis=-1)
equal_guesses = (y_pred == Y_h).sum()

print('DNN Watermark recovery rate of Trained model ' + str("{0:.0f}%".format((equal_guesses/len(Y_h))*100)))
txt_file_results.write('DNN Watermark recovery rate of Trained model'+'\n')
txt_file_results.write(str(equal_guesses/len(Y_h))+'\n')


#Save copy of the Host DNN model for post-processings/attacks
_, model_file = tempfile.mkstemp('.h5')
tf.keras.models.save_model(model, model_file, include_optimizer=False)

_, model_file_ForPruning = tempfile.mkstemp('.h5')
tf.keras.models.save_model(model, model_file_ForPruning, include_optimizer=True)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 32, 32, 32)        896       
                                                                 
 conv2d_1 (Conv2D)           (None, 32, 32, 32)        9248      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 16, 16, 32)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 16, 16, 32)        0         
                                                                 
 conv2d_2 (Conv2D)           (None, 16, 16, 64)        18496     
                                                                 
 conv2d_3 (Conv2D)           (None, 16, 16, 64)        36928     
                                                        

  super(SGD, self).__init__(name, **kwargs)


Epoch 1/2
Epoch 2/2
DNN Watermark recovery rate of Trained model 15%


# Fine-Tuning

In [None]:
#Fine Tuning the host DNN model
FineTune_model = tf.keras.models.load_model(model_file)

#Unfreeze/Freeze the feature extractor of the host DNN
if Freeze_CNN == True:
    for ix, layer in enumerate(FineTune_model.layers):
        if isinstance(layer, tf.keras.layers.Dense):
            layer.trainable = True
        else:
            layer.trainable = False

#check the freezed layers
for ix, layer in enumerate(FineTune_model.layers):
    print(layer.name, layer.trainable)

FineTune_model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = lr_FT),
                    loss="categorical_crossentropy", metrics=["accuracy"])

history_fine_tuning = FineTune_model.fit(x_train_FT, y_train_FT, batch_size=batch_size_FT,
                                    epochs=epochs_FT, validation_split=0.1)

#Compute TestSet accuracy on fine-tuned model 
_, fine_tuning_test_accuracy = FineTune_model.evaluate(x_test, y_test, verbose=0)
txt_file_results.write('Test Accuracy of the Fine-Tuned model'+'\n')
txt_file_results.write(str(fine_tuning_test_accuracy)+'\n')

#Compute watermark recovery performance after fine-tuning
predicted_WM_labels_finetuning = FineTune_model.predict(x_train[index_imgs])
predicted_WM_labels_finetuning = np.argmax(predicted_WM_labels_finetuning, axis=-1)

equal_guesses = (predicted_WM_labels_finetuning == Y_h).sum()
print('DNN Watermark recovery rate of the fine-tuned model' + ' is: ' + str("{0:.0f}%".format((equal_guesses/len(Y_h))*100)))
txt_file_results.write('DNN Watermark recovery rate of Fine-Tuned model'+'\n')
txt_file_results.write(str(equal_guesses/len(Y_h))+'\n')


#Compute testset accuracy after fine-tuning
predictions = FineTune_model.predict(x_test)
y_pred = np.argmax(predictions, axis=-1)
y_true = np.argmax(y_test, axis=-1)

cm = confusion_matrix(y_true, y_pred)
plot_confusion_matrix(cm, res_dir, 'confusion_matrix_FineTuned_model',
                        categories_digits=["0","1", "2", "3", "4", "5", "6", "7", "8", "9"])



conv2d True
conv2d_1 True
max_pooling2d True
dropout True
conv2d_2 True
conv2d_3 True
max_pooling2d_1 True
dropout_1 True
conv2d_4 True
conv2d_5 True
max_pooling2d_2 True
dropout_2 True
conv2d_6 True
conv2d_7 True
max_pooling2d_3 True
dropout_3 True
flatten True
dense True
dropout_4 True
dense_1 True
dropout_5 True
dense_2 True
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
DNN Watermark recovery rate of the fine-tuned model is: 12%


# Pruning

In [None]:
test_res_pruning = ks*0 #store testset accuracy weights pruning

recovery_hash_pruning = ks*0 #watermark recovery rates weights pruning

test_res_FC_pruning = ks*0 #store testset accuracy FC pruning

recovery_hash_FC_pruning = ks*0 #watermark recovery rates FC pruning


for idx, k in enumerate(ks):
    print("Pruning with k = " +  str(k) + '  ....')
    accuracy_k = 0
    rec_k = 0

    accuracyFC_k = 0
    recFC_k = 0        
    for exp in range(pruning_trials):
        #weights Pruning
        modelPruning = tf.keras.models.load_model(model_file_ForPruning)        

        modelPruning = weight_pruning(modelPruning, k)
        #Compute TestSet accuracy on pruned model 
        _, TA_tmp = modelPruning.evaluate(x_test, y_test, verbose=0)

        predictions_watermark_pruning = modelPruning.predict(x_train[index_imgs])
        predictions_watermark_pruning = np.argmax(predictions_watermark_pruning, axis=-1)
        equal_guesses = (predictions_watermark_pruning == Y_h).sum()
        rec_tmp = (equal_guesses/len(Y_h))

        accuracy_k = accuracy_k + TA_tmp
        rec_k = rec_k + rec_tmp

        #FC pruning            
        modelFCPruning = tf.keras.models.load_model(model_file_ForPruning)
        modelFCPruning = pruning_FC(modelFCPruning, k)

        _, TAFC_tmp = modelFCPruning.evaluate(x_test, y_test, verbose=0)
        
        predictions = modelFCPruning.predict(x_train[index_imgs])
        predictions = np.argmax(predictions, axis=-1)
        equal_guesses = (predictions == Y_h).sum()
        recFC_tmp = (equal_guesses/len(Y_h))

        accuracyFC_k = accuracyFC_k + TAFC_tmp
        recFC_k = recFC_k + recFC_tmp

    #1. weights
    Weightspruning_accuracy = accuracy_k / pruning_trials
    Weightspruning_rec = rec_k / pruning_trials


    print('Pruning Ratio ', str(k) )
    #Compute TestSet accuracy on pruned model 
    print('TA Weights pruning : ' + str("{0:.0f}%".format(Weightspruning_accuracy*100)))
    test_res_pruning.append(Weightspruning_accuracy)

    #Compute watermark recovery rate on pruned model 
    print('rec Weights pruning : ' + str("{0:.0f}%".format(Weightspruning_rec*100)))
    recovery_hash_pruning.append(Weightspruning_rec)

    #2. FC
    FCpruning_accuracy = accuracyFC_k / pruning_trials
    FCpruning_rec = recFC_k / pruning_trials        

    print('TA FC pruning : ' + str("{0:.0f}%".format(FCpruning_accuracy*100)))
    test_res_FC_pruning.append(FCpruning_accuracy)


    print('rec FC pruning : ' + str("{0:.0f}%".format(FCpruning_rec*100)))
    recovery_hash_FC_pruning.append(FCpruning_rec)



txt_file_results.write('Pruning rates used'+'\n')
txt_file_results.write(str(ks)+'\n')

txt_file_results.write('Test Accuracy Weights Pruning'+'\n')
txt_file_results.write(str(test_res_pruning)+'\n')
txt_file_results.write('Watermark recovery rate Weights Pruning'+'\n')
txt_file_results.write(str(recovery_hash_pruning)+'\n')

txt_file_results.write('Test Accuracy FC Pruning'+'\n')
txt_file_results.write(str(test_res_FC_pruning)+'\n')
txt_file_results.write('Watermark recovery rate FC Pruning'+'\n')
txt_file_results.write(str(recovery_hash_FC_pruning)+'\n')

Pruning with k = 0.0  ....


  import sys
  app.launch_new_instance()


KeyboardInterrupt: ignored

# JPEG Compression

In [None]:
watermark_rate_jpeg = JPEG_QF*0
Testaccuracy_jpeg = JPEG_QF*0
for QF in JPEG_QF:
    #create a different save directory for different quality factors
    save_jpeg_dir_crazy = pathlib.Path(os.path.join(res_dir, "JPEG_IMAGES_CRAZY_QF_"+str(QF)+"/"))
    save_jpeg_dir_crazy.mkdir(exist_ok=True, parents=True)
    save_jpeg_dir_TestSet = pathlib.Path(os.path.join(res_dir, "JPEG_IMAGES_TestSet_QF_"+str(QF)+"/"))
    save_jpeg_dir_TestSet.mkdir(exist_ok=True, parents=True)
    
    #load jpeg model
    model_JPEG = tf.keras.models.load_model(model_file)

    labels_jpg_crazy = np.zeros((len(index_imgs),))
    predictions_jpeg_crazy = np.zeros((len(index_imgs),))
    

    labels_jpg_testSet = np.zeros((len(y_test),))
    predictions_jpeg_testSet = np.zeros((len(y_test),))
    

    #loop for key image-label pairs
    for idx, val in enumerate(index_imgs):
        labels_jpg_crazy[idx] = np.argmax(y_train[val,], -1)
        pil_img = tf.keras.preprocessing.image.array_to_img(x_train[val,])
        pil_img_name = 'JPEG-IMG-Crazy-' + str(idx) + '-' + str(str(int(labels_jpg_crazy[idx]))) + '.jpg'
        #save the image as jpeg with the current quality factor
        pil_img.save(os.path.join(save_jpeg_dir_crazy, pil_img_name), quality = QF)
        jpeg_img = Image.open(os.path.join(save_jpeg_dir_crazy, pil_img_name))
        #prediction
        pred = model_JPEG.predict(np.expand_dims(np.expand_dims(np.array(jpeg_img), -1), axis=0) / 255)
        predictions_jpeg_crazy[idx] = int(np.argmax(pred, -1))

    #loop on the test dataset
    for idx, val in enumerate(x_test):
        labels_jpg_testSet[idx] = np.argmax(y_test[idx,], -1)
        pil_img = tf.keras.preprocessing.image.array_to_img(val)
        pil_img_name = 'JPEG-IMG-TestSet-' + str(idx) + '-' + str(str(int(labels_jpg_testSet[idx]))) + '.jpg'
        #save the image as jpeg with the current quality factor
        pil_img.save(os.path.join(save_jpeg_dir_TestSet, pil_img_name), quality = QF)
        jpeg_img = Image.open(os.path.join(save_jpeg_dir_TestSet, pil_img_name))
        pred = model_JPEG.predict(np.expand_dims(np.expand_dims(np.array(jpeg_img), -1), axis=0) / 255)
        predictions_jpeg_testSet[idx] = int(np.argmax(pred, -1))

    #save the labels of the key images in npy file
    labels_file_name = os.path.join(save_jpeg_dir_crazy, 'labels_crazy_jpg.npy')
    with open(labels_file_name, 'wb') as f:
        np.save(f, labels_jpg_crazy)

    #Compute watermark recovery rate performance JPEG
    equal_guesses_crazy = (predictions_jpeg_crazy == Y_h).sum()
    recovery_perc_crazy = (equal_guesses_crazy / len(Y_h))
    print('Recovery performance of JPEG model Key Samples is '+ str("{0:.0f}%".format(recovery_perc_crazy * 100)))
    watermark_rate_jpeg.append(recovery_perc_crazy)

    y_true = np.argmax(y_test, axis=-1)
    y_pred = predictions_jpeg_testSet

    cm = confusion_matrix(y_true, y_pred)
    plot_confusion_matrix(cm, res_dir, 'confusion_matrix_model_JPG_QF' + str(QF),
                        categories_digits=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"])
    
    #Compute TestSet accuracy JPEG
    equal_guesses_TestSet = (predictions_jpeg_testSet == y_true).sum()
    recovery_perc_TestSet = (equal_guesses_TestSet / len(y_test))
    print('Recovery performance of JPEG model TestSet is '+ str("{0:.0f}%".format(recovery_perc_TestSet * 100)))
    Testaccuracy_jpeg.append(recovery_perc_TestSet)

#Write JPEG performance results to text file
txt_file_results.write('JPEG QF used'+'\n')
txt_file_results.write(str(JPEG_QF)+'\n')
txt_file_results.write('Test Accuracy JPEG'+'\n')
txt_file_results.write(str(Testaccuracy_jpeg)+'\n')
txt_file_results.write('Watermark recovery JPEG'+'\n')
txt_file_results.write(str(watermark_rate_jpeg)+'\n')

# Weight Quantization

**Initialization and Data Generators**

In [None]:
#Data Generator for key images
def representative_data_gen_KeySamples():
    for input_value in tf.data.Dataset.from_tensor_slices(x_train[index_imgs]).batch(1).take(Y_h.shape[0]):
        # Model has only one input so each data point has one element.
        yield [input_value]


def representative_data_gen_TestSet():
    for input_value in tf.data.Dataset.from_tensor_slices(x_test).batch(1).take(y_test.shape[0]):
        # Model has only one input so each data point has one element.
        yield [input_value]

#create directory to save the quantized models
tflite_models_dir = pathlib.Path(os.path.join(res_dir, "tflite_models/"))
tflite_models_dir.mkdir(exist_ok=True, parents=True)

#load models to be quantized as a copy for each method from the originally trained model
model_Dyn = tf.keras.models.load_model(model_file)
model_FullInt = tf.keras.models.load_model(model_file)
model_Float16 = tf.keras.models.load_model(model_file)

**Dynamic Quantization Method: Convert Weights From Floating Points to Integers**

In [None]:
#$$$$$$$$$$$$$$$$$$$$$$$$
#1. Key image-labels part
#$$$$$$$$$$$$$$$$$$$$$$$$

#Quantize the model
converter = tf.lite.TFLiteConverter.from_keras_model(model_Dyn)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen_KeySamples
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
#check the input/output types
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)


# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "modelDyn_KeySamples.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "modelDyn_KeySamples_quantized.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

#Compute the watermark recovery rate
predictions = run_tflite_model(tflite_model_quant_file, x_train, y_train, index_imgs)
equal_guesses = (predictions == Y_h).sum()
recovery_WM_Dyn = (equal_guesses / len(Y_h))
print('Watermark recovery rate of Dyn Quantization model is: ' + str("{0:.0f}%".format(recovery_WM_Dyn * 100)))

txt_file_results.write('Watermark recovery Dynamic Quantization'+'\n')
txt_file_results.write(str(recovery_WM_Dyn)+'\n')

#$$$$$$$$$$$$$$$$$$$$$$$$
#2. Test DataSet Part
#$$$$$$$$$$$$$$$$$$$$$$$$

#Quantize the model
converter = tf.lite.TFLiteConverter.from_keras_model(model_Dyn)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen_TestSet
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
#check the input/output types
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "modelDyn_TestSet.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "modelDyn_TestSet_quantized.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

index_test = np.arange(y_test.shape[0])
predictions = run_tflite_model(tflite_model_quant_file, x_test, y_test, index_test)
y_pred = predictions
y_true = np.argmax(y_test, axis=-1)

cm = confusion_matrix(y_true, y_pred)  # , labels=[1,0])
plot_confusion_matrix(cm, res_dir, 'confusion_matrix_model_Dyn_testset',
                    categories_digits=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"])
equal_guesses = (y_pred == y_true).sum()
test_acc_Dyn = (equal_guesses / len(y_true))
print('Test accuracy of Dyn model is: ' + str("{0:.0f}%".format(test_acc_Dyn * 100)))

txt_file_results.write('Test accuracy Dynamic Quantization'+'\n')
txt_file_results.write(str(test_acc_Dyn)+'\n')

**Full Integer Quantization Method: Convert Weights, Inputs, Outputs From Floating Points to Integers**

In [None]:
#$$$$$$$$$$$$$$$$$$$$$$$$
#1. Key image-labels part
#$$$$$$$$$$$$$$$$$$$$$$$$
converter = tf.lite.TFLiteConverter.from_keras_model(model_FullInt)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen_KeySamples
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8  # or tf.uint8
converter.inference_output_type = tf.uint8  # or tf.uint8
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)



# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "modelFullInt_KeySamples.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "modelFullInt_KeySamples_quantized.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

#compute the watermark recovery performance
predictions = run_tflite_model(tflite_model_quant_file, x_train, y_train, index_imgs)
equal_guesses = (predictions == Y_h).sum()
recovery_WM_FullInt = (equal_guesses / len(Y_h))
print('watermark recovery rate of FullInt model is: ' + str("{0:.0f}%".format(recovery_WM_FullInt* 100)))

txt_file_results.write('Watermark recovery Full Integer Quantization'+'\n')
txt_file_results.write(str(recovery_WM_FullInt)+'\n')


#$$$$$$$$$$$$$$$$$$$$$$$$
#2. Test DataSet Part
#$$$$$$$$$$$$$$$$$$$$$$$$
converter = tf.lite.TFLiteConverter.from_keras_model(model_FullInt)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen_TestSet
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8  # or tf.uint8
converter.inference_output_type = tf.uint8  # or tf.uint8
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)


# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "modelFullInt_TestSet.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "modelFullInt_TestSet_quantized.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

index_test = np.arange(y_test.shape[0])
predictions = run_tflite_model(tflite_model_quant_file, x_test, y_test, index_test)
y_pred = predictions

y_true = np.argmax(y_test, axis=-1)

cm = confusion_matrix(y_true, y_pred)  # , labels=[1,0])
plot_confusion_matrix(cm, res_dir, 'confusion_matrix_model_FullInt_testset',
                    categories_digits=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"])
equal_guesses = (y_pred == y_true).sum()
test_acc_FullInt = (equal_guesses / len(y_true))
print('Test accuracy of FullInt model is: ' + str("{0:.0f}%".format(test_acc_FullInt * 100)))

txt_file_results.write('Test accuracy Full Integer Quantization'+'\n')
txt_file_results.write(str(test_acc_FullInt)+'\n')

**Float16 quantization Method: Convert weights to Float16**

In [None]:
#$$$$$$$$$$$$$$$$$$$$$$$$
#1. Key image-labels part
#$$$$$$$$$$$$$$$$$$$$$$$$
converter = tf.lite.TFLiteConverter.from_keras_model(model_Float16)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
converter.representative_dataset = representative_data_gen_KeySamples
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "modelFloat16_KeySamples.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "modelFloat16_KeySamples_quantized.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

predictions = run_tflite_model(tflite_model_quant_file, x_train, y_train, index_imgs)
equal_guesses = (predictions == Y_h).sum()
recovery_WM_Float16 = (equal_guesses / len(Y_h))
print('Watermark recovery of Float16 model is: ' + str("{0:.0f}%".format(recovery_WM_Float16 * 100)))


txt_file_results.write('Watermark recovery Float16 Quantization'+'\n')
txt_file_results.write(str(recovery_WM_Float16)+'\n')

#$$$$$$$$$$$$$$$$$$$$$$$$
#2. Test DataSet Part
#$$$$$$$$$$$$$$$$$$$$$$$$
converter = tf.lite.TFLiteConverter.from_keras_model(model_Float16)
tflite_model = converter.convert()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
converter.representative_dataset = representative_data_gen_TestSet
tflite_quant_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)


# Save the unquantized/float model:
tflite_model_file = tflite_models_dir / "mnist_model_float16_TestSet.tflite"
tflite_model_file.write_bytes(tflite_model)
# Save the quantized model:
tflite_model_quant_file = tflite_models_dir / "mnist_model_float16_TestSet_quant.tflite"
tflite_model_quant_file.write_bytes(tflite_quant_model)

index_test = np.arange(y_test.shape[0])
predictions = run_tflite_model(tflite_model_quant_file, x_test, y_test, index_test)
y_pred = predictions

y_true = np.argmax(y_test, axis=-1)

cm = confusion_matrix(y_true, y_pred)
plot_confusion_matrix(cm, res_dir, 'confusion_matrix_model_float16_testset',
                    categories_digits=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"])
equal_guesses = (y_pred == y_true).sum()
test_acc_Float16 = (equal_guesses / len(y_true))
print('Test accuracy of Float16 model is: ' + str("{0:.0f}%".format(test_acc_Float16 * 100)))

txt_file_results.write('Test accuracy Float16 Quantization'+'\n')
txt_file_results.write(str(test_acc_Float16)+'\n')

# Train Benign Model

**Download the dataset, Normalize and OneHot Encoding**

In [None]:
#Download the dataset
if dataset == 'mnist':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.mnist.load_data()
elif dataset == 'fashionmnist':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.fashion_mnist.load_data()
elif dataset == 'cifar10':
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.cifar10.load_data()
else:
    (x_train_all, y_train_all_base), (x_test_all, y_test_all_base) = tf.keras.datasets.cifar10.load_data()

#Normalize and OneHot Encoding
x_train_all, y_train_all = Normalize_and_OneHot(x_train_all,y_train_all, nb_classes)
x_test_all, y_test_all = Normalize_and_OneHot(x_test_all,y_test_all, nb_classes)

In [None]:
#Create a new model instance
if dataset == 'mnist':
    model = model_mnist()
elif dataset == 'fashionmnist':
    model = model_fashionmnist()
elif dataset == 'cifar10':
    model = model_cifar10()
else:
    model = model_imagenet()
model.summary()

#create a copy of the initial model before training
_, model_file_init = tempfile.mkstemp('.h5')
tf.keras.models.save_model(model, model_file_init, include_optimizer=False)

#Training Part to Inject the key Image-Label Pairs
model_name_call_back = os.path.join(res_dir, str(dataset) + "-modelBenign-at_{epoch}.h5")

csv_name = os.path.join(res_dir, str(dataset) + "-modelBenign.csv")
csv_logger = CSVLogger(csv_name)

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(model_name_call_back, save_best_only=False, save_freq='epoch',
                                                        period=10), tf.keras.callbacks.TensorBoard(log_dir=logdir, profile_batch=0),

callbacks_training = [model_checkpoint, csv_logger]

#Perform Training
history = model.fit(x_train_all, y_train_all, batch_size=batch_size, callbacks = callbacks_training, epochs=epochs,
                                validation_split=val_split_training)


#Calculate performance metrics on Trained Benign DNN
score_ts = model.evaluate(x_test_all, y_test_all, verbose=0)
txt_file_results.write('Test accuracy of the BENIGN DNN model is'+'\n')
txt_file_results.write(str(score_ts[1])+'\n')

score_tr = model.evaluate(x_train, y_train, verbose=0)
txt_file_results.write('Training accuracy of the BENIGN DNN model is'+'\n')
txt_file_results.write(str(score_tr[1])+'\n')

**close results file**

In [None]:
txt_file_results.close()