In [None]:
import os
import zipfile

import random
import math
from tqdm import tqdm

import cv2
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Flatten, Dense,\
                                    Dropout, BatchNormalization, Input
from tensorflow.keras.metrics import Mean, CosineSimilarity
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.efficientnet import EfficientNetB7, preprocess_input

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import shutil

# Define the path to the new directory
new_dir_path = "/content/input"

# Check if the directory already exists
if not os.path.exists(new_dir_path):
    # If the directory doesn't exist, create it
    os.makedirs(new_dir_path)
    print(f"Directory '{new_dir_path}' created successfully!")
else:
    print(f"Directory '{new_dir_path}' already exists!")

# Copy Face Data.rar
shutil.copy2('/content/drive/MyDrive/Face Data.rar', '/content/input/Face Data.rar')

# Copy Extracted Faces.zip
shutil.copy2('/content/drive/MyDrive/Extracted Faces.zip', '/content/input/Extracted Faces.zip')



In [None]:
!pip install rarfile

print("Current working directory:", os.getcwd())

import zipfile
from rarfile import RarFile

face_data_rar_file = './input/Face Data.rar'
extracted_faces_zip_file = './input/Extracted Faces.zip'

extract_to = './input/'

with zipfile.ZipFile(extracted_faces_zip_file, 'r') as zip_ref:
    zip_ref.extractall(extract_to)

with RarFile(face_data_rar_file, 'r') as rar_ref:
    rar_ref.extractall(extract_to)

extracted_files = os.listdir(extract_to)
print(f"Extracted files: {extracted_files}")

In [None]:
FACE_DATA_PATH = './input/Face Dataset'
EXTRACTED_FACES_PATH = './input/Extracted Faces'

In [None]:
def explore_folder(folder_path):
    print(f'Exploring {os.path.basename(folder_path)}')
    image_shapes = []
    num_images = 0
    num_people = 0
    for folder_name in os.listdir(folder_path):
        subfolder_path = os.path.join(folder_path, folder_name)
        for image_name in os.listdir(subfolder_path):
            image_path = os.path.join(subfolder_path, image_name)
            image = cv2.imread(image_path)
            image_shapes.append(image.shape)
            num_images += 1
        num_people +=1
    print(f'Unique image shapes in: {set(image_shapes)}')
    print(f"Total number of images: {num_images}")
    print(f"Total number of people: {num_people}")
    return image_shapes, num_images, num_people

In [None]:
explore_folder(FACE_DATA_PATH);

In [None]:
explore_folder(EXTRACTED_FACES_PATH);

In [None]:
def visualize_sample_images(folder_path):
    num_images = len(os.listdir(folder_path))
    num_rows = (num_images + 4) // 5
    num_cols = min(num_images, 5)

    fig, axes = plt.subplots(num_rows, num_cols, figsize=(10, 3 * num_rows))

    for i, image_name in enumerate(os.listdir(folder_path)):
        image_path = os.path.join(folder_path, image_name)
        sample_image = cv2.imread(image_path)
        sample_image = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)

        row = i // num_cols
        col = i % num_cols
        ax = axes[row, col] if num_rows > 1 else axes[col]

        ax.imshow(sample_image)
        ax.axis('off')

    for ax in axes.flat[num_images:]:
        ax.remove()

    plt.suptitle(f'Person ID: {os.path.basename(folder_path)}')

    plt.tight_layout()
    plt.show()

In [None]:
person_id = random.choice(os.listdir(FACE_DATA_PATH))
folder_path = os.path.join(FACE_DATA_PATH, person_id)

print(f'Samples from {os.path.basename(FACE_DATA_PATH)}')

visualize_sample_images(folder_path)

In [None]:
print(f'Samples from {os.path.basename(EXTRACTED_FACES_PATH)}')

if person_id in os.listdir(EXTRACTED_FACES_PATH):
    folder_path = os.path.join(EXTRACTED_FACES_PATH, person_id)
    visualize_sample_images(folder_path)
else:
    print(f'There is no person {person_id} in this folder')

In [None]:
# create dataset
import shutil
DATASET = './images/output_dataset'

if os.path.exists(DATASET):
    shutil.rmtree(DATASET)

os.makedirs(DATASET)

