<a href="https://colab.research.google.com/github/co1dtype/face_recog0902/blob/main/0902_final_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import random
import os
import cv2
import tensorboard as TensorBoard
import numpy as np
import matplotlib.pyplot as plt

from IPython import display
from tqdm.notebook import tqdm
from PIL import Image
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from tensorflow.keras import regularizers
from tensorflow.keras.models import *
from tensorflow.keras.callbacks import *
from tensorflow.keras.layers import *
from tensorflow.keras.regularizers import *
from mpl_toolkits.mplot3d import Axes3D

In [2]:
# Mount Google Drive inside the Colab VM at /content/drive; the .npy datasets
# loaded by the following cell are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Load the pre-processed datasets (NumPy .npy files) from Google Drive.
# `train_data` has to be loaded here as well: later cells (len(train_data),
# the gen_train_dataset pipeline and model.fit) reference it, so leaving the
# load commented out makes a fresh "Restart & Run All" fail with NameError.
train_data = np.load("/content/drive/MyDrive/Lab/data_list_mid.npy")
train_noisy_data = np.load("/content/drive/MyDrive/Lab/train_noisy_min_list.npy")
train_label = np.load("/content/drive/MyDrive/Lab/data_label_mid.npy")
test_data = np.load("/content/drive/MyDrive/Lab/test_data_list_small.npy")
test_label = np.load("/content/drive/MyDrive/Lab/test_data_label_small.npy")
val_data = np.load("/content/drive/MyDrive/Lab/val_data_list_small.npy")
val_label = np.load("/content/drive/MyDrive/Lab/val_data_label_small.npy")


In [4]:
# One-hot encode the integer training labels.
NUM_CLASSES = 300  # width of each one-hot row (build_vgg's default num_classes is also 300)

# Guard against re-running this cell: once train_label is already one-hot
# (2-D) encoding it again would be wrong and would clobber the backup below.
if np.ndim(train_label) != 1:
    raise ValueError(
        "train_label must be a 1-D sequence of integer class ids; "
        "re-load it before running the one-hot cell again"
    )

# Keep the original integer labels reachable under their own name.
backup_train_label = train_label

# Row k of the identity matrix is the one-hot vector of class k, so indexing
# the identity with the label vector encodes every sample at once.
# Out-of-range class ids raise IndexError, as the explicit loop did.
train_label = np.eye(NUM_CLASSES, dtype=int)[backup_train_label]

In [None]:
# Display the number of training samples (length of train_data's first axis).
# NOTE(review): requires `train_data` to be defined — verify that the loading cell creates it.
len(train_data)

In [7]:
# Input pipeline over (train_data, one-hot train_label) in batches of 64.
# Stage order matters:
#   * cache() comes before shuffle(): a cache placed after shuffle() stores one
#     shuffled order and replays exactly the same batches every epoch;
#   * shuffle() uses a buffer as large as the data set, i.e. a full shuffle
#     that is redone on every iteration over the dataset;
#   * prefetch() is the last stage so batch preparation overlaps training.
gen_train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_data, train_label))
    .cache()
    .shuffle(buffer_size=len(train_data))
    .batch(64)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [8]:
