In [1]:
# Imports
import copy
from typing import Any

import pandas as pd

# Construct the pipeline
# Note: unused, because the test to integrate the scaler object within a sklearn pipeline failed.
# from sklearn.base import TransformerMixin, BaseEstimator
# from sklearn.pipeline import Pipeline
# from sklearn.exceptions import NotFittedError

from re_forecast.data.get_data import get_rte_data
from re_forecast.preprocessing.handle_datetime import construct_time_consistent_df
from re_forecast.exploration.plot import plot_time_serie

# Preprocessing step 2 bis : Clean values

Goals of this notebook : 
- Develop a function to remove rogue values 
- Develop a function to normalize the data and its inverse  
- Develop a function to stationarize the data and its inverse  
- Develop a function to remove volatility and its inverse  
- Develop a function to remove the average seasonality and its inverse  
- Pack all these functions into a CleanValues object  

/!\ Option : Pack all the functions into a sklearn pipeline

Don't forget to find out how to deal with NaN values

## 0/ Load data

We load the dataset corresponding to the Fecamp generation unit :

In [2]:
# Request parameters passed to get_rte_data for the Fecamp generation unit
ressource_nb = 2
start_date = "2022-06-01 00:00:00" # Note: don't modify the start & end date for following requests
end_date = "2024-03-01 00:00:00"
eic_code = "17W0000014455708"
# No filter on the production type / subtype
production_type = None
production_subtype = None

# Download generation data: don't forget to set the api_delay_bypass to true in the params in this case,
# and to reset it to false if you know that you will download fresh data from the api
gen_data = get_rte_data(ressource_nb = ressource_nb,
                        start_date = start_date,
                        end_date = end_date,
                        eic_code = eic_code,
                        production_type = production_type,
                        production_subtype = production_subtype)

# Construct the time consistent df from the two raw datetime columns
# NOTE(review): the peel step below expects a "start_date_complete" column;
# presumably construct_time_consistent_df adds the "*_complete" columns - confirm
dt_columns = ["start_date", "end_date"]
gen_data_dt = construct_time_consistent_df(gen_data, dt_columns)

## 0 bis/ Construct a function to format the dataframe

We want to construct a function that extracts a specific datetime column from a df, sets it as the index and drops all other columns apart from the value column

In [3]:
# droped_columns = gen_data_dt.columns[~gen_data_dt.columns.isin(["value"])]
# gen_data_dt.drop(droped_columns, axis = 1)

In [4]:
def peel_time_serie_df(gen_df: pd.DataFrame,
                       keeped_columns = {"dt_column": "start_date_complete", "value_column": "value"}
                       ) -> pd.DataFrame:
    """Reduce a time serie df to its value column, indexed by a datetime column.
    Arguments:
    - gen_df: df containing the chosen datetime column
    - keeped_columns: dict giving the name of the datetime column to use as
    index ("dt_column") and the name of the value column to keep ("value_column")
    Returns a new df, the df passed as argument is left untouched."""

    # Resolve which column becomes the index and which one survives
    index_name = keeped_columns["dt_column"]
    kept_name = keeped_columns["value_column"]

    # Work on a deep copy so the caller's df is never modified
    indexed_df = gen_df.copy(deep = True).set_index(index_name, drop = True)

    # Keep only the columns whose name matches the value column
    is_value_column = indexed_df.columns.isin([kept_name])

    return indexed_df.loc[:, is_value_column]

In [5]:
# Test the function with its default column names; the peeled df
# (datetime index + value column) is the input of the scaler cells below
gen_data_dt_peeled = peel_time_serie_df(gen_data_dt)

## 0 ter/ Construct a function to cap max and min values

In [6]:
# gen_data_dt_copy = gen_data_dt.copy(deep = True)
# min_values = gen_data_dt.loc[gen_data_dt["value"] < 0, "value"].values
# print(gen_data_dt_copy.replace({value: 0 for value in min_values}).min())

