In [None]:
import numpy as np
import os

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"]="0"
os.environ['TF_DETERMINISTIC_OPS'] = '1'

In [None]:
from __future__ import print_function
import argparse
from matplotlib import pyplot
from collections import Counter
import json
import random
from collections import defaultdict

In [None]:
import tensorflow as tf
from tensorflow.python.framework.ops import enable_eager_execution
enable_eager_execution()
from tensorflow import keras

In [None]:
from keras.layers import Flatten, Dense, Input
import keras_vggface #need installation
from keras_vggface import utils
from keras_vggface.vggface import VGGFace #need installation

In [None]:
import tensorflow_model_optimization as tfmot

In [None]:
from livelossplot import PlotLossesKeras #need installation

In [None]:
from tensorflow.python.keras import backend

# Preprocess Functions

In [None]:
def preprocess_input(x):
    x_temp = np.copy(x)
    x_temp = x_temp[..., ::-1]
    x_temp[..., 0] -= 91.4953
    x_temp[..., 1] -= 103.8827
    x_temp[..., 2] -= 131.0912
    return x_temp

# Part 1: Create Dataset and Models

In [None]:
H,W= 224, 224 #Input Dimension

In [None]:
def loadimgs(path,n = 0):
    curr_y = n
    person_dict={}
    c=0
    for person in os.listdir(path):
        #print("loading person: "+person)
        person_path = os.path.join(path,person)
        person_images=[]
        for name in os.listdir(person_path):
            image_path=os.path.join(person_path,name)
            pixels = pyplot.imread(image_path)
            image = tf.image.resize(pixels,[H,W]).numpy()
            samples = preprocess_input(image)
            person_images.append(samples)
        person_dict[person]=person_images
    return person_dict

## 1.  Load Data

In [None]:
path="../../../datasets/PubFig/CelebDataProcessed"
x=loadimgs(path)
print("done!")

In [None]:
# label decoder: {label: person_name}
label = 0
label_dic = {}
labels = []
for k,v in x.items():
    label_dic[k] = label
    label += 1
    labels.append(len(v))

inv_map = {v: k for k, v in label_dic.items()}
# with open("../../../datasets/PubFig/identities_decoder.json", "w") as fp:
#     json.dump(inv_map,fp) 

## 2. Split Data

In [None]:
#random shuffle
for k,v in x.items():
    x[k] = np.stack(random.sample(x[k],len(x[k])))

In [None]:
# for test dataset, use 4 - 28 images per person, so we can guarentee there are at least 3 agreed images per person for the DIVA dataset  
# we want the test dataset size be 1164 which is 10% of the whole dataset (11640)
size = 0
train_set = {}
test_set = {}
for k,v in x.items():
    if len(v) == 403: # only one key has this many images
        size += 28
        test_set[k] = v[:28]
        train_set[k] = v[28:]
    elif len(v) >= 80:
        size += 13
        test_set[k] = v[:13]
        train_set[k] = v[13:]
    else:
        size += 4
        test_set[k] = v[:4]
        train_set[k] = v[4:]
print(size)

In [None]:
# create numpy array for train_x
train_x = np.empty((0,H,W,3))
train_y = []
for k,v in train_set.items():
    train_x = np.concatenate((train_x, v), axis=0)
    train_y = train_y + np.full(shape=len(v), fill_value=label_dic[k], dtype=np.int).tolist()

In [None]:
c = list(zip(train_x,train_y))

random.shuffle(c) #Random permute

train_x, train_y = zip(*c)

In [None]:
train_x, train_y = np.array(train_x), np.array(train_y)

In [None]:
# np.save('../../../datasets/PubFig/train_x_10476.npy',train_x)
# np.save('../../../datasets/PubFig/train_y_10476.npy',train_y)

In [None]:
test_x = np.empty((0,H,W,3))
test_y = []
for k,v in test_set.items():
    test_x = np.concatenate((test_x, v), axis=0)
    test_y = test_y + np.full(shape=len(v), fill_value=label_dic[k], dtype=np.int).tolist()

In [None]:
c = list(zip(test_x,test_y))

