In [63]:
%load_ext autoreload
%autoreload 2
import sys
from pathlib import Path
path = str(Path.cwd().parent)
print(path)
sys.path.insert(1, path)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
c:\Users\Joaquín Amat\Documents\GitHub\skforecast


In [64]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer
from timeit import repeat
import numpy as np
from typing import Optional
# non fittederror
from sklearn.exceptions import NotFittedError

In [65]:
class QuantileBinner:
    """
    QuantileBinner class to bin data into quantile-based bins using `numpy.percentile`.
    This class is similar to `KBinsDiscretizer` but faster for binning data into
    quantile-based bins.
    
    Parameters
    ----------
    n_bins : int
        The number of quantile-based bins to create.
    method : str, default='linear'
        The method used to compute the quantiles. This parameter is passed to 
        `numpy.percentile`. Default is 'linear'. Valid values are "inverse_cdf",
        "averaged_inverse_cdf", "closest_observation", "interpolated_inverse_cdf",
        "hazen", "weibull", "linear", "median_unbiased", "normal_unbiased".
    subsample : int, default=200000
        The number of samples to use for computing quantiles. If the dataset 
        has more samples than `subsample`, a random subset will be used.
    random_state : int, default=789654
        The random seed to use for generating a random subset of the data.
    dtype : data type, default=numpy.float16
        The data type to use for the bin indices. Default is `numpy.float16`.
    
    Attributes
    ----------
    n_bins : int
        The number of quantile-based bins to create.
    method : str, default='linear'
        The method used to compute the quantiles. This parameter is passed to 
        `numpy.percentile`. Default is 'linear'. Valid values are 'linear',
        'lower', 'higher', 'midpoint', 'nearest'.
    subsample : int, default=200000
        The number of samples to use for computing quantiles. If the dataset 
        has more samples than `subsample`, a random subset will be used.
    random_state : int, default=789654
        The random seed to use for generating a random subset of the data.
    dtype : data type, default=numpy.float16
        The data type to use for the bin indices. Default is `numpy.float16`.
     n_bins_ : int
        The number of bins learned during fitting.
    bin_edges_ : numpy ndarray
        The edges of the bins learned during fitting.
    """

    def __init__(
        self,
        n_bins: int,
        method: Optional[str] = "linear",
        subsample: int = 200000,
        dtype: Optional[type] = np.float16,
        random_state: Optional[int] = 789654
    ):
        self.n_bins       = n_bins
        self.method       = method
        self.subsample    = subsample
        self.random_state = random_state
        self.dtype        = dtype
        self.n_bins_      = None
        self.bin_edges_   = None
        self.intervals_   = None

    def __repr__(self):
        """
        Return the string representation of the quantile binner.
        """
        return (
            f"QuantileBinner(n_bins={self.n_bins}, method={self.method}, "
            f"subsample={self.subsample}, dtype={self.dtype})"
        )

    def _validate_init_params(self):
        """
        Validate the parameters passed to the class.
        """
        if not isinstance(self.n_bins, int) or self.n_bins < 2:
            raise ValueError("The number of bins must be an integer greater than 1.")

        valid_methods = [
            "inverse_cdf",
            "averaged_inverse_cdf",
            "closest_observation",
            "interpolated_inverse_cdf",
            "hazen",
            "weibull",
            "linear",
            "median_unbiased",
            "normal_unbiased",
        ]
        if self.method not in valid_methods:
            raise ValueError(
                f"The method must be one of {valid_methods}. Got '{self.method}'."
            )
        if not isinstance(self.subsample, int) or self.subsample < 1:
            raise ValueError("The subsample must be an integer greater than 0.")
        if not isinstance(self.random_state, int) or self.random_state < 0:
            raise ValueError(
                "The random state must be an integer greater than or equal to 0."
            )
        if not isinstance(self.dtype, type):
            raise ValueError("The dtype must be a valid numpy data type.")

    def fit(self, X: np.ndarray):
        """
        Learn the bin edges based on quantiles from the training data.
        
        Parameters
        ----------
        X : numpy ndarray
            The training data used to compute the quantiles.
        
        Returns
        -------
        self : QuantileBinner
            Fitted estimator.
        """

        if X.size == 0:
            raise ValueError("Input data `X` cannot be empty.")
        if len(X) > self.subsample:
            rng = np.random.default_rng(self.random_state)
            X = X[rng.integers(0, len(X), self.subsample)]

        self.bin_edges_ = np.percentile(
            a      = X,
            q      = np.linspace(0, 100, self.n_bins + 1),
            method = self.method
        )

        self.n_bins_ = len(self.bin_edges_) - 1
        self.intervals_ = {
            i: (float(self.bin_edges_[i]), float(self.bin_edges_[i + 1]))
            for i in range(self.n_bins_)
        }

        return self

    def transform(self, X: np.ndarray):
        """
        Assign new data to the learned bins.
        
        Parameters
        ----------
        X : numpy ndarray
            The data to assign to the bins.
        
        Returns
        -------
        bin_indices : numpy ndarray 
            The indices of the bins each value belongs to.
            Values less than the smallest bin edge are assigned to the first bin,
            and values greater than the largest bin edge are assigned to the last bin.
        """
        if self.bin_edges_ is None:
            raise NotFittedError(
                "The model has not been fitted yet. Call 'fit' with training data first."
            )

        bin_indices = np.digitize(X, bins=self.bin_edges_, right=True)
        bin_indices = np.clip(bin_indices, 1, self.n_bins_).astype(self.dtype) - 1

        return bin_indices

    def fit_transform(self, X):
        """
        Fit the model to the data and return the bin indices for the same data.
        
        Parameters
        ----------
        X : numpy.ndarray
            The data to fit and transform.
        
        Returns
        -------
        bin_indices : numpy.ndarray
            The indices of the bins each value belongs to.
            Values less than the smallest bin edge are assigned to the first bin,
            and values greater than the largest bin edge are assigned to the last bin.
        """
        self.fit(X)

        return self.transform(X)

    def get_params(self):
        """
        Get the parameters of the quantile binner.
        
        Returns
        -------
        params : dict
            A dictionary of the parameters of the quantile binner.
        """
        return {
            "n_bins": self.n_bins,
            "method": self.method,
            "subsample": self.subsample,
            "dtype": self.dtype
        }

    def set_params(self, **params):
        """
        Set the parameters of the quantile binner.
        
        Parameters
        ----------
        params : dict
            A dictionary of the parameters to set.
        """
        for param, value in params.items():
            setattr(self, param, value)
        return self

