In [None]:
import numpy as np
import py4dgeo
import sys

import matplotlib.pyplot as plt

# Select "vtk" as py4dgeo's interactive backend for this session.
# NOTE(review): presumably this controls how interactive 3D views are rendered — confirm against the py4dgeo docs.
py4dgeo.set_interactive_backend("vtk")

# Functions to create synthetic test epochs:

In [None]:
import numpy as np

def random_rotation_matrix():
    """
    Return a random 3x3 rotation matrix.

    A 3x3 matrix of standard-normal samples (drawn from NumPy's global
    random state) is QR-decomposed and the orthogonal factor is returned,
    with its determinant forced to be positive.
    """
    gaussian = np.random.randn(3, 3)
    ortho = np.linalg.qr(gaussian)[0]
    # The orthogonal factor may be a reflection (det < 0); negating one
    # column flips the sign of the determinant and yields a proper rotation.
    if np.linalg.det(ortho) < 0:
        ortho[:, 0] *= -1
    return ortho

def write_las(outpoints, outfilepath, normals=None, segment_id=None, attribute_dict=None):
    """
    Write a LAS/LAZ file with coordinates, optional normals, and extra attributes.

    Parameters
    ----------
    outpoints : array-like, shape (N, 3)
        Point coordinates; columns 0, 1, 2 are written as x, y, z.
    outfilepath : str or path-like
        Destination path handed to ``LasData.write``.
    normals : array-like, shape (N, 3), optional
        Per-point normals, stored as float32 extra dimensions "NX", "NY", "NZ".
    segment_id : array-like, shape (N,), optional
        Per-point segment IDs, stored in the ``point_source_id`` field and
        therefore restricted to 0–65535.
    attribute_dict : dict, optional
        Mapping ``name -> per-point values``. A name that is already a
        dimension of the point format is assigned directly; any other name is
        added as an extra dimension (float32 for floating input, int32
        otherwise).

    Raises
    ------
    ValueError
        If ``normals`` is not of shape (N, 3) or a segment ID lies outside
        0–65535.
    """
    # Imported inside the function (as in the original cell), so laspy is only
    # required when this function is actually called.
    import laspy
    import numpy as np

    # A fresh dict per call instead of a shared mutable default argument.
    if attribute_dict is None:
        attribute_dict = {}

    outpoints = np.asarray(outpoints)

    # LAS 1.4, point format 6
    hdr = laspy.LasHeader(version="1.4", point_format=6)
    las = laspy.LasData(hdr)

    # coords
    las.x = outpoints[:, 0]
    las.y = outpoints[:, 1]
    las.z = outpoints[:, 2]

    # normals, one float32 extra dimension per component
    if normals is not None:
        normals = np.asarray(normals, dtype=np.float32)
        # Check the rank first so a 1-D input gives the intended ValueError
        # rather than an IndexError from shape[1].
        if normals.ndim != 2 or normals.shape[1] != 3:
            raise ValueError("Normals must have shape (N,3)")
        for i, comp in enumerate(["NX", "NY", "NZ"]):
            las.add_extra_dim(laspy.ExtraBytesParams(name=comp, type=np.float32))
            las[comp] = normals[:, i]

    # segment_id into point_source_id
    if segment_id is not None:
        seg = np.asarray(segment_id)
        # Validate on the original values: casting to an unsigned type first
        # would wrap negative IDs. The size guard keeps empty input from
        # crashing in min()/max().
        if seg.size and (seg.min() < 0 or seg.max() > 65535):
            raise ValueError("LAS point_source_id supports only 0–65535")
        las.point_source_id = seg.astype(np.uint16)

    # other attributes
    for key, vals in attribute_dict.items():
        vals = np.asarray(vals)
        if key in las.point_format.dimension_names:
            las[key] = vals
        else:
            dtype = np.float32 if np.issubdtype(vals.dtype, np.floating) else np.int32
            las.add_extra_dim(laspy.ExtraBytesParams(name=key, type=dtype))
            las[key] = vals.astype(dtype)

    las.write(outfilepath)
    return



