In [12]:
!pip install tensorflow pillow numpy scikit-learn matplotlib pandas -q

In [13]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("jessicali9530/lfw-dataset")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/lfw-dataset


In [14]:
import os
import random
import tensorflow as tf
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import os, random, math, glob, shutil
from pathlib import Path
from zipfile import ZipFile
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image as kimage
from sklearn.model_selection import train_test_split

# Set random seed
tf.random.set_seed(42)
np.random.seed(42)
random.seed(42)

# Configure GPU memory growth
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    print('Using GPU:', gpus)
else:
    print('Using CPU')

# Dataset paths
root_dir = '/kaggle/input/lfw-dataset/lfw-deepfunneled/lfw-deepfunneled'
csv_dir = '/kaggle/input/lfw-dataset'

Using GPU: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]


In [15]:
def semi_hard_triplet_loss(embed_anchor, embed_positive, embed_negative, margin=1.0):
    d_ap = tf.reduce_sum(tf.square(embed_anchor - embed_positive), axis=1)
    d_an = tf.reduce_sum(tf.square(embed_anchor - embed_negative), axis=1)
    semi_hard_mask = tf.cast(tf.logical_and(d_ap < d_an, d_an < d_ap + margin), tf.float32)
    loss = tf.maximum(0.0, d_ap - d_an + margin)
    loss = loss * semi_hard_mask
    valid_triplets = tf.reduce_sum(semi_hard_mask)
    loss = tf.reduce_sum(loss) / tf.maximum(valid_triplets, 1e-10)
    return loss


# def triplet_loss(anchor, positive, negative, margin=0.2):
#    pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
#    neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
#    loss = tf.maximum(pos_dist - neg_dist + margin, 0.0)
#    return tf.reduce_mean(loss)

In [16]:
def load_people_csv(csv_path):
    df = pd.read_csv(csv_path)
    return df[['name', 'images']].values.tolist()

train_val_people = load_people_csv(os.path.join(csv_dir, 'peopleDevTrain.csv'))
test_people = load_people_csv(os.path.join(csv_dir, 'peopleDevTest.csv'))

# Filter >=2 images, sort by image count, take top 100
train_val_people = sorted([p for p in train_val_people if p[1] >= 2], key=lambda x: x[1], reverse=True)[:100]
test_people = sorted([p for p in test_people if p[1] >= 2], key=lambda x: x[1], reverse=True)[:100]
print(f'Train+Val persons: {len(train_val_people)}, Test persons: {len(test_people)}')

# Split train_val
train_people, val_people = train_test_split(train_val_people, test_size=0.2, random_state=42)
print(f'Train persons: {len(train_people)}, Val: {len(val_people)}')

def get_image_paths(root_dir, people_list):
    image_paths = {}
    for person, _ in people_list:
        person_dir = os.path.join(root_dir, person)
        if os.path.isdir(person_dir):
            image_paths[person] = [os.path.join(person_dir, img) for img in os.listdir(person_dir) if img.endswith('.jpg')]
    return image_paths

train_image_paths = get_image_paths(root_dir, train_people)
val_image_paths = get_image_paths(root_dir, val_people)
test_image_paths = get_image_paths(root_dir, test_people)

Train+Val persons: 100, Test persons: 100
Train persons: 80, Val: 20


In [17]:
def preprocess_train(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.2)
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    image = image / 255.0
    image = tf.keras.applications.resnet50.preprocess_input(image * 255.0)
    return image

def preprocess_test(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = image / 255.0
    image = tf.keras.applications.resnet50.preprocess_input(image * 255.0)
    return image

In [18]:
def triplet_dataset(image_paths_dict, preprocess_fn, batch_size=8):
    person_list = list(image_paths_dict.keys())
    def generator():
        while True:
            anchor_person = random.choice(person_list)
            if len(image_paths_dict[anchor_person]) < 2:
                continue
            anchor_path, positive_path = random.sample(image_paths_dict[anchor_person], 2)
            negative_person = random.choice([p for p in person_list if p != anchor_person])
            negative_path = random.choice(image_paths_dict[negative_person])
            yield anchor_path, positive_path, negative_path
    
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_types=(tf.string, tf.string, tf.string),
        output_shapes=((), (), ())
    )
    dataset = dataset.map(
        lambda a, p, n: (preprocess_fn(a), preprocess_fn(p), preprocess_fn(n)),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

In [19]:
def create_backbone():
    base_model = tf.keras.applications.ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=(224, 224, 3)
    )
    for layer in base_model.layers[:143]: 
        layer.trainable = False
    for layer in base_model.layers[143:]:
        layer.trainable = True
    x = base_model.output
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
 #   x = tf.keras.layers.Dense(512, activation=None, dtype='float32')(x)
    x = tf.keras.layers.Dense(512, kernel_regularizer=tf.keras.regularizers.l2(0.01), activation = None, dtype='float32')(x)
