# Salamander Siamese Network

Created by Alejandro Marin (816035363)

Edited by:

Thought Process:

We initially thought that this was a simple classification problem. However, when we observed the sample solution we realised that this was way more complicated. We needed to actually identify the specific animal by an ID. So our first thought was a Siamese network where we compare the test image with the database.


In [None]:
!pip install -q tensorflow pandas matplotlib opencv-python

In [None]:
import os
os.listdir()

['.config', 'SalamanderID2025.zip', 'metadata.csv', 'sample_data']

In [None]:
!unzip -q SalamanderID2025.zip -d .

In [None]:
import pandas as pd

# Load the original metadata but remove images
df = pd.read_csv('metadata.csv')

# Fixed the 'path' column to remove 'images/' prefix - using colab and loading each separately, image is WAY too big to load
df['path'] = df['path'].str.replace(r'^images/', '', regex=True)

# Save the fixed version for reuse in later phases
df.to_csv('metadata_fixed.csv', index=False)

print(df.head())


   image_id            identity  \
0         0  LynxID2025_lynx_37   
1         1  LynxID2025_lynx_37   
2         2  LynxID2025_lynx_49   
3         3                 NaN   
4         4  LynxID2025_lynx_13   

                                                path date orientation species  \
0  LynxID2025/database/000f9ee1aad063a4485379ec06...  NaN       right    lynx   
1  LynxID2025/database/0020edb6689e9f78462394d5a6...  NaN        left    lynx   
2  LynxID2025/database/003152e4145b5b6940091d5c12...  NaN        left    lynx   
3  LynxID2025/query/003b89301c7b9f6d18f722082617f...  NaN        back    lynx   
4  LynxID2025/database/003c3f82011e9c3f849f945a93...  NaN       right    lynx   

      split     dataset  
0  database  LynxID2025  
1  database  LynxID2025  
2  database  LynxID2025  
3     query  LynxID2025  
4  database  LynxID2025  


In [None]:
import pandas as pd
from collections import defaultdict

# Load new metadata
df = pd.read_csv('metadata_fixed.csv')

# Step 1: Filter by only SeaTurtleID2022 + database images (so the ones that specifically labelled as database - refer to metadata csv)
salamander_df = df[(df['dataset'] == 'SalamanderID2025') & (df['split'] == 'database')]

# Step 2: Drop rows with missing values (shouldn't though, just precaution)
salamander_df = salamander_df.dropna(subset=['identity', 'orientation', 'path'])

# Step 3: Group images by identity and orientation - now this is because like I expolained before we are going to have different phases and will deal with the orientation in later phases, first is to ensure is works
grouped_data = defaultdict(lambda: defaultdict(list)) #this would be used, the lambda part is to initialise any missing data as an empty defaultdict - this is to store all the
#data

for _, row in salamander_df.iterrows():
    identity = row['identity']
    orientation = row['orientation'].lower()
    path = row['path']

    grouped_data[identity][orientation].append(path)     #now recap if forgotten but we would be taking the direct images in the database folder and use the info from metadata to get
    #the actual label of this image, this is why we do this and have it in this format

#Check the first two
for identity, orientations in list(grouped_data.items())[:2]:
    print(f"Identity: {identity}")
    for orientation, paths in orientations.items():
        print(f"  Orientation: {orientation}, {len(paths)} images")


Identity: SalamanderID2025_2
  Orientation: top, 4 images
Identity: SalamanderID2025_3
  Orientation: top, 3 images


In [None]:
import random
from itertools import combinations

positive_pairs = [] #for phase 1 I want to deal with same orientation and same ID - to make sure it works (no val yet)

for identity, orientation_dict in grouped_data.items():
    for orientation, paths in orientation_dict.items():

        if len(paths) >= 2:# Ensures there are at least two image paths in the paths  list for the current orientation and ID
            for img1, img2 in combinations(paths, 2):  #combinations (python itertools)
                positive_pairs.append((img1, img2, 1))  # 1 = this would be positive label

