Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-structure AffineCoreg subclasses to be more modular #530

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 98 additions & 23 deletions xdem/coreg/affine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import warnings
from typing import Any, Callable, TypeVar
from typing import Any, Callable, TypeVar, Iterable

import xdem.coreg.base

Expand Down Expand Up @@ -34,6 +34,7 @@
deramping,
)
from xdem.spatialstats import nmad
from xdem.fit import polynomial_2d

try:
import pytransform3d.transformations
Expand Down Expand Up @@ -73,6 +74,39 @@ def apply_xy_shift(transform: rio.transform.Affine, dx: float, dy: float) -> rio
# Functions for affine coregistrations
######################################

def _check_inputs_bin_before_fit(bin_before_fit, fit_optimizer, bin_sizes, bin_statistic):
"""
Check input types of fit or bin_and_fit affine functions.

:param bin_before_fit: Whether to bin data before fitting the coregistration function.
:param fit_optimizer: Optimizer to minimize the coregistration function.
:param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
:param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning.
"""

if not callable(fit_optimizer):
raise TypeError(
"Argument `fit_optimizer` must be a function (callable), " "got {}.".format(type(fit_optimizer))
)

if bin_before_fit:

# Check input types for "bin" to raise user-friendly errors
if not (
isinstance(bin_sizes, int)
or (isinstance(bin_sizes, dict) and all(isinstance(val, (int, Iterable)) for val in bin_sizes.values()))
):
raise TypeError(
"Argument `bin_sizes` must be an integer, or a dictionary of integers or iterables, "
"got {}.".format(type(bin_sizes))
)

if not callable(bin_statistic):
raise TypeError(
"Argument `bin_statistic` must be a function (callable), " "got {}.".format(type(bin_statistic))
)



def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]:
"""
Expand Down Expand Up @@ -618,15 +652,42 @@ class Tilt(AffineCoreg):
the key "fit_func".
"""

def __init__(self, subsample: int | float = 5e5) -> None:
def __init__(
self,
bin_before_fit: bool = False,
fit_optimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.curve_fit,
bin_sizes: int | dict[str, int | Iterable[float]] = 10,
bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian,
subsample: int | float = 5e5
) -> None:
"""
Instantiate a tilt correction object.

:param bin_before_fit: Whether to bin data before fitting the coregistration function.
:param fit_optimizer: Optimizer to minimize the coregistration function.
:param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
:param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning.
:param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
"""
self.poly_order = 1

super().__init__(subsample=subsample)
# Define parameters exactly as in BiasCorr, but with only "fit" or "bin_and_fit" as option, so a bin_before_fit
# boolean, no bin apply option, and fit_func is preferefind
if not bin_before_fit:
meta_fit = {"fit_func": polynomial_2d, "fit_optimizer": fit_optimizer}
self._fit_or_bin = "fit"
super().__init__(subsample=subsample, meta=meta_fit)
else:
meta_bin_and_fit = {
"fit_func": polynomial_2d,
"fit_optimizer": fit_optimizer,
"bin_sizes": bin_sizes,
"bin_statistic": bin_statistic
}
self._fit_or_bin = "bin_and_fit"
super().__init__(subsample=subsample, meta=meta_bin_and_fit)

self._meta["poly_order"] = 1


def _fit_rst_rst(
self,
Expand All @@ -641,16 +702,23 @@ def _fit_rst_rst(
verbose: bool = False,
**kwargs: Any,
) -> None:
"""Fit the dDEM between the DEMs to a least squares polynomial equation."""
ddem = ref_elev - tba_elev
ddem[~inlier_mask] = np.nan
x_coords, y_coords = _get_x_and_y_coords(ref_elev.shape, transform)
fit_ramp, coefs = deramping(
ddem, x_coords, y_coords, degree=self.poly_order, subsample=self._meta["subsample"], verbose=verbose
)
"""Fit the tilt function to an elevation dataset."""

# The number of parameters in the first guess defines the polynomial order when calling np.polyval2d
p0 = np.ones(shape=((self._meta["poly_order"] + 1) ** 2))

self._meta["fit_params"] = coefs[0]
self._meta["fit_func"] = fit_ramp
# Coordinates (we don't need the actual ones, just array coordinates)
xx, yy = _get_x_and_y_coords(ref_elev.shape, transform)

self._bin_or_and_fit_nd(
values=ref_elev - tba_elev,
inlier_mask=inlier_mask,
bias_vars={"xx": xx, "yy": yy},
weights=weights,
verbose=verbose,
p0=p0,
**kwargs,
)

def _apply_rst(
self,
Expand All @@ -661,11 +729,13 @@ def _apply_rst(
**kwargs: Any,
) -> tuple[NDArrayf, rio.transform.Affine]:
"""Apply the deramp function to a DEM."""
x_coords, y_coords = _get_x_and_y_coords(elev.shape, transform)

ramp = self._meta["fit_func"](x_coords, y_coords)
# Define the coordinates for applying the correction
xx, yy = _get_x_and_y_coords(elev.shape, transform)

tilt = self._meta["fit_func"](xx, yy, *self._meta["fit_params"])

return elev + ramp, transform
return elev + tilt, transform

def _apply_pts(
self,
Expand All @@ -675,24 +745,28 @@ def _apply_pts(
**kwargs: Any,
) -> gpd.GeoDataFrame:
"""Apply the deramp function to a set of points."""

dem_copy = elev.copy()
dem_copy[z_name].values += self._meta["fit_func"](dem_copy.geometry.x.values, dem_copy.geometry.y.values)

xx = dem_copy.geometry.x.values
yy = dem_copy.geometry.y.values

dem_copy[z_name].values += self._meta["fit_func"](xx, yy, *self._meta["fit_params"])

return dem_copy

def _to_matrix_func(self) -> NDArrayf:
"""Return a transform matrix if possible."""
if self.degree > 1:
if self.meta["poly_order"] > 1:
raise ValueError(
"Nonlinear deramping degrees cannot be represented as transformation matrices."
f" (max 1, given: {self.poly_order})"
f" (max 1, given: {self.meta['poly_order']})"
)
if self.degree == 1:
if self.meta["poly_order"] == 1:
raise NotImplementedError("Vertical shift, rotation and horizontal scaling has to be implemented.")

# If degree==0, it's just a bias correction
empty_matrix = np.diag(np.ones(4, dtype=float))

empty_matrix[2, 3] += self._meta["fit_params"][0]

return empty_matrix
Expand Down Expand Up @@ -747,8 +821,9 @@ def _fit_rst_rst(
# Check that DEM CRS is projected, otherwise slope is not correctly calculated
if not crs.is_projected:
raise NotImplementedError(
f"DEMs CRS is {crs}. NuthKaab coregistration only works with \
projected CRS. First, reproject your DEMs in a local projected CRS, e.g. UTM, and re-run."
f"NuthKaab coregistration only works with in a projected CRS, current CRS is {crs}. Reproject "
f"your DEMs with DEM.reproject() in a local projected CRS such as UTM, that you can find"
f"using DEM.get_metric_crs()."
)

# Calculate slope and aspect maps from the reference DEM
Expand Down
Loading
Loading