In [None]:
# Mount Google Drive so the dataset archive stored under MyDrive is reachable.
# NOTE: drive.mount() is executed every time this cell runs; nothing here
# skips the step when the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive')

# Extract the NIR-VIS-2.0 archive from Drive into /content/dataset/.
# NOTE: the extraction is unconditional as well, so re-running the cell
# unpacks the whole archive again.
# NOTE(review): the loading cell reads /content/dataset/s1/VIS_128x128 and
# /content/dataset/s1/NIR_128x128 - confirm the archive unpacks to exactly
# that layout (no extra top-level folder).
from zipfile import ZipFile
zip_file_path = '/content/drive/MyDrive/NIR-VIS-2.0.zip'
extract_path = '/content/dataset/'
with ZipFile(zip_file_path, 'r') as zip_ref:
     zip_ref.extractall(extract_path)

Mounted at /content/drive


In [None]:
# (Previous imports remain the same)
from tensorflow.keras.mixed_precision import set_global_policy
import os
import glob
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Layer
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.applications import ResNet50
import matplotlib.pyplot as plt
import seaborn as sns

# Check GPU availability first: the precision policy depends on it.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"GPU available: {gpus}")
    # Enable mixed precision training only when a GPU is present. On a
    # CPU-only runtime float16 compute brings no speed-up and is typically
    # much slower, so the default float32 policy is kept there.
    set_global_policy('mixed_float16')
else:
    print("No GPU available. Running on CPU.")

# Load paired images (already working)
def load_paired_images(vis_dirs, nir_dirs, img_size=(128, 128)):
    """Load VIS/NIR images and pair them up per identity folder.

    For every (vis_dir, nir_dir) pair, all ``.bmp`` files found recursively
    are grouped by the name of their parent directory, which is used as the
    identity label. For identities present in both spectra the VIS and NIR
    files are zipped together, so each identity contributes as many pairs as
    the shorter of its two file lists.

    File lists and identities are sorted so that the pairing and the order of
    the returned samples are the same on every run; raw ``glob`` order depends
    on the filesystem and set iteration order is not stable across processes.

    Args:
        vis_dirs: Sequence of root directories holding visible-light images.
        nir_dirs: Sequence of root directories holding near-infrared images,
            aligned element-wise with ``vis_dirs``.
        img_size: Target size handed to ``cv2.resize`` (OpenCV interprets it
            as ``(width, height)``).

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays. Samples are interleaved as
        VIS, NIR, VIS, NIR, ... and both images of a pair carry the same
        label. When nothing is loaded both arrays are empty.
    """

    def _group_by_identity(root_dir):
        # Map identity (parent folder name) -> sorted list of image paths.
        grouped = {}
        pattern = os.path.join(root_dir, '**/*.bmp')
        for path in sorted(glob.glob(pattern, recursive=True)):
            identity = os.path.basename(os.path.dirname(path))
            grouped.setdefault(identity, []).append(path)
        return grouped

    images = []
    labels = []
    for vis_dir, nir_dir in zip(vis_dirs, nir_dirs):
        vis_by_id = _group_by_identity(vis_dir)
        nir_by_id = _group_by_identity(nir_dir)

        # Surface the most common failure (wrong or missing dataset path)
        # right where it happens instead of much later in the pipeline.
        if not vis_by_id:
            print(f"Warning: no .bmp files found under {vis_dir}")
        if not nir_by_id:
            print(f"Warning: no .bmp files found under {nir_dir}")

        common_ids = sorted(set(vis_by_id).intersection(nir_by_id))
        print(f"Found {len(common_ids)} common identities")

        for identity in common_ids:
            for vis_path, nir_path in zip(vis_by_id[identity], nir_by_id[identity]):
                vis_img = cv2.imread(vis_path, cv2.IMREAD_COLOR)
                nir_img = cv2.imread(nir_path, cv2.IMREAD_COLOR)
                # cv2.imread returns None for unreadable files; drop the whole
                # pair so the VIS/NIR interleaving stays intact.
                if vis_img is None or nir_img is None:
                    continue
                vis_img = cv2.resize(vis_img, img_size)
                nir_img = cv2.resize(nir_img, img_size)
                images.extend([vis_img, nir_img])
                labels.extend([identity, identity])

    images = np.array(images)
    labels = np.array(labels)
    print(f"Loaded {len(images)} paired images with shape: {images.shape}")
    return images, labels