In [7]:
def set_min_max_limits_time_serie(gen_df: pd.DataFrame,
                                  value_col: str,
                                  min_value: float | None = None,
                                  max_value: float | None = None
                                  ) -> pd.DataFrame:
    """Limits min and max values of a time serie df.
    Arguments:
    - gen_df: A consistent time serie df with one or more complete datetime columns
    and one value column
    - value_col: the name of the value column
    - min_value: minimum limit value
    - max_value: maximum limit value
    """

    # Copy the gen_df to avoid the setting with copy warning
    gen_df_copy = gen_df.copy(deep = True)

    # Replace values bellow min_value by min_value
    if isinstance(min_value, (int, float)):
        min_values = gen_df_copy.loc[gen_df_copy[value_col] < min_value, value_col].values
        gen_df_copy.replace({value: min_value for value in min_values}, inplace = True)

    # Replace values above max_value by max_value
    if isinstance(max_value, (int, float)):
        max_values = gen_df_copy.loc[gen_df_copy[value_col] > max_value, value_col].values
        gen_df_copy.replace({value: max_value for value in max_values}, inplace = True)

    return gen_df_copy

In [8]:
# Test the function: cap the values at a minimum of 0 and display the
# column-wise minimum of the result (the "value" minimum is expected to be 0)
set_min_max_limits_time_serie(gen_data_dt,
                              "value",
                              min_value = 0).min()

  min_value = 0).min()


start_date_complete    2023-11-02 00:00:00
end_date_complete      2023-11-02 01:00:00
start_date             2023-11-02 00:00:00
end_date               2023-11-02 01:00:00
updated_date           2023-11-29 15:23:49
value                                  0.0
dtype: object

## Bonus : Create a parent class for the time series scaler classes below

In [9]:
class BaseTsScaler:
    """The BaseTsScaler is only used to avoid implementing
    a fit_transform method each time for each Ts scalers
    objects.
    Child objects override fit and transform; the versions
    defined here do nothing and return None."""

    def __init__(self) -> None:
        """Create the error message shared by the child objects."""

        # Create the error message
        self.error_message = "The fit method must be called before the transform and the inverse_transform method"

    def fit(self, *args, **kwargs) -> None:
        """Placeholder fit: accept any argument and do nothing."""

        return

    def transform(self, *args, **kwargs) -> None:
        """Placeholder transform: accept any argument and return None."""

        return

    def fit_transform(self, *args, **kwargs) -> Any:
        """The fit_transform method to be inherited
        by the child objects.
        The same positional and keyword arguments are forwarded
        first to fit, then to transform.
        Returns whatever the transform method returns."""

        # Fit
        self.fit(*args, **kwargs)

        # Transform
        return self.transform(*args, **kwargs)

## 1/ Normalization function

In [10]:
# Define a custom error NotFittedError (local replacement for the sklearn
# exception of the same name, whose import is commented out in the imports cell)
class NotFittedError(Exception):
    """Raised by a scaler object when its transform or its
    inverse_transform method is called before its fit method."""

In [11]:
class NormalScalerTs(BaseTsScaler):
    """Scaler that centers a time serie df on its mean and scales it
    by its standard deviation, both learned by the fit method."""

    def __init__(self) -> None:
        """Take the error message from the parent object and mark
        the scaler as not fitted."""

        super().__init__()

        # Integer zeros are the "not fitted" marker: fit overwrites them
        # with the results of gen_df.mean() and gen_df.std()
        self.mean = 0
        self.std = 0

    def _raise_if_not_fitted(self) -> None:
        """Raise NotFittedError while self.mean still holds its integer marker."""

        if isinstance(self.mean, int):
            raise NotFittedError(f"{self.error_message}")

    def fit(self, gen_df: pd.DataFrame) -> None:
        """Learn the mean and the std of a time serie df.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Compute both statistics before storing any of them
        fitted_mean = gen_df.mean()
        fitted_std = gen_df.std()

        self.mean, self.std = fitted_mean, fitted_std

    def transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Center and scale the time serie with the fitted statistics.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        self._raise_if_not_fitted()

        # Subtract the mean, then divide by the std
        centered = gen_df - self.mean

        return centered / self.std

    def inverse_transform(self, gen_df_normalized: pd.DataFrame) -> pd.DataFrame:
        """Bring a normalized time serie back to its original scale.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df_normalized: normalized df with one column value
        and a datetime index"""

        self._raise_if_not_fitted()

        # Multiply by the std, then add the mean back
        rescaled = gen_df_normalized * self.std

        return rescaled + self.mean

In [12]:
# Instantiate the NormalScalerTs
norm_scaler = NormalScalerTs()

# Fit the scaler on the peeled df and normalize it in one call
gen_data_dt_norm = norm_scaler.fit_transform(gen_data_dt_peeled)

# Plot the normalized time serie
# NOTE(review): the empty string is presumably the datetime column name,
# not needed when dt_index = True - confirm against plot_time_serie
plot_time_serie(gen_data_dt_norm,
                "",
                "value",
                dt_index = True)

In [13]:
# Test the inverse transformation: reuses the scaler fitted in the previous cell
gen_data_dt_inv_norm = norm_scaler.inverse_transform(gen_data_dt_norm)

# Plot the de-normalized time serie, to be compared visually with the original one
plot_time_serie(gen_data_dt_inv_norm,
                "",
                "value",
                dt_index = True)

## 2/ Stationarize

In [14]:
# gen_data_dt_peeled.loc[gen_data_dt_peeled.notnull().idxmax()].values[0][0]
# gen_data_dt_peeled.notnull().idxmax()

In [15]:
# gen_df_diff_1 = gen_data_dt_peeled.interpolate().diff()
# initial_value = gen_data_dt_peeled.loc[gen_data_dt_peeled.notnull().idxmax()].values[0][0]
# gen_df_diff_1.iloc[0] = initial_value
# display(gen_df_diff_1.cumsum())
# display(gen_data_dt_peeled)

In [16]:
# def stationarize_time_serie(gen_df: pd.DataFrame,
#                             order: int,
#                             # inverse: bool
#                             ) -> pd.DataFrame:
#     """Stationarize the time serie by the order given.
#     Arguments:
#     - gen_df: df with one column value and a datetime index
#     - inverse: wether to return the inverse function"""

