# VisualOdometrySLAM

In [1]:
import random

import cv2
import numpy as np
import pandas as pd

def seed_all(seed: int) -> None:
    """
    Seed every random number generator this notebook touches.

    Applies the same ``seed`` to NumPy's global RNG, Python's ``random``
    module and OpenCV's internal RNG, in that order.

    :param seed: integer seed value passed unchanged to each generator
    """
    for apply_seed in (np.random.seed, random.seed, cv2.setRNGSeed):
        apply_seed(seed)

In [2]:
# Fixed seed shared by every generator that seed_all() configures.
SEED = 42
seed_all(SEED)

In [3]:
from pathlib import Path

# Directory layout, all relative to the notebook's working directory.
DATA_PATH = Path('../data/')
DATA_PATH_VIDEO = DATA_PATH / 'visual_odometry/'
DATA_PATH_OUTPUT = DATA_PATH / 'output_data/'
DATA_PATH_SAVE_MODELS = DATA_PATH / 'models/'
DATA_IMGS = Path('../imgs')

# Create every directory up front; existing directories are left untouched.
DATA_PATH.mkdir(parents=True, exist_ok=True)
DATA_PATH_VIDEO.mkdir(parents=True, exist_ok=True)
DATA_PATH_OUTPUT.mkdir(parents=True, exist_ok=True)
DATA_PATH_SAVE_MODELS.mkdir(parents=True, exist_ok=True)
DATA_IMGS.mkdir(parents=True, exist_ok=True)

In [4]:
import sys
import os

# Absolute path of the parent of the current working directory; adding it to
# sys.path makes packages located there importable from this notebook.
project_path = os.path.abspath(os.path.join(os.getcwd(), ".."))
# Re-running the cell must not pile up duplicate sys.path entries.
if project_path not in sys.path:
    sys.path.append(project_path)

## Общие методы

| Параметр                  | Где в YAML                     | Что означает                                  | Как определить / откуда берётся                 |
| ------------------------- | ------------------------------ | --------------------------------------------- | ----------------------------------------------- |
| `fx`, `fy`                | `camera_matrix.data[0]`, `[4]` | Фокусное расстояние по осям X и Y             | Из калибровки (например, `cv2.calibrateCamera`) |
| `cx`, `cy`                | `camera_matrix.data[2]`, `[5]` | Координаты главной точки (оптический центр)   | Центр кадра или из калибровки                   |
| `camera_matrix`           | `camera_matrix.data`           | Внутренняя матрица 3×3                        | Результат калибровки                            |
| `distortion_model`        | `distortion_model`             | Модель дисторсии (`plumb_bob`, `equidistant`) | Выбирается при калибровке, по типу объектива    |
| `distortion_coefficients` | `distortion_coefficients.data` | Коэффициенты искажений (k1, k2, p1, p2, k3)   | Выход `cv2.calibrateCamera`, `Kalibr`, ROS      |
| `image_width` / `height`  | `image_width`, `image_height`  | Размер изображения (в пикселях)               | Известно по сенсору или из примера калибровки   |
| `camera_name`             | `camera_name`                  | Название камеры                               | Задаётся вручную (опционально)                  |


In [5]:
import cv2
import numpy as np
from pathlib import Path
from typing import Union, List


