In [None]:
from keras import backend as K
import keras
from keras.layers import Input, Dense, Lambda, Dropout
from keras.models import Model
from keras.optimizers import Adam
import numpy as np
import json
import numpy as np
import h5py
import matplotlib.pyplot as plt
import tqdm
from skimage.feature import ORB
from skimage.color import rgb2gray
import os
import random


# Load Data

In [None]:
X_train = np.load("./Data/Dataset/X_train.npy")
X_test = np.load("./Data/Dataset/X_test.npy")
X_validate = np.load("./Data/Dataset/X_validate.npy")

Y_train = np.load("./Data/Dataset/Y_train.npy")
Y_test = np.load("./Data/Dataset/Y_test.npy")
Y_validate = np.load("./Data/Dataset/Y_validate.npy")

print(X_train.shape, X_test.shape, X_validate.shape)

# Build Model

In [None]:
# Dimensions of the input vectors
input_dim =  X_train.shape[2]

# Define the Siamese network architecture
input_a = Input(shape=(input_dim,))
input_b = Input(shape=(input_dim,))

# Shared weights between the two networks
shared_dense_layer_1 = Dense(100, activation='sigmoid',kernel_regularizer=keras.regularizers.l2(0.001))
shared_dense_layer_2 = Dense(50, activation='sigmoid',kernel_regularizer=keras.regularizers.l2(0.001)) 
shared_dense_layer_3 = Dropout(0.2)
shared_dense_layer_4 = Dense(10, activation='sigmoid')


# Stacking the layers
encoded_a = shared_dense_layer_4(shared_dense_layer_3(shared_dense_layer_2(shared_dense_layer_1(input_a))))
encoded_b = shared_dense_layer_4(shared_dense_layer_3(shared_dense_layer_2(shared_dense_layer_1(input_b))))


# Define the Euclidean distance between the encoded vectors
def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

# Define the Cosine distance between the encoded vectors
def cosine_distance(vects):
    x, y = vects
    x = K.l2_normalize(x, axis=-1)
    y = K.l2_normalize(y, axis=-1)
    return -K.mean(x * y, axis=-1, keepdims=True)

distance = Lambda(euclidean_distance)([encoded_a, encoded_b])

# Output layer with a sigmoid activation function
prediction = Dense(1, activation='sigmoid')(distance)

# Define the Siamese model
model = Model(inputs=[input_a, input_b], outputs=prediction)

# Compile the model
model.compile(loss='mean_squared_error', optimizer='Adam', metrics=['accuracy'])

# Display the model summary
model.summary()


# Train the Model

In [None]:
# Train the model
history = model.fit([X_train[:, 0], X_train[:, 1]], Y_train, 
          validation_data=([X_validate[:, 0], X_validate[:, 1]], Y_validate), 
          batch_size=32, epochs=10)

In [None]:
if False:
    model.save('model_lite.h5')

### Plot History loss

In [None]:
import matplotlib.pyplot as plt
# Plot the training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

print(np.count_nonzero(Y_train))
print(Y_train.shape)


# Evaluation of the Model

In [None]:

# prediction
y_pred = model.predict([X_test[:, 0], X_test[:, 1]])

# Define a custom function to calculate accuracy within a margin of error
def calculate_accuracy(y_true, y_pred, margin):
    correct_predictions = 0
    for true, pred in zip(y_true, y_pred):
        if abs(true - pred) < margin:
            correct_predictions += 1
    return correct_predictions / len(y_true)

# Set a margin of error
margin_of_error = 0.0005  # Adjust as needed based on the tolerance level

# Calculate accuracy within the margin of error
accuracy_within_margin = calculate_accuracy(Y_test, y_pred.flatten(), margin_of_error)

# Display the calculated accuracy within the margin of error
print(f"Accuracy within a margin of {margin_of_error}: {accuracy_within_margin}")

# Try the Model

In [None]:
# Load the model
# loaded_model = keras.models.load_model('my_siamese_model')


In [None]:
# map
with open("Data/database/database_lite.json","r") as f:
    m_idx = json.load(f)
    m_imgs = np.array(m_idx["im_paths"])
    m_loc=np.array(m_idx["loc"])

# query
with open("Data/query/query_lite.json","r") as f:
    q_idx=json.load(f)
    q_imgs=np.array(q_idx["im_paths"])
    q_loc=np.array(q_idx["loc"])

