In [None]:
import tensorflow as tf

# Check if a GPU is available
if tf.test.is_gpu_available():
    print("GPU is available")
else:
    print("GPU is NOT available")


In [None]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Input, GlobalAveragePooling2D, BatchNormalization, Dropout, Layer
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.regularizers import l2

In [None]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Setup paths
BASE_PATH = r'/kaggle/input/faces-935'
SUB_FOLDERS = os.listdir(BASE_PATH)

# Image preprocessing
def preprocess_image(img_path):
    img = load_img(img_path, target_size=(200, 200))
    img_array = img_to_array(img) / 255.0
    return img_array

# Data generator function to create triplets
def create_triplets(sub_folders):
    triplets = []

    for anchor_folder in sub_folders:
        anchor_images = os.listdir(os.path.join(BASE_PATH, anchor_folder))
        for anchor_img in anchor_images:
            anchor_img_path = os.path.join(BASE_PATH, anchor_folder, anchor_img)
            
            # Choose a random negative image from a different sub-folder
            negative_folder = random.choice(sub_folders)
            while negative_folder == anchor_folder:
                negative_folder = random.choice(sub_folders)
            negative_images = os.listdir(os.path.join(BASE_PATH, negative_folder))
            negative_img = random.choice(negative_images)
            negative_img_path = os.path.join(BASE_PATH, negative_folder, negative_img)
            
            # Choose a random positive image from the same anchor folder
            positive_img = random.choice(anchor_images)
            while positive_img == anchor_img:
                positive_img = random.choice(anchor_images)
            positive_img_path = os.path.join(BASE_PATH, anchor_folder, positive_img)
            
            triplets.append([anchor_img_path, positive_img_path, negative_img_path])
    
    print(f"Created {len(triplets)} triplets")
    return np.array(triplets)

triplets = create_triplets(SUB_FOLDERS)

# Create tf.data.Dataset
def triplet_generator(triplets):
    for triplet in triplets:
        anchor_img = preprocess_image(triplet[0])
        positive_img = preprocess_image(triplet[1])
        negative_img = preprocess_image(triplet[2])
        yield ({
            'anchor': anchor_img,
            'positive': positive_img,
            'negative': negative_img
        }, [0.0])  # Dummy label

# Reduced batch size to handle limited data
batch_size = 32

triplet_dataset = tf.data.Dataset.from_generator(
    lambda: triplet_generator(triplets), 
    output_signature=(
        {
            'anchor': tf.TensorSpec(shape=(200, 200, 3), dtype=tf.float32),
            'positive': tf.TensorSpec(shape=(200, 200, 3), dtype=tf.float32),
            'negative': tf.TensorSpec(shape=(200, 200, 3), dtype=tf.float32),
        },
        tf.TensorSpec(shape=(1,), dtype=tf.float32)  # Dummy label
    )
)

# Cache images to improve performance
triplet_dataset = triplet_dataset.cache()

In [None]:
# Repeat dataset to ensure it does not run out of data
triplet_dataset = triplet_dataset.shuffle(buffer_size=512).batch(batch_size).repeat().prefetch(tf.data.experimental.AUTOTUNE)

# Calculate steps per epoch
total_size = len(triplets)
train_size = int(0.8 * total_size)  # Adjusted split ratio for train/test
test_size = total_size - train_size
steps_per_epoch = train_size // batch_size 
validation_steps = test_size // batch_size

# Split dataset
train_dataset = triplet_dataset.take(train_size).repeat()
test_dataset = triplet_dataset.skip(train_size).take(test_size).repeat()

# Build Embedding Model with ResNet50
def make_embedding():
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(200, 200, 3))
    base_model.trainable = False  # Freeze the base model

    model = Sequential([
        base_model,
        GlobalAveragePooling2D(),
        # Dense(1024, activation='relu', kernel_regularizer=l2(0.01)),  # Larger Dense layer
        # BatchNormalization(),
        # Dropout(0.5),  # Adding dropout to prevent overfitting
        Dense(1024, activation='relu', kernel_regularizer=l2(0.01)),   # Last Dense layer
        BatchNormalization(),
        Dropout(0.5)  # Adding dropout to prevent overfitting
    ])
    return model