def make_test_epochs(n_planes=5,
                     points_per_side=20,
                     shift=(0.5, -0.5, 0.5),
                     noise=0.0,
                     file_prefix="epoch",
                     unambiguous=True,
                     seed=None):
    """
    Create two synthetic epochs of randomly rotated planes and save them to disk.

    Every plane is a ``points_per_side`` x ``points_per_side`` grid spanning
    [-1, 1] along two in-plane axes, offset by ``5 * i`` along its own normal
    (``i`` is the plane index). Epoch 1 contains the same points translated
    by ``shift``.

    Parameters
    ----------
    n_planes : int
        Number of planes (segments) per epoch; must be at least 1.
    points_per_side : int
        Number of grid samples along each in-plane axis.
    shift : sequence of 3 floats
        Translation applied to every point of epoch 0 to obtain epoch 1.
    noise : float
        Standard deviation of isotropic Gaussian jitter; 0 disables it.
    file_prefix : str
        Prefix of all files written (see below).
    unambiguous : bool
        If True, segment IDs are unique across both epochs (epoch 1 uses
        ``i + n_planes``). If False, segment IDs start at 0 in each epoch.
    seed : int, optional
        If given, NumPy's global random state is seeded with it first so the
        generated data are reproducible.

    Returns
    -------
    (epoch0, epoch1) : tuple of py4dgeo.Epoch
        Both epochs carry ``segment_id``, ``N_x``, ``N_y``, ``N_z`` as
        additional dimensions.

    Raises
    ------
    ValueError
        If ``n_planes`` is smaller than 1.

    Files written
    -------------
    ``{file_prefix}0.xyz`` / ``{file_prefix}1.xyz``
        7 space-separated columns: x y z segment_id N_x N_y N_z.
    ``{file_prefix}0.las`` / ``{file_prefix}1.las``
        Written through ``write_las``.
    ``{file_prefix}_extended_y.csv``
        Rows of ``epoch0_segment_id, epoch1_segment_id, label``.
    """
    if n_planes < 1:
        raise ValueError("n_planes must be at least 1")
    if seed is not None:
        np.random.seed(seed)

    # Offset between a plane's ID in epoch 0 and its ID in epoch 1. It is used
    # for the point labels AND the correspondence file so both always agree.
    id_offset = n_planes if unambiguous else 0
    shift_vec = np.array(shift, dtype=float)

    # Loop-invariant pieces: reference basis (columns u, v, n) and the 2D grid.
    n = np.array([0.0, 0.0, 1.0])
    u = np.array([1.0, 0.0, 0.0])
    v = np.cross(n, u)
    basis = np.column_stack([u, v, n])  # (3,3)
    grid = np.linspace(-1, 1, points_per_side)
    uu, vv = np.meshgrid(grid, grid)

    xyz0_list, xyz1_list = [], []
    seg0_list, seg1_list = [], []
    normals0_list, normals1_list = [], []

    for i in range(n_planes):
        R = random_rotation_matrix()
        # The rotated basis vectors are the COLUMNS of R @ basis; transpose
        # so that unpacking (which iterates over rows) yields them.
        u_rot, v_rot, n_rot = (R @ basis).T

        # grid of points in the rotated plane, moved 5*i along the normal
        pts = uu[..., None] * u_rot + vv[..., None] * v_rot + 5 * i * n_rot
        pts = pts.reshape(-1, 3)

        # noise (isotropic jitter)
        if noise > 0:
            pts += np.random.normal(scale=noise, size=pts.shape)

        # always add tiny thickness along normal to avoid PCA degeneracy
        pts += np.random.normal(scale=1e-6, size=(pts.shape[0], 1)) * n_rot

        pts_shift = pts + shift_vec

        xyz0_list.append(pts)
        xyz1_list.append(pts_shift)
        seg0_list.append(np.full(pts.shape[0], i, dtype=np.int64))
        seg1_list.append(np.full(pts.shape[0], i + id_offset, dtype=np.int64))
        normals0_list.append(np.tile(n_rot, (pts.shape[0], 1)))
        normals1_list.append(np.tile(n_rot, (pts.shape[0], 1)))

    # Concatenate
    xyz0, xyz1 = np.vstack(xyz0_list), np.vstack(xyz1_list)
    seg0, seg1 = np.concatenate(seg0_list), np.concatenate(seg1_list)
    normals0, normals1 = np.vstack(normals0_list), np.vstack(normals1_list)

    # Structured arrays holding the additional per-point dimensions
    dtype = [
        ("segment_id", np.int64),
        ("N_x", np.float64),
        ("N_y", np.float64),
        ("N_z", np.float64),
    ]
    add0 = np.zeros(xyz0.shape[0], dtype=dtype)
    add0["segment_id"], add0["N_x"], add0["N_y"], add0["N_z"] = seg0, normals0[:, 0], normals0[:, 1], normals0[:, 2]
    add1 = np.zeros(xyz1.shape[0], dtype=dtype)
    add1["segment_id"], add1["N_x"], add1["N_y"], add1["N_z"] = seg1, normals1[:, 0], normals1[:, 1], normals1[:, 2]

    epoch0 = py4dgeo.Epoch(xyz0, additional_dimensions=add0)
    epoch1 = py4dgeo.Epoch(xyz1, additional_dimensions=add1)

    # Save epochs to .xyz (7 space-separated columns)
    data0 = np.column_stack([xyz0, seg0.reshape(-1, 1), normals0])
    data1 = np.column_stack([xyz1, seg1.reshape(-1, 1), normals1])
    np.savetxt(f"{file_prefix}0.xyz", data0, delimiter=" ", fmt="%.6f")
    np.savetxt(f"{file_prefix}1.xyz", data1, delimiter=" ", fmt="%.6f")

    # Save epochs to .las
    write_las(xyz0, f"{file_prefix}0.las", normals=normals0, segment_id=seg0)
    write_las(xyz1, f"{file_prefix}1.las", normals=normals1, segment_id=seg1)

    # Save extended_y.csv with known correspondences
    # Format: epoch0_segment_id, epoch1_segment_id, label

    n_train = max(1, int(0.7 * n_planes))   # 70% of planes, at least 1

    train_ids = np.arange(n_train)          # first 70% → training
    apply_ids = np.arange(n_train, n_planes)  # rest → left for application

    # positives for training: plane i in epoch 0 matches plane i in epoch 1
    positives = [[i, i + id_offset, 1] for i in train_ids]

    # negatives for training (simple wrong matches)
    negatives = []
    for i in train_ids:
        wrong_j = (i + 1) % n_planes
        if wrong_j in train_ids:   # keep negatives inside training subset
            negatives.append([i, wrong_j + id_offset, 0])

    correspondences = np.array(positives + negatives, dtype=int)

    np.savetxt(f"{file_prefix}_extended_y.csv", correspondences, delimiter=",", fmt="%d")

    print(f"Training correspondences saved for {len(train_ids)} planes.")
    print(f"Remaining {len(apply_ids)} planes reserved for application/testing.")

    print(f"Saved {file_prefix}0.xyz, {file_prefix}1.xyz and {file_prefix}_extended_y.csv")

    return epoch0, epoch1