# Shuffle for training randomness
random.shuffle(positive_pairs)

print(f"Generated {len(positive_pairs)} positive pairs")
print("Sample:", positive_pairs[:5])


Generated 1126 positive pairs
Sample: [('SalamanderID2025/database/images/80a618b159b58d6c_1043.jpg', 'SalamanderID2025/database/images/a5f79371654ddfe4_1343.jpg', 1), ('SalamanderID2025/database/images/145b2e3b47f30c2a_172.jpg', 'SalamanderID2025/database/images/ec52f4e22dccdb1d_1903.jpg', 1), ('SalamanderID2025/database/images/1478452802817df8_174.jpg', 'SalamanderID2025/database/images/e605136528466ad5_1863.jpg', 1), ('SalamanderID2025/database/images/b124fc14367bf188_1441.jpg', 'SalamanderID2025/database/images/2fd5149df1ddee16_386.jpg', 1), ('SalamanderID2025/database/images/9f2b0f1d1cd6d838_1290.jpg', 'SalamanderID2025/database/images/866528982db541b3_1093.jpg', 1)]


In [None]:
import tensorflow as tf

IMG_SIZE = (224, 224)

def preprocess_image(path):  #preprocess for NN as normal
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, IMG_SIZE)
    image = image / 255.0  # Normalize to [0, 1]
    return image


In [None]:
# Separate the triplets into three lists
img1_paths = [pair[0] for pair in positive_pairs]
img2_paths = [pair[1] for pair in positive_pairs]
labels     = [pair[2] for pair in positive_pairs]


In [None]:
dataset = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels)) #sets up a tf dataset pipeline to process image pairs
#convert each input into a dataset (each item a tuple)

def load_pair(img1_path, img2_path, label):
    img1 = preprocess_image(img1_path) #prep each image
    img2 = preprocess_image(img2_path)
    return (img1, img2), tf.cast(label, tf.float32) #float 32 is standard for  tf ops

dataset = dataset.map(load_pair, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(1024).batch(32).prefetch(tf.data.AUTOTUNE)


# Phase 1.1 Training - Same Orientation (Same ID)

## Define Siamese Model (Base CNN)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model

# Shared CNN base -  a normal tf CNN
def create_embedding_network(input_shape=(224, 224, 3)):
    inputs = tf.keras.Input(shape=input_shape)

    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs) #conv layer with 64 filters, 3x3 kernel
    x = layers.MaxPooling2D()(x)  #maxpool to "downsize" - reduce but keep info by highest value in block

    x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D()(x)

    x = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D()(x)

    x = layers.Flatten()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dense(128)(x)  # final embedding vector

    return Model(inputs, x, name="EmbeddingNetwork")


In [None]:
def contrastive_loss(y_true, y_pred, margin=0.5):
    # y_true: 1 if same, 0 if different
    square_pred = tf.square(y_pred)
    margin_square = tf.square(tf.maximum(margin - y_pred, 0))
    return tf.reduce_mean(y_true * square_pred + (1 - y_true) * margin_square)

class DistanceLayer(tf.keras.layers.Layer):
    def call(self, anchor, candidate):
        return tf.sqrt(tf.reduce_sum(tf.square(anchor - candidate + 1e-9), axis=1))


Construct SN

In [None]:
# Input shape (img1, img2)
input_shape = (224, 224, 3)
embedding_net = create_embedding_network(input_shape)

# Two input branches
input_a = tf.keras.Input(shape=input_shape)
input_b = tf.keras.Input(shape=input_shape)

# Pass both through the same embedding network
embedding_a = embedding_net(input_a)
embedding_b = embedding_net(input_b)

# Distance calculation
distance = DistanceLayer()(embedding_a, embedding_b)

# Full model
siamese_model = tf.keras.Model(inputs=[input_a, input_b], outputs=distance)
siamese_model.compile(optimizer='adam', loss=contrastive_loss)


Fit

