In [30]:
import os
import sys

sys.path.append("..")

from silk_lib.common import SILK_MATCHER, get_model, load_images

from silk.backbones.silk.silk import from_feature_coords_to_image_coords
from silk.cli.image_pair_visualization import create_img_pair_visual, save_image
from lib.unproject_points import unproject_points
from lib.find_pose import find_relative_pose
from torch.utils.data import DataLoader
from yacs.config import CfgNode as CN
import unittest
from lib.dataset.mapfree import MapFreeDataset
import skimage.io as io
import numpy as np
from lib.camera import Camera
import torch
from torchvision import transforms
import matplotlib.pyplot as plt
from lib.utils.data import data_to_model_device
from lib.rot3 import Rot3
from lib.pose3 import Pose3
from collections import defaultdict

In [2]:
checkpoint = "../../silk/assets/models/silk/coco-rgb-aug.ckpt"
model = get_model(checkpoint=checkpoint, default_outputs=("sparse_positions", "sparse_descriptors"))

[32m2024-07-31 15:14:58.103[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m


In [3]:
sift_config = "../config/sift/sift_config.yaml"
checkpoint = "../../silk/assets/models/silk/coco-rgb-aug.ckpt"
data_dir = "../lib/tests/test_data"
dataset_config = os.path.join(data_dir, "testset.yaml")

node = CN()
node.set_new_allowed(True)
node.merge_from_file(dataset_config)
node.DEBUG = False

# explicitely setting to None because if loading from yaml it's a string
node.DATASET.SCENES = None
node.DATASET.AUGMENTATION_TYPE = None
node.DATASET.DATA_ROOT = os.path.join("..", node.DATASET.DATA_ROOT)
node.DATASET.DEPTH_ROOT = os.path.join("..", node.DATASET.DEPTH_ROOT)
dataset = MapFreeDataset(node, "val")

In [23]:
def run_one(img1, img2, img1_depth, img2_depth, camera1, camera2, depth_scale, model) -> tuple:
    grayscale = transforms.Compose([
        transforms.Grayscale(num_output_channels=1),  # Convert image to grayscale  
    ])
    grayscale_img1 = img1 if img1.shape[0] == 1 else grayscale(img1)
    grayscale_img2 = img2 if img2.shape[0] == 1 else grayscale(img2)
    grayscale_img1 = grayscale_img1.unsqueeze(0)
    grayscale_img2 = grayscale_img2.unsqueeze(0)

    with torch.no_grad():
        sparse_positions_1, sparse_descriptors_1 = model(grayscale_img1)
        sparse_positions_2, sparse_descriptors_2 = model(grayscale_img2)        

    sparse_positions_1 = from_feature_coords_to_image_coords(model, sparse_positions_1)
    sparse_positions_2 = from_feature_coords_to_image_coords(model, sparse_positions_2)
    
    matches = SILK_MATCHER(sparse_descriptors_1[0], sparse_descriptors_2[0])

    pts1 = sparse_positions_1[0][matches[:, 0]].detach().cpu().numpy()
    pts2 = sparse_positions_2[0][matches[:, 1]].detach().cpu().numpy()
    pts1 = pts1[:, :-1]
    pts2 = pts2[:, :-1]


    img1_depth = img1_depth.detach().cpu().numpy()
    img2_depth = img2_depth.detach().cpu().numpy()
    print(img1_depth.shape)
    print(img1.shape)
    pts1 = pts1[:, [1, 0]]
    pts2 = pts2[:, [1, 0]]
    print(pts1)
    pts1_3d = unproject_points(pts1, img1_depth, camera1, depth_scale)
    pts2_3d = unproject_points(pts2, img2_depth, camera2, depth_scale)

    (R, t), inliers = find_relative_pose(
        pts1_3d,
        pts2_3d,
        ransac_iterations=100,
        inlier_threshold=0.15,
        num_matches=3,
    )
    return R, t, inliers
        

In [24]:
loader = DataLoader(dataset, batch_size=1)

for data in loader:
    data = data_to_model_device(data, model)
    img1 = data["image0"].squeeze()
    img2 = data["image1"].squeeze()
    img1_depth = data["depth0"].squeeze()
    img2_depth = data["depth1"].squeeze()
    
    K1 = data["K_color0"].detach().cpu().numpy().squeeze()
    K2 = data["K_color1"].detach().cpu().numpy().squeeze()
    camera1 = Camera.from_K(K1, img1.shape[1], img1.shape[0])
    camera2 = Camera.from_K(K2, img2.shape[1], img2.shape[0])
    frame_num = data["pair_names"][1][0][-9:-4]
    R, t, inliers = run_one(img1, img2, img1_depth, img2_depth, camera1, camera2, 1.0, model)
    

width: tensor([540.], device='cuda:0', dtype=torch.float64)
(720, 540)
torch.Size([3, 720, 540])
[[ 41.5 218.5]
 [ 51.5 219.5]
 [ 52.5 219.5]
 [ 47.5 220.5]
 [105.5 221.5]
 [105.5 223.5]
 [ 19.5 224.5]
 [ 20.5 224.5]
 [134.5 238.5]
 [134.5 239.5]
 [ 15.5 248.5]
 [ 18.5 248.5]
 [ 64.5 255.5]
 [ 16.5 256.5]
 [211.5 257.5]
 [ 60.5 260.5]
 [ 61.5 260.5]
 [ 60.5 261.5]
 [ 61.5 262.5]
 [241.5 265.5]
 [173.5 284.5]
 [175.5 284.5]
 [172.5 285.5]
 [ 40.5 299.5]
 [ 26.5 300.5]
 [315.5 332.5]
 [273.5 338.5]
 [154.5 347.5]
 [296.5 380.5]
 [263.5 381.5]
 [203.5 388.5]
 [472.5 475.5]
 [362.5 507.5]
 [ 98.5 518.5]
 [ 99.5 518.5]]
width: tensor([540.], device='cuda:0', dtype=torch.float64)
(720, 540)
torch.Size([3, 720, 540])
[[ 69.5 202.5]
 [ 69.5 203.5]
 [ 69.5 204.5]
 [288.5 205.5]
 [ 41.5 219.5]
 [ 50.5 219.5]
 [ 19.5 226.5]
 [134.5 239.5]
 [ 14.5 249.5]
 [ 62.5 253.5]
 [210.5 255.5]
 [211.5 255.5]
 [ 33.5 257.5]
 [ 33.5 258.5]
 [173.5 284.5]
 [174.5 284.5]
 [172.5 285.5]
 [174.5 286.5]
 [159.5 28

In [67]:
grayscale = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert image to grayscale  
])
grayscale_img1 = grayscale(img1).unsqueeze(0)
grayscale_img1.shape

torch.Size([1, 1, 720, 540])

In [68]:
sparse_positions_1, sparse_descriptors_1 = model(grayscale_img1)

In [71]:
sparse_positions_1[0].shape

torch.Size([10001, 3])

In [43]:
class SilkRunner:
    def __init__(self, config: str, checkpoint: str) -> None:
        self._config = CN()
        self._config.set_new_allowed(True)
        if os.path.exists(config):
            self._config.merge_from_file(config)
        if os.path.exists(checkpoint):
            print("Found checkpoint path.")
            self._model = get_model(
                checkpoint=checkpoint, default_outputs=("sparse_positions", "sparse_descriptors")
            )
        else:
            print("Didn't find checkpoint path. Using default.")
            self._model = get_model(
                default_outputs=("sparse_positions", "sparse_descriptors")
            ) # use default otherwise
            
        self._model.eval()
            
    def run_one(self, img1, img2, img1_depth, img2_depth, camera1, camera2, depth_scale) -> tuple:
        # convert to grayscale first
        grayscale = transforms.Compose([
            transforms.Grayscale(num_output_channels=1),  # Convert image to grayscale  
        ])
        grayscale_img1 = img1 if img1.shape[0] == 1 else grayscale(img1)
        grayscale_img2 = img2 if img2.shape[0] == 1 else grayscale(img2)
        grayscale_img1 = grayscale_img1.unsqueeze(0)
        grayscale_img2 = grayscale_img2.unsqueeze(0)

        with torch.no_grad():
            sparse_positions_1, sparse_descriptors_1 = self._model(grayscale_img1)
            sparse_positions_2, sparse_descriptors_2 = self._model(grayscale_img2)        

        sparse_positions_1 = from_feature_coords_to_image_coords(self._model, sparse_positions_1)
        sparse_positions_2 = from_feature_coords_to_image_coords(self._model, sparse_positions_2)
        
        matches = SILK_MATCHER(sparse_descriptors_1[0], sparse_descriptors_2[0])
        
        pts1 = sparse_positions_1[0][matches[:, 0]].detach().cpu().numpy()
        pts2 = sparse_positions_2[0][matches[:, 1]].detach().cpu().numpy()
        pts1 = pts1[:, :-1]
        pts2 = pts2[:, :-1]
        
        img1_depth = img1_depth.detach().cpu().numpy()
        img2_depth = img2_depth.detach().cpu().numpy()
        pts1 = pts1[:, [1, 0]]
        pts2 = pts2[:, [1, 0]]
        pts1_3d = unproject_points(pts1, img1_depth, camera1, depth_scale)
        pts2_3d = unproject_points(pts2, img2_depth, camera2, depth_scale)
        
        (R, t), inliers = find_relative_pose(
            pts1_3d,
            pts2_3d,
            ransac_iterations=self._config.SIFT.RANSAC_ITERATIONS,
            inlier_threshold=self._config.SIFT.INLIER_THRESHOLD,
            num_matches=self._config.SIFT.NUM_MATCHES,
        )
        return R, t, inliers
        

    def run(self, data_loader: DataLoader) -> dict:
        estimated_poses = defaultdict(list)
        for data in data_loader:
            data = data_to_model_device(data, self._model)
            img1 = data["image0"].squeeze()
            img2 = data["image1"].squeeze()
            img1_depth = data["depth0"].squeeze()
            img2_depth = data["depth1"].squeeze()
            camera1 = Camera.from_K(data["K_color0"].detach().cpu().numpy().squeeze(), img1.shape[1], img1.shape[0])
            camera2 = Camera.from_K(data["K_color1"].detach().cpu().numpy().squeeze(), img2.shape[1], img2.shape[0])
            frame_num = data["pair_names"][1][0][-9:-4]
            scene = data["scene_id"][0]
            print(f"scene: {scene}")
            R, t, inliers = self.run_one(img1, img2, img1_depth, img2_depth, camera1, camera2, depth_scale=1.0)
            r = Rot3(R.squeeze())
            estimated_pose = (Pose3(r, t), float(inliers), int(frame_num))
            estimated_poses[scene].append(estimated_pose)
        return estimated_poses

In [42]:
class TestSilkRunner(unittest.TestCase):

    @classmethod
    def setUp(cls):
        
        def initConfig(dataset_config: str) -> CN:
            node = CN()
            node.set_new_allowed(True)
            node.merge_from_file(dataset_config)
            node.DEBUG = False
            
            # explicitely setting to None because if loading from yaml it's a string
            node.DATASET.SCENES = None
            node.DATASET.AUGMENTATION_TYPE = None
            return node
        
        cls.sift_config = "../config/sift/sift_config.yaml"
        cls.checkpoint = "../../silk/assets/models/silk/coco-rgb-aug.ckpt"
        cls.data_dir = "../lib/tests/test_data"
        cls.dataset_config = os.path.join(cls.data_dir, "testset.yaml")
        
        paths = [cls.sift_config, cls.checkpoint, cls.data_dir, cls.dataset_config]
        paths_exist = [
            os.path.exists(i) for i in paths
        ]
        
        if sum(paths_exist) < len(paths): 
            print("Not all paths exist :(")
            exit(1) # exit failure
            
        cls.config = initConfig(cls.dataset_config)
        cls.config.DATASET.DATA_ROOT = os.path.join("..", cls.config.DATASET.DATA_ROOT)
        cls.config.DATASET.DEPTH_ROOT = os.path.join("..", cls.config.DATASET.DEPTH_ROOT)
        cls.dataset = MapFreeDataset(cls.config, "val")
        cls.model = get_model(checkpoint=cls.checkpoint, default_outputs=("sparse_positions", "sparse_descriptors"))
    
    def test_creation(self):
        silk_runner = SilkRunner(self.sift_config, self.checkpoint)
        self.assertTrue(isinstance(silk_runner, SilkRunner))
        self.assertTrue(isinstance(self.dataset, MapFreeDataset))
        
    def test_run_one(self):
        silk_runner = SilkRunner(self.sift_config, self.checkpoint)
        data = data_to_model_device(self.dataset[0], self.model)
        img1 = data["image0"]
        img2 = data["image1"]
        depth1 = data["depth0"]
        depth2 = data["depth1"]
        camera1 = Camera.from_K(data["K_color0"].detach().cpu().numpy(), img1.shape[1], img1.shape[0])
        camera2 = Camera.from_K(data["K_color1"].detach().cpu().numpy(), img2.shape[1], img2.shape[0])
        R, t, inliers = silk_runner.run_one(
            img1, img2, depth1, depth2, camera1, camera2, depth_scale=1.0
        )
        self.assertEqual(R.shape, (3, 3))
        self.assertEqual(t.shape, (3,))
        self.assertTrue(inliers > 0)
        
    def test_run(self):
        silk_runner = SilkRunner(self.sift_config, self.checkpoint)
        self.assertTrue(silk_runner)
        self.assertTrue(self.dataset)
        self.assertTrue(self.sift_config)
        
        loader = DataLoader(self.dataset, batch_size=1)
        estimated_poses = silk_runner.run(loader)
        self.assertEqual(len(estimated_poses), 1)

        for k, v in estimated_poses.items():
            for pose_info in v:
                pose, inliers, frame_num = pose_info
                self.assertTrue(isinstance(pose, Pose3))
                self.assertTrue(isinstance(inliers, float))
                self.assertTrue(isinstance(frame_num, int))
        
        
        
        
        
        
    
unittest.main(argv=[''], verbosity=2, exit=False)

test_creation (__main__.TestSilkRunner) ... [32m2024-07-31 15:45:14.599[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m
[32m2024-07-31 15:45:14.650[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m
ok
test_run (__main__.TestSilkRunner) ... [32m2024-07-31 15:45:14.718[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m
[32m2024-07-31 15:45:14.753[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file:

Found checkpoint path.
Found checkpoint path.
scene: s00460
scene: s00460
scene: s00460


ok
test_run_one (__main__.TestSilkRunner) ... [32m2024-07-31 15:45:15.211[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m
[32m2024-07-31 15:45:15.254[0m | [34m[1mDEBUG   [0m | [36mfsspec.implementations.local[0m:[36m__init__[0m:[36m347[0m - [34m[1mopen file: /home/vgmachinist/Desktop/Projects/mapfree/notebooks/../../silk/assets/models/silk/coco-rgb-aug.ckpt[0m
ok

Found checkpoint path.




----------------------------------------------------------------------
Ran 3 tests in 0.864s

OK


<unittest.main.TestProgram at 0x7311a8091910>

In [None]:
IMAGE_0_PATH = "../data/val/s00460/seq1/frame_00000.jpg"
IMAGE_1_PATH = "../data/val/s00460/seq1/frame_00001.jpg"

OUTPUT_IMAGE_PATH = "./img.png"
images_0 = load_images(IMAGE_0_PATH)
images_1 = load_images(IMAGE_1_PATH)

# load model
model = get_model(default_outputs=("sparse_positions", "sparse_descriptors"))

# run model
sparse_positions_0, sparse_descriptors_0 = model(images_0)
sparse_positions_1, sparse_descriptors_1 = model(images_1)

sparse_positions_0 = from_feature_coords_to_image_coords(model, sparse_positions_0)
sparse_positions_1 = from_feature_coords_to_image_coords(model, sparse_positions_1)

# get matches
matches = SILK_MATCHER(sparse_descriptors_0[0], sparse_descriptors_1[0])

# create output image
image_pair = create_img_pair_visual(
    IMAGE_0_PATH,
    IMAGE_1_PATH,
    None,
    None,
    sparse_positions_0[0][matches[:, 0]].detach().cpu().numpy(),
    sparse_positions_1[0][matches[:, 1]].detach().cpu().numpy(),
)

save_image(
    image_pair,
    os.path.dirname(OUTPUT_IMAGE_PATH),
    os.path.basename(OUTPUT_IMAGE_PATH),
)

print(f"result saved in {OUTPUT_IMAGE_PATH}")
print("done")