def copy_to_output_dataset(input_path, output_path):
    for person_folder in os.listdir(input_path):
        person_folder_path = os.path.join(input_path, person_folder)).

        if os.path.isdir(person_folder_path):
            output_person_folder = os.path.join(output_path, person_folder)
            if not os.path.exists(output_person_folder):
                os.makedirs(output_person_folder)

            for image_file in os.listdir(person_folder_path):
                if image_file.endswith('.jpg'):
                    src_image_path = os.path.join(person_folder_path, image_file)
                    dst_image_path = os.path.join(output_person_folder, image_file)
                    if os.path.exists(dst_image_path):
                        base, ext = os.path.splitext(dst_image_path)
                        dst_image_path = f"{base}_1{ext}"

                    shutil.copy(src_image_path, dst_image_path)

copy_to_output_dataset(FACE_DATA_PATH, DATASET)

copy_to_output_dataset(EXTRACTED_FACES_PATH, DATASET)

In [None]:
explore_folder(DATASET);

In [None]:
visualize_sample_images(os.path.join(DATASET, '500'))

In [None]:
visualize_sample_images(os.path.join(DATASET, '1500'))

In [None]:
def triplets(folder_paths, max_triplets=7):
    anchor_images = []
    positive_images = []
    negative_images = []

    for person_folder in folder_paths:
        images = [os.path.join(person_folder, img) for img in os.listdir(person_folder)]
        num_images = len(images)
        if num_images < 2:
            continue

        random.shuffle(images)

        for _ in range(max(num_images - 1, max_triplets)):
            # select the anchor randomly
            anchor_image = random.choice(images)
            # select the positive randomly, excluding the anchor
            positive_image = random.choice([x for x in images if x != anchor_image])
            # select the negative randomly, excluding the anchor
            negative_folder = random.choice([x for x in folder_paths if x != person_folder])
            negative_image = random.choice([os.path.join(negative_folder, img)
                                            for img in os.listdir(negative_folder)])

            anchor_images.append(anchor_image)
            positive_images.append(positive_image)
            negative_images.append(negative_image)

    return anchor_images, positive_images, negative_images

In [None]:
person_folders = [os.path.join(DATASET, folder_name)
                  for folder_name in os.listdir(DATASET)]

anchors, positives, negatives = triplets(person_folders)

In [None]:
def split_triplets(anchors, positives, negatives, validation_split=0.2):
    triplets = list(zip(anchors, positives, negatives))

    train_triplets, val_triplets = train_test_split(triplets,
                                                    test_size=validation_split,
                                                    random_state=42)

    return train_triplets, val_triplets

In [None]:
train_triplets, val_triplets = split_triplets(anchors,
                                              positives,
                                              negatives)
len(train_triplets), len(val_triplets)

In [None]:
def load_and_preprocess_image(image_path, expand_dims=False):
    image = cv2.imread(image_path)

    if image is None:
        raise ValueError(f"Error reading image from path: {image_path}")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (128, 128))

    if expand_dims:
        image = np.expand_dims(image, axis=0)

    return image

In [None]:
# balance data and generator batch
from sklearn.metrics import roc_curve, auc, confusion_matrix, precision_score, recall_score, f1_score, classification_report
from imblearn.over_sampling import RandomOverSampler
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import torch
import torch.nn as nn
import random

def balance_data(train_triplets):
    anchors, positives, negatives = zip(*train_triplets)
    labels = [1] * len(positives) + [0] * len(negatives)
    data = list(zip(anchors + anchors, positives + negatives))

    ros = RandomOverSampler(random_state=42)
    data_resampled, labels_resampled = ros.fit_resample(data, labels)
    anchors_resampled, positives_negatives_resampled = zip(*data_resampled)
    positives_resampled = [pn for pn, l in zip(positives_negatives_resampled, labels_resampled) if l == 1]
    negatives_resampled = [pn for pn, l in zip(positives_negatives_resampled, labels_resampled) if l == 0]

    return list(zip(anchors_resampled[:len(positives_resampled)], positives_resampled, negatives_resampled))

def batch_generator(triplets, batch_size=32, augment=True):
    triplets = balance_data(triplets)
    total_triplets = len(triplets)
    random_indices = list(range(total_triplets))
    random.shuffle(random_indices)

    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.05,
        height_shift_range=0.05,
        horizontal_flip=True,
        zoom_range=0.2
    )

    for i in range(0, total_triplets, batch_size):
        batch_indices = random_indices[i:i + batch_size]
        batch_triplets = [triplets[j] for j in batch_indices]

        anchor_batch = []
        positive_batch = []
        negative_batch = []

        for triplet in batch_triplets:
            anchor, positive, negative = triplet
            anchor_image = load_and_preprocess_image(anchor)
            positive_image = load_and_preprocess_image(positive)
            negative_image = load_and_preprocess_image(negative)

            if augment:
                anchor_image = datagen.random_transform(anchor_image)
                positive_image = datagen.random_transform(positive_image)
                negative_image = datagen.random_transform(negative_image)

            anchor_batch.append(anchor_image)
            positive_batch.append(positive_image)
            negative_batch.append(negative_image)

        yield [np.array(anchor_batch), np.array(positive_batch), np.array(negative_batch)]