def load_video_frames(video_path: Union[str, Path], target_fps: float) -> List[np.ndarray]:
    """
    Extract frames from a video at a reduced rate and convert them to grayscale.

    Every ``frame_interval``-th frame is kept, where ``frame_interval`` is
    ``int(video_fps / target_fps)`` clamped to at least 1. The clamp covers a
    ``target_fps`` higher than the video's own FPS and containers that report
    an FPS of 0; in both cases every frame is kept.

    :param video_path: path to the video file
    :param target_fps: target frame processing rate; must be positive
    :return: list of kept frames (grayscale)
    :raises ValueError: if ``target_fps`` is not positive
    :raises FileNotFoundError: if ``video_path`` does not exist
    :raises IOError: if OpenCV cannot open the file
    """
    if target_fps <= 0:
        raise ValueError(f'target_fps must be positive, got {target_fps}')

    video_path = Path(video_path)
    if not video_path.exists():
        raise FileNotFoundError(f'–í–∏–¥–µ–æ –Ω–µ –Ω–∞–π–¥–µ–Ω–æ –ø–æ –ø—É—Ç–∏: {video_path.resolve()}')

    cap = cv2.VideoCapture(str(video_path))
    # try/finally guarantees the capture handle is released even if decoding fails.
    try:
        if not cap.isOpened():
            raise IOError(f'–ù–µ —É–¥–∞–ª–æ—Å—å –æ—Ç–∫—Ä—ã—Ç—å –≤–∏–¥–µ–æ—Ñ–∞–π–ª: {video_path}')

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        has_fps = video_fps > 0
        # An interval of 0 would make the modulo below raise ZeroDivisionError.
        frame_interval = max(1, int(video_fps / target_fps)) if has_fps else 1
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Duration is unknown when the container does not report an FPS.
        duration_sec = total_frames / video_fps if has_fps else 0.0
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        print("–ò–Ω—Ñ–æ—Ä–º–∞—Ü–∏—è –æ –≤–∏–¥–µ–æ:")
        print(f"- –ò–º—è —Ñ–∞–π–ª–∞: {video_path.name}")
        print(f"- –†–∞–∑–º–µ—Ä –∫–∞–¥—Ä–∞: {frame_width} x {frame_height}")
        print(f"- –û—Ä–∏–≥–∏–Ω–∞–ª—å–Ω—ã–π FPS: {video_fps:.2f}")
        print(f"- –û–±—â–µ–µ —á–∏—Å–ª–æ –∫–∞–¥—Ä–æ–≤: {total_frames}")
        print(f"- –î–ª–∏—Ç–µ–ª—å–Ω–æ—Å—Ç—å –≤–∏–¥–µ–æ: {duration_sec:.2f} —Å–µ–∫—É–Ω–¥")
        print(f"- –¶–µ–ª–µ–≤–∞—è —á–∞—Å—Ç–æ—Ç–∞ –æ–±—Ä–∞–±–æ—Ç–∫–∏: {target_fps} FPS")
        print(f"- –ë—É–¥–µ—Ç –æ–±—Ä–∞–±–∞—Ç—ã–≤–∞—Ç—å—Å—è –∫–∞–∂–¥—ã–π {frame_interval}-–π –∫–∞–¥—Ä")

        processed_frames = []
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Keep only every frame_interval-th frame, starting with frame 0.
            if frame_idx % frame_interval == 0:
                processed_frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))

            frame_idx += 1
    finally:
        cap.release()

    print(f"–û–±—Ä–∞–±–æ—Ç–∞–Ω–æ –∫–∞–¥—Ä–æ–≤: {len(processed_frames)} –ø—Ä–∏ —Ü–µ–ª–µ–≤–æ–º FPS: {target_fps}")
    return processed_frames


def load_kitti_sequence(path: Union[str, Path], grayscale: bool = True) -> List[np.ndarray]:
    """
    Load every ``*.png`` image found directly inside ``path``, in sorted path order.

    Files that OpenCV cannot decode (``cv2.imread`` returns ``None``) are
    silently skipped.

    :param path: directory containing the image sequence
    :param grayscale: read images as single-channel grayscale when True,
        otherwise as colour images
    :return: list of loaded images
    """
    decoded = (
        cv2.imread(
            str(image_file),
            cv2.IMREAD_GRAYSCALE if grayscale else cv2.IMREAD_COLOR,
        )
        for image_file in sorted(Path(path).glob("*.png"))
    )
    return [image for image in decoded if image is not None]


## ORB params