In [66]:
X = np.random.normal(10, 10, 10000)
X_reshaped = X.reshape(-1, 1)

In [67]:
binner = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='quantile', dtype=np.float64)
times_fit = repeat("binner.fit_transform(X_reshaped)", repeat=100, number=10, globals=globals())
times_transform = repeat("binner.transform(X_reshaped[0].reshape(1, -1))", repeat=1000, number=10, globals=globals())
print(f"Sklearn KBinsDiscretizer {binner.get_params()}")
print(f"    Fit      : {1000 * np.mean(times_fit):.6f} μs +- {1000 * np.std(times_fit):.6f}")
print(f"    Transform: {1000 * np.mean(times_transform):.6f} μs +- {1000 * np.std(times_transform):.6f}")
print("")

binner = QuantileBinner(n_bins=10, method="linear", dtype=np.float16)
times_fit = repeat("binner.fit_transform(X)", repeat=100, number=10, globals=globals())
times_transform = repeat("binner.transform(X[0])", repeat=1000, number=10, globals=globals())
print(f"QuantileBinner {binner.get_params()}")
print(f"    Fit      : {1000 * np.mean(times_fit):.6f} μs +- {1000 * np.std(times_fit):.6f}")
print(f"    Transform: {1000 * np.mean(times_transform):.6f} μs +- {1000 * np.std(times_transform):.6f}")
print("")

binner = QuantileBinner(n_bins=10, method="closest_observation", dtype=np.uint8)
times_fit = repeat("binner.fit_transform(X)", repeat=100, number=10, globals=globals())
times_transform = repeat("binner.transform([0])", repeat=1000, number=10, globals=globals())
print(f"QuantileBinner {binner.get_params()}")
print(f"    Fit      : {1000 * np.mean(times_fit):.6f} μs +- {1000 * np.std(times_fit):.6f}")
print(f"    Transform: {1000 * np.mean(times_transform):.6f} μs +- {1000 * np.std(times_transform):.6f}")

Sklearn KBinsDiscretizer {'dtype': <class 'numpy.float64'>, 'encode': 'ordinal', 'n_bins': 10, 'random_state': None, 'strategy': 'quantile', 'subsample': 200000}
    Fit      : 7.268094 μs +- 2.418357
    Transform: 0.437913 μs +- 0.082739

QuantileBinner {'n_bins': 10, 'method': 'linear', 'subsample': 200000, 'dtype': <class 'numpy.float16'>}
    Fit      : 5.845347 μs +- 0.638219
    Transform: 0.076105 μs +- 0.014681

QuantileBinner {'n_bins': 10, 'method': 'closest_observation', 'subsample': 200000, 'dtype': <class 'numpy.uint8'>}
    Fit      : 4.468196 μs +- 0.354149
    Transform: 0.069654 μs +- 0.013048


In [68]:
from sklearn.model_selection import ParameterGrid

params = {
    "n_bins": [2, 10, 20],
    "method": ["linear"],
    "subsample": [200000],
}

parm_grid = ParameterGrid(params)

X = np.random.normal(10, 10, 10000)

for param in parm_grid:
    print(param)
    binner_1 = KBinsDiscretizer(
        n_bins=param["n_bins"],
        encode="ordinal",
        strategy="quantile",
        dtype=np.float64,
        random_state=789654,
    )
    binner_2 = QuantileBinner(
        n_bins=param["n_bins"],
        method=param["method"],
        subsample=param["subsample"],
        dtype=np.float16,
        random_state=789654,
    )

    binner_1.fit(X.reshape(-1, 1))
    binner_2.fit(X)

    transformed_1 = binner_1.transform(X.reshape(-1, 1)).flatten()
    transformed_2 = binner_2.transform(X)

    np.testing.assert_array_almost_equal(binner_1.bin_edges_[0], binner_2.bin_edges_)
    np.testing.assert_array_almost_equal(transformed_1, transformed_2)

{'method': 'linear', 'n_bins': 2, 'subsample': 200000}
{'method': 'linear', 'n_bins': 10, 'subsample': 200000}
{'method': 'linear', 'n_bins': 20, 'subsample': 200000}
