In [None]:
#| default_exp target_transforms

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

# Target transforms
Transformations that can be applied to the target before fitting and inverted after predicting to restore the original scale.

In [None]:
#| export
import abc
from typing import Tuple

try:
    from numba import njit
except ImportError:
    raise ImportError(
        "Please install numba. "
        "You can find detailed instructions at https://numba.pydata.org/numba-doc/latest/user/installing.html"
    )
import numpy as np

from utilsforecast.grouped_array import GroupedArray

In [None]:
#| export
class BaseTargetTransform(abc.ABC):
    """Abstract interface implemented by every target transformation.

    A concrete transformation supplies a forward step (`fit_transform`) and
    the matching backward step (`inverse_transform`); both receive a
    `GroupedArray` and return a numpy array.
    """

    @abc.abstractmethod
    def fit_transform(self, ga: GroupedArray) -> np.ndarray:
        """Fit the transformation on `ga` and return the transformed values."""
        raise NotImplementedError()

    @abc.abstractmethod
    def inverse_transform(self, ga: GroupedArray) -> np.ndarray:
        """Undo the transformation for the values held by `ga`."""
        raise NotImplementedError()

In [None]:
#| exporti
@njit
def _standard_scaler_transform(
    data: np.ndarray,
    indptr: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """Standardize every group of `data` using that group's own mean and scale.

    Group `i` is the slice `data[indptr[i]:indptr[i + 1]]`.

    Parameters
    ----------
    data : np.ndarray
        Values of all groups stored back to back.
    indptr : np.ndarray
        Group boundaries; it has one more entry than there are groups.

    Returns
    -------
    out : np.ndarray
        Array shaped like `data` holding `(x - mean) / scale` for each group.
    stats : np.ndarray
        Array of shape `(n_groups, 2)`; row `i` holds the mean and the scale
        that were applied to group `i`. The scale is the group's standard
        deviation, or 1 when that standard deviation is exactly zero.
    """
    n_groups = len(indptr) - 1
    stats = np.empty((n_groups, 2))
    out = np.empty_like(data)
    for i in range(n_groups):
        sl = slice(indptr[i], indptr[i + 1])
        mean = np.mean(data[sl])
        std = np.std(data[sl])
        if std == 0.0:
            # A constant group has no spread; dividing by zero would fill it
            # with NaN. Use a unit scale instead so it maps to zeros, and store
            # that scale so the inverse transform stays the exact inverse.
            std = 1.0
        stats[i, :] = mean, std
        out[sl] = (data[sl] - mean) / std
    return out, stats


@njit
def _standard_scaler_inverse_transform(
    data: np.ndarray,
    indptr: np.ndarray,
    stats: np.ndarray,
) -> np.ndarray:
    """Map standardized values back to their original scale, group by group.

    Parameters
    ----------
    data : np.ndarray
        Standardized values of all groups stored back to back.
    indptr : np.ndarray
        Group boundaries; group `i` is `data[indptr[i]:indptr[i + 1]]`.
    stats : np.ndarray
        One row per group, unpacked as `(mean, std)`.

    Returns
    -------
    np.ndarray
        Array shaped like `data` holding `x * std + mean` for each group.
    """
    restored = np.empty_like(data)
    for grp in range(len(indptr) - 1):
        start = indptr[grp]
        end = indptr[grp + 1]
        loc, scale = stats[grp]
        restored[start:end] = data[start:end] * scale + loc
    return restored

In [None]:
#| export
class LocalStandardScaler(BaseTargetTransform):
    """Standardizes each series by subtracting its mean and dividing by its standard deviation."""

    def fit_transform(self, ga: GroupedArray) -> np.ndarray:
        """Compute per-series statistics, keep them in `stats_` and return the scaled values."""
        scaled, stats = _standard_scaler_transform(ga.data, ga.indptr)
        self.stats_ = stats
        return scaled

    def inverse_transform(self, ga: GroupedArray) -> np.ndarray:
        """Rescale the values in `ga` with the statistics stored by `fit_transform`."""
        restored = _standard_scaler_inverse_transform(
            ga.data,
            ga.indptr,
            self.stats_,
        )
        return restored

In [None]:
from utilsforecast.data import generate_series

In [None]:
# Synthetic data shared by the checks below
# (presumably 10 series with 50 to 100 observations each -- confirm against generate_series).
series = generate_series(10, min_length=50, max_length=100)

In [None]:
# Round-trip check: inverse_transform(fit_transform(y)) must recover y.
sc = LocalStandardScaler()
data = series['y'].values
# Number of rows per series. NOTE(review): this lines up with `data` only if
# `series` is sorted by 'unique_id' so each series is contiguous -- confirm.
sizes = series.groupby('unique_id', observed=True).size().values
# Cumulative boundaries: series i occupies data[indptr[i]:indptr[i + 1]].
indptr = np.append(0, sizes.cumsum())
ga = GroupedArray(data, indptr)
transformed = sc.fit_transform(ga)
# Wrap the transformed values with the same boundaries so they can be inverted.
transformed_ga = GroupedArray(transformed, ga.indptr)
np.testing.assert_allclose(
    sc.inverse_transform(transformed_ga),
    data,
)

In [None]:
# NOTE(review): this cell has no `#| export` directive, unlike the cells that
# define the other transforms -- confirm whether that is intentional.
class LocalBoxCox(BaseTargetTransform):
    """Finds optimum lambda for each series and applies Box-Cox transformation.

    After `fit_transform`, `lmbdas_` holds one lambda per series. A series
    whose fitted lambda cannot be inverted reliably is log-transformed
    instead and gets a lambda of 0.
    """
    def fit_transform(self, ga: GroupedArray) -> np.ndarray:
        """Fit one lambda per series and return the transformed values.

        Parameters
        ----------
        ga : GroupedArray
            Series to transform; `scipy.stats.boxcox` is applied to each one.

        Returns
        -------
        np.ndarray
            Transformed values, laid out like `ga.data`.
        """
        from scipy.stats import boxcox

        data = ga.data
        # The transformed values are real-valued: keep the dtype of floating
        # inputs, but never write them into an integer buffer (that would
        # silently truncate them).
        out_dtype = data.dtype if np.issubdtype(data.dtype, np.floating) else np.float64
        out = np.empty_like(data, dtype=out_dtype)
        self.lmbdas_ = np.empty(ga.n_groups)
        for i in range(ga.n_groups):
            sl = slice(ga.indptr[i], ga.indptr[i + 1])
            transformed, lmbda = boxcox(data[sl], lmbda=None)
            if np.isclose(transformed * lmbda, -1).any():
                # in this case we can't reliably invert transformation
                # (the inverse raises `y * lmbda + 1`, here ~0, to the power 1 / lmbda)
                # fallback to log
                # With an explicit lmbda, boxcox returns only the transformed
                # array (no lambda), so the result must not be unpacked.
                transformed = boxcox(data[sl], lmbda=0)
                lmbda = 0.0
            self.lmbdas_[i] = lmbda
            out[sl] = transformed
        return out

    def inverse_transform(self, ga: GroupedArray) -> np.ndarray:
        """Invert the Box-Cox transformation using the lambdas stored in `lmbdas_`.

        Parameters
        ----------
        ga : GroupedArray
            Transformed values; its groups are matched by position with the
            entries of `lmbdas_`.

        Returns
        -------
        np.ndarray
            Values on the original scale, laid out like `ga.data`.
        """
        from scipy.special import inv_boxcox

        sizes = np.diff(ga.indptr)
        # Expand to one lambda per observation so the inverse is vectorized.
        lmbdas = np.repeat(self.lmbdas_, sizes, axis=0)
        return inv_boxcox(ga.data, lmbdas)

In [None]:
# Round-trip check for LocalBoxCox, reusing `ga` and `data` from the
# LocalStandardScaler check above.
bc = LocalBoxCox()
transformed = bc.fit_transform(ga)
# Wrap the transformed values with the same boundaries so they can be inverted.
transformed_ga = GroupedArray(transformed, ga.indptr)
np.testing.assert_allclose(
    bc.inverse_transform(transformed_ga),
    data,
)