| Параметр        | Тип     | Значение по умолчанию  | Описание и рекомендации                                                                                                         |
| --------------- | ------- | ---------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `nfeatures`     | `int`   | 500                    | Максимальное количество фич, которые ORB попытается найти. <br>🔧 Увеличь до 3000–5000 для богатой сцены или сеточной детекции. |
| `scaleFactor`   | `float` | 1.2                    | Масштаб между уровнями пирамиды. <br>Меньше — больше перекрытий, лучше для масштабных сцен, но медленнее.                       |
| `nlevels`       | `int`   | 8                      | Кол-во уровней в пирамиде изображений. <br>🔧 Увеличь до 12–16, если ожидается масштабный диапазон объектов.                    |
| `edgeThreshold` | `int`   | 31                     | Отступ от границ изображения. <br>🔧 Уменьши до 10–15, если хочешь находить фичи у краёв.                                       |
| `firstLevel`    | `int`   | 0                      | Начальный уровень пирамиды (обычно 0). Почти никогда не меняется.                                                               |
| `WTA_K`         | `int`   | 2                      | Метод генерации BRIEF-дескриптора: 2 = быстрее, 3 или 4 = точнее. <br>Используй 2, если нет проблем с точностью.                |
| `scoreType`     | `int`   | `cv2.ORB_HARRIS_SCORE` | Выбор способа ранжирования углов: `cv2.ORB_HARRIS_SCORE` или `cv2.ORB_FAST_SCORE`. <br>`HARRIS` точнее, `FAST` — быстрее.       |
| `patchSize`     | `int`   | 31                     | Размер области вокруг фичи для дескриптора. <br>🔧 31 — стандарт, можно увеличить до 49, если объект крупный.                   |
| `fastThreshold` | `int`   | 20                     | Порог FAST-угла: чем меньше — тем больше фич. <br>🔧 Снизь до 5–10, если ORB пропускает фичи.                                   |


In [6]:
import yaml
from pathlib import Path
from collections import Counter
from typing import Union, List, Optional

import cv2
import numpy as np
import open3d as o3d
import matplotlib.pyplot as plt



Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [7]:
class MapPoint:
    """
    A 3D map point together with its descriptor and the list of its observations.

    Attributes:
        coord: the 3D coordinate (X, Y, Z) passed to the constructor
        descriptor: the feature descriptor passed to the constructor
        observations: initially empty list, intended to hold
            (frame_idx, keypoint_idx, (x, y)) entries
    """

    def __init__(self, coord_3d: np.ndarray, descriptor: np.ndarray):
        # A fresh list per instance, so observations are never shared.
        self.coord, self.descriptor, self.observations = coord_3d, descriptor, []


class KeyFrame:
    """
    Plain container for one keyframe.

    Stores the frame identifier (as ``id``), the image, its keypoints and
    descriptors, and the pose exactly as they were passed in.
    """

    def __init__(self, frame_id: int, image: np.ndarray, keypoints, descriptors, pose: np.ndarray):
        self.id, self.image = frame_id, image
        self.keypoints, self.descriptors = keypoints, descriptors
        self.pose = pose

In [None]:
from visual_slam.config import Config
from visual_slam.slam import SlamBase
from visual_slam.camera.camera import PinholeCamera
from visual_slam.source import DatasetSource, CameraSource

