In [1]:
import cv2
import numpy as np
from typing import Optional, Tuple, Dict, Any

def _guess_K(w: int, h: int, focal_scale: float = 0.8) -> np.ndarray:
    f = focal_scale * max(w, h)
    return np.array([[f, 0, w/2.0],
                     [0, f, h/2.0],
                     [0, 0, 1.0]], dtype=np.float64)

def _to_gray(img: Optional[np.ndarray], name: str) -> np.ndarray:
    """Validate one input frame and return it as a single-channel image.

    Raises:
        ValueError: if the frame is None or is not a 2-D image or a 3-D image
            with 1, 3 or 4 channels.
    """
    if img is None:
        # cv2.imread signals an unreadable file by returning None, so surface
        # that here as the documented ValueError instead of an AttributeError.
        raise ValueError(f"{name} is None (was the image file read successfully?)")
    if img.ndim == 2:
        return img
    if img.ndim == 3:
        channels = img.shape[2]
        if channels == 1:
            return img[:, :, 0]
        if channels == 3:
            return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        if channels == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY)
    raise ValueError(f"{name} must be a grayscale, BGR or BGRA image, got shape {img.shape}")


def estimate_egomotion(
    img1: np.ndarray,
    img2: np.ndarray,
    K: Optional[np.ndarray] = None,
    ransac_thresh: float = 1.0,
    ratio: float = 0.75,
    nfeatures: int = 4000,
    return_inliers: bool = False,
    focal_scale: float = 0.8,
) -> Dict[str, Any]:
    """
    Estimate ego-motion (R, t) from two images using ORB + Essential matrix.

    Args:
        img1, img2: First and second images (grayscale, BGR or BGRA). Must have
            the same height and width.
        K: 3x3 intrinsics. If None, guessed from image size (square pixels, centered principal point).
        ransac_thresh: RANSAC pixel threshold for Essential matrix.
        ratio: Lowe ratio for KNN descriptor filtering.
        nfeatures: ORB feature count.
        return_inliers: If True, include inlier masks and matched points.
        focal_scale: Used when K is guessed; f = focal_scale * max(W, H).

    Returns:
        dict with:
            - "R": 3x3 rotation matrix (np.ndarray)
            - "t": 3x1 translation direction (unit vector; scale unknown)
            - "K": 3x3 intrinsics used (float64)
            - "E": 3x3 Essential matrix
            - "num_inliers": int, number of matches kept by recoverPose
            - optionally: "inliers1", "inliers2" (Nx2), "mask" (Nx1 uint8, 0/1)
    Raises:
        ValueError if an image is missing/malformed, K is not 3x3, or
        insufficient matches/inliers are found.
    """
    # ensure grayscale (also rejects None / unsupported shapes)
    img1 = _to_gray(img1, "img1")
    img2 = _to_gray(img2, "img2")

    h, w = img1.shape[:2]
    if img2.shape[:2] != (h, w):
        raise ValueError("img1 and img2 must be the same size")

    if K is None:
        K = _guess_K(w, h, focal_scale=focal_scale)
    else:
        # Accept lists / other dtypes but insist on a proper 3x3 matrix.
        K = np.asarray(K, dtype=np.float64)
        if K.shape != (3, 3):
            raise ValueError(f"K must be a 3x3 intrinsics matrix, got shape {K.shape}")

    # detect + describe
    orb = cv2.ORB_create(nfeatures=nfeatures)
    kp1, des1 = orb.detectAndCompute(img1, None)
    kp2, des2 = orb.detectAndCompute(img2, None)
    if des1 is None or des2 is None or len(kp1) < 8 or len(kp2) < 8:
        raise ValueError("Not enough keypoints/descriptors")

    # KNN match + Lowe ratio; pairs with fewer than two neighbours cannot be
    # ratio-tested and are dropped.
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
    good = [
        pair[0]
        for pair in bf.knnMatch(des1, des2, k=2)
        if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance
    ]
    if len(good) < 8:
        raise ValueError("Not enough good matches after ratio test")

    pts1 = np.float32([kp1[m.queryIdx].pt for m in good])
    pts2 = np.float32([kp2[m.trainIdx].pt for m in good])

    # Essential matrix with RANSAC
    E, mask = cv2.findEssentialMat(pts1, pts2, K, method=cv2.RANSAC, prob=0.999, threshold=ransac_thresh)
    # Count non-zero entries rather than summing, so the check does not depend
    # on the numeric value OpenCV uses to flag an inlier.
    if E is None or mask is None or np.count_nonzero(mask) < 5:
        raise ValueError("Failed to estimate a valid Essential matrix")
    if E.shape != (3, 3):
        # recoverPose only accepts a single 3x3 matrix; report anything else
        # through the documented exception type.
        raise ValueError("Failed to estimate a valid Essential matrix")

    # Recover relative pose (up to scale)
    _, R, t, mask_pose = cv2.recoverPose(E, pts1, pts2, K, mask=mask)

    # Treat any non-zero mask entry as an inlier.
    keep = mask_pose.ravel() > 0

    result: Dict[str, Any] = {
        "R": R,
        "t": t,  # unit-norm vector; scale is unknown in monocular
        "K": K,
        "E": E,
        "num_inliers": int(np.count_nonzero(keep)),
    }

    if return_inliers:
        result.update({
            "inliers1": pts1[keep],
            "inliers2": pts2[keep],
            "mask": keep.astype(np.uint8).reshape(-1, 1),
        })

    return result