# Load images
vis_dirs = ['/content/dataset/s1/VIS_128x128']
nir_dirs = ['/content/dataset/s1/NIR_128x128']
images, labels = load_paired_images(vis_dirs, nir_dirs)

# Filter classes with insufficient samples.
# Both images of a pair share one label, so the mask always keeps or drops
# whole pairs and the VIS/NIR interleaving is preserved.
unique_labels, counts = np.unique(labels, return_counts=True)
min_samples = 10
valid_labels = unique_labels[counts >= min_samples]
mask = np.isin(labels, valid_labels)
images = images[mask]
labels = labels[mask]
print(f"Reduced to {len(valid_labels)} classes with at least {min_samples} samples each.")

# Stop here with an actionable message instead of letting train_test_split
# fail later with a generic "resulting train set will be empty" error.
if len(images) == 0:
    raise ValueError(
        "No images left after loading and filtering. Check that the dataset "
        f"was extracted to {vis_dirs} / {nir_dirs} and that identities have "
        f"at least {min_samples} samples."
    )

# Normalize images to the [0, 1] range
# NOTE(review): the ImageNet ResNet50 weights used below are normally paired
# with the matching preprocess_input; confirm plain 1/255 scaling is intended.
images = images.astype('float32') / 255.0

# Encode labels
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Split into training and test sets (80-20 split).
# The split is done on pair indices so both images of a pair land in the
# same subset.
num_pairs = len(images) // 2
pair_indices = np.arange(num_pairs)
train_indices, test_indices = train_test_split(pair_indices, test_size=0.2, random_state=42)

# Convert pair indices to image indices (each pair has 2 images)
train_image_indices = np.concatenate([train_indices * 2, train_indices * 2 + 1])
test_image_indices = np.concatenate([test_indices * 2, test_indices * 2 + 1])

# Sorting puts indices 2k and 2k + 1 next to each other again, so each subset
# keeps the VIS, NIR, VIS, NIR, ... layout.
train_image_indices.sort()
test_image_indices.sort()

# Create training and test sets
train_images = images[train_image_indices]
train_labels = labels[train_image_indices]
test_images = images[test_image_indices]
test_labels = labels[test_image_indices]

print(f"Training set: {len(train_images)} images")
print(f"Test set: {len(test_images)} images")

# Build embedding model (moved up for triplet generation).
# The backbone is marked trainable and then every layer except the last 20
# is frozen again, so only the top of the network is fine-tuned.
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(128, 128, 3))
base_model.trainable = True
for layer in base_model.layers[:-20]:
    layer.trainable = False

# The Dense embedding head is pinned to float32 regardless of the global
# precision policy.
embedding_model = Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(128, activation=None, dtype='float32'),
    tf.keras.layers.Dropout(0.3)
])