class VisualOdometrySLAM(SlamBase):
    """
    Skeleton of a visual-odometry / SLAM pipeline.

    The constructor builds a frame source, a pinhole camera model, an ORB
    detector and a brute-force Hamming matcher from ``config``. ``run``
    currently only detects ORB features on each frame and prints their count;
    matching, pose estimation and mapping are still TODO.
    """

    def __init__(self, config: Config):
        """
        :param config: project configuration object; the fields read here are
            ``dataset_path``, ``camera_id``, ``width``, ``height``, ``fps``,
            ``fx``, ``fy``, ``cx``, ``cy``, ``dist_coeffs``, ``bf`` and the
            ``orb_*`` detector settings
        """
        super().__init__()
        self.config = config

        # A configured dataset path takes precedence over a live camera.
        if config.dataset_path is not None:
            self.source = DatasetSource(config.dataset_path)
        else:
            self.source = CameraSource(
                camera_id=config.camera_id,
                width=config.width,
                height=config.height,
                fps=config.fps
            )

        # Camera model
        self.camera = PinholeCamera(
            width=config.width,
            height=config.height,
            fx=config.fx,
            fy=config.fy,
            cx=config.cx,
            cy=config.cy,
            dist_coeffs=config.dist_coeffs,
            fps=config.fps,
            bf=config.bf
        )

        # ORB detector and matcher
        self.orb = cv2.ORB_create(
            nfeatures=config.orb_nfeatures,
            scaleFactor=config.orb_scale_factor,
            nlevels=config.orb_nlevels,
            edgeThreshold=config.orb_edge_threshold,
            fastThreshold=config.orb_fast_threshold
        )
        # NOTE(review): with crossCheck=True OpenCV's BFMatcher only supports
        # one match per query; a k=2 ratio test would need crossCheck=False.
        self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

        # Bookkeeping containers
        self.keyframes: List = []
        self.mappoints: List = []
        self.poses: List[np.ndarray] = []

    # -----------------------------
    # Main loop
    # -----------------------------
    def run(self):
        """Run the visual odometry / SLAM loop until the source is exhausted."""
        print("‚ñ∂ –ó–∞–ø—É—Å–∫ –≤–∏–∑—É–∞–ª—å–Ω–æ–π –æ–¥–æ–º–µ—Ç—Ä–∏–∏...")

        while self.source.is_ok():
            frame, timestamp = self.source.get_frame()
            if frame is None:
                break

            # Single-channel frames (e.g. grayscale dataset images) are used
            # as-is: COLOR_BGR2GRAY raises on input that is already 2-D.
            gray = frame if frame.ndim == 2 else cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            keypoints, descriptors = self.orb.detectAndCompute(gray, None)

            # TODO: feature processing, matching, PnP, triangulation and map update
            print(f"[{timestamp:.3f}] –ö–ª—é—á–µ–≤—ã—Ö —Ç–æ—á–µ–∫: {len(keypoints)}")

        print("‚úî –ó–∞–≤–µ—Ä—à–µ–Ω–æ")

    def reset(self):
        """Reset the state (e.g. before running again)."""
        self.keyframes.clear()
        self.mappoints.clear()
        self.poses.clear()
        self.source.reset()

In [9]:
import math
import time
from collections import Counter
from typing import List, Tuple

import numpy as np
import cv2
import g2o


