## In The Name Of GOD
#### Developed By Eiliya Mohebi For Educational Purposes (As Part Of Filoger Advanced Computer Vision Course)

## Setup and Library Imports
This cell imports the required libraries (TensorFlow, NumPy, scikit-learn, and others). It also sets the image size, enables the global `mixed_float16` mixed-precision policy, and turns on GPU memory growth.

In [None]:
import os
import tarfile
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50
from sklearn.model_selection import train_test_split
import random

# Side length, in pixels, of the square images used throughout the notebook.
IMG_SIZE = 224

# Switch the global Keras dtype policy to "mixed_float16".
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy("mixed_float16")

# Request on-demand GPU memory allocation ("memory growth") for every GPU
# that TensorFlow lists.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("✅ Enabled GPU memory growth.")
    except RuntimeError as e:
        # The error is printed rather than re-raised, so the cell keeps going.
        # NOTE(review): presumably raised when the GPUs were already
        # initialized before this cell ran — confirm.
        print(e)

## Download and Prepare Oxford Flowers 102 Dataset
The following function downloads the Oxford Flowers 102 dataset, extracts the images from a tar file, downloads the corresponding labels, and organizes the images into class-specific folders.

In [None]:
def download_and_prepare_oxford_flowers102(download_dir="oxford_flowers102"):
    """Download Oxford Flowers 102 and sort the images into per-class folders.

    The image archive and the label file are fetched with
    ``tf.keras.utils.get_file``. The archive is expected to unpack into a
    ``jpg/`` folder holding ``image_00001.jpg``, ``image_00002.jpg``, ...;
    image ``i`` (1-based) is moved to ``<download_dir>/class_<label>/`` where
    ``label`` is the i-th entry of the ``labels`` array in ``imagelabels.mat``.

    The function is safe to call repeatedly: when every image is already in
    its class folder, extraction and moving are skipped.

    Args:
        download_dir: Directory that receives the downloads and class folders.

    Returns:
        ``download_dir``, the root of the organized dataset.
    """
    import scipy.io

    os.makedirs(download_dir, exist_ok=True)

    # URLs for the dataset
    data_url = "https://www.robots.ox.ac.uk/~vgg/data/flowers/102/102flowers.tgz"
    labels_url = "https://www.robots.ox.ac.uk/~vgg/data/flowers/102/imagelabels.mat"

    # 1) Download the images tar and the labels .mat file.
    tar_path = tf.keras.utils.get_file(
        origin=data_url,
        fname="102flowers.tgz",
        cache_dir=download_dir,
        cache_subdir="",
        extract=False
    )
    labels_path = tf.keras.utils.get_file(
        origin=labels_url,
        fname="imagelabels.mat",
        cache_dir=download_dir,
        cache_subdir=""
    )

    # 2) Read the labels from the path get_file actually returned, instead of
    #    assuming where the file ended up.
    labels = scipy.io.loadmat(labels_path)["labels"][0]

    # 3) Plan every (extracted file -> class folder) move up front.
    jpg_dir = os.path.join(download_dir, "jpg")
    moves = []
    for index, label in enumerate(labels, start=1):
        file_name = f"image_{index:05d}.jpg"
        moves.append((
            os.path.join(jpg_dir, file_name),
            os.path.join(download_dir, f"class_{label}", file_name),
        ))

    # Already organized by an earlier run: nothing left to do.
    if all(os.path.exists(new_path) for _, new_path in moves):
        return download_dir

    # 4) Extract the tar into download_dir. Use the "data" extraction filter
    #    where this Python supports it, so archive members cannot escape
    #    download_dir.
    with tarfile.open(tar_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=download_dir, filter="data")
        else:
            tar.extractall(path=download_dir)

    # 5) Move each image into its class folder. os.replace overwrites an
    #    existing target on every platform (os.rename fails on Windows).
    for old_path, new_path in moves:
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        os.replace(old_path, new_path)

    os.rmdir(jpg_dir)

    return download_dir


# Download and organize the dataset; base_dir is the returned dataset root.
base_dir = download_and_prepare_oxford_flowers102()