# Create a tf.data.Dataset for triplets with semi-hard negative mining
def create_triplet_dataset(images, labels, embedding_model, batch_size=16, num_triplets=10000, margin=0.5):
    """Build a ``tf.data.Dataset`` of (anchor, positive, negative) triplets.

    Embeddings for all images are computed once, up front, with
    ``embedding_model.predict`` and are used only to pick negatives: a
    semi-hard negative satisfies ``pos_dist < neg_dist < pos_dist + margin``
    in squared Euclidean distance. When no such negative exists a random
    image of another identity is used instead.

    Args:
        images: Array of images; the per-image shape is taken from
            ``images.shape[1:]`` and elements are declared as float32.
        labels: Array of identity labels aligned with ``images``.
        embedding_model: Model exposing ``predict(images)``.
        batch_size: Unused; batching is left to the caller. Kept so existing
            call sites keep working.
        num_triplets: Number of triplets the generator produces per pass.
        margin: Width of the semi-hard band used for negative mining.

    Returns:
        A shuffled, unbatched dataset whose elements are
        ``(triplet, 0.0)`` with ``triplet`` stacked as
        ``[anchor, positive, negative]`` along axis 0. The generator draws
        fresh random triplets every time the dataset is iterated.

    Raises:
        ValueError: If there are no labels, or fewer than two distinct labels
            (no negative could ever be sampled).
    """
    labels = np.asarray(labels)
    label_list, label_counts = np.unique(labels, return_counts=True)
    if len(label_list) == 0:
        raise ValueError("No labels available to create triplets.")
    if len(label_list) < 2:
        raise ValueError("At least two distinct labels are required to sample negatives.")

    # Only identities with two or more images can supply an anchor/positive
    # pair; sampling anchors from these alone means no draw is wasted and the
    # generator yields exactly num_triplets items.
    anchor_labels = label_list[label_counts >= 2]

    # Generate embeddings for all images
    embeddings = embedding_model.predict(images)
    image_shape = tuple(images.shape[1:])

    def generate_triplets():
        if len(anchor_labels) == 0:
            return
        for _ in range(num_triplets):
            anchor_label = np.random.choice(anchor_labels)
            anchor_pos_indices = np.where(labels == anchor_label)[0]
            anchor_idx, pos_idx = np.random.choice(anchor_pos_indices, 2, replace=False)

            # Compute distance between anchor and positive
            anchor_emb = embeddings[anchor_idx]
            pos_dist = np.sum(np.square(anchor_emb - embeddings[pos_idx]))

            # Semi-hard negative mining: pos_dist < neg_dist < pos_dist + margin
            neg_candidates = np.where(labels != anchor_label)[0]
            neg_dists = np.sum(np.square(embeddings[neg_candidates] - anchor_emb), axis=1)
            semi_hard_mask = (neg_dists > pos_dist) & (neg_dists < pos_dist + margin)
            valid_neg_indices = neg_candidates[semi_hard_mask]

            # Fallback to a random negative if no semi-hard negatives are found
            pool = valid_neg_indices if len(valid_neg_indices) > 0 else neg_candidates
            neg_idx = np.random.choice(pool)

            yield (images[anchor_idx], images[pos_idx], images[neg_idx])

    image_spec = tf.TensorSpec(shape=image_shape, dtype=tf.float32)
    dataset = tf.data.Dataset.from_generator(
        generate_triplets,
        output_signature=(image_spec, image_spec, image_spec)
    )

    # Stack the three images into one tensor and attach a dummy target: the
    # triplet loss only looks at the model output.
    dataset = dataset.map(
        lambda anchor, pos, neg: (
            tf.stack([anchor, pos, neg], axis=0),
            tf.constant(0.0, dtype=tf.float32),
        ),
        num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    return dataset

# Create the triplet dataset from the training images only.
num_triplets = 10000
dataset = create_triplet_dataset(train_images, train_labels, embedding_model, num_triplets=num_triplets)

# Split into training and validation datasets.
# The dataset comes from a generator and has no known length, so it is
# counted with one full pass (this runs the whole triplet generator once).
triplet_count = 0
for _ in dataset:
    triplet_count += 1
print(f"Total number of triplets generated: {triplet_count}")

# Reserve the last 10% of the counted elements for validation.
validation_split = 0.1
val_size = int(triplet_count * validation_split)
train_size = triplet_count - val_size

# NOTE(review): the underlying generator draws random triplets and the
# dataset is shuffled inside create_triplet_dataset, so take()/skip() may not
# select the same elements on every pass - confirm the train/validation
# split is stable and disjoint across epochs. Both parts are also drawn from
# train_images, so validation is not on held-out identities or images.
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

# Batch and prefetch both pipelines.
batch_size = 16
train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Define a custom layer for triplet processing
class TripletLayer(Layer):
    """Run one shared embedding model over the three images of a triplet.

    The input is indexed along axis 1 at positions 0, 1 and 2 (anchor,
    positive, negative). Each slice goes through the same
    ``embedding_model`` and the three results are concatenated along the last
    axis, in that order.
    """

    def __init__(self, embedding_model, **kwargs):
        super().__init__(**kwargs)
        # Shared by all three branches, so the weights are tied.
        self.embedding_model = embedding_model

    def call(self, inputs):
        branch_outputs = [
            self.embedding_model(inputs[:, position]) for position in range(3)
        ]
        return tf.concat(branch_outputs, axis=-1)

# Build triplet model.
# Each sample is a stack of three 128x128x3 images; TripletLayer embeds the
# three slices with the shared embedding_model and concatenates the results,
# which is the tensor the triplet loss receives as y_pred.
inputs = tf.keras.Input(shape=(3, 128, 128, 3))
triplet_output = TripletLayer(embedding_model)(inputs)
triplet_model = tf.keras.Model(inputs, triplet_output)

# Define triplet loss
def triplet_loss(y_true, y_pred, alpha=0.5):
    """Triplet margin loss on concatenated anchor/positive/negative embeddings.

    Args:
        y_true: Ignored; present only because the Keras loss signature
            requires it.
        y_pred: Tensor whose last axis holds the anchor, positive and
            negative embeddings back to back, in that order. The last axis
            is divided into three equal parts, so any embedding width works
            as long as all three embeddings have the same size.
        alpha: Margin required between the positive and negative distances.

    Returns:
        Scalar tensor: mean over the batch of
        ``max(pos_dist - neg_dist + alpha, 0)`` using squared Euclidean
        distances.
    """
    # Split instead of slicing at fixed offsets so the loss is not tied to a
    # 128-dimensional embedding.
    anchor, positive, negative = tf.split(y_pred, num_or_size_splits=3, axis=-1)
    pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
    neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
    loss = tf.maximum(pos_dist - neg_dist + alpha, 0.0)
    return tf.reduce_mean(loss)

# Compile model: Adam with an initial learning rate of 1e-4 and the custom
# triplet loss defined above.
triplet_model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss=triplet_loss)

