# Milk's Big Subprint Experiment

### Experiment Variables

In [1]:
#Experiment selection: EXP_NUM must be one of the keys of EXP_DESC
EXP_DESC = {
    1: "DMP",
    2: "Sub",
    3: "Nov",
}
EXP_NUM = 1

#Datasets
DATASET_LIST = ["CAPACITIVE", "OPTICAL", "NIST"]
DATASET = "CAPACITIVE"

#FMR selection: maps dataset -> FMR setting -> threshold handed to the classifier
#NOTE(review): there is no "NIST" entry, so FMR_DESC[DATASET] raises KeyError for that dataset
FMR_DESC = {
    "CAPACITIVE": {1.0: 35, 0.1: 50, 0.01: 65},
    "OPTICAL": {1.0: 18, 0.1: 30, 0.01: 40},
}
FMR = 1.0

#Train/Test: which split is evaluated
TT_DESC = ["train", "test", "full"]
TT_SET = "train"

#Generator: assigned later, once the dataset is known
GENERATOR = None

#Classifier
CLASS_TYPES = ["Verifinger", "MLC"]
CLASSIFIER = "Verifinger"



### General Variables

In [2]:
#(X) maximum number of iterations to run for the CMA-ES objective evolver
CMA_ITER = 59

#(Y) number of experiment trials (DMP)
TRIALS = 10

#(Z) maximum size of the archive (Nov and Sub)
ARC_ITER = 10

---

## Function Definitions

#### Imports

In [3]:
# get imports
import time
import math
import os
import sys
import random

import numpy as np
import keras
import tensorflow as tf
from keras import layers

from keras.preprocessing.image import img_to_array, ImageDataGenerator
from PIL import Image

import cma
sys.path.insert(1, 'PATH/TO/VERIFINGER/SDK/') #insert path to Verifinger SDK
import sub_verifinger as sv

from tqdm import tqdm

import matplotlib.pyplot as plt
%matplotlib inline


os.environ["CUDA_VISIBLE_DEVICES"]="-1"   #"-1" hides every CUDA device, i.e. run on the CPU (set a GPU index here to use one)

#### Datasets

In [4]:
#dataset loaders
#load the capacitive fingerprint images (*.BMP) found directly inside dirname
def load_capacitive_data(dirname = "../<path to capacity fingerprint dataset>"):
    """Load, crop and normalize every '.BMP' file in `dirname`.

    dirname: directory holding the images; a trailing path separator is
             optional because paths are built with os.path.join.

    Returns (X_train, Y_train): the image arrays and the matching file names,
    both as numpy arrays in os.listdir order.
    """
    imgs_names = os.listdir(dirname)

    #Prep Images
    X = []
    Y = []
    with tqdm(total=len(imgs_names)) as pbar:
        for i in imgs_names:
            if(i.endswith('.BMP')):
                #context manager closes the file handle once the pixels are copied out
                with Image.open(os.path.join(dirname, i)) as raw:
                    im = raw.crop((8, 8, 136, 136))   #keep a 128x128 window
                    im = im.convert('L')
                    im = img_to_array(im)
                im = (255 - im) / 255.0    #invert and normalize to [0,1]

                Y.append(i)
                X.append(im)
            pbar.update(1)

    X_train = np.array(X)
    Y_train = np.array(Y)

    return X_train, Y_train

#load the optical fingerprint images (*.tif) found directly inside dirname
def load_optical_data(dirname = "<path to optical fingerprint dataset>"):
    """Load and normalize every '.tif' file in `dirname`.

    dirname: directory holding the images; a trailing path separator is
             optional because paths are built with os.path.join.

    Returns (X_train, Y_train): the image arrays and the matching file names,
    both as numpy arrays in os.listdir order.
    """
    imgs_names = os.listdir(dirname)

    #Prep Images
    X = []
    Y = []
    with tqdm(total=len(imgs_names)) as pbar:
        for i in imgs_names:
            if(i.endswith('.tif')):
                #context manager closes the file handle once the pixels are copied out
                with Image.open(os.path.join(dirname, i)) as raw:
                    im = raw.convert('L')
                    im = img_to_array(im)
                im = (255 - im) / 255.0    #invert and normalize to [0,1]

                Y.append(i)
                X.append(im)
            pbar.update(1)

    X_train = np.array(X)
    Y_train = np.array(Y)

    return X_train, Y_train

