## 1.1 Cài đặt thư viện




## 1.2 Thêm các thư viện

In [None]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from mtcnn import MTCNN

In [None]:
# Import TensorFlow dependencies - Functional API
from tensorflow import keras
from keras.models import Model
from keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import random
import tensorflow.keras.backend as k

## 1.3 Set GPU Growth

In [None]:
# Avoid out-of-memory errors: turn on memory growth for every GPU that
# TensorFlow lists as a physical device.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
def preprocess(file_path):
    """Load a JPEG file and return it resized to 100x100 and scaled by 1/255.

    Args:
        file_path: Path of the JPEG file to read.

    Returns:
        The decoded image, resized to 100x100 and divided by 255.0.
    """
    raw_bytes = tf.io.read_file(file_path)
    decoded = tf.io.decode_jpeg(raw_bytes)

    # Bring every image to the spatial size expected by the network input.
    resized = tf.image.resize(decoded, (100,100))

    # Map pixel values from the 0..255 range to 0..1.
    return resized / 255.0

# Load data

In [None]:
def load_lfw_dataset(data_dir):
    """Load every image found under ``data_dir``, one sub-directory per person.

    Args:
        data_dir: Root directory whose immediate sub-directories each hold
            the images of one person. Entries that are not directories are
            skipped.

    Returns:
        A tuple ``(images, labels)`` of two equally long lists: the
        preprocessed images and, for each image, the name of the
        sub-directory it was read from.
    """
    images = []
    labels = []

    for person_dir in os.listdir(data_dir):
        person_path = os.path.join(data_dir, person_dir)

        if not os.path.isdir(person_path):
            continue

        for image_name in os.listdir(person_path):
            image_path = os.path.join(person_path, image_name)

            # Reuse the shared helper so dataset images get exactly the same
            # decode / resize / scale treatment as individually loaded ones.
            images.append(preprocess(image_path))
            # The directory name identifies the person and serves as label.
            labels.append(person_dir)

    return images, labels

In [None]:
def create_siamese_pairs(images, labels, target_size):
    """Build labelled image pairs from a sliding window over ``images``.

    Every image is paired with each of the (at most) ``target_size`` images
    that follow it in the list. A pair is positive (target 1) when both
    labels are equal and negative (target 0) otherwise.

    Args:
        images: Sequence of images.
        labels: Sequence of labels, aligned with ``images``.
        target_size: Number of following images each image is paired with.

    Returns:
        A tuple ``(pairs_anchor, pairs_val, target)`` of three equally long
        lists. All positive pairs come first, followed by all negative
        pairs; within each group pairs are ordered by anchor index, then by
        partner index.
    """
    positives = []
    negatives = []
    count = len(images)

    for i in range(count):
        # Clamp the window to the end of the list. The previous bound,
        # range(len(images) - (target_size + 1)), stopped one anchor too
        # early (the last image was never paired) and produced nothing at
        # all for lists shorter than the window.
        for j in range(i + 1, min(i + target_size + 1, count)):
            bucket = positives if labels[i] == labels[j] else negatives
            bucket.append((images[i], images[j]))

    ordered = positives + negatives
    pairs_anchor = [anchor for anchor, _ in ordered]
    pairs_val = [partner for _, partner in ordered]
    target = [1] * len(positives) + [0] * len(negatives)

    return pairs_anchor, pairs_val, target

In [None]:
# Load the saved pair arrays from disk (presumably the three outputs of
# create_siamese_pairs stored with np.save - TODO confirm).
pairs_anchor = np.load('siamese_dataset_pairs_anchor.npy')
pairs_val = np.load('siamese_dataset_pairs_val.npy')
labels_dataset = np.load('siamese_dataset_target.npy')

# Keep only the first 600 entries of each array.
pairs_anchor = np.array(pairs_anchor[:600])
pairs_val = np.array(pairs_val[:600])
labels_dataset = np.array(labels_dataset[:600])

In [None]:
# Convert the NumPy arrays to tensors.
anchor_dataset = tf.convert_to_tensor(pairs_anchor)
pairs_dataset = tf.convert_to_tensor(pairs_val)
labels_dataset = tf.convert_to_tensor(labels_dataset)

# Release the NumPy arrays once they have been converted
del pairs_anchor, pairs_val

# Build the labelled dataset: each element is an
# (anchor image, comparison image, label) triple.
data = tf.data.Dataset.zip((
    tf.data.Dataset.from_tensor_slices(anchor_dataset),
    tf.data.Dataset.from_tensor_slices(pairs_dataset),
    tf.data.Dataset.from_tensor_slices(labels_dataset)
))

