In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from mtcnn import MTCNN
import cv2
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input

In [2]:


# Initialize MTCNN face detector
detector = MTCNN()

def load_dataset(data_dir, min_images_per_person=1):
    images = []
    labels = []
    label_dict = {}
    current_label = 0
    
    all_persons = os.listdir(data_dir)
    # Randomly select about 60% of the persons
    selected_persons = np.random.choice(all_persons, size=int(0.4 * len(all_persons)), replace=False)  
    for person_name in selected_persons:
        person_dir = os.path.join(data_dir, person_name)
        if not os.path.isdir(person_dir):
            continue
        # List all image files for the person
        image_files = [f for f in os.listdir(person_dir) if os.path.isfile(os.path.join(person_dir, f))]
        # Check the number of images for the person
        if len(image_files) < min_images_per_person:
            continue  # Skip if not enough images
        label_dict[current_label] = person_name
        # Iterate over each image of the person
        for image_name in image_files:
            image_path = os.path.join(person_dir, image_name)
            # Read the image using OpenCV
            img = cv2.imread(image_path)
            if img is not None:
                images.append(img)
                labels.append(current_label)
        current_label += 1
    return images, labels, label_dict

def preprocess_faces(images, labels):
    processed_images = []
    processed_labels = []
    for idx, img in enumerate(images):
        # Convert image to RGB (OpenCV uses BGR)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # Detect faces
        results = detector.detect_faces(img_rgb)
        if results:
            # Find the largest face
            max_area = 0
            largest_face = None
            for result in results:
                x, y, width, height = result['box']
                area = width * height
                if area > max_area:
                    max_area = area
                    largest_face = result['box']
            if largest_face is not None:
                x, y, width, height = largest_face
                h, w, _ = img_rgb.shape
                x1 = max(0, x)
                y1 = max(0, y)
                x2 = min(w, x1 + width)
                y2 = min(h, y1 + height)
                face = img_rgb[y1:y2, x1:x2]
                # Resize face to 224x224 (ResNet input size)
                face = cv2.resize(face, (224, 224))
                processed_images.append(face)
                processed_labels.append(labels[idx])
        else:
            continue
    return np.array(processed_images), np.array(processed_labels)

def create_triplets(images, labels, minperson):
    triplets = []
    num_classes = len(np.unique(labels))
    label_indices = {label: np.where(labels == label)[0] for label in np.unique(labels)}
    
    for anchor_label in label_indices.keys():
        indices = label_indices[anchor_label]
        if len(indices) < 2 and minperson > 1:
            continue
        for idx in indices:
            anchor_image = images[idx]
            # Positive image (same class)
            positive_idx = idx
            if minperson > 1:
                while positive_idx == idx:
                    positive_idx = np.random.choice(indices)
            positive_image = images[positive_idx]
            
            # Negative image (different class)
            negative_label = np.random.choice([l for l in label_indices.keys() if l != anchor_label])
            negative_indices = label_indices[negative_label]
            negative_idx = np.random.choice(negative_indices)
            negative_image = images[negative_idx]
            
            triplets.append([anchor_image, positive_image, negative_image])
        
    return np.array(triplets)

In [3]:
data_dir = 'archive\\lfw-deepfunneled\\lfw-deepfunneled'
include_single_image_folders = True
min_images_per_person = 2 if include_single_image_folders else 5

images, labels, label_dict = load_dataset(data_dir, min_images_per_person)
processed_images, processed_labels = preprocess_faces(images, labels)

# Convert dtype and preprocess with ResNet50's preprocess_input
processed_images = processed_images.astype('float32')
processed_images = preprocess_input(processed_images)

triplets = create_triplets(processed_images, processed_labels, min_images_per_person)

triplets_train, triplets_test = train_test_split(triplets, test_size=0.2, random_state=42)

def split_triplets(triplets):
    anchors = triplets[:, 0]
    positives = triplets[:, 1]
    negatives = triplets[:, 2]
    return [anchors, positives, negatives]

X_train = split_triplets(triplets_train)
X_test = split_triplets(triplets_test)
y_dummy = np.zeros((len(triplets_train),))

In [None]:
def create_embedding_model(input_shape):
    base_model = ResNet50(include_top=False, weights='imagenet', input_shape=input_shape)
    # Freeze base_model layers
    for layer in base_model.layers:
        layer.trainable = False
    
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(128)(x)
    x = Lambda(lambda v: tf.math.l2_normalize(v, axis=1))(x)
    
    model = Model(inputs=base_model.input, outputs=x)
    return model