In [None]:
def visualize_triplets(triplets):
  anchor_batch, positive_batch, negative_batch = triplets

  for i in range(len(anchor_batch)):
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.title("Anchor")
    plt.imshow(anchor_batch[i])
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Positive")
    plt.imshow(positive_batch[i])
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Negative")
    plt.imshow(negative_batch[i])
    plt.axis('off')

    plt.show()

In [None]:
example_triplets = [next(batch_generator(train_triplets, 5))]
visualize_triplets(example_triplets[0])

In [None]:
def get_embedding(input_shape, num_layers_to_unfreeze=25):
  base_model = EfficientNetB7(
      weights='imagenet',  # Load pre-trained weights
      input_shape=input_shape,  # Specify input shape
      include_top=False,  # Exclude the top classification layers
      pooling='avg'  # Use average pooling for feature extraction
  )

  for i in range(len(base_model.layers) - num_layers_to_unfreeze):
    base_model.layers[i].trainable = False

  embedding = tf.keras.models.Sequential([
      base_model,
      Flatten(),
      Dense(512, activation='relu'),
      BatchNormalization(),
      Dropout(0.3),
      Dense(256, activation='relu'),
      BatchNormalization(),
      Dropout(0.3),
      Dense(128, activation='relu'),
      BatchNormalization(),
      Dense(128)
  ], name='Embedding')

  return embedding

In [None]:
input_shape = (128, 128, 3)

embedding = get_embedding(input_shape)
embedding.summary()

In [None]:
@tf.keras.saving.register_keras_serializable()
class DistanceLayer(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return ap_distance, an_distance

anchor_input = Input(name='anchor', shape=input_shape)
positive_input = Input(name='positive', shape=input_shape)
negative_input = Input(name='negative', shape=input_shape)

distances = DistanceLayer()(
    embedding(preprocess_input(anchor_input)),
    embedding(preprocess_input(positive_input)),
    embedding(preprocess_input(negative_input))
)

siamese_net = Model(
    inputs=[anchor_input,
            positive_input,
            negative_input],
    outputs=distances
)

In [None]:
plot_model(siamese_net, show_shapes=True, show_layer_names=True)

In [None]:
siamese_net.summary()

In [None]:
@tf.keras.saving.register_keras_serializable()
class SiameseModel(Model):
    def __init__(self, siamese_net, margin=0.5):
        super().__init__()
        self.siamese_net = siamese_net
        self.margin = margin
        self.loss_tracker = Mean(name='loss')
        self.accuracy_tracker = Mean(name='accuracy')

    def call(self, inputs):
        return self.siamese_net(inputs)

    def train_step(self, data):
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        gradients = tape.gradient(loss, self.siamese_net.trainable_weights)

        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_net.trainable_weights)
        )

        self.loss_tracker.update_state(loss)

        accuracy = self._compute_accuracy(data)
        self.accuracy_tracker.update_state(accuracy)

        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        self.loss_tracker.update_state(loss)

        accuracy = self._compute_accuracy(data)
        self.accuracy_tracker.update_state(accuracy)

        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}

    def _compute_loss(self, data):
        ap_distance, an_distance = self.siamese_net(data)

        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, .0)
        return loss

    def _compute_accuracy(self, data):
        ap_distance, an_distance = self.siamese_net(data)
        accuracy = tf.reduce_mean(tf.cast(ap_distance < an_distance,
                                          tf.float32))
        return accuracy

    @property
    def metrics(self):
        return [self.loss_tracker, self.accuracy_tracker]

    def get_config(self):
        base_config = super().get_config()
        config = {
            'siamese_net': tf.keras.saving.serialize_keras_object(self.siamese_net),
            'margin': tf.keras.saving.serialize_keras_object(self.margin),
            'loss_tracker': tf.keras.saving.serialize_keras_object(self.loss_tracker),
            'accuracy_tracker': tf.keras.saving.serialize_keras_object(self.accuracy_tracker),
        }
        return {**base_config, **config}

    @classmethod
    def from_config(cls, config):
        config['siamese_net'] = tf.keras.saving.deserialize_keras_object(config.pop('siamese_net'))
        config['margin'] = tf.keras.saving.deserialize_keras_object(config.pop('margin'))
        config['loss_tracker'] = tf.keras.saving.deserialize_keras_object(config.pop('loss_tracker'))
        config['accuracy_tracker'] = tf.keras.saving.deserialize_keras_object(config.pop('accuracy_tracker'))
        return cls(**config)