#load the NIST fingerprint images found directly inside dirname
def load_nist_data(s = 256, dirname = "<path to nist fingerprint dataset>", randcrop=True, rcs=128):
    """Load every sufficiently large image in `dirname`, resized to s x s.

    s:        target side length passed to resize(); also the minimum size filter
    dirname:  directory holding the images; a trailing path separator is
              optional because paths are built with os.path.join
    randcrop: when True, keep only a random rcs x rcs window of each resized image
    rcs:      side length of that random window

    Returns (X_train, Y_train): the image arrays and the matching file names,
    both as numpy arrays in os.listdir order.
    """
    imgs_names = os.listdir(dirname)

    #Prep Images
    X = []
    Y = []
    with tqdm(total=len(imgs_names)) as pbar:
        for i in imgs_names:
            #context manager closes the file handle once the pixels are copied out
            with Image.open(os.path.join(dirname, i)) as im:
                #NOTE(review): width is tested with >= but height with >, so an image
                #exactly s pixels tall is skipped -- confirm this asymmetry is intended
                if(im.size[0] >= s and im.size[1] > s):
                    Y.append(i)
                    xi = resize(im,s)
                    if randcrop:
                        #random_crop works on a batch, so wrap and unwrap the single image
                        xi = random_crop(np.expand_dims(xi,axis=0),rcs)[0]
                    X.append(xi)
            pbar.update(1)

    X_train = np.array(X)
    Y_train = np.array(Y)

    return X_train, Y_train

#cut the same random square window out of every image in a batch
def random_crop(img,crop_size):
    """Return a random crop_size x crop_size window of a batch of images.

    img:       array indexed as [batch, axis1, axis2, ...]; the crop is taken
               along axis 1 and axis 2
    crop_size: side length of the window

    Raises ValueError when the window does not fit inside the images.
    """
    max_x = img.shape[1] - crop_size
    max_y = img.shape[2] - crop_size   #each axis gets its own bound (non-square images)
    if max_x < 0 or max_y < 0:
        raise ValueError(f"crop_size {crop_size} is larger than image shape {img.shape[1:3]}")

    #randint's upper bound is exclusive: +1 makes the last offset reachable and
    #lets an image that is exactly crop_size wide/tall through (offset 0)
    x = np.random.randint(0, max_x + 1)
    y = np.random.randint(0, max_y + 1)
    return img[:, x:x + crop_size, y:y + crop_size]

#resize so the shorter side equals s, center-crop the longer side to s, then normalize
def resize(img, s):
    """Turn a PIL image into an s x s grayscale array scaled to [0,1].

    img: PIL image
    s:   target side length in pixels

    Returns the array produced by img_to_array, inverted and divided by 255.
    """
    #Image.LANCZOS is the same filter the removed Image.ANTIALIAS alias pointed to
    if(img.size[0] > img.size[1]):
        h = s
        ratio = h/float(img.size[1])
        w = max(s, int(img.size[0]*ratio))   #never let float truncation go below s
        img = img.resize((w,h), Image.LANCZOS)

        border = (w - s)//2
        img = img.crop((border, 0, border + s, h))
    else:
        w = s
        ratio = w/float(img.size[0])
        h = max(s, int(img.size[1]*ratio))   #never let float truncation go below s
        img = img.resize((w,h), Image.LANCZOS)

        border = (h - s)//2
        img = img.crop((0, border, w, border + s))

    img = img.convert('L')
    img = img_to_array(img)
    img = (255 - img) / 255.0    #invert and normalize to [0,1]

    return img

In [5]:
#load the dataset selected by DATASET and group its prints by user
def importDataset():
    """Return (images, user labels, {user label: [images of that user]}).

    The user label is parsed out of each file name for CAPACITIVE and OPTICAL;
    for NIST the file name itself is the label. An unrecognised DATASET gives
    empty results.
    """
    images = []
    labels = []

    if DATASET == "CAPACITIVE":
        images, names = load_capacitive_data()
        #second '_' field is the user number; shifted by one so ids start at 0
        labels = [int(n.split("_")[1]) - 1 for n in names]
    elif DATASET == "OPTICAL":
        images, names = load_optical_data()
        #first '_' field is the user number
        labels = [int(n.split("_")[0]) for n in names]
    elif DATASET == "NIST":
        images, names = load_nist_data()
        labels = names

    prints_by_user = {}
    for idx in range(len(images)):
        prints_by_user.setdefault(labels[idx], []).append(images[idx])

    return images, labels, prints_by_user

