#### 1. 라이브러리 설치: tensorflow, tensorflow-gpu, opencv-python, matplotlib

#### 2. 라이브러리 불러오기

In [1]:
# 기본
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# 텐서플로우 - functionl api
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

#### 3. GPU 설정하기

In [3]:
gpus= tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

#### 4. 데이터가 들어갈 경로, 폴더 설정 os

In [4]:
# 경로 설정
POS_PATH= os.path.join('data', 'positive')
NEG_PATH= os.path.join('data', 'negative')
ANC_PATH= os.path.join('data', 'anchor')

In [6]:
# 폴더 설정
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

#### 5. 데이터 수집

##### 1) 데이터 수집(http://vis-www.cs.umass.edu/lfw/#download -> All images as gzipped tar file)

In [7]:
# tgz 입축풀기 → 현재 노트의 파일로 복사
!tar -xf lfw.tgz

##### 2) 수집한 사진 경로 재지정(negative)

In [8]:
# lfw 이미지 negative 파일로 이동
# lfw 각 파일 안의 사진들의 경로를 각 하위로 들어갈 수 있도록 지정.
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH= os.path.join('lfw', directory, file)
        NEW_PATH= os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

##### 3) 내 사진 찍기(positive)

In [21]:
# uuid 라이브러리 불러오기
import uuid

In [22]:
# 카메라와 연결
cap= cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame= cap.read()
    # 카메라 프레임 250 x 250px 으로 수집한 데이터와 동일하게 지정
    frame= frame[30:30+250, 250:250+250, :]
    
    # 키보드 a를 누르면 anchors 사진을 모을 수 있다.
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # 이미지 이름, 경로 지정
        imgname= os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # anchor 이미지(사진찍기) 생성
        cv2.imwrite(imgname, frame)
        
    # 키보드 p를 누르면 positives 사진을 모을 수 있다.
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # 이미지 이름, 경로 지정
        imgname= os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # anchor 이미지(사진찍기) 생성
        cv2.imwrite(imgname, frame)
        
    # 카메라 화면을 모니터에 띄운다
    cv2.imshow('Image Collection', frame)
    
    # 키보드 q를 누르면 종료한다.
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# 카메라 종료
cap.release()
cv2.destroyAllWindows()

#### 6. 데이터 준비

##### 1) 각 폴더의 사진.jpg(확장자포함) 300장 경로 가져오기

In [5]:
anchor= tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive= tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative= tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

##### 2) 전처리: 스케일, 사이즈 조정

In [53]:
def preprocess(file_path):
    # 사진 주소 불러오기
    byte_img = tf.io.read_file(file_path)
    # 사진 업로드
    img = tf.io.decode_jpeg(byte_img)
    # 100x100px로 조정
    img = tf.image.resize(img, (100,100))
    # 스케일 조정(0~1 사의 값)
    img = img / 255.0
    # Return image
    return img

##### 3) 라벨 생성(positive:1, negative:0) 

##### 4) (anchor, positive), (anchor, negative) + 라벨링

In [8]:
positives= tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives= tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data= positives.concatenate(negatives) # positives와 negatives를 하나의 배열로 

##### 5) train, test set 만들기

In [9]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [10]:
data= data.map(preprocess_twin)
data= data.cache()
data= data.shuffle(buffer_size= 1024)

In [11]:
# train data
train_data= data.take(round(len(data)*.7))
train_data= train_data.batch(16)
train_data= train_data.prefetch(8)

In [12]:
# test data
test_data= data.skip(round(len(data)*.7))
test_data= test_data.take(round(len(data)*.3))
test_data= test_data.batch(16)
test_data= test_data.prefetch(8)

#### 7. 모델 생성

##### 1) embedding 레이어

In [13]:
def make_embedding():
    inp= Input(shape= (105, 105, 3), name= 'input_image')
    
    # 첫번째 블록
    c1= Conv2D(64, (10, 10), activation= 'relu')(inp)
    m1= MaxPooling2D(64, (2, 2), padding= 'same')(c1)
    
    # 두번째 블록
    c2= Conv2D(128, (7, 7), activation= 'relu')(m1)
    m2= MaxPooling2D(64, (2, 2), padding= 'same')(c2)
    
    # 세번째 블록
    c3= Conv2D(128, (4,4), activation= 'relu')(m2)
    m3= MaxPooling2D(64, (2, 2), padding= 'same')(c3)
    
    # 마지막 블록
    c4= Conv2D(256, (4, 4), activation= 'relu')(m3)
    f1= Flatten()(c4)
    d1= Dense(4096, activation= 'sigmoid')(f1)
    
    return Model(inputs= [inp], outputs=[d1], name= 'embedding')

In [14]:
embedding= make_embedding()

##### 2) distance 레이어

In [15]:
# L1 distance
class L1Dist(Layer):
    # 상속
    def __init__(self, **kwargs):
        super().__init__()
    # ?
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [16]:
l1= L1Dist()

##### 3) siamese 뉴럴 네트워크