class MatcherFilterMixin:
    """
    Stateless filters for feature matches.

    Every filter returns a ``(matches, pts1, pts2)`` triple in which the three
    sequences stay index-aligned. Point arrays built by the filters are always
    ``float32``, including when nothing survives.
    """

    @staticmethod
    def _take(matches, pts1, pts2, indices):
        """Pick the entries at ``indices`` from three index-aligned sequences."""
        return (
            [matches[i] for i in indices],
            np.float32([pts1[i] for i in indices]),
            np.float32([pts2[i] for i in indices]),
        )

    @staticmethod
    def _empty():
        """Return the empty result triple shared by all filters."""
        return [], np.float32([]), np.float32([])

    @staticmethod
    def filter_by_ratio_test(knn_matches, kp1, kp2, ratio_thresh):
        """
        Apply a ratio test to k-nearest-neighbour matches.

        A best match is kept when its distance is below ``ratio_thresh`` times
        the distance of the second-best match. Rows with fewer than two
        candidates are skipped; for rows with more than two, only the first
        two are compared.

        :param knn_matches: iterable of per-query candidate sequences
        :param kp1: keypoints indexed by ``queryIdx``
        :param kp2: keypoints indexed by ``trainIdx``
        :param ratio_thresh: ratio threshold
        :return: (kept matches, their points in image 1, their points in image 2)
        """
        good_matches = [
            pair[0]
            for pair in knn_matches
            if len(pair) >= 2 and pair[0].distance < ratio_thresh * pair[1].distance
        ]
        pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])
        return good_matches, pts1, pts2

    @staticmethod
    def filter_by_max_distance(matches, pts1, pts2, max_dist):
        """
        Keep matches whose descriptor distance is strictly below ``max_dist``.

        :return: (kept matches, pts1, pts2)
        """
        keep = [i for i, m in enumerate(matches) if m.distance < max_dist]
        return MatcherFilterMixin._take(matches, pts1, pts2, keep)

    @staticmethod
    def filter_unique_matches(matches, pts1, pts2):
        """
        Keep only the first match for every ``trainIdx``.

        :return: (kept matches, pts1, pts2)
        """
        seen_train_idx = set()
        keep = []
        for i, m in enumerate(matches):
            if m.trainIdx not in seen_train_idx:
                seen_train_idx.add(m.trainIdx)
                keep.append(i)
        return MatcherFilterMixin._take(matches, pts1, pts2, keep)

    @staticmethod
    def filter_by_ransac_fundamental(matches, pts1, pts2):
        """
        Keep the inliers of a RANSAC fundamental-matrix fit.

        Fewer than 8 matches, or a failed estimation (no mask returned),
        yields an empty result.

        :return: (inlier matches, pts1, pts2)
        """
        if len(matches) < 8:
            return MatcherFilterMixin._empty()

        _, mask = cv2.findFundamentalMat(pts1, pts2, method=cv2.FM_RANSAC, ransacReprojThreshold=1.0)
        if mask is None:
            return MatcherFilterMixin._empty()

        inlier_flags = mask.ravel()
        keep = [i for i in range(len(matches)) if inlier_flags[i]]
        return MatcherFilterMixin._take(matches, pts1, pts2, keep)

    @staticmethod
    def filter_by_exclusion_mask(matches, pts1, pts2, mask_exclude_regions: List[Tuple[int, int, int, int]]):
        """
        Drop matches with an endpoint inside any excluded rectangle.

        :param mask_exclude_regions: rectangles given as
            ``(xmin, ymin, xmax, ymax)`` with inclusive bounds; when empty the
            inputs are returned unchanged
        :return: (kept matches, pts1, pts2)
        """
        if not mask_exclude_regions:
            return matches, pts1, pts2

        def is_excluded(point):
            x, y = point
            return any(
                xmin <= x <= xmax and ymin <= y <= ymax
                for (xmin, ymin, xmax, ymax) in mask_exclude_regions
            )

        keep = [
            i for i in range(len(matches))
            if not (is_excluded(pts1[i]) or is_excluded(pts2[i]))
        ]
        return MatcherFilterMixin._take(matches, pts1, pts2, keep)

    @staticmethod
    def filter_mutual_matches(
        matcher: "cv2.DescriptorMatcher",
        des1: np.ndarray,
        des2: np.ndarray,
        kp1: "List[cv2.KeyPoint]",
        kp2: "List[cv2.KeyPoint]",
        matches: "List[cv2.DMatch]",
        pts1: np.ndarray,
        pts2: np.ndarray
    ) -> "Tuple[List[cv2.DMatch], np.ndarray, np.ndarray]":
        """
        Keep only mutual matches (A→B and B→A).

        The reverse matches are computed here with ``matcher.match(des2, des1)``.
        Missing or empty descriptors yield an empty result instead of an error
        from the matcher.

        :param matcher: cv2.DescriptorMatcher object (e.g. BFMatcher or FlannBasedMatcher)
        :param des1: descriptors of the first image
        :param des2: descriptors of the second image
        :param kp1: keypoints of the first image (not used by this filter)
        :param kp2: keypoints of the second image (not used by this filter)
        :param matches: forward matches A→B
        :param pts1: point coordinates in the first image, aligned with ``matches``
        :param pts2: point coordinates in the second image, aligned with ``matches``
        :return: (mutual matches, pts1, pts2)
        """
        if des1 is None or des2 is None or len(des1) == 0 or len(des2) == 0:
            return MatcherFilterMixin._empty()

        # Store reverse matches as (index in image 1, index in image 2) so they
        # compare directly against the forward (queryIdx, trainIdx) pairs.
        reverse_set = {(m.trainIdx, m.queryIdx) for m in matcher.match(des2, des1)}
        keep = [
            i for i, m in enumerate(matches)
            if (m.queryIdx, m.trainIdx) in reverse_set
        ]
        return MatcherFilterMixin._take(matches, pts1, pts2, keep)