# Input pipeline over (train_noisy_data, one-hot train_label) in batches of 64.
# Stage order matters:
#   * cache() comes before shuffle(): a cache placed after shuffle() stores one
#     shuffled order and replays exactly the same batches every epoch;
#   * shuffle() uses a buffer as large as the data set, i.e. a full shuffle
#     that is redone on every iteration over the dataset;
#   * prefetch() is the last stage so batch preparation overlaps training.
gen_train_dataset2 = (
    tf.data.Dataset.from_tensor_slices((train_noisy_data, train_label))
    .cache()
    .shuffle(buffer_size=len(train_noisy_data))
    .batch(64)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [9]:
def build_vgg_block(input_layer,
                    num_cnn=3,
                    channel=64,
                    block_num=1,
                   ):
    """Append one VGG-style block to ``input_layer`` and return its output tensor.

    The block is ``num_cnn`` consecutive 3x3 convolutions (ReLU, 'same'
    padding, he_normal initialisation, ``channel`` filters each) followed by a
    2x2 max-pooling layer with stride 2.

    Args:
        input_layer: tensor the block is applied to.
        num_cnn: number of convolution layers in the block (VGG variants use
            2 or 3 depending on the block).
        channel: number of filters of every convolution in the block.
        block_num: block index, used only to build the layer names
            ``block{block_num}_conv{k}`` and ``block{block_num}_pooling``.

    Returns:
        The output tensor of the pooling layer.
    """
    # Settings shared by every convolution of the block.
    conv_settings = dict(
        filters=channel,
        kernel_size=(3, 3),
        activation='relu',
        kernel_initializer='he_normal',
        padding='same',
    )

    tensor = input_layer
    for conv_index in range(num_cnn):
        conv_layer = keras.layers.Conv2D(
            name=f'block{block_num}_conv{conv_index}',
            **conv_settings
        )
        tensor = conv_layer(tensor)

    # Max-pooling layer closing the block.
    pooling_layer = keras.layers.MaxPooling2D(
        pool_size=(2, 2),
        strides=2,
        name=f'block{block_num}_pooling',
    )
    return pooling_layer(tensor)

In [10]:
def build_vgg(input_shape=(112,112,3),
              num_cnn_list=[2,2,3,3,3],
              channel_list=[64,128,256,512,512],
              num_classes=300):
    """Build a VGG-style classifier with a 256-unit 'embedding' layer.

    Args:
        input_shape: shape of one input image.
        num_cnn_list: number of convolutions in each block.
        channel_list: number of filters in each block; must have the same
            length as ``num_cnn_list``.
        num_classes: number of units of the final softmax layer.

    Returns:
        A ``keras.Model`` mapping the input image to class probabilities.
        The layer named 'embedding' feeds the final 'predictions' layer.
    """
    # The two per-block configuration lists must describe the same blocks.
    assert len(num_cnn_list) == len(channel_list)

    inputs = keras.layers.Input(shape=input_shape)
    features = inputs

    # Stack one VGG block per (conv count, filter count) pair.
    block_index = 0
    for convs_in_block, filters in zip(num_cnn_list, channel_list):
        features = build_vgg_block(
            features,
            num_cnn=convs_in_block,
            channel=filters,
            block_num=block_index,
        )
        block_index += 1

    # Classifier head: flatten -> fc1 -> fc2 -> embedding -> predictions.
    features = keras.layers.Flatten(name='flatten')(features)
    for fc_name in ('fc1', 'fc2'):
        features = keras.layers.Dense(4096, activation='relu', name=fc_name)(features)
    embedding = keras.layers.Dense(256, activation='relu', name='embedding')(features)
    predictions = keras.layers.Dense(
        num_classes, activation='softmax', name='predictions'
    )(embedding)

    return keras.Model(inputs=inputs, outputs=predictions)

In [11]:
# Instantiate the network with build_vgg's default arguments.
model = build_vgg()
# Optional architecture diagram, left disabled:
# tf.keras.utils.plot_model(model, show_shapes=True, dpi=64)

In [12]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the model.
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 112, 112, 3)]     0         
                                                                 
 block0_conv0 (Conv2D)       (None, 112, 112, 64)      1792      
                                                                 
 block0_conv1 (Conv2D)       (None, 112, 112, 64)      36928     
                                                                 
 block0_pooling (MaxPooling2  (None, 56, 56, 64)       0         
 D)                                                              
                                                                 
 block1_conv0 (Conv2D)       (None, 56, 56, 128)       73856     
                                                                 
 block1_conv1 (Conv2D)       (None, 56, 56, 128)       147584    
                                                             

In [13]:
# Display the number of validation samples.
len(val_data)

150