# Callbacks
def lr_schedule(epoch):
    """Return the learning rate for ``epoch``.

    Starts at 1e-4 and is halved once the epoch index exceeds 10 and halved
    again once it exceeds 20.
    """
    rate = 1e-4
    for boundary in (10, 20):
        if epoch > boundary:
            rate *= 0.5
    return rate

# Stop when val_loss has not improved for 10 epochs and roll back to the best
# weights seen; the learning rate follows lr_schedule.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
lr_scheduler = LearningRateScheduler(lr_schedule)

# Train model for at most 50 epochs; `history` is used later for the loss plot.
history = triplet_model.fit(
    train_dataset,
    epochs=50,
    validation_data=val_dataset,
    callbacks=[early_stopping, lr_scheduler]
)

# Updated evaluation function with cosine similarity distribution
def evaluate_embeddings(images, labels, num_pairs=1000):
    """Score random same/different identity pairs by cosine similarity.

    Embeds ``images`` with the module-level ``embedding_model``, samples up to
    ``num_pairs // 2`` same-identity pairs and ``num_pairs // 2``
    different-identity pairs, plots both similarity histograms, and sweeps
    thresholds 0.1 .. 0.9 to find the one with the highest pair accuracy.

    Args:
        images: Images to embed.
        labels: Identity labels aligned with ``images``.
        num_pairs: Total number of pair draws (half genuine, half impostor).
            Genuine draws that hit an identity with fewer than two images
            are skipped.

    Returns:
        Tuple ``(best_accuracy, best_threshold)``.
    """

    def _cosine(vec_a, vec_b):
        return np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))

    embeddings = embedding_model.predict(images)
    identities = np.unique(labels)
    draws_per_kind = num_pairs // 2

    scores = []
    same_identity = []  # 1 for positive (same identity), 0 for negative

    # Positive pairs (same identity)
    for _ in range(draws_per_kind):
        chosen = np.random.choice(identities)
        members = np.where(labels == chosen)[0]
        if len(members) >= 2:
            first, second = np.random.choice(members, 2, replace=False)
            scores.append(_cosine(embeddings[first], embeddings[second]))
            same_identity.append(1)

    # Negative pairs (different identities)
    for _ in range(draws_per_kind):
        identity_a, identity_b = np.random.choice(identities, 2, replace=False)
        first = np.random.choice(np.where(labels == identity_a)[0])
        second = np.random.choice(np.where(labels == identity_b)[0])
        scores.append(_cosine(embeddings[first], embeddings[second]))
        same_identity.append(0)

    similarities = np.array(scores)
    pair_labels = np.array(same_identity)
    genuine_scores = similarities[pair_labels == 1]
    impostor_scores = similarities[pair_labels == 0]

    # Histogram of similarities for both pair types
    plt.figure(figsize=(8, 6))
    plt.hist(genuine_scores, bins=30, alpha=0.5, label='Positive Pairs', color='blue')
    plt.hist(impostor_scores, bins=30, alpha=0.5, label='Negative Pairs', color='red')
    plt.title('Cosine Similarity Distribution')
    plt.xlabel('Cosine Similarity')
    plt.ylabel('Frequency')
    plt.legend()
    plt.grid(True)
    plt.show()

    # A pair is predicted "same identity" when its similarity exceeds the
    # threshold; record the accuracy for each candidate threshold.
    thresholds = np.arange(0.1, 1.0, 0.1)
    accuracies = []
    for cutoff in thresholds:
        predicted_same = (similarities > cutoff).astype(int)
        hit_rate = np.mean(predicted_same == pair_labels)
        accuracies.append(hit_rate)
        print(f"Threshold {cutoff:.1f}: Accuracy = {hit_rate:.4f}")

    # Accuracy as a function of the threshold
    plt.figure(figsize=(6, 4))
    plt.plot(thresholds, accuracies, marker='o')
    plt.title('Accuracy vs Cosine Similarity Threshold')
    plt.xlabel('Threshold')
    plt.ylabel('Accuracy')
    plt.grid(True)
    plt.show()

    # Report and return the best operating point
    winner = np.argmax(accuracies)
    best_threshold = thresholds[winner]
    best_accuracy = accuracies[winner]
    print(f"Best threshold: {best_threshold:.1f}, Best accuracy: {best_accuracy:.4f}")
    return best_accuracy, best_threshold

