# Training using MobileNetV2

In [1]:
import os
import random
import tensorflow as tf
from glob import glob
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Layer
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


2025-07-01 12:40:07.883821: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1751373608.338264      35 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1751373608.450410      35 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
def get_all_images(root_dir):
    person_images = {}
    for person in os.listdir(root_dir):
        person_path = os.path.join(root_dir, person)
        if os.path.isdir(person_path):
            distorted_folder = os.path.join(person_path, 'distortion')
            distorted_images = glob(os.path.join(distorted_folder, '*.jpg'))
            clean_images = [f for f in glob(os.path.join(person_path, '*.jpg')) if 'distortion' not in f]
            if len(clean_images) >= 1 and len(distorted_images) >= 1:
                person_images[person_path] = {
                    'clean': clean_images,
                    'distorted': distorted_images
                }
    return person_images

In [4]:
def create_pairs(person_images, num_neg_pairs=5000):
    pairs = []
    persons = list(person_images.keys())
    for person in persons:
        clean_imgs = person_images[person]['clean']
        distorted_imgs = person_images[person]['distorted']
        for c in clean_imgs:
            for d in distorted_imgs:
                pairs.append((c, d, 1))
        for i in range(len(clean_imgs)):
            for j in range(i + 1, len(clean_imgs)):
                pairs.append((clean_imgs[i], clean_imgs[j], 1))
        for i in range(len(distorted_imgs)):
            for j in range(i + 1, len(distorted_imgs)):
                pairs.append((distorted_imgs[i], distorted_imgs[j], 1))
    while len([p for p in pairs if p[2] == 0]) < num_neg_pairs:
        p1, p2 = random.sample(persons, 2)
        img1 = random.choice(person_images[p1]['clean'] + person_images[p1]['distorted'])
        img2 = random.choice(person_images[p2]['clean'] + person_images[p2]['distorted'])
        pairs.append((img1, img2, 0))
    return pairs


In [5]:
def preprocess(path):
    byte_img = tf.io.read_file(path)
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (160, 160))
    img = preprocess_input(img)
    return img

In [6]:
def preprocess_pair(path1, path2, label):
    return (preprocess(path1), preprocess(path2), tf.convert_to_tensor(label))

In [7]:
def get_tf_dataset(pairs):
    path1 = [p[0] for p in pairs]
    path2 = [p[1] for p in pairs]
    labels = [p[2] for p in pairs]
    ds1 = tf.data.Dataset.from_tensor_slices(path1)
    ds2 = tf.data.Dataset.from_tensor_slices(path2)
    lbls = tf.data.Dataset.from_tensor_slices(labels)
    dataset = tf.data.Dataset.zip((ds1, ds2, lbls))
    dataset = dataset.map(preprocess_pair, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(2048).batch(64).prefetch(tf.data.AUTOTUNE)
    return dataset

In [8]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [9]:
def make_embedding(trainable=False):
    base_model = MobileNetV2(include_top=False, input_shape=(160, 160, 3), pooling='avg', weights='imagenet')
    base_model.trainable = trainable
    inputs = Input(shape=(160, 160, 3))
    x = base_model(inputs)
    x = Dense(256, activation='relu')(x)
    x = Dense(128, activation='sigmoid')(x)
    return Model(inputs, x, name='MobileNetV2_Embedding')

In [10]:
embedding_model = make_embedding(trainable=False)
def make_siamese_model():
    input_image = Input(name='input_img', shape=(160, 160, 3))
    validation_image = Input(name='validation_img', shape=(160, 160, 3))
    distances = L1Dist()(embedding_model(input_image), embedding_model(validation_image))
    outputs = Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[input_image, validation_image], outputs=outputs, name='SiameseNetwork')


I0000 00:00:1751373841.680352      35 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 13942 MB memory:  -> device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5
I0000 00:00:1751373841.681100      35 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 13942 MB memory:  -> device: 1, name: Tesla T4, pci bus id: 0000:00:05.0, compute capability: 7.5


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_160_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [11]:
TRAIN_ROOT = "/kaggle/input/comsys-taskb/Comys_Hackathon5/Task_B/train"
train_images = get_all_images(TRAIN_ROOT)
train_pairs = create_pairs(train_images, num_neg_pairs=5000)
train_pairs = random.sample(train_pairs, 200000)
train_data = get_tf_dataset(train_pairs)

siamese_model = make_siamese_model()
loss_fn = tf.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.Adam(1e-4)
train_auc = tf.keras.metrics.AUC()

@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        x = batch[:2]
        y = tf.cast(batch[2], tf.float32)
        yhat = siamese_model(x, training=True)
        loss = loss_fn(y, yhat)
        train_auc.update_state(y, yhat)
    gradients = tape.gradient(loss, siamese_model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, siamese_model.trainable_variables))
    return loss

In [15]:
def train_model(train_data, epochs=5):
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        progbar = tf.keras.utils.Progbar(len(train_data))
        for idx, batch in enumerate(train_data):
            loss = train_step(batch)
            progbar.update(idx + 1)
        print(f"\nEpoch {epoch+1} AUC: {train_auc.result().numpy():.4f}")
        train_auc.reset_state()

        # Unfreeze MobileNetV2 after 2 epochs
        if epoch == 1:
            print("\nUnfreezing MobileNetV2 for fine-tuning...")
            base_model = embedding_model.get_layer('mobilenetv2_1.00_160')
            base_model.trainable = True
            for layer in base_model.layers[:100]:
                layer.trainable = False
            optimizer.learning_rate.assign(1e-5)

train_model(train_data, epochs=5)

Epoch 1/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m328s[0m 104ms/step

Epoch 1 AUC: 0.9806
Epoch 2/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m323s[0m 102ms/step

Epoch 2 AUC: 0.9905

Unfreezing MobileNetV2 for fine-tuning...
Epoch 3/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m338s[0m 107ms/step

Epoch 3 AUC: 0.9973
Epoch 4/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m322s[0m 101ms/step

Epoch 4 AUC: 0.9975
Epoch 5/5
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m331s[0m 105ms/step

Epoch 5 AUC: 0.9986


In [16]:
siamese_model.save("face_verifier.h5")