### Generators

In [6]:
GENERATOR = None   #model called by generateSample; assigned before the experiments run

#load the saved variational-autoencoder decoder
def import_VAE(dataset,vae_size):
    """Load the decoder model file named after `dataset` and `vae_size`."""
    model_path = f"autoencoder_models/print_{dataset}_var_decoder-{vae_size}.h5"
    return keras.models.load_model(model_path)

#load the wasserstein-gan generator (currently a stand-in that loads the VAE decoder)
def import_WGAN(dataset,epoch_num):
    """Return the generator model; for now epoch_num is forwarded as the VAE size."""
    #intended source once the GAN is available:
    #   keras.models.load_model(f"gan_models/wgan_{dataset}_{epoch_num}_gen.h5")
    vae_size = epoch_num
    return import_VAE(dataset, vae_size)

#decode one latent vector into one generated sample
def generateSample(x):
    """Run GENERATOR on `x` and return the first output as a numpy array.

    x: a single latent vector (1d) or an already batched array of them.
    """
    latent = np.array(x)
    if latent.ndim == 1:
        #add the leading batch axis the model is called with
        latent = np.expand_dims(latent, axis=0)

    decoded = GENERATOR(latent, training=False)
    return np.array(decoded[0])

### Classifiers

In [7]:
import sub_verifinger as sv  #import verifinger wrapper
MLC_MODEL = None   #multi-label classifier used by getUserPredictions; assigned from setMLC

#load the Multi Label Classifier model for a dataset
def setMLC(dataset):
    """Return the saved classifier for `dataset`, or None when there is none."""
    if dataset != 'CAPACITIVE':
        #add other models here
        return None
    return keras.models.load_model('multiclass_models/print_multiclassifier_720.h5')
    
#ask the chosen classifier which users an image matches
#returns the matching user ids (starting from 0)
def getUserPredictions(img,model,dataset,threshold,t_set='full',name='temp'):
    """Classify `img` with `model` ('MLC' or 'Verifinger').

    img:       image as [h,w], [h,w,1] or an already batched array
    dataset:   dataset name; lower-cased before it is handed to Verifinger
    threshold: match threshold; the MLC branch divides it by 100 before comparing
    t_set, name: forwarded to the Verifinger wrapper only

    Returns an index array (MLC), a list of ints (Verifinger), or [] after
    printing a message when `model` is unknown.
    """
    #bring the image to a 4d batch: [h,w] -> [h,w,1] -> [1,h,w,1]
    batch = np.array(img)
    if batch.ndim == 2:
        batch = np.expand_dims(batch, axis=2)
    if batch.ndim == 3:
        batch = np.expand_dims(batch, axis=0)

    if model == 'MLC':
        #scores come straight from the mlc model
        cutoff = threshold/100.0
        scores = MLC_MODEL.predict(batch)[0]
        return np.where(scores >= cutoff)[0]

    if model == 'Verifinger':
        #verifinger goes through the wrapper module
        matched = sv.usersMatched(batch, dataset.lower(),threshold,name,t_set)
        return [int(u) for u in matched]

    #no other model found
    print(f"No model [ {model} ] found")
    return []

### CMA Fitness functions

The objective functions that CMA-ES minimizes; the evolution loop itself is started from the experiment runner further down.


In [8]:
NUM_USERS = 720
CURRENT_USERS = list(range(NUM_USERS))   #use defaults for now

NOVELTY_ARCHIVE = []   #user vectors already found, compared against by the novelty fitness
#novelty of a vector = distance to its nearest neighbour in the novelty set
def novelScore(v,novSet):
    """Return the Euclidean distance from `v` to the closest vector in `novSet`.

    With an empty `novSet` the distance from the origin is returned instead.
    """
    target = np.array(v)

    #nothing to compare against yet: measure from the origin
    if len(novSet) == 0:
        origin = np.zeros(len(v))
        return np.linalg.norm(origin - target)

    return min(np.linalg.norm(np.array(member) - target) for member in novSet)
        

