In [1]:
import glob
import os
from typing import Tuple

import numpy as np
import torch
from efficientnet_pytorch import EfficientNet
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.io import ImageReadMode, read_image


## Embedding of the test dataset

In [None]:
# --- Input locations (empty placeholders: set them before running) ---
TEST_DATASET_PATH = ""  # root folder scanned for "*/*/*/*.jpg" test images
MODEL_PATH = ""  # checkpoint file holding the "backbone_state_dict" entry

# --- Data loading ---
BATCH_SIZE = 128
NUM_WORKERS = 8
IMAGE_SIZE = (224, 224)  # target size handed to transforms.Resize

# --- Model ---
EFFICIENT_NET_MODEL = "efficientnet-b0"
FEATURE_SIZE = 512

# --- Hardware: use the GPU when one is visible, otherwise stay on the CPU ---
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

In [2]:
def get_path(folder, image_id):
    """Return the location of an image inside the three-level directory tree.

    The file is expected at ``<folder>/<c0>/<c1>/<c2>/<image_id>.jpg`` where
    ``c0``, ``c1`` and ``c2`` are the first three characters of ``image_id``.

    Args:
        folder: Root directory of the image tree.
        image_id: Image identifier; must contain at least three characters.

    Returns:
        The path to the image file, built with the platform's path separator.

    Raises:
        IndexError: If ``image_id`` has fewer than three characters.
    """
    # Hand every component to os.path.join instead of embedding "/" in an
    # f-string, so the result uses one consistent separator on every platform.
    return os.path.join(folder, image_id[0], image_id[1], image_id[2], f"{image_id}.jpg")

def get_id(image_path):
    """Return the image id: the file name of ``image_path`` without its last extension."""
    file_name = os.path.basename(image_path)
    stem, _extension = os.path.splitext(file_name)
    return stem

In [None]:
class LandmarkDataset(Dataset):
    """Dataset over every ``*.jpg`` found three directory levels below ``img_dir``.

    Each item is an ``(image, image_id)`` pair: the image is decoded with
    ``read_image``, cast to float and optionally passed through ``transform``;
    the id is the file name without its extension.

    Args:
        img_dir: Root folder searched with the pattern ``*/*/*/*.jpg``.
        transform: Optional callable applied to the float image tensor.
    """

    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        # glob.glob() yields files in arbitrary, filesystem-dependent order;
        # sorting keeps the sample order reproducible from run to run.
        self.img_paths = sorted(glob.glob(os.path.join(img_dir, "*/*/*/*.jpg")))
        self.transform = transform

    def __len__(self):
        """Return the number of image files discovered under ``img_dir``."""
        return len(self.img_paths)

    def __getitem__(self, idx: int):
        """Load the image at position ``idx`` and return ``(image, image_id)``."""
        img_path = self.img_paths[idx]
        # Always decode to three channels: a grayscale file would otherwise
        # produce a 1-channel tensor that a 3-channel normalisation rejects and
        # that cannot be batched together with colour images.
        image = read_image(img_path, mode=ImageReadMode.RGB).float()
        if self.transform is not None:
            image = self.transform(image)
        return image, get_id(img_path)

In [None]:
# Preprocessing applied to every float image tensor: resize to IMAGE_SIZE, then
# standardise each channel. The mean/std values are the widely used ImageNet
# channel statistics, scaled by 255 because the dataset casts the decoded image
# to float without rescaling it (presumably leaving values in 0-255).
transformations = transforms.Compose(
    [
        transforms.Resize(size=IMAGE_SIZE),
        transforms.Normalize(
            mean=[255 * channel for channel in (0.485, 0.456, 0.406)],
            std=[255 * channel for channel in (0.229, 0.224, 0.225)],
        ),
    ]
)

In [None]:
# Test images, preprocessed on the fly by `transformations`.
test_dataset = LandmarkDataset(img_dir=TEST_DATASET_PATH, transform=transformations)

# Sequential, complete pass over the test set: no shuffling and no dropped
# final batch, so every image is visited exactly once.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=False,
    drop_last=False,
)

In [None]:
import torch.nn as nn


class EfficientNetBackbone(nn.Module):
    """Thin ``nn.Module`` wrapper that forwards its input to a wrapped network.

    Args:
        feature_size: Accepted for interface compatibility; not used by this class.
        efficientNet: The module that actually computes the output. It is
            registered under the attribute name ``efficientNet``, which is
            therefore the prefix of this module's state-dict keys.
    """

    def __init__(self, feature_size: int, efficientNet: nn.Module):
        super().__init__()
        self.efficientNet = efficientNet

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        output = self.efficientNet(x)
        return output

