In [None]:
from vita_toolkit import depth_rgb_to_pcd, project_pc_to_bev
from vita_toolkit.point_cloud.viz import visualize_point_cloud, visualize_bev
from vita_toolkit.lmdb_reader import simple_read_example, PureLMDBReader
import numpy as np 
import pickle
import os
%load_ext autoreload
%autoreload 2

In [None]:
# read from file system 
from vita_toolkit.filesystem_reader import FilesystemReader
file_path = "/home/heng.li/repo/vita-agent/test_data"
reader = FilesystemReader(file_path)



In [None]:
test_datas = list(reader.iterate_frames())
test_data = test_datas[15]

In [None]:
test_data

In [None]:
from PIL import Image
# load image as PIL from numpy arrary 
def numpy_to_pil(image: np.ndarray):
    print(image.shape)
    numpy_image = np.ascontiguousarray(image)
    # to pil 
    image = Image.fromarray(image)
    return image, numpy_image



In [None]:
pil_image, numpy_image = numpy_to_pil(test_data["left_image"])

In [None]:
import requests
from PIL import Image
import torch
from transformers import DepthProImageProcessorFast, DepthProForDepthEstimation
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


image = pil_image

image_processor = DepthProImageProcessorFast.from_pretrained("apple/DepthPro-hf")
model = DepthProForDepthEstimation.from_pretrained("apple/DepthPro-hf").to(device)

inputs = image_processor(images=image, return_tensors="pt").to(device)

with torch.no_grad():
    outputs = model(**inputs)

post_processed_output = image_processor.post_process_depth_estimation(
    outputs, target_sizes=[(image.height, image.width)],
)

field_of_view = post_processed_output[0]["field_of_view"]
focal_length = post_processed_output[0]["focal_length"]
depth = post_processed_output[0]["predicted_depth"]
numpy_depth = depth.detach().cpu().numpy()
inverse_depth = 1 / numpy_depth
# Visualize inverse depth instead of depth, clipped to [0.1m;250m] range for better visualization.
max_invdepth_vizu = min(inverse_depth.max(), 1 / 0.1)
min_invdepth_vizu = max(1 / 250, inverse_depth.min())
inverse_depth_normalized = (inverse_depth - min_invdepth_vizu) / (
    max_invdepth_vizu - min_invdepth_vizu
)
# Save as color-mapped "turbo" jpg image.
cmap = plt.get_cmap("turbo")
color_depth = (cmap(inverse_depth_normalized)[..., :3] * 255).astype(np.uint8)
depth = Image.fromarray(color_depth)

In [None]:
image

In [None]:
depth

In [None]:
numpy_depth

In [None]:
import numpy as np

# Camera calibration parameters
intrinsic_params_rgb = {
    "fx": 721.744,
    "fy": 721.744, 
    "cx": 972.885,
    "cy": 590.376
}

extrinsic_matrix = np.array(
    [
        [-0.0130459, 0.999647, 0.0231345, 0.0351892],
        [0.065793, 0.0239446, -0.997545, 0.00365625],
        [-0.997748, -0.0114917, -0.0660821, -0.259888],
        [0, 0, 0, 1],
    ]
)

extrinsic_params = {"data": extrinsic_matrix}

# Debug: Check shapes and depth value ranges
print("Original RGB image shape:", test_data["left_image"].shape)
print("Depth image shape:", numpy_depth.shape)
print("Depth value range:", numpy_depth.min(), "to", numpy_depth.max())

# The RGB image needs to be properly formatted for Open3D
# It should be (height, width, 3) not (3, height, width)
rgb_image = test_data["left_image"]
if rgb_image.shape[0] == 3:  # If channels are first
    rgb_image = np.transpose(rgb_image, (1, 2, 0))  # Move channels to last
    
print("Corrected RGB image shape:", rgb_image.shape)

# Fix 1: Ensure depth is in millimeters and within reasonable range
# DepthPro outputs are in meters, convert to millimeters for Open3D
depth_mm = numpy_depth * 1000

# Fix 2: Clamp depth values to reasonable range (0.1m to 5m)
depth_mm = np.clip(depth_mm, 100, 20000)  # 0.1m to 5m in mm

