In [None]:
%load_ext autoreload
%autoreload 2

# Как посчитать метрику Recall@1 самому

**Внимание:** это не самое "вычислительно эффективное" решение, но оно позволяет понять, как работает метрика Recall@1.

In [None]:
from pathlib import Path
import shutil
import cv2
import albumentations as A

import torchshow as ts
import torch
from torch import Tensor
from tqdm import tqdm
import pandas as pd

import opr
import numpy as np
import faiss

from opr.models.place_recognition import BoQModel, SequenceLateFusionModel
from opr.modules.temporal import TemporalAveragePooling
from opr.pipelines.place_recognition.sequential import SequencePlaceRecognitionPipeline

In [None]:
# All working directories are derived from the parent of the current working directory,
# i.e. this assumes the notebook is launched from a direct subdirectory of the repository.
REPO_ROOT = Path.cwd().parent
print(f"Repository root dir: {REPO_ROOT}")

# The dataset has to be on disk already; stop right here with a clear message if it is not.
DATA_DIR = REPO_ROOT.joinpath("data")
assert DATA_DIR.exists(), f"Data directory {DATA_DIR} does not exist. Please run the download script."
print(f"Data dir: {DATA_DIR}")

# Unlike the data directory, the submissions directory is created on demand.
SUBMISSIONS_DIR = REPO_ROOT.joinpath("submissions")
SUBMISSIONS_DIR.mkdir(parents=True, exist_ok=True)
print(f"Submissions dir: {SUBMISSIONS_DIR}")

In [None]:
# Two recordings from data/train-val: the first one is indexed as the database,
# the second one supplies the queries that are matched against it.
DATABASE_TRACK_DIR = DATA_DIR.joinpath("train-val", "00_2023-02-10-twilight")
QUERY_TRACK_DIR = DATA_DIR.joinpath("train-val", "01_2023-02-21-day")

print(f"Database track dir: {DATABASE_TRACK_DIR}")
print(f"Query track dir: {QUERY_TRACK_DIR}")

In [None]:
class ITLPTrackDataReader:
    """Map-style reader over a single track directory.

    The directory ``root`` must contain ``track.csv`` and the image folders
    ``front_cam/`` and/or ``back_cam/``. From ``track.csv`` the reader uses the
    columns ``tx`` and ``ty`` (pose) and ``front_cam_ts`` / ``back_cam_ts``
    (image file stems; images are looked up as ``<ts>.jpg``).

    Each item is a dict with the key ``"pose"`` and, depending on the enabled
    cameras, ``"image_front_cam"`` and/or ``"image_back_cam"``.
    """

    def __init__(self, root: Path, image_transform: "A.Compose", front_cam: bool = True, back_cam: bool = False):
        """Open a track.

        Args:
            root: Track directory.
            image_transform: Albumentations pipeline; it is called as
                ``image_transform(image=rgb)["image"]`` on every loaded image.
            front_cam: Whether to load front camera images.
            back_cam: Whether to load back camera images.

        Raises:
            ValueError: If both cameras are disabled.
        """
        # Check the flags first so a misconfiguration fails before any disk access.
        if not front_cam and not back_cam:
            raise ValueError("At least one camera must be enabled: front_cam or back_cam.")

        self._root = Path(root)
        self._front_cam_dir = self._root / "front_cam"
        self._back_cam_dir = self._root / "back_cam"

        self._track_df = pd.read_csv(self._root / "track.csv")
        self._image_transform = image_transform  # note that we use albumentations for image transformations
        self._front_cam = front_cam
        self._back_cam = back_cam

    def __len__(self) -> int:
        """Return the number of rows (frames) in ``track.csv``."""
        return len(self._track_df)

    def _load_image(self, image_path: Path) -> Tensor:
        """Read one image from disk, convert BGR -> RGB and apply the transform.

        Raises:
            FileNotFoundError: If the file is missing or cannot be decoded.
        """
        image_bgr = cv2.imread(str(image_path))
        if image_bgr is None:
            # cv2.imread does not raise on a missing/corrupt file, it returns None;
            # without this check the failure would surface as an opaque cvtColor error.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        # NOTE(review): the result is a Tensor only if the transform converts to one
        # (e.g. ends with ToTensorV2) - collate_fn relies on that.
        return self._image_transform(image=image_rgb)["image"]

    def __getitem__(self, idx: int) -> dict[str, Tensor]:
        """Return the pose and the enabled camera images of frame ``idx``.

        An out-of-range ``idx`` raises ``IndexError`` (from ``iloc``), which is also
        what terminates plain ``for item in reader`` iteration.
        """
        # Columns are selected before the row so that timestamps are never cast
        # together with the float pose columns.
        pose = self._track_df[["tx", "ty"]].iloc[idx].to_numpy()
        out_dict = {"pose": Tensor(pose)}

        # Paths are built only for enabled cameras, so a disabled camera needs
        # neither its timestamp column nor its image folder.
        if self._front_cam:
            front_cam_ts = self._track_df["front_cam_ts"].iloc[idx]
            out_dict["image_front_cam"] = self._load_image(self._front_cam_dir / f"{front_cam_ts}.jpg")
        if self._back_cam:
            back_cam_ts = self._track_df["back_cam_ts"].iloc[idx]
            out_dict["image_back_cam"] = self._load_image(self._back_cam_dir / f"{back_cam_ts}.jpg")

        return out_dict

    def collate_fn(self, batch: list[dict[str, Tensor]]) -> dict[str, Tensor]:
        """Stack a list of items into one batch dict.

        ``"image_<cam>"`` entries are stacked under ``"images_<cam>"`` and ``"pose"``
        entries under ``"poses"``; any other key is dropped. The set of keys is taken
        from the first item of the batch.
        """
        collated_batch = {}
        for key in batch[0].keys():
            if key.startswith("image_"):
                collated_batch["images_" + key.removeprefix("image_")] = torch.stack([item[key] for item in batch])
            elif key == "pose":
                collated_batch["poses"] = torch.stack([item[key] for item in batch])
        return collated_batch

