In [1]:
import cv2
import numpy as np
import copy
import os
import open3d as o3d

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
import os
def lat_to_scale(lat):
    """Compute Mercator scale from latitude."""
    return np.cos(lat * np.pi / 180.0)

def latlon_to_mercator(lat, lon, scale):
    """Convert lat/lon to mercator coordinates."""
    er = 6378137.0  # earth radius (m)
    mx = scale * lon * np.pi * er / 180.0
    my = scale * er * np.log(np.tan((90.0 + lat) * np.pi / 360.0))
    return mx, my

def convert_oxts_to_pose(oxts):
    """
    Converts a list of oxts measurements into metric poses.
    Each element of `oxts` is expected to be a numpy array or list:
    [lat, lon, alt, roll, pitch, yaw]
    """
    if len(oxts) == 0:
        return []

    # Compute scale from first lat value
    scale = lat_to_scale(oxts[0][0])

    poses = []
    Tr_0_inv = None

    for packet in oxts:
        if packet is None or len(packet) == 0:
            poses.append(None)
            continue

        lat, lon, alt, roll, pitch, yaw = packet[:6]

        # translation vector
        tx, ty = latlon_to_mercator(lat, lon, scale)
        tz = alt
        t = np.array([[tx], [ty], [tz]])

        # rotation matrix
        Rx = np.array([
            [1, 0, 0],
            [0, np.cos(roll), -np.sin(roll)],
            [0, np.sin(roll),  np.cos(roll)]
        ])
        Ry = np.array([
            [np.cos(pitch), 0, np.sin(pitch)],
            [0, 1, 0],
            [-np.sin(pitch), 0, np.cos(pitch)]
        ])
        Rz = np.array([
            [np.cos(yaw), -np.sin(yaw), 0],
            [np.sin(yaw),  np.cos(yaw), 0],
            [0, 0, 1]
        ])
        R = Rz @ Ry @ Rx

        # homogeneous transform
        Tr = np.vstack((np.hstack((R, t)), [0, 0, 0, 1]))

        # normalize translation and rotation (start at 0/0/0)
        if Tr_0_inv is None:
            Tr_0_inv = np.linalg.inv(Tr)

        pose = Tr_0_inv @ Tr
        poses.append(pose)

    return poses


def load_oxts_lite_data(base_dir, frames=None):
    """
    Reads KITTI GPS/IMU (OXTSlite) data from files.
    
    Args:
        base_dir (str): Path to sequence directory (should contain "oxts/data/").
        frames (list[int] or None): If None, load all frames. Otherwise load only the given frames.
    
    Returns:
        list of numpy arrays: Each entry is one OXTS packet (30 floats).
                             Missing files → None.
    """
    data_dir = os.path.join(base_dir, "oxts", "data")

    oxts = []

    if frames is None:
        # load all files in directory, sorted by index
        all_files = sorted(os.listdir(data_dir))
        for fname in all_files:
            if fname.endswith(".txt"):
                fpath = os.path.join(data_dir, fname)
                try:
                    arr = np.loadtxt(fpath)
                    oxts.append(arr)
                except Exception:
                    oxts.append(None)
    else:
        # load only requested frames
        for idx in frames:
            fname = f"{idx:010d}.txt"   # MATLAB used (frames(i)-1), but KITTI indexes from 0 already
            fpath = os.path.join(data_dir, fname)
            try:
                arr = np.loadtxt(fpath)
                oxts.append(arr)
            except Exception:
                oxts.append(None)

    return oxts

def load_images(base_dir,frames):
    data_dir_left = os.path.join(base_dir,"image_02","data")
    data_dir_right = os.path.join(base_dir,"image_03","data")
    
    left_images = []
    right_images = []
    
    for idx in frames:
        fname = f"{idx:010d}.png"   # MATLAB used (frames(i)-1), but KITTI indexes from 0 already
        fpath_left = os.path.join(data_dir_left, fname)
        fpath_right = os.path.join(data_dir_right, fname)
        print(fpath_left)
        try:
            left = cv2.imread(fpath_left)
            right = cv2.imread(fpath_right)
            left_images.append(left)
            right_images.append(right)
        except Exception:
            left_images.append(None)
            right_images.append(None)
    return left_images,right_images        