In [None]:
@torch.no_grad()
def extract_embeddings(
    test_loader: DataLoader, backbone: nn.Module, device: str
) -> Tuple[np.ndarray, np.ndarray]:
    """Run ``backbone`` over every batch of ``test_loader`` and collect the results.

    Args:
        test_loader: Iterable with a length that yields ``(inputs, ids)`` batches.
        backbone: Model applied to each input batch. It is moved to ``device``
            and switched to evaluation mode (both happen in place).
        device: Device on which the forward passes are executed.

    Returns:
        A pair ``(embeddings, ids)``: the model outputs of all batches
        concatenated along the first axis, and the matching ids, both as NumPy
        arrays. Both are empty when the loader yields no batch.
    """
    num_batches = len(test_loader)
    # Report progress about ten times per run. Clamp to 1 so that loaders with
    # fewer than ten batches do not end up with a modulo-by-zero below.
    log_interval = max(1, num_batches // 10)
    test_embeddings = []
    test_ids = []

    # The inputs are sent to `device`, so the model has to live there as well.
    backbone = backbone.to(device)
    backbone.eval()
    for i_batch, (x, y) in enumerate(test_loader):
        x = x.to(device)

        # Bring each result back to host memory immediately so the accelerator
        # never has to hold the embeddings of the whole dataset at once.
        test_embeddings.append(backbone(x).cpu())
        test_ids.extend(y)

        if i_batch % log_interval == 0:
            print(f"Extracting embedings Batch {i_batch}/{num_batches}")

    if not test_embeddings:
        # torch.cat() refuses an empty list; hand back empty arrays instead.
        return np.empty((0, 0), dtype=np.float32), np.array([], dtype=str)

    return torch.cat(test_embeddings).numpy(), np.array(test_ids)


In [None]:
# Build the network architecture; its weights are restored from the checkpoint
# below.
# NOTE(review): FEATURE_SIZE is passed to the wrapper, which does not use it, so
# the network built here keeps its default head — confirm that this matches the
# architecture the checkpoint was saved from.
efficientnet = EfficientNet.from_name(EFFICIENT_NET_MODEL)

backbone = EfficientNetBackbone(feature_size=FEATURE_SIZE, efficientNet=efficientnet)

# map_location remaps the stored tensors onto DEVICE, so a checkpoint written on
# a GPU machine can still be restored when DEVICE falls back to "cpu".
# NOTE: torch.load unpickles the file — only load checkpoints you trust.
model_save = torch.load(MODEL_PATH, map_location=DEVICE)
backbone.load_state_dict(model_save["backbone_state_dict"])

# Input batches are moved to DEVICE during extraction; the model must be there too.
backbone = backbone.to(DEVICE)

In [None]:
# Embed the whole test set; ids come back in the same order as the embeddings.
test_embeddings, test_ids = extract_embeddings(
    test_loader=test_loader,
    backbone=backbone,
    device=DEVICE,
)

## Embeddings similarity

In [None]:
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd

In [None]:
# Files holding the index-set embeddings and the ids of the index images
# (presumably produced by an earlier embedding run — confirm). Both are read
# with np.load before the neighbour search.
INDEX_EMBEDDINGS_PATH = "index_embeddings.npy"
INDEX_IDS_PATH = "index_ids.npy"

In [None]:
index_embeddings = np.load(INDEX_EMBEDDINGS_PATH)
index_ids = np.load(INDEX_IDS_PATH)

# The neighbour search maps row i of the embeddings to entry i of the ids, so
# the two files must describe the same number of images. Fail early with a
# clear message rather than producing mislabelled neighbours later.
if len(index_embeddings) != len(index_ids):
    raise ValueError(
        f"Index files are misaligned: {len(index_embeddings)} embeddings "
        f"but {len(index_ids)} ids"
    )

In [None]:
def find_best_neighbors(index_ids, query_ids, index_embeddings, query_embeddings, top_k=100):
    """Rank the index images by cosine similarity for every query image.

    Args:
        index_ids: Ids of the index images; entry ``i`` belongs to row ``i`` of
            ``index_embeddings``. Any sequence accepted by ``np.asarray``.
        query_ids: Ids of the query images; entry ``i`` belongs to row ``i`` of
            ``query_embeddings``.
        index_embeddings: 2-D array with one embedding per index image.
        query_embeddings: 2-D array with one embedding per query image.
        top_k: Maximum number of neighbours kept per query (default 100). When
            the index holds fewer images, all of them are returned.

    Returns:
        A dict with two parallel lists: ``"id"`` (the query ids) and
        ``"images"`` (for each query, the ids of its nearest index images
        joined by single spaces, most similar first).

    Raises:
        ValueError: If ``top_k`` is smaller than 1.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be a positive integer, got {top_k}")

    # Indexing with an integer array below requires an ndarray; converting here
    # lets callers pass a plain list of ids as well.
    index_ids = np.asarray(index_ids)

    similarities = cosine_similarity(query_embeddings, index_embeddings)

    results = {"id": [], "images": []}

    for i, query_id in enumerate(query_ids):
        # argsort is ascending: keep the last `top_k` positions and reverse them
        # so that the most similar index image comes first.
        best = np.argsort(similarities[i])[-top_k:][::-1]
        results["id"].append(query_id)
        # str() keeps the join working for ids that are not stored as strings.
        results["images"].append(" ".join(map(str, index_ids[best])))

    return results


In [None]:
# For every test image, rank the index images by similarity to it.
results = find_best_neighbors(
    index_ids=index_ids,
    query_ids=test_ids,
    index_embeddings=index_embeddings,
    query_embeddings=test_embeddings,
)

In [None]:
# One row per query: its id and the space-separated ids of its neighbours.
pd.DataFrame(data=results).to_csv(path_or_buf="submission.csv", index=False)