In [1]:
import numpy as np
import pandas as pd
import cv2
import os
import random
import threading


%matplotlib inline
import matplotlib.pyplot as plt

In [0]:
from urllib.request import urlopen
def getData(url,dirname="data",img_shape=(100,100)):
    """Download, crop and store the face images listed in a tab-separated URL file.

    The file at ``url`` is parsed with pandas (first two lines skipped) into the
    columns Name, imagenum, url, rect and md5. For every row the image behind
    ``url`` is fetched, decoded as grayscale, cropped to ``rect`` and resized to
    ``img_shape``; the result is written to ``<dirname>/<Name>/<k>.jpg``.

    Parameters
    ----------
    url : str
        Location of the listing file (anything ``pandas.read_csv`` accepts).
    dirname : str
        Root output directory; created when missing.
    img_shape : tuple of int
        Target size handed to ``cv2.resize``.

    Notes
    -----
    * Downloading is best-effort: a row that cannot be fetched, decoded,
      cropped or written is skipped, and the number of skipped rows is printed
      once at the end.
    * The per-person file counter restarts at 0 only when a person directory is
      newly created, so re-running into an existing tree keeps counting from
      the previous person's value.
    """
    data = pd.read_csv(url,sep="\t",skiprows=2,header=None,names=['Name','imagenum','url','rect','md5'])
    print(data.shape)
    totalrows = data.shape[0]
    total_personalities = data.Name.nunique()
    current = 0
    failed = 0
    os.makedirs(dirname, exist_ok=True)
    j = 0
    for i, row in enumerate(data.itertuples(index=False)):
        person_dir = os.path.join(dirname, row.Name)
        if not os.path.exists(person_dir):
            os.mkdir(person_dir)
            current += 1
            print("{} : {}/{} {:.2f}% done".format(dirname,current,total_personalities,i*100/totalrows))
            j = 0
        try:
            # The context manager closes the connection even when read() fails.
            with urlopen(row.url, timeout=1) as resp:
                raw = np.frombuffer(resp.read(), dtype=np.uint8)
            # imdecode expects an IMREAD_* flag; COLOR_BGR2GRAY is a cvtColor
            # code and does not request grayscale decoding.
            image = cv2.imdecode(raw, cv2.IMREAD_GRAYSCALE)
            if image is None:
                failed += 1
                continue
            # rect is "x1,y1,x2,y2"; numpy indexes rows (y) first.
            p1, p2, p3, p4 = map(int, row.rect.split(','))
            image = image[p2:p4, p1:p3]
            image = cv2.resize(image, img_shape, interpolation=cv2.INTER_AREA)
            # cv2.imwrite stores the single-channel pixels as they are, whereas
            # plt.imsave would run a 2-D array through a colormap.
            if cv2.imwrite(os.path.join(person_dir, str(j) + '.jpg'), image):
                j += 1
            else:
                failed += 1
        except Exception:
            # Dead links, timeouts, malformed rects and empty crops are expected
            # for a scraped list; skip the row but keep count.
            failed += 1
    print("{} : {} of {} rows skipped".format(dirname, failed, totalrows))

In [0]:
# Background download of the dev_urls.txt listing into the "eval" directory.
data_e = threading.Thread(
    target=getData,
    kwargs={
        'url': 'http://www.cs.columbia.edu/CAVE/databases/pubfig/download/dev_urls.txt',
        'dirname': 'eval',
    },
)
data_e.start()

In [0]:
# Background download of the eval_urls.txt listing into the "train" directory.
data_d = threading.Thread(
    target=getData,
    kwargs={
        'url': 'http://www.cs.columbia.edu/CAVE/databases/pubfig/download/eval_urls.txt',
        'dirname': 'train',
    },
)
data_d.start()

In [0]:
# Block until both background download threads have finished.
# (The first name was previously misspelled "date_d", which raised NameError.)
data_d.join()
data_e.join()