# Evaluate on the held-out test set (pairs not used to build training triplets).
best_accuracy, best_threshold = evaluate_embeddings(test_images, test_labels)

# Plot training and validation loss per epoch as recorded by fit().
plt.figure(figsize=(6, 4))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Triplet Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

No GPU available. Running on CPU.
Found 0 common identities
Loaded 0 paired images with shape: (0,)
Reduced to 0 classes with at least 10 samples each.


ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [None]:
# Shell command (IPython `!` syntax): show the attached GPU, driver/CUDA
# versions and current memory usage.
!nvidia-smi

Sun Jun 15 10:13:02 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   77C    P0             37W /   70W |    1410MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# Report the CUDA compiler version installed in the runtime.
!nvcc --version
# Print the CUDNN_MAJOR macro from the system cuDNN header plus the two lines
# that follow each match (minor and patch level).
!cat /usr/include/cudnn_version.h | grep CUDNN_MAJOR -A 2
# Report the TensorFlow version, whether it was built with CUDA, and the name
# of the GPU device it sees (empty string when none is found).
print("TensorFlow Version:", tf.__version__)
print("Built with CUDA:", tf.test.is_built_with_cuda())
print("GPU Available:", tf.test.gpu_device_name())

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2024 NVIDIA Corporation
Built on Thu_Jun__6_02:18:23_PDT_2024
Cuda compilation tools, release 12.5, V12.5.82
Build cuda_12.5.r12.5/compiler.34385749_0
#define CUDNN_MAJOR 9
#define CUDNN_MINOR 2
#define CUDNN_PATCHLEVEL 1
--
#define CUDNN_VERSION (CUDNN_MAJOR * 10000 + CUDNN_MINOR * 100 + CUDNN_PATCHLEVEL)

/* cannot use constexpr here since this is a C-only file */
TensorFlow Version: 2.18.0
Built with CUDA: True
GPU Available: /device:GPU:0


