Merge 1bf3799 into b791423
rhugonnet committed Jun 7, 2024
2 parents b791423 + 1bf3799 commit b4f1d14
Showing 4 changed files with 502 additions and 284 deletions.
3 changes: 2 additions & 1 deletion doc/source/conf.py
@@ -62,7 +62,8 @@

# For myst-nb to find the Jupyter kernel (=environment) to run from
nb_kernel_rgx_aliases = {".*xdem.*": "python3"}
nb_execution_raise_on_error = True
nb_execution_raise_on_error = True # To fail documentation build on notebook execution error
nb_execution_show_tb = True # To show full traceback on notebook execution error

# autosummary_generate = True

2 changes: 2 additions & 0 deletions doc/source/coregistration.md
@@ -1,5 +1,7 @@
---
file_format: mystnb
mystnb:
execution_timeout: 90
jupytext:
formats: md:myst
text_representation:
277 changes: 166 additions & 111 deletions tests/test_coreg/test_base.py
@@ -12,24 +12,26 @@
import numpy as np
import pandas as pd
import pytest
import pytransform3d.rotations
import rasterio as rio
from geoutils import Raster, Vector
from geoutils.raster import RasterType
from scipy.ndimage import binary_dilation

import xdem
from xdem import coreg, examples, misc, spatialstats
from xdem._typing import NDArrayf
from xdem.coreg.base import Coreg, _apply_matrix_rst
from xdem.coreg.base import Coreg, apply_matrix


def load_examples() -> tuple[RasterType, RasterType, Vector]:
"""Load example files to try coregistration methods with."""

reference_raster = Raster(examples.get_path("longyearbyen_ref_dem"))
to_be_aligned_raster = Raster(examples.get_path("longyearbyen_tba_dem"))
reference_dem = Raster(examples.get_path("longyearbyen_ref_dem"))
to_be_aligned_dem = Raster(examples.get_path("longyearbyen_tba_dem"))
glacier_mask = Vector(examples.get_path("longyearbyen_glacier_outlines"))

return reference_raster, to_be_aligned_raster, glacier_mask
return reference_dem, to_be_aligned_dem, glacier_mask


def assert_coreg_meta_equal(input1: Any, input2: Any) -> bool:
@@ -889,116 +891,169 @@ def test_blockwise_coreg_large_gaps(self) -> None:
assert np.nanstd(ddem_pre) > np.nanstd(ddem_post)


def test_apply_matrix() -> None:
class TestAffineManipulation:

ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask.
ref_arr = gu.raster.get_array_and_mask(ref)[0]

# Test only a vertical shift (it should just apply the vertical shift and do nothing else)
vshift = 5
matrix = np.diag(np.ones(4, float))
matrix[2, 3] = vshift
transformed_dem = _apply_matrix_rst(ref_arr, ref.transform, matrix)
reverted_dem = transformed_dem - vshift

# Check that the reverted DEM has the exact same values as the initial one
# (resampling is not an exact science, so this will only apply for vertical shift corrections)
assert np.nanmedian(reverted_dem) == np.nanmedian(np.asarray(ref.data))

# Synthesize a shifted and vertically offset DEM
pixel_shift = 11
vshift = 5
shifted_dem = ref_arr.copy()
shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift]
shifted_dem[:, :pixel_shift] = np.nan
shifted_dem += vshift

matrix = np.diag(np.ones(4, dtype=float))
matrix[0, 3] = pixel_shift * tba.res[0]
matrix[2, 3] = -vshift

transformed_dem = _apply_matrix_rst(shifted_dem, ref.transform, matrix, resampling="bilinear")
diff = np.asarray(ref_arr - transformed_dem)

# Check that the median is very close to zero
assert np.abs(np.nanmedian(diff)) < 0.01
# Check that the NMAD is low
assert spatialstats.nmad(diff) < 0.01
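# (For reference: the NMAD used throughout these checks is assumed to follow the standard definition,
#  nmad(x) = 1.4826 * median(|x - median(x)|), i.e. a robust, outlier-insensitive analogue of the
#  standard deviation.)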

def rotation_matrix(rotation: float = 30) -> NDArrayf:
rotation = np.deg2rad(rotation)
matrix = np.array(
[
[1, 0, 0, 0],
[0, np.cos(rotation), -np.sin(rotation), 0],
[0, np.sin(rotation), np.cos(rotation), 0],
[0, 0, 0, 1],
]
)
return matrix

rotation = 4
centroid = (
np.mean([ref.bounds.left, ref.bounds.right]),
np.mean([ref.bounds.top, ref.bounds.bottom]),
ref.data.mean(),
)
rotated_dem = _apply_matrix_rst(ref.data.squeeze(), ref.transform, rotation_matrix(rotation), centroid=centroid)
# Make sure that the rotated DEM is way off, but is centered around the same approximate point.
assert np.abs(np.nanmedian(rotated_dem - ref.data.data)) < 1
assert spatialstats.nmad(rotated_dem - ref.data.data) > 500

# Apply a rotation in the opposite direction
unrotated_dem = (
_apply_matrix_rst(rotated_dem, ref.transform, rotation_matrix(-rotation * 0.99), centroid=centroid) + 4.0
) # TODO: Check why the 0.99 rotation and +4 vertical shift were introduced.

diff = np.asarray(ref.data.squeeze() - unrotated_dem)

# if False:
# import matplotlib.pyplot as plt
#
# vmin = 0
# vmax = 1500
# extent = (ref.bounds.left, ref.bounds.right, ref.bounds.bottom, ref.bounds.top)
# plot_params = dict(
# extent=extent,
# vmin=vmin,
# vmax=vmax
# )
# plt.figure(figsize=(22, 4), dpi=100)
# plt.subplot(151)
# plt.title("Original")
# plt.imshow(ref.data.squeeze(), **plot_params)
# plt.xlim(*extent[:2])
# plt.ylim(*extent[2:])
# plt.subplot(152)
# plt.title(f"Rotated {rotation} degrees")
# plt.imshow(rotated_dem, **plot_params)
# plt.xlim(*extent[:2])
# plt.ylim(*extent[2:])
# plt.subplot(153)
# plt.title(f"De-rotated {-rotation} degrees")
# plt.imshow(unrotated_dem, **plot_params)
# plt.xlim(*extent[:2])
# plt.ylim(*extent[2:])
# plt.subplot(154)
# plt.title("Original vs. de-rotated")
# plt.imshow(diff, extent=extent, vmin=-10, vmax=10, cmap="coolwarm_r")
# plt.colorbar()
# plt.xlim(*extent[:2])
# plt.ylim(*extent[2:])
# plt.subplot(155)
# plt.title("Original vs. de-rotated")
# plt.hist(diff[np.isfinite(diff)], bins=np.linspace(-10, 10, 100))
# plt.tight_layout(w_pad=0.05)
# plt.show()

# Check that the median is very close to zero
assert np.abs(np.nanmedian(diff)) < 0.5
# Check that the NMAD is low
assert spatialstats.nmad(diff) < 5
print(np.nanmedian(diff), spatialstats.nmad(diff))
# Identity transformation
matrix_identity = np.diag(np.ones(4, float))

# Vertical shift
matrix_vertical = matrix_identity.copy()
matrix_vertical[2, 3] = 1

# Vertical and horizontal shifts
matrix_translations = matrix_identity.copy()
matrix_translations[:3, 3] = [0.5, 1, 1.5]

# Single rotation
rotation = np.deg2rad(5)
matrix_rotations = matrix_identity.copy()
matrix_rotations[1, 1] = np.cos(rotation)
matrix_rotations[2, 2] = np.cos(rotation)
matrix_rotations[2, 1] = -np.sin(rotation)
matrix_rotations[1, 2] = np.sin(rotation)
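# (The four entries above embed a single rotation about the X axis into the 4x4 matrix,
#  leaving the X row/column and the translation part untouched.)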

# Mix of translations and rotations in all axes (X, Y, Z) simultaneously
rotation_x = 5
rotation_y = 10
rotation_z = 3
e = np.deg2rad(np.array([rotation_x, rotation_y, rotation_z]))
# This is a 3x3 rotation matrix
rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True)
matrix_all = matrix_rotations.copy()
matrix_all[0:3, 0:3] = rot_matrix
matrix_all[:3, 3] = [0.5, 1, 1.5]
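# (As a hedged aside: the same 4x4 could presumably be assembled in one call with
#  pytransform3d.transformations.transform_from(R=rot_matrix, p=[0.5, 1, 1.5]),
#  assuming that helper is available in the installed pytransform3d version.)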

list_matrices = [matrix_identity, matrix_vertical, matrix_translations, matrix_rotations, matrix_all]

@pytest.mark.parametrize("matrix", list_matrices) # type: ignore
def test_apply_matrix__points_opencv(self, matrix: NDArrayf) -> None:
"""
Test that apply matrix's exact transformation for points (implemented with NumPy matrix multiplication)
is exactly the same as the one of OpenCV (optional dependency).
"""

# Create random points
points = np.random.default_rng(42).normal(size=(10, 3))

# Convert to a geodataframe and use apply_matrix for the point cloud
epc = gpd.GeoDataFrame(data={"z": points[:, 2]}, geometry=gpd.points_from_xy(x=points[:, 0], y=points[:, 1]))
trans_epc = apply_matrix(epc, matrix=matrix)

# Run the same operation with OpenCV
import cv2

trans_cv2_arr = cv2.perspectiveTransform(points[:, :].reshape(1, -1, 3), matrix)[0, :, :]

# Transform point cloud back to array
trans_numpy = np.array([trans_epc.geometry.x.values, trans_epc.geometry.y.values, trans_epc["z"].values]).T
assert np.allclose(trans_numpy, trans_cv2_arr)
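For reference, the exact point transformation checked here can be sketched in plain NumPy; the helper below is a hypothetical illustration of the expected behavior, not the apply_matrix internals:

def transform_points_sketch(points: NDArrayf, matrix: NDArrayf) -> NDArrayf:
    """Apply a 4x4 transformation matrix to an (N, 3) point array (illustrative sketch only)."""
    points_h = np.concatenate([points, np.ones((len(points), 1))], axis=1)  # To homogeneous coordinates
    out = (matrix @ points_h.T).T  # One matrix multiplication for all points
    return out[:, :3] / out[:, 3:4]  # Normalize back to 3D; a no-op for affine matrices (last row 0, 0, 0, 1)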

@pytest.mark.parametrize("regrid_method", [None, "iterative", "griddata"]) # type: ignore
@pytest.mark.parametrize("matrix", list_matrices) # type: ignore
def test_apply_matrix__raster(self, regrid_method: None | str, matrix: NDArrayf) -> None:
"""Test that apply matrix gives consistent results between points and rasters (thus validating raster
implementation, as point implementation is validated above), for all possible regridding methods."""

# Create a synthetic raster and convert to point cloud
dem_arr = np.linspace(0, 2, 25).reshape(5, 5)
transform = rio.transform.from_origin(0, 5, 1, 1)
dem = gu.Raster.from_array(dem_arr, transform=transform, crs=4326, nodata=100)
epc = dem.to_pointcloud(data_column_name="z").ds

# Use the center of the DEM (at Z=0) as the centroid, mirroring the default when none is given
centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0)

# Apply affine transformation to both datasets
trans_dem = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method=regrid_method)
trans_epc = apply_matrix(epc, matrix=matrix, centroid=centroid)

# Interpolate transformed DEM at coordinates of the transformed point cloud
# Because the raster was created with a constant slope (plane-like), the interpolated values should be very close
z_points = trans_dem.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values))
valids = np.isfinite(z_points)
assert np.count_nonzero(valids) > 0
assert np.allclose(z_points[valids], trans_epc.z.values[valids], rtol=10e-5)
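The centroid argument can be read as re-centering the transformation; a minimal sketch of the assumed convention (translate the centroid to the origin, apply the matrix, translate back), for illustration only:

def recenter_matrix_sketch(matrix: NDArrayf, centroid: tuple[float, float, float]) -> NDArrayf:
    """Matrix equivalent to applying `matrix` about `centroid` instead of the origin (sketch)."""
    t_forward = np.eye(4)
    t_forward[:3, 3] = centroid  # Translate back to the original position
    t_backward = np.eye(4)
    t_backward[:3, 3] = -np.asarray(centroid)  # Translate the centroid to the origin
    return t_forward @ matrix @ t_backward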

def test_apply_matrix__raster_nodata(self) -> None:
"""Test the nodatas created by apply_matrix are consistent between methods"""

# Use matrix with all transformations
matrix = self.matrix_all

# Create a synthetic raster, add NaNs, and convert to point cloud
dem_arr = np.linspace(0, 2, 400).reshape(20, 20)
dem_arr[10:14, 10:14] = np.nan
dem_arr[5, 5] = np.nan
dem_arr[:2, :] = np.nan
transform = rio.transform.from_origin(0, 5, 1, 1)
dem = gu.Raster.from_array(dem_arr, transform=transform, crs=4326, nodata=100)
epc = dem.to_pointcloud(data_column_name="z").ds

centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0)

trans_dem_it = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="iterative")
trans_dem_gd = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="griddata")

# Get nodata mask
mask_nodata_it = trans_dem_it.data.mask
mask_nodata_gd = trans_dem_gd.data.mask

# The iterative mask should be larger and contain the other (as griddata interpolates up to 1 pixel away)
assert np.array_equal(np.logical_or(mask_nodata_gd, mask_nodata_it), mask_nodata_it)

# Verify nodata masks are located within two pixels of each other (1 pixel can be added by griddata,
# and 1 removed by regular-grid interpolation by the iterative method)
smallest_mask = ~binary_dilation(
~mask_nodata_it, iterations=2
) # Invert before dilate to avoid spreading at the edges
# All values of the smallest mask should also be in the griddata mask
assert np.array_equal(np.logical_or(smallest_mask, mask_nodata_gd), mask_nodata_gd)
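The containment check above follows a general pattern that can be illustrated on a toy mask (a self-contained sketch, independent of the test data):

toy_mask = np.zeros((9, 9), dtype=bool)
toy_mask[3:6, 3:6] = True  # A 3x3 nodata patch
grown_mask = binary_dilation(toy_mask, iterations=1)  # Grow the patch by one pixel
# One mask contains another when their union equals the larger mask:
assert np.array_equal(np.logical_or(toy_mask, grown_mask), grown_mask)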

def test_apply_matrix__raster_realdata(self) -> None:
"""Testing real data no complex matrix only to avoid all loops"""

# Use real data
dem = self.ref
dem.crop((dem.bounds.left, dem.bounds.bottom, dem.bounds.left + 2000, dem.bounds.bottom + 2000))
epc = dem.to_pointcloud(data_column_name="z").ds

# Only test the complex matrix, for speed
matrix = self.matrix_all

# Use the center of the DEM (at Z=0) as the centroid, mirroring the default when none is given
centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0)

# Apply affine transformation to both datasets
trans_dem_it = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="iterative")
trans_dem_gd = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="griddata")
trans_epc = apply_matrix(epc, matrix=matrix, centroid=centroid)

# Interpolate transformed DEM at coordinates of the transformed point cloud, and check values are very close
z_points_it = trans_dem_it.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values))
z_points_gd = trans_dem_gd.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values))

valids = np.logical_and(np.isfinite(z_points_it), np.isfinite(z_points_gd))
assert np.count_nonzero(valids) > 0

diff_it = z_points_it[valids] - trans_epc.z.values[valids]
diff_gd = z_points_gd[valids] - trans_epc.z.values[valids]

# Because of outliers, noise and slopes near 90°, several solutions can exist
# Additionally, contrary to the check in the __raster test above, which uses a constant-slope DEM, the slopes
# vary here, so the interpolation check is less accurate and values can differ a bit more
assert np.percentile(np.abs(diff_it), 90) < 1
assert np.percentile(np.abs(diff_it), 50) < 0.2
assert np.percentile(np.abs(diff_gd), 90) < 1
assert np.percentile(np.abs(diff_gd), 50) < 0.2

# But, between themselves, the two re-gridding methods should yield much closer results
# (no errors due to 2D interpolation for checking)
diff_it_gd = z_points_gd[valids] - z_points_it[valids]
assert np.percentile(np.abs(diff_it_gd), 99) < 1 # 99% of values are within a meter (instead of 90%)
assert np.percentile(np.abs(diff_it_gd), 50) < 0.02 # 10 times more precise than above
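As an aside, the percentile-based tolerances above act as robust error bounds; a small self-contained simulation (illustrative only) of why they are preferred here over a maximum-based check:

rng = np.random.default_rng(0)
sim_diff = rng.normal(scale=0.1, size=1000)
sim_diff[:5] = 10.0  # A handful of large outliers
assert np.percentile(np.abs(sim_diff), 90) < 1  # The robust bound still holds...
assert np.max(np.abs(sim_diff)) == 10.0  # ...while a maximum-based bound would be dominated by outliers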


def test_warp_dem() -> None:
(Diff of the fourth changed file not loaded in this view.)