(16336, 5)
(42461, 5)
eval : 1/60 1.67% done
train : 1/140 0.71% done
eval : 2/60 3.33% done
eval : 3/60 5.00% done
train : 2/140 1.43% done
eval : 4/60 6.67% done
train : 3/140 2.14% done
eval : 5/60 8.33% done
eval : 6/60 10.00% done
train : 4/140 2.86% done
eval : 7/60 11.67% done
train : 5/140 3.57% done
eval : 8/60 13.33% done
train : 6/140 4.29% done
eval : 9/60 15.00% done
train : 7/140 5.00% done
eval : 10/60 16.67% done
eval : 11/60 18.33% done
eval : 12/60 20.00% done
eval : 13/60 21.67% done
train : 8/140 5.71% done
eval : 14/60 23.33% done
train : 9/140 6.43% done
eval : 15/60 25.00% done
train : 10/140 7.14% done
train : 11/140 7.86% done
eval : 16/60 26.67% done
eval : 17/60 28.33% done
eval : 18/60 30.00% done
train : 12/140 8.57% done
eval : 19/60 31.67% done
eval : 20/60 33.33% done
eval : 21/60 35.00% done
train : 13/140 9.29% done
eval : 22/60 36.67% done
train : 14/140 10.00% done
eval : 23/60 38.33% done
eval : 24/60 40.00% done
train : 15/140 10.71% done
train : 1

In [2]:
def _read_gray(image_path):
    """Read ``image_path`` as grayscale and append a trailing channel axis."""
    pixels = cv2.imread(image_path, 0)
    if pixels is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise ValueError("could not read image: {}".format(image_path))
    return np.expand_dims(pixels, -1)


def getMiniBatch(batch_size=32,prob=0.5,path = "train"):
    """Sample a batch of image pairs for siamese training or evaluation.

    ``path`` must contain one sub-directory per person, each holding that
    person's images. Every pair is, with probability ``prob``, two distinct
    images of the same person (target 1); otherwise it is one image from each
    of two different persons (target 0).

    Parameters
    ----------
    batch_size : int
        Number of pairs to draw.
    prob : float
        Probability that a pair shows the same person.
    path : str
        Root directory of the per-person image folders.

    Returns
    -------
    ([left, right], target)
        ``left`` and ``right`` are arrays of grayscale images with a trailing
        channel axis, ``target`` is the matching array of 0/1 labels.
        Assumes all images share one size so they stack into a dense array.

    Raises
    ------
    ValueError
        If a pair type is drawn that the directory cannot satisfy (fewer than
        two non-empty persons, or no person with two images), or if an image
        file cannot be read.
    """
    # List each person's files once up front instead of on every draw, and
    # ignore stray non-directory entries in ``path``.
    images_of = {}
    for person in os.listdir(path):
        person_dir = os.path.join(path, person)
        if os.path.isdir(person_dir):
            images_of[person] = os.listdir(person_dir)
    # Pre-filtering replaces the former retry-until-valid loops, which never
    # terminated when no valid choice existed.
    non_empty = [p for p, files in images_of.items() if len(files) >= 1]
    multi_image = [p for p, files in images_of.items() if len(files) >= 2]

    left = []; right = []
    target = []
    for _ in range(batch_size):
        res = np.random.choice([0,1],p=[1-prob,prob])
        if res == 0:
            if len(non_empty) < 2:
                raise ValueError(
                    "need at least two persons with images under {!r}".format(path))
            p1, p2 = (str(p) for p in np.random.choice(non_empty, size=2, replace=False))
            file1 = os.path.join(path, p1, random.choice(images_of[p1]))
            file2 = os.path.join(path, p2, random.choice(images_of[p2]))
            target.append(0)
        else:
            if not multi_image:
                raise ValueError(
                    "need at least one person with two images under {!r}".format(path))
            p = str(np.random.choice(multi_image))
            f1, f2 = np.random.choice(images_of[p], size=2, replace=False)
            file1, file2 = os.path.join(path, p, f1), os.path.join(path, p, f2)
            target.append(1)
        left.append(_read_gray(file1)); right.append(_read_gray(file2))
    return [np.array(left),np.array(right)],np.array(target)

In [3]:
def test_oneshot(model,N,verbose=0):
    """Score ``model`` on ``N`` random image pairs drawn from the "eval" directory.

    The pairs come from ``getMiniBatch(N, path="eval")``. A pair is predicted as
    a match when the model output exceeds 0.5, and the return value is the
    percentage of pairs whose prediction equals the target. When ``verbose`` is
    truthy the result is also printed.
    """
    pairs, labels = getMiniBatch(N, path="eval")
    scores = np.squeeze(model.predict(pairs))
    predicted = np.where(scores > 0.5, 1, 0)
    percent_correct = np.sum(predicted == labels) * 100 / N
    if verbose:
        print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct,N))
    return percent_correct