In [None]:
# Replace the existing evaluate_embeddings function with this
def evaluate_embeddings(images, labels, spectrum_type="VIS", num_pairs=1000):
    """Plot and return a CMC curve for one spectrum of an interleaved image set.

    ``images`` is embedded with the module-level ``embedding_model`` and then
    reduced to every second sample: even positions for ``"VIS"``, odd
    positions for any other value (treated as NIR). This relies on the caller
    passing samples interleaved as VIS, NIR, VIS, NIR, ...

    For each of ``num_pairs`` draws a random identity is chosen, two of its
    images are picked as probe and match, and the match is ranked by cosine
    similarity against every image of a different identity. Draws that hit an
    identity with fewer than two images are skipped.

    Args:
        images: Interleaved VIS/NIR images.
        labels: Identity labels aligned with ``images``.
        spectrum_type: ``"VIS"`` selects even positions; anything else
            selects odd positions.
        num_pairs: Number of probe/match draws.

    Returns:
        1-D array of cumulative identification rates for ranks
        ``1 .. min(10, number of identities)``. All zeros when no valid
        probe/match pair could be drawn.
    """
    embeddings = embedding_model.predict(images)

    # Separate VIS and NIR indices (assuming pairs: even for VIS, odd for NIR)
    first_index = 0 if spectrum_type == "VIS" else 1
    image_indices = np.arange(first_index, len(images), 2)

    # Work in float64 so the normalisation below stays accurate even if the
    # model returns reduced-precision outputs.
    embeddings = np.asarray(embeddings, dtype=np.float64)[image_indices]
    labels = np.asarray(labels)[image_indices]
    # Identities are taken from the selected spectrum only, so every sampled
    # label is guaranteed to exist in the subset being ranked.
    unique_labels = np.unique(labels)

    # L2-normalise once: cosine similarity then reduces to a dot product and
    # each draw needs a single matrix-vector product instead of a Python loop
    # over all negatives. The floor avoids dividing by zero for a null vector.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit_embeddings = embeddings / np.maximum(norms, 1e-12)

    ranks = []
    draws = num_pairs if len(unique_labels) > 0 else 0
    for _ in range(draws):
        label = np.random.choice(unique_labels)
        same_identity = np.where(labels == label)[0]
        if len(same_identity) < 2:
            continue
        probe_idx, match_idx = np.random.choice(same_identity, 2, replace=False)

        sims = unit_embeddings @ unit_embeddings[probe_idx]
        pos_sim = sims[match_idx]
        neg_sims = sims[labels != label]
        # Rank of the true match among all negatives; ties count against the
        # match so a degenerate model cannot look perfect.
        ranks.append(1 + int(np.count_nonzero(neg_sims >= pos_sim)))

    # Compute CMC curve
    max_rank = min(10, len(unique_labels))  # Limit to rank 10 or number of classes
    cmc = np.zeros(max_rank)
    for rank in ranks:
        if rank <= max_rank:
            cmc[rank - 1] += 1
    if ranks:
        # Normalise to rates, then accumulate over ranks
        cmc = np.cumsum(cmc / len(ranks))
    else:
        print("Warning: no identity has two or more images; CMC curve is empty.")

    # Plot CMC curve
    # NOTE: the timestamp in the title is a fixed literal, not the time of the run.
    plt.figure(figsize=(6, 4))
    plt.plot(range(1, max_rank + 1), cmc, marker='o')
    plt.title(f'CMC Curve for {spectrum_type} Spectrum (Generated at 01:26 PM IST, June 23, 2025)')
    plt.xlabel('Rank')
    plt.ylabel('Identification Rate')
    plt.grid(True)
    plt.show()

    return cmc

# Generate the per-spectrum CMC curves after training has finished.
# `images` / `labels` must already be loaded; the guard below only prints a
# hint when nothing was loaded.
# NOTE(review): this evaluates the full filtered set (`images`, `labels`),
# which includes the images used to build the training triplets, whereas the
# threshold evaluation earlier used `test_images` / `test_labels` - confirm
# which set these figures are meant to report.
if len(images) > 0:  # Check if data is loaded
    print("Generating CMC curve for VIS spectrum (Fig. 4)...")
    cmc_vis = evaluate_embeddings(images, labels, spectrum_type="VIS")

    print("Generating CMC curve for NIR spectrum (Fig. 5)...")
    cmc_nir = evaluate_embeddings(images, labels, spectrum_type="NIR")
