In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # suppress tensorflow warnings https://stackoverflow.com/a/40871012
from deepface import DeepFace
import subprocess
import numpy as np
from decimal import Decimal # for proper rounding
import random
import time
import pandas as pd
from datetime import datetime
import struct


# CONSTANTS
EXECUTABLE_PATH = "ABY/build/bin"
INPUT_FILE_NAME = "input_vecs.txt"
OUTPUT_FILE_NAME = "output.txt"
EXECUTABLE_NAME = 'cos_sim_simd_test'
CMD = f"./{EXECUTABLE_NAME} -r 0 -f {INPUT_FILE_NAME} -o {OUTPUT_FILE_NAME} & (./{EXECUTABLE_NAME} -r 1 -f {INPUT_FILE_NAME} 2>&1 > /dev/null)"
ITERATIONS = 1

In [2]:
def get_cos_dist_aby(x, y):
    # write two vectors to a file
    with open(f"{EXECUTABLE_PATH}/{INPUT_FILE_NAME}", 'w') as f:
        for x_i, y_i in zip(x, y):
            f.write(f"{x_i} {y_i}\n")
    
    # execute the ABY cos sim computation
    start = time.time()
    output = subprocess.run(CMD, shell=True, capture_output=True, text=True, cwd=EXECUTABLE_PATH)
    end = time.time()
    assert (output.returncode == 0) # make sure the process executed successfully
    
    # read the result from the output file
    with open(f"{EXECUTABLE_PATH}/{OUTPUT_FILE_NAME}") as f:
        return float(round(Decimal(str(f.read())),6)), end-start
    
def get_cos_dist_numpy(x, y):
    start = time.time()
    result = 1 - np.dot(x, y)/(np.linalg.norm(x)*np.linalg.norm(y))
    end = time.time()
    # print(result)
    # do some magic of casting to str then Decimal then float to round properly
    result = float(round(Decimal(str(result)),6))
    return result, end-start

def get_cos_dist_deepface(img1, img2):
    result = DeepFace.verify(img1_path = img1, img2_path = img2, model_name="SFace", enforce_detection=True, distance_metric="cosine")["distance"]
    return float(round(Decimal(str(result)),6))

def round_to_6(x):
    return float(round(Decimal(str(x)),6))

def get_embedding(imagepath):
    return DeepFace.represent(img_path = imagepath, model_name="SFace", enforce_detection=True)[0]["embedding"]

def get_two_random_images_and_their_embeddings(same_person):
    people = os.listdir('lfw') # list of all people that have images
    people_with_multiple_images = [p for p in people if len(os.listdir(f"lfw/{p}")) > 1] # list of people with more than one image in folder
    embedding1, embedding2 = None, None # face embeddings
    while embedding1 is None or embedding2 is None: # try until the chosen images have detectable faces
        try:
            if same_person:
                # same person should have more than one image (we might still end up choosing the same image of that person with prob 1/n, but that's ok)
                person1 = random.choice(people_with_multiple_images)
                person2 = person1
            else:
                # two persons chosen should be different
                person1 = random.choice(people)
                person2 = random.choice([p for p in people if p != person1])
            # get two random images
            img1 = f"lfw/{person1}/{random.choice(os.listdir(f'lfw/{person1}'))}"
            img2 = f"lfw/{person2}/{random.choice(os.listdir(f'lfw/{person2}'))}"
            # try to extract embeddings from both images
            embedding1 = get_embedding(img1)
            embedding2 = get_embedding(img2)
        except Exception as e:
            # failed to detect faces in images, try again
            # print(e)
            pass
    return img1, embedding1, img2, embedding2

def verify_computation(same_person):
    # get two random images
    img1, img2 = get_two_random_images(same_person)
    print(f"{img1=}, {img2=}")
    # extract embeddings, disregard pictures where the face cannot be detected
    try:
        x = get_embedding(f"lfw/{img1}")
        y = get_embedding(f"lfw/{img2}")
    except ValueError as e:
        print(e)
        return 0
    except Exception as e:
        # print(e)
        return 0
    # print(f"{x=}, {y=}")
    # compute the cosine similarity
    cs = cos_sim(x,y)
    cs_aby = cos_sim_aby(x,y)
    cs_deepface = cos_dist_deepface(img1, img2)
    print(f"{cs=}, {cs_aby=}, {cs_deepface=}")
    print("----------")
    # make sure the results are the same
    assert (cs == cs_aby == cs_deepface)

def run_verification(same_person):
    # get two images and their embeddings to compare
    img1, emb1, img2, emb2 = get_two_random_images_and_their_embeddings(same_person)
    # compute the cos dist with numpy and ABY (with timings)
    cos_dist_numpy, time_numpy = get_cos_dist_numpy(emb1, emb2)
    cos_dist_aby, time_aby = get_cos_dist_aby(emb1, emb2)
    result = DeepFace.verify(img1_path = img1, img2_path = img2, model_name="SFace", enforce_detection=True, distance_metric="cosine")
    cos_dist_df = round_to_6(result["distance"])
    threshold = result["threshold"]
    # verify the results are the same (sanity check against the deepface module in case both numpy and aby are wrong)
    # assert (cos_dist_numpy == cos_dist_aby == cos_dist_df), f"{cos_dist_numpy} == {cos_dist_aby} == {cos_dist_df}"
    aby_ver = cos_dist_aby < threshold
    # put the results in a dataframe according to the scheme in the notebook
    return [same_person,result["verified"],aby_ver,cos_dist_numpy,cos_dist_aby,threshold,img1,img2,time_numpy,time_aby]

