In [None]:
from pathlib import Path
import dask
import dask.array as da
import numpy as np
import open3d as o3d
import shutil


def load_pcd(pcd_path: Path) -> np.ndarray:
    return np.asarray(o3d.io.read_point_cloud(str(pcd_path)).points).astype(np.float32)


def write_pcd(
    xyz: np.ndarray,
    path: Path,
) -> bool:
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(xyz)
    o3d.io.write_point_cloud(str(path), pcd)
    return True


def lazy_load_pcd_stack(pcd_list):
    # load_pcd the first pcd (assume rest are same shape/dtype)
    sample = load_pcd(pcd_list[0])

    # Build a list of lazy dask arrays
    arrays = [
        da.from_delayed(
            dask.delayed(load_pcd)(path),
            dtype=sample.dtype,
            shape=sample.shape,
        )
        for path in pcd_list
    ]
    # Stack all point coordinates into a 3D dask array
    stack = da.stack(arrays, axis=0)
    stack = stack.rechunk()
    return stack


def load_pcd_stack(pcd_list):
    arrays = [load_pcd(path) for path in pcd_list]
    stack = np.stack(arrays, axis=0)
    return stack


def generate_random_data(directory, npcd, npoints, noise_std):
    directory = Path(directory)
    if directory.exists():
        shutil.rmtree(directory)
    directory.mkdir(parents=True)

    def random_points3d_uniform(
        npoints=1000,
        xlim=(100, 200),
        ylim=(100, 200),
        zlim=(0, 1),
        datatype=np.float32,
    ):
        rng = np.random.default_rng(seed=0)
        x = rng.uniform(xlim[0], xlim[1], npoints)
        y = rng.uniform(ylim[0], ylim[1], npoints)
        z = rng.uniform(zlim[0], zlim[1], npoints)
        return np.stack([x, y, z], axis=1).astype(datatype)

    def generate_simulated_pcd(pcd_path, ref_pcd_path, noise_std):
        rng = np.random.default_rng()
        xyz = load_pcd(ref_pcd_path)
        # add noise
        # x = xyz[:, 0]
        # y = xyz[:, 1]
        # z = xyz[:, 2]
        # x += rng.normal(0, noise_std, x.shape)
        # y += rng.normal(0, noise_std, y.shape)
        # z += rng.normal(0, noise_std, z.shape)
        # xyz = np.stack([x, y, z], axis=1)
        xyz += rng.normal(0, noise_std, xyz.shape)
        return write_pcd(xyz, pcd_path)

    # generate random reference pcd
    ref_pcd_path = directory / "sparse_pts_reference.ply"
    xyz = random_points3d_uniform(npoints)
    write_pcd(xyz, ref_pcd_path)

    # generate random pcds starting from reference and adding noise
    output_directory = directory / "Monte_Carlo_output"
    output_directory.mkdir()

    if npcd > 1000:
        delayed_tasks = []
        for i in range(npcd):
            path = output_directory / f"{i:04}_pts.ply"
            generate_simulated_pcd(path, ref_pcd_path, noise_std)

            delayed_tasks.append(
                dask.delayed(generate_simulated_pcd)(path, ref_pcd_path, noise_std)
            )

        dask.compute(*delayed_tasks)
    else:
        for i in range(npcd):
            path = output_directory / f"{i:04}_pts.ply"
            generate_simulated_pcd(path, ref_pcd_path, noise_std)


generate_random_pcd = False
num_pcd = 1000
num_points = 100000
noise_std = 0.01

# Generate Random point cloud as test data
if generate_random_pcd:
    pcd_dir = Path("data/test")
    generate_random_data(pcd_dir, num_pcd, num_points, noise_std)
    print("Generated random point cloud data")

In [None]:
# pcd_dir = Path("data/test")
# pcd_dir = Path("data/rossia")
pcd_dir = Path("data/calib")
pcd_ext = "ply"
cov_ddof = 1

# Get pcd list
pcd_dir = pcd_dir / "Monte_Carlo_output"
pcd_list = sorted(list(pcd_dir.glob(f"*.{pcd_ext}")))
print(f"Found {len(pcd_list)} pointclouds in {pcd_dir}")

ref_pcd_path = pcd_dir.parent / "sparse_pts_reference.ply"
if ref_pcd_path.exists():
    ref_pcd = load_pcd(ref_pcd_path)
    print(f"Loaded reference pointcloud from {ref_pcd_path}")
else:
    ref_pcd = load_pcd(pcd_list[0])
    print("Reference pointcloud not found, using first pointcloud as reference")

# Build a lazy dask array of all pointclouds
stack = lazy_load_pcd_stack(pcd_list)
stack

In [None]:
# Load the data and compute the mean and std with dask
operations = [
    stack.mean(axis=0),
    stack.std(axis=0, ddof=cov_ddof),
]
mean, std = dask.compute(*operations)
print(mean)
print(std)

In [None]:
# Load all the data and compute the mean and std with numpy
# stack = load_pcd_stack(pcd_list)
# mean = stack.mean(axis=0)
# std = stack.std(axis=0, ddof=cov_ddof)
# print(mean) 
# print(std)

In [None]:
# Compute difference between reference and mean coordinates
ref_diff = np.mean(ref_pcd - mean, axis=0)
print(f"{ref_diff}")

In [None]:
# Make plot of the mean coordinates of the point colored with the standard deviation of the coordinates

from matplotlib import pyplot as plt

point_size = 1
scale_fct = 1000

fig, axes = plt.subplots(1, 3, figsize=(15, 5))
plt_x = axes[0].scatter(mean[:, 0], mean[:, 1], s=point_size, c=std[:, 0] * scale_fct)
axes[0].set_title("Precision X [mm]")
axes[0].set_xlabel("X")
axes[0].set_ylabel("Y")
fig.colorbar(plt_x)

plt_y = axes[1].scatter(mean[:, 0], mean[:, 1], s=point_size, c=std[:, 1] * scale_fct)
axes[1].set_title("Precision Y [mm]")
axes[1].set_xlabel("X")
axes[1].set_ylabel("Y")
fig.colorbar(plt_y)

plt_z = axes[2].scatter(mean[:, 0], mean[:, 1], s=point_size, c=std[:, 2] * scale_fct)
axes[2].set_title("Precision Z [mm]")
axes[2].set_xlabel("X")
axes[2].set_ylabel("Y")
fig.colorbar(plt_z)

fig.tight_layout()

plt.show()

In [None]:
from thirdparty import transformations as tf

T = tf.affine_matrix_from_points(
    mean.T, ref_pcd.T, shear=False, scale=True, usesvd=True
)
scale, _, angles, translation, _ = tf.decompose_matrix(T)
scale_percent = scale.mean() - 1
angles_deg = np.rad2deg(angles)

print(f"Translation: {translation*1000} mm")
print(f"Angles: {angles_deg} deg")
print(f"Scale: {scale_percent:.6}%")

In [None]:
# Compute covariance matrix with numpy (note that all the pcd are loaded in memory at once here!)
def compute_covariance(points):
    return np.cov(points, rowvar=False, ddof=cov_ddof)


np_stack = np.array(stack)
np_cov = [compute_covariance(np_stack[:, i, :]) for i in range(np_stack.shape[1])]
np.sqrt(np_cov[0].diagonal()) - std[0]

In [None]:
# # Compute covariance matrix with dask
# delayed_dask_cov = [
#     dask.delayed(compute_covariance)(stack[:, i, :]) for i in range(stack.shape[1])
# ]
# dask_cov = da.compute(*delayed_dask_cov)
# print(np.sqrt(np.diag(np.array(dask_cov[0]))) - std[0])