In [None]:
# Train with the dataset built earlier
siamese_model.fit(dataset, epochs=3)


Epoch 1/3
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 290ms/step - loss: 0.1017
Epoch 2/3
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 186ms/step - loss: 2.0500e-08
Epoch 3/3
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 186ms/step - loss: 6.0786e-09


<keras.src.callbacks.history.History at 0x7849a35ad450>

# Phase 1.2 Training - Same Orientation (with Negative pairs)

In [None]:
import random
from itertools import product

# Phase 1.2: Generate negative pairs (different identity, same orientation)
negative_pairs = []

identities = list(grouped_data.keys())

for i in range(len(identities)):
    id1 = identities[i]
    for j in range(i + 1, len(identities)):
        id2 = identities[j]

        # Find shared orientations
        common_orientations = set(grouped_data[id1].keys()) & set(grouped_data[id2].keys())

        for orientation in common_orientations:
            paths1 = grouped_data[id1][orientation]
            paths2 = grouped_data[id2][orientation]

            # Create all cross-identity pairs for the shared orientation
            for img1, img2 in product(paths1, paths2):
                negative_pairs.append((img1, img2, 0))  # 0 = negative

print(f"Generated {len(negative_pairs)} negative pairs")
print("Sample:", negative_pairs[:5])


positive_pairs = []

for identity, orientation_dict in grouped_data.items():
    for orientation, paths in orientation_dict.items():
        # Generate all unique combinations of two images for same ID & orientation
        if len(paths) >= 2:
            for img1, img2 in combinations(paths, 2):
                positive_pairs.append((img1, img2, 1))  # 1 = positive label

# Shoofle
random.shuffle(positive_pairs)

print(f"Generated {len(positive_pairs)} positive pairs")
print("Sample:", positive_pairs[:5])

