In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import os
import random
import numpy as np
from tensorflow.keras.applications.vgg16 import VGG16 
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [None]:
# Set the random seed for reproducibility.
# Python's `random`, NumPy and TensorFlow each keep their own generator, so all
# three are seeded: `random` drives the triplet sampling and NumPy drives the
# per-epoch shuffle later in this notebook.
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Step 1: Dataset Preparation
# Root folder of the training split; the code below joins it with a season name,
# so one sub-directory per season is expected directly underneath it.
dataset_path = "/kaggle/input/d/adriankucharski12345/nordland-train/Nordland_Train"  # Replace with the actual path to the Nordland dataset
# Season sub-directory names.
seasons = ["summer", "spring", "winter", "fall"]
# (height, width) passed to load_img; matches the 224x224 input_shape given to VGG16 below.
image_size = (224, 224)  # Resize images to this size

In [None]:
# Step 2: Data Preprocessing
def preprocess_image(image_path):
    """Load a single image file and prepare it as model input.

    The image is read from ``image_path``, resized to the module-level
    ``image_size`` and converted to an array whose values are divided by 255.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The image as an array scaled to the range [0, 1].
    """
    # NOTE(review): the backbone is VGG16 with ImageNet weights, which is usually
    # fed through keras' vgg16.preprocess_input rather than plain /255 scaling —
    # confirm that [0, 1] inputs are intended.
    pil_image = load_img(image_path, target_size=image_size)
    pixels = img_to_array(pil_image)
    return pixels / 255.0  # Normalize pixel values to the range [0, 1]

In [None]:
# Function to generate anchor, positive, negative triplets
def generate_triplets(dataset_path):
    """Build one (anchor, positive, negative) triplet per image of every season.

    For each image in a season folder:
      * the anchor is that image,
      * the positive is the file with the same name in a randomly chosen
        *other* season (same place, different appearance),
      * the negative is a randomly chosen file with a *different* name from a
        randomly chosen other season (different place).

    Args:
        dataset_path: Root directory containing one sub-directory per season
            ("summer", "spring", "winter", "fall").

    Returns:
        A list of tuples ``(anchor_path, positive_path, negative_path,
        anchor_season, positive_season, negative_season)``. An anchor is
        skipped when the chosen negative season holds no image with a
        different file name.
    """
    seasons = ["summer", "spring", "winter", "fall"]

    # List every season folder exactly once instead of once per anchor, and
    # sort the names so a fixed random seed yields the same triplets no matter
    # in which order the filesystem returns directory entries.
    season_images = {
        season: sorted(os.listdir(os.path.join(dataset_path, season)))
        for season in seasons
    }

    triplets = []

    # Loop over the seasons
    for season in seasons:
        season_dir = os.path.join(dataset_path, season)
        other_seasons = [other for other in seasons if other != season]

        # Generate anchor, positive, negative triplets for each image
        for image_name in season_images[season]:
            anchor_image = os.path.join(season_dir, image_name)

            # Same file name in another season == same place.
            # NOTE(review): assumes every season folder contains this file name.
            positive_season = random.choice(other_seasons)
            positive_image = os.path.join(dataset_path, positive_season, image_name)

            # Randomly select a negative image from a different season
            negative_season = random.choice(other_seasons)
            negative_names = season_images[negative_season]

            # The negative must show a different place than the anchor; a file
            # sharing the anchor's name would make the triplet contradictory.
            if not any(name != image_name for name in negative_names):
                continue
            negative_name = random.choice(negative_names)
            while negative_name == image_name:
                negative_name = random.choice(negative_names)
            negative_image = os.path.join(dataset_path, negative_season, negative_name)

            triplets.append((anchor_image, positive_image, negative_image, season, positive_season, negative_season))

    return triplets

In [None]:
# Sample the training triplets once; the training loop reshuffles this list
# in place at the start of every epoch.
triplets = generate_triplets(dataset_path)