In [None]:
from keras.layers import Input, Conv2D, Dense, Flatten,MaxPooling2D
from keras.layers import Lambda, Subtract
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.losses import binary_crossentropy

import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.utils import shuffle


def W_init(shape,name=None):
    """Weight initializer: a backend variable sampled from a normal(0, 0.01).

    The original note states this follows the paper's weight initialisation.
    """
    return K.variable(np.random.normal(0, 1e-2, shape), name=name)

#//TODO: figure out how to initialize layer biases in keras.
def b_init(shape,name=None):
    """Bias initializer: a backend variable sampled from a normal(0.5, 0.01).

    The original note states this follows the paper's bias initialisation.
    """
    return K.variable(np.random.normal(0.5, 1e-2, shape), name=name)

# Both branches take a 100x100 single-channel image.
input_shape = (100, 100, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)

# Shared encoder used for each siamese 'leg': four conv layers (max-pooling
# after the first three) followed by a 4096-unit sigmoid embedding.
# NOTE(review): unlike the later layers, the first Conv2D passes no
# bias_initializer - confirm that this is intended.
convnet = Sequential([
    Conv2D(64, (10, 10), activation='relu', input_shape=input_shape,
           kernel_initializer=W_init, kernel_regularizer=l2(2e-4)),
    MaxPooling2D(),
    Conv2D(128, (7, 7), activation='relu',
           kernel_regularizer=l2(2e-4), kernel_initializer=W_init, bias_initializer=b_init),
    MaxPooling2D(),
    Conv2D(128, (4, 4), activation='relu',
           kernel_initializer=W_init, kernel_regularizer=l2(2e-4), bias_initializer=b_init),
    MaxPooling2D(),
    Conv2D(256, (4, 4), activation='relu',
           kernel_initializer=W_init, kernel_regularizer=l2(2e-4), bias_initializer=b_init),
    Flatten(),
    Dense(4096, activation="sigmoid",
          kernel_regularizer=l2(1e-3), kernel_initializer=W_init, bias_initializer=b_init),
])

# Run both inputs through the same encoder so the weights are shared.
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)

# Element-wise absolute difference of the two embeddings, then a single
# sigmoid unit that scores the pair.
subtracted = Subtract()([encoded_l, encoded_r])
both = Lambda(lambda tensor: K.abs(tensor))(subtracted)
prediction = Dense(1, activation='sigmoid', bias_initializer=b_init)(both)
siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)

# Alternative optimizer kept for reference:
#optimizer = SGD(0.0004,momentum=0.6,nesterov=True,decay=0.0003)

optimizer = Adam(0.00006)
#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
siamese_net.compile(loss="binary_crossentropy", optimizer=optimizer)

siamese_net.count_params()

Using TensorFlow backend.


27417409

In [0]:
# Training configuration.
evaluate_every = 7000  # NOTE(review): never read below; evaluation runs every `loss_every` steps
loss_every = 500       # report losses / evaluate / checkpoint every this many steps
batch_size = 32
N = 1000               # number of pairs handed to test_oneshot
best = 0               # best evaluation accuracy seen so far
loss_history = []      # training losses accumulated since the last report

for i in range(900000):
    inputs, targets = getMiniBatch(batch_size, path="train")
    loss = siamese_net.train_on_batch(inputs, targets)
    loss_history.append(loss)

    if i % loss_every:
        continue

    # Periodic report: loss on one "eval" batch plus the mean training loss
    # over the steps since the previous report.
    val_inputs, val_targets = getMiniBatch(batch_size, path="eval")
    vloss = siamese_net.test_on_batch(val_inputs, val_targets)
    print("iteration {}, training loss: {:.7f}, validation loss : {:.7f}".format(i,np.mean(loss_history),vloss))
    loss_history.clear()

    # Keep the model whenever accuracy ties or beats the best so far.
    val_acc = test_oneshot(siamese_net, N, verbose=True)
    if val_acc >= best:
        print("saving")
        siamese_net.save('saved_best')
        best = val_acc