def read_depth(base_dir,frames):
    """
    Read a KITTI .bin Velodyne file.
    Each point has (x, y, z, reflectance).
    
    Args:
        bin_path (str): Path to the .bin file.
        
    Returns:
        np.ndarray of shape (N, 4) with columns [x, y, z, reflectance].
    """
    # Each point is 4 floats (16 bytes)
    depth_dir = os.path.join(base_dir,"velodyne_points","data")
    pcds = []
    for idx in frames:
        fname = f"{idx:010d}.bin"   # MATLAB used (frames(i)-1), but KITTI indexes from 0 already
        bin_path = os.path.join(depth_dir,fname)
        point_cloud = np.fromfile(bin_path, dtype=np.float32)
        point_cloud =  point_cloud.reshape(-1,4)
        pcds.append(point_cloud[:,:3])
    return pcds

def write_kitti_bin(bin_path, points):
    """
    Write KITTI format .bin Velodyne file.
    
    Args:
        bin_path (str): Path to save .bin file.
        points (np.ndarray): Array of shape (N, 4) with [x, y, z, reflectance].
    """
    points.astype(np.float32).tofile(bin_path)

def convert_velo_2_cam(calib_file,points):
        R, T = None, None
        R_rect = [
    9.998817e-01,  1.511453e-02, -2.841595e-03, 0,
   -1.511724e-02,  9.998853e-01, -9.338510e-04, 0,
    2.827154e-03,  9.766976e-04,  9.999955e-01, 0,
    0,0,0,1
]
        points = np.hstack([points,np.ones((points.shape[0],1))])
        R_rect = np.array(R_rect).reshape(4, 4)
        with open(calib_file, "r") as f:
            for line in f:
                if line.startswith("R:"):
                    values = [float(x) for x in line.split()[1:]]
                    R = np.array(values).reshape(3, 3)
                elif line.startswith("T:"):
                    values = [float(x) for x in line.split()[1:]]
                    T = np.array(values).reshape(3, 1)      
        if R is None or T is None:
            raise ValueError("File does not contain both R and T lines")        
        # Build 4x4 transformation
        Tr = np.eye(4)
        Tr[:3, :3] = R
        Tr[:3, 3] = T.flatten()     
        T = R_rect@Tr
        print(T.shape)
        points_cam = ((T@points.T).T)[:,:3]
        return points_cam,T 

def convert_imu_2_velo(calib_file):
        R, T = None, None

        with open(calib_file, "r") as f:
            for line in f:
                if line.startswith("R:"):
                    values = [float(x) for x in line.split()[1:]]
                    R = np.array(values).reshape(3, 3)
                elif line.startswith("T:"):
                    values = [float(x) for x in line.split()[1:]]
                    T = np.array(values).reshape(3, 1)      
        if R is None or T is None:
            raise ValueError("File does not contain both R and T lines")        
        # Build 4x4 transformation
        Tr = np.eye(4)
        Tr[:3, :3] = R
        Tr[:3, 3] = T.flatten()     

        return Tr 