In [14]:
# Running metrics updated by train_step on every batch and reset by the
# training loops at the start of each epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')

In [15]:
# NOTE: `loss_object` is bound to the loss *class*, not to an instance;
# train_step calls `loss_object()` to create the loss it applies.
loss_object = tf.keras.losses.CategoricalCrossentropy
# Adam optimizer (learning rate 1e-4) used by train_step to apply the gradients.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [16]:
@tf.function
def train_step(images, labels):
    """Run one optimisation step on a batch and update the running metrics.

    Uses the notebook-level `model`, `loss_object`, `optimizer`, `train_loss`
    and `train_accuracy`.

    Args:
        images: batch of input images.
        labels: batch of one-hot labels matching `images`.
    """
    criterion = loss_object()  # `loss_object` is the loss class; instantiate it

    with tf.GradientTape() as grad_tape:
        # training=True only matters for layers that behave differently
        # during training and inference (e.g. Dropout).
        batch_predictions = model(images, training=True)
        batch_loss = criterion(labels, batch_predictions)

    weights = model.trainable_variables
    weight_gradients = grad_tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(weight_gradients, weights))

    # Accumulate the epoch-level statistics.
    train_loss(batch_loss)
    train_accuracy(labels, batch_predictions)

In [16]:
# Train on gen_train_dataset for EPOCHS epochs with the custom train_step.
EPOCHS = 5
steps_per_epoch = len(gen_train_dataset)

for epoch in range(EPOCHS):
    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_accuracy):
        metric.reset_states()

    progress = tqdm(gen_train_dataset, total=steps_per_epoch)
    for images, labels in progress:
        train_step(images, labels)

    epoch_report = (
        f'\n\nEpoch {epoch + 1}, '
        f'Loss: {train_loss.result()}, '
        f'Accuracy: {train_accuracy.result() * 100}, '
    )
    print(epoch_report)

    # Saved after every epoch under the same name, so "0902" always holds
    # the most recent weights.
    model.save("0902")
    



  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 1, Loss: 5.646106719970703, Accuracy: 0.585185170173645, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 2, Loss: 4.291070938110352, Accuracy: 11.259259223937988, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 3, Loss: 1.4702686071395874, Accuracy: 60.481483459472656, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 4, Loss: 0.30597347021102905, Accuracy: 91.01481628417969, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 5, Loss: 0.11384681612253189, Accuracy: 96.86666870117188, 


In [17]:
# Train on gen_train_dataset2 (noisy images) for EPOCHS epochs with train_step.
# NOTE(review): the recorded output of this cell starts at a loss of ~5.66,
# i.e. presumably from a freshly built model rather than from the weights
# trained on gen_train_dataset — confirm whether `model` (and `train_step`)
# must be re-created before this cell for a top-to-bottom run to reproduce it.
EPOCHS = 7
steps_per_epoch = len(gen_train_dataset2)

for epoch in range(EPOCHS):
    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_accuracy):
        metric.reset_states()

    progress = tqdm(gen_train_dataset2, total=steps_per_epoch)
    for images, labels in progress:
        train_step(images, labels)

    epoch_report = (
        f'\n\nEpoch {epoch + 1}, '
        f'Loss: {train_loss.result()}, '
        f'Accuracy: {train_accuracy.result() * 100}, '
    )
    print(epoch_report)

    # Saved after every epoch under the same name, so "0902_2" always holds
    # the most recent weights.
    model.save("0902_2")
    



  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 1, Loss: 5.6608476638793945, Accuracy: 0.7333332896232605, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 2, Loss: 5.2623724937438965, Accuracy: 4.7037034034729, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 3, Loss: 4.176854610443115, Accuracy: 15.822222709655762, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 4, Loss: 2.4144740104675293, Accuracy: 42.77777862548828, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 5, Loss: 0.9004035592079163, Accuracy: 75.61481475830078, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 6, Loss: 0.3207996189594269, Accuracy: 91.01481628417969, 


  0%|          | 0/211 [00:00<?, ?it/s]





