In [2]:
import sys
sys.path.append("..")
import Data as dt
import ChevalParesseux_lib as lib

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# Number of parallel jobs passed as `n_jobs` to the labeller and to every feature extractor created below
n_jobs = 5

In [3]:
# ======= I. Load data =======
data = dt.load_data(ticker='SPY')
data['code'] = 'SPY'

# ======= II. Make the samples (chronological 70% / 20% / 10% split) =======
# All three slices are positional (.iloc) and share the same cut points, so the split is
# contiguous and does not depend on the kind of index `data` carries. Mixing .iloc with
# label-based .loc would drop the row at the first cut point on a default integer index
# and would fail (or select the wrong rows) on any other index.
full_size = len(data)
train_end = int(full_size * 0.7)
test_end = int(full_size * 0.9)

training_data = data.iloc[:train_end].copy()
testing_data = data.iloc[train_end:test_end].copy()
embargo_data = data.iloc[test_end:].copy()

In [4]:
# Work on a copy so that the training sample itself is never modified.
processed_train = training_data.copy()

# ======= I. Configure the labeller =======
# Every value is wrapped in a list because the parameters are given as a grid.
labeller_params = dict(
    upper_barrier=[1.5],
    lower_barrier=[1],
    vertical_barrier=[21],
    vol_window=[21],
    smoothing_method=['ewma'],
    window_smooth=[5],
    lambda_smooth=[0.2],
)
labeller = lib.TripleBarrier_labeller(n_jobs=n_jobs)
labeller.set_params(**labeller_params)

# ======= II. Label the close prices =======
labels_df = labeller.extract(data=processed_train['close'])

# Only the first column of the labeller output is kept as the label.
first_label_column = labels_df.columns[0]
processed_train['label'] = labels_df[first_label_column]

### ***Feature Extraction***

In [5]:
import numpy as np
import pandas as pd
from typing import Union, Self
from abc import ABC, abstractmethod
from joblib import Parallel, delayed



#! ==================================================================================== #
#! ================================= Base Model ======================================= #
class Feature(ABC):
    """
    Abstract base class for all features.
    
    This class defines the core structure and interface for feature extraction. It is meant to be subclassed
    by specific feature implementations. 
    Subclasses must implement the following abstract methods:
        - __init__: Initializes the feature with name, and optionally number of jobs.
        - set_params: Defines the parameter grid as a dictionary of lists.
        - process_data: Applies preprocessing to the data.
        - get_feature: Extracts the actual feature(s), returning a DataFrame.

    Main usage involves one core methods:
        - smooth_data: Applies optional smoothing to the input data before feature computation.
        - extract: Returns extracted features.
    """
    #?_____________________________ Initialization methods _______________________________ #
    @abstractmethod
    def __init__(
        self, 
        name: str, 
        n_jobs: int = 1
    ):
        """
        Constructor for the Feature class.
        
        Parameters:
            - name (str): The name identifier for the feature.
            - n_jobs (int): Number of parallel jobs to use during feature computation.
        """
        # ======= I. Initialize Class =======
        self.name = name
        self.n_jobs = n_jobs

        # ======= II. Initialize Auxilaries =======
        self.params = {}
    
    #?____________________________________________________________________________________ #
    @abstractmethod
    def set_params(
        self,
        **kwargs
    ) -> Self:
        """
        Sets the parameter grid for the feature extraction.

        Parameters:
            - **kwargs: Each parameter should be a list of possible values.
                    Example: feature.set_params(window=[5, 10], threshold=[3, 4])

        Returns:
            - Self: The instance of the class with the parameter grid set.
        """
        ...

    #?________________________________ Auxiliary methods _________________________________ #
    @abstractmethod
    def process_data(
        self,
        data: Union[tuple, pd.Series, pd.DataFrame],
        **kwargs
    ) -> Union[tuple, pd.DataFrame, pd.Series]:
        """
        Preprocesses the data before feature extraction.

        Parameters:
            - data (tuple | pd.Series | pd.DataFrame): The input data to be processed.
            - **kwargs: Additional parameters for the data processing.

        Returns:
            - tuple or pd.DataFrame or pd.Series: The processed data ready for feature extraction.
        """
        ...
    
    #?____________________________________________________________________________________ #
    @abstractmethod
    def get_feature(
        self,
        data: Union[tuple, pd.Series, pd.DataFrame],
        **kwargs
    ) -> pd.Series:
        """
        Core method for feature extraction.
        
        Parameters:
            - data (tuple | pd.Series | pd.DataFrame): The input data to extract the feature from
            - **kwargs: Additional parameters for the feature extraction.
        
        Returns:
            - pd.Series : The extracted feature as a pd.Series.
        """
        ...
       
    #?____________________________________________________________________________________ #
    def smooth_data(
        self, 
        data: pd.Series,
        smoothing_method: str = None, 
        window_smooth: int = None, 
        lambda_smooth: float = None
    ):
        """
        Applies optional smoothing to the input data before feature computation.

        Parameters:
            - data (pd.Series): The input data to be processed.
            - smoothing_method (str): Type of smoothing to apply. Options: "ewma", "average", or None.
            - window_smooth (int): Size of the smoothing window.
            - lambda_smooth (float): EWMA decay parameter in [0, 1].

        Returns:
            - smoothed_data (pd.Series): The smoothed series, or raw series if no smoothing is applied.
        """
        # ======= I. Check if any smoothing should be applied =======
        if smoothing_method is None:
            return data
        
        # ======= II. Compute the smoothed series =======
        elif smoothing_method == "ewma":
            smoothed_data = lib.ewma_smoothing(price_series=data, window=window_smooth, ind_lambda=lambda_smooth)
        elif smoothing_method == "average":
            smoothed_data = lib.average_smoothing(price_series=data, window=window_smooth)
            
        else:
            raise ValueError("Smoothing method not recognized")
        
        return smoothed_data
    
    #?_________________________________ Callable methods _________________________________ #
    def extract(
        self, 
        data: Union[tuple, pd.Series, pd.DataFrame]
    ) -> pd.DataFrame:
        """
        Main method to extract features.

        Parameters:
            - data (tuple | pd.Series | pd.DataFrame): The input data to extract the feature from
        
        Returns:
            - features_df (pd.DataFrame): The extracted features as a DataFrame.
        """
        # ======= I. Extract the Parameters Universe =======
        params_grid = lib.get_dict_universe(self.params)

        # ======= II. Extract the features for each Parameters =======
        features = Parallel(n_jobs=self.n_jobs)(delayed(self.get_feature)(data, **params) for params in params_grid)

        # ======= III. Create a DataFrame with the features =======
        features_df = pd.concat(features, axis=1)

        return features_df