In [11]:
import cv2
from pathlib import Path

# Input data for this experiment: a directory of KITTI left-camera images and
# the matching calibration file. Both must exist before the pipeline is built.
VIDEO_PATH = DATA_PATH / 'KITTI_sequence_1/image_l'
CALIBRATION_PATH = DATA_PATH / 'KITTI_sequence_1/calib.txt'
TARGET_FPS = 2.0

if not VIDEO_PATH.exists():
    raise FileNotFoundError(f'–í–∏–¥–µ–æ –Ω–µ –Ω–∞–π–¥–µ–Ω–æ –ø–æ –ø—É—Ç–∏: {VIDEO_PATH.resolve()}')

if not CALIBRATION_PATH.exists():
    raise FileNotFoundError(f'–ö–∞–ª–∏–±—Ä–æ–≤–æ—á–Ω—ã–π —Ñ–∞–π–ª –Ω–µ –Ω–∞–π–¥–µ–Ω: {CALIBRATION_PATH.resolve()}')

# Keyword arguments forwarded verbatim to the VisualOdometry constructor below.
params = {
    "target_fps": TARGET_FPS,  # frame processing rate

    # ORB parameters
    # NOTE(review): nfeatures=100 is below the quoted default of 500 — confirm this is intended.
    "nfeatures": 100,             # [default=500] more features -> denser coverage of the scene
    "fastThreshold": 10,          # [default=20] lower threshold -> more sensitive to weak corners
    "edgeThreshold": 20,           # [default=31] allows detection closer to the frame borders
    "scaleFactor": 1.2,           # [default=1.2] scale between pyramid levels (smaller -> slower but more accurate)
    "nlevels": 8,                 # [default=8] number of pyramid levels; higher = better robustness to scale

    # Match filtering by distance (Lowe's ratio test)
    "ratio_thresh": 0.75,          # [default=0.75] smaller = stricter filtering
    "max_match_distance": 200,    # maximum allowed match distance
    # "use_mutual_check": True,      # enable/disable the mutual (cross) check

    # Grid detector (splitting the frame into a grid)
    "use_grid": False,             # enable/disable the grid
    "grid_rows": 4,
    "grid_cols": 6,

    # ANMS (uniform spatial distribution of features)
    "use_anms": True,             # enable/disable ANMS after ORB
    "anms_count": 3000,            # how many features to keep after ANMS (tune for your scene)
    "anms_tolerance": 0.2,  # ANMS threshold (the smaller, the more uniformly features are distributed)

    # FLANN matcher (fast search for similar features)
    "flann_table_number": 10,
    "flann_key_size": 14,
    "flann_multi_probe_level": 2,
    "flann_checks": 50,
    
    "init_keyframes":10,         # minimum number of KeyFrames needed to initialise the map
    "last_frame_size":10,         # number of most recent frames used for g2o Bundle Adjustment
    "g2o_optimize_step": 200,  # g2o optimisation step (number of iterations)
    "ba_obs_size": 2,            # minimum number of observations for Bundle Adjustment

    # Add further parameters here as needed (e.g. for keyframe/track decisions)
    "min_tracked_points": 30,      # minimum number of tracked MapPoints required to keep tracking
    "keyframe_interval": 3,        # minimum number of frames between KeyFrames
    # "keyframe_translation_thresh": 0.2, # minimum camera displacement for a new KeyFrame
}