# Fix 3: Ensure depth is uint16 format (preferred by Open3D)
depth_mm = depth_mm.astype(np.uint16)

print("Processed depth range:", depth_mm.min(), "to", depth_mm.max(), "mm")

# Call the function with corrected depth
points, colors = depth_rgb_to_pcd(
    depth=depth_mm, 
    intrinsic=intrinsic_params_rgb,
    extrinsic=extrinsic_params,
    rgb=numpy_image,
    depth_rgb_scale=1.0
)

In [None]:
points


In [None]:
colors

In [None]:
import rerun as rr
import matplotlib
cmap = matplotlib.colormaps["turbo_r"]
norm = matplotlib.colors.Normalize(
vmin=2.0,
vmax=15,
)
# Now we viz use rerun. 
rr.init("vita_toolkit")
rr.connect_grpc(url="rerun+http://127.0.0.1:9876/proxy")
# For point cloud 


In [None]:
from vita_toolkit.point_cloud.img_to_pc import pcd_to_camera_coordinate
from vita_toolkit.point_cloud.pc_to_bev import filter_pcd
t = np.array([-0.0613157 -0.0842516 -0.112423])
# pc_range = np.array([-6.4, -6.4, -2.0, 6.4, 6.4, 2.8])
#lcd = filter_pcd(test_data["point_clouds"][:, :3], pc_range)
before_lidar = test_data["point_clouds"][:, :3]
before_lidar[:, 1] = before_lidar[:, 1] * -1
lidar_pcd = pcd_to_camera_coordinate(before_lidar, extrinsic_params)


In [None]:
# transpose to corrent cooridnates 
# Current z is the y. Current y is x. Current x is z. 
# Transpose axes: z -> y, y -> x, x -> z
# Original lidar_pcd shape: (N, 3)
# New order: [2, 0, 1] (x, y, z) -> (z, x, y)
lidar_pcd = lidar_pcd[:, [2, 0, 1]]
lidar_pcd[:, 2] = lidar_pcd[:, 2] * -1

In [None]:
np.max(lidar_pcd[:, 2], axis=-1)

In [None]:
point_colors = cmap(norm(np.linalg.norm(lidar_pcd[:, :3], axis=1)))
rr.log(
    "/world/ego/lidar", 
    rr.Points3D(
        positions=lidar_pcd[:, :3], 
        colors=point_colors
    ),
)

In [None]:
# align with pcd 
from vita_toolkit.point_cloud.depht_lidar_matching import align_depth_lidar
scale_factor, valid_depth_pairs, final_lidar_depths = align_depth_lidar(numpy_depth, lidar_pcd[:, :3], intrinsic_params_rgb)

In [None]:
len(final_lidar_depths), len(valid_depth_pairs)

In [None]:
scale_factor

In [None]:
point_colors = cmap(norm(np.linalg.norm(points[:, :3], axis=1)))
points[:, 0] = points[:, 0] * -1
points[:, 1] = points[:, 1] * -1
rr.log(
    "/world/ego/depth_lidar", 
    rr.Points3D(
        positions=points[:, :3] * scale_factor,
        colors=point_colors
    ),
)

## Now project to BEV view 

In [None]:
pc_ranges = np.array([-6.4, -6.4, -2.0, 6.4, 6.4, 2.8])
voxel_size = np.array([0.2, 0.2, 8.0])
bev_size = project_pc_to_bev(points, pc_ranges, voxel_size, pc_ranges)

In [None]:
# Get the first batch and sum across z dimension to get 2D occupancy
occupancy_2d = np.sum(bev_size[0], axis=0)  # Shape: (ny, nx)

# Find non-zero (occupied) cells
occupied_indices = np.where(occupancy_2d > 0)

# Randomly sample from occupied cells
random_idx = np.random.randint(len(occupied_indices[0]))
y_idx, x_idx = occupied_indices[0][random_idx], occupied_indices[1][random_idx]

# Convert grid indices to world coordinates
x_world = pc_ranges[0] + (x_idx + 0.5) * voxel_size[0]
y_world = pc_ranges[1] + (y_idx + 0.5) * voxel_size[1]