iteration 0, training loss: 3.4481633, validation loss : 3.4017541
Got an average of 49.2% 1000 way one-shot learning accuracy
saving
iteration 500, training loss: 1.8093039, validation loss : 1.1579119
Got an average of 62.7% 1000 way one-shot learning accuracy
saving
iteration 1000, training loss: 0.9088915, validation loss : 0.8754915
Got an average of 66.1% 1000 way one-shot learning accuracy
saving
iteration 1500, training loss: 0.7204882, validation loss : 0.7068923
Got an average of 67.6% 1000 way one-shot learning accuracy
saving
train : 54/140 38.57% done
iteration 2000, training loss: 0.6333932, validation loss : 0.7930869
Got an average of 68.8% 1000 way one-shot learning accuracy
saving
iteration 2500, training loss: 0.5876395, validation loss : 0.7113198
Got an average of 68.9% 1000 way one-shot learning accuracy
saving
iteration 3000, training loss: 0.5528495, validation loss : 0.5960521
Got an average of 71.5% 1000 way one-shot learning accuracy
saving
iteration 3500, tr

In [None]:
import time

# Reload the best checkpoint and score it. Loading is retried a bounded number
# of times with a pause in between.
# NOTE(review): the retry presumably exists because "saved_best" may be
# mid-write by a concurrent training run - confirm.
MAX_LOAD_ATTEMPTS = 30
RETRY_DELAY_SECONDS = 2

val_acc = None
last_error = None
for attempt in range(1, MAX_LOAD_ATTEMPTS + 1):
    try:
        siamese_net.load_weights("saved_best")
        val_acc = test_oneshot(siamese_net,1000,verbose=True)
        print("Accuracy: {}".format(val_acc))
        break
    except Exception as exc:
        # Catch Exception rather than everything so that KeyboardInterrupt can
        # still stop the cell, and report the failure instead of hiding it.
        last_error = exc
        print("attempt {}/{} failed: {!r}".format(attempt, MAX_LOAD_ATTEMPTS, exc))
        time.sleep(RETRY_DELAY_SECONDS)

if val_acc is None:
    raise RuntimeError(
        "could not load and evaluate 'saved_best' after {} attempts".format(MAX_LOAD_ATTEMPTS)
    ) from last_error

In [None]:
# haarcascade_frontalface_default.xml is the saved model used for face detection.
faceCascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")
# The constructor does not raise when the file is missing or invalid; it yields
# an empty classifier that only fails later inside detectMultiScale.
if faceCascade.empty():
    raise IOError("could not load cascade file 'haarcascade_frontalface_default.xml'")
def giveAllFaces(image,BGR_input=True,BGR_output=False):
    """Yield each face detected in ``image`` together with its bounding box.

    Parameters
    ----------
    image : numpy array
        Picture to search. Treated as BGR when ``BGR_input`` is true, otherwise
        it is passed to the detector as it is.
    BGR_input : bool
        Convert ``image`` from BGR to grayscale before detection.
    BGR_output : bool
        When true the crops are cut from ``image`` itself; when false they are
        cut from the grayscale detection image.

    Yields
    ------
    (cropped_face, x, y, w, h)
        The crop and the box reported by the cascade detector.

    Notes
    -----
    Crops taken from ``image`` (``BGR_output=True``) are views, so drawing on
    ``image`` while iterating can show up in later crops.
    """
    if BGR_input:
        # cvtColor already allocates a new array, so no extra copy is needed.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        # Copy so that yielded crops are independent of later edits to `image`.
        gray = image.copy()
    faces = faceCascade.detectMultiScale(
        gray,
        scaleFactor=1.3,
        minNeighbors=3,
        minSize=(30, 30)
    )
    source = image if BGR_output else gray
    for (x, y, w, h) in faces:
        # Two-axis slicing works for both 2-D and 3-D arrays; the previous
        # three-axis slice failed for grayscale input with BGR_output=True.
        yield source[y:y+h,x:x+w],x,y,w,h

#to draw rectangle
#for (_,x, y, w, h) in giveAllFaces(image):
#    cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)