def create_triplet_model(input_shape):
    embedding_model = create_embedding_model(input_shape)
    anchor_input = Input(shape=input_shape, name='anchor_input')
    positive_input = Input(shape=input_shape, name='positive_input')
    negative_input = Input(shape=input_shape, name='negative_input')
    
    encoded_anchor = embedding_model(anchor_input)
    encoded_positive = embedding_model(positive_input)
    encoded_negative = embedding_model(negative_input)
    
    merged_output = layers.concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=1)
    
    model = Model(inputs=[anchor_input, positive_input, negative_input], outputs=merged_output)
    return model

def triplet_loss(y_true, y_pred, alpha=0.2):
    total_length = y_pred.shape.as_list()[-1]
    anchor = y_pred[:, 0:128]
    positive = y_pred[:, 128:256]
    negative = y_pred[:, 256:384]
    pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
    neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)
    basic_loss = pos_dist - neg_dist + alpha
    loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0))
    return loss

input_shape = (224, 224, 3)
triplet_model = create_triplet_model(input_shape)

triplet_model.compile(optimizer=Adam(0.0001), loss=triplet_loss)

# Add EarlyStopping callback
from tensorflow.keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)

# Train the model for 6 epochs instead of 10, with early stopping
triplet_model.fit(
    X_train,
    y_dummy,
    validation_data=(X_test, np.zeros((len(triplets_test),))),
    batch_size=32,
    epochs=6,
    callbacks=[early_stopping]
)



embedding_model = create_embedding_model(input_shape)
embedding_model.set_weights(triplet_model.get_weights()[:len(embedding_model.weights)])
embedding_model.save_weights('test40%.weights.h5')


Epoch 1/6




[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m506s[0m 6s/step - loss: 0.0937 - val_loss: 0.0717
Epoch 2/6
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m466s[0m 5s/step - loss: 0.0493 - val_loss: 0.0615
Epoch 3/6
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m555s[0m 6s/step - loss: 0.0313 - val_loss: 0.0563
Epoch 4/6
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m525s[0m 6s/step - loss: 0.0196 - val_loss: 0.0523
Epoch 5/6
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m529s[0m 6s/step - loss: 0.0100 - val_loss: 0.0513
Epoch 6/6
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m573s[0m 7s/step - loss: 0.0063 - val_loss: 0.0492
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m220s[0m 2s/step
(224, 224, 3)


In [None]:

def get_embeddings(model, images):
    return model.predict(images)
def create_embedding_model(input_shape):
    base_model = ResNet50(include_top=False, weights='imagenet', input_shape=input_shape)
    # Freeze base_model layers
    for layer in base_model.layers:
        layer.trainable = False
    
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(128)(x)
    x = Lambda(lambda v: tf.math.l2_normalize(v, axis=1))(x)
    
    model = Model(inputs=base_model.input, outputs=x)
    return model

model_path = 'test.weights.h5'  # Replace with your model file

input_shape = (224, 224, 3)
embedding_model = create_embedding_model(input_shape)
    
if not os.path.exists(model_path):
    print(f"Model weights file not found: {model_path}")

embedding_model.load_weights(model_path)

embeddings = get_embeddings(embedding_model, processed_images)
def compute_distance(emb1, emb2):
    return np.linalg.norm(emb1 - emb2)

all_same = []
labels_set = set(processed_labels)
for label in labels_set:
    same_person_indices = np.where(processed_labels != label)[0]
    if len(same_person_indices) > 1:
        idx1, idx2 = same_person_indices[:2]
        distance_same = compute_distance(embeddings[idx1], embeddings[idx2])
        all_same.append(distance_same)
       #print(f"Distance between same person ({label_dict[label]} ): {distance_same}")

a = 0
for i in range(len(all_same)):
    a += all_same[i]

if len(all_same) > 0:
    print(a / len(all_same))
    print(min(all_same), max(all_same))
else:
    print("No pairs found for same person comparison.")

all_dif = []
labels_set = set(processed_labels)
for label in labels_set:
    same_person_indices = np.where(processed_labels == label)[0]
    if len(same_person_indices) > 1:
        idx1, idx2 = same_person_indices[:2]
        distance_same = compute_distance(embeddings[idx1], embeddings[idx2])
        all_dif.append(distance_same)
       #print(f"Distance between same person ({label_dict[label]} ): {distance_same}")

a = 0
for i in range(len(all_dif)):
    a += all_dif[i]

if len(all_dif) > 0:
    print(a / len(all_dif))
    print(min(all_dif), max(all_dif))
else:
    print("No pairs found for dif person comparison.")

[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m167s[0m 2s/step
1.0480235
0.83653784 1.0483295
0.9764533
0.83653784 1.0483295