Epoch 7, Loss: 0.1571727842092514, Accuracy: 95.39259338378906, 


In [16]:
# Configure `model` for the built-in Keras training API (used by model.fit):
# a new Adam optimizer (lr=1e-4), categorical cross-entropy loss and
# categorical accuracy as the reported metric.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=tf.keras.metrics.CategoricalAccuracy())

In [None]:
# Alternative training path through Keras fit: 2 epochs, batch size 32,
# with 20% of the samples held out for validation.
# NOTE(review): requires `train_data` to be defined and the model to be compiled first.
model.fit(train_data,train_label,epochs=2,batch_size = 32,validation_split = 0.2)

In [18]:
def _sorted_pair_scores(embeddings, labels):
    """Return the genuine(1)/impostor(0) flag of every sample pair, ordered by
    ascending Euclidean distance between the two embeddings.

    Ties in distance are ordered impostor-first, matching a sort of
    ``[distance, flag]`` records.
    """
    distance_chunks = []
    genuine_chunks = []
    for i in range(len(labels) - 1):
        # Distances / label matches between sample i and every later sample.
        distance_chunks.append(np.linalg.norm(embeddings[i + 1:] - embeddings[i], axis=1))
        genuine_chunks.append(labels[i + 1:] == labels[i])
    distances = np.concatenate(distance_chunks)
    genuine = np.concatenate(genuine_chunks).astype(int)
    # lexsort sorts by the LAST key first: primary = distance, secondary = flag.
    order = np.lexsort((genuine, distances))
    return genuine[order]


def _eer_and_accuracy(score_list):
    """Sweep the acceptance threshold over distance-sorted pair flags.

    Position k means "accept pairs 0..k as matches, reject the rest".

    Returns:
        (eer, accuracy): the false-match rate at the position where
        |FMR - FNMR| is smallest, and the pair accuracy at that position.

    Raises:
        ValueError: if there is no genuine pair or no impostor pair, because
            the corresponding error rate is undefined.
    """
    score_list = np.asarray(score_list)
    total_pairs = score_list.size
    num_genuine = int(np.count_nonzero(score_list == 1))
    num_impostor = total_pairs - num_genuine
    if num_genuine == 0 or num_impostor == 0:
        raise ValueError("EER needs at least one genuine and one impostor pair")

    # Counts of accepted genuine / impostor pairs at every threshold position.
    # Every pair (including the closest one) starts on the "rejected" side,
    # so both rates use the full genuine / impostor totals as denominator.
    accepted_genuine = np.cumsum(score_list == 1)
    accepted_impostor = np.cumsum(score_list == 0)
    fmr = accepted_impostor / num_impostor
    fnmr = (num_genuine - accepted_genuine) / num_genuine

    # First position where the two error rates are closest.
    th_index = int(np.argmin(np.abs(fmr - fnmr)))
    eer = float(fmr[th_index])

    true_positive = accepted_genuine[th_index]
    true_negative = num_impostor - accepted_impostor[th_index]
    accuracy = float((true_positive + true_negative) / total_pairs)
    return eer, accuracy


def test_eer(check_model):
    """Print the equal error rate and pair accuracy of ``check_model``.

    Every sample of the notebook-level ``test_data`` is mapped to the output
    of the model's 'embedding' layer; all sample pairs are then scored by the
    Euclidean distance of their embeddings, a pair being genuine when both
    ``test_label`` entries are equal.

    Args:
        check_model: Keras model containing a layer named 'embedding'.

    Raises:
        ValueError: if fewer than two test samples are available, or if the
            pairs contain no genuine or no impostor pair.
    """
    layer_name = 'embedding'
    model_epoch = keras.Model(inputs=check_model.input,
                              outputs=check_model.get_layer(layer_name).output)

    num_samples = len(test_data)
    if num_samples < 2:
        raise ValueError("test_eer needs at least two test samples")

    # Embed one image at a time to keep memory use low.
    embeddings = np.stack([
        np.array(model_epoch(test_data[i].reshape(1, 112, 112, 3))[0])
        for i in range(num_samples)
    ])
    labels = np.asarray(test_label[:num_samples])

    score_list = _sorted_pair_scores(embeddings, labels)
    val_eer, val_acc = _eer_and_accuracy(score_list)

    print("test_eer :", val_eer)
    print("test_acc :", val_acc)