In [None]:
def setup_transforms(image_size: int = 322, crop_size: int = 720) -> A.Compose:  # 384 for ResNet50, 322 for DINOv2
    """Create the image preprocessing pipeline.

    Steps: center crop to a ``crop_size`` x ``crop_size`` square, resize to
    ``image_size`` x ``image_size``, normalize channels, convert to a torch tensor.

    Args:
        image_size: Side of the square image produced by the pipeline.
        crop_size: Side of the central square cut out before resizing. The default
            of 720 matches the previously hard-coded value.

    Returns:
        The composed albumentations pipeline; call it as ``transform(image=rgb)["image"]``.
    """
    # Import the submodule explicitly: `A.pytorch` is only available as an attribute
    # of the package if something has already imported `albumentations.pytorch`.
    from albumentations.pytorch import ToTensorV2

    return A.Compose(
        [
            A.CenterCrop(height=crop_size, width=crop_size),  # square crop for a 1:1 aspect ratio
            A.Resize(height=image_size, width=image_size),
            # The commonly used ImageNet per-channel mean / std.
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ]
    )

In [None]:
# Reader over the database track, using the default (front camera only) configuration.
database_reader = ITLPTrackDataReader(
    image_transform=setup_transforms(image_size=322),  # Use 384 for ResNet50, 322 for DINOv2
    root=DATABASE_TRACK_DIR,
)

# No shuffling and no dropped tail batch: every frame is visited exactly once,
# in the same order as the rows of track.csv.
batch_size = 32
database_dl = torch.utils.data.DataLoader(
    dataset=database_reader,
    collate_fn=database_reader.collate_fn,
    batch_size=batch_size,
    drop_last=False,
    shuffle=False,
)

In [None]:
# Place recognition model from `opr`, built with the "dinov2" backbone.
# NOTE(review): whether pretrained weights are loaded here is not visible in this notebook - confirm.
model = BoQModel(backbone_name="dinov2")

In [None]:
# Directory that will hold everything the pipeline reads: index.faiss + track.csv.
database_for_pipe_dir = DATA_DIR / "database"
database_for_pipe_dir.mkdir(parents=True, exist_ok=True)

model = model.to("cuda")
model.eval()

# Compute one descriptor per database frame. Each batch result is moved to the CPU
# right away so that GPU memory does not grow with the size of the track.
descriptors_list = []
with torch.no_grad():
    for batch in tqdm(database_dl, desc="Extracting database descriptors"):
        batch = {k: v.to("cuda") for k, v in batch.items()}
        batch_descriptors = model(batch)["final_descriptor"]
        descriptors_list.append(batch_descriptors.cpu())
descriptors = torch.cat(descriptors_list, dim=0)
print(f"Descriptors shape: {descriptors.shape}")

# Create L2 distance FAISS index for nearest neighbor search.
# The matrix is made contiguous float32 explicitly, whatever precision the model produced.
faiss_index = faiss.IndexFlatL2(descriptors.shape[1])
faiss_index.add(np.ascontiguousarray(descriptors.numpy(), dtype=np.float32))
faiss.write_index(
    faiss_index,
    str(database_for_pipe_dir / "index.faiss")
)

# Copy pose data as track.csv (required by PlaceRecognitionPipeline)
shutil.copy(DATABASE_TRACK_DIR / "track.csv", database_for_pipe_dir / "track.csv")

