From 53c34f3e741a32c1a6c5a4c0a04c2da1c242319e Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Thu, 23 May 2024 17:58:47 -0800 Subject: [PATCH] Incremental commit on affine reorg --- xdem/coreg/affine.py | 121 ++++++++++++++---- xdem/coreg/base.py | 204 ++++++++++++++++++++++++++++++ xdem/coreg/biascorr.py | 280 ++++++----------------------------------- 3 files changed, 340 insertions(+), 265 deletions(-) diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index f75aeeac..1d310082 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -3,7 +3,7 @@ from __future__ import annotations import warnings -from typing import Any, Callable, TypeVar +from typing import Any, Callable, TypeVar, Iterable import xdem.coreg.base @@ -34,6 +34,7 @@ deramping, ) from xdem.spatialstats import nmad +from xdem.fit import polynomial_2d try: import pytransform3d.transformations @@ -73,6 +74,39 @@ def apply_xy_shift(transform: rio.transform.Affine, dx: float, dy: float) -> rio # Functions for affine coregistrations ###################################### +def _check_inputs_bin_before_fit(bin_before_fit, fit_optimizer, bin_sizes, bin_statistic): + """ + Check input types of fit or bin_and_fit affine functions. + + :param bin_before_fit: Whether to bin data before fitting the coregistration function. + :param fit_optimizer: Optimizer to minimize the coregistration function. + :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). + :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. + """ + + if not callable(fit_optimizer): + raise TypeError( + "Argument `fit_optimizer` must be a function (callable), " "got {}.".format(type(fit_optimizer)) + ) + + if bin_before_fit: + + # Check input types for "bin" to raise user-friendly errors + if not ( + isinstance(bin_sizes, int) + or (isinstance(bin_sizes, dict) and all(isinstance(val, (int, Iterable)) for val in bin_sizes.values())) + ): + raise TypeError( + "Argument `bin_sizes` must be an integer, or a dictionary of integers or iterables, " + "got {}.".format(type(bin_sizes)) + ) + + if not callable(bin_statistic): + raise TypeError( + "Argument `bin_statistic` must be a function (callable), " "got {}.".format(type(bin_statistic)) + ) + + def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]: """ @@ -618,15 +652,42 @@ class Tilt(AffineCoreg): the key "fit_func". """ - def __init__(self, subsample: int | float = 5e5) -> None: + def __init__( + self, + bin_before_fit: bool = False, + fit_optimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.curve_fit, + bin_sizes: int | dict[str, int | Iterable[float]] = 10, + bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, + subsample: int | float = 5e5 + ) -> None: """ Instantiate a tilt correction object. + :param bin_before_fit: Whether to bin data before fitting the coregistration function. + :param fit_optimizer: Optimizer to minimize the coregistration function. + :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). + :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. """ - self.poly_order = 1 - super().__init__(subsample=subsample) + # Define parameters exactly as in BiasCorr, but with only "fit" or "bin_and_fit" as option, so a bin_before_fit + # boolean, no bin apply option, and fit_func is preferefind + if not bin_before_fit: + meta_fit = {"fit_func": polynomial_2d, "fit_optimizer": fit_optimizer} + self._fit_or_bin = "fit" + super().__init__(subsample=subsample, meta=meta_fit) + else: + meta_bin_and_fit = { + "fit_func": polynomial_2d, + "fit_optimizer": fit_optimizer, + "bin_sizes": bin_sizes, + "bin_statistic": bin_statistic + } + self._fit_or_bin = "bin_and_fit" + super().__init__(subsample=subsample, meta=meta_bin_and_fit) + + self._meta["poly_order"] = 1 + def _fit_rst_rst( self, @@ -641,16 +702,23 @@ def _fit_rst_rst( verbose: bool = False, **kwargs: Any, ) -> None: - """Fit the dDEM between the DEMs to a least squares polynomial equation.""" - ddem = ref_elev - tba_elev - ddem[~inlier_mask] = np.nan - x_coords, y_coords = _get_x_and_y_coords(ref_elev.shape, transform) - fit_ramp, coefs = deramping( - ddem, x_coords, y_coords, degree=self.poly_order, subsample=self._meta["subsample"], verbose=verbose - ) + """Fit the tilt function to an elevation dataset.""" + + # The number of parameters in the first guess defines the polynomial order when calling np.polyval2d + p0 = np.ones(shape=((self._meta["poly_order"] + 1) ** 2)) - self._meta["fit_params"] = coefs[0] - self._meta["fit_func"] = fit_ramp + # Coordinates (we don't need the actual ones, just array coordinates) + xx, yy = _get_x_and_y_coords(ref_elev.shape, transform) + + self._bin_or_and_fit_nd( + values=ref_elev - tba_elev, + inlier_mask=inlier_mask, + bias_vars={"xx": xx, "yy": yy}, + weights=weights, + verbose=verbose, + p0=p0, + **kwargs, + ) def _apply_rst( self, @@ -661,11 +729,13 @@ def _apply_rst( **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the deramp function to a DEM.""" - x_coords, y_coords = _get_x_and_y_coords(elev.shape, transform) - ramp = self._meta["fit_func"](x_coords, y_coords) + # Define the coordinates for applying the correction + xx, yy = _get_x_and_y_coords(elev.shape, transform) + + tilt = self._meta["fit_func"](xx, yy, *self._meta["fit_params"]) - return elev + ramp, transform + return elev + tilt, transform def _apply_pts( self, @@ -675,24 +745,28 @@ def _apply_pts( **kwargs: Any, ) -> gpd.GeoDataFrame: """Apply the deramp function to a set of points.""" + dem_copy = elev.copy() - dem_copy[z_name].values += self._meta["fit_func"](dem_copy.geometry.x.values, dem_copy.geometry.y.values) + + xx = dem_copy.geometry.x.values + yy = dem_copy.geometry.y.values + + dem_copy[z_name].values += self._meta["fit_func"](xx, yy, *self._meta["fit_params"]) return dem_copy def _to_matrix_func(self) -> NDArrayf: """Return a transform matrix if possible.""" - if self.degree > 1: + if self.meta["poly_order"] > 1: raise ValueError( "Nonlinear deramping degrees cannot be represented as transformation matrices." - f" (max 1, given: {self.poly_order})" + f" (max 1, given: {self.meta['poly_order']})" ) - if self.degree == 1: + if self.meta["poly_order"] == 1: raise NotImplementedError("Vertical shift, rotation and horizontal scaling has to be implemented.") # If degree==0, it's just a bias correction empty_matrix = np.diag(np.ones(4, dtype=float)) - empty_matrix[2, 3] += self._meta["fit_params"][0] return empty_matrix @@ -747,8 +821,9 @@ def _fit_rst_rst( # Check that DEM CRS is projected, otherwise slope is not correctly calculated if not crs.is_projected: raise NotImplementedError( - f"DEMs CRS is {crs}. NuthKaab coregistration only works with \ -projected CRS. First, reproject your DEMs in a local projected CRS, e.g. UTM, and re-run." + f"NuthKaab coregistration only works with in a projected CRS, current CRS is {crs}. Reproject " + f"your DEMs with DEM.reproject() in a local projected CRS such as UTM, that you can find" + f"using DEM.get_metric_crs()." ) # Calculate slope and aspect maps from the reference DEM diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index c1169914..d4dbc7c0 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -51,6 +51,13 @@ from xdem._typing import MArrayf, NDArrayb, NDArrayf from xdem.spatialstats import nmad +from xdem.fit import ( + polynomial_1d, + robust_nfreq_sumsin_fit, + robust_norder_polynomial_fit, + sumsin_1d, +) +from xdem.spatialstats import nd_binning try: import pytransform3d.transformations @@ -61,6 +68,11 @@ _HAS_P3D = False +fit_workflows = { + "norder_polynomial": {"func": polynomial_1d, "optimizer": robust_norder_polynomial_fit}, + "nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit}, +} + ########################################### # Generic functions for preprocessing ########################################### @@ -1060,6 +1072,7 @@ class Coreg: _is_affine: bool | None = None _needs_vars: bool = False _meta: CoregDict + _fit_or_bin: Literal["fit", "bin", "bin_and_fit"] | None = None def __init__(self, meta: CoregDict | None = None) -> None: """Instantiate a generic processing step method.""" @@ -1763,6 +1776,197 @@ def _apply_func(self, **kwargs: Any) -> tuple[NDArrayf | gpd.GeoDataFrame, affin return applied_elev, out_transform + def _bin_or_and_fit_nd( # type: ignore + self, + values: NDArrayf, + inlier_mask: NDArrayb, + bias_vars: None | dict[str, NDArrayf] = None, + weights: None | NDArrayf = None, + verbose: bool = False, + **kwargs, + ) -> None: + """ + Generic binning and/or fitting method to model values along N variables for a coregistration/correction, + used for all affine and bias-correction subclasses. Expects either 2D arrays for rasters, or 1D arrays for + points. + + Should only be called through subclassing. + """ + + if self._fit_or_bin is None: + raise ValueError("This function should not be called for methods not supporting fit_or_bin logic.") + + # This is called by subclasses, so the bias_var should always be defined + if bias_vars is None: + raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.") + + # Check number of variables + nd = self._meta["nd"] + if nd is not None and len(bias_vars) != nd: + raise ValueError( + "A number of {} variable(s) has to be provided through the argument 'bias_vars', " + "got {}.".format(nd, len(bias_vars)) + ) + + # If bias var names were explicitly passed at instantiation, check that they match the one from the dict + if self._meta["bias_var_names"] is not None: + if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]): + raise ValueError( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: {}.".format(self._meta["bias_var_names"]) + ) + # Otherwise, store bias variable names from the dictionary + else: + self._meta["bias_var_names"] = list(bias_vars.keys()) + + # Compute difference and mask of valid data + # TODO: Move the check up to Coreg.fit()? + + valid_mask = np.logical_and.reduce( + (inlier_mask, np.isfinite(values), *(np.isfinite(var) for var in bias_vars.values())) + ) + + # Raise errors if all values are NaN after introducing masks from the variables + # (Others are already checked in Coreg.fit()) + if np.all(~valid_mask): + raise ValueError("Some 'bias_vars' have only NaNs in the inlier mask.") + + subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask, verbose=verbose) + + # Get number of variables + nd = len(bias_vars) + + # Remove random state for keyword argument if its value is not in the optimizer function + if self._fit_or_bin in ["fit", "bin_and_fit"]: + fit_func_args = inspect.getfullargspec(self._meta["fit_optimizer"]).args + if "random_state" not in fit_func_args and "random_state" in kwargs: + kwargs.pop("random_state") + + # We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes + if self._fit_or_bin in ["bin", "bin_and_fit"]: + if isinstance(self._meta["bin_sizes"], dict): + var_order = list(bias_vars.keys()) + # Declare type to write integer or tuple to the variable + bin_sizes: int | tuple[int, ...] | tuple[NDArrayf, ...] = tuple( + np.array(self._meta["bin_sizes"][var]) for var in var_order + ) + # Otherwise, write integer directly + else: + bin_sizes = self._meta["bin_sizes"] + + # Option 1: Run fit and save optimized function parameters + if self._fit_or_bin == "fit": + + # Print if verbose + if verbose: + print( + "Estimating alignment along variables {} by fitting " + "with function {}.".format(", ".join(list(bias_vars.keys())), self._meta["fit_func"].__name__) + ) + + results = self._meta["fit_optimizer"]( + f=self._meta["fit_func"], + xdata=np.array([var[subsample_mask].flatten() for var in bias_vars.values()]).squeeze(), + ydata=values[subsample_mask].flatten(), + sigma=weights[subsample_mask].flatten() if weights is not None else None, + absolute_sigma=True, + **kwargs, + ) + + # Option 2: Run binning and save dataframe of result + elif self._fit_or_bin == "bin": + + if verbose: + print( + "Estimating alignment along variables {} by binning " + "with statistic {}.".format(", ".join(list(bias_vars.keys())), self._meta["bin_statistic"].__name__) + ) + + df = nd_binning( + values=values[subsample_mask], + list_var=[var[subsample_mask] for var in bias_vars.values()], + list_var_names=list(bias_vars.keys()), + list_var_bins=bin_sizes, + statistics=(self._meta["bin_statistic"], "count"), + ) + + # Option 3: Run binning, then fitting, and save both results + else: + + # Print if verbose + if verbose: + print( + "Estimating alignment along variables {} by binning with statistic {} and then fitting " + "with function {}.".format( + ", ".join(list(bias_vars.keys())), + self._meta["bin_statistic"].__name__, + self._meta["fit_func"].__name__, + ) + ) + + df = nd_binning( + values=values[subsample_mask], + list_var=[var[subsample_mask] for var in bias_vars.values()], + list_var_names=list(bias_vars.keys()), + list_var_bins=bin_sizes, + statistics=(self._meta["bin_statistic"], "count"), + ) + + # Now, we need to pass this new data to the fitting function and optimizer + # We use only the N-D binning estimates (maximum dimension, equal to length of variable list) + df_nd = df[df.nd == len(bias_vars)] + + # We get the middle of bin values for variable, and statistic for the diff + new_vars = [pd.IntervalIndex(df_nd[var_name]).mid.values for var_name in bias_vars.keys()] + new_diff = df_nd[self._meta["bin_statistic"].__name__].values + # TODO: pass a new sigma based on "count" and original sigma (and correlation?)? + # sigma values would have to be binned above also + + # Valid values for the binning output + ind_valid = np.logical_and.reduce((np.isfinite(new_diff), *(np.isfinite(var) for var in new_vars))) + + if np.all(~ind_valid): + raise ValueError("Only NaN values after binning, did you pass the right bin edges?") + + results = self._meta["fit_optimizer"]( + f=self._meta["fit_func"], + xdata=np.array([var[ind_valid].flatten() for var in new_vars]).squeeze(), + ydata=new_diff[ind_valid].flatten(), + sigma=weights[ind_valid].flatten() if weights is not None else None, + absolute_sigma=True, + **kwargs, + ) + + if verbose: + print(f"{nd}D bias estimated.") + + # Save results if fitting was performed + if self._fit_or_bin in ["fit", "bin_and_fit"]: + + # Write the results to metadata in different ways depending on optimizer returns + if self._meta["fit_optimizer"] in (w["optimizer"] for w in fit_workflows.values()): + params = results[0] + order_or_freq = results[1] + if self._meta["fit_optimizer"] == robust_norder_polynomial_fit: + self._meta["poly_order"] = order_or_freq + else: + self._meta["nb_sin_freq"] = order_or_freq + + elif self._meta["fit_optimizer"] == scipy.optimize.curve_fit: + params = results[0] + # Calculation to get the error on parameters (see description of scipy.optimize.curve_fit) + perr = np.sqrt(np.diag(results[1])) + self._meta["fit_perr"] = perr + + else: + params = results[0] + + self._meta["fit_params"] = params + + # Save results of binning if it was perfrmed + elif self._fit_or_bin in ["bin", "bin_and_fit"]: + self._meta["bin_dataframe"] = df + def _fit_rst_rst( self, ref_elev: NDArrayf, diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py index 6a4eb111..07314afb 100644 --- a/xdem/coreg/biascorr.py +++ b/xdem/coreg/biascorr.py @@ -1,31 +1,18 @@ """Bias corrections (i.e., non-affine coregistration) classes.""" from __future__ import annotations -import inspect from typing import Any, Callable, Iterable, Literal, TypeVar import geopandas as gpd import geoutils as gu import numpy as np -import pandas as pd import rasterio as rio import scipy import xdem.spatialstats from xdem._typing import NDArrayb, NDArrayf -from xdem.coreg.base import Coreg -from xdem.fit import ( - polynomial_1d, - polynomial_2d, - robust_nfreq_sumsin_fit, - robust_norder_polynomial_fit, - sumsin_1d, -) - -fit_workflows = { - "norder_polynomial": {"func": polynomial_1d, "optimizer": robust_norder_polynomial_fit}, - "nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit}, -} +from xdem.coreg.base import Coreg, fit_workflows +from xdem.fit import polynomial_2d BiasCorrType = TypeVar("BiasCorrType", bound="BiasCorr") @@ -52,7 +39,19 @@ def __init__( subsample: float | int = 1.0, ): """ - Instantiate a bias correction object. + Instantiate an N-dimensional bias correction using binning, fitting or both sequentially. + + All "fit_" arguments apply to "fit" and "bin_and_fit", and "bin_" arguments to "bin" and "bin_and_fit". + + :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or + "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on + the binned statistics. + :param fit_func: Function to fit to the bias with variables later passed in .fit(). + :param fit_optimizer: Optimizer to minimize the function. + :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). + :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. + :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly + between bins, or "per_bin" to apply the statistic for each bin. """ # Raise error if fit_or_bin is not defined if fit_or_bin not in ["fit", "bin", "bin_and_fit"]: @@ -143,196 +142,6 @@ def __init__( self._is_affine = False self._needs_vars = True - def _fit_biascorr( # type: ignore - self, - ref_elev: NDArrayf, - tba_elev: NDArrayf, - inlier_mask: NDArrayb, - transform: rio.transform.Affine, # Never None thanks to Coreg.fit() pre-process - crs: rio.crs.CRS, # Never None thanks to Coreg.fit() pre-process - z_name: str, - bias_vars: None | dict[str, NDArrayf] = None, - weights: None | NDArrayf = None, - verbose: bool = False, - **kwargs, - ) -> None: - """ - Generic fit method for all biascorr subclasses, expects either 2D arrays for rasters or 1D arrays for points. - Should only be called through subclassing. - """ - - # This is called by subclasses, so the bias_var should always be defined - if bias_vars is None: - raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.") - - # Check number of variables - nd = self._meta["nd"] - if nd is not None and len(bias_vars) != nd: - raise ValueError( - "A number of {} variable(s) has to be provided through the argument 'bias_vars', " - "got {}.".format(nd, len(bias_vars)) - ) - - # If bias var names were explicitly passed at instantiation, check that they match the one from the dict - if self._meta["bias_var_names"] is not None: - if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]): - raise ValueError( - "The keys of `bias_vars` do not match the `bias_var_names` defined during " - "instantiation: {}.".format(self._meta["bias_var_names"]) - ) - # Otherwise, store bias variable names from the dictionary - else: - self._meta["bias_var_names"] = list(bias_vars.keys()) - - # Compute difference and mask of valid data - # TODO: Move the check up to Coreg.fit()? - - diff = ref_elev - tba_elev - valid_mask = np.logical_and.reduce( - (inlier_mask, np.isfinite(diff), *(np.isfinite(var) for var in bias_vars.values())) - ) - - # Raise errors if all values are NaN after introducing masks from the variables - # (Others are already checked in Coreg.fit()) - if np.all(~valid_mask): - raise ValueError("Some 'bias_vars' have only NaNs in the inlier mask.") - - subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask, verbose=verbose) - - # Get number of variables - nd = len(bias_vars) - - # Remove random state for keyword argument if its value is not in the optimizer function - if self._fit_or_bin in ["fit", "bin_and_fit"]: - fit_func_args = inspect.getfullargspec(self._meta["fit_optimizer"]).args - if "random_state" not in fit_func_args and "random_state" in kwargs: - kwargs.pop("random_state") - - # We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes - if self._fit_or_bin in ["bin", "bin_and_fit"]: - if isinstance(self._meta["bin_sizes"], dict): - var_order = list(bias_vars.keys()) - # Declare type to write integer or tuple to the variable - bin_sizes: int | tuple[int, ...] | tuple[NDArrayf, ...] = tuple( - np.array(self._meta["bin_sizes"][var]) for var in var_order - ) - # Otherwise, write integer directly - else: - bin_sizes = self._meta["bin_sizes"] - - # Option 1: Run fit and save optimized function parameters - if self._fit_or_bin == "fit": - - # Print if verbose - if verbose: - print( - "Estimating bias correction along variables {} by fitting " - "with function {}.".format(", ".join(list(bias_vars.keys())), self._meta["fit_func"].__name__) - ) - - results = self._meta["fit_optimizer"]( - f=self._meta["fit_func"], - xdata=np.array([var[subsample_mask].flatten() for var in bias_vars.values()]).squeeze(), - ydata=diff[subsample_mask].flatten(), - sigma=weights[subsample_mask].flatten() if weights is not None else None, - absolute_sigma=True, - **kwargs, - ) - - # Option 2: Run binning and save dataframe of result - elif self._fit_or_bin == "bin": - - if verbose: - print( - "Estimating bias correction along variables {} by binning " - "with statistic {}.".format(", ".join(list(bias_vars.keys())), self._meta["bin_statistic"].__name__) - ) - - df = xdem.spatialstats.nd_binning( - values=diff[subsample_mask], - list_var=[var[subsample_mask] for var in bias_vars.values()], - list_var_names=list(bias_vars.keys()), - list_var_bins=bin_sizes, - statistics=(self._meta["bin_statistic"], "count"), - ) - - # Option 3: Run binning, then fitting, and save both results - else: - - # Print if verbose - if verbose: - print( - "Estimating bias correction along variables {} by binning with statistic {} and then fitting " - "with function {}.".format( - ", ".join(list(bias_vars.keys())), - self._meta["bin_statistic"].__name__, - self._meta["fit_func"].__name__, - ) - ) - - df = xdem.spatialstats.nd_binning( - values=diff[subsample_mask], - list_var=[var[subsample_mask] for var in bias_vars.values()], - list_var_names=list(bias_vars.keys()), - list_var_bins=bin_sizes, - statistics=(self._meta["bin_statistic"], "count"), - ) - - # Now, we need to pass this new data to the fitting function and optimizer - # We use only the N-D binning estimates (maximum dimension, equal to length of variable list) - df_nd = df[df.nd == len(bias_vars)] - - # We get the middle of bin values for variable, and statistic for the diff - new_vars = [pd.IntervalIndex(df_nd[var_name]).mid.values for var_name in bias_vars.keys()] - new_diff = df_nd[self._meta["bin_statistic"].__name__].values - # TODO: pass a new sigma based on "count" and original sigma (and correlation?)? - # sigma values would have to be binned above also - - # Valid values for the binning output - ind_valid = np.logical_and.reduce((np.isfinite(new_diff), *(np.isfinite(var) for var in new_vars))) - - if np.all(~ind_valid): - raise ValueError("Only NaNs values after binning, did you pass the right bin edges?") - - results = self._meta["fit_optimizer"]( - f=self._meta["fit_func"], - xdata=np.array([var[ind_valid].flatten() for var in new_vars]).squeeze(), - ydata=new_diff[ind_valid].flatten(), - sigma=weights[ind_valid].flatten() if weights is not None else None, - absolute_sigma=True, - **kwargs, - ) - - if verbose: - print(f"{nd}D bias estimated.") - - # Save results if fitting was performed - if self._fit_or_bin in ["fit", "bin_and_fit"]: - - # Write the results to metadata in different ways depending on optimizer returns - if self._meta["fit_optimizer"] in (w["optimizer"] for w in fit_workflows.values()): - params = results[0] - order_or_freq = results[1] - if self._meta["fit_optimizer"] == robust_norder_polynomial_fit: - self._meta["poly_order"] = order_or_freq - else: - self._meta["nb_sin_freq"] = order_or_freq - - elif self._meta["fit_optimizer"] == scipy.optimize.curve_fit: - params = results[0] - # Calculation to get the error on parameters (see description of scipy.optimize.curve_fit) - perr = np.sqrt(np.diag(results[1])) - self._meta["fit_perr"] = perr - - else: - params = results[0] - - self._meta["fit_params"] = params - - # Save results of binning if it was perfrmed - elif self._fit_or_bin in ["bin", "bin_and_fit"]: - self._meta["bin_dataframe"] = df - def _fit_rst_rst( self, ref_elev: NDArrayf, @@ -348,13 +157,11 @@ def _fit_rst_rst( ) -> None: """Should only be called through subclassing""" - self._fit_biascorr( - ref_elev=ref_elev, - tba_elev=tba_elev, + diff = ref_elev - tba_elev + + self._bin_or_and_fit_nd( + values=diff, inlier_mask=inlier_mask, - transform=transform, - crs=crs, - z_name=z_name, weights=weights, bias_vars=bias_vars, verbose=verbose, @@ -426,14 +233,12 @@ def _fit_rst_pts( # type: ignore bias_vars_pts = None # Send to raster-raster fit but using 1D arrays instead of 2D arrays (flattened anyway during analysis) - self._fit_biascorr( - ref_elev=ref_elev_pts, - tba_elev=tba_elev_pts, + diff = ref_elev_pts - tba_elev_pts + + self._bin_or_and_fit_nd( + values=diff, inlier_mask=inlier_pts_alltrue, bias_vars=bias_vars_pts, - transform=transform, - crs=crs, - z_name=z_name, weights=weights, verbose=verbose, **kwargs, @@ -510,8 +315,9 @@ def __init__( :param angle: Angle in which to perform the directional correction (degrees) with 0° corresponding to X axis direction and increasing clockwise. - :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or - "bin" to correct with a statistic of central tendency in defined bins. + :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or + "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on + the binned statistics. :param fit_func: Function to fit to the bias with variables later passed in .fit(). :param fit_optimizer: Optimizer to minimize the function. :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). @@ -554,14 +360,10 @@ def _fit_rst_rst( # type: ignore average_res = (transform[0] + abs(transform[4])) / 2 kwargs.update({"hop_length": average_res}) - self._fit_biascorr( - ref_elev=ref_elev, - tba_elev=tba_elev, + self._bin_or_and_fit_nd( + values=ref_elev - tba_elev, inlier_mask=inlier_mask, bias_vars={"angle": x}, - transform=transform, - crs=crs, - z_name=z_name, weights=weights, verbose=verbose, **kwargs, @@ -658,8 +460,9 @@ def __init__( Instantiate a terrain bias correction. :param terrain_attribute: Terrain attribute to use for correction. - :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or - "bin" to correct with a statistic of central tendency in defined bins. + :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or + "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on + the binned statistics. :param fit_func: Function to fit to the bias with variables later passed in .fit(). :param fit_optimizer: Optimizer to minimize the function. :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). @@ -714,14 +517,10 @@ def _fit_rst_rst( # type: ignore ) # Run the parent function - self._fit_biascorr( - ref_elev=ref_elev, - tba_elev=tba_elev, + self._bin_or_and_fit_nd( + values=ref_elev - tba_elev, inlier_mask=inlier_mask, bias_vars={self._meta["terrain_attribute"]: attr}, - transform=transform, - crs=crs, - z_name=z_name, weights=weights, verbose=verbose, **kwargs, @@ -817,8 +616,9 @@ def __init__( Instantiate a directional bias correction. :param poly_order: Order of the 2D polynomial to fit. - :param fit_or_bin: Whether to fit or bin. Use "fit" to correct by optimizing a function or - "bin" to correct with a statistic of central tendency in defined bins. + :param fit_or_bin: Whether to fit or bin, or both. Use "fit" to correct by optimizing a function or + "bin" to correct with a statistic of central tendency in defined bins, or "bin_and_fit" to perform a fit on + the binned statistics. :param fit_func: Function to fit to the bias with variables later passed in .fit(). :param fit_optimizer: Optimizer to minimize the function. :param bin_sizes: Size (if integer) or edges (if iterable) for binning variables later passed in .fit(). @@ -860,14 +660,10 @@ def _fit_rst_rst( # type: ignore # Coordinates (we don't need the actual ones, just array coordinates) xx, yy = np.meshgrid(np.arange(0, ref_elev.shape[1]), np.arange(0, ref_elev.shape[0])) - self._fit_biascorr( - ref_elev=ref_elev, - tba_elev=tba_elev, + self._bin_or_and_fit_nd( + values=ref_elev - tba_elev, inlier_mask=inlier_mask, bias_vars={"xx": xx, "yy": yy}, - transform=transform, - crs=crs, - z_name=z_name, weights=weights, verbose=verbose, p0=p0,