#objective handed to CMA-ES -- lower is better
# FMR is set
def fitness(x):
    """Score latent vector `x`; CMA-ES minimizes the returned value.

    Experiments 1 and 2: 1 - (matched users still in CURRENT_USERS / len(CURRENT_USERS)).
    Experiment 3:        1 / novelty of the matched-user vector w.r.t. NOVELTY_ARCHIVE.
    """
    coverage_mode = EXP_NUM in [1,2]

    #nobody left to cover: every candidate is equally (un)useful, and the ratio
    #below would divide by zero, so report the worst coverage score directly
    if coverage_mode and len(CURRENT_USERS) == 0:
        return 1.0

    gen_print = generateSample(x)
    #plt.imshow(gen_print,cmap='binary')

    fmr_val = FMR_DESC[DATASET][FMR]
    print_name = f"print_{DATASET}_{fmr_val}-exp_{EXP_NUM}"
    usersFound = getUserPredictions(gen_print,CLASSIFIER,DATASET,fmr_val,TT_SET,print_name)

    #normal comparison
    if coverage_mode:
        remaining = set(CURRENT_USERS)   #O(1) membership instead of scanning the list per user
        rem_usersFound = sum(1 for u in usersFound if u in remaining)

        #fraction of the remaining users found, flipped so that more users = lower value
        return 1.0-(rem_usersFound/len(CURRENT_USERS))

    #novelty comparison
    zv = np.zeros(NUM_USERS)
    for u in usersFound:
        zv[u] = 1
    score = novelScore(zv,NOVELTY_ARCHIVE)
    if score == 0:
        #identical to an archived vector (or nothing matched and the archive is empty):
        #same value the numpy division used to yield, minus the divide-by-zero warning
        return float("inf")
    return 1.0/score

### Archive handling

In [9]:
#encode a list of user ids as a string of 0/1 flags, used as the archive key
def userlist2Bin(classUserList,numUsers):
    """Return a `numUsers`-character string with '1' at every index in `classUserList`."""
    flags = np.zeros(numUsers)
    for user in classUserList:
        flags[user] = 1
    return "".join(map(str, flags.astype(int)))

#decode a string of 0/1 flags back into the list of user ids
def bin2Userlist(bstr):
    """Return the positions in `bstr` whose character is 1."""
    return [pos for pos, flag in enumerate(bstr) if int(flag) == 1]

In [10]:
#build the archive output directory and file name from the current parameters
def makeDefaultFilename(i=''):
    """Return (directory, file name) for the archive text file.

    The directory is created when missing. `i` is an optional suffix appended
    to the name as '__<i>'.
    """
    path = "sp-exp_archive-out"
    if os.path.exists(path) is False:
        os.mkdir(path)

    #name encodes -> experiment #, ttype, fmr, dataset, classifier, [CMA_ITER,TRIALS,ARC_ITER]
    extra = '' if i == '' else f"__{i}"
    fname=f"[SPEXP-ARCHIVE]_expnum-{EXP_NUM}_ttype-{TT_SET}_fmr-{FMR}_dataset-{DATASET}_classifier-{CLASSIFIER}__[{CMA_ITER}_{TRIALS}_{ARC_ITER}]{extra}.txt"

    return path, fname


#write a whole archive to a text file, replacing any previous content
def exportFullArchiveTxt(arc,path,fname):
    """Write one 'key:v1,v2,...' line per archive entry and return `fname`."""
    target = os.path.join(path,fname)
    with open(target,'w+') as out:
        for key, values in arc.items():
            joined = ','.join(map(str, values))
            out.write(f"{key}:{joined}\n")

    return fname

#append archive entries to the archive text file
def addArchiveTxt(entry,fpath):
    """Append 'key:v1,v2,...' line(s) to `fpath`, creating the file if needed.

    entry: a (key, values) pair, or a dict of key -> values, in which case every
           item is written (callers in this notebook pass the archive dict)
    fpath: path of the archive text file
    """
    if isinstance(entry, dict):
        items = list(entry.items())
    else:
        items = [(entry[0], entry[1])]

    #'a' keeps what earlier calls wrote; 'w+' emptied the file on every call
    with open(fpath,'a') as f:
        for key, values in items:
            f.write(f"{key}:{','.join([str(r) for r in values])}\n")
       
    
#append the trial separator line to the archive output
def newTrialArchiveTxt(fpath):
    """Append a '~ ~ ~' line to `fpath` to mark the end of a trial."""
    #'a' keeps the entries written so far ('w+' erased them); the newline keeps the
    #separator on a line of its own so the next entry does not run into it
    with open(fpath,'a') as f:
        f.write("~ ~ ~\n")
    