def show_image(img,winname="default"):
    cv2.imshow(winname,img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [5]:
import numpy as np

def get_velo2cam_matrix(calib_file):
    """
    Reads a KITTI Velodyne-to-camera calibration file and returns the 4x4 transformation matrix.

    Args:
        calib_file (str): Path to calib_velo_to_cam.txt

    Returns:
        np.ndarray: 4x4 transformation matrix
    """
    R, T = None, None

    # Rectification matrix (KITTI default)
    R_rect = [
        9.998817e-01,  1.511453e-02, -2.841595e-03, 0,
       -1.511724e-02,  9.998853e-01, -9.338510e-04, 0,
        2.827154e-03,  9.766976e-04,  9.999955e-01, 0,
        0,0,0,1
    ]
    R_rect = np.array(R_rect).reshape(4, 4)

    # Read R and T from calib file
    with open(calib_file, "r") as f:
        for line in f:
            if line.startswith("R:"):
                values = [float(x) for x in line.split()[1:]]
                R = np.array(values).reshape(3, 3)
            elif line.startswith("T:"):
                values = [float(x) for x in line.split()[1:]]
                T = np.array(values).reshape(3, 1)

    if R is None or T is None:
        raise ValueError("File does not contain both R and T lines")

    # Build 4x4 transformation matrix
    Tr = np.eye(4)
    Tr[:3, :3] = R
    Tr[:3, 3] = T.flatten()

    # Multiply by rectification matrix
    T_velo2cam = R_rect @ Tr

    return T_velo2cam


In [3]:
VELO2CAM_FILE = "./data/2011-09-26-KITTI/2011_09_26/calib_velo_to_cam.txt"
BASE_DIR = "./data/2011-09-26-KITTI/2011_09_26_drive_0093_sync"
FRAMES = [63,64]
CALIB_VELO2CAM = "./data/2011-09-26-KITTI/2011_09_26/calib_velo_to_cam.txt"
CALIB_IMU2VELO = "./data/2011-09-26-KITTI/2011_09_26/calib_imu_to_velo.txt"

In [None]:
import os
import numpy as np
# -------------------------------# Set directories# -------------------------------
OXTSDIR = os.path.join(BASE_DIR, "oxts", "data")
OUTPUT_FILE = "10.txt"

all_files = sorted([f for f in os.listdir(OXTSDIR) if f.endswith(".txt")])
frame_indices = [int(f.split(".")[0]) for f in all_files]


oxts_all = load_oxts_lite_data(BASE_DIR, frames=frame_indices)


poses_all = convert_oxts_to_pose(oxts_all)


with open(OUTPUT_FILE, "w") as f:
    for pose in poses_all:
        if pose is None:
            f.write("None\n")
        else:
            pose_flat = pose.flatten()
            line = " ".join([f"{x:.6f}" for x in pose_flat])
            f.write(line + "\n")

print(f"Saved {len(poses_all)} poses to {OUTPUT_FILE}")


Saved 433 poses to 10.txt


In [6]:
# Get T_velo2cam once
T_velo2cam = get_velo2cam_matrix(VELO2CAM_FILE)

OUTPUT_FILE_CAM = "10_cam.txt"

with open(OUTPUT_FILE_CAM, "w") as f:
    for pose in poses_all:
        if pose is None:
            f.write("None\n")
        else:
            # Transform the pose to camera frame
            pose_cam = T_velo2cam @ pose
            # Flatten and save
            pose_flat = pose_cam.flatten()
            line = " ".join([f"{x:.6f}" for x in pose_flat])
            f.write(line + "\n")

print(f"Saved {len(poses_all)} poses to {OUTPUT_FILE_CAM} (camera frame)")


Saved 433 poses to 10_cam.txt (camera frame)


In [106]:
left_images,right_images = load_images(BASE_DIR,FRAMES)
velo_depth = read_depth(BASE_DIR,FRAMES)
oxts = load_oxts_lite_data(BASE_DIR,FRAMES)
poses = convert_oxts_to_pose(oxts) 
pcd0 = o3d.geometry.PointCloud()
pcd0.points = o3d.utility.Vector3dVector(velo_depth[0])

pcd1 = o3d.geometry.PointCloud()
pcd1.points = o3d.utility.Vector3dVector(velo_depth[1])
TI2V = convert_imu_2_velo(CALIB_IMU2VELO)


/home/aadi_iiith/Downloads/2011_09_26_drive_0002_sync/2011_09_26/2011_09_26_drive_0002_sync/image_02/data/0000000063.png
/home/aadi_iiith/Downloads/2011_09_26_drive_0002_sync/2011_09_26/2011_09_26_drive_0002_sync/image_02/data/0000000064.png


In [107]:
o3d.visualization.draw_geometries([pcd0,pcd1])

In [58]:
o3d.visualization.draw_geometries([pcd0,pcd1.transform(poses[1])])

In [59]:
o3d.visualization.draw_geometries([pcd0.transform(T),pcd1.transform(T)])


In [108]:
import numpy as np

R = np.array([
    [9.998817e-01,  1.511453e-02, -2.841595e-03],
    [-1.511724e-02, 9.998853e-01, -9.338510e-04],
    [2.827154e-03,  9.766976e-04,  9.999955e-01]
])

P = np.array([
    [7.215377e+02, 0.000000e+00, 6.095593e+02, 4.485728e+01],
    [0.000000e+00, 7.215377e+02, 1.728540e+02, 2.163791e-01],
    [0.000000e+00, 0.000000e+00, 1.000000e+00, 2.745884e-03]
])

def recover_K_from_P_and_R(P, R):
    """
    Given P (3x4) and R (3x3), return intrinsic K (3x3).
    Handles the common cases where R is a rotation (uses R.T),
    otherwise uses np.linalg.inv(R).
    Ensures positive diagonal in K and normalizes K[2,2]=1.
    """
    P = np.asarray(P, dtype=float)
    R = np.asarray(R, dtype=float)
    if P.shape != (3, 4):
        raise ValueError("P must be shape (3,4)")
    if R.shape != (3, 3):
        raise ValueError("R must be shape (3,3)")

    M = P[:, :3]  # left 3x3 block

    # Use R.T if R is (almost) orthonormal; otherwise use inverse
    if np.allclose(R @ R.T, np.eye(3), atol=1e-6):
        Rinv = R.T
    else:
        Rinv = np.linalg.inv(R)

    K_raw = M @ Rinv

    # Fix sign on diagonal: enforce positive diagonal entries
    diag_sign = np.sign(np.diag(K_raw))
    # replace zeros with +1 (in case any diag element is exactly 0)
    diag_sign[diag_sign == 0] = 1.0
    S = np.diag(diag_sign)
    K_signed = K_raw @ S
    # Optionally adjust R as well: R_corrected = S @ R   (not returned here)

    # Normalize so that K[2,2] == 1
    K = K_signed / K_signed[2, 2]

    return K

# Example usage:
# P = np.array([...]).reshape(3,4)
# R = np.array([...]).reshape(3,3)
K = recover_K_from_P_and_R(P, R)
print(K)


[[ 7.19723460e+02 -1.14769478e+01  6.11599207e+02]
 [ 1.04145690e+01  7.21296766e+02  1.73558727e+02]
 [-2.84160779e-03 -9.33855202e-04  1.00000000e+00]]


In [163]:
disp0 = cv2.imread("depth_raw_0.png",cv2.IMREAD_GRAYSCALE)
print(disp0.shape)
disp0 = cv2.resize(disp0,(1242,375))
disp1 = cv2.imread("depth_raw_1.png",cv2.IMREAD_GRAYSCALE)
disp1 = cv2.resize(disp1,(1242,375))
disp1 = disp1.astype(np.float32)
disp1 = disp1*0.75
disp1 [:disp1.shape[0]//2,:] = 0
disp0 = disp0.astype(np.float32)
disp0 = disp0*0.75
disp0 [:disp0.shape[0]//2,:] = 0
depth0  = (388.68)/disp0   
depth1  = (388.68)/disp1
   
rgbd_ref = o3d.geometry.RGBDImage.create_from_color_and_depth(
    o3d.geometry.Image(left_images[0]),
    o3d.geometry.Image(depth0.astype(np.float32)),
    depth_scale=1, depth_trunc=3000.0, convert_rgb_to_intensity=False)
intrinsic = o3d.camera.PinholeCameraIntrinsic()
intrinsic.intrinsic_matrix = K
pcd_0 = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_ref, intrinsic)

rgbd_ref = o3d.geometry.RGBDImage.create_from_color_and_depth(
    o3d.geometry.Image(left_images[1]),
    o3d.geometry.Image(depth1.astype(np.float32)),
    depth_scale=1, depth_trunc=3000.0, convert_rgb_to_intensity=False)
intrinsic = o3d.camera.PinholeCameraIntrinsic()
intrinsic.intrinsic_matrix = K
pcd_1 = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_ref, intrinsic)


(384, 1248)


  depth0  = (388.68)/disp0
  depth1  = (388.68)/disp1


In [134]:
cam_points,T = convert_velo_2_cam(CALIB_VELO2CAM,velo_depth[0])
pcdcam0 = o3d.geometry.PointCloud()
pcdcam0.points = o3d.utility.Vector3dVector(cam_points)
cam_points1,T = convert_velo_2_cam(CALIB_VELO2CAM,velo_depth[1])
pcdcam1 = o3d.geometry.PointCloud()
pcdcam1.points = o3d.utility.Vector3dVector(cam_points1)

(4, 4)
(4, 4)


In [145]:
poses[1] = T@poses[1]@np.linalg.inv(T)

In [158]:
import numpy as np

pose_noisy = poses[1].copy()

# Generate a random Gaussian 3D vector
noise = np.random.normal(0, 1, size=3)

# Normalize and scale to magnitude 0.1
noise = noise / np.linalg.norm(noise) * 0.5
# Add to translation
pose_noisy[:3, 3] += noise

print("Original translation:", poses[1][:3, 3])
print("Noise vector (|noise|=%.3f):" % np.linalg.norm(noise), noise)
print("Noisy translation:   ", pose_noisy[:3, 3])


Original translation: [0.04137541 0.04645703 1.18382996]
Noise vector (|noise|=0.500): [ 0.41068614  0.26318798 -0.10985892]
Noisy translation:    [0.45206155 0.30964501 1.07397104]


In [159]:
o3d.visualization.draw_geometries([pcd_0,pcd_1.transform(poses[1])])


In [52]:
o3d.visualization.draw_geometries([pcd_0,pcdcam0])
# o3d.visualization.draw_geometries([pcd_1,pcdcam1])


In [164]:
import copy
pcd_1_noise = copy.deepcopy(pcd_1)
pcd_1_noise.transform(pose_noisy)
o3d.visualization.draw_geometries([pcd_0,pcd_1_noise])

In [165]:
import open3d as o3d
import numpy as np

# Make copies to avoid modifying originals
source = pcd_1_noise  # source cloud (moved by initial pose)
target = pcd_0        # target cloud

# # Downsample (optional, helps ICP)
source_down = source.voxel_down_sample(voxel_size=0.1)
target_down = target.voxel_down_sample(voxel_size=0.1)

# No normals needed for point-to-point ICP, but won’t hurt if already estimated

# Run ICP (point-to-point)
threshold = 0.75  # distance threshold
trans_init = np.eye(4)  # initial guess

icp_result = o3d.pipelines.registration.registration_icp(
    source_down, target_down, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint()
)

print("ICP fitness:", icp_result.fitness)
print("ICP inlier RMSE:", icp_result.inlier_rmse)
print("Estimated transformation:\n", icp_result.transformation @ poses[1])

# Apply ICP transformation to the original high-res source
source.transform(icp_result.transformation)

# Visualize aligned point clouds
o3d.visualization.draw_geometries([target, source])


ICP fitness: 0.6654592595416937
ICP inlier RMSE: 0.23076772991673317
Estimated transformation:
 [[ 9.99886551e-01  2.96943821e-03  1.47671009e-02 -5.67289932e-01]
 [-2.98679718e-03  9.99994874e-01  1.15358330e-03 -2.43166811e-01]
 [-1.47636000e-02 -1.19755871e-03  9.99890295e-01  1.72311722e+00]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]


In [129]:
print("Estimated transformation:\n", icp_result.transformation @ pose_noisy)

print(poses[1])

Estimated transformation:
 [[ 9.99999492e-01 -4.59937736e-04  8.97313147e-04 -2.75052322e-02]
 [ 4.58316128e-04  9.99998264e-01  1.80626775e-03  1.77621466e-02]
 [-8.98142638e-04 -1.80585555e-03  9.99997966e-01  1.18447069e+00]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
[[ 9.99997712e-01  1.27636482e-03  1.71671717e-03 -5.41079112e-04]
 [-1.27807702e-03  9.99998687e-01  9.96494015e-04  6.73737006e-03]
 [-1.71544330e-03 -9.98685796e-04  9.99998030e-01  1.18557022e+00]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