def load_dict(file_path):
    imgs = np.load(file_path+"img_path.npy")
    descriptors = np.load(file_path+"descriptors.npy")

    return dict(zip(imgs, descriptors))

img2vec = load_dict("./Data/Dataset/")

In [None]:

def get_descriptors(img_path, n_keypoints=10):
    
    # Initialize the ORB descriptor
    descriptor_extractor = ORB(n_keypoints=n_keypoints)

    #img = Image.open(os.path.join('data_image_retrieval/', img_name)).convert()
    #img = np.asarray(img)
    img = plt.imread(os.path.join('Data/', img_path))
    img = rgb2gray(img)

    # Extract ORB descriptors
    descriptor_extractor.detect_and_extract(img)

    descriptor = descriptor_extractor.descriptors  # descriptors (the feature vectors)

    return descriptor.reshape(-1)

In [None]:
def retrieve_images(query_image_path, n_images_to_retrieve, img2vec, m_imgs, model):
    # Process the query image and get its vector
    query_vector = get_descriptors(query_image_path,10)  # Replace with your image processing function

    
    # Initialize a list to store the relevance scores
    relevance_scores = []

    # Iterate through all the image paths in m_imgs
    for image_path in m_imgs:
        # Get the vector from the dictionary img2vec
        image_vector = img2vec[image_path]

        # Predict the relevance score using the model
        relevance_score = model.predict([np.array([query_vector]), np.array([image_vector])])[0][0]

        # Append the image path and relevance score to the list
        relevance_scores.append((image_path, relevance_score))
        
    # Sort the list of relevance scores by the score in descending order
    relevance_scores.sort(key=lambda x: x[1], reverse=True)

    # Return the top n_images_to_retrieve relevant image paths with their scores
    return relevance_scores[:n_images_to_retrieve]

In [None]:
test_image = q_imgs[144]
retrieved_images = retrieve_images(test_image,50,img2vec,m_imgs,model)

In [None]:
plt.imshow(plt.imread('Data/' + q_imgs[144]))

In [None]:

import matplotlib.pyplot as plt
import matplotlib.image as mpimg


rt_img  = retrieved_images
retrieved_images = rt_img[:10]
print(retrieved_images)
# Assuming you have an array of image paths named image_paths
# image_paths = [...]  # Your array of image paths

# Define the number of rows and columns for subplots
nrows = 2
ncols = 5


plt.imshow(plt.imread('Data/' + test_image))

# Create a new figure and set the size
fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(20, 8))

# Loop through the image paths and plot the images
for i, image_path in enumerate(retrieved_images):
    # Read the image
    img = mpimg.imread('Data/'+image_path[0])

    # Determine the subplot index
    ax = axes[i // ncols, i % ncols]

    # Plot the image
    ax.imshow(img)
    ax.set_title(f'Image {i+1}')

# Hide any empty subplots
for i in range(len(image_path), nrows*ncols):
    ax = axes[i // ncols, i % ncols]
    ax.axis('off')

# Display the figure
plt.tight_layout()
plt.show()


# Precision

In [None]:
def precision(relevant, retrieved):
    tp = set(retrieved).intersection(relevant)
    return len(tp)/len(retrieved)

def precision_at_k(relevant, retrieved, k):
    if k<1 or len(retrieved) < k:
        return -1
        
    return precision(relevant, retrieved[:k])

In [None]:
k = 50
n = 10

# loading the relevance judgements
with h5py.File("Data/london_gt.h5","r") as f:
    fovs = f["fov"][:]
    sim = f["sim"][:].astype(np.uint8)

np.random.seed(0)
query_indices = np.random.choice(len(q_imgs), size=n, replace=False)

precision_scores = []

for q_id in query_indices:
    test_image = q_imgs[q_id]

    retrieved_images = retrieve_images(test_image,k,img2vec,m_imgs,model)
    retrieved_inx = [list(m_imgs).index(map_path) for map_path, _ in retrieved_images]

    is_relevant = sim[q_id, :] == 1
    relevant_inx = np.arange(len(is_relevant))[is_relevant]

    precision_scores.append(precision_at_k(relevant_inx, retrieved_inx, k))

In [None]:
print(precision_scores)