In [None]:
x_world

In [None]:
y_world

In [None]:
# Import the new depth-lidar alignment functions
from vita_toolkit.point_cloud.depht_lidar_matching import align_depth_lidar, DepthLidarSolver, AlignmentResult
import matplotlib.pyplot as plt
import pandas as pd
import time

In [None]:
# Benchmark different depth-lidar alignment methods
print("=" * 60)
print("DEPTH-LIDAR ALIGNMENT BENCHMARK")
print("=" * 60)

# Define methods to benchmark
methods = [
    {"name": "Least Squares", "method": "least_squares"},
    {"name": "Median Ratio", "method": "median_ratio"},
    {"name": "Robust Least Squares (Huber)", "method": "robust_least_squares", "robust_loss": "huber"},
    {"name": "Robust Least Squares (Soft L1)", "method": "robust_least_squares", "robust_loss": "soft_l1"},
    {"name": "RANSAC", "method": "ransac"},
    {"name": "Adaptive Scaling", "method": "adaptive_neighborhood"}
]

# Store results for comparison
benchmark_results = []

for method_config in methods:
    method_name = method_config["name"]
    method = method_config["method"]
    
    print(f"\n🔍 Testing {method_name}...")
    
    # Time the alignment
    start_time = time.time()
    

    # Create solver with reasonable parameters
    solver = DepthLidarSolver(outlier_threshold=0.2, max_iterations=100)
    
    # Prepare solver arguments
    solver_args = {
        "depth_map": numpy_depth,
        "lidar_points": lidar_pcd,
        "intrinsic_params": intrinsic_params_rgb,
        "method": method
    }
    
    # Add robust loss if specified
    if "robust_loss" in method_config:
        solver_args["robust_loss"] = method_config["robust_loss"]
    
    # Run alignment
    result = solver.solve(**solver_args)
    
    # Calculate execution time
    execution_time = time.time() - start_time
    
    # Store results
    benchmark_results.append({
        "Method": method_name,
        "Scale Factor": result.scale_factor,
        "RMSE": result.rmse,
        "Mean Error": result.mean_error,
        "Median Error": result.median_error,
        "Std Error": result.std_error,
        "Correspondences": result.num_correspondences,
        "Outlier Ratio": result.outlier_ratio,
        "Converged": result.converged,
        "Iterations": result.iterations,
        "Time (s)": execution_time
    })
    
    print(f"   ✅ Scale factor: {result.scale_factor:.4f}")
    print(f"   📊 RMSE: {result.rmse:.4f}")
    print(f"   🎯 Mean error: {result.mean_error:.4f}")
    print(f"   📈 Outlier ratio: {result.outlier_ratio:.2%}")
    print(f"   🔢 Correspondences: {result.num_correspondences}")
    print(f"   ⏱️ Time: {execution_time:.4f}s")

print("\n" + "=" * 60)
print("BENCHMARK SUMMARY")
print("=" * 60)

In [None]:
# Create DataFrame for better visualization
df_results = pd.DataFrame(benchmark_results)
print(df_results.to_string(index=False))

# Find best method based on RMSE
successful_results = df_results[df_results['RMSE'].notna()]
if not successful_results.empty:
    best_method = successful_results.loc[successful_results['RMSE'].idxmin()]
    print(f"\n🏆 Best method: {best_method['Method']} (RMSE: {best_method['RMSE']:.4f})")
    
    # Use best method for further analysis
    best_solver = DepthLidarSolver(outlier_threshold=0.2, max_iterations=100)
    best_result = best_solver.solve(
        depth_map=numpy_depth,
        lidar_points=lidar_pcd,
        intrinsic_params=intrinsic_params_rgb,
        method=best_method['Method'].lower().replace(' ', '_').replace('(', '').replace(')', '').replace('huber', 'robust_least_squares')
    )
    
    print(f"\n📈 Detailed metrics for {best_method['Method']}:")
    metrics = best_solver.evaluate_alignment(best_result)
    for key, value in metrics.items():
        print(f"   {key}: {value:.4f}")
else:
    print("\n❌ No successful alignments found!")