# NOTE(review): `VisualOdometry` is not defined in the visible part of this
# notebook (only `VisualOdometrySLAM` is) — confirm where it comes from.
vo = VisualOdometry(
    video_path=VIDEO_PATH,
    calibration_file=CALIBRATION_PATH,
    **params
)

[KITTI] –ó–∞–≥—Ä—É–∂–µ–Ω–æ 51 –∫–∞–¥—Ä–æ–≤ –∏–∑ image_l


In [None]:
# from IPython.display import display, clear_output
# from PIL import Image
# import time
# import cv2

# num_frames_to_show = int(TARGET_FPS * 30)
# frames_to_show = vo.frames[:num_frames_to_show]

# for frame in frames_to_show:
#     rgb_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
#     clear_output(wait=True)
#     display(Image.fromarray(rgb_frame))
#     time.sleep(1 / TARGET_FPS / 2)

In [13]:
# Presumably drops state left over from a previous run before starting again —
# `clear_data` is not defined in the visible part of this notebook; confirm.
vo.clear_data()
vo.run()

[Init] –ó–∞–ø—É—Å–∫ –∏–Ω–∏—Ü–∏–∞–ª–∏–∑–∞—Ü–∏–∏ –∫–∞—Ä—Ç—ã...
[Debug] –ù–∞–π–¥–µ–Ω–æ 8105 –∫–ª—é—á–µ–≤—ã—Ö —Ç–æ—á–µ–∫
[Debug] –ü–æ—Å–ª–µ ANMS –æ—Å—Ç–∞–ª–æ—Å—å 2763 —Ç–æ—á–µ–∫
[Init] –ü–µ—Ä–≤—ã–π –∫–∞–¥—Ä: #0, –Ω–∞–π–¥–µ–Ω–æ 2763 –∫–ª—é—á–µ–≤—ã—Ö —Ç–æ—á–µ–∫.
[Init:1] –û–±—Ä–∞–±–æ—Ç–∫–∞ –∫–∞–¥—Ä–∞ #1..............
[Debug] –ù–∞–π–¥–µ–Ω–æ 8196 –∫–ª—é—á–µ–≤—ã—Ö —Ç–æ—á–µ–∫
[Debug] –ü–æ—Å–ª–µ ANMS –æ—Å—Ç–∞–ª–æ—Å—å 2860 —Ç–æ—á–µ–∫
[Init:1] –°–æ–ø–æ—Å—Ç–∞–≤–ª–µ–Ω–æ 676 —Ñ–∏—á –º–µ–∂–¥—É –∫–∞–¥—Ä–∞–º–∏ 0 –∏ 1
[Init:1] –°—Ä–µ–¥–Ω–∏–π –ø–∞—Ä–∞–ª–ª–∞–∫—Å: 15.42
[Pose:1] –ö–∞–º–µ—Ä–∞ –Ω–∞ –ø–æ–∑–∏—Ü–∏–∏: X=0.013, Y=-0.021, Z=1.000
[Init:1] –î–æ–±–∞–≤–ª–µ–Ω KeyFrame #1
[Triangulate] –í—Å–µ–≥–æ: 676 | –≤–∞–ª–∏–¥–Ω—ã—Ö Z‚àà(0.01,200.0): 642
[Init:1] –¢—Ä–∏–∞–Ω–≥—É–ª—è—Ü–∏—è –¥–∞–ª–∞ 642 3D-—Ç–æ—á–µ–∫.
[Init:1] –î–æ–±–∞–≤–ª–µ–Ω–æ 642 –Ω–æ–≤—ã—Ö 3D-—Ç–æ—á–µ–∫ –ø–æ—Å–ª–µ —Ñ–∏–ª—å—Ä–∞—Ü–∏–∏.
[Init:1] –î–æ–±–∞–≤–ª–µ–Ω–æ 0 –Ω–æ–≤—ã—Ö –Ω–∞–±–ª—é–¥–µ–Ω–∏–π –∫ —Å—É—â–µ—Å—Ç–≤—É—é—â–∏–º MapPoints.
[Init:1] KeyFram