In [17]:
def make_siamese_model():
    
    # anchor 이미지 입력
    input_image= Input(name= 'input_img', shape= (105, 105, 3))
    # validation 이미지 입력
    validation_image= Input(name= 'validation_img', shape= (105, 105, 3))
    
    # 입력값 전달
    siamese_layer= L1Dist() # distance
    siamese_layer._name= 'distance'
    distance= siamese_layer(embedding(input_image), embedding(validation_image)) # embedding
    
    # output
    classifier= Dense(1, activation= 'sigmoid')(distance)
    
    return Model(inputs= [input_image, validation_image], outputs= classifier, name= 'SiameseNetwork')

In [18]:
siamese_model= make_siamese_model()

In [18]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

#### 8. 훈련..

##### 1) loss, optimizer 설정

In [19]:
# loss
binary_cross_loss= tf.losses.BinaryCrossentropy()

In [20]:
# optimizer 0.0001
opt= tf.keras.optimizers.Adam(1e-4)

##### 2) 체크포인트 설정

In [21]:
checkpoint_dir= './training_checkpoints'
checkpoint_prefix= os.path.join(checkpoint_dir, 'ckpt')
checkpoint= tf.train.Checkpoint(opt= opt, siamese_model= siamese_model)

##### 3) train step function

In [23]:
@tf.function
def train_step(batch):
    
    # 기록
    with tf.GradientTape() as tape:
        # anchor, positiva/negative 이미지 불러오기
        X= batch[:2]
        # 라벨 불러오기(0,1)
        y= batch[2]
        
        # forward pass
        yhat= siamese_model(X, training= True)
        # loss 계산
        loss= binary_cross_loss(y, yhat)
    print(loss)
    
    # gradient 계산
    grad= tape.gradient(loss, siamese_model.trainable_variables)
    
    # 모델에 가중치 적용
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    return loss

##### 4) 훈련 함수 생성

In [24]:
def train(data, EPOCHS):
    # epochs 반복
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar= tf.keras.utils.Progbar(len(data))
        
        # batch 반복
        for idx, batch in enumerate(data):
            # run
            train_step(batch)
            progbar.update(idx+1)
        
        # 체크포인트
        if epoch % 10 == 0:
            checkpoint.save(file_prefix= checkpoint_prefix)

##### 5) 모델 훈련

In [25]:
EPOCHS= 50

In [26]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


#### 9. 모델 평가

In [27]:
from tensorflow.keras.metrics import Precision, Recall

In [28]:
test_input, test_val, y_true= test_data.as_numpy_iterator().next()

In [29]:
# prediction
predictions= siamese_model.predict([test_input, test_val])
predictions



array([[9.9994862e-01],
       [1.0000000e+00],
       [9.9979919e-01],
       [1.0000000e+00],
       [3.1617617e-10],
       [8.2528784e-10],
       [9.9890327e-01],
       [2.8101536e-05],
       [9.9996042e-01],
       [3.8849133e-09],
       [1.1258986e-08],
       [6.8303241e-10],
       [9.9987388e-01],
       [9.9999976e-01],
       [7.4503559e-09],
       [5.9898958e-10]], dtype=float32)

In [30]:
# 0.5 이상이면 1
[1 if prediction>0.5 else 0 for prediction in predictions]

[1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0]

In [31]:
y_true

array([1., 1., 1., 1., 0., 0., 1., 0., 1., 0., 0., 0., 1., 1., 0., 0.],
      dtype=float32)

In [32]:
# recall 평가
m= Recall()
m.update_state(y_true, predictions)
m.result().numpy()

1.0

In [33]:
# Precision 평가
m= Precision()
m.update_state(y_true, predictions)
m.result().numpy()

1.0

#### 10. 모델저장.h5

In [34]:
siamese_model.save('siamesemodel.h5')



In [35]:
model= tf.keras.models.load_model('siamesemodel.h5', 
                                 custom_objects= {'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [36]:
model.predict([test_input, test_val])



array([[9.9994862e-01],
       [1.0000000e+00],
       [9.9979919e-01],
       [1.0000000e+00],
       [3.1617617e-10],
       [8.2528784e-10],
       [9.9890327e-01],
       [2.8101536e-05],
       [9.9996042e-01],
       [3.8849133e-09],
       [1.1258986e-08],
       [6.8303241e-10],
       [9.9987388e-01],
       [9.9999976e-01],
       [7.4503559e-09],
       [5.9898958e-10]], dtype=float32)

In [37]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

#### 11. 최종 테스트

In [52]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # prediction 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    #detection_threshold: positive로 예측한 metrics
    detection = np.sum(np.array(results) > detection_threshold)
    
    #verification_threshold: positive로 예측한 비율 / positive 이미지 개수
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

In [54]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame= frame[30:30+250, 250:250+250, :]
    
    cv2.imshow('Verification', frame)
    
    # v 눌러서 확인 
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # 해당 경로에 v로 찍은 사진 저장
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # True or False 출력
        results, verified = verify(model, 0.9, 0.7)
        print(verified)
    # q눌러 종료
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

True
True
False
False
False
True
