In [1]:
import numpy as np
import pandas as pd
import cv2
import os
import random
import tensorflow as tf
from keras.layers import Input, Conv2D, Dense, Flatten,MaxPooling2D, Dropout
from keras.layers import Lambda, Subtract
from keras.models import Model, Sequential, load_model
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD, Adam
from keras.losses import binary_crossentropy

from sklearn.utils import shuffle
import warnings
warnings.filterwarnings(action='ignore')

In [2]:
class Config:
    """Dataset locations and training hyper-parameters shared by the notebook."""

    # Dataset roots. The batch sampler lists each root and then lists every
    # entry inside it, i.e. it expects one sub-folder of images per person.
    training_dir = "D:/data/training/"
    testing_dir = "D:/data/testing/"
    validation_dir = "D:/data/validation/"

    # Number of image pairs drawn for one training step.
    train_batch_size = 64
    # Number of training iterations; the training loop draws one mini-batch
    # per iteration, so this is a step count rather than passes over the data.
    train_number_epochs = 500

In [3]:
def _load_gray_image(file_path, size=(150, 150)):
    """Read one image as grayscale, resize it and append a channel axis.

    Args:
        file_path: path of the image file to read.
        size: target size handed to ``cv2.resize``.

    Returns:
        The resized image with a trailing axis of length 1.

    Raises:
        ValueError: if OpenCV cannot read the file. ``cv2.imread`` signals
            failure by returning ``None`` rather than raising, which would
            otherwise surface as an opaque error inside ``cv2.resize``.
    """
    image = cv2.imread(file_path, 0)
    if image is None:
        raise ValueError("Could not read image: {}".format(file_path))
    return np.expand_dims(cv2.resize(image, size), -1)


def getMiniBatch(batch_size=Config.train_batch_size, path=Config.training_dir, prob=0.5):
    """Sample a random batch of image pairs for the siamese network.

    ``path`` must contain one sub-folder per person. Each sampled pair is
    either two different images of the same person (label 1) or one image
    each from two different persons (label 0).

    Args:
        batch_size: number of pairs to draw.
        path: dataset root holding the per-person folders.
        prob: probability that a pair is a matching (label 1) pair.

    Returns:
        ``([left, right], target)`` where ``left`` and ``right`` are arrays of
        stacked 150x150x1 grayscale images and ``target`` holds the 0/1 labels.

    Raises:
        ValueError: if a pair type is requested that the dataset cannot
            provide (fewer than two non-empty person folders for a
            non-matching pair, or no folder with two images for a matching
            pair), or if an image file cannot be read.
    """
    # List every person folder once per call instead of once (or more) per
    # sampled pair. Stray files next to the person folders are skipped.
    images_by_person = {}
    for person in os.listdir(path):
        person_dir = os.path.join(path, person)
        if not os.path.isdir(person_dir):
            continue
        files = os.listdir(person_dir)
        if files:
            images_by_person[person] = files

    # Persons usable for non-matching pairs (at least one image) and for
    # matching pairs (at least two images).
    persons = list(images_by_person)
    persons_multi = [p for p in persons if len(images_by_person[p]) >= 2]

    left, right, target = [], [], []
    for _ in range(batch_size):
        # Decide whether this pair matches (1) or not (0).
        res = np.random.choice([0, 1], p=[1 - prob, prob])
        if res == 0:
            # Non-matching pair: two distinct persons, one random image each.
            if len(persons) < 2:
                raise ValueError(
                    "Need at least two non-empty person folders in {} to "
                    "build a non-matching pair".format(path))
            i1, i2 = np.random.choice(len(persons), size=2, replace=False)
            person1, person2 = persons[i1], persons[i2]
            file1 = os.path.join(path, person1, random.choice(images_by_person[person1]))
            file2 = os.path.join(path, person2, random.choice(images_by_person[person2]))
            label = 0
        else:
            # Matching pair: two distinct images of the same person.
            if not persons_multi:
                raise ValueError(
                    "No person folder in {} holds two or more images, cannot "
                    "build a matching pair".format(path))
            person = persons_multi[np.random.choice(len(persons_multi))]
            name1, name2 = np.random.choice(images_by_person[person], size=2, replace=False)
            file1 = os.path.join(path, person, name1)
            file2 = os.path.join(path, person, name2)
            label = 1

        left.append(_load_gray_image(file1))
        right.append(_load_gray_image(file2))
        target.append(label)

    return [np.array(left), np.array(right)], np.array(target)