#! ================================= Example ======================================= #
class Average_feature(Feature):
    """
    Moving Average Feature

    Computes, for each observation, the rolling mean of a (optionally pre-smoothed) series
    divided by the current value of that series, minus 1.
    It inherits from the Feature base class and implements:
        - set_params : defines the parameter grid.
        - process_data : resets the index of the input series.
        - get_feature : computes the feature for one parameter combination.
    """
    def __init__(
        self,
        name: str = "average",
        n_jobs: int = 1
    ) -> None:
        """
        Initializes the feature with its name and number of parallel jobs.

        Parameters:
            - name (str): Name of the feature, used as prefix of the output column names.
            - n_jobs (int): Number of jobs to run in parallel for feature extraction.
        """
        super().__init__(
            name=name,
            n_jobs=n_jobs,
        )

    #?____________________________________________________________________________________ #
    def set_params(
        self,
        window: list = [5, 10, 30, 60],
        smoothing_method: list = [None, "ewma", "average"],
        window_smooth: list = [5, 10],
        lambda_smooth: list = [0.1, 0.2, 0.5],
    ) -> Self:
        """
        Defines the parameter grid for feature extraction.

        Parameters:
            - window (list): Rolling window sizes for the moving average.
            - smoothing_method (list): Type of pre-smoothing to apply. Options: None, "ewma", "average".
            - window_smooth (list): Window size for smoothing methods.
            - lambda_smooth (list): Smoothing factor for EWMA, in [0, 1].

        Returns:
            - Self: The instance itself, so calls can be chained.
        """
        # Each list is copied: the default lists are shared by every call, so storing them
        # directly would let a later edit of self.params leak into other instances.
        self.params = {
            "window": list(window),
            "smoothing_method": list(smoothing_method),
            "window_smooth": list(window_smooth),
            "lambda_smooth": list(lambda_smooth),
        }

        return self

    #?____________________________________________________________________________________ #
    def process_data(
        self,
        data: pd.Series,
    ) -> pd.Series:
        """
        Applies preprocessing to the input data before feature extraction.

        Parameters:
            - data (pd.Series): The input data to be processed.

        Returns:
            - processed_data (pd.Series): A copy of the series with a fresh 0..n-1 index.
        """
        processed_data = data.copy()
        processed_data.reset_index(drop=True, inplace=True)

        return processed_data

    #?____________________________________________________________________________________ #
    def get_feature(
        self,
        data: pd.Series,
        window: int,
        smoothing_method: str,
        window_smooth: int,
        lambda_smooth: float,
    ) -> pd.Series:
        """
        Computes the normalized rolling average of the processed series.

        Parameters:
            - data (pd.Series): The input series to be processed.
            - window (int): Rolling window size for the moving average.
            - smoothing_method (str): Smoothing method used.
            - window_smooth (int): Smoothing window size.
            - lambda_smooth (float): Smoothing parameter for EWMA.

        Returns:
            - rolling_average (pd.Series): rolling mean / current value - 1, indexed like `data`
              and named "{name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}".
        """
        # ======= I. Smooth the Data & Preprocess =======
        smoothed_series = self.smooth_data(
            data=data,
            smoothing_method=smoothing_method,
            window_smooth=window_smooth,
            lambda_smooth=lambda_smooth
        )

        processed_series = self.process_data(data=smoothed_series)

        # ======= II. Compute the moving average =======
        # Built-in rolling mean: same values as applying np.mean to each window, without
        # building one Series per window.
        rolling_average = processed_series.rolling(window=window).mean()

        # ======= III. Normalize by the current value and center around 0 =======
        # The 1e-8 offset guards against a division by zero.
        rolling_average = rolling_average / (processed_series + 1e-8) - 1

        # ======= IV. Change Name & restore the original index =======
        rolling_average.name = f"{self.name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}"
        rolling_average.index = data.index

        return rolling_average


