In [1]:
import cv2
import numpy as np
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
from utils import *

In [5]:
def undistort_image(img, K, dist):
    h, w = img.shape[:2]
    newK, _ = cv2.getOptimalNewCameraMatrix(K, dist, (w, h), 1)
    return cv2.undistort(img, K, dist, None, newK)

def rotate_by_yaw(img, yaw_deg):
    h, w = img.shape[:2]
    M = cv2.getRotationMatrix2D((w / 2, h / 2), -yaw_deg, 1.0)
    return cv2.warpAffine(img, M, (w, h))

def extract_sift(img):
    sift = cv2.SIFT_create(nfeatures=4000)
    kp, des = sift.detectAndCompute(img, None)
    return kp, des

def extract_patches(image, patch_size=512, stride=256):
    patches = []
    h, w = image.shape[:2]

    for y in range(0, h - patch_size + 1, stride):
        for x in range(0, w - patch_size + 1, stride):
            patch = image[y:y+patch_size, x:x+patch_size]
            patches.append({
                "image": patch,
                "offset": (x, y)
            })
    return patches

def match_sift_descriptors(des1, des2, ratio=0.85):
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)

    flann = cv2.FlannBasedMatcher(index_params, search_params)
    knn = flann.knnMatch(des1, des2, k=2)

    good = []
    for m, n in knn:
        if m.distance < ratio * n.distance:
            good.append(m)
    return good

def affine_ransac(kp_d, kp_s, matches, thresh=6):
    if len(matches) < 12:
        return None, None

    src = np.float32([kp_d[m.queryIdx].pt for m in matches])
    dst = np.float32([kp_s[m.trainIdx].pt for m in matches])

    A, mask = cv2.estimateAffinePartial2D(
        src, dst,
        method=cv2.RANSAC,
        ransacReprojThreshold=thresh,
        maxIters=5000,
        confidence=0.99
    )

    if mask is None:
        return None, None

    inlier_matches = [
        m for m, keep in zip(matches, mask.ravel()) if keep
    ]

    return A, inlier_matches

def homography_ransac(kp_d, kp_s, matches, thresh=5):
    if len(matches) < 15:
        return None, []

    src = np.float32(
        [kp_d[m.queryIdx].pt for m in matches]
    ).reshape(-1, 1, 2)

    dst = np.float32(
        [kp_s[m.trainIdx].pt for m in matches]
    ).reshape(-1, 1, 2)

    H, mask = cv2.findHomography(
        src, dst,
        cv2.RANSAC,
        ransacReprojThreshold=thresh,
        maxIters=5000
    )

    if H is None or mask is None:
        return None, []

    inliers = [
        m for m, keep in zip(matches, mask.ravel()) if keep
    ]

    return H, inliers

def localize_drone_image(
    drone_gray,
    satellite_gray,
    K,
    dist,
    imu_yaw_deg=None,
    patch_size=512,
    stride=256,
    T_coarse=25,
    T_affine=12,
    T_homography=15
):
    # --- Drone preprocessing ---
    drone_gray = undistort_image(drone_gray, K, dist)

    if imu_yaw_deg is not None:
        drone_gray = rotate_by_yaw(drone_gray, imu_yaw_deg)

    kp_d, des_d = extract_sift(drone_gray)
    if des_d is None:
        return None

    patches = extract_patches(satellite_gray, patch_size, stride)

    best_match = {
        "score": 0
    }

    # --- Search over patches ---
    for p in tqdm(patches, desc="Matching satellite patches"):
        kp_s, des_s = extract_sift(p["image"])
        if des_s is None:
            continue

        good_matches = match_sift_descriptors(des_d, des_s)

        if len(good_matches) < T_coarse:
            continue

        # --- Affine verification ---
        A, affine_inliers = affine_ransac(kp_d, kp_s, good_matches)
        if A is None or len(affine_inliers) < T_affine:
            continue

        # Reject degenerate affine scales
        sx = np.linalg.norm(A[0, :2])
        sy = np.linalg.norm(A[1, :2])
        if not (0.3 < sx < 3.0 and 0.3 < sy < 3.0):
            continue

        # --- Homography refinement ---
        H, H_inliers = homography_ransac(kp_d, kp_s, affine_inliers)

        # if len(H_inliers) < T_homography:
        #     continue
        score = 0
        use_homography = False
        
        if len(H_inliers) >= 12:
            score = len(H_inliers)
            use_homography = True
        elif len(H_inliers) >= 6 and len(affine_inliers) >= 15:
            score = len(H_inliers)
            use_homography = True
        elif len(affine_inliers) >= 20:
            score = len(affine_inliers)
            use_homography = False
        else:
            continue
        # --- Keep best ---
        if len(H_inliers) > best_match["score"]:
            best_match = {
                "score": len(H_inliers),
                "kp_d": kp_d,
                "kp_s": kp_s,
                "patch": p["image"],
                "offset": p["offset"],
                "affine_A": A,
                "H": H,
                "matches": H_inliers
            }

    return best_match if best_match.get("score", 0) > 0 else None

def visualize_best_match(drone_gray, best_match, save_path=None):
    vis = cv2.drawMatches(
        drone_gray, best_match["kp_d"],
        best_match["patch"], best_match["kp_s"],
        best_match["matches"],
        None,
        flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
    )

    plt.figure(figsize=(14,6))
    plt.imshow(vis, cmap="gray")
    plt.title(f"Drone ↔ Satellite Patch (Homography Inliers = {len(best_match['matches'])})")
    plt.axis("off")

    if save_path:
        plt.savefig(save_path, bbox_inches="tight")
    plt.show()



In [6]:
K = np.array([
    [456.46871015134053, 0.0, 643.3599454303429],
    [0.0, 455.40127946882507, 357.51076963739786],
    [0.0, 0.0, 1.0]
])

dist = np.array([
    0.03299031731836506,
   -0.03150792611905064,
   -0.0017902177017069096,
    0.00027220443810142304,
    0.0
])


In [7]:
# Load satellite map
sat_img, sat_transform, sat_crs = load_satellite_map("task_cv_model/map.tif")

# Load drone images
drone_imgs = load_drone_images("task_cv_model/train_data/drone_images/")

# Preprocess drone images
drone_imgs_proc = preprocess_drone_images(drone_imgs)

name = list(drone_imgs.keys())[200]

sat_gray = satellite_to_gray(sat_img)

drone_gray = drone_imgs_proc[name]
best_match = localize_drone_image(
    drone_gray,
    sat_gray,
    K=K,
    dist=dist
)

if best_match is None:
    print("Localization failed")
else:
    print("Homography inliers:", best_match["score"])
    print("Satellite offset:", best_match["offset"])

    # visualize_best_match(
    #     drone_gray,
    #     best_match,
    #     save_path="Output/best_homography_match.png"
    # )


Matching satellite patches: 100%|█████████████| 676/676 [01:24<00:00,  7.98it/s]

Localization failed





In [None]:
os.make_dirs(Output_new_18, exist
visualize_best_match(
    drone_gray,
    best_match,
    save_path="Output_new_18/best_homography_match.png"
)