In [None]:
# Step 4: Triplet Network Architecture
# VGG16 convolutional backbone initialised with ImageNet weights; include_top=False
# leaves out the original classification head. No layer is frozen here.
base_model = VGG16(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
# Flatten the final feature map and project it to a 256-dimensional embedding.
# The Dense layer has no activation, and the embedding is not L2-normalised.
flatten = tf.keras.layers.Flatten()(base_model.output)
embeddings = tf.keras.layers.Dense(256)(flatten)
# One shared model embeds anchors, positives and negatives alike.
triplet_model = tf.keras.Model(inputs=base_model.input, outputs=embeddings)

In [None]:
# Define the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Define the triplet loss (operates on already-formed triplets; no mining happens here)
def triplet_loss(anchor_embeddings, positive_embeddings, negative_embeddings, margin=0.1):
    """Hinge-style triplet loss on squared Euclidean distances.

    For each row i the loss is
    ``max(||a_i - p_i||^2 - ||a_i - n_i||^2 + margin, 0)``,
    and the mean over all rows is returned.

    Args:
        anchor_embeddings: Embeddings of the anchor images, one row per triplet.
        positive_embeddings: Embeddings of the matching positives, row-aligned
            with the anchors.
        negative_embeddings: Embeddings of the negatives, row-aligned with the
            anchors.
        margin: Required gap between the positive and negative distance.

    Returns:
        Scalar tensor holding the mean loss.
    """
    def squared_distance(left, right):
        # Sum of squared differences along axis 1 (one value per row).
        return tf.reduce_sum(tf.square(left - right), axis=1)

    positive_gap = squared_distance(anchor_embeddings, positive_embeddings)
    negative_gap = squared_distance(anchor_embeddings, negative_embeddings)

    # Zero loss once the negative is at least `margin` farther than the positive.
    per_triplet = tf.maximum(positive_gap - negative_gap + margin, 0.0)
    return tf.reduce_mean(per_triplet)


In [None]:
import numpy as np
from tqdm import tqdm

# Training hyper-parameters
batch_size = 64
epochs = 5

# Fail early with a clear message: with no triplets the epoch-average below
# would otherwise end in a ZeroDivisionError.
if not triplets:
    raise ValueError("No triplets available for training")

# Ceiling division so the final, smaller batch is trained on as well
# (floor division silently dropped it, and produced zero batches whenever
# there were fewer triplets than batch_size).
num_batches = (len(triplets) + batch_size - 1) // batch_size

for epoch in range(epochs):
    # New triplet order every epoch (shuffles the list in place).
    np.random.shuffle(triplets)

    epoch_loss = 0.0

    with tqdm(total=num_batches, desc=f"Epoch {epoch+1}/{epochs}", unit="batch") as pbar:
        for batch in range(num_batches):
            batch_triplets = triplets[batch*batch_size:(batch+1)*batch_size]

            # Preprocess the three images of every triplet; element 0/1/2 of a
            # triplet is the anchor/positive/negative path.
            anchor_batch = np.array([preprocess_image(triplet[0]) for triplet in batch_triplets])
            positive_batch = np.array([preprocess_image(triplet[1]) for triplet in batch_triplets])
            negative_batch = np.array([preprocess_image(triplet[2]) for triplet in batch_triplets])

            # The same model embeds all three branches so the weights are shared.
            with tf.GradientTape() as tape:
                anchor_embeddings = triplet_model(anchor_batch, training=True)
                positive_embeddings = triplet_model(positive_batch, training=True)
                negative_embeddings = triplet_model(negative_batch, training=True)

                loss_value = triplet_loss(anchor_embeddings, positive_embeddings, negative_embeddings)

            grads = tape.gradient(loss_value, triplet_model.trainable_variables)
            optimizer.apply_gradients(zip(grads, triplet_model.trainable_variables))

            epoch_loss += loss_value.numpy()

            # Progress bar shows the running mean loss of the current epoch.
            pbar.set_postfix({"Loss": epoch_loss / (batch + 1)})
            pbar.update(1)

    epoch_loss /= num_batches
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}")

In [None]:
# Root folder of the test split; the evaluation code joins it with a season
# name, so one sub-directory per season is expected underneath it.
test_dataset_path = "/kaggle/input/d/adriankucharski12345/nordland-test/Nordland_Test"

In [None]:
def calculate_distance(feature_vector1, feature_vector2):
    """Return the Euclidean distance between two feature vectors.

    Args:
        feature_vector1: First feature array.
        feature_vector2: Second feature array, broadcast-compatible with the first.

    Returns:
        The norm of the element-wise difference, as computed by ``np.linalg.norm``.
    """
    difference = feature_vector1 - feature_vector2
    return np.linalg.norm(difference)

In [None]:
seasons = ["summer", "spring", "winter", "fall"]

# A retrieval counts as correct when the matched reference frame lies within
# this many frames of the query frame.
frame_tolerance = 5

# Create lists to store the feature vectors for each season, plus a parallel
# list holding the frame (place) index of every stored vector.
feature_vectors = {season: [] for season in seasons}
frame_indices = {season: [] for season in seasons}

# Process the test images and extract features
for season in seasons:
    season_path = os.path.join(test_dataset_path, season)
    image_files = sorted(os.listdir(season_path))

    for position, image_file in enumerate(image_files):
        image_path = os.path.join(season_path, image_file)

        # Preprocess the image (e.g., resize, normalize)
        preprocessed_image = preprocess_image(image_path)

        # Obtain the feature vector using the trained model
        feature_vector = triplet_model.predict(np.expand_dims(preprocessed_image, axis=0), verbose=0)

        # Store the feature vector for the corresponding season
        feature_vectors[season].append(feature_vector)

        # Frame index: the numeric file stem when there is one, otherwise the
        # position in the sorted listing.
        # NOTE(review): the fallback assumes all season folders hold the same
        # file names, so equal positions mean the same place — confirm.
        stem = os.path.splitext(image_file)[0]
        frame_indices[season].append(int(stem) if stem.isdigit() else position)