In [6]:
# Parameter grid shared by the features: one list of candidate values per parameter.
general_params = dict(
    window=[5, 10, 20, 50],
    smoothing_method=[None, 'ewma'],
    window_smooth=[5],
    lambda_smooth=[0.2],
)

In [7]:
class Kama_feature(Feature):
    """
    Kaufman Adaptive Moving Average (KAMA) Feature

    For each observation, applies get_kama to the trailing window of a (optionally
    pre-smoothed) series, divides the result by the current value of that series and
    subtracts 1.
    It inherits from the Feature base class and implements:
        - set_params : defines the parameter grid.
        - process_data : resets the index of the input series.
        - get_feature : computes the feature for one parameter combination.
    """
    def __init__(
        self,
        name: str = "kama",
        n_jobs: int = 1
    ) -> None:
        """
        Initializes the feature with its name and number of parallel jobs.

        Parameters:
            - name (str): Name of the feature, used as prefix of the output column names.
            - n_jobs (int): Number of jobs to run in parallel for feature extraction.
        """
        super().__init__(
            name=name,
            n_jobs=n_jobs,
        )

    #?____________________________________________________________________________________ #
    def set_params(
        self,
        window: list = [5, 10, 30, 60],
        smoothing_method: list = [None, "ewma", "average"],
        window_smooth: list = [5, 10],
        lambda_smooth: list = [0.1, 0.2, 0.5],
        fastest_window: list = [2, 5, 10],
        slowest_window: list = [20, 30],
    ) -> Self:
        """
        Sets the parameter grid for KAMA feature extraction.

        Parameters:
            - window (list): Rolling window sizes handed to get_kama.
            - smoothing_method (list): Type of smoothing to apply before calculation.
            - window_smooth (list): Smoothing window sizes.
            - lambda_smooth (list): Decay factors for EWMA.
            - fastest_window (list): Windows of the fast smoothing constant, 2 / (w + 1).
            - slowest_window (list): Windows of the slow smoothing constant, 2 / (w + 1).
              get_kama caps this value at (window - 2).

        Returns:
            - Self: The instance itself, so calls can be chained.
        """
        # Each list is copied: the default lists are shared by every call, so storing them
        # directly would let a later edit of self.params leak into other instances.
        self.params = {
            "window": list(window),
            "smoothing_method": list(smoothing_method),
            "window_smooth": list(window_smooth),
            "lambda_smooth": list(lambda_smooth),
            "fastest_window": list(fastest_window),
            "slowest_window": list(slowest_window),
        }

        return self

    #?____________________________________________________________________________________ #
    def process_data(
        self,
        data: pd.Series,
    ) -> pd.Series:
        """
        Applies preprocessing to the input data before feature extraction.

        Parameters:
            - data (pd.Series): The input data to be processed.

        Returns:
            - processed_data (pd.Series): A copy of the series with a fresh 0..n-1 index.
        """
        processed_data = data.copy()
        processed_data.reset_index(drop=True, inplace=True)

        return processed_data

    #?____________________________________________________________________________________ #
    def get_feature(
        self,
        data: pd.Series,
        window: int,
        smoothing_method: str,
        window_smooth: int,
        lambda_smooth: float,
        fastest_window: int,
        slowest_window: int,
    ) -> pd.Series:
        """
        Computes the normalized rolling KAMA of the processed series.

        Parameters:
            - data (pd.Series): The input series to be processed.
            - window (int): Number of observations handed to get_kama at each step.
            - smoothing_method (str): Smoothing method used.
            - window_smooth (int): Smoothing window size.
            - lambda_smooth (float): Smoothing parameter for EWMA.
            - fastest_window (int): Window of the fast smoothing constant.
            - slowest_window (int): Window of the slow smoothing constant.

        Returns:
            - rolling_kama (pd.Series): KAMA / current value - 1, indexed like `data` and named
              "{name}_f{fastest_window}_s{slowest_window}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}".
        """
        # ======= I. Smooth the Data & Preprocess =======
        smoothed_series = self.smooth_data(
            data=data,
            smoothing_method=smoothing_method,
            window_smooth=window_smooth,
            lambda_smooth=lambda_smooth
        )

        processed_series = self.process_data(data=smoothed_series)

        # ======= II. Compute the rolling KAMA =======
        # raw=False: get_kama relies on pd.Series methods (.iloc, .diff).
        rolling_kama = processed_series.rolling(window=window).apply(
            get_kama,
            args=(fastest_window, slowest_window),
            raw=False,
        )

        # ======= III. Normalize by the current value and center around 0 =======
        # The 1e-8 offset guards against a division by zero.
        rolling_kama = rolling_kama / (processed_series + 1e-8) - 1

        # ======= IV. Change Name & restore the original index =======
        rolling_kama.name = f"{self.name}_f{fastest_window}_s{slowest_window}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}"
        rolling_kama.index = data.index

        return rolling_kama