In [None]:
# Generate synthetic test data
epoch0_synth, epoch1_synth = make_test_epochs(n_planes=100, shift=(0.3, -0.2, 0.1), noise=0.01, unambiguous=True)
print("Epoch0 points:", epoch0_synth.cloud.shape)
print("Epoch1 points:", epoch1_synth.cloud.shape)
print("Epoch0 unique IDs:", np.unique(epoch0_synth.additional_dimensions["segment_id"]))
print("Epoch1 unique IDs:", np.unique(epoch1_synth.additional_dimensions["segment_id"]))

# plot the synthetic data
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection="3d")

# flatten the per-point segment IDs to 1-D
seg0 = epoch0_synth.additional_dimensions['segment_id'].ravel()
seg1 = epoch1_synth.additional_dimensions['segment_id'].ravel()

all_ids = np.unique(np.concatenate([seg0, seg1]))
n_ids = len(all_ids)

# discrete colormap with n_ids colors
cmap = plt.get_cmap("tab20", n_ids)

# Both scatters must share one colour scale. Without explicit limits each
# scatter normalises to its own ID range, so the colorbar built from sc1
# would not describe the Epoch0 colours.
color_limits = dict(vmin=all_ids.min(), vmax=all_ids.max())

sc0 = ax.scatter(epoch0_synth.cloud[:, 0],
                 epoch0_synth.cloud[:, 1],
                 epoch0_synth.cloud[:, 2],
                 c=seg0, cmap=cmap,
                 marker="o", alpha=0.6, label="Epoch0",
                 **color_limits)