random.shuffle(c)

test_x,test_y = zip(*c)

In [None]:
test_x, test_y = np.array(test_x), np.array(test_y)

In [None]:
# np.save('../../../datasets/PubFig/test_x_1164.npy',test_x)
# np.save('../../../datasets/PubFig/test_y_1164.npy',test_y)

# Create VGGFACE Model

In [None]:
input = tf.keras.Input(shape=(224, 224, 3))
vgg_model = VGGFace(include_top=False, input_tensor=input,model='resnet50') # its called vgg but it uses resnet50
x = Flatten(name='flatten')(vgg_model.output)
out = Dense(150, activation='softmax', name='classifier')(x)
model = tf.keras.Model(input, out)

## 1. Train FP and Q models

In [None]:
# model.compile(optimizer=tf.keras.optimizers.RMSprop(lr=2e-5),
#             loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
#               metrics=['accuracy'])
# model.fit(x=train_x,y=train_y,epochs =10,validation_data = (test_x,test_y),callbacks=[PlotLossesKeras()])

In [None]:
q_model = tfmot.quantization.keras.quantize_model(model)

In [None]:
# q_model.compile(optimizer=tf.keras.optimizers.RMSprop(lr=2e-5),
#             loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
#               metrics=['accuracy'])
# q_model.fit(x=train_x,y=train_y,epochs =5,validation_data = (test_x,test_y))

In [None]:
# q_model.save('../../../weights/q_model_90_pubface.h5')
# model.save('../../../weights/fp_model_90_pubface.h5')

## 2. Load and Evaluate Models 

In [None]:
test_x = np.load('../../../datasets/PubFig/test_x_1164.npy')
test_y = np.load('../../../datasets/PubFig/test_y_1164.npy')