In [None]:
# Remove the tensor names; from here on the data is accessed through `data`.
del anchor_dataset, pairs_dataset,labels_dataset

In [None]:
# Build dataloader pipeline
data = data.cache()
# Shuffle once with a 1024-element buffer. reshuffle_each_iteration=False
# keeps the order identical on every pass over the dataset, so the
# take()/skip() based train/test split that follows always selects the same,
# disjoint examples. With the default (reshuffle on every iteration) both
# partitions would be redrawn each epoch and test examples would leak into
# training. Shuffle the training partition separately if a fresh order per
# epoch is wanted.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [None]:
# Training partition: the first 70% of the elements, grouped into batches
# of 16, with a prefetch buffer of 8.
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
# Testing partition: skip the share used for training, keep the remaining
# 30%, then batch by 16 and prefetch 8.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

# Pull one batch of test pairs and labels as NumPy arrays.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Remove the name `data`; the train/test partitions derived from it are what
# the rest of the notebook uses.
del data

# Build embedding layer

In [None]:
def make_embedding():
    """Build the convolutional embedding network applied to each input image.

    Returns:
        A Keras ``Model`` named ``'embedding'`` that maps a 100x100x3 image
        to a 4096-dimensional, sigmoid-activated vector.
    """
    inp = Input(shape=(100,100,3), name='input_image')

    # pool_size and strides are passed by keyword in every block below. The
    # first positional argument of MaxPooling2D is pool_size, so the former
    # ``MaxPooling2D(64, (2,2), ...)`` pooled over 64x64 windows with stride 2
    # instead of 2x2 windows. With padding='same' and stride 2 the output
    # shapes are unchanged by this fix.

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
# Instantiate the embedding network; make_siamese_model() reads this global.
embedding = make_embedding()

In [None]:
# Print the layer-by-layer summary of the embedding network.
embedding.summary()

## 4.2 Build Distance Layer

In [None]:
# Siamese L1 distance layer
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward the standard Layer arguments (name, dtype, trainable, ...)
        # instead of silently dropping them, so they are honoured when the
        # layer is created by hand or rebuilt from a saved model's config.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise.

        This is the per-component L1 distance, not a Euclidean distance.
        """
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# Stand-alone instance of the distance layer.
l1 = L1Dist()

## 4.3 Make Siamese Model

In [None]:
def make_siamese_model():
    """Assemble the two-input Siamese verification network.

    Both inputs are passed through the module-level ``embedding`` model, the
    two resulting embeddings are combined by an ``L1Dist`` layer, and a
    single sigmoid unit turns that difference into the output.

    Returns:
        A Keras ``Model`` named ``'SiameseNetwork'`` with inputs
        ``input_img`` and ``validation_img`` (each 100x100x3) and one
        sigmoid output.
    """
    # The two image inputs: anchor first, then the image compared against it.
    anchor_in = Input(name='input_img', shape=(100,100,3))
    candidate_in = Input(name='validation_img', shape=(100,100,3))

    # Distance layer, renamed so it shows up as 'distance' in the model.
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # The same embedding model is applied to both inputs.
    anchor_vec = embedding(anchor_in)
    candidate_vec = embedding(candidate_in)
    diff = distance_layer(anchor_vec, candidate_vec)

    # Single-unit sigmoid classifier on top of the distance vector.
    score = Dense(1, activation='sigmoid')(diff)

    return Model(inputs=[anchor_in, candidate_in], outputs=score, name='SiameseNetwork')

In [None]:
# Build the Siamese network; the training and evaluation cells use this global.
siamese_model = make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the Siamese network.
siamese_model.summary()

# 5. Training

In [None]:
def calculate_accuracy(a,b,c):
    """Return the accuracy of ``siamese_model`` over all of ``test_data``.

    A pair is predicted positive (1) when the model output is greater than
    0.9 and negative (0) otherwise.

    Args:
        a, b, c: Unused. Kept so existing call sites remain valid; the
            function always evaluates the module-level ``test_data``.

    Returns:
        The fraction of correctly classified pairs across every batch of
        ``test_data`` (0.0 when it yields no samples).
    """
    correct = 0
    total = 0

    for batch in test_data:
        y_hat = siamese_model.predict([batch[0], batch[1]])

        # Flatten both sides so predictions and labels line up one-to-one,
        # and compare every sample (the old index loop skipped the last one).
        predicted = (np.ravel(y_hat) > 0.9).astype(int)
        expected = np.ravel(np.asarray(batch[2]))

        # Accumulate over all batches; previously the ratio was overwritten
        # per batch, so only the last batch was reflected in the result.
        correct += int(np.sum(predicted == expected))
        total += len(predicted)

    correct_ratio = correct / total if total else 0.0
    print('val_accuracy',correct_ratio)

    return correct_ratio

## 5.1 Setup Loss and Optimizer

In [None]:
# Loss and optimizer used by the custom training step (train_step).
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # learning rate 0.0001

## 5.2 Establish Checkpoints

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix
# 'ckpt'; each checkpoint tracks both the optimizer and the model.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 5.3 Build Train Step Function

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        batch: Sequence whose first two items are the two model inputs and
            whose third item holds the labels.

    Returns:
        The binary cross-entropy loss of the batch, computed before the
        weight update.
    """
    # First two items: anchor and positive/negative image; third: label.
    X = batch[:2]
    y = batch[2]

    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)

    # No Python print() here: inside a tf.function it only runs while the
    # function is being traced and would show a symbolic tensor rather than
    # the loss value. The loss is returned to the caller instead.

    # Calculate gradients and apply the update to the siamese model.
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