else:
    print("No images loaded. Please fix the dataset paths and rerun the notebook.")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def evaluate_embeddings(images, labels, train_images, train_labels, spectrum_type="VIS", num_pairs=1000):
    """Plot a CMC curve and a nearest-neighbour confusion matrix for one spectrum.

    Both ``images`` and ``train_images`` are embedded with the module-level
    ``embedding_model`` and reduced to every second sample: even positions
    for ``"VIS"``, odd positions for any other value (treated as NIR). This
    relies on both sets being interleaved as VIS, NIR, VIS, NIR, ...

    CMC: for each of ``num_pairs`` draws a random identity is chosen from the
    evaluated set, two of its images are picked as probe and match, and the
    match is ranked by cosine similarity against every image of a different
    identity. Draws that hit an identity with fewer than two images are
    skipped.

    Confusion matrix: every evaluated embedding is assigned the label of its
    most cosine-similar training embedding.

    Args:
        images: Interleaved VIS/NIR images to evaluate.
        labels: Identity labels aligned with ``images``.
        train_images: Interleaved VIS/NIR gallery images.
        train_labels: Identity labels aligned with ``train_images``.
        spectrum_type: ``"VIS"`` selects even positions; anything else
            selects odd positions.
        num_pairs: Number of probe/match draws for the CMC curve.

    Returns:
        Tuple ``(cmc, cm)``: cumulative identification rates for ranks
        ``1 .. min(10, number of identities)`` (all zeros when no valid pair
        could be drawn) and the confusion matrix over the identities of the
        evaluated set.
    """

    def _unit_rows(matrix):
        # L2-normalise each row; the floor avoids dividing by zero.
        row_norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        return matrix / np.maximum(row_norms, 1e-12)

    # Generate embeddings for all images
    embeddings = embedding_model.predict(images)
    train_embeddings = embedding_model.predict(train_images)

    # Separate VIS and NIR indices (assuming pairs: even for VIS, odd for NIR)
    first_index = 0 if spectrum_type == "VIS" else 1
    image_indices = np.arange(first_index, len(images), 2)
    train_indices = np.arange(first_index, len(train_images), 2)

    # Work in float64 so the normalisation stays accurate even if the model
    # returns reduced-precision outputs.
    embeddings = np.asarray(embeddings, dtype=np.float64)[image_indices]
    labels = np.asarray(labels)[image_indices]
    train_embeddings = np.asarray(train_embeddings, dtype=np.float64)[train_indices]
    train_labels = np.asarray(train_labels)[train_indices]

    unique_labels = np.unique(labels)

    # With unit-length rows cosine similarity is a plain dot product, which
    # replaces the per-element Python loops with matrix products.
    unit_embeddings = _unit_rows(embeddings)
    unit_train = _unit_rows(train_embeddings)

    # Generate pairs for CMC
    ranks = []
    draws = num_pairs if len(unique_labels) > 0 else 0
    for _ in range(draws):
        label = np.random.choice(unique_labels)
        same_identity = np.where(labels == label)[0]
        if len(same_identity) < 2:
            continue
        probe_idx, match_idx = np.random.choice(same_identity, 2, replace=False)

        sims = unit_embeddings @ unit_embeddings[probe_idx]
        pos_sim = sims[match_idx]
        neg_sims = sims[labels != label]
        # Ties count against the true match so a degenerate model cannot look
        # perfect.
        ranks.append(1 + int(np.count_nonzero(neg_sims >= pos_sim)))

    # Compute CMC curve
    max_rank = min(10, len(unique_labels))
    cmc = np.zeros(max_rank)
    for rank in ranks:
        if rank <= max_rank:
            cmc[rank - 1] += 1
    if ranks:
        cmc = np.cumsum(cmc / len(ranks))
    else:
        print("Warning: no identity has two or more images; CMC curve is empty.")

    # Plot CMC curve
    # NOTE: the timestamp in the titles is a fixed literal, not the time of the run.
    plt.figure(figsize=(6, 4))
    plt.plot(range(1, max_rank + 1), cmc, marker='o')
    plt.title(f'CMC Curve for {spectrum_type} Spectrum (Generated at 01:53 PM IST, June 23, 2025)')
    plt.xlabel('Rank')
    plt.ylabel('Identification Rate')
    plt.grid(True)
    plt.show()

    # Predict labels using nearest neighbour on train embeddings: one
    # (n_eval x n_train) similarity matrix, best gallery match per row.
    similarity = unit_embeddings @ unit_train.T
    predicted_labels = train_labels[np.argmax(similarity, axis=1)]
    true_labels = labels

    # Compute confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels, labels=unique_labels)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=unique_labels, yticklabels=unique_labels)
    plt.title(f'Confusion Matrix for {spectrum_type} Spectrum (Generated at 01:53 PM IST, June 23, 2025)')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

    return cmc, cm

# Generate the per-spectrum CMC curves and confusion matrices after training.
# NOTE(review): the evaluated set here is the full filtered `images` array,
# and `train_images` was sliced out of that same array, so every training
# image is also a query and can match itself in the nearest-neighbour step -
# confirm whether `test_images` / `test_labels` were intended as the query set.
if len(images) > 0:  # Check if data is loaded
    print("Generating CMC curve and confusion matrix for VIS spectrum (Fig. 4)...")
    cmc_vis, cm_vis = evaluate_embeddings(images, labels, train_images, train_labels, spectrum_type="VIS")

    print("Generating CMC curve and confusion matrix for NIR spectrum (Fig. 5)...")
    cmc_nir, cm_nir = evaluate_embeddings(images, labels, train_images, train_labels, spectrum_type="NIR")
else:
    print("No images loaded. Please fix the dataset paths and rerun the notebook.")