In [None]:
class TrackSeqWrapper:
    """Sliding-window view over a track reader.

    Item ``idx`` is the list of frames ``[idx - seq_len + 1, ..., idx]`` taken from
    the wrapped reader. Near the start of the track the window is truncated, so the
    first ``seq_len - 1`` items are shorter than ``seq_len``.
    """

    def __init__(self, track_data_reader: "ITLPTrackDataReader", seq_len: int = 3):
        """Wrapper for ITLPTrackDataReader to provide sequences of specified length.

        Args:
            track_data_reader: Indexable reader with ``__len__`` and ``__getitem__``.
            seq_len: Maximum number of frames per sequence.

        Raises:
            ValueError: If ``seq_len`` is smaller than 1 (every sequence would be empty).
        """
        if seq_len < 1:
            raise ValueError(f"seq_len must be a positive integer, got {seq_len}.")
        self.track_data_reader = track_data_reader
        self.seq_len = seq_len

    def __len__(self) -> int:
        """Return the number of sequences, which equals the number of frames."""
        return len(self.track_data_reader)

    def __getitem__(self, idx: int) -> list[dict[str, Tensor]]:
        """Get a sequence of frames up to (and including) the given index.

        Negative indices count from the end, as for a list.

        Raises:
            IndexError: If ``idx`` is out of range. Raising it before any frame is
                read also ends ``for seq in wrapper`` loops without wasted loading.
        """
        num_frames = len(self)
        if idx < 0:
            idx += num_frames
        if not 0 <= idx < num_frames:
            raise IndexError(f"Sequence index out of range for a track of {num_frames} frames.")

        window_start = max(0, idx - self.seq_len + 1)
        return [self.track_data_reader[i] for i in range(window_start, idx + 1)]

In [None]:
# Query frames go through the same 322-pixel preprocessing as the database frames.
query_reader = ITLPTrackDataReader(
    image_transform=setup_transforms(image_size=322),
    root=QUERY_TRACK_DIR,
)

# Every query is a window of up to 3 frames ending at the query frame.
seq_data_reader = TrackSeqWrapper(seq_len=3, track_data_reader=query_reader)

In [None]:
# Sequence model: wraps the single-frame model and combines the frames of a sequence
# with the TemporalAveragePooling module.
seq_model = SequenceLateFusionModel(
    temporal_fusion_module=TemporalAveragePooling(),
    model=model,
)

# The pipeline searches the database prepared in `database_for_pipe_dir`.
# NOTE(review): the exact effect of `use_candidate_pool_fusion` is defined inside `opr` - confirm there.
pipe = SequencePlaceRecognitionPipeline(
    model=seq_model,
    database_dir=database_for_pipe_dir,
    use_candidate_pool_fusion=True,
)

In [None]:
# UPPER_LIMIT = np.inf
UPPER_LIMIT = 50  # SET TO SOME SMALL VALUE FOR DEBUGGING / SET TO np.inf FOR FULL RUN

# Fix the number of queries up front: tqdm gets a real total for its progress bar,
# and no sequence beyond the limit is loaded from disk just to be thrown away.
# (min() never returns np.inf here because len() is always finite.)
num_queries = int(min(len(seq_data_reader), UPPER_LIMIT))

# output_ids[i] is the database index the pipeline returned for query i.
output_ids = []
for i in tqdm(range(num_queries), desc="Running queries"):
    query_seq = seq_data_reader[i]
    output = pipe.infer(query_seq)
    output_ids.append(output['idx'])

## Расчет метрики Recall@1

(примерно такой же код под капотом у чекера на Яндекс.Контесте)


In [None]:
# A database frame is a correct answer for a query if the distance between their
# (tx, ty) coordinates is below this threshold (same units as in track.csv).
DIST_THRESHOLD = 10.0

query_df = pd.read_csv(QUERY_TRACK_DIR / "track.csv")
database_df = pd.read_csv(DATABASE_TRACK_DIR / "track.csv")
database_coords = database_df[["tx", "ty"]].values

# Ground truth in the checker's text format: one line per query with the
# space-separated indices of all matching database frames, or "-1" if there are none.
gt_lines = []

for _, row in query_df.iterrows():
    coords = row[["tx", "ty"]].values
    distances = ((database_coords - coords) ** 2).sum(axis=1) ** 0.5
    match_indices = np.argwhere(distances < DIST_THRESHOLD).flatten()
    if match_indices.size == 0:
        gt_lines.append("-1\n")
        continue
    indices_str = " ".join(map(str, match_indices))
    gt_lines.append(f"{indices_str}\n")


# Recall@1: share of queries (among those that have at least one correct answer)
# whose predicted database index is one of the correct answers.
# zip() stops at the shorter input, so a partial run (UPPER_LIMIT) needs no explicit
# slicing - slicing with np.inf would raise a TypeError.
matched = []
for predicted_idx, gt in zip(output_ids, gt_lines):
    if gt == "-1\n":  # -1 means that there is no true answer and we should simply skip it
        continue
    # Compare against whole tokens: a substring test would accept prediction "1"
    # for the ground truth "10 11".
    gt_indices = set(gt.split())
    matched.append(1 if str(predicted_idx) in gt_indices else 0)
r_at_1 = sum(matched) / len(matched) if matched else 0

print(r_at_1)