#     # Copy the original df
#     gen_df_diff = gen_df.copy(deep = True)

#     # Case order == 0, just return the copy
#     if not order:
#         return gen_df_diff

#     # Iterate over the order of the derivative
#     for _ in range(1, order + 1):
#         # Differenciate sequentially the gen_df_diff
#         gen_df_diff = gen_df_diff.diff()

#     return gen_df_diff

In [17]:
# # Test the function
# gen_data_dt_st = stationarize_time_serie(gen_data_dt_norm, 0)

# # Plot
# plot_time_serie(gen_data_dt_st,
#                 "",
#                 "value",
#                 dt_index = True)

In [18]:
class NotTransformedError(Exception):
    """Raised by a scaler object when its inverse_transform method
    is called before its transform method."""

In [19]:
class StationarizerTs(BaseTsScaler):
    """Stationarize a time serie df by differencing it `order` times,
    and restore it by cumulative sums.
    The transform method stores, for each differencing pass, the first non null
    value of the serie: the inverse_transform method uses these values as
    constants of integration.
    Note: only the first column of the df is considered when reading the initial
    values, and the cumulative sum skips null values, so null values located
    inside the serie are not restored properly (the notebook interpolates the df
    before calling transform)."""

    def __init__(self, order: int = 0) -> None:
        """Set the order of differencing and the empty initial_values list.
        Arguments:
        - order: number of successive differencing passes, 0 means no transformation"""

        # Bring back the arguments defined inside the inherited init method
        super().__init__()

        # Init the order of differencing
        self.order = order

        # Initialize the list of the initial values
        # This list store the initial values and their index
        # The initial values are used as constant of integration for the
        # inverse transform method
        self.initial_values = list()

    def fit(self, gen_df: pd.DataFrame) -> None:
        """Nothing to learn: defined so that the inherited
        fit_transform method works properly"""

        return

    def transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Stationarize the time serie by the order given.
        Each call starts from an empty initial_values list, so the stored
        constants of integration always describe the last transformed df.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Copy the original df
        gen_df_diff = gen_df.copy(deep = True)

        # Forget the initial values stored by a previous call: keeping them would
        # make inverse_transform integrate more times than the order
        self.initial_values = list()

        # Case order == 0, just return the copy
        if not self.order:
            return gen_df_diff

        # Iterate over the order of the derivative
        for _ in range(self.order):

            # 1/ Extract the id of the initial value and the initial value and store them into initial values list #

            # Index label of the first non null value (the initial value), one per column
            id_initial_value = gen_df_diff.notnull().idxmax()

            # The initial value itself (first column)
            initial_value = gen_df_diff.loc[id_initial_value].values[0][0]

            # Store the id and its initial value as a tuple
            self.initial_values.append((id_initial_value, initial_value))

            # 2/ Difference sequentially the gen_df_diff #
            gen_df_diff = gen_df_diff.diff()

        return gen_df_diff

    def inverse_transform(self, gen_df_diff: pd.DataFrame) -> pd.DataFrame:
        """Un-stationarize the time serie whatever its order.
        Note: when the order is not 0, the transform method must be called
        before the inverse_transform method, otherwise NotTransformedError is raised.
        Arguments:
        - gen_df_diff: differenced df with one column value and a datetime index"""

        # Copy the original df
        gen_df_undiff = gen_df_diff.copy(deep = True)

        # Case order == 0: transform changed nothing, so there is nothing to undo
        if not self.order:
            return gen_df_undiff

        # Check if the initial values list is empty and if it is, raise an exception
        if not self.initial_values:
            raise NotTransformedError("The transform method must be called before the inverse_transform method")

        # Iterate over the inverted initial values list: the last
        # differencing pass is the first one to be undone
        for id_initial_value, initial_value in self.initial_values[::-1]:

            # Add the initial value to the gen_df_undiff
            # The initial value act as the "constant of integration"
            gen_df_undiff.loc[id_initial_value] = initial_value

            # Cumsum the gen_df_undiff.
            # The cumsum act as an integral
            gen_df_undiff = gen_df_undiff.cumsum()

        return gen_df_undiff