#read an archive text file back into one dictionary per trial
def importArchiveTxt(fpath):
    """Parse `fpath` into a list of archives.

    Every 'key:values' line becomes archive[key] = values, where values is the
    comma-separated text after the first ':'. A '~ ~ ~' line closes the current
    archive and starts a new one. Entries after the last separator are returned
    as a final archive instead of being dropped.

    Raises IndexError for a non-empty line that has no ':'.
    """
    all_arcs = []
    temp_arc = {}

    with open(fpath, 'r') as f:
        for line in f:
            e = line.strip()   #lines still carry their newline, which broke the separator test
            if e == "":
                continue
            if e == "~ ~ ~":
                all_arcs.append(temp_arc)
                temp_arc = {}
                continue

            ep = e.split(":", 1)
            binnum = ep[0]
            x = ep[1]

            temp_arc[binnum] = x

    #file without a closing separator (e.g. a full-archive export)
    if temp_arc:
        all_arcs.append(temp_arc)

    return all_arcs
    


In [11]:
#fraction of users matched by at least one archive entry
def calcArcCoverage(arc,numUsers):
    """OR together the 0/1 key strings of `arc` and return the share of set positions.

    Each key is read as one flag per user; only the first `numUsers` flags are used.
    """
    covered = [0] * numUsers
    for key in arc.keys():
        bits = list(map(int, key))   #one flag per character (assume len==numUsers)
        for pos in range(numUsers):
            covered[pos] |= bits[pos]

    #share of users found
    return sum(np.array(covered, dtype=float))/numUsers
        

#append a coverage report for the current parameters to a text file
def archiveCoverageReport(coverArr,fname):
    """Append the experiment parameters and the sorted coverage values to `fname`.

    coverArr: coverage values, written from greatest to least
    fname:    report file path; existing content is kept
    """
    rep = {
        "EXPERIMENT": EXP_DESC[EXP_NUM],
        "DATASET": DATASET,
        "FMR": FMR,
        "TTYPE": TT_SET,
        "CLASSIFIER": CLASSIFIER,
        "COVERAGE": sorted(coverArr,reverse=True),
    }

    #one 'name:value' line per field, then a blank gap before the next report
    with open(fname, 'a+') as out:
        for field, value in rep.items():
            out.write(f"{field}:{value}\n")
        out.write("\n\n")
    

### Experiment runner

In [12]:
#number of users the classifier can report for a dataset
def _numUsersFor(dataset):
    if dataset == "CAPACITIVE":
        return 720
    return 720    #change to something else later


#run one CMA-ES search and classify the print decoded from its best latent vector
def _evolveAndClassify(latent_dim=100):
    #NOTE(review): latent_dim must match the generator's input size -- confirm 100 is right for the loaded model
    es = cma.CMAEvolutionStrategy(np.random.normal(0,1,latent_dim).tolist(), 1, {'maxiter':CMA_ITER})
    es.optimize(fitness)   #fitness function is defined above
    z = es.result[0]       #best solution found

    fmr_val = FMR_DESC[DATASET][FMR]
    print_name = f"print_{DATASET}_{fmr_val}-exp_{EXP_NUM}"
    gen_print = generateSample(z)
    usersFound = getUserPredictions(gen_print,CLASSIFIER,DATASET,fmr_val,TT_SET,print_name)
    return z, usersFound


#append the coverage values of the finished experiment to its report file
def _exportCoverageReport(coverage_arr):
    cov_d="sp-exp_coverage-out/"
    if not os.path.exists(cov_d):
        os.mkdir(cov_d)
    cov_report_filename = f"[SPEXP-coverage]_report_EXP-{EXP_NUM}.txt"
    archiveCoverageReport(coverage_arr,os.path.join(cov_d,cov_report_filename))