In [19]:
# Evaluate the current in-memory model on the test set (prints EER and accuracy).
test_eer(model)

test_eer : 0.08720543764738521
test_acc : 0.9127935645059441


In [20]:
# Reload a saved model from the "0902" directory, relative to the current
# working directory — presumably the one written by model.save("0902").
pb_path =  './0902'
model0 = tf.keras.models.load_model(pb_path)



In [21]:
# Evaluate the reloaded model on the test set (prints EER and accuracy).
test_eer(model0)

test_eer : 0.09397808295186573
test_acc : 0.9060209208467177


## Ensemble accuracy
test_eer : 0.0757752808988764  
test_acc : 0.9242237048019109

In [22]:
# Second ensemble member: an alias (not a copy) of the in-memory `model`.
model1 = model

In [23]:
def extract_test_embeddings(source_model, layer_name='embedding'):
    """Embed every sample of the notebook-level ``test_data`` with ``source_model``.

    Args:
        source_model: Keras model containing a layer called ``layer_name``.
        layer_name: name of the layer whose output is used as the embedding.

    Returns:
        (embedder, records): the sub-model ending at ``layer_name`` and a list
        with one ``[embedding_vector, label]`` entry per test sample, the
        label being the matching ``test_label`` entry.
    """
    embedder = keras.Model(inputs=source_model.input,
                           outputs=source_model.get_layer(layer_name).output)
    records = []
    for i in range(len(test_data)):
        timg = test_data[i].reshape((1, 112, 112, 3))
        records.append([np.array(embedder(timg)[0]), test_label[i]])
    return embedder, records


layer_name = 'embedding'

# The records stay plain Python lists: each row is [vector, label], which is
# ragged, and recent NumPy releases refuse to build an array from ragged rows
# unless dtype=object is requested. The following cells only index the rows.
_, model0s_embedding_testlist = extract_test_embeddings(model0, layer_name)
model_epoch, model1s_embedding_testlist = extract_test_embeddings(model1, layer_name)

# Kept for the ensemble evaluation cell, which reads labels from this name.
embedding_testlist = model1s_embedding_testlist




In [24]:
# Build the ensemble embedding of every test sample by joining the vector
# from model0 with the vector from model1 (in that order); the label is
# taken from model0's record.
ensemble_embedding_list = []

for i in tqdm(range(len(model0s_embedding_testlist))):
    record_model0 = model0s_embedding_testlist[i]
    record_model1 = model1s_embedding_testlist[i]
    ensemble = np.concatenate((record_model0[0], record_model1[0]))
    ensemble_embedding_list.append([ensemble, record_model0[1]])

  0%|          | 0/1350 [00:00<?, ?it/s]