In [4]:
# Draw one sample batch from the training set to sanity-check the sampler.
inputs, targets = getMiniBatch(Config.train_batch_size, Config.training_dir)

In [5]:
# Shape of the left-hand image batch; with the default batch size of 64 and
# 150x150 single-channel images this displays (64, 150, 150, 1).
inputs[0].shape

(64, 150, 150, 1)

In [6]:
def test_oneshot(model, N, path=Config.testing_dir, verbose=0):
    """Measure the model's same/different accuracy on N randomly sampled pairs.

    N pairs are drawn from ``path`` with ``getMiniBatch``, the model scores
    each pair, and a score above 0.5 counts as a "same person" decision.

    Args:
        model: model whose ``predict`` accepts the ``[left, right]`` batch.
        N: number of pairs to sample and score.
        path: dataset root to sample the pairs from.
        verbose: when truthy, print the resulting accuracy.

    Returns:
        Percentage (0-100) of pairs whose thresholded decision equals the label.
    """
    pairs, labels = getMiniBatch(N, path=path)

    # Binary decision per pair: 1 when the predicted score exceeds 0.5.
    scores = np.squeeze(model.predict(pairs))
    decisions = (scores > 0.5) * 1

    n_correct = (decisions == labels).sum()
    percent_correct = n_correct * 100 / N
    if verbose:
        print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct, N))
    return percent_correct

In [7]:
def W_init(shape, dtype=None):
    """Weight initializer: samples from a normal distribution N(0, 0.01).

    Args:
        shape: shape of the weight tensor to create.
        dtype: dtype requested by the layer; ``None`` lets the backend pick
            its default float type.

    Returns:
        A backend variable holding the sampled values.
    """
    values = np.random.normal(loc=0, scale=1e-2, size=shape)
    # Forward the requested dtype instead of always discarding it.
    return K.variable(values, dtype=dtype)

def b_init(shape, dtype=None):
    """Bias initializer: samples from a normal distribution N(0.5, 0.01).

    Args:
        shape: shape of the bias tensor to create.
        dtype: dtype requested by the layer; ``None`` lets the backend pick
            its default float type.

    Returns:
        A backend variable holding the sampled values.
    """
    values = np.random.normal(loc=0.5, scale=1e-2, size=shape)
    # Forward the requested dtype instead of always discarding it.
    return K.variable(values, dtype=dtype)

In [8]:
# Both legs of the siamese network take a 150x150 single-channel image.
input_shape = (150, 150, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)

# Shared encoder used by both legs: five 3x3 conv layers (max-pooling after
# the first four) followed by a 4096-unit dense embedding.
convnet = Sequential([
    Conv2D(64, (3, 3), activation='relu', input_shape=input_shape,
           kernel_regularizer=l2(2e-4), kernel_initializer=W_init),
    MaxPooling2D(),
    Conv2D(64, (3, 3), activation='relu', kernel_regularizer=l2(2e-4),
           kernel_initializer=W_init, bias_initializer=b_init),
    MaxPooling2D(),
    Conv2D(128, (3, 3), activation='relu', kernel_regularizer=l2(2e-4),
           kernel_initializer=W_init, bias_initializer=b_init),
    MaxPooling2D(),
    Conv2D(256, (3, 3), activation='relu', kernel_regularizer=l2(2e-4),
           kernel_initializer=W_init, bias_initializer=b_init),
    MaxPooling2D(),
    Conv2D(512, (3, 3), activation='relu', kernel_regularizer=l2(2e-4),
           kernel_initializer=W_init, bias_initializer=b_init),
    Flatten(),
    Dense(4096, activation="relu", kernel_regularizer=l2(1e-3),
          kernel_initializer=W_init, bias_initializer=b_init),
])

# Encode both inputs with the same (weight-sharing) convnet.
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)

# Element-wise L1 distance between the two embeddings, mapped to a single
# sigmoid score by the final dense layer.
subtracted = Subtract()([encoded_l, encoded_r])
both = Lambda(lambda x: abs(x))(subtracted)
prediction = Dense(1, activation='sigmoid', bias_initializer=b_init)(both)
siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)