#actually build the archive based on the parameters specified
def runExp():
    """Run TRIALS trials of the experiment selected by EXP_NUM.

    1 (DMP): one CMA-ES search per trial; the archive is that single print.
    2 (Sub): up to ARC_ITER searches per trial; users matched by a print are
             removed from CURRENT_USERS so later searches target the rest.
    3 (Nov): ARC_ITER searches per trial; every matched-user vector is added to
             NOVELTY_ARCHIVE, which the novelty fitness measures against.

    Each print is appended to the archive text file, a separator is written
    after every trial, and the per-trial coverage goes to the coverage report.
    Any other EXP_NUM does nothing.
    """
    global CURRENT_USERS
    global NUM_USERS
    global NOVELTY_ARCHIVE

    if EXP_NUM not in (1,2,3):
        return

    NUM_USERS = _numUsersFor(DATASET)
    COVERAGE_ARR = []

    #start from a clean archive file so reruns do not mix with old output
    p,f = makeDefaultFilename()
    arc_path = os.path.join(p,f)
    if os.path.exists(arc_path):
        os.remove(arc_path)

    for t in range(TRIALS):
        CURRENT_USERS = list(range(NUM_USERS))   #every trial starts with all users uncovered

        # -- DEEP MASTERPRINTS EXPERIMENT -- #
        if EXP_NUM == 1:
            print(f"---  TRIAL #{t} ---")
            z, usersFound = _evolveAndClassify()

            user_binstr = userlist2Bin(usersFound,NUM_USERS)
            ARCHIVE = {user_binstr:z}   #single item archive

            addArchiveTxt((user_binstr,z),arc_path)

        # -- SUBPRINT (2) / NOVELTY (3) EXPERIMENT -- #
        else:
            print(f"---------     TRIAL #{t+1}    ----------")

            ARCHIVE = {}           #initialize empty archive
            NOVELTY_ARCHIVE = []   #trials are independent: forget earlier user vectors

            for AI in range(ARC_ITER):
                #the coverage fitness has nothing left to reward once all users are covered
                if EXP_NUM == 2 and not CURRENT_USERS:
                    print("*** every user is already covered -- ending trial early ***")
                    break

                print(f"*** Archive iteration {AI+1} / {ARC_ITER} -- USERS: [{len(CURRENT_USERS)} / {NUM_USERS}] ***")
                z, usersFound = _evolveAndClassify()

                #remove the users this print matched from the remaining set
                found = set(int(u) for u in usersFound)
                CURRENT_USERS = [u for u in CURRENT_USERS if u not in found]

                #add latent vector to the archive under its classification bin string
                user_binstr = userlist2Bin(usersFound,NUM_USERS)
                ARCHIVE[user_binstr] = z

                #add user vector to novelty archive
                if EXP_NUM == 3:
                    zv = np.zeros(NUM_USERS)
                    for u in usersFound:
                        zv[u] = 1
                    NOVELTY_ARCHIVE.append(zv)

                #export only the new entry; earlier ones are already in the file
                addArchiveTxt((user_binstr,z),arc_path)

        #save coverage of this trial's archive
        cov = calcArcCoverage(ARCHIVE,NUM_USERS)
        COVERAGE_ARR.append(cov)

        newTrialArchiveTxt(arc_path)

    #export report
    _exportCoverageReport(COVERAGE_ARR)
        

---

## RUN EXPERIMENTS

In [None]:
#Experiments to run
EXPs = [1,2,3]
EXP_NUM = 1     #denotes which experiment is on [1,2,3]

#Datasets
DATASETs = ["CAPACITIVE"]
DATASET="CAPACITIVE"

#FMR settings
FMRs = [1.0, 0.1, 0.01]
FMR = 1.0    #denotes which FMR using

#Train/Test
TTs=["train"]
TT_SET="train"   #denotes which set evaluating

#Classifier
CLASSs=["MLC"]
CLASSIFIER="MLC"

#run every combination of classifier / dataset / split / FMR / experiment
for c in CLASSs:
    CLASSIFIER = c
    for d in DATASETs:
        #select the dataset BEFORE loading anything that depends on it
        DATASET = d

        #set dataset
        X_train, y_clean, print_dict = importDataset()

        #set classifier (if needed)
        if CLASSIFIER == "MLC":
            MLC_MODEL = setMLC(DATASET)

        #set generator (depends only on the dataset, so load it once per dataset)
        GENERATOR = import_VAE(DATASET,32)

        for t in TTs:
            for f in FMRs:
                for e in EXPs:

                    #set parameters
                    EXP_NUM = e
                    FMR = f
                    TT_SET = t

                    print(f"-- EXPERIMENT: {EXP_NUM}-- ")
                    print(f"FMR: {FMR}")
                    print(f"T SET: {TT_SET}")
                    print(f"DATASET: {DATASET}")
                    print(f"CLASSIFIER: {CLASSIFIER}")
                    print("")

                    #run it!
                    runExp()