#   x = tf.keras.layers.Lambda(lambda x: tf.nn.l2_normalize(x, axis=1),output_shape=(512,))(x)
    x = tf.keras.layers.UnitNormalization(axis=1)(x)
    return tf.keras.Model(inputs=base_model.input, outputs=x)

backbone = create_backbone()

def forward_one(image):
    return backbone(image)

def forward_triplet(anchor, positive, negative):
    embed_anchor = forward_one(anchor)
    embed_positive = forward_one(positive)
    embed_negative = forward_one(negative)
    return embed_anchor, embed_positive, embed_negative

In [20]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)

@tf.function
def train_step(anchor, positive, negative):
    with tf.GradientTape() as tape:
        embed_a, embed_p, embed_n = forward_triplet(anchor, positive, negative)
        loss = semi_hard_triplet_loss(embed_a, embed_p, embed_n, margin=0.75)
    gradients = tape.gradient(loss, backbone.trainable_variables)
    optimizer.apply_gradients(zip(gradients, backbone.trainable_variables))
    return loss
    
@tf.function
def val_step(anchor, positive, negative):
    embed_a, embed_p, embed_n = forward_triplet(anchor, positive, negative)
    loss = semi_hard_triplet_loss(embed_a, embed_p, embed_n, margin=0.75)
    return loss

def train_epoch(dataset, num_batches=100):
    total_loss = 0.0
    iterator = iter(dataset)
    for _ in range(num_batches):
        anchor, positive, negative = next(iterator)
        loss = train_step(anchor, positive, negative)
        total_loss += loss.numpy()
        tf.keras.backend.clear_session()
        gc.collect()
    return total_loss / max(1, num_batches)

def val_epoch(dataset, num_batches=100):
    total_loss = 0.0
    iterator = iter(dataset)
    for _ in range(num_batches):
        anchor, positive, negative = next(iterator)
        loss = val_step(anchor, positive, negative)
        total_loss += loss.numpy()
        tf.keras.backend.clear_session()
        gc.collect()
    return total_loss / max(1, num_batches)

In [None]:
import gc
num_epochs = 8
batch_size = 32
train_losses = []
val_losses = []
num_batches_per_epoch = 100

#early_stopping = tf.keras.callbacks.EarlyStopping(
#    monitor='val_loss',
#    patience=2,
#    restore_best_weights=True,
#    verbose=1
#)

for epoch in range(num_epochs):
    train_dataset = triplet_dataset(train_image_paths, preprocess_train, batch_size=batch_size)
    val_dataset = triplet_dataset(val_image_paths, preprocess_test, batch_size=batch_size)
    train_loss = train_epoch(train_dataset, num_batches_per_epoch)
    val_loss = val_epoch(val_dataset, num_batches_per_epoch)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    print(f'Epoch {epoch+1}/{num_epochs}: Train Loss {train_loss:.4f}, Val Loss {val_loss:.4f}')
 #   early_stopping.on_epoch_end(epoch, logs={'val_loss': val_loss})
 #   if early_stopping.stopped_epoch > 0:
 #       print(f"Early stopping triggered at epoch {early_stopping.stopped_epoch + 1}")
 #       break
    tf.keras.backend.clear_session()
    gc.collect()

backbone.save('/kaggle/working/siamese_try_6.keras')
print('Model saved')

plt.figure(figsize=(10, 5))
plt.plot(range(1, num_epochs+1), train_losses, label='Train Loss')
plt.plot(range(1, num_epochs+1), val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Losses')
plt.savefig('/kaggle/working/losses_plot_try_6.png')
plt.show()

In [None]:
from IPython.display import FileLink
FileLink(r'losses_plot_try_5.png')

In [11]:
from IPython.display import FileLink
FileLink(r'siamese_try_5.keras')