import math
def test(path="sample/tbbt.jpg"):
    """Detect the faces in the image at ``path`` and show them as a tile grid.

    Each detected face is resized to 100x100 and the tiles are laid out in up
    to two rows. When the face count is odd the second row is padded with a
    blank tile. Nothing is plotted when no face is found.

    Raises
    ------
    FileNotFoundError
        If the image cannot be read.
    """
    image = cv2.imread(path)
    if image is None:
        # cv2.imread returns None instead of raising on a missing/bad file.
        raise FileNotFoundError("could not read image: {}".format(path))
    faces = [cv2.resize(face,(100,100),interpolation = cv2.INTER_AREA)
             for face,_,_,_,_ in giveAllFaces(image,BGR_output=True)]
    print("Total Faces Detected: {}".format(len(faces)))
    if not faces:
        return
    # OpenCV delivers BGR while matplotlib displays RGB.
    faces = [cv2.cvtColor(face, cv2.COLOR_BGR2RGB) for face in faces]
    per_row = (len(faces) + 1) // 2
    top, bottom = faces[:per_row], faces[per_row:]
    rows = [np.hstack(top)]
    if bottom:
        # Pad with blank tiles so both rows have equal width for vstack.
        bottom = bottom + [np.zeros_like(faces[0])] * (per_row - len(bottom))
        rows.append(np.hstack(bottom))
    plt.imshow(np.vstack(rows))

test() #other options - got.jpg, friends.jpg

In [None]:
def putBoxText(image,x,y,w,h,text="unknown"):
    """Draw a green rectangle at (x, y, w, h) on ``image`` and label it with ``text``.

    ``image`` is modified in place. The label normally sits just above the box;
    when there is not enough room above (box near the top edge) it is placed
    just inside the top of the box so that it stays visible.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale, thickness, colour = 1, 2, (0, 255, 0)
    # Accept numpy integer coordinates as well as plain ints.
    x, y, w, h = int(x), int(y), int(w), int(h)
    cv2.rectangle(image, (x, y), (x + w, y + h), colour, thickness)
    (_, text_height), _ = cv2.getTextSize(text, font, font_scale, thickness)
    # putText anchors the text at its bottom-left corner.
    label_y = y - 6
    if label_y - text_height < 0:
        label_y = y + text_height + 6
    cv2.putText(image,text, (x,label_y), font, font_scale, colour, thickness, cv2.LINE_AA)

In [None]:
def putCharacters(image,db="database"):
    """Label every face found in ``image`` with its best match from ``db``.

    Each file in ``db`` is read as a grayscale reference image, and its file
    name without extension is used as the person's name. Every detected face is
    scored against all references with ``siamese_net``. The highest-scoring
    name is drawn on ``image`` when its score exceeds 0.5, otherwise "Unknown"
    is drawn. The score is appended to the label.

    ``image`` is modified in place.

    Raises
    ------
    ValueError
        If ``db`` contains no readable image.
    """
    names, references = [], []
    for filename in os.listdir(db):
        reference = cv2.imread(os.path.join(db, filename), 0)
        if reference is None:
            # Not a readable image (cv2.imread returns None); skip it.
            continue
        if reference.shape != (100, 100):
            # Bring references to the same size the faces are resized to below.
            reference = cv2.resize(reference,(100,100),interpolation = cv2.INTER_AREA)
        references.append(np.expand_dims(reference,-1))
        names.append(os.path.splitext(filename)[0])
    if not references:
        raise ValueError("no readable reference images found in {!r}".format(db))
    right = np.array(references)

    for face,x,y,w,h in giveAllFaces(image):
        face = cv2.resize(face,(100,100),interpolation = cv2.INTER_AREA)
        face = np.expand_dims(face,-1)
        # Pair the same face with every reference image.
        left = np.array([face for _ in range(len(references))])
        # Flatten instead of squeeze: with a single reference, squeeze yields a
        # 0-d array that cannot be indexed.
        probs = np.reshape(siamese_net.predict([left,right]), -1)
        index = np.argmax(probs)
        prob = probs[index]
        name = names[index] if prob>0.5 else "Unknown"
        putBoxText(image,x,y,w,h,text=name+"({:.2f})".format(prob))

In [None]:
# Annotate a sample photo and display it.
im = cv2.imread('myself.jpg',1)
if im is None:
    # cv2.imread returns None instead of raising when the file cannot be read.
    raise FileNotFoundError("could not read image: myself.jpg")
putCharacters(im)
# OpenCV stores channels as BGR while matplotlib expects RGB.
plt.imshow(cv2.cvtColor(im, cv2.COLOR_BGR2RGB))