#*_____________________________________________________________________________________ #
def get_kama(
    series: pd.Series,
    fastest_window: int,
    slowest_window: int,
) -> float:
    """
    Computes the last KAMA value of a price window with a two-step recursion.

    The recursion is seeded with the third-to-last price, updated once with the
    second-to-last price (t-1) and once more with the last price (t). At each step the
    smoothing constant is (ER * (fast_sc - slow_sc) + slow_sc) ** 2, where ER is the
    efficiency ratio |net change| / sum(|one-step changes|) measured over `slowest_window`
    steps, and fast_sc / slow_sc are 2 / (window + 1).

    Parameters:
        series (pd.Series): Price series (at least 3 observations are needed).
        fastest_window (int): Fastest EMA window.
        slowest_window (int): Slowest EMA window. Capped at len(series) - 2 so that every
            look-back stays inside the series.

    Returns:
        float: The last KAMA value, or NaN when the series has fewer than 3 observations.
    """
    # ======= 0. Guard: the recursion reads series.iloc[-3] =======
    n_obs = len(series)
    if n_obs < 3:
        return float("nan")

    # ======= I. Inputs =======
    slowest_window = min(slowest_window, n_obs - 2)

    fast_sc = 2 / (fastest_window + 1)
    slow_sc = 2 / (slowest_window + 1)

    # Absolute one-step moves, computed once and shared by both efficiency ratios.
    abs_moves = series.diff().abs()

    # ======= II. Compute KAMA value for t-1 =======
    change_t0 = abs(series.iloc[-2] - series.iloc[-2 - slowest_window])
    vol_t0 = abs_moves.iloc[-2 - slowest_window + 1 : -1].sum()
    efficiency_ratio_t0 = change_t0 / (vol_t0 + 1e-8)  # 1e-8 avoids a division by zero on flat prices

    smoothing_constant_t0 = (efficiency_ratio_t0 * (fast_sc - slow_sc) + slow_sc) ** 2
    kama_t0 = series.iloc[-3] + smoothing_constant_t0 * (series.iloc[-2] - series.iloc[-3])

    # ======= III. Compute KAMA value for t =======
    change_t1 = abs(series.iloc[-1] - series.iloc[-1 - slowest_window])
    vol_t1 = abs_moves.iloc[-1 - slowest_window + 1 :].sum()
    efficiency_ratio_t1 = change_t1 / (vol_t1 + 1e-8)

    smoothing_constant_t1 = (efficiency_ratio_t1 * (fast_sc - slow_sc) + slow_sc) ** 2
    kama_t1 = kama_t0 + smoothing_constant_t1 * (series.iloc[-1] - kama_t0)

    return kama_t1