In [None]:
def train_model(model,
                train_triplets,
                epochs,
                batch_size,
                val_triplets,
                patience,
                delta=0.0001):

    best_val_accuracy = 0
    best_val_loss = float('inf')
    temp_patience = patience
    history = {
        'loss': [],
        'val_loss': [],
        'accuracy': [],
        'val_accuracy': []
    }

    train_steps_per_epoch = math.ceil(len(train_triplets) / batch_size)
    val_steps_per_epoch = math.ceil(len(val_triplets) / batch_size)

    for epoch in range(epochs):
        print(f'Epoch {epoch+1}/{epochs}')
        train_loss = 0.
        train_accuracy = 0.
        val_loss = 0.
        val_accuracy = 0.

        with tqdm(total=train_steps_per_epoch, desc='Training') as pbar:
            for batch in batch_generator(train_triplets, batch_size=batch_size):
                loss, accuracy = model.train_on_batch(batch)
                train_loss += loss
                train_accuracy += accuracy

                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})

        with tqdm(total=val_steps_per_epoch, desc='Validation') as pbar:
            for batch in batch_generator(val_triplets, batch_size=batch_size):
                loss, accuracy = model.test_on_batch(batch)
                val_loss += loss
                val_accuracy += accuracy

                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})

        train_loss /= train_steps_per_epoch
        train_accuracy /= train_steps_per_epoch
        val_loss /= val_steps_per_epoch
        val_accuracy /= val_steps_per_epoch

        history['loss'].append(train_loss)
        history['accuracy'].append(train_accuracy)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)

        print(f'\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')


        if val_accuracy > best_val_accuracy and val_loss < best_val_loss:
            best_val_accuracy = val_accuracy
            best_val_loss = val_loss
            model.layers[0].layers[3].save_weights('best_model.weights.h5')

        if val_loss - best_val_loss > delta:
            temp_patience -= 1
            if temp_patience == 0:
                print('Early stopping: Validation loss did not improve.')
                break
        else:
            best_val_loss = val_loss
            temp_patience = patience

    return model, history


In [None]:
siamese_model = SiameseModel(siamese_net)
siamese_model.compile(optimizer=Adam(0.001))

siamese_model, history = train_model(siamese_model,
                                     train_triplets=train_triplets,
                                     epochs=200,
                                     batch_size=128,
                                     val_triplets=val_triplets,
                                     patience=5)

In [None]:
plt.figure(figsize=(15, 5))

plt.subplot(121)
plt.plot(history['loss'])
plt.plot(history['val_loss'])
plt.title('Model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')

plt.subplot(122)
plt.plot(history['accuracy'])
plt.plot(history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')

plt.show()

In [None]:
sample = next(batch_generator(val_triplets, 10))
anchor, positive, negative = sample

preprocessed_anchor = preprocess_input(anchor)
preprocessed_positive = preprocess_input(positive)
preprocessed_negative = preprocess_input(negative)

anchor_embedding = embedding(preprocessed_anchor)
positive_embedding = embedding(preprocessed_positive)
negative_embedding = embedding(preprocessed_negative)

cosine_similarity = CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print(f"positive_similarity: {positive_similarity}")

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print(f"negative_similarity: {negative_similarity}")

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

def plot_confusion_matrix(model, val_triplets):
    y_true = []
    y_pred = []
    for batch in batch_generator(val_triplets, batch_size=1, augment=False):
        ap_distance, an_distance = model.predict_on_batch(batch)
        y_true.append(1)
        y_true.append(0)
        y_pred.append(1 if ap_distance < an_distance else 0)
        y_pred.append(0 if ap_distance < an_distance else 1)

    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='d')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
plot_confusion_matrix(siamese_model, val_triplets)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import RocCurveDisplay