def fxor(a, b):
    rtrn = []
    a = struct.pack('d', a)
    b = struct.pack('d', b)
    for ba, bb in zip(a, b):
        rtrn.append(ba ^ bb)
    return struct.unpack('d', bytes(rtrn))[0]

def fxor_list(a, b):
    assert len(a) == len(b)
    rtrn = []
    for i in range(len(a)):
        rtrn.append(fxor(a[i], b[i]))
    return rtrn


In [3]:
img1, x, img2, y = get_two_random_images_and_their_embeddings(False)
get_cos_dist_aby(x, y)

(1.019979, 5.086735725402832)

In [4]:
df = pd.DataFrame(columns=['real_ver','df_ver','aby_ver','df_dist','aby_dist','threshold','img1','img2', 'np_time', 'aby_time'])
dt_string = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
for _ in range(ITERATIONS):
    # append results of a run with the same person
    df.loc[len(df)] = run_verification(True)
    # run with different people
    df.loc[len(df)] = run_verification(False)
    df.to_csv(f"df_{dt_string}_{ITERATIONS=}")
    print(_)


0


In [5]:
df

Unnamed: 0,real_ver,df_ver,aby_ver,df_dist,aby_dist,threshold,img1,img2,np_time,aby_time
0,True,True,True,0.291786,0.291786,0.593,lfw/Richard_Norton-Taylor/Richard_Norton-Taylo...,lfw/Richard_Norton-Taylor/Richard_Norton-Taylo...,0.000263,5.060364
1,False,False,False,0.832571,0.832571,0.593,lfw/Steve_Mariucci/Steve_Mariucci_0003.jpg,lfw/Jennie_Garth/Jennie_Garth_0001.jpg,0.000205,4.493353


In [6]:
# Simulation of the protocol in plaintext

# y is the base image that should be distributed between Drone and MB, x is the current face captured

img1, x, img2, y = get_two_random_images_and_their_embeddings(False)

# split y into two shares

# create a vector of random nonces
r = [random.uniform(-2.0, 2.0) for _ in range(128)]
# y1 is the nonces
y1 = r.copy()
# y0 is y XOR random nonces
y0 = fxor_list(y, r)


In [None]:
def cos_dist_np_temp(x, y):
    result = 1 - np.dot(x, y)/(np.linalg.norm(x)*np.linalg.norm(y))
    return result

In [None]:
print(get_cos_dist_numpy(x, y))

In [None]:
cos_dist_x_y0 = 1 - np.dot(x, y0)/(np.linalg.norm(x)*np.linalg.norm(y0))
cos_dist_x_y0

In [None]:
r

In [None]:
xor = lambda x,y:(x.view("int64")^y.view("int64")).view("float64")
print(xor(np.array(r),np.array(y)))

In [None]:
# Simulation of the protocol in plaintext on ints

# y is the base image that should be distributed between Drone and MB, f is the current face captured

# Generate two random fake int embeddings
f = np.random.randint(-100,100,128)
y = np.random.randint(-100,100,128)

# split y into share

# y1 is the nonces
y1 = np.random.randint(-100,100,128)
# y0 is y XOR y1
y0 = np.bitwise_xor(y, y1)

# sanity check
y == np.bitwise_xor(y0, y1)

In [None]:
# Drone computes distance between f and y1
drone_dist, t = get_cos_dist_numpy(f, y1)
# MD computes distance between f and y0
md_dist, t = get_cos_dist_numpy(f, y0)
drone_dist, md_dist, fxor(drone_dist, md_dist), get_cos_dist_numpy(f, y)

In [None]:
def fxor(a, b):
    """
    Take two floats and xor their individual bits. Return the created float.
    """
    rtrn = []
    a = struct.pack('d', a)
    b = struct.pack('d', b)
    for ba, bb in zip(a, b):
        rtrn.append(ba ^ bb)
    return struct.unpack('d', bytes(rtrn))[0]

# another way of xoring individual bit of floats using numpy
fxor_np = lambda x,y:(x.view("int64")^y.view("int64")).view("float64")

# just so I get it, a very simplified scenario

x = 4 # captured face
y = 5 # base image
r = 6 # 'random' nonce
y1 = r # one share is the nonce
y0 = y ^ r # second share is y xor r
# take an operation (multiplying) that results in ints as well
a = y1*x
b = y0*x
print(f"This works, {a=}, {b=}, x*y == {x*y}, a^b == {a^b}")
# if intermediate results are floats (say we wanted to compute y/x), then xoring does not make sense
a = y1/x
b = y0/x
print(f"Now {a=} and {b=}, and y/x = {y/x}")
print(f"xoring floats does not make sense: {fxor(a,b)}")
# a^b gives " unsupported operand type(s) for ^: 'float' and 'float' "


In [None]:
# this makes sense beacuse we are using ints
a = 4
b = 5
c = a^b
print(f"{c=}, c^a={c^a}, c^b={c^b}")
print((c+5)^a)

In [None]:
# this does not makes sense, c becomes garbage and thus operations on it do not make sense
a = 0.4
b = 0.5
c = fxor(a, b)
# I can still xor it back to get the original value
print(f"{c=}, f3^a ={fxor(c,a)}, f3^b = {fxor(c,b)}")
print(fxor(c+2,a))