embedding = make_embedding()

# Siamese Network Model with Triplet Loss
input_anchor = Input(shape=(200, 200, 3), name='anchor')
input_positive = Input(shape=(200, 200, 3), name='positive')
input_negative = Input(shape=(200, 200, 3), name='negative')

embedding_anchor = embedding(input_anchor)
embedding_positive = embedding(input_positive)
embedding_negative = embedding(input_negative)

# Custom layer for concatenation and distance calculation
class TripletConcatenateLayer(Layer):
    def call(self, inputs):
        anchor, positive, negative = inputs
        return tf.concat([anchor, positive, negative], axis=1)

concatenated_embeddings = TripletConcatenateLayer()([embedding_anchor, embedding_positive, embedding_negative])

# Triplet Loss function
def triplet_loss(margin=0.5):
    def loss(y_true, y_pred):
        anchor, positive, negative = tf.split(y_pred, num_or_size_splits=3, axis=1)
        pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
        neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)
        basic_loss = pos_dist - neg_dist + margin
        loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0))
        return loss
    return loss

# Combine the embeddings into a model
siamese_model = Model(inputs=[input_anchor, input_positive, input_negative], 
                      outputs=concatenated_embeddings)

In [None]:
# Compile the model with the triplet loss and optimizer improvements
print("Compiling the model...")
siamese_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0), loss=triplet_loss())
print("Model compiled successfully")

# Custom callback to save the best embedding model
class SaveBestEmbeddingModel(tf.keras.callbacks.Callback):
    def __init__(self, embedding_model, save_path, monitor='val_loss', mode='min'):
        super(SaveBestEmbeddingModel, self).__init__()
        self.embedding_model = embedding_model
        self.save_path = save_path
        self.monitor = monitor
        self.mode = mode
        self.best = np.Inf if mode == 'min' else -np.Inf

    def on_epoch_end(self, epoch, logs=None):
        current = logs.get(self.monitor)
        if current is not None:
            if (self.mode == 'min' and current < self.best) or (self.mode == 'max' and current > self.best):
                self.best = current
                self.embedding_model.save(self.save_path)
                print(f"\nSaved best embedding model to {self.save_path} (epoch {epoch + 1}, {self.monitor}: {current})")

# Train the model with learning rate scheduling
print("Starting model training...")
EPOCHS = 100  # Set appropriate epochs for better training

# Learning rate scheduler
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)

# Early stopping to prevent overfitting
early_stopping = EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True, verbose=1)

# Instantiate the custom callback
save_best_embedding_model = SaveBestEmbeddingModel(embedding, 'best_embedding_model.h5', monitor='val_loss', mode='min')

history = siamese_model.fit(train_dataset, epochs=EPOCHS, steps_per_epoch=steps_per_epoch, 
                            validation_data=test_dataset, validation_steps=validation_steps, 
                            callbacks=[lr_scheduler, early_stopping, save_best_embedding_model])
print("Model training completed")

# Remove the existing model file if it exists
model_path = '/kaggle/working/siamese_model.keras'
if os.path.exists(model_path):
    os.remove(model_path)

# Save model in Keras format
siamese_model.save(model_path)
print("Model Saved")

# Save the full Siamese model in .h5 format
model_path_h5 = '/kaggle/working/siamese_model.h5'
siamese_model.save(model_path_h5)
print("Full Siamese model saved as .h5 file")

# Save just the embedding model
embedding_model_path = '/kaggle/working/embedding_model.h5'
embedding.save(embedding_model_path)
print("Embedding model saved as .h5 file")

In [None]:
# Training and validation loss
import matplotlib.pyplot as plt
def plot_loss(history):
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid(True)
    plt.show()

plot_loss(history)