def compute_roc_auc(model, val_triplets):
    y_true = []
    y_scores = []

    for batch in batch_generator(val_triplets, batch_size=1, augment=False):
        ap_distance, an_distance = model.predict_on_batch(batch)

        # true label：1 is positive sample，0 is negative sample
        y_true.append(1)
        y_scores.append(-ap_distance)  # Negative distance means higher similarity
        y_true.append(0)
        y_scores.append(-an_distance)

    fpr, tpr, thresholds = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)

    return fpr, tpr, roc_auc

def plot_roc_curve(fpr, tpr, roc_auc):
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc='lower right')
    plt.show()

fpr, tpr, roc_auc = compute_roc_auc(siamese_model, val_triplets)
print(f"AUC: {roc_auc}")

plot_roc_curve(fpr, tpr, roc_auc)

In [None]:
from sklearn.metrics import mean_squared_error

mse = mean_squared_error(anchor_embedding, positive_embedding)
print(f"Mean Squared Error between Anchor and Positive: {mse}")

mse = mean_squared_error(anchor_embedding, negative_embedding)
print(f"Mean Squared Error between Anchor and Negative: {mse}")


In [None]:
import os
import zipfile
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.metrics import CosineSimilarity
from tensorflow.keras.models import load_model
from google.colab import files

def load_and_preprocess_image(image_path, target_size=(128, 128)):
    img = image.load_img(image_path, target_size=target_size)
    img_array = image.img_to_array(img)
    return preprocess_input(img_array)

def prepare_image(image_path):
    preprocessed_query_image = load_and_preprocess_image(image_path)
    preprocessed_query_image = np.expand_dims(preprocessed_query_image, axis=0)
    return preprocess_input(preprocessed_query_image)

def get_embedding(input_shape, num_layers_to_unfreeze=25):
    base_model = tf.keras.applications.EfficientNetB7(
        weights='imagenet',
        input_shape=input_shape,
        include_top=False,
        pooling='avg'
    )

    for i in range(len(base_model.layers) - num_layers_to_unfreeze):
        base_model.layers[i].trainable = False

    embedding = tf.keras.models.Sequential([
        base_model,
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(128)
    ], name='Embedding')

    return embedding

def find_matching_images(directory_path, model_weights_path='best_model.weights.h5', threshold=0.5):
    input_shape = (128, 128, 3)
    embedding = get_embedding(input_shape)
    embedding.load_weights(model_weights_path)

    for folder_name in os.listdir(directory_path):
        print(folder_name)
        subfolder_path = os.path.join(directory_path, folder_name)
        query_image_folder = os.path.join(subfolder_path, "query")
        query_image_name = os.listdir(query_image_folder)[0]
        query_image_path = os.path.join(query_image_folder, query_image_name)
        datasets_path = os.path.join(subfolder_path, "support")

        query_image = prepare_image(query_image_path)
        query_embedding = embedding.predict(query_image)

        cosine_similarity = CosineSimilarity()
        best_match = None
        best_similarity = -1

        for image_name in os.listdir(datasets_path):
            image_path = os.path.join(datasets_path, image_name)

            if image_path.endswith(".jpg"):
                support_image = prepare_image(image_path)
                support_embedding = embedding.predict(support_image)

                similarity = cosine_similarity(query_embedding, support_embedding)

                if similarity > best_similarity:
                    best_similarity = similarity
                    best_match_path = image_path

        if best_similarity >= threshold:
            print(f"Similarity: {best_similarity}")
            print(f"Query Image Path: {query_image_path}")
            print(f"Best match Path: {best_match_path}")
            fig, axs = plt.subplots(1, 2, figsize=(10, 5))
            fig.suptitle("Query Image and Best Match")
            axs[0].imshow(plt.imread(query_image_path))
            axs[0].set_title("Query Image")
            axs[1].imshow(plt.imread(best_match_path))
            axs[1].set_title("Best Match")
            plt.show()
        else:
            print("No match found")

def upload_and_test(model_weights_path='best_model.weights.h5', threshold=0.5):
    uploaded = files.upload()
    zip_file_path = list(uploaded.keys())[0]
    directory_path = "/content/input"

    if not os.path.exists(directory_path):
        os.makedirs(directory_path)

    # Extract the uploaded zip file
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(directory_path)
    print(f"Extracted files to '{directory_path}'.")

    extracted_folder_name = os.path.splitext(zip_file_path)[0]
    directory_path = os.path.join(directory_path, extracted_folder_name)
    print(f"Updated directory path: {directory_path}")

    if not os.path.exists(directory_path):
        print(f"Error: Directory '{directory_path}' does not exist.")
        return

    find_matching_images(directory_path, model_weights_path, threshold)

In [None]:
upload_and_test()