#!_____________________________________________________________________________________ #
# Grid explored for the KAMA feature: one list of candidate values per parameter.
# NOTE(review): get_kama caps slowest_window at (window - 2), so for window=20 the values
# 20 and 30 both end up as 18 and produce identical columns - confirm this is intended.
kama_params = dict(
    window=[20, 50],
    smoothing_method=[None, 'ewma'],
    window_smooth=[5],
    lambda_smooth=[0.2],
    fastest_window=[2, 5, 10],
    slowest_window=[20, 30],
)

kama_feature = Kama_feature(n_jobs=n_jobs)
kama_feature.set_params(**kama_params)

# One column per parameter combination, computed on the training close prices.
close_prices = processed_train['close']
kama_feature_df = kama_feature.extract(data=close_prices)

In [8]:
class StochasticRSI_feature(Feature):
    """
    Stochastic RSI Feature

    For each observation, applies get_stochastic_rsi to the trailing window of a
    (optionally pre-smoothed) series.
    It inherits from the Feature base class and implements:
        - set_params : defines the parameter grid.
        - process_data : resets the index of the input series.
        - get_feature : computes the feature for one parameter combination.
    """
    def __init__(
        self,
        name: str = "stochastic_rsi",
        n_jobs: int = 1
    ) -> None:
        """
        Initializes the feature with its name and number of parallel jobs.

        Parameters:
            - name (str): Name of the feature, used as prefix of the output column names.
            - n_jobs (int): Number of jobs to run in parallel for feature extraction.
        """
        super().__init__(
            name=name,
            n_jobs=n_jobs,
        )

    #?____________________________________________________________________________________ #
    def set_params(
        self,
        window: list = [5, 10, 30, 60],
        smoothing_method: list = [None, "ewma", "average"],
        window_smooth: list = [5, 10],
        lambda_smooth: list = [0.1, 0.2, 0.5],
    ) -> Self:
        """
        Sets the parameter grid for Stochastic RSI feature extraction.

        Parameters:
            - window (list): Rolling window sizes handed to get_stochastic_rsi.
            - smoothing_method (list): Type of smoothing to apply before calculation.
            - window_smooth (list): Smoothing window sizes.
            - lambda_smooth (list): Decay factors for EWMA.

        Returns:
            - Self: The instance itself, so calls can be chained.
        """
        # Each list is copied: the default lists are shared by every call, so storing them
        # directly would let a later edit of self.params leak into other instances.
        self.params = {
            "window": list(window),
            "smoothing_method": list(smoothing_method),
            "window_smooth": list(window_smooth),
            "lambda_smooth": list(lambda_smooth),
        }

        return self

    #?____________________________________________________________________________________ #
    def process_data(
        self,
        data: pd.Series,
    ) -> pd.Series:
        """
        Applies preprocessing to the input data before feature extraction.

        Parameters:
            - data (pd.Series): The input data to be processed.

        Returns:
            - processed_data (pd.Series): A copy of the series with a fresh 0..n-1 index.
        """
        processed_data = data.copy()
        processed_data.reset_index(drop=True, inplace=True)

        return processed_data

    #?____________________________________________________________________________________ #
    def get_feature(
        self,
        data: pd.Series,
        window: int,
        smoothing_method: str,
        window_smooth: int,
        lambda_smooth: float,
    ) -> pd.Series:
        """
        Computes the rolling Stochastic RSI of the processed series.

        Parameters:
            - data (pd.Series): The input series to be processed.
            - window (int): Number of observations handed to get_stochastic_rsi at each step.
            - smoothing_method (str): Smoothing method used.
            - window_smooth (int): Smoothing window size.
            - lambda_smooth (float): Smoothing parameter for EWMA.

        Returns:
            - rolling_stoch_rsi (pd.Series): The value returned by get_stochastic_rsi for each
              trailing window, indexed like `data` and named
              "{name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}".
        """
        # ======= I. Smooth the Data & Preprocess =======
        smoothed_series = self.smooth_data(
            data=data,
            smoothing_method=smoothing_method,
            window_smooth=window_smooth,
            lambda_smooth=lambda_smooth
        )

        processed_series = self.process_data(data=smoothed_series)

        # ======= II. Compute the rolling Stochastic RSI =======
        # raw=False: get_stochastic_rsi relies on pd.Series methods (.diff, .rolling).
        # Unlike the price-level features, the output is used as is (no normalization).
        rolling_stoch_rsi = processed_series.rolling(window=window).apply(get_stochastic_rsi, raw=False)

        # ======= III. Change Name & restore the original index =======
        rolling_stoch_rsi.name = f"{self.name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}"
        rolling_stoch_rsi.index = data.index

        return rolling_stoch_rsi