In [20]:
# Test the transform method

# Instantiate the StationarizerTs
stationarizer = StationarizerTs(order = 2)

# Call the transform method, order = 2
# The df is interpolated first, so the differenced serie has no null values inside it
gen_df_diff_2 = stationarizer.transform(gen_data_dt_peeled.interpolate())

# Plot the twice-differenced time serie
plot_time_serie(gen_df_diff_2,
                "",
                "value",
                dt_index = True)

In [21]:
# Test the inverse transform method

# Call the inverse transform method on the stationarizer used in the previous
# cell: it relies on the initial values stored by that transform call
gen_df_undiff_2 = stationarizer.inverse_transform(gen_df_diff_2)

# Plot the restored time serie, to be compared visually with the original one
plot_time_serie(gen_df_undiff_2,
                "",
                "value",
                dt_index = True)

## 3/ Remove volatility

The volatility is the variation of the standard deviation over time. To remove the volatility, we compute the standard deviation over a rolling window (for instance a day or a month) and divide the values by it.

In [22]:
class VolatilityRemoverTs(BaseTsScaler):
    """Scaler that divides a time serie df by its rolling standard
    deviation (the volatility), learned by the fit method."""

    def __init__(self, window_size: int = 1) -> None:
        """Store the window size and mark the scaler as not fitted.
        Arguments:
        - window_size: size of the window for the rolling standard deviation"""

        super().__init__()

        # Message used when transform or inverse_transform is called too early
        self.error_message = "The fit method must be called before the transform and the inverse_transform method"

        # An integer zero is the "not fitted" marker: fit overwrites it
        self.volatility = 0

        self.window_size = window_size

    def _raise_if_not_fitted(self) -> None:
        """Raise NotFittedError while self.volatility still holds its integer marker."""

        if isinstance(self.volatility, int):
            raise NotFittedError(f"{self.error_message}")

    def fit(self, gen_df: pd.DataFrame) -> None:
        """Learn the volatility of a time serie df.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Rolling standard deviation over window_size rows
        rolling_std = gen_df.rolling(self.window_size).std()

        # Back-fill the null values, such as those left at the start by the rolling window
        self.volatility = rolling_std.bfill()

    def transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Divide the time serie by the fitted volatility.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        self._raise_if_not_fitted()

        scaled_df = gen_df / self.volatility

        return scaled_df

    def inverse_transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Multiply the time serie by the fitted volatility.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df without volatility with one column value
        and a datetime index"""

        self._raise_if_not_fitted()

        restored_df = gen_df * self.volatility

        return restored_df

## 4/ Remove the average seasonality

Just copy and modify the VolatilityRemover object