In [None]:
q_model.load_weights('../../../weights/q_model_90_pubface.h5')
model.load_weights('../../../weights/fp_model_90_pubface.h5')
q_model.trainable =False
model.trainable =False
model.compile(optimizer=tf.keras.optimizers.RMSprop(lr=2e-5),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])
q_model.compile(optimizer=tf.keras.optimizers.RMSprop(lr=2e-5),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
model.evaluate(test_x,test_y)

In [None]:
q_model.evaluate(test_x,test_y)

## 4. Instability Test

In [None]:
fp_correct = []
q_correct = []
for i in range(0,len(test_y)):
    pred = np.argmax(model.predict(test_x[i][None,...])[0])
    q_pred = np.argmax(q_model.predict(test_x[i][None,...])[0])
    label = test_y[i]
    if pred == label:
        fp_correct.append(i)
    if q_pred == label:
        q_correct.append(i)

In [None]:
quant_correct = len(q_correct )
orig_correct = len(fp_correct)
q_correct_orig_wrong = len(set(q_correct).difference(set(fp_correct)))
q_wrong_orig_correct = len(set(fp_correct).difference(set(q_correct)))
print(quant_correct, orig_correct, q_correct_orig_wrong, q_wrong_orig_correct)

## 2. Convert model to tflite format that can run on aarch64

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(q_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()
# with open("../../../weights/tflite_int8_model_90.tflite", 'wb') as f:
#     f.write(tflite_quant_model)

# Create Dataset for DIVA test

In [None]:
index = []# index for the agreed images for DIVA evaluation
fp_y = []
q_y = []
for i in range(0,len(test_y)):
    pred = np.argmax(model.predict(test_x[i][None,...])[0])
    q_pred = np.argmax(q_model.predict(test_x[i][None,...])[0])
    label = test_y[i]
    fp_y.append(pred)
    q_y.append(q_pred)
    if pred == q_pred and pred == label:
        index.append(i)
condidate_x = test_x[np.array(index)]
condidate_y = test_y[np.array(index)]

In [None]:
# np.save('./results/test_fp_1164.npy',np.array(fp_y))
# np.save('./results/test_q_1164.npy',np.array(q_y))

In [None]:
d = defaultdict(int)
dataset_y = []
dataset_x =np.empty((0,224,224,3))
for i in range(0,len(condidate_y)):
    if d[condidate_y[i]] < 3:# 3 images per person
        dataset_x = np.concatenate((dataset_x, condidate_x[i][None,...]), axis=0)
        dataset_y.append(condidate_y[i])
        d[condidate_y[i]] += 1

In [None]:
# np.save('../../../datasets/PubFig/dataset_y_450.npy',np.array(dataset_y))
# np.save('../../../datasets/PubFig/dataset_x_450.npy',dataset_x)

____________________________________________________________________________________________________________________________________

# Part 2: Record Success Rate and Confidence Score for Analysis( results from wb_fr.py or pgd_fr.py)

In [None]:
fail = {1: 'failure/', 0: ''}
wb = {1: 'wb', 0: 'pgd'}
WB = {1: 'WB', 0: 'PGD'}
flag = 0

In [None]:
wb = wb[flag]
fail = fail[flag]
WB = WB[flag]

## 1. Top-1 Evaluation

In [None]:
arrays = {}
for filename in sorted(os.listdir('./results/'+ WB + '/' + fail +'images_second/')):
    if filename.endswith('.npy'):
        if '@' in filename:
            arrays[filename] = (np.load('./results/'+ WB + '/' + fail +'images_second/' + filename),np.load('./results/'+ WB + '/' + fail +'filters_second/' + filename))

In [None]:
imageplusfilter = list(arrays.values())

In [None]:
filenames = list(arrays.keys())

In [None]:
orig_images_deprocess = np.array([(x[0] - x[1]) for x in imageplusfilter ])
ad_images_deprocess = np.array([x[0] for x in imageplusfilter ])
orig_images = np.array([preprocess_input(x[0] - x[1]) for x in imageplusfilter ])
ad_images = np.array([preprocess_input(x[0]) for x in imageplusfilter ])

In [None]:
fp_label = []
orig_score = []
ad_score = []
for i in range(0,len(orig_images)):
    orig_img = backend.constant(orig_images[i])[None,...]
    ad_img = backend.constant(ad_images[i])[None,...]
    fp_label.append(np.argmax(model.predict(orig_img)[0])) # predicted labelfrom the fp_model
    orig_score.append(model.predict(orig_img)[0][fp_label[i]]) # prediction score from the fp_model
    ad_score.append(model.predict(ad_img)[0][fp_label[i]]) # prediction score from the fp_model

In [None]:
fail = {1: '_failure', 0: ''}
fail = fail[flag]

In [None]:
# Record these and compare them with the ones from the tflite model (script in FaceRocognization_Aarch64.ipynb)
# np.save('./results/' + WB + '/' + wb +'_y' + fail + '_v2.npy', np.array(fp_label))
# np.save('./results/' + WB + '/' + wb +'_x' + fail + '_orig_v2.npy', np.array(orig_images_deprocess))
# np.save('./results/' + WB + '/' + wb +'_x' + fail + '_ad_v2.npy', np.array(ad_images_deprocess))
# np.save('./results/' + WB + '/' + wb +'_fp_v2' + fail + '_orig_score.npy', np.array(orig_score))
# np.save('./results/' + WB + '/' + wb +'_fp_v2' + fail + '_ad_score.npy', np.array(ad_score))

## 2. Top-5 Evaluation

In [None]:
arrays = {}
for filename in sorted(os.listdir('./results/'+ WB + '/images_second/')):
    if filename.endswith('.npy'):
        if not '@' in filename:
            arrays[filename] = np.load('./results/'+ WB + '/images_second/' + filename)

In [None]:
success = np.array(list(arrays.values()))
success_ = np.array([preprocess_input(x) for x in success])

In [None]:
q_label = []
fp_label = []
for image in success_:
    img = backend.constant(image)[None,...]
    pgd_fp_label.append(model.predict(img)[0].argsort()[-5:][::-1])

In [None]:
# Record these and compare them with the ones from the tflite model (script in FaceRocognization_Aarch64.ipynb)
# np.save('./results/' + WB + '/' + wb +'_5_y_v2.npy', np.array(fp_label))
# np.save('./results/' + WB + '/' + wb +'_5_x_v2.npy', np.array(success))