In [25]:
# Each row is [embedding_vector, label], i.e. ragged. NumPy only builds an
# array from ragged rows when dtype=object is requested explicitly (older
# releases emitted a deprecation warning, newer ones raise ValueError).
# The result is a 2-column object array: column 0 = vector, column 1 = label.
ensemble_embedding_list = np.array(ensemble_embedding_list, dtype=object)

  """Entry point for launching an IPython kernel.


In [26]:
# Sanity check: shape of the first sample's ensemble embedding vector.
ensemble_embedding_list[0][0].shape

(512,)

In [27]:
# Equal error rate / pair accuracy of the ensemble embeddings.
# Every entry of ensemble_embedding_list is [embedding_vector, label].
ensemble_vectors = np.stack([entry[0] for entry in ensemble_embedding_list])
ensemble_labels = np.array([entry[1] for entry in ensemble_embedding_list])
if len(ensemble_labels) < 2:
    raise ValueError("at least two samples are needed to form a pair")

# Distance and genuine(1)/impostor(0) flag of every pair (i, j) with i < j.
distance_chunks = []
genuine_chunks = []
for i in range(len(ensemble_labels) - 1):
    distance_chunks.append(
        np.linalg.norm(ensemble_vectors[i + 1:] - ensemble_vectors[i], axis=1)
    )
    genuine_chunks.append(ensemble_labels[i + 1:] == ensemble_labels[i])
pair_distances = np.concatenate(distance_chunks)
pair_genuine = np.concatenate(genuine_chunks).astype(int)

# Pair flags ordered by ascending distance; lexsort sorts by the LAST key
# first, so ties in distance are ordered impostor-first.
score_list = pair_genuine[np.lexsort((pair_genuine, pair_distances))]

num_genuine = int(np.count_nonzero(score_list == 1))
num_impostor = score_list.size - num_genuine
if num_genuine == 0 or num_impostor == 0:
    raise ValueError("EER needs at least one genuine and one impostor pair")

# Threshold sweep: position k means "accept pairs 0..k as matches".
# Every pair (including the closest one) starts on the rejected side, so the
# rates use the full genuine / impostor totals as denominators.
accepted_genuine = np.cumsum(score_list == 1)
accepted_impostor = np.cumsum(score_list == 0)
val_fmr = accepted_impostor / num_impostor
val_fnmr = (num_genuine - accepted_genuine) / num_genuine

# First position where the two error rates are closest.
val_th_index = int(np.argmin(np.abs(val_fmr - val_fnmr)))
val_eer = float(val_fmr[val_th_index])

tp = int(accepted_genuine[val_th_index])
tn = int(num_impostor - accepted_impostor[val_th_index])
val_acc = (tp + tn) / score_list.size

print("test_eer :", val_eer)
print("test_acc :", val_acc)


test_eer : 0.0757752808988764
test_acc : 0.9242237048019109


## Model Zip Download

In [32]:
from zipfile import *
import os



In [None]:
# Zip every entry of the saved-model directory into its own "<name>.zip".
model_dir = '/content/0902'  # not named `dir`, which would shadow the builtin
os.chdir(model_dir)  # archives are written here and member paths are relative to it

for entry in sorted(os.listdir(model_dir)):
    if entry.endswith('.zip'):
        continue  # archive left by a previous run of this cell; do not re-zip it

    # splitext keeps names without an extension intact ("variables" ->
    # "variables.zip") instead of collapsing them all to ".zip".
    archive_name = os.path.splitext(entry)[0] + '.zip'
    with ZipFile(archive_name, 'w') as compzip:
        compzip.write(entry)
        if os.path.isdir(entry):
            # write() on a directory stores only the directory entry itself,
            # so its contents have to be added explicitly.
            for root, subdirs, files in os.walk(entry):
                for name in subdirs + files:
                    compzip.write(os.path.join(root, name))

In [33]:
# Zip every entry of the saved-model directory into its own "<name>.zip".
model_dir = '/content/0902_2'  # not named `dir`, which would shadow the builtin
os.chdir(model_dir)  # archives are written here and member paths are relative to it

for entry in sorted(os.listdir(model_dir)):
    if entry.endswith('.zip'):
        continue  # archive left by a previous run of this cell; do not re-zip it

    # splitext keeps names without an extension intact ("variables" ->
    # "variables.zip") instead of collapsing them all to ".zip".
    archive_name = os.path.splitext(entry)[0] + '.zip'
    with ZipFile(archive_name, 'w') as compzip:
        compzip.write(entry)
        if os.path.isdir(entry):
            # write() on a directory stores only the directory entry itself,
            # so its contents have to be added explicitly.
            for root, subdirs, files in os.walk(entry):
                for name in subdirs + files:
                    compzip.write(os.path.join(root, name))