In [23]:
class AverageSeasonalityRemoverTs(BaseTsScaler):
    """Scaler that subtracts from a time serie df its rolling mean
    (the average seasonality), learned by the fit method."""

    def __init__(self, window_size: int = 1) -> None:
        """Store the window size and mark the scaler as not fitted.
        Arguments:
        - window_size: size of the window for the rolling mean"""

        super().__init__()

        # Message used when transform or inverse_transform is called too early
        self.error_message = "The fit method must be called before the transform and the inverse_transform method"

        # An integer zero is the "not fitted" marker: fit overwrites it
        self.avg_seasonality = 0

        self.window_size = window_size

    def _raise_if_not_fitted(self) -> None:
        """Raise NotFittedError while self.avg_seasonality still holds its integer marker."""

        if isinstance(self.avg_seasonality, int):
            raise NotFittedError(f"{self.error_message}")

    def fit(self, gen_df: pd.DataFrame) -> None:
        """Learn the average seasonality of a time serie df.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Rolling mean over window_size rows
        rolling_mean = gen_df.rolling(self.window_size).mean()

        # Back-fill the null values, such as those left at the start by the rolling window
        self.avg_seasonality = rolling_mean.bfill()

    def transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Subtract the fitted average seasonality from the time serie.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        self._raise_if_not_fitted()

        deseasonalized_df = gen_df - self.avg_seasonality

        return deseasonalized_df

    def inverse_transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Add the fitted average seasonality back to the time serie.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df without avg_seasonality with one column value
        and a datetime index"""

        self._raise_if_not_fitted()

        restored_df = gen_df + self.avg_seasonality

        return restored_df

In [24]:
# Instantiate the AverageSeasonalityRemoverTs with a rolling window of 24 rows
avg_seasonality_rmv = AverageSeasonalityRemoverTs(window_size = 24)

# Fit the remover on the peeled df and subtract the rolling mean in one call
gen_data_dt_seasonality_rmv = avg_seasonality_rmv.fit_transform(gen_data_dt_peeled)

# Plot the time serie without its average seasonality
plot_time_serie(gen_data_dt_seasonality_rmv,
                "",
                "value",
                dt_index = True)

In [25]:
# Test the inverse transformation: add the fitted rolling mean back and
# plot the result, to be compared visually with the original time serie
plot_time_serie(avg_seasonality_rmv.inverse_transform(gen_data_dt_seasonality_rmv),
                "",
                "value",
                dt_index = True)

## Option : Construct a pipeline including all the transformation functions

In priority, we want to construct a pipeline using the scikit-learn Pipeline object. It is useful to construct the pipeline with sklearn so that missing-value imputation techniques can later be added to it.

In [26]:
# Construct the pipeline
# pipeline_test = Pipeline([("normal_scaler", NormalScaler()),
#                           ("stationarizer", Stationarizer()),
#                           ("volatility_remover", VolatilityRemover()),
#                           ("average_seasonality_remover", AverageSeasonalityRemover())])

# Test the pipeline
# pipeline_test.fit_transform(gen_data_dt_peeled)

As expected, the custom scaling objects adapted to time series are not well suited for use with a scikit-learn pipeline. The solution can be to use the scaling objects directly inside a script, or to develop a custom pipeline object.