## 5.4 Build Training Loop

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    """Train ``siamese_model`` with the custom loop and collect per-epoch stats.

    For every epoch the function runs ``train_step`` on each batch, tracks
    recall and precision on the training batches, evaluates
    ``calculate_accuracy`` and saves a checkpoint every 10th epoch.

    Args:
        data: Batched dataset of (anchor, comparison image, label) items;
            it must support ``len()``.
        EPOCHS: Number of epochs to run.

    Returns:
        A tuple ``(loss_numpys, r_results, p_results, val_accuracy)`` of four
        lists with one entry per epoch: the loss of the epoch's last batch,
        the training recall, the training precision and the value returned
        by ``calculate_accuracy``.

    Raises:
        ValueError: If ``data`` yields no batches.
    """
    loss_numpys = []
    r_results = []
    p_results = []
    val_accuracy = []

    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects so each epoch is measured on its own.
        r = Recall()
        p = Precision()

        loss = None
        for idx, batch in enumerate(data):
            loss = train_step(batch)
            # Call the model directly instead of predict(): for one small
            # batch this returns the same outputs without predict()'s
            # per-call overhead and without its own progress output
            # interleaving with the progress bar.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)

        if loss is None:
            # Previously this surfaced as an UnboundLocalError on `loss`.
            raise ValueError('train() received a dataset without any batches')

        print(loss.numpy(), r.result().numpy(), p.result().numpy())

        loss_numpys.append(loss.numpy())
        r_results.append(r.result().numpy())
        p_results.append(p.result().numpy())

        accuracy_calculation = calculate_accuracy(test_input, test_val, y_true)
        val_accuracy.append(accuracy_calculation)

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

    return (loss_numpys, r_results, p_results, val_accuracy)

## 5.5 Train the model

In [None]:
# Number of training epochs passed to train().
EPOCHS = 50

In [None]:
# NOTE(review): the custom loop in train()/train_step() uses binary_cross_loss
# and opt, not the loss and optimizer configured here - confirm this compile
# call is only needed for saving/reloading the model.
siamese_model.compile(loss='BinaryCrossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Run training; each returned list holds one value per epoch.
loss_numpy, r_result, p_result, val_accuracy = train(train_data, EPOCHS)

In [None]:
# create x-axis values: one position per recorded epoch (starting at 0)
x = range(len(val_accuracy))
# plot the per-epoch loss, recall, precision and validation accuracy
plt.plot(x, loss_numpy, label='loss')
plt.plot(x, r_result, label='r.result')
plt.plot(x, p_result, label='p.result')
plt.plot(x, val_accuracy, label='val_accuracy')

# Mark the point with the highest validation accuracy
max_index = val_accuracy.index(max(val_accuracy))
plt.scatter(x[max_index], val_accuracy[max_index], color='red', label='Max Value')

plt.annotate(f'({x[max_index]}, {val_accuracy[max_index]})',
             xy=(x[max_index], val_accuracy[max_index]), xytext=(x[max_index]+0.3, val_accuracy[max_index]-0.2),
             arrowprops=dict(facecolor='black', arrowstyle='->'))

# Mark the point with the lowest loss
min_index = loss_numpy.index(min(loss_numpy))
plt.scatter(x[min_index], loss_numpy[min_index], color='red', label='Min Value')

plt.annotate(f'({x[min_index]}, {loss_numpy[min_index]})',
             xy=(x[min_index], loss_numpy[min_index]), xytext=(x[min_index]+0.3, loss_numpy[min_index]+0.2),
             arrowprops=dict(facecolor='black', arrowstyle='->'))



# add labels
plt.title('Facial Verification using LFW')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')

# Show the legend, placed relative to the figure
legend = plt.legend(loc='center left', bbox_to_anchor=(1, 0.5), bbox_transform=plt.gcf().transFigure)


# dashed gray grid
plt.grid(True, linestyle='--', linewidth=0.5, color='gray')

# save image
plt.savefig('Facial Verification using LFW(kha_quan)(600)(50).png', dpi=500, bbox_inches='tight')

# show the plot
plt.show()

# 6. Evaluate Model

## 6.1 Import Metrics

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

## 6.2 Make Predictions

In [None]:
# Fetch one batch of test pairs and labels as NumPy arrays.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Model outputs for the batch fetched above.
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Post processing the results: outputs above 0.8 become 1, all others 0.
[1 if prediction > 0.8 else 0 for prediction in y_hat ]

In [None]:
# Display the labels of the same batch for comparison.
y_true

## 6.3 Calculate Metrics

In [None]:
# Creating a Recall metric object
m = Recall()

# Feeding the labels and model outputs of the batch into the metric
m.update_state(y_true, y_hat)

# Return the recall result
m.result().numpy()

In [None]:
# Creating a Precision metric object
m = Precision()

# Feeding the labels and model outputs of the batch into the metric
m.update_state(y_true, y_hat)

# Return the precision result
m.result().numpy()

In [None]:
# Recall and precision accumulated over every batch of the test partition.
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat)