Generated 485513 negative pairs
Sample: [('SalamanderID2025/database/images/eafd41b675ff3330_1893.jpg', 'SalamanderID2025/database/images/56d75a5db470298e_703.jpg', 0), ('SalamanderID2025/database/images/eafd41b675ff3330_1893.jpg', 'SalamanderID2025/database/images/5c80da39ea556a4f_749.jpg', 0), ('SalamanderID2025/database/images/eafd41b675ff3330_1893.jpg', 'SalamanderID2025/database/images/e254f8215312b187_1832.jpg', 0), ('SalamanderID2025/database/images/e80dc66902b38838_1881.jpg', 'SalamanderID2025/database/images/56d75a5db470298e_703.jpg', 0), ('SalamanderID2025/database/images/e80dc66902b38838_1881.jpg', 'SalamanderID2025/database/images/5c80da39ea556a4f_749.jpg', 0)]
Generated 1126 positive pairs
Sample: [('SalamanderID2025/database/images/b01ad4ebc731652f_1428.jpg', 'SalamanderID2025/database/images/0a8470c0e83644dc_83.jpg', 1), ('SalamanderID2025/database/images/d6db420cedad7c63_1740.jpg', 'SalamanderID2025/database/images/0bf4548387fc2ad4_100.jpg', 1), ('SalamanderID2025/datab

In [None]:
import tensorflow as tf

IMG_SIZE = (224, 224)

# Preprocess
def preprocess_image(path):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, IMG_SIZE)
    return image / 255.0

#load a pair of images and a label
def load_pair(img1_path, img2_path, label):
    img1 = preprocess_image(img1_path)
    img2 = preprocess_image(img2_path)
    return (img1, img2), tf.cast(label, tf.float32)

# Build dataset
dataset = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels))
dataset = dataset.map(load_pair, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(buffer_size=1024).batch(32).prefetch(tf.data.AUTOTUNE)


In [None]:
# Cap to N pairs total (balanced between positive & negative)
N = 10000  # NOTE - Adjust as needed

# Shuffle first to ensure mix
random.shuffle(positive_pairs)
random.shuffle(negative_pairs)

# Trim each (AGAIN, adjust N if needed but running this raw was pain)
positive_pairs = positive_pairs[:N//2]
negative_pairs = negative_pairs[:N//2]

# Combine and shuffle again
all_pairs = positive_pairs + negative_pairs
random.shuffle(all_pairs)

print(f"Using {len(all_pairs)} total pairs for training.")


Using 6126 total pairs for training.


In [None]:
all_pairs = positive_pairs + negative_pairs
random.shuffle(all_pairs)

img1_paths = [p[0] for p in all_pairs]
img2_paths = [p[1] for p in all_pairs]
labels     = [p[2] for p in all_pairs]

dataset = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels))
dataset = dataset.map(load_pair, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(buffer_size=1024).batch(32).prefetch(tf.data.AUTOTUNE)

siamese_model.fit(dataset, epochs=3)    #SWITCH BACK TO 10 LATER


Epoch 1/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 219ms/step - loss: 0.0674
Epoch 2/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 192ms/step - loss: 0.0378
Epoch 3/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 194ms/step - loss: 0.0291


<keras.src.callbacks.history.History at 0x7849202a9cd0>

# Phase 1.3 Training - Same Orientation (Positive pairs have random augmentation)

In [None]:
import tensorflow as tf
import random

# Augmentation function
def augment_image(image):
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.9, upper=1.1)
    return image

# Preprocess image with optional augmentation
def preprocess_image(path, augment=False):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = image / 255.0
    if augment:
        image = augment_image(image)
    return image

# Load a pair with augmentation flag
def load_augmented_pair(img1_path, img2_path, label, augment_flag):
    img1 = preprocess_image(img1_path, augment=False)         # anchor
    img2 = preprocess_image(img2_path, augment=augment_flag)  # candidate (possibly augmented)
    return (img1, img2), tf.cast(label, tf.float32)

# Phase 1.3: Mix positive + negative with some augmented
augment_probability = 0.5
augmented_positive_pairs = []

for img1, img2, label in positive_pairs:
    do_augment = random.random() < augment_probability
    augmented_positive_pairs.append((img1, img2, 1, do_augment))

# Negative pairs (no augment)
augmented_negative_pairs = [(img1, img2, 0, False) for img1, img2, _ in negative_pairs]

# Combine & shuffle
combined_pairs = augmented_positive_pairs + augmented_negative_pairs
random.shuffle(combined_pairs)
print(f"Total training pairs: {len(combined_pairs)}")
print(f"Sample: {combined_pairs[:2]}")


Total training pairs: 6126
Sample: [('SalamanderID2025/database/images/360d3d2b98199920_442.jpg', 'SalamanderID2025/database/images/9aa9c32b0b7482cb_1262.jpg', 0, False), ('SalamanderID2025/database/images/928dd6b849070bd8_1189.jpg', 'SalamanderID2025/database/images/d25afdd14c607cbd_1702.jpg', 0, False)]


In [None]:
# Extract columns
img1_paths     = tf.constant([p[0] for p in combined_pairs])
img2_paths     = tf.constant([p[1] for p in combined_pairs])
labels         = tf.constant([p[2] for p in combined_pairs], dtype=tf.float32)
augment_flags  = tf.constant([p[3] for p in combined_pairs], dtype=tf.bool)

# Build TensorFlow Dataset
dataset = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels, augment_flags))
dataset = dataset.map(load_augmented_pair, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(1024).batch(32).prefetch(tf.data.AUTOTUNE)


In [None]:
siamese_model.fit(dataset, epochs=3)  #CHANGE BACK TO 10


Epoch 1/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 191ms/step - loss: 0.0216
Epoch 2/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 192ms/step - loss: 0.0162
Epoch 3/3
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 194ms/step - loss: 0.0121


<keras.src.callbacks.history.History at 0x7849af576210>

# Phase 1.4 Training - Same Orientation (Add Test Split)

In [None]:
from sklearn.model_selection import train_test_split

# Combined_pairs is already shuffled because we are reusing
train_pairs, val_pairs = train_test_split(combined_pairs, test_size=0.2, random_state=42)

print(f"Train pairs: {len(train_pairs)}")
print(f"Validation pairs: {len(val_pairs)}")


Train pairs: 4900
Validation pairs: 1226


In [None]:
def build_dataset(pairs, augment_on_img2=True):
    img1_paths = tf.constant([p[0] for p in pairs])
    img2_paths = tf.constant([p[1] for p in pairs])
    labels     = tf.constant([p[2] for p in pairs], dtype=tf.float32)
    augment_flags = tf.constant([p[3] if augment_on_img2 else False for p in pairs], dtype=tf.bool)

    ds = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels, augment_flags))
    ds = ds.map(load_augmented_pair, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.shuffle(1024).batch(32).prefetch(tf.data.AUTOTUNE)
    return ds

train_dataset = build_dataset(train_pairs)
val_dataset   = build_dataset(val_pairs, augment_on_img2=False)  # Don't augment during validation


In [None]:
siamese_model.fit(train_dataset, validation_data=val_dataset, epochs=3)  #CHANGE BACK TO 10


Epoch 1/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 280ms/step - loss: 0.0018 - val_loss: 0.0014
Epoch 2/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 228ms/step - loss: 0.0018 - val_loss: 0.0018
Epoch 3/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 231ms/step - loss: 0.0015 - val_loss: 0.0016
Epoch 4/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 230ms/step - loss: 0.0014 - val_loss: 0.0017
Epoch 5/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 230ms/step - loss: 0.0018 - val_loss: 0.0021
Epoch 6/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 230ms/step - loss: 0.0017 - val_loss: 0.0019
Epoch 7/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 231ms/step - loss: 0.0019 - val_loss: 0.0022
Epoch 8/10
[1m154/154[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 229ms/step - loss: 0.0017 - val_loss: 0.0023
Epoch 9/10
[1m1

<keras.src.callbacks.history.History at 0x7a448c4777d0>

# Phase 2.1 Training - Change Orientations

In [None]:
from itertools import product

# Generate positive pairs where the identity is the same but orientation is different
cross_orientation_positive_pairs = []

for identity, orientation_dict in grouped_data.items():
    orientations = list(orientation_dict.keys())

    # Cross-orientation pairs: left-right, front-back, etc.
    for i in range(len(orientations)):
        for j in range(i + 1, len(orientations)):
            orient1 = orientations[i]
            orient2 = orientations[j]
            paths1 = orientation_dict[orient1]
            paths2 = orientation_dict[orient2]

            for img1, img2 in product(paths1, paths2):
                cross_orientation_positive_pairs.append((img1, img2, 1))

print(f"Generated {len(cross_orientation_positive_pairs)} cross-orientation positive pairs")
print("Sample:", cross_orientation_positive_pairs[:5])


Generated 1057 cross-orientation positive pairs
Sample: [('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/300f05747c664509_388.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/db9ea4061b99894e_1781.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/dd28b3f3c330b764_1793.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/b1fb0538cda86891_1452.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/ccdec086fcbadc9f_1665.jpg', 1)]


Reuse old negative pair (will add orientated negative pair in 2.2)

In [None]:
# Combine with existing negative pairs
combined_pairs_2_1 = cross_orientation_positive_pairs + negative_pairs
random.shuffle(combined_pairs_2_1)

# Augmentation flags (only for positive, same logic as before)
augment_probability = 0.5
augmented_pairs_2_1 = []
for img1, img2, label in combined_pairs_2_1:
    do_augment = label == 1 and random.random() < augment_probability
    augmented_pairs_2_1.append((img1, img2, label, do_augment))


In [None]:
from sklearn.model_selection import train_test_split

# Split
train_pairs_2_1, val_pairs_2_1 = train_test_split(augmented_pairs_2_1, test_size=0.2, random_state=42)

# Dataset builder (reuses existing functions)
train_dataset_2_1 = build_dataset(train_pairs_2_1)
val_dataset_2_1   = build_dataset(val_pairs_2_1, augment_on_img2=False)

# Train
siamese_model.fit(train_dataset_2_1, validation_data=val_dataset_2_1, epochs=10)


Epoch 1/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 275ms/step - loss: 0.0450 - val_loss: 0.0405
Epoch 2/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 224ms/step - loss: 0.0374 - val_loss: 0.0409
Epoch 3/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 224ms/step - loss: 0.0232 - val_loss: 0.0311
Epoch 4/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 225ms/step - loss: 0.0161 - val_loss: 0.0293
Epoch 5/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 228ms/step - loss: 0.0123 - val_loss: 0.0279
Epoch 6/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 228ms/step - loss: 0.0104 - val_loss: 0.0268
Epoch 7/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 227ms/step - loss: 0.0080 - val_loss: 0.0252
Epoch 8/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 227ms/step - loss: 0.0069 - val_loss: 0.0246
Epoch 9/10
[1m1

<keras.src.callbacks.history.History at 0x7a44d44e04d0>

# Phase 2.2 Training - Change Orientation (Add Negative Orientation Change)

In [None]:
from itertools import product

# Generate positive pairs where the identity is the same but orientation is different
cross_orientation_positive_pairs = []

for identity, orientation_dict in grouped_data.items():
    orientations = list(orientation_dict.keys())

    # Cross-orientation pairs: left-right, front-back, etc.
    for i in range(len(orientations)):
        for j in range(i + 1, len(orientations)):
            orient1 = orientations[i]
            orient2 = orientations[j]
            paths1 = orientation_dict[orient1]
            paths2 = orientation_dict[orient2]

            for img1, img2 in product(paths1, paths2):
                cross_orientation_positive_pairs.append((img1, img2, 1))

print(f"Generated {len(cross_orientation_positive_pairs)} cross-orientation positive pairs")
print("Sample:", cross_orientation_positive_pairs[:5])


Generated 1057 cross-orientation positive pairs
Sample: [('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/300f05747c664509_388.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/db9ea4061b99894e_1781.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/dd28b3f3c330b764_1793.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/b1fb0538cda86891_1452.jpg', 1), ('SalamanderID2025/database/images/30c4a661e585e5a7_392.jpg', 'SalamanderID2025/database/images/ccdec086fcbadc9f_1665.jpg', 1)]


In [None]:
negative_pairs = []

identities = list(grouped_data.keys())

for i in range(len(identities)):
    id1 = identities[i]
    for j in range(i + 1, len(identities)):
        id2 = identities[j]

        for ori1 in grouped_data[id1]:
            for ori2 in grouped_data[id2]:
                paths1 = grouped_data[id1][ori1]
                paths2 = grouped_data[id2][ori2]

                for img1, img2 in product(paths1, paths2):
                    negative_pairs.append((img1, img2, 0))


In [None]:
# Cap to N pairs total (balanced between positive & negative)
N = 10000

# Shuffle first to ensure mix
random.shuffle(cross_orientation_positive_pairs)
random.shuffle(negative_pairs)

# Trim each
cross_orientation_positive_pairs = cross_orientation_positive_pairs[:N//2]
negative_pairs = negative_pairs[:N//2]

# Combine with existing negative pairs
combined_pairs_2_1 = cross_orientation_positive_pairs + negative_pairs
random.shuffle(combined_pairs_2_1)

print(f"Using {len(combined_pairs_2_1)} total pairs for training.")

Using 6057 total pairs for training.


In [None]:


# Augmentation flags (only for positive, same logic as before)
augment_probability = 0.5
augmented_pairs_2_1 = []
for img1, img2, label in combined_pairs_2_1:
    do_augment = label == 1 and random.random() < augment_probability
    augmented_pairs_2_1.append((img1, img2, label, do_augment))


In [None]:
from sklearn.model_selection import train_test_split

# Split
train_pairs_2_1, val_pairs_2_1 = train_test_split(augmented_pairs_2_1, test_size=0.2, random_state=42)

# Dataset builder (reuses existing functions)
train_dataset_2_1 = build_dataset(train_pairs_2_1)
val_dataset_2_1   = build_dataset(val_pairs_2_1, augment_on_img2=False)

# Train
siamese_model.fit(train_dataset_2_1, validation_data=val_dataset_2_1, epochs=10)


Epoch 1/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 309ms/step - loss: 0.0364 - val_loss: 0.0397
Epoch 2/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 234ms/step - loss: 0.0274 - val_loss: 0.0335
Epoch 3/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 237ms/step - loss: 0.0178 - val_loss: 0.0314
Epoch 4/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 236ms/step - loss: 0.0123 - val_loss: 0.0298
Epoch 5/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 236ms/step - loss: 0.0103 - val_loss: 0.0286
Epoch 6/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 237ms/step - loss: 0.0085 - val_loss: 0.0266
Epoch 7/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 238ms/step - loss: 0.0072 - val_loss: 0.0261
Epoch 8/10
[1m152/152[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 239ms/step - loss: 0.0057 - val_loss: 0.0248
Epoch 9/10
[1m1

<keras.src.callbacks.history.History at 0x7849805ad750>

# Final Phase (Hopefully)

First save model in case


In [None]:
siamese_model.save('siamese_model.h5')



Ok now the fun begins (ps it is 6:00  am rn, started at 8 yesterday)


Ok I go sleep, continue later God willing. Have game exam tomorrow so maybe not idk

In [None]:
siamese_model.save('siamese_model')

ValueError: Invalid filepath extension for saving. Please add either a `.keras` extension for the native Keras format (recommended) or a `.h5` extension. Use `model.export(filepath)` if you want to export a SavedModel for use with TFLite/TFServing/etc. Received: filepath=siamese_model.

In [None]:
import tensorflow as tf
siamese_model = tf.keras.models.load_model('siamese_model.h5')

ValueError: Unknown layer: 'DistanceLayer'. Please ensure you are using a `keras.utils.custom_object_scope` and that this object is included in the scope. See https://www.tensorflow.org/guide/keras/save_and_serialize#registering_the_custom_object for details.

In [None]:
import pandas as pd
from collections import defaultdict

# Load metadata from scratch
df = pd.read_csv('metadata_fixed.csv')

# Step 1: Filter only SalamanderID2025 + query images
query_df = df[(df['dataset'] == 'SalamanderID2025') & (df['split'] == 'query')]

# Step 2: Drop rows with missing critical fields
query_df = query_df.dropna(subset=['orientation', 'path'])

# Step 3: Group by orientation only (no identity in query set)
grouped_query = defaultdict(list)

for _, row in query_df.iterrows():
    orientation = row['orientation'].lower()
    path = row['path']

    grouped_query[orientation].append(path)

# Check first few orientations and paths
for orientation, paths in list(grouped_query.items())[:3]:
    print(f"Orientation: {orientation} → {len(paths)} images")
    print("  Sample:", paths[:2])


Orientation: top → 391 images
  Sample: ['SalamanderID2025/query/images/0737a5a022dc4a70_52.jpg', 'SalamanderID2025/query/images/421034673af2bedf_531.jpg']
Orientation: left → 2 images
  Sample: ['SalamanderID2025/query/images/0ca1b405a8494159_106.jpg', 'SalamanderID2025/query/images/07910c22b2a7de3c_57.jpg']
Orientation: right → 296 images
  Sample: ['SalamanderID2025/query/images/62a6adc09c8be8dc_806.jpg', 'SalamanderID2025/query/images/a3ad62a6f23aac6b_1329.jpg']


In [None]:
import numpy as np

THRESHOLD = 0.2  # Tune based on validation, so maybe look at a negative pair contrastive diff and use to set

# Load database embeddings again if needed
db_df = df[(df['dataset'] == 'SalamanderID2025') & (df['split'] == 'database')].dropna(subset=['identity', 'path'])

db_image_paths = db_df['path'].tolist()
db_identities = db_df['identity'].tolist()

# Generate DB embeddings
db_embeddings = []
valid_db_paths = []

for path in db_image_paths:
    try:
        img = preprocess_image(path)
        img = tf.expand_dims(img, axis=0)
        emb = embedding_net(img).numpy().squeeze()
        db_embeddings.append(emb)
        valid_db_paths.append(path)
    except:
        print("Skipping:", path)

db_embeddings = np.array(db_embeddings)
print(f"Loaded {len(db_embeddings)} DB embeddings")

# Run predictions for query images
query_predictions = {}

for orientation, query_paths in grouped_query.items():
    for query_path in query_paths:
        try:
            img = preprocess_image(query_path)
            img = tf.expand_dims(img, axis=0)
            query_emb = embedding_net(img).numpy().squeeze()

            # Compute Euclidean distances
            dists = np.linalg.norm(db_embeddings - query_emb, axis=1)
            min_idx = np.argmin(dists)
            min_dist = dists[min_idx]

            if min_dist > THRESHOLD:
                predicted_id = "unknown"
            else:
                predicted_id = db_identities[min_idx]

            query_predictions[query_path] = {
                "predicted_id": predicted_id,
                "distance": float(min_dist)
            }

        except Exception as e:
            print(f"Error on {query_path}:", e)

# Preview sample predictions
for k, v in list(query_predictions.items())[:5]:
    print(f"Image Path: {k} -> {v['predicted_id']} (distance: {v['distance']:.4f})")


Loaded 1388 DB embeddings
Image Path: SalamanderID2025/query/images/0737a5a022dc4a70_52.jpg -> SalamanderID2025_438 (distance: 0.1652)
Image Path: SalamanderID2025/query/images/421034673af2bedf_531.jpg -> SalamanderID2025_212 (distance: 0.1768)
Image Path: SalamanderID2025/query/images/4534bd79f3e6b736_561.jpg -> SalamanderID2025_347 (distance: 0.1821)
Image Path: SalamanderID2025/query/images/6ba9cca71738e4ed_870.jpg -> SalamanderID2025_157 (distance: 0.1424)
Image Path: SalamanderID2025/query/images/79449d8ed0ff0262_989.jpg -> SalamanderID2025_345 (distance: 0.1900)


In [None]:
import random
import numpy as np

# Pick 2 different identities
unique_ids = list(grouped_data.keys())
id1, id2 = random.sample(unique_ids, 2)

# Randomly select one image from each identity
img1_path = random.choice([img for imgs in grouped_data[id1].values() for img in imgs])
img2_path = random.choice([img for imgs in grouped_data[id2].values() for img in imgs])

print(f"Testing with:")
print(f"Image 1: {img1_path} (ID: {id1})")
print(f"Image 2: {img2_path} (ID: {id2})")

# Preprocess and embed
img1 = preprocess_image(img1_path)
img2 = preprocess_image(img2_path)
img1 = tf.expand_dims(img1, axis=0)
img2 = tf.expand_dims(img2, axis=0)

emb1 = embedding_net(img1)
emb2 = embedding_net(img2)

# Compute Euclidean distance (contrastive loss = dist)
distance = tf.norm(emb1 - emb2).numpy().item()

print(f"Computed distance: {distance:.4f}")


Testing with:
Image 1: SalamanderID2025/database/images/2259c7546282471a_286.jpg (ID: SalamanderID2025_534)
Image 2: SalamanderID2025/database/images/145b2e3b47f30c2a_172.jpg (ID: SalamanderID2025_406)
Computed distance: 0.4615


Export to csv

In [None]:
import csv

with open("salamander_predictions.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["query_path", "predicted_id", "distance"])
    for path, pred in query_predictions.items():
        writer.writerow([path, pred['predicted_id'], pred['distance']])