# Plain Adam with a fixed learning rate. An SGD variant tried earlier was
# SGD(0.0004, momentum=0.6, nesterov=True, decay=0.0003).
optimizer = Adam(5e-4)

# Pair verification is trained as binary classification (same / different).
siamese_net.compile(loss="binary_crossentropy", optimizer=optimizer)

siamese_net.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 150, 150, 1  0           []                               
                                )]                                                                
                                                                                                  
 input_2 (InputLayer)           [(None, 150, 150, 1  0           []                               
                                )]                                                                
                                                                                                  
 sequential (Sequential)        (None, 4096)         54019648    ['input_1[0][0]',                
                                                                  'input_2[0][0]']            

In [10]:
# Model training
epoch = Config.train_number_epochs   # number of training iterations (one batch each)
loss_every = 50                      # evaluate on validation data every this many iterations
batch_size = Config.train_batch_size
N = 309                              # number of validation pairs scored per evaluation
best = 0                             # best validation accuracy seen so far
loss_history = []                    # training losses since the last evaluation
for i in range(epoch):
    inputs, targets = getMiniBatch(batch_size, path=Config.training_dir)
    loss = siamese_net.train_on_batch(inputs, targets)
    loss_history.append(loss)

    # Only evaluate at every `loss_every`-th iteration.
    if (i + 1) % loss_every != 0:
        continue

    # Validation loss on one freshly sampled validation batch.
    val_inputs, val_targets = getMiniBatch(batch_size, path=Config.validation_dir)
    val_loss = siamese_net.test_on_batch(val_inputs, val_targets)
    print("iteration {}, training loss: {:.7f}, validation loss: {:.7f}".format(i+1, np.mean(loss_history), val_loss))
    loss_history.clear()

    # Keep the weights whenever validation accuracy matches or beats the best so far.
    val_acc = test_oneshot(siamese_net, N, path=Config.validation_dir, verbose=True)
    if val_acc >= best:
        print("saving")
        siamese_net.save_weights('saved_best')
        best = val_acc

In [None]:
# Test-set accuracy: restore the best checkpoint saved during training and
# score it on randomly sampled pairs from the test set.
test_pairs = 619
siamese_net.load_weights("saved_best")
test_acc = test_oneshot(siamese_net, test_pairs, path=Config.testing_dir, verbose=True)
print("Accuracy: {}".format(test_acc))

Got an average of 81.26009693053312% 619 way one-shot learning accuracy
Accuracy: 81.26009693053312


In [None]:
# Identify one probe face against a gallery ("db") of one image per known person.
db = "D:/data/db/"
face_path = "D:/data/testing/0071/2.jpg"
# The true identity is the name of the folder holding the probe image; taking
# it from the path structure avoids depending on a fixed number of components.
real = os.path.basename(os.path.dirname(face_path))

# Load every gallery image with the same preprocessing used for training
# (grayscale, 150x150, trailing channel axis).
dbs = os.listdir(db)
gallery = []
for person in dbs:
    gallery_path = os.path.join(db, person)
    gallery_img = cv2.imread(gallery_path, 0)
    if gallery_img is None:
        # cv2.imread returns None instead of raising on unreadable files.
        raise ValueError("Could not read gallery image: {}".format(gallery_path))
    gallery.append(np.expand_dims(cv2.resize(gallery_img, (150, 150)), -1))
right = np.array(gallery)
# Gallery file name without extension serves as the person's name.
names = [ os.path.splitext(person)[0] for person in dbs ]

face = cv2.imread(face_path, 0)
if face is None:
    raise ValueError("Could not read probe image: {}".format(face_path))
# Resize with the default interpolation so the probe is preprocessed exactly
# like the training and gallery images.
face = np.expand_dims(cv2.resize(face, (150, 150)), -1)

# Pair the probe with every gallery image and score all pairs in one batch.
left = np.array([face for _ in range(len(dbs))])
name = "Unknown"
if len(dbs) > 0:
    # reshape(-1) keeps a 1-D vector even when the gallery holds a single
    # image; np.squeeze would yield a 0-d array that cannot be indexed.
    probs = siamese_net.predict([left, right]).reshape(-1)
    index = np.argmax(probs)
    prob = probs[index]
    # Accept the best match only when the model scores it above 0.5.
    if prob > 0.5:
        name = names[index]

print(real)
print(name)


0071
0186