# Perform place recognition for each query season against reference seasons
for query_season in seasons:
    # Pool the references of all other seasons.
    ref_seasons = [ref_season for ref_season in seasons if ref_season != query_season]
    ref_vectors = [vector for ref_season in ref_seasons for vector in feature_vectors[ref_season]]
    ref_frames = np.array([frame for ref_season in ref_seasons for frame in frame_indices[ref_season]])

    total_evaluated_places = len(feature_vectors[query_season])
    if total_evaluated_places == 0 or not ref_vectors:
        # Nothing to score; avoids a division by zero further down.
        print(f"Query Season: {query_season}, no images to evaluate")
        continue

    # One row per reference image, so all distances for a query are computed
    # in a single vectorised call.
    ref_matrix = np.array(ref_vectors).reshape(len(ref_vectors), -1)

    correct_matches = 0
    for query_feature, query_frame in zip(feature_vectors[query_season], frame_indices[query_season]):
        distances = np.linalg.norm(ref_matrix - query_feature.reshape(1, -1), axis=1)

        # argmin returns the first minimum, matching a strict "<" linear scan.
        closest_frame = ref_frames[int(np.argmin(distances))]

        # Check if the closest place is within a 5-frame window of the query
        # (compares frame indices, not season indices).
        if abs(closest_frame - query_frame) <= frame_tolerance:
            correct_matches += 1

    # Calculate the fraction of correct matches
    fc = correct_matches / total_evaluated_places

    print(f"Query Season: {query_season}, Fraction of Correct Matches: {fc}")

In [None]:
# Persist the trained embedding model to the Kaggle working directory
# (the .h5 suffix selects Keras' HDF5 file format).
triplet_model.save("/kaggle/working/triplet_model.h5")

In [None]:
def evaluate_place_recognition(triplet_model, dataset_path, seasons, frame_tolerance=5):
    """Score nearest-neighbour place recognition for every season pairing.

    Every image of every season is embedded once. Then, for each
    (reference season, query season) pair, each query image is matched to the
    reference image with the smallest Euclidean distance between embeddings.
    The match is correct when the frame numbers of query and match differ by
    at most ``frame_tolerance``. The result of every pairing is printed.
    Pairings of a season with itself are included.

    Args:
        triplet_model: Model exposing ``predict(batch, verbose=0)`` that maps a
            batch of preprocessed images to embeddings.
        dataset_path: Root directory with one sub-directory per season.
        seasons: Iterable of season sub-directory names.
        frame_tolerance: Maximum frame-number difference still counted as a
            correct match (default 5).

    Returns:
        Dict mapping ``(reference_season, query_season)`` to the fraction of
        correctly matched query images; ``nan`` when either folder is empty.

    Raises:
        ValueError: If a file name does not start with an integer frame number
            (the part before the first ``.``).
    """
    def extract_features(image_path):
        img = preprocess_image(image_path)
        features = triplet_model.predict(np.expand_dims(img, axis=0), verbose=0)
        return features

    # Embed each image exactly once. Re-extracting the reference features for
    # every query image would cost (queries x references) forward passes.
    season_frames = {}
    season_embeddings = {}
    for season in seasons:
        season_dir = os.path.join(dataset_path, season)
        image_names = sorted(os.listdir(season_dir))
        # Frame number = integer before the first dot of the file name.
        season_frames[season] = np.array(
            [int(name.split('.')[0]) for name in image_names], dtype=int
        )
        # One flattened embedding per row.
        season_embeddings[season] = np.array(
            [extract_features(os.path.join(season_dir, name)).reshape(-1) for name in image_names]
        )

    results = {}
    for reference_season in seasons:
        reference_matrix = season_embeddings[reference_season]
        reference_frames = season_frames[reference_season]

        for query_season in seasons:
            query_matrix = season_embeddings[query_season]
            query_frames = season_frames[query_season]

            if len(query_matrix) == 0 or len(reference_matrix) == 0:
                # No data to score; avoids dividing by zero.
                fraction_correct_matches = float('nan')
            else:
                num_correct_matches = 0
                for query_features, query_index in zip(query_matrix, query_frames):
                    distances = np.linalg.norm(reference_matrix - query_features, axis=1)
                    # argmin keeps the first minimum, like a strict "<" scan.
                    closest_index = reference_frames[int(np.argmin(distances))]

                    if abs(query_index - closest_index) <= frame_tolerance:
                        num_correct_matches += 1

                fraction_correct_matches = num_correct_matches / len(query_matrix)

            results[(reference_season, query_season)] = fraction_correct_matches

            # Report inside the loops so every pairing is printed, not just the last.
            print(f"For seasons: {reference_season} vs {query_season}")
            print(f"Fraction of correct matches: {fraction_correct_matches}")

    return results


In [None]:
# Usage: run the evaluation on the test split with the trained model and the
# season list defined above; the function prints its results.
evaluate_place_recognition(triplet_model, test_dataset_path, seasons)