#*_____________________________________________________________________________________ #
def get_stochastic_rsi(
    series: pd.Series,
    rsi_window: int = None,
) -> float:
    """
    Computes the Stochastic RSI for a given price series.

    The RSI is computed over a look-back SHORTER than the series, which yields several RSI
    values; the Stochastic RSI is the position of the last RSI within the [min, max] range
    of those values. (Averaging gains and losses over the whole series would leave a single
    RSI value, so min == max == last and the result would always be 0.)

    Parameters:
        - series (pd.Series): Price series to compute Stochastic RSI on.
        - rsi_window (int): Number of price changes averaged for each RSI value.
          Defaults to half the length of the series (at least 1).

    Returns:
        - float: The Stochastic RSI value for the last point in the series, or NaN when the
          series is too short to produce at least two RSI values.

    Raises:
        - ValueError: If rsi_window is smaller than 1.
    """
    # ========== 0. Define a function to compute the RSI =======
    def get_relative_strength_index(
        prices: pd.Series,
        lookback: int,
    ) -> pd.Series:
        """
        Computes the Relative Strength Index (RSI) for a given price series.

        Parameters:
            - prices (pd.Series): Price series to compute RSI on.
            - lookback (int): Number of price changes averaged for each RSI value.

        Returns:
            - pd.Series: The RSI values (NaN until `lookback` changes are available).
        """
        # ======= I. Compute Gain and Loss =======
        # The first difference is undefined, so it is dropped instead of counted as a 0 move.
        delta = prices.diff().iloc[1:]
        gain = delta.where(delta > 0, 0.0)
        loss = -delta.where(delta < 0, 0.0)

        # ======= II. Compute Average Gain and Loss =======
        avg_gain = gain.rolling(window=lookback).mean()
        avg_loss = loss.rolling(window=lookback).mean()

        # ======= III. Compute Relative Strength and RSI =======
        rs = avg_gain / (avg_loss + 1e-8)  # 1e-8 avoids a division by zero when there is no loss
        rsi = 100 - (100 / (1 + rs))

        return rsi

    # ======= I. Resolve the RSI look-back =======
    n_obs = len(series)
    if rsi_window is None:
        rsi_window = max(1, n_obs // 2)
    if rsi_window < 1:
        raise ValueError("rsi_window must be a positive integer")

    # n_obs prices give (n_obs - 1) changes, hence (n_obs - rsi_window) RSI values;
    # at least two are needed for the range to be meaningful.
    if n_obs < rsi_window + 2:
        return np.nan

    # ======= II. Compute the Relative Strength Index (RSI) =======
    rsi = get_relative_strength_index(series, rsi_window)

    # ======= III. Extract the RSI values that define the range for StochRSI =======
    rsi_values = rsi.dropna()
    if len(rsi_values) == 0:
        return np.nan

    last_rsi = rsi_values.iloc[-1]
    min_rsi = rsi_values.min()
    max_rsi = rsi_values.max()

    # ======= IV. Compute Stochastic RSI =======
    stoch_rsi = (last_rsi - min_rsi) / (max_rsi - min_rsi + 1e-8)

    return stoch_rsi

#!_____________________________________________________________________________________ #
# The Stochastic RSI extractor must be instantiated here: using Kama_feature would silently
# fill `stochrsi_feature_df` with KAMA values.
stochrsi_feature = StochasticRSI_feature(n_jobs=n_jobs)
stochrsi_params = {
    "window": [5, 10, 20, 50],
    "smoothing_method": [None, 'ewma'],
    "window_smooth": [5],
    "lambda_smooth": [0.2],
}
stochrsi_feature.set_params(**stochrsi_params)

# One column per parameter combination, computed on the training close prices.
stochrsi_feature_df = stochrsi_feature.extract(data=processed_train['close'])

In [None]:
class EhlersFisher_feature(Feature):
    """
    Ehlers Fisher Transform Feature

    Slides a window over a pair of (high, low) series and stores, for each window, the
    value returned by get_ehlers_fisher_transform.
    It inherits from the Feature base class and implements:
        - set_params : defines the parameter grid.
        - process_data : extracts the two series and aligns them on a common index.
        - get_feature : computes the feature for one parameter combination.
    """
    def __init__(
        self,
        name: str = "ehlers_fisher",
        n_jobs: int = 1
    ) -> None:
        """
        Initializes the EhlersFisher_feature object.

        Parameters:
            - name (str): Feature name, used as prefix of the output column names.
            - n_jobs (int): Number of parallel jobs to use.
        """
        super().__init__(
            name=name,
            n_jobs=n_jobs,
        )

    #?____________________________________________________________________________________ #
    def set_params(
        self,
        window: list = [5, 10, 30, 60],
        smoothing_method: list = [None, "ewma", "average"],
        window_smooth: list = [5, 10],
        lambda_smooth: list = [0.1, 0.2, 0.5]
    ) -> Self:
        """
        Sets the parameter grid for Ehlers Fisher feature extraction.

        Parameters:
            - window (list): Sizes of the sliding window handed to the transform.
            - smoothing_method (list): Smoothing method to apply before the transform.
            - window_smooth (list): Smoothing window sizes.
            - lambda_smooth (list): Decay factors for EWMA smoothing.

        Returns:
            - Self: The instance itself, so calls can be chained.
        """
        # Each list is copied: the default lists are shared by every call, so storing them
        # directly would let a later edit of self.params leak into other instances.
        self.params = {
            "window": list(window),
            "smoothing_method": list(smoothing_method),
            "window_smooth": list(window_smooth),
            "lambda_smooth": list(lambda_smooth),
        }

        return self

    #?____________________________________________________________________________________ #
    def process_data(
        self,
        data: Union[tuple, pd.DataFrame],
    ) -> tuple:
        """
        Extracts the high and low series and aligns them on a common index.

        Parameters:
            - data (tuple | pd.DataFrame): Either a tuple (high, low) of two series, or a
              DataFrame whose first column is the high and second column the low.

        Returns:
            - tuple: (series_high, series_low), restricted to the rows where both are available.

        Raises:
            - ValueError: If the DataFrame does not have exactly 2 columns, or if `data` is
              neither a 2-tuple nor a DataFrame.
        """
        # ======= I. Extract Series =======
        if isinstance(data, pd.DataFrame):
            nb_series = data.shape[1]
            if nb_series != 2:
                raise ValueError(f"DataFrame must have exactly 2 columns, but got {nb_series}.")

            series_high = data.iloc[:, 0]
            series_low = data.iloc[:, 1]

        elif isinstance(data, tuple) and len(data) == 2:
            series_high = data[0]
            series_low = data[1]
        else:
            raise ValueError("Data must be either a tuple of two series or a DataFrame with two columns.")

        # ======= II. Ensure Series have the same indexation =======
        # Building a DataFrame aligns both series on their index; dropna then removes every
        # row where either value is missing.
        series_df = pd.DataFrame({"series_high": series_high, "series_low": series_low})
        series_df = series_df.dropna()
        series_high = series_df["series_high"]
        series_low = series_df["series_low"]

        # ======= III. Return Processed Data =======
        processed_data = (series_high, series_low)

        return processed_data

    #?____________________________________________________________________________________ #
    def get_feature(
        self,
        data: Union[tuple, pd.DataFrame],
        window: int,
        smoothing_method: str,
        window_smooth: int,
        lambda_smooth: float,
    ) -> pd.DataFrame:
        """
        Computes the sliding Ehlers Fisher transform of the (high, low) pair.

        Parameters:
            - data (tuple | pd.DataFrame): The (high, low) input, see process_data.
            - window (int): Number of observations in each sliding window.
            - smoothing_method (str): Smoothing method used.
            - window_smooth (int): Smoothing window size.
            - lambda_smooth (float): Smoothing parameter for EWMA.

        Returns:
            - features_df (pd.DataFrame): A single column named
              "{name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}" with
              len(series) - window rows.

        Raises:
            - ValueError: If `window` is not smaller than the number of aligned observations.
        """
        # ======= I. Process Data =======
        series_high, series_low = self.process_data(data=data)

        # ======= II. Apply Smoothing if Needed =======
        # smooth_data returns its input unchanged when smoothing_method is None.
        series_high = self.smooth_data(data=series_high, smoothing_method=smoothing_method, window_smooth=window_smooth, lambda_smooth=lambda_smooth)
        series_low = self.smooth_data(data=series_low, smoothing_method=smoothing_method, window_smooth=window_smooth, lambda_smooth=lambda_smooth)

        # ======= III. Ensure the window is not too large =======
        num_obs = len(series_high) - window
        if num_obs <= 0:
            raise ValueError(f"Window size {window} is too large for the given data length {len(series_high)}.")

        # ======= IV. Initialize Output Arrays =======
        ehlers_fisher_values = np.full(num_obs, np.nan)

        # ======== V. Iterate Over Observations ========
        for i in range(num_obs):
            # V.1 Extract Time Windows (positions i .. i + window - 1)
            series_high_window = series_high.iloc[i : i + window]
            series_low_window = series_low.iloc[i : i + window]

            # V.2 Compute the Ehlers Fisher transform of the window
            ehlers_fisher_values[i] = get_ehlers_fisher_transform(
                series_high=series_high_window,
                series_low=series_low_window
            )

        # ======== VI. Create the Final DataFrame ========
        # NOTE(review): the value computed from positions i .. i + window - 1 is stored at
        # position i + window, i.e. one step AFTER the last observation it uses, and the
        # window ending on the very last observation is never evaluated. This is a one-step
        # lag compared with a rolling(window) feature - confirm it is intended.
        index = series_high.index[window:]
        features_df = pd.DataFrame({
            f"{self.name}_{window}_{smoothing_method}_{window_smooth}_{lambda_smooth}": ehlers_fisher_values,
        }, index=index)

        return features_df

#*_____________________________________________________________________________________ #
def get_ehlers_fisher_transform(
    series_high: pd.Series, 
    series_low: pd.Series, 
) -> float:
    """
    Fisher transform of the mid price of a high-low window.

    The mid price (high + low) / 2 is rescaled to [-1, 1] using its own minimum and
    maximum over the window, clipped to [-0.999, 0.999], and passed through
    0.5 * ln((1 + x) / (1 - x)). Only the value of the last observation is returned.

    Parameters:
        - series_high (pd.Series): High prices of the window.
        - series_low (pd.Series): Low prices of the window.

    Returns:
        - float: The transformed value of the last observation, or 0.0 when the mid price
          is constant over the window.
    """
    # ======= I. Mid price and its range over the window =======
    midpoint = (series_high + series_low) / 2
    lowest, highest = midpoint.min(), midpoint.max()
    price_range = highest - lowest

    # A flat window cannot be rescaled: return the neutral value instead of dividing by zero.
    if price_range == 0:
        return 0.0

    # ======= II. Rescale to [-1, 1], keeping clear of the poles of the transform =======
    scaled = np.clip(2 * ((midpoint - lowest) / price_range) - 1, -0.999, 0.999)

    # ======= III. Fisher transform, last observation only =======
    transformed = 0.5 * np.log((1 + scaled) / (1 - scaled))

    return transformed.iloc[-1]

#!_____________________________________________________________________________________ #
# Grid explored for the Ehlers Fisher feature: one list of candidate values per parameter.
ehlers_fisher_params = dict(
    window=[5, 10, 20, 50],
    smoothing_method=[None, 'ewma'],
    window_smooth=[5],
    lambda_smooth=[0.2],
)

ehlers_fisher_feature = EhlersFisher_feature(n_jobs=n_jobs)
ehlers_fisher_feature.set_params(**ehlers_fisher_params)

# The extractor expects the high column first and the low column second.
high_low_prices = processed_train[['high', 'low']]
ehlers_fisher_feature_df = ehlers_fisher_feature.extract(data=high_low_prices)