sc1 = ax.scatter(epoch1_synth.cloud[:, 0],
                 epoch1_synth.cloud[:, 1],
                 epoch1_synth.cloud[:, 2],
                 c=seg1, cmap=cmap,
                 marker="x", alpha=0.6, label="Epoch1",
                 **color_limits)

ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Synthetic planes with unique segment IDs")
# The scatters carry labels, so show them.
ax.legend()

# Colorbar with ticks at actual IDs, thinned to roughly 20 so they stay readable
tick_step = max(1, n_ids // 20)
cbar = fig.colorbar(sc1, ax=ax, shrink=0.6, ticks=all_ids[::tick_step])
cbar.set_label("Segment ID")

plt.show()


In [None]:
# Read both epochs back from their .xyz files. Columns 0-2 are the
# coordinates; columns 3-6 are mapped to the named additional dimensions.
epoch0, epoch1 = (
    py4dgeo.epoch.read_from_xyz(
        xyz_path,
        additional_dimensions={3: "segment_id", 4: "N_x", 5: "N_y", 6: "N_z"},
        delimiter=" ",
    )
    for xyz_path in ("epoch0.xyz", "epoch1.xyz")
)

In [None]:
import numpy as np
# Sanity check on the raw file: which segment IDs occur and how many points each has.
data = np.genfromtxt("epoch0.xyz", delimiter=" ")
seg_ids, counts = np.unique(data[:, 3], return_counts=True)
print("Segment IDs:", seg_ids)
print("Counts per segment:", counts)
# np.unique only reports IDs that actually occur, so a count can never be 0.
# An empty segment therefore shows up as a gap in the ID range instead.
# NOTE(review): this assumes segment IDs are meant to be consecutive integers — confirm.
expected_ids = np.arange(seg_ids.min(), seg_ids.max() + 1)
missing_ids = np.setdiff1d(expected_ids, seg_ids)
print("Any empty segment?", missing_ids.size > 0)

TESTING BLOCKS:

In [None]:
# Flatten the segment IDs of the loaded epoch0 and report each ID with its point count.
seg0 = np.ravel(epoch0.additional_dimensions["segment_id"])
print("Epoch0 segments and counts:", np.unique(seg0, return_counts=True))

# Test of pbm3c2_test.py

In [None]:
# Load epoch0/epoch1 from the .xyz files for the PBM3C2 test below.
# Columns 3-6 are mapped to the named additional dimensions.
epoch0, epoch1 = (
    py4dgeo.epoch.read_from_xyz(
        xyz_path,
        additional_dimensions={3: "segment_id", 4: "N_x", 5: "N_y", 6: "N_z"},
        delimiter=" ",
    )
    for xyz_path in ("epoch0.xyz", "epoch1.xyz")
)

In [None]:
from pbm3c2_test import PBM3C2
import numpy as np

# n_planes must match the value used when the synthetic epochs were generated.
n_planes = 100
# Same split rule as in make_test_epochs (70% training, at least one plane),
# so apply_ids never overlaps the planes listed in the training file.
n_train = max(1, int(0.7 * n_planes))
train_ids = np.arange(n_train)
apply_ids = np.arange(n_train, n_planes)  # 70-99 for n_planes = 100

pbm3c2_algorithm = PBM3C2(reg_error=0.01)

# Train on the labelled correspondences and apply to the held-out segment IDs.
correspondences_df = pbm3c2_algorithm.compute(
    epoch0=epoch0,
    epoch1=epoch1,
    correspondences_file='epoch_extended_y.csv',
    apply_ids=apply_ids,
    search_radius=5.0
)

print(correspondences_df.head())

distances = correspondences_df['distance']
uncertainties = correspondences_df['uncertainty']