In [14]:
# vo.visualize_matches()

In [15]:
import open3d as o3d
import numpy as np

def create_camera_frustum(scale: float = 0.05) -> o3d.geometry.LineSet:
    """
    Build a wireframe camera pyramid for visualisation in Open3D.

    The apex sits at the origin; the rectangular base lies at ``z = 2 * scale``
    with its corners at ``(±scale, ±scale)``.

    :param scale: size of the frustum
    :return: line set with 5 points and 8 edges, painted uniformly
    """
    base_depth = scale * 2
    corner_signs = ((1, 1), (1, -1), (-1, -1), (-1, 1))
    vertices = [[0, 0, 0]]
    vertices.extend([sx * scale, sy * scale, base_depth] for sx, sy in corner_signs)

    # Four edges from the apex to each corner, then the four edges of the base.
    apex_edges = [[0, corner] for corner in range(1, 5)]
    base_edges = [[corner, corner % 4 + 1] for corner in range(1, 5)]

    wireframe = o3d.geometry.LineSet()
    wireframe.points = o3d.utility.Vector3dVector(np.array(vertices))
    wireframe.lines = o3d.utility.Vector2iVector(apex_edges + base_edges)
    wireframe.paint_uniform_color([0.0, 0.6, 1.0])
    return wireframe


def check_and_visualize_mappoints(vo):
    """
    Print statistics for the 3D MapPoints and show them in an Open3D window.

    Points with any non-finite coordinate are ignored. If there are no
    MapPoints, or none of them is finite, a message is printed and nothing is
    displayed. Otherwise the points are drawn in red together with one camera
    frustum per entry of ``vo.poses``.

    :param vo: object exposing ``mappoints`` (items with a ``coord``
        attribute) and ``poses`` (transforms applied to the frustums)
    """
    if not vo.mappoints:
        print("[Check] –ù–µ—Ç MapPoints –¥–ª—è –æ—Ç–æ–±—Ä–∞–∂–µ–Ω–∏—è.")
        return

    finite_coords = [mp.coord for mp in vo.mappoints if np.isfinite(mp.coord).all()]
    coords = np.array(finite_coords)

    if coords.size == 0:
        print("[Check] –í—Å–µ —Ç–æ—á–∫–∏ –Ω–µ–≤–∞–ª–∏–¥–Ω—ã.")
        return

    # Statistics
    depths = coords[:, 2]
    print(f"[Check] –í—Å–µ–≥–æ —Ç–æ—á–µ–∫: {len(coords)}")
    print(f"[Check] –ì–ª—É–±–∏–Ω–∞ Z: mean={depths.mean():.3f}, median={np.median(depths):.3f}, min={depths.min():.3f}, max={depths.max():.3f}")

    for axis_name, column in (("X", 0), ("Y", 1)):
        axis_values = coords[:, column]
        print(f"[Check] {axis_name}: mean={axis_values.mean():.3f}, min={axis_values.min():.3f}, max={axis_values.max():.3f}")

    print("[Check] –ü—Ä–∏–º–µ—Ä—ã —Ç–æ—á–µ–∫:")
    for sample in coords[:5]:
        print(f"   {sample}")

    # Point cloud for Open3D (red points)
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(coords)
    cloud.paint_uniform_color([1.0, 0.0, 0.0])

    # One frustum per stored pose (none if there are no poses)
    frustums = [create_camera_frustum(scale=5).transform(pose) for pose in vo.poses]

    # Visualisation
    o3d.visualization.draw_geometries([cloud] + frustums, window_name="MapPoints –ø–æ—Å–ª–µ –∏–Ω–∏—Ü–∏–∞–ª–∏–∑–∞—Ü–∏–∏")

# check_and_visualize_mappoints(vo)