In [27]:
# Create the custom pipeline
class PipelineTs(BaseTsScaler):
    """Chain several time serie scalers.
    The fit method instantiates and fits the steps one after the other, each step
    being fitted on the output of the previous steps. The transform method applies
    the fitted steps in order, the inverse_transform method undoes them in
    reverse order."""

    def __init__(self, scalers: list[tuple]) -> None:
        """Set the scalers list of tuples and the scalers_list
        to store fitted scaler instances.
        Arguments:
        - scalers: list of tuples (step name, scaler object, dict of the
        parameters used to instantiate the scaler object)"""

        # Bring back the arguments defined inside the inherited init method
        super().__init__()

        # List of tuples containing the name of the pipeline step, the scaler
        # object and the parameters of this object in form of a dict
        self.scalers = scalers

        # List where to store fitted scaler instances
        self.scalers_list = list()

        # Whether the fit method was called: an empty scalers_list is not enough
        # to tell, because a pipeline without any step stays empty once fitted
        self._is_fitted = False

    def fit(self, gen_df: pd.DataFrame) -> None:
        """Instantiate the scaler objects and fit them iteratively: each scaler
        is fitted on the time serie df transformed by the previous steps.
        Calling fit again replaces the previously fitted scaler instances.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Build the new list aside, so that a failing fit leaves
        # the previously fitted instances untouched
        fitted_instances = list()

        # Input of the current step: the raw df for the first step,
        # then the output of the previous step
        gen_df_current = gen_df

        last_position = len(self.scalers) - 1

        # Iterate over the scaler tuples (the step name is only a label)
        for position, (_step_name, scaler_object, kwargs) in enumerate(self.scalers):

            # Instantiate the current scaler object
            scaler_instance = scaler_object(**kwargs)

            # Fit the scaler instance on the input of its own step
            scaler_instance.fit(gen_df_current)

            # Add the fitted scaler instance to the new list
            fitted_instances.append(scaler_instance)

            # Compute the input of the next step. The transformation is done by a
            # copy of the instance: a scaler whose transform method stores state
            # (like the StationarizerTs) must not be altered by the fit method
            if position < last_position:
                gen_df_current = copy.deepcopy(scaler_instance).transform(gen_df_current)

        self.scalers_list = fitted_instances
        self._is_fitted = True

    def transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Apply the transform methods from the scaler objects
        iteratively to the time serie df.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Check if the pipeline was fitted
        if not self._is_fitted:
            raise NotFittedError(self.error_message)

        # Copy the gen_df
        gen_df_transformed = gen_df.copy(deep = True)

        # Iterate over the scalers_list
        for scaler_instance in self.scalers_list:
            # For each scaler instance, call the transform method
            gen_df_transformed = scaler_instance.transform(gen_df_transformed)

        return gen_df_transformed

    def inverse_transform(self, gen_df: pd.DataFrame) -> pd.DataFrame:
        """Apply the inverse_transform methods from the scaler objects
        iteratively to the time serie df, from the last step to the first one.
        Raises NotFittedError when fit was not called before.
        Arguments:
        - gen_df: df with one column value and a datetime index"""

        # Check if the pipeline was fitted
        if not self._is_fitted:
            raise NotFittedError(self.error_message)

        # Copy the gen_df
        gen_df_inverse_transformed = gen_df.copy(deep = True)

        # Iterate over the inverted scalers_list to apply back the
        # inverse transformation corresponding to the right transformation
        for scaler_instance in self.scalers_list[::-1]:
            # Call the inverse_transform method for each scaler instance
            gen_df_inverse_transformed = scaler_instance.inverse_transform(gen_df_inverse_transformed)

        return gen_df_inverse_transformed

In [28]:
# Construct the list of scalers: one tuple (step name, scaler object, parameters) per step
# Note: the step name is only a label, the last step removes the average seasonality
scalers = [("normalization", NormalScalerTs, {}),
           ("stationarize", StationarizerTs, {"order": 1}),
           ("remove_volatility", VolatilityRemoverTs, {"window_size": 24}),
           ("remove_average_volatility", AverageSeasonalityRemoverTs, {"window_size": 24})
           ]

# Test the pipeline
test_pipeline = PipelineTs(scalers = scalers)

# Fit on the interpolated df, the same input as the transform call below
test_pipeline.fit(gen_data_dt_peeled.interpolate())

# We interpolate the time serie df in order to
# avoid problems with the stationarization step
gen_data_dt_transformed = test_pipeline.transform(gen_data_dt_peeled.interpolate())

# Round trip: undo every step, from the last one to the first one
gen_data_dt_inverse_transformed = test_pipeline.inverse_transform(gen_data_dt_transformed)

# Display the transformed time serie
plot_time_serie(gen_data_dt_transformed,
                "",
                "value",
                dt_index = True)

# Display the restored time serie, to be compared visually with the original one
plot_time_serie(gen_data_dt_inverse_transformed,
                "",
                "value",
                dt_index = True)

In [29]:
# Construct the list of scalers: same steps as the previous cell,
# but with a stationarization of order 0 (no differencing)
scalers = [("normalization", NormalScalerTs, {}),
           ("stationarize", StationarizerTs, {"order": 0}),
           ("remove_volatility", VolatilityRemoverTs, {"window_size": 24}),
           ("remove_average_volatility", AverageSeasonalityRemoverTs, {"window_size": 24})
           ]

# Test the pipeline (a new instance replaces the one of the previous cell)
test_pipeline = PipelineTs(scalers = scalers)

# Test the fit_transform method inherited from BaseTsScaler
gen_data_dt_fit_transformed = test_pipeline.fit_transform(gen_data_dt_peeled.interpolate())

# Display the transformed time serie
plot_time_serie(gen_data_dt_fit_transformed,
                "",
                "value",
                dt_index = True)

The pipeline works properly, but we have to be careful with the Stationarizer object and its position in the pipeline, because it can cause some computation problems.