diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py
index daceec5d..fddac97e 100644
--- a/tests/test_coreg/test_affine.py
+++ b/tests/test_coreg/test_affine.py
@@ -9,10 +9,13 @@
 import rasterio as rio
 from geoutils import Raster, Vector
 from geoutils.raster import RasterType
+from geoutils.raster.raster import _shift_transform
+from geoutils._typing import NDArrayNum
+from scipy.ndimage import binary_dilation

 import xdem
 from xdem import coreg, examples
-from xdem.coreg.affine import AffineCoreg, CoregDict
+from xdem.coreg.affine import _reproject_horizontal_shift_samecrs, AffineCoreg, CoregDict


 def load_examples() -> tuple[RasterType, RasterType, Vector]:
@@ -24,6 +27,54 @@ def load_examples() -> tuple[RasterType, RasterType, Vector]:

     return reference_raster, to_be_aligned_raster, glacier_mask

+
+def gdal_reproject_horizontal_samecrs(filepath_example: str, xoff: float, yoff: float) -> NDArrayNum:
+    """
+    Reproject a horizontal shift in the same CRS with GDAL, for testing purposes.
+
+    :param filepath_example: Path to raster file.
+    :param xoff: X shift in georeferenced unit.
+    :param yoff: Y shift in georeferenced unit.
+
+    :return: Reprojected array for the given horizontal shift, in the same CRS.
+    """
+
+    from osgeo import gdal, gdalconst
+
+    # Open source raster from file
+    src = gdal.Open(filepath_example, gdalconst.GA_ReadOnly)
+
+    # Create output raster in memory
+    driver = "MEM"
+    method = gdal.GRA_Bilinear
+    drv = gdal.GetDriverByName(driver)
+    dest = drv.Create('', src.RasterXSize, src.RasterYSize,
+                      1, gdal.GDT_Float32)
+    proj = src.GetProjection()
+    ndv = src.GetRasterBand(1).GetNoDataValue()
+    dest.SetProjection(proj)
+
+    # Shift the horizontally shifted geotransform
+    gt = src.GetGeoTransform()
+    gtl = list(gt)
+    gtl[0] += xoff
+    gtl[3] += yoff
+    gtl = tuple(gtl)
+    dest.SetGeoTransform(gtl)
+
+    # Copy the raster metadata of the source to dest
+    dest.SetMetadata(src.GetMetadata())
+    dest.GetRasterBand(1).SetNoDataValue(ndv)
+    dest.GetRasterBand(1).Fill(ndv)
+
+    # Reproject with resampling
+    gdal.ReprojectImage(src, dest, proj, proj, method)
+
+    # Extract reprojected array
+    array = dest.GetRasterBand(1).ReadAsArray().astype("float32")
+    array[array == ndv] = np.nan
+
+    return array
+

 class TestAffineCoreg:
@@ -44,6 +95,40 @@ class TestAffineCoreg:
         geometry=gpd.points_from_xy(x=points_arr[:, 0], y=points_arr[:, 1], crs=ref.crs), data={"z": points_arr[:, 2]}
     )

+    @pytest.mark.parametrize("xoff_yoff", [(ref.res[0], ref.res[1]), (10*ref.res[0], 10*ref.res[1]),
+                                           (-1.2*ref.res[0], -1.2*ref.res[1])])
+    def test_reproject_horizontal_shift_samecrs__gdal(self, xoff_yoff: tuple[float, float]) -> None:
+        """Check that the same-CRS reprojection based on SciPy (replacing Rasterio due to subpixel errors)
+        is accurate by comparing to GDAL."""
+
+        # Reproject with SciPy
+        xoff, yoff = xoff_yoff
+        dst_transform = _shift_transform(transform=self.ref.transform, xoff=xoff, yoff=yoff, distance_unit="georeferenced")
+        output = _reproject_horizontal_shift_samecrs(raster_arr=self.ref.data, src_transform=self.ref.transform,
+                                                     dst_transform=dst_transform)
+
+        # Reproject with GDAL
+        output2 = gdal_reproject_horizontal_samecrs(filepath_example=self.ref.filename, xoff=xoff, yoff=yoff)
+
+        # Reprojection and NaN propagation are exactly the same for shifts that are a multiple of pixel resolution
+        if xoff % self.ref.res[0] == 0 and yoff % self.ref.res[1] == 0:
+            assert np.array_equal(output, output2, equal_nan=True)
+
+        # For sub-pixel shifts, NaN propagation differs slightly (within 1 pixel) but the resampled values are the same
+        else:
+            # Verify all close values
+            valids = np.logical_and(np.isfinite(output), np.isfinite(output2))
+            # Max relative tolerance that is reached just for a small % of points
+            assert np.allclose(output[valids], output2[valids], rtol=10e-2)
+            # Median precision is much higher
+            # (here absolute, equivalent to around 10e-7 relative as raster values are in the 1000s)
+            assert np.nanmedian(np.abs(output[valids] - output2[valids])) < 0.0001
+
+            # NaNs differ by 1 pixel max, i.e. the mask dilated by one includes the other
+            mask_nans = ~np.isfinite(output)
+            mask_dilated_plus_one = binary_dilation(mask_nans, iterations=1).astype(bool)
+            assert np.array_equal(np.logical_or(mask_dilated_plus_one, ~np.isfinite(output2)), mask_dilated_plus_one)
+
     def test_from_classmethods(self) -> None:

         # Check that the from_matrix function works as expected.
diff --git a/xdem/coreg/__init__.py b/xdem/coreg/__init__.py
index eac0f29a..a942c740 100644
--- a/xdem/coreg/__init__.py
+++ b/xdem/coreg/__init__.py
@@ -7,7 +7,6 @@
     AffineCoreg,
     GradientDescending,
     NuthKaab,
-    Tilt,
     VerticalShift,
 )
 from xdem.coreg.base import (  # noqa
diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py
index d2c1a5f0..a1b9ade3 100644
--- a/xdem/coreg/affine.py
+++ b/xdem/coreg/affine.py
@@ -3,7 +3,7 @@
 from __future__ import annotations

 import warnings
-from typing import Any, Callable, TypeVar
+from typing import Any, Callable, TypeVar, Iterable, Literal

 import xdem.coreg.base
@@ -21,6 +21,8 @@
 import scipy.ndimage
 import scipy.optimize
 from geoutils.raster import Raster, get_array_and_mask
+from geoutils.raster.interpolate import _interp_points
+from geoutils.raster.georeferencing import _coords
 from tqdm import trange

 from xdem._typing import NDArrayb, NDArrayf
@@ -31,7 +33,6 @@
     _mask_dataframe_by_dem,
     _residuals_df,
     _transform_to_bounds_and_res,
-    deramping,
 )
 from xdem.spatialstats import nmad

@@ -53,26 +54,69 @@
 # Generic functions for affine methods
 ######################################

-
-def apply_xy_shift(transform: rio.transform.Affine, dx: float, dy: float) -> rio.transform.Affine:
+def _reproject_horizontal_shift_samecrs(
+    raster_arr: NDArrayf,
+    src_transform: rio.transform.Affine,
+    dst_transform: rio.transform.Affine | None = None,
+    return_interpolator: bool = False,
+    resampling: Literal["nearest", "linear", "cubic", "quintic", "slinear", "pchip", "splinef2d"] = "linear",
+) -> NDArrayf | Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf]:
     """
-    Apply horizontal shift to a rasterio Affine transform
-    :param transform: The Affine transform of the raster
-    :param dx: dx shift value
-    :param dy: dy shift value
+    Reproject a raster only for a horizontal shift (transform update) in the same CRS.
+
+    This function exists independently of Raster.reproject() because Rasterio has unexplained reprojection issues
+    that can create non-negligible sub-pixel shifts, which must absolutely be avoided for coregistration.
+    See https://github.com/rasterio/rasterio/issues/2052#issuecomment-2078732477.

-    Returns: Updated transform
+    Here we use SciPy interpolation instead, modified for nodata propagation in geoutils.interp_points().
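+
+    Minimal usage sketch (illustrative only: `arr` is a hypothetical 2D array and `t` its affine transform,
+    where adding the pixel width `t.a` to the x-origin `t.c` shifts the grid one pixel eastward):
+
+        dst = rio.transform.Affine(t.a, t.b, t.c + t.a, t.d, t.e, t.f)
+        shifted = _reproject_horizontal_shift_samecrs(arr, src_transform=t, dst_transform=dst)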
""" - transform_shifted = rio.transform.Affine( - transform.a, transform.b, transform.c + dx, transform.d, transform.e, transform.f + dy - ) - return transform_shifted + # We are reprojecting the raster array relative to itself without changing its pixel interpreation, so we can + # force any pixel interpretation (area_or_point) without it having any influence on the result + coords_dst = _coords(transform=dst_transform, area_or_point="Area", shape=raster_arr.shape) + + output = _interp_points(array=raster_arr, area_or_point="Area", transform=src_transform, + points=coords_dst, method=resampling, return_interpolator=return_interpolator) + + return output ###################################### # Functions for affine coregistrations ###################################### +def _check_inputs_bin_before_fit(bin_before_fit, fit_optimizer, bin_sizes, bin_statistic): + """ + Check input types of fit or bin_and_fit affine functions. + + :param bin_before_fit: Whether to bin data before fitting the coregistration function. + :param fit_optimizer: Optimizer to minimize the coregistration function. + :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). + :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. + """ + + if not callable(fit_optimizer): + raise TypeError( + "Argument `fit_optimizer` must be a function (callable), " "got {}.".format(type(fit_optimizer)) + ) + + if bin_before_fit: + + # Check input types for "bin" to raise user-friendly errors + if not ( + isinstance(bin_sizes, int) + or (isinstance(bin_sizes, dict) and all(isinstance(val, (int, Iterable)) for val in bin_sizes.values())) + ): + raise TypeError( + "Argument `bin_sizes` must be an integer, or a dictionary of integers or iterables, " + "got {}.".format(type(bin_sizes)) + ) + + if not callable(bin_statistic): + raise TypeError( + "Argument `bin_statistic` must be a function (callable), " "got {}.".format(type(bin_statistic)) + ) + + def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]: """ @@ -98,6 +142,22 @@ def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArr return slope_tan, aspect +def nuth_kaab_fit_func(xx: NDArrayf, params: tuple[float, float, float]) -> NDArrayf: + """ + Nuth and Kääb (2011) fitting function. + + Describes the elevation differences divided by the slope tangente (y) as a 1D function of the aspect. + + y(x) = a * cos(b - x) + c + + where y = dh/tan(slope) and x = aspect. + + :param xx: The aspect in radians. + :param params: Parameters. 
+
+    :returns: Estimated y-values with the same shape as the given x-values.
+    """
+    return params[0] * np.cos(params[1] - xx) + params[2]


 def get_horizontal_shift(
     elevation_difference: NDArrayf, slope: NDArrayf, aspect: NDArrayf, min_count: int = 20
@@ -369,30 +429,6 @@ def _fit_rst_rst(

         self._meta["shift_z"] = vshift

-    def _apply_rst(
-        self,
-        elev: NDArrayf,
-        transform: rio.transform.Affine,
-        crs: rio.crs.CRS,
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> tuple[NDArrayf, rio.transform.Affine]:
-        """Apply the VerticalShift function to a DEM."""
-        return elev + self._meta["shift_z"], transform
-
-    def _apply_pts(
-        self,
-        elev: gpd.GeoDataFrame,
-        z_name: str = "z",
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> gpd.GeoDataFrame:
-
-        """Apply the VerticalShift function to a set of points."""
-        dem_copy = elev.copy()
-        dem_copy[z_name] += self._meta["shift_z"]
-        return dem_copy
-
     def _to_matrix_func(self) -> NDArrayf:
         """Convert the vertical shift to a transform matrix."""
         empty_matrix = np.diag(np.ones(4, dtype=float))
@@ -607,96 +643,6 @@ def _fit_rst_pts(

         self._meta["shift_z"] = matrix[2, 3]

-class Tilt(AffineCoreg):
-    """
-    Tilt alignment.
-
-    Estimates an 2-D plan correction between the difference of two elevation datasets. This is close to a rotation
-    alignment at small angles, but introduces a scaling at large angles.
-
-    The tilt parameters are stored in the `self.meta` key "fit_parameters", with associated polynomial function in
-    the key "fit_func".
-    """
-
-    def __init__(self, subsample: int | float = 5e5) -> None:
-        """
-        Instantiate a tilt correction object.
-
-        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
-        """
-        self.poly_order = 1
-
-        super().__init__(subsample=subsample)
-
-    def _fit_rst_rst(
-        self,
-        ref_elev: NDArrayf,
-        tba_elev: NDArrayf,
-        inlier_mask: NDArrayb,
-        transform: rio.transform.Affine,
-        crs: rio.crs.CRS,
-        z_name: str,
-        weights: NDArrayf | None = None,
-        bias_vars: dict[str, NDArrayf] | None = None,
-        verbose: bool = False,
-        **kwargs: Any,
-    ) -> None:
-        """Fit the dDEM between the DEMs to a least squares polynomial equation."""
-        ddem = ref_elev - tba_elev
-        ddem[~inlier_mask] = np.nan
-        x_coords, y_coords = _get_x_and_y_coords(ref_elev.shape, transform)
-        fit_ramp, coefs = deramping(
-            ddem, x_coords, y_coords, degree=self.poly_order, subsample=self._meta["subsample"], verbose=verbose
-        )
-
-        self._meta["fit_params"] = coefs[0]
-        self._meta["fit_func"] = fit_ramp
-
-    def _apply_rst(
-        self,
-        elev: NDArrayf,
-        transform: rio.transform.Affine,
-        crs: rio.crs.CRS,
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> tuple[NDArrayf, rio.transform.Affine]:
-        """Apply the deramp function to a DEM."""
-        x_coords, y_coords = _get_x_and_y_coords(elev.shape, transform)
-
-        ramp = self._meta["fit_func"](x_coords, y_coords)
-
-        return elev + ramp, transform
-
-    def _apply_pts(
-        self,
-        elev: gpd.GeoDataFrame,
-        z_name: str = "z",
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> gpd.GeoDataFrame:
-        """Apply the deramp function to a set of points."""
-        dem_copy = elev.copy()
-        dem_copy[z_name].values += self._meta["fit_func"](dem_copy.geometry.x.values, dem_copy.geometry.y.values)
-
-        return dem_copy
-
-    def _to_matrix_func(self) -> NDArrayf:
-        """Return a transform matrix if possible."""
-        if self.degree > 1:
-            raise ValueError(
-                "Nonlinear deramping degrees cannot be represented as transformation matrices."
- f" (max 1, given: {self.poly_order})" - ) - if self.degree == 1: - raise NotImplementedError("Vertical shift, rotation and horizontal scaling has to be implemented.") - - # If degree==0, it's just a bias correction - empty_matrix = np.diag(np.ones(4, dtype=float)) - - empty_matrix[2, 3] += self._meta["fit_params"][0] - - return empty_matrix - class NuthKaab(AffineCoreg): """ @@ -709,15 +655,44 @@ class NuthKaab(AffineCoreg): in the "matrix" transform. """ - def __init__(self, max_iterations: int = 10, offset_threshold: float = 0.05, subsample: int | float = 5e5) -> None: + def __init__( + self, + max_iterations: int = 10, + offset_threshold: float = 0.05, + bin_before_fit: bool = False, + fit_optimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.curve_fit, + bin_sizes: int | dict[str, int | Iterable[float]] = 10, + bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, + subsample: int | float = 5e5) -> None: """ Instantiate a new Nuth and Kääb (2011) coregistration object. :param max_iterations: The maximum allowed iterations before stopping. :param offset_threshold: The residual offset threshold after which to stop the iterations (in pixels). + :param bin_before_fit: Whether to bin data before fitting the coregistration function. For the Nuth and Kääb + (2011) algorithm, this corresponds to aspect bins along dh/tan(slope). + :param fit_optimizer: Optimizer to minimize the coregistration function. + :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). + :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. """ - self._meta: CoregDict + + # Define parameters exactly as in BiasCorr, but with only "fit" or "bin_and_fit" as option, so a bin_before_fit + # boolean, no bin apply option, and fit_func is preferefind + if not bin_before_fit: + meta_fit = {"fit_func": nuth_kaab_fit_func, "fit_optimizer": fit_optimizer} + self._fit_or_bin = "fit" + super().__init__(subsample=subsample, meta=meta_fit) + else: + meta_bin_and_fit = { + "fit_func": nuth_kaab_fit_func, + "fit_optimizer": fit_optimizer, + "bin_sizes": bin_sizes, + "bin_statistic": bin_statistic + } + self._fit_or_bin = "bin_and_fit" + super().__init__(subsample=subsample, meta=meta_bin_and_fit) + self.max_iterations = max_iterations self.offset_threshold = offset_threshold @@ -747,8 +722,9 @@ def _fit_rst_rst( # Check that DEM CRS is projected, otherwise slope is not correctly calculated if not crs.is_projected: raise NotImplementedError( - f"DEMs CRS is {crs}. NuthKaab coregistration only works with \ -projected CRS. First, reproject your DEMs in a local projected CRS, e.g. UTM, and re-run." + f"NuthKaab coregistration only works with in a projected CRS, current CRS is {crs}. Reproject " + f"your DEMs with DEM.reproject() in a local projected CRS such as UTM, that you can find" + f"using DEM.get_metric_crs()." 
             )

         # Calculate slope and aspect maps from the reference DEM
@@ -1011,8 +987,8 @@ def _fit_rst_pts(
             print("   Statistics on coregistered dh:")
             print(f"      Median = {vshift:.3f} - NMAD = {nmad_new:.3f}")

-        self._meta["shift_x"] = offset_east * resolution if ref == "point" else -offset_east
-        self._meta["shift_y"] = offset_north * resolution if ref == "point" else -offset_north
+        self._meta["shift_x"] = offset_east * resolution if ref == "point" else -offset_east * resolution
+        self._meta["shift_y"] = offset_north * resolution if ref == "point" else -offset_north * resolution
         self._meta["shift_z"] = vshift if ref == "point" else -vshift

     def _to_matrix_func(self) -> NDArrayf:
@@ -1025,40 +1001,6 @@

         return matrix

-    def _apply_rst(
-        self,
-        elev: NDArrayf,
-        transform: rio.transform.Affine,
-        crs: rio.crs.CRS,
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> tuple[NDArrayf, rio.transform.Affine]:
-        """Apply the Nuth & Kaab shift to a DEM."""
-
-        updated_transform = apply_xy_shift(transform, -self._meta["shift_x"], -self._meta["shift_y"])
-        vshift = self._meta["shift_z"]
-        return elev + vshift, updated_transform
-
-    def _apply_pts(
-        self,
-        elev: gpd.GeoDataFrame,
-        z_name: str = "z",
-        bias_vars: dict[str, NDArrayf] | None = None,
-        **kwargs: Any,
-    ) -> gpd.GeoDataFrame:
-        """Apply the Nuth & Kaab shift to an elevation point cloud."""
-
-        applied_epc = gpd.GeoDataFrame(
-            geometry=gpd.points_from_xy(
-                x=elev.geometry.x.values + self._meta["shift_x"],
-                y=elev.geometry.y.values + self._meta["shift_y"],
-                crs=elev.crs,
-            ),
-            data={z_name: elev[z_name].values + self._meta["shift_z"]},
-        )
-
-        return applied_epc
-

 class GradientDescending(AffineCoreg):
     """
@@ -1208,8 +1150,8 @@ def func_cost(x: tuple[float, float]) -> np.floating[Any]:
         offset_east = res.x[0]
         offset_north = res.x[1]

-        self._meta["shift_x"] = offset_east * resolution if ref == "point" else -offset_east
-        self._meta["shift_y"] = offset_north * resolution if ref == "point" else -offset_north
+        self._meta["shift_x"] = offset_east * resolution if ref == "point" else -offset_east * resolution
+        self._meta["shift_y"] = offset_north * resolution if ref == "point" else -offset_north * resolution
         self._meta["shift_z"] = vshift if ref == "point" else -vshift

     def _fit_rst_rst(
diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py
index 1e685d8c..b9990d73 100644
--- a/xdem/coreg/base.py
+++ b/xdem/coreg/base.py
@@ -45,6 +45,13 @@
 from xdem._typing import MArrayf, NDArrayb, NDArrayf
 from xdem.spatialstats import nmad
+from xdem.fit import (
+    polynomial_1d,
+    robust_nfreq_sumsin_fit,
+    robust_norder_polynomial_fit,
+    sumsin_1d,
+)
+from xdem.spatialstats import nd_binning

 try:
     import pytransform3d.rotations
@@ -56,6 +63,11 @@

     _HAS_P3D = False

+fit_workflows = {
+    "norder_polynomial": {"func": polynomial_1d, "optimizer": robust_norder_polynomial_fit},
+    "nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit},
+}
+
 ###########################################
 # Generic functions for preprocessing
 ###########################################
@@ -637,101 +649,6 @@ def _postprocess_coreg_apply(

     return applied_elev, out_transform

-
-def deramping(
-    ddem: NDArrayf | MArrayf,
-    x_coords: NDArrayf,
-    y_coords: NDArrayf,
-    degree: int,
-    subsample: float | int = 1.0,
-    verbose: bool = False,
-) -> tuple[Callable[[NDArrayf, NDArrayf], NDArrayf], tuple[NDArrayf, int]]:
-    """
-    Calculate a deramping function to remove spatially correlated elevation differences that can be explained by \
-    a polynomial of degree `degree`.
-
-    :param ddem: The elevation difference array to analyse.
-    :param x_coords: x-coordinates of the above array (must have the same shape as elevation_difference)
-    :param y_coords: y-coordinates of the above array (must have the same shape as elevation_difference)
-    :param degree: The polynomial degree to estimate the ramp.
-    :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
-    :param verbose: Print the least squares optimization progress.
-
-    :returns: A callable function to estimate the ramp and the output of scipy.optimize.leastsq
-    """
-    # Extract only valid pixels
-    valid_mask = np.isfinite(ddem)
-    ddem = ddem[valid_mask]
-    x_coords = x_coords[valid_mask]
-    y_coords = y_coords[valid_mask]
-
-    # Formulate the 2D polynomial whose coefficients will be solved for.
-    def poly2d(x_coords: NDArrayf, y_coords: NDArrayf, coefficients: NDArrayf) -> NDArrayf:
-        """
-        Estimate values from a 2D-polynomial.
-
-        :param x_coords: x-coordinates of the difference array (must have the same shape as
-            elevation_difference).
-        :param y_coords: y-coordinates of the difference array (must have the same shape as
-            elevation_difference).
-        :param coefficients: The coefficients (a, b, c, etc.) of the polynomial.
-        :param degree: The degree of the polynomial.
-
-        :raises ValueError: If the length of the coefficients list is not compatible with the degree.
-
-        :returns: The values estimated by the polynomial.
-        """
-        # Check that the coefficient size is correct.
-        coefficient_size = (degree + 1) * (degree + 2) / 2
-        if len(coefficients) != coefficient_size:
-            raise ValueError()
-
-        # Build the polynomial of degree `degree`
-        estimated_values = np.sum(
-            [
-                coefficients[k * (k + 1) // 2 + j] * x_coords ** (k - j) * y_coords**j
-                for k in range(degree + 1)
-                for j in range(k + 1)
-            ],
-            axis=0,
-        )
-        return estimated_values  # type: ignore
-
-    def residuals(coefs: NDArrayf, x_coords: NDArrayf, y_coords: NDArrayf, targets: NDArrayf) -> NDArrayf:
-        """Return the optimization residuals"""
-        res = targets - poly2d(x_coords, y_coords, coefs)
-        return res[np.isfinite(res)]
-
-    if verbose:
-        print("Estimating deramp function...")
-
-    # reduce number of elements for speed
-    rand_indices = subsample_array(x_coords, subsample=subsample, return_indices=True)
-    x_coords = x_coords[rand_indices]
-    y_coords = y_coords[rand_indices]
-    ddem = ddem[rand_indices]
-
-    # Optimize polynomial parameters
-    coefs = scipy.optimize.leastsq(
-        func=residuals,
-        x0=np.zeros(shape=((degree + 1) * (degree + 2) // 2)),
-        args=(x_coords, y_coords, ddem),
-    )
-
-    def fit_ramp(x: NDArrayf, y: NDArrayf) -> NDArrayf:
-        """
-        Get the elevation difference biases (ramp) at the given coordinates.
-
-        :param x_coordinates: x-coordinates of interest.
-        :param y_coordinates: y-coordinates of interest.
-
-        :returns: The estimated elevation difference bias.
- """ - return poly2d(x, y, coefs[0]) - - return fit_ramp, coefs - - ############################################### # Affine matrix manipulation and transformation ############################################### @@ -1226,6 +1143,7 @@ class Coreg: _is_affine: bool | None = None _needs_vars: bool = False _meta: CoregDict + _fit_or_bin: Literal["fit", "bin", "bin_and_fit"] | None = None def __init__(self, meta: CoregDict | None = None) -> None: """Instantiate a generic processing step method.""" @@ -1928,6 +1846,197 @@ def _apply_func(self, **kwargs: Any) -> tuple[NDArrayf | gpd.GeoDataFrame, affin return applied_elev, out_transform + def _bin_or_and_fit_nd( # type: ignore + self, + values: NDArrayf, + inlier_mask: NDArrayb, + bias_vars: None | dict[str, NDArrayf] = None, + weights: None | NDArrayf = None, + verbose: bool = False, + **kwargs, + ) -> None: + """ + Generic binning and/or fitting method to model values along N variables for a coregistration/correction, + used for all affine and bias-correction subclasses. Expects either 2D arrays for rasters, or 1D arrays for + points. + + Should only be called through subclassing. + """ + + if self._fit_or_bin is None: + raise ValueError("This function should not be called for methods not supporting fit_or_bin logic.") + + # This is called by subclasses, so the bias_var should always be defined + if bias_vars is None: + raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.") + + # Check number of variables + nd = self._meta["nd"] + if nd is not None and len(bias_vars) != nd: + raise ValueError( + "A number of {} variable(s) has to be provided through the argument 'bias_vars', " + "got {}.".format(nd, len(bias_vars)) + ) + + # If bias var names were explicitly passed at instantiation, check that they match the one from the dict + if self._meta["bias_var_names"] is not None: + if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]): + raise ValueError( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: {}.".format(self._meta["bias_var_names"]) + ) + # Otherwise, store bias variable names from the dictionary + else: + self._meta["bias_var_names"] = list(bias_vars.keys()) + + # Compute difference and mask of valid data + # TODO: Move the check up to Coreg.fit()? + + valid_mask = np.logical_and.reduce( + (inlier_mask, np.isfinite(values), *(np.isfinite(var) for var in bias_vars.values())) + ) + + # Raise errors if all values are NaN after introducing masks from the variables + # (Others are already checked in Coreg.fit()) + if np.all(~valid_mask): + raise ValueError("Some 'bias_vars' have only NaNs in the inlier mask.") + + subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask, verbose=verbose) + + # Get number of variables + nd = len(bias_vars) + + # Remove random state for keyword argument if its value is not in the optimizer function + if self._fit_or_bin in ["fit", "bin_and_fit"]: + fit_func_args = inspect.getfullargspec(self._meta["fit_optimizer"]).args + if "random_state" not in fit_func_args and "random_state" in kwargs: + kwargs.pop("random_state") + + # We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes + if self._fit_or_bin in ["bin", "bin_and_fit"]: + if isinstance(self._meta["bin_sizes"], dict): + var_order = list(bias_vars.keys()) + # Declare type to write integer or tuple to the variable + bin_sizes: int | tuple[int, ...] | tuple[NDArrayf, ...] 
+                    np.array(self._meta["bin_sizes"][var]) for var in var_order
+                )
+            # Otherwise, write integer directly
+            else:
+                bin_sizes = self._meta["bin_sizes"]
+
+        # Option 1: Run fit and save optimized function parameters
+        if self._fit_or_bin == "fit":
+
+            # Print if verbose
+            if verbose:
+                print(
+                    "Estimating alignment along variables {} by fitting "
+                    "with function {}.".format(", ".join(list(bias_vars.keys())), self._meta["fit_func"].__name__)
+                )
+
+            results = self._meta["fit_optimizer"](
+                f=self._meta["fit_func"],
+                xdata=np.array([var[subsample_mask].flatten() for var in bias_vars.values()]).squeeze(),
+                ydata=values[subsample_mask].flatten(),
+                sigma=weights[subsample_mask].flatten() if weights is not None else None,
+                absolute_sigma=True,
+                **kwargs,
+            )
+
+        # Option 2: Run binning and save dataframe of result
+        elif self._fit_or_bin == "bin":
+
+            if verbose:
+                print(
+                    "Estimating alignment along variables {} by binning "
+                    "with statistic {}.".format(", ".join(list(bias_vars.keys())), self._meta["bin_statistic"].__name__)
+                )
+
+            df = nd_binning(
+                values=values[subsample_mask],
+                list_var=[var[subsample_mask] for var in bias_vars.values()],
+                list_var_names=list(bias_vars.keys()),
+                list_var_bins=bin_sizes,
+                statistics=(self._meta["bin_statistic"], "count"),
+            )
+
+        # Option 3: Run binning, then fitting, and save both results
+        else:
+
+            # Print if verbose
+            if verbose:
+                print(
+                    "Estimating alignment along variables {} by binning with statistic {} and then fitting "
+                    "with function {}.".format(
+                        ", ".join(list(bias_vars.keys())),
+                        self._meta["bin_statistic"].__name__,
+                        self._meta["fit_func"].__name__,
+                    )
+                )
+
+            df = nd_binning(
+                values=values[subsample_mask],
+                list_var=[var[subsample_mask] for var in bias_vars.values()],
+                list_var_names=list(bias_vars.keys()),
+                list_var_bins=bin_sizes,
+                statistics=(self._meta["bin_statistic"], "count"),
+            )
+
+            # Now, we need to pass this new data to the fitting function and optimizer
+            # We use only the N-D binning estimates (maximum dimension, equal to length of variable list)
+            df_nd = df[df.nd == len(bias_vars)]
+
+            # We get the middle of bin values for variable, and statistic for the diff
+            new_vars = [pd.IntervalIndex(df_nd[var_name]).mid.values for var_name in bias_vars.keys()]
+            new_diff = df_nd[self._meta["bin_statistic"].__name__].values
+            # TODO: pass a new sigma based on "count" and original sigma (and correlation?)?
+            # sigma values would have to be binned above also
+
+            # Valid values for the binning output
+            ind_valid = np.logical_and.reduce((np.isfinite(new_diff), *(np.isfinite(var) for var in new_vars)))
+
+            if np.all(~ind_valid):
+                raise ValueError("Only NaN values after binning, did you pass the right bin edges?")
+
+            results = self._meta["fit_optimizer"](
+                f=self._meta["fit_func"],
+                xdata=np.array([var[ind_valid].flatten() for var in new_vars]).squeeze(),
+                ydata=new_diff[ind_valid].flatten(),
+                sigma=weights[ind_valid].flatten() if weights is not None else None,
+                absolute_sigma=True,
+                **kwargs,
+            )
+
+        if verbose:
+            print(f"{nd}D bias estimated.")
+
+        # Save results if fitting was performed
+        if self._fit_or_bin in ["fit", "bin_and_fit"]:
+
+            # Write the results to metadata in different ways depending on optimizer returns
+            if self._meta["fit_optimizer"] in (w["optimizer"] for w in fit_workflows.values()):
+                params = results[0]
+                order_or_freq = results[1]
+                if self._meta["fit_optimizer"] == robust_norder_polynomial_fit:
+                    self._meta["poly_order"] = order_or_freq
+                else:
+                    self._meta["nb_sin_freq"] = order_or_freq
+
+            elif self._meta["fit_optimizer"] == scipy.optimize.curve_fit:
+                params = results[0]
+                # Calculation to get the error on parameters (see description of scipy.optimize.curve_fit)
+                perr = np.sqrt(np.diag(results[1]))
+                self._meta["fit_perr"] = perr
+
+            else:
+                params = results[0]
+
+            self._meta["fit_params"] = params
+
+        # Save results of binning if it was performed
+        elif self._fit_or_bin in ["bin", "bin_and_fit"]:
+            self._meta["bin_dataframe"] = df
+
     def _fit_rst_rst(
         self,
         ref_elev: NDArrayf,
diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py
index 5a0481be..d6d890f9 100644
--- a/xdem/coreg/biascorr.py
+++ b/xdem/coreg/biascorr.py
@@ -1,31 +1,18 @@
 """Bias corrections (i.e., non-affine coregistration) classes."""
 from __future__ import annotations

-import inspect
 from typing import Any, Callable, Iterable, Literal, TypeVar

 import geopandas as gpd
 import geoutils as gu
 import numpy as np
-import pandas as pd
 import rasterio as rio
 import scipy

 import xdem.spatialstats
 from xdem._typing import NDArrayb, NDArrayf
-from xdem.coreg.base import Coreg
-from xdem.fit import (
-    polynomial_1d,
-    polynomial_2d,
-    robust_nfreq_sumsin_fit,
-    robust_norder_polynomial_fit,
-    sumsin_1d,
-)
-
-fit_workflows = {
-    "norder_polynomial": {"func": polynomial_1d, "optimizer": robust_norder_polynomial_fit},
-    "nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit},
-}
+from xdem.coreg.base import Coreg, fit_workflows
+from xdem.fit import polynomial_2d

 BiasCorrType = TypeVar("BiasCorrType", bound="BiasCorr")
@@ -52,7 +39,19 @@ def __init__(
         subsample: float | int = 1.0,
     ):
         """
-        Instantiate a bias correction object.
+        Instantiate an N-dimensional bias correction using binning, fitting or both sequentially.
+
+        All "fit_" arguments apply to "fit" and "bin_and_fit", and "bin_" arguments to "bin" and "bin_and_fit".
+
+        :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or
+            "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on
+            the binned statistics.
+        :param fit_func: Function to fit to the bias with variables later passed in .fit().
+        :param fit_optimizer: Optimizer to minimize the function.
+        :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
+        :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning.
+        :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly
+            between bins, or "per_bin" to apply the statistic for each bin.
         """
         # Raise error if fit_or_bin is not defined
         if fit_or_bin not in ["fit", "bin", "bin_and_fit"]:
@@ -143,196 +142,6 @@ def __init__(
         self._is_affine = False
         self._needs_vars = True

-    def _fit_biascorr(  # type: ignore
-        self,
-        ref_elev: NDArrayf,
-        tba_elev: NDArrayf,
-        inlier_mask: NDArrayb,
-        transform: rio.transform.Affine,  # Never None thanks to Coreg.fit() pre-process
-        crs: rio.crs.CRS,  # Never None thanks to Coreg.fit() pre-process
-        z_name: str,
-        bias_vars: None | dict[str, NDArrayf] = None,
-        weights: None | NDArrayf = None,
-        verbose: bool = False,
-        **kwargs,
-    ) -> None:
-        """
-        Generic fit method for all biascorr subclasses, expects either 2D arrays for rasters or 1D arrays for points.
-        Should only be called through subclassing.
-        """
-
-        # This is called by subclasses, so the bias_var should always be defined
-        if bias_vars is None:
-            raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.")
-
-        # Check number of variables
-        nd = self._meta["nd"]
-        if nd is not None and len(bias_vars) != nd:
-            raise ValueError(
-                "A number of {} variable(s) has to be provided through the argument 'bias_vars', "
-                "got {}.".format(nd, len(bias_vars))
-            )
-
-        # If bias var names were explicitly passed at instantiation, check that they match the one from the dict
-        if self._meta["bias_var_names"] is not None:
-            if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]):
-                raise ValueError(
-                    "The keys of `bias_vars` do not match the `bias_var_names` defined during "
-                    "instantiation: {}.".format(self._meta["bias_var_names"])
-                )
-        # Otherwise, store bias variable names from the dictionary
-        else:
-            self._meta["bias_var_names"] = list(bias_vars.keys())
-
-        # Compute difference and mask of valid data
-        # TODO: Move the check up to Coreg.fit()?
-
-        diff = ref_elev - tba_elev
-        valid_mask = np.logical_and.reduce(
-            (inlier_mask, np.isfinite(diff), *(np.isfinite(var) for var in bias_vars.values()))
-        )
-
-        # Raise errors if all values are NaN after introducing masks from the variables
-        # (Others are already checked in Coreg.fit())
-        if np.all(~valid_mask):
-            raise ValueError("Some 'bias_vars' have only NaNs in the inlier mask.")
-
-        subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask, verbose=verbose)
-
-        # Get number of variables
-        nd = len(bias_vars)
-
-        # Remove random state for keyword argument if its value is not in the optimizer function
-        if self._fit_or_bin in ["fit", "bin_and_fit"]:
-            fit_func_args = inspect.getfullargspec(self._meta["fit_optimizer"]).args
-            if "random_state" not in fit_func_args and "random_state" in kwargs:
-                kwargs.pop("random_state")
-
-        # We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes
-        if self._fit_or_bin in ["bin", "bin_and_fit"]:
-            if isinstance(self._meta["bin_sizes"], dict):
-                var_order = list(bias_vars.keys())
-                # Declare type to write integer or tuple to the variable
-                bin_sizes: int | tuple[int, ...] | tuple[NDArrayf, ...] = tuple(
-                    np.array(self._meta["bin_sizes"][var]) for var in var_order
-                )
-            # Otherwise, write integer directly
-            else:
-                bin_sizes = self._meta["bin_sizes"]
-
-        # Option 1: Run fit and save optimized function parameters
-        if self._fit_or_bin == "fit":
-
-            # Print if verbose
-            if verbose:
-                print(
-                    "Estimating bias correction along variables {} by fitting "
-                    "with function {}.".format(", ".join(list(bias_vars.keys())), self._meta["fit_func"].__name__)
-                )
-
-            results = self._meta["fit_optimizer"](
-                f=self._meta["fit_func"],
-                xdata=np.array([var[subsample_mask].flatten() for var in bias_vars.values()]).squeeze(),
-                ydata=diff[subsample_mask].flatten(),
-                sigma=weights[subsample_mask].flatten() if weights is not None else None,
-                absolute_sigma=True,
-                **kwargs,
-            )
-
-        # Option 2: Run binning and save dataframe of result
-        elif self._fit_or_bin == "bin":
-
-            if verbose:
-                print(
-                    "Estimating bias correction along variables {} by binning "
-                    "with statistic {}.".format(", ".join(list(bias_vars.keys())), self._meta["bin_statistic"].__name__)
-                )
-
-            df = xdem.spatialstats.nd_binning(
-                values=diff[subsample_mask],
-                list_var=[var[subsample_mask] for var in bias_vars.values()],
-                list_var_names=list(bias_vars.keys()),
-                list_var_bins=bin_sizes,
-                statistics=(self._meta["bin_statistic"], "count"),
-            )
-
-        # Option 3: Run binning, then fitting, and save both results
-        else:
-
-            # Print if verbose
-            if verbose:
-                print(
-                    "Estimating bias correction along variables {} by binning with statistic {} and then fitting "
-                    "with function {}.".format(
-                        ", ".join(list(bias_vars.keys())),
-                        self._meta["bin_statistic"].__name__,
-                        self._meta["fit_func"].__name__,
-                    )
-                )
-
-            df = xdem.spatialstats.nd_binning(
-                values=diff[subsample_mask],
-                list_var=[var[subsample_mask] for var in bias_vars.values()],
-                list_var_names=list(bias_vars.keys()),
-                list_var_bins=bin_sizes,
-                statistics=(self._meta["bin_statistic"], "count"),
-            )
-
-            # Now, we need to pass this new data to the fitting function and optimizer
-            # We use only the N-D binning estimates (maximum dimension, equal to length of variable list)
-            df_nd = df[df.nd == len(bias_vars)]
-
-            # We get the middle of bin values for variable, and statistic for the diff
-            new_vars = [pd.IntervalIndex(df_nd[var_name]).mid.values for var_name in bias_vars.keys()]
-            new_diff = df_nd[self._meta["bin_statistic"].__name__].values
-            # TODO: pass a new sigma based on "count" and original sigma (and correlation?)?
-            # sigma values would have to be binned above also
-
-            # Valid values for the binning output
-            ind_valid = np.logical_and.reduce((np.isfinite(new_diff), *(np.isfinite(var) for var in new_vars)))
-
-            if np.all(~ind_valid):
-                raise ValueError("Only NaNs values after binning, did you pass the right bin edges?")
-
-            results = self._meta["fit_optimizer"](
-                f=self._meta["fit_func"],
-                xdata=np.array([var[ind_valid].flatten() for var in new_vars]).squeeze(),
-                ydata=new_diff[ind_valid].flatten(),
-                sigma=weights[ind_valid].flatten() if weights is not None else None,
-                absolute_sigma=True,
-                **kwargs,
-            )
-
-        if verbose:
-            print(f"{nd}D bias estimated.")
-
-        # Save results if fitting was performed
-        if self._fit_or_bin in ["fit", "bin_and_fit"]:
-
-            # Write the results to metadata in different ways depending on optimizer returns
-            if self._meta["fit_optimizer"] in (w["optimizer"] for w in fit_workflows.values()):
-                params = results[0]
-                order_or_freq = results[1]
-                if self._meta["fit_optimizer"] == robust_norder_polynomial_fit:
-                    self._meta["poly_order"] = order_or_freq
-                else:
-                    self._meta["nb_sin_freq"] = order_or_freq
-
-            elif self._meta["fit_optimizer"] == scipy.optimize.curve_fit:
-                params = results[0]
-                # Calculation to get the error on parameters (see description of scipy.optimize.curve_fit)
-                perr = np.sqrt(np.diag(results[1]))
-                self._meta["fit_perr"] = perr
-
-            else:
-                params = results[0]
-
-            self._meta["fit_params"] = params
-
-        # Save results of binning if it was perfrmed
-        elif self._fit_or_bin in ["bin", "bin_and_fit"]:
-            self._meta["bin_dataframe"] = df
-
     def _fit_rst_rst(
         self,
         ref_elev: NDArrayf,
@@ -348,13 +157,11 @@
     ) -> None:
         """Should only be called through subclassing"""

-        self._fit_biascorr(
-            ref_elev=ref_elev,
-            tba_elev=tba_elev,
+        diff = ref_elev - tba_elev
+
+        self._bin_or_and_fit_nd(
+            values=diff,
             inlier_mask=inlier_mask,
-            transform=transform,
-            crs=crs,
-            z_name=z_name,
             weights=weights,
             bias_vars=bias_vars,
             verbose=verbose,
@@ -426,14 +233,12 @@ def _fit_rst_pts(  # type: ignore
             bias_vars_pts = None

         # Send to raster-raster fit but using 1D arrays instead of 2D arrays (flattened anyway during analysis)
-        self._fit_biascorr(
-            ref_elev=ref_elev_pts,
-            tba_elev=tba_elev_pts,
+        diff = ref_elev_pts - tba_elev_pts
+
+        self._bin_or_and_fit_nd(
+            values=diff,
             inlier_mask=inlier_pts_alltrue,
             bias_vars=bias_vars_pts,
-            transform=transform,
-            crs=crs,
-            z_name=z_name,
             weights=weights,
             verbose=verbose,
             **kwargs,
@@ -510,8 +315,9 @@ def __init__(

         :param angle: Angle in which to perform the directional correction (degrees) with 0° corresponding to X axis
             direction and increasing clockwise.
-        :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or
-            "bin" to correct with a statistic of central tendency in defined bins.
+        :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or
+            "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on
+            the binned statistics.
         :param fit_func: Function to fit to the bias with variables later passed in .fit().
         :param fit_optimizer: Optimizer to minimize the function.
         :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
@@ -554,14 +360,10 @@ def _fit_rst_rst(  # type: ignore
         average_res = (transform[0] + abs(transform[4])) / 2
         kwargs.update({"hop_length": average_res})

-        self._fit_biascorr(
-            ref_elev=ref_elev,
-            tba_elev=tba_elev,
+        self._bin_or_and_fit_nd(
+            values=ref_elev - tba_elev,
             inlier_mask=inlier_mask,
             bias_vars={"angle": x},
-            transform=transform,
-            crs=crs,
-            z_name=z_name,
             weights=weights,
             verbose=verbose,
             **kwargs,
@@ -658,8 +460,9 @@ def __init__(
         Instantiate a terrain bias correction.

         :param terrain_attribute: Terrain attribute to use for correction.
-        :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or
-            "bin" to correct with a statistic of central tendency in defined bins.
+        :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or
+            "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on
+            the binned statistics.
         :param fit_func: Function to fit to the bias with variables later passed in .fit().
         :param fit_optimizer: Optimizer to minimize the function.
         :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
@@ -714,14 +517,10 @@ def _fit_rst_rst(  # type: ignore
         )

         # Run the parent function
-        self._fit_biascorr(
-            ref_elev=ref_elev,
-            tba_elev=tba_elev,
+        self._bin_or_and_fit_nd(
+            values=ref_elev - tba_elev,
             inlier_mask=inlier_mask,
             bias_vars={self._meta["terrain_attribute"]: attr},
-            transform=transform,
-            crs=crs,
-            z_name=z_name,
             weights=weights,
             verbose=verbose,
             **kwargs,
@@ -817,8 +616,9 @@ def __init__(
         Instantiate a directional bias correction.

         :param poly_order: Order of the 2D polynomial to fit.
-        :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or
-            "bin" to correct with a statistic of central tendency in defined bins.
+        :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or
+            "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on
+            the binned statistics.
         :param fit_func: Function to fit to the bias with variables later passed in .fit().
         :param fit_optimizer: Optimizer to minimize the function.
         :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit().
@@ -860,14 +660,10 @@ def _fit_rst_rst(  # type: ignore
         # Coordinates (we don't need the actual ones, just array coordinates)
         xx, yy = np.meshgrid(np.arange(0, ref_elev.shape[1]), np.arange(0, ref_elev.shape[0]))

-        self._fit_biascorr(
-            ref_elev=ref_elev,
-            tba_elev=tba_elev,
+        self._bin_or_and_fit_nd(
+            values=ref_elev - tba_elev,
             inlier_mask=inlier_mask,
             bias_vars={"xx": xx, "yy": yy},
-            transform=transform,
-            crs=crs,
-            z_name=z_name,
             weights=weights,
             verbose=verbose,
             p0=p0,
@@ -924,3 +720,139 @@ def _apply_rst(
         xx, yy = np.meshgrid(np.arange(0, elev.shape[1]), np.arange(0, elev.shape[0]))

         return super()._apply_rst(elev=elev, transform=transform, crs=crs, bias_vars={"xx": xx, "yy": yy}, **kwargs)
+
+
+
+
+#
+#
+# class Tilt(AffineCoreg):
+#     """
+#     Tilt alignment.
+#
+#     Estimates a 2-D plane correction between the difference of two elevation datasets. This is close to a rotation
+#     alignment at small angles, but introduces a scaling at large angles.
+#
+#     The tilt parameters are stored in the `self.meta` key "fit_parameters", with associated polynomial function in
+#     the key "fit_func".
+# """ +# +# def __init__( +# self, +# bin_before_fit: bool = False, +# fit_optimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.curve_fit, +# bin_sizes: int | dict[str, int | Iterable[float]] = 10, +# bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, +# subsample: int | float = 5e5 +# ) -> None: +# """ +# Instantiate a tilt correction object. +# +# :param bin_before_fit: Whether to bin data before fitting the coregistration function. +# :param fit_optimizer: Optimizer to minimize the coregistration function. +# :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). +# :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. +# :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. +# """ +# +# # Define Nuth and Kääb fitting function +# def nuth_kaab_fit_func(xx: NDArrayf, params: tuple[float, float, float]) -> NDArrayf: +# """ +# Fit a cosinus function to the terrain aspect (x) to describe the elevation differences divided by the slope +# tangente (y). +# +# y(x) = a * cos(b - x) + c +# +# where y = dh/tan(slope) and x = aspect. +# +# :param xx: The aspect in radians. +# :param params: Parameters. +# +# :returns: Estimated y-values with the same shape as the given x-values +# """ +# return params[0] * np.cos(params[1] - xx) + params[2] +# +# # Define parameters exactly as in BiasCorr, but with only "fit" or "bin_and_fit" as option, so a bin_before_fit +# # boolean, no bin apply option, and fit_func is preferefind +# if not bin_before_fit: +# meta_fit = {"fit_func": nuth_kaab_fit_func, "fit_optimizer": fit_optimizer} +# self._fit_or_bin = "fit" +# super().__init__(subsample=subsample, meta=meta_fit) +# else: +# meta_bin_and_fit = { +# "fit_func": nuth_kaab_fit_func, +# "fit_optimizer": fit_optimizer, +# "bin_sizes": bin_sizes, +# "bin_statistic": bin_statistic +# } +# self._fit_or_bin = "bin_and_fit" +# super().__init__(subsample=subsample, meta=meta_bin_and_fit) +# +# self._meta["poly_order"] = 1 +# +# +# def _fit_rst_rst( +# self, +# ref_elev: NDArrayf, +# tba_elev: NDArrayf, +# inlier_mask: NDArrayb, +# transform: rio.transform.Affine, +# crs: rio.crs.CRS, +# z_name: str, +# weights: NDArrayf | None = None, +# bias_vars: dict[str, NDArrayf] | None = None, +# verbose: bool = False, +# **kwargs: Any, +# ) -> None: +# """Fit the tilt function to an elevation dataset.""" +# +# # The number of parameters in the first guess defines the polynomial order when calling np.polyval2d +# p0 = np.ones(shape=((self._meta["poly_order"] + 1) ** 2)) +# +# # Coordinates (we don't need the actual ones, just array coordinates) +# xx, yy = _get_x_and_y_coords(ref_elev.shape, transform) +# +# self._bin_or_and_fit_nd( +# values=ref_elev - tba_elev, +# inlier_mask=inlier_mask, +# bias_vars={"xx": xx, "yy": yy}, +# weights=weights, +# verbose=verbose, +# p0=p0, +# **kwargs, +# ) +# +# def _apply_rst( +# self, +# elev: NDArrayf, +# transform: rio.transform.Affine, +# crs: rio.crs.CRS, +# bias_vars: dict[str, NDArrayf] | None = None, +# **kwargs: Any, +# ) -> tuple[NDArrayf, rio.transform.Affine]: +# """Apply the deramp function to a DEM.""" +# +# # Define the coordinates for applying the correction +# xx, yy = _get_x_and_y_coords(elev.shape, transform) +# +# tilt = self._meta["fit_func"](xx, yy, *self._meta["fit_params"]) +# +# return elev + tilt, transform +# +# def _apply_pts( +# self, +# elev: gpd.GeoDataFrame, +# z_name: str = "z", +# bias_vars: 
dict[str, NDArrayf] | None = None, +# **kwargs: Any, +# ) -> gpd.GeoDataFrame: +# """Apply the deramp function to a set of points.""" +# +# dem_copy = elev.copy() +# +# xx = dem_copy.geometry.x.values +# yy = dem_copy.geometry.y.values +# +# dem_copy[z_name].values += self._meta["fit_func"](xx, yy, *self._meta["fit_params"]) +# +# return dem_copy \ No newline at end of file
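
Usage sketch for the new `bin_before_fit` option of NuthKaab (illustrative only: the DEM file paths are
placeholders, and the keyword names follow the `NuthKaab.__init__` signature in the diff above):

    import numpy as np
    import xdem

    ref_dem = xdem.DEM("reference_dem.tif")  # placeholder path
    tba_dem = xdem.DEM("dem_to_align.tif")   # placeholder path

    # Default behaviour: fit the Nuth and Kääb (2011) cosine directly on all sampled points
    nk = xdem.coreg.NuthKaab(bin_before_fit=False)

    # New behaviour: bin dh/tan(slope) along aspect first, then fit on the binned statistic
    nk_binned = xdem.coreg.NuthKaab(bin_before_fit=True, bin_sizes=72, bin_statistic=np.nanmedian)

    nk_binned.fit(ref_dem, tba_dem)
    aligned_dem = nk_binned.apply(tba_dem)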