## Load Images and Get Image Paths with Labels
The following functions perform two key tasks:

- **load_image:** Reads an image from disk, resizes it to the defined size, and normalizes the pixel values to the range [0, 1].
- **get_image_paths_and_labels:** Walks through the dataset directory structure to retrieve image paths and the corresponding labels (folder names).

In [None]:
def load_image(image_path):
    """Load one image from disk as an RGB float tensor scaled to [0, 1].

    Args:
        image_path: Path of the image file to read.

    Returns:
        A float32 tensor of shape (IMG_SIZE, IMG_SIZE, 3).
    """
    image = tf.io.read_file(image_path)
    # expand_animations=False makes the decoder return a single 3-D image for
    # every format; without it GIF content comes back as a 4-D frame stack.
    image = tf.image.decode_image(image, channels=3, expand_animations=False)
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    image = tf.cast(image, tf.float32) / 255.0
    return image

def get_image_paths_and_labels(base_dir):
    """Collect every image under ``base_dir`` together with its folder label.

    The tree is walked recursively. A file counts as an image when its name
    ends with ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive), and its
    label is the name of the folder that directly contains it.

    Directories and files are visited in sorted order so that the result is
    reproducible; ``os.walk`` alone yields entries in arbitrary order.

    Args:
        base_dir: Root directory to scan.

    Returns:
        A tuple ``(image_paths, labels)`` of two equally long lists.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    image_paths = []
    labels = []
    for root, dirs, files in os.walk(base_dir):
        # Sorting in place controls the order in which os.walk descends.
        dirs.sort()
        for file in sorted(files):
            if file.lower().endswith(image_extensions):
                full_path = os.path.join(root, file)
                # Label is the name of the folder containing the image
                label = os.path.basename(os.path.dirname(full_path))
                image_paths.append(full_path)
                labels.append(label)
    return image_paths, labels


from collections import defaultdict

# Index every image under the dataset root together with its folder label.
base_dir = "./oxford_flowers102"
image_paths, labels = get_image_paths_and_labels(base_dir)

print("Total images found:", len(image_paths))
print("Unique classes:", len(set(labels)))

# Group the image paths by their class label.
class_to_images = defaultdict(list)
for path, label in zip(image_paths, labels):
    class_to_images[label].append(path)

# Class names in sorted order, plus how many there are.
class_names = sorted(class_to_images)
num_classes = len(class_names)
print("Class names:", class_names)

# Shuffle each class's path list in place, visiting classes in sorted order.
for c in class_names:
    random.shuffle(class_to_images[c])

## Creating Episodes for Few-Shot Learning
The `create_episode` function creates one episode for few-shot learning. Each episode contains a support set and a query set sampled from a subset of classes. The function selects a few classes (n-way) and for each class picks `k_shot` examples for the support set and a fixed number for the query set.

In [None]:
def create_episode(
    class_to_imgs_dict,
    class_names,
    n_way=3,
    k_shot=3,
    n_query_per_class=2
):
    """
    Sample one few-shot episode.

    ``n_way`` classes are drawn from ``class_names``. For each drawn class,
    ``k_shot + n_query_per_class`` distinct image paths are drawn from
    ``class_to_imgs_dict``; the first ``k_shot`` go to the support set and the
    rest to the query set. Labels are episode-local: the position (0-based) of
    the class among the drawn classes.

    Returns a tuple of tensors:
      - support_x: stacked support images
      - support_y: int32 support labels
      - query_x:   stacked query images
      - query_y:   int32 query labels
    """
    images_per_class = k_shot + n_query_per_class
    episode_classes = random.sample(class_names, n_way)

    support_x, support_y = [], []
    query_x, query_y = [], []

    for episode_label, class_name in enumerate(episode_classes):
        picked = random.sample(class_to_imgs_dict[class_name], images_per_class)
        support_paths, query_paths = picked[:k_shot], picked[k_shot:]

        # Support images are loaded before query images for each class.
        support_x.extend(load_image(path) for path in support_paths)
        support_y.extend([episode_label] * len(support_paths))

        query_x.extend(load_image(path) for path in query_paths)
        query_y.extend([episode_label] * len(query_paths))

    return (
        tf.stack(support_x, axis=0),
        tf.convert_to_tensor(support_y, dtype=tf.int32),
        tf.stack(query_x, axis=0),
        tf.convert_to_tensor(query_y, dtype=tf.int32),
    )

## Building the Embedding Model
This function creates an embedding network that uses a pre-trained ResNet50 as the backbone (with its weights frozen) and adds global average pooling, dropout, and a dense layer that outputs a 128-dimensional embedding.

In [16]:
def build_embedding_model(input_shape=(224, 224, 3)):
    """Build the embedding network: frozen ResNet50 -> 128-dim embedding.

    The model takes RGB images scaled to [0, 1]. Inside the model they are
    scaled back to [0, 255] and passed through ResNet50's own
    ``preprocess_input``, because the ImageNet weights were trained on inputs
    preprocessed that way rather than on [0, 1] pixels.

    Args:
        input_shape: Shape of one input image, (height, width, channels).

    Returns:
        A ``tf.keras.Model`` named "EmbeddingModel" that maps a batch of
        images to a batch of 128-dimensional float32 embeddings.
    """
    # Local import keeps this block self-contained.
    from tensorflow.keras.applications.resnet50 import preprocess_input

    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    # Freeze the base model
    base_model.trainable = False

    inputs = tf.keras.Input(shape=input_shape)
    # Do the pixel preprocessing in float32 even under a mixed-precision policy.
    x = layers.Lambda(
        lambda images: preprocess_input(images * 255.0),
        dtype="float32",
    )(inputs)
    x = base_model(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.5)(x)
    # dtype="float32" keeps the embedding, and the distance/loss maths built
    # on it, out of float16 when a mixed-precision policy is active.
    outputs = layers.Dense(128, activation=None, dtype="float32")(x)  # 128-dim embedding

    model = tf.keras.Model(inputs, outputs, name="EmbeddingModel")
    return model

# Build an embedding model for IMG_SIZE x IMG_SIZE 3-channel inputs and print
# its layer summary.
embedding_model = build_embedding_model((IMG_SIZE, IMG_SIZE, 3))
embedding_model.summary()

## Defining Loss Functions and Utility Functions
Below are the functions that compute the Euclidean distance and define the contrastive loss for the Siamese network. The `euclidean_distance` function computes the row-wise distance between two batches of embedding vectors, while `contrastive_loss` returns the loss function that is used when compiling the Siamese model. The loss expects a label of 1 for similar pairs and 0 for dissimilar pairs.

In [None]:
def euclidean_distance(vects):
    """Return the row-wise Euclidean distance between two batches of vectors.

    Half-precision inputs are upcast to float32 first. In float16 the ``1e-9``
    floor below rounds to zero, which leaves ``sqrt`` with an infinite
    gradient at zero distance, and the sum of squares has very limited range.

    Args:
        vects: A pair ``(x, y)`` of tensors with matching shape
            (batch, features).

    Returns:
        A tensor of shape (batch, 1) holding the distances.
    """
    x, y = vects
    x = tf.convert_to_tensor(x)
    if x.dtype in (tf.float16, tf.bfloat16):
        x = tf.cast(x, tf.float32)
    y = tf.cast(y, x.dtype)
    sum_square = tf.reduce_sum(tf.square(x - y), axis=1, keepdims=True)
    # Floor the squared distance so the sqrt gradient stays finite.
    return tf.sqrt(tf.maximum(sum_square, 1e-9))

def contrastive_loss(margin=1.0):
    """Create a contrastive loss function for distance-valued predictions.

    Args:
        margin: Distance beyond which dissimilar pairs stop contributing.

    Returns:
        A function ``loss(y_true, y_pred)`` where ``y_true`` is 1 for similar
        pairs and 0 for dissimilar pairs, and ``y_pred`` is the predicted
        distance. It returns the mean of
        ``y_true * d^2 + (1 - y_true) * max(margin - d, 0)^2``.
    """
    def loss(y_true, y_pred):
        # y_true is 0 or 1, y_pred is the distance
        y_pred = tf.convert_to_tensor(y_pred)
        # Do the loss arithmetic in float32 when predictions are half precision.
        if y_pred.dtype in (tf.float16, tf.bfloat16):
            y_pred = tf.cast(y_pred, tf.float32)
        # Flatten both sides so (N,) labels and (N, 1) distances line up
        # element-wise instead of broadcasting to an (N, N) matrix.
        y_pred = tf.reshape(y_pred, [-1])
        y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])

        square_pred = tf.square(y_pred)
        margin_square = tf.square(tf.maximum(margin - y_pred, 0.0))
        return tf.reduce_mean(
            y_true * square_pred + (1 - y_true) * margin_square
        )
    return loss

## Building the Siamese (Contrastive) Network
This function builds a Siamese network that takes two inputs, processes them via the embedding model, computes their Euclidean distance (via a Lambda layer), and compiles the model using the contrastive loss.

In [18]:
def build_siamese_network(embedding_model, margin=1.0):
    """Build and compile a Siamese model that outputs a pairwise distance.

    Both inputs go through the same ``embedding_model`` (shared weights); the
    output is the Euclidean distance between the two embeddings.

    Args:
        embedding_model: Model mapping (IMG_SIZE, IMG_SIZE, 3) images to
            embedding vectors.
        margin: Margin passed to ``contrastive_loss``.

    Returns:
        A compiled ``tf.keras.Model`` named "SiameseModel" with inputs
        ``[input_a, input_b]`` and the distance as output.
    """
    input_a = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="input_a")
    input_b = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="input_b")

    # Get embeddings
    emb_a = embedding_model(input_a)
    emb_b = embedding_model(input_b)

    # Distance. The layer is pinned to float32 so that, under a
    # mixed-precision policy, the distance (and its small epsilon floor) is
    # not computed in float16 and the model output is float32.
    distance = layers.Lambda(
        euclidean_distance, name="distance", dtype="float32"
    )([emb_a, emb_b])

    # Build model
    siamese_model = tf.keras.Model(inputs=[input_a, input_b], outputs=distance, name="SiameseModel")

    # Compile with contrastive loss
    siamese_model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=contrastive_loss(margin=margin)
    )

    return siamese_model

## Defining the Triplet Loss and Building the Triplet Network
For the triplet network, we define a custom triplet loss function that encourages the distance between the anchor and the positive to be smaller than the distance between the anchor and the negative by at least a margin. The `build_triplet_network` function creates a model that takes three inputs (anchor, positive, negative), stacks the three embeddings along a new axis so that the loss can separate them again, and compiles the model with the triplet loss.

In [None]:
def triplet_loss(margin=1.0):
    """Create a triplet loss function for stacked embeddings.

    Args:
        margin: Required gap between the anchor-positive and anchor-negative
            squared distances.

    Returns:
        A function ``loss(y_true, y_pred)``. ``y_pred`` has shape
        (batch, 3, embedding_dim) holding the anchor, positive and negative
        embeddings along axis 1; ``y_true`` is ignored. It returns the mean of
        ``max(d(a, p) - d(a, n) + margin, 0)`` with squared Euclidean ``d``.
    """
    def loss(y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        # Accumulate the squared distances in float32 when the embeddings
        # arrive in half precision.
        if y_pred.dtype in (tf.float16, tf.bfloat16):
            y_pred = tf.cast(y_pred, tf.float32)

        # Split axis 1 into the three (batch, embedding_dim) embeddings.
        anchor, positive, negative = tf.unstack(y_pred, num=3, axis=1)

        # Distances
        pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
        neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)

        loss_val = tf.maximum(pos_dist - neg_dist + margin, 0.0)
        return tf.reduce_mean(loss_val)
    return loss

def build_triplet_network(embedding_model, margin=1.0):
    """Build and compile a triplet model that outputs stacked embeddings.

    The anchor, positive and negative inputs go through the same
    ``embedding_model`` (shared weights). The three embeddings are stacked
    along axis 1 so the output has shape (batch, 3, embedding_dim), which is
    the layout ``triplet_loss`` expects.

    Args:
        embedding_model: Model mapping (IMG_SIZE, IMG_SIZE, 3) images to
            embedding vectors.
        margin: Margin passed to ``triplet_loss``.

    Returns:
        A compiled ``tf.keras.Model`` named "TripletModel" with inputs
        ``[anchor, positive, negative]``.
    """
    input_anchor = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="anchor")
    input_pos    = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="positive")
    input_neg    = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="negative")

    # Embeddings
    emb_anchor = embedding_model(input_anchor)
    emb_pos    = embedding_model(input_pos)
    emb_neg    = embedding_model(input_neg)

    # Use Keras layers rather than raw tf ops on symbolic tensors so the graph
    # also builds on Keras 3. Both layers are pinned to float32 so the stacked
    # output is not downcast under a mixed-precision policy.
    add_slot_axis = layers.Reshape((1, -1), dtype="float32")  # (B, D) -> (B, 1, D)
    merged = layers.Concatenate(axis=1, dtype="float32")([
        add_slot_axis(emb_anchor),
        add_slot_axis(emb_pos),
        add_slot_axis(emb_neg),
    ])

    triplet_model = tf.keras.Model(
        inputs=[input_anchor, input_pos, input_neg],
        outputs=merged,
        name="TripletModel"
    )

    triplet_model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=triplet_loss(margin=margin)
    )

    return triplet_model

## Generating Pairs and Triplets
These helper functions generate training data:

- **generate_contrastive_pairs:** For each image in the support set, a positive pair (same class) and a negative pair (different class) are generated.
- **generate_triplets:** For each anchor image, a positive image (same class) and a negative image (different class) are selected to form a triplet.

In [None]:
def generate_contrastive_pairs(support_x, support_y):
    """Build one positive and one negative pair for every usable anchor.

    For each sample (the anchor), one other sample of the same class and one
    sample of a different class are drawn with ``np.random.choice``. An anchor
    that has no same-class partner or no different-class sample contributes
    no pairs, so the output always holds positive/negative pairs in
    alternation.

    Args:
        support_x: Samples, indexable along axis 0 (NumPy array or eager
            tensor).
        support_y: One class label per sample.

    Returns:
        A tuple ``(pairs_a, pairs_b, labels)`` of NumPy arrays. ``pairs_a[k]``
        and ``pairs_b[k]`` form pair ``k``; ``labels[k]`` is 1.0 for a
        same-class pair and 0.0 otherwise. With no usable anchor the pair
        arrays are empty but keep the per-sample shape.
    """
    samples = np.asarray(support_x)
    classes = np.asarray(support_y).reshape(-1)
    sample_ids = np.arange(len(classes))

    first_ids, second_ids, labels = [], [], []
    for i in sample_ids:
        same_class = classes == classes[i]
        positive_ids = sample_ids[same_class & (sample_ids != i)]
        negative_ids = sample_ids[~same_class]
        if len(positive_ids) == 0 or len(negative_ids) == 0:
            continue

        # Draw the positive partner first, then the negative one.
        pos_idx = np.random.choice(positive_ids)
        neg_idx = np.random.choice(negative_ids)

        first_ids += [i, i]
        second_ids += [pos_idx, neg_idx]
        labels += [1.0, 0.0]

    # Gather all pairs with one fancy-indexing step per side.
    first_ids = np.asarray(first_ids, dtype=np.intp)
    second_ids = np.asarray(second_ids, dtype=np.intp)
    return samples[first_ids], samples[second_ids], np.array(labels)

def generate_triplets(support_x, support_y):
    """Build one (anchor, positive, negative) triplet per usable anchor.

    For each sample (the anchor), one other sample of the same class and one
    sample of a different class are drawn with ``np.random.choice``. An anchor
    that has no same-class partner or no different-class sample is skipped.

    Args:
        support_x: Samples, indexable along axis 0 (NumPy array or eager
            tensor).
        support_y: One class label per sample.

    Returns:
        A tuple ``(anchors, positives, negatives)`` of NumPy arrays aligned on
        axis 0. With no usable anchor the arrays are empty but keep the
        per-sample shape.
    """
    samples = np.asarray(support_x)
    classes = np.asarray(support_y).reshape(-1)
    sample_ids = np.arange(len(classes))

    anchor_ids, positive_picks, negative_picks = [], [], []
    for i in sample_ids:
        same_class = classes == classes[i]
        positive_ids = sample_ids[same_class & (sample_ids != i)]
        negative_ids = sample_ids[~same_class]
        if len(positive_ids) == 0 or len(negative_ids) == 0:
            continue

        # Draw the positive first, then the negative.
        pos_idx = np.random.choice(positive_ids)
        neg_idx = np.random.choice(negative_ids)

        anchor_ids.append(i)
        positive_picks.append(pos_idx)
        negative_picks.append(neg_idx)

    # Gather each role with one fancy-indexing step.
    anchor_ids = np.asarray(anchor_ids, dtype=np.intp)
    positive_picks = np.asarray(positive_picks, dtype=np.intp)
    negative_picks = np.asarray(negative_picks, dtype=np.intp)
    return samples[anchor_ids], samples[positive_picks], samples[negative_picks]

## Training the Siamese (Contrastive) Network
The following cell demonstrates how to generate an episode, create contrastive pairs, and then train the Siamese network over several episodes.

In [None]:
# Episode configuration, passed to create_episode below: number of classes per
# episode, support images per class, and query images per class.
N_WAY = 3
K_SHOT = 3
N_QUERY = 4

# Build a new embedding model; this rebinds the notebook-level
# `embedding_model` name.
embedding_model = build_embedding_model((IMG_SIZE, IMG_SIZE, 3))

# ====  Contrastive (Siamese) Example ====
siamese_model = build_siamese_network(embedding_model, margin=1.0)

# Train over 20 freshly sampled episodes, fitting for 2 epochs on each one.
for episode in range(20):
    # Create an episode
    support_x, support_y, query_x, query_y = create_episode(
        class_to_imgs_dict=class_to_images,
        class_names=class_names,
        n_way=N_WAY,
        k_shot=K_SHOT,
        n_query_per_class=N_QUERY
    )
    
    # Only the support set is turned into training pairs; query_x / query_y
    # are not used in this cell.
    X_a, X_b, y_pairs = generate_contrastive_pairs(support_x, support_y)
    
    siamese_model.fit(
        x=[X_a, X_b],
        y=y_pairs,
        batch_size=4,
        epochs=2,
        verbose=1
    )

## Training the Triplet Network
In this cell, we generate triplets (anchor, positive, negative) and train the triplet network with the custom triplet loss.

In [None]:
# ==== Triplet Example ====
# A separate embedding model, so the triplet network does not share weights
# with the Siamese example.
embedding_model_2 = build_embedding_model((IMG_SIZE, IMG_SIZE, 3))
triplet_model = build_triplet_network(embedding_model_2, margin=1.0)

# Train over 20 freshly sampled episodes, fitting for 2 epochs on each one.
for episode in range(20):
    support_x, support_y, query_x, query_y = create_episode(
        class_to_imgs_dict=class_to_images,
        class_names=class_names,
        n_way=N_WAY,
        k_shot=K_SHOT,
        n_query_per_class=N_QUERY
    )
    
    # Only the support set is turned into triplets; query_x / query_y are not
    # used in this cell.
    anchors, positives, negatives = generate_triplets(support_x, support_y)
    
    # Placeholder targets, one per triplet: triplet_loss never reads y_true.
    dummy_y = np.zeros((anchors.shape[0],))
    
    triplet_model.fit(
        x=[anchors, positives, negatives],
        y=dummy_y,
        batch_size=4,
        epochs=2,
        verbose=1
    )