In [1]:
import cv2
import os
from pathlib import Path
import csv
# Batch export: for every recording directory, estimate the ego-motion between
# frame i and frame i + num_frames and write one CSV per directory.
TRAIN_ROOT = "/Users/eddie/Trauma_THOMPSON/Trauma_THOMPSON/Hand Tracking Dataset/Hand Tracking Dataset/train/"
FRAMES_ROOT = "/Users/eddie/Downloads/temp/"
SKIP = "SKIP"  # sentinel written when no pose is available for a frame

num_frames = 50  # gap (in frames) between the two images of each pair

for dir_name in sorted(os.listdir(TRAIN_ROOT)):
    root_dir = os.path.join(TRAIN_ROOT, dir_name, dir_name)
    if not os.path.isdir(root_dir):
        # Ignore stray entries such as .DS_Store that are not recordings.
        continue
    frames_root_dir = os.path.join(FRAMES_ROOT, dir_name)

    # The frame count is taken from the dataset directory while the images are
    # read from the extracted-frames directory.
    # NOTE(review): presumably both hold the same number of frames - confirm.
    total_frames = len(os.listdir(root_dir))

    # Fresh table per directory so recordings are not mixed in one file.
    rows = [["frame", "Rotation", "Translation"]]

    for index in range(total_frames):
        rotation = translation = SKIP
        # The last num_frames frames have no partner frame and stay SKIP.
        if index + num_frames < total_frames:
            path1 = os.path.join(frames_root_dir, f"frame_{index + 1:06d}.jpg")
            path2 = os.path.join(frames_root_dir, f"frame_{index + 1 + num_frames:06d}.jpg")
            frame1 = cv2.imread(path1)
            frame2 = cv2.imread(path2)
            if frame1 is None or frame2 is None:
                # cv2.imread returns None for a missing/unreadable file.
                print(f"{dir_name}: could not read {path1} or {path2}; marked {SKIP}")
            else:
                try:
                    out = estimate_egomotion(frame1, frame2)
                except (ValueError, cv2.error) as exc:
                    # One bad pair must not discard the whole run.
                    print(f"{dir_name}: frame_{index + 1:06d} failed ({exc}); marked {SKIP}")
                else:
                    rotation = out["R"].flatten().tolist()
                    translation = out["t"].flatten().tolist()
        rows.append([f"frame_{index + 1:06d}", rotation, translation])

    out_name = f"{dir_name}_{num_frames}_ego.csv"
    with open(out_name, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(rows)
    print(f"{dir_name}: wrote {len(rows) - 1} rows to {out_name}")

NameError: name 'estimate_egomotion' is not defined

In [3]:
import cv2

# Single-pair demo: estimate the relative pose between two frames on disk.
img1_path = "frame-1.png"
img2_path = "frame-2.png"

img1 = cv2.imread(img1_path)  # or grayscale already
img2 = cv2.imread(img2_path)

# cv2.imread returns None instead of raising when a file cannot be read, so
# fail here with the offending path rather than deep inside the estimator.
if img1 is None:
    raise FileNotFoundError(f"Could not read image: {img1_path}")
if img2 is None:
    raise FileNotFoundError(f"Could not read image: {img2_path}")

out = estimate_egomotion(img1, img2, return_inliers=True)
print("R:\n", out["R"])
print("t (direction):\n", out["t"])


R:
 [[ 9.99999422e-01 -6.54107984e-05  1.07273535e-03]
 [ 6.54981342e-05  9.99999995e-01 -8.13792110e-05]
 [-1.07273002e-03  8.14494262e-05  9.99999421e-01]]
t (direction):
 [[-0.32158841]
 [ 0.17836853]
 [ 0.92992772]]