print(r.result().numpy(), p.result().numpy())

In [None]:
# Accuracy over the whole test partition at a 0.9 decision threshold.
# Each batch is visited exactly once and the number of correct predictions is
# divided by the number of samples seen. The previous version created a new
# iterator on each of its 100 rounds (so it only ever read a first batch)
# and divided the 100-round total by the size of a single batch.
test_resul = 0
test_total = 0
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    y_hat = siamese_model.predict([test_input, test_val])

    # Flatten so predictions and labels line up one-to-one.
    y_hat_round = (np.ravel(y_hat) > 0.9).astype(int)
    true = int(np.sum(y_hat_round == np.ravel(y_true)))
    print(true)

    test_resul = test_resul + true
    test_total = test_total + len(y_hat_round)

# Guard against an empty test partition.
correct_ratio = test_resul / test_total if test_total else 0.0
print(correct_ratio)

## TEST

In [None]:
# Load a separate set of test pairs and targets from disk.
test_input = np.load('test_siamese_dataset_pairs_anchor.npy')
test_val = np.load('test_siamese_dataset_pairs_val.npy')
y_true = np.load('test_siamese_dataset_target.npy')


In [None]:
y_hat = siamese_model.predict([test_input, test_val])

# Outputs above 0.9 count as a positive prediction (1), all others as 0.
y_hat_round = [1 if prediction > 0.9 else 0 for prediction in y_hat]

# Compare every prediction with its label. The previous
# range(0, len(y_hat_round)-1) loop never checked the last sample.
true = sum(
    1 for predicted, expected in zip(y_hat_round, y_true)
    if predicted == expected
)

correct_ratio = true/len(y_hat)
print(correct_ratio)

In [None]:
# Load the LFW evaluation pairs and targets from disk.
test_input = np.load('LFW_pairs_anchor.npy')
test_val = np.load('LFW_pairs_val.npy')
y_true = np.load('LFW_target.npy')

y_hat = siamese_model.predict([test_input, test_val])

# Outputs above 0.95 count as a positive prediction (1), all others as 0.
y_hat_round = [1 if prediction > 0.95 else 0 for prediction in y_hat]

# Compare every prediction with its label. The previous
# range(0, len(y_hat_round)-1) loop never checked the last sample.
true = sum(
    1 for predicted, expected in zip(y_hat_round, y_true)
    if predicted == expected
)

correct_ratio = true/len(y_hat)
print(correct_ratio)

## 6.4 Viz Results

In [None]:
# Set plot size
plt.figure(figsize=(10,8))

# Left subplot: first anchor image of the loaded test pairs
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Right subplot: the image it is compared against
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Render the figure
plt.show()

# 7. Save Model

In [None]:
# Save the whole model (not only its weights) to an .h5 file
siamese_model.save('siamesemodel_LFW(600)(50).h5')

In [None]:
# # Reload model
# model = tf.keras.models.load_model('siamesemodel_LFW_v1_0.h5',
#                                    custom_objects={'L1Dist':L1Dist, 'contrastive_loss':contrastive_loss})

In [None]:
# Reload the saved model; custom_objects tells Keras how to resolve the
# custom L1Dist layer and the 'BinaryCrossentropy' loss name.
model = tf.keras.models.load_model('siamesemodel_LFW(600)(50).h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})