In [1]:
import pandas as pd
import numpy as np

In [2]:
class FeatureEngineer:
    """
    A class for engineering technical indicators used in quantitative finance.

    The engineer works on its own copy of the input frame, so the DataFrame
    passed by the caller is never modified.

    Attributes:
    ----------
    data : pd.DataFrame
        A copy of the input dataset containing financial time-series data.
        Must include a 'close' column. Indicator columns are added to this copy.
    """

    def __init__(self, data: pd.DataFrame):
        """
        Initializes the FeatureEngineer with financial data.

        Parameters:
        ----------
        data : pd.DataFrame
            A DataFrame containing historical financial data, including a 'close' column.
            Rows are assumed to already be in chronological order; no sorting is done here.

        Raises:
        ------
        ValueError
            If `data` has no 'close' column.
        """
        if 'close' not in data.columns:
            raise ValueError("Input data must contain a 'close' column.")
        # Copy so that adding indicator columns never mutates the caller's frame
        # (and never writes into a slice of a larger DataFrame).
        self.data = data.copy()

    def generate_technical_indicators(self) -> pd.DataFrame:
        """
        Generate key technical indicators and append them to the dataset.

        Adds the columns 'RSI', 'MACD', 'Bollinger_Upper', 'Bollinger_Lower' and
        'Momentum' (each computed with its method's default parameters), then
        fills their missing values via `handle_technical_indicator_nans`.

        Returns:
        -------
        pd.DataFrame
            The dataset (`self.data`) with additional technical indicator columns.
        """
        self.data['RSI'] = self.calculate_rsi()
        self.data['MACD'] = self.calculate_macd()
        upper_band, lower_band = self.calculate_bollinger_bands()
        self.data['Bollinger_Upper'] = upper_band
        self.data['Bollinger_Lower'] = lower_band
        self.data['Momentum'] = self.calculate_momentum()

        # Handle missing values in technical indicators
        self.data = self.handle_technical_indicator_nans()

        return self.data

    def calculate_rsi(self, periods: int = 10) -> pd.Series:
        """
        Calculate the Relative Strength Index (RSI).

        Average gain and average loss are simple rolling means of the positive
        and negative parts of the close-to-close change.

        Parameters:
        ----------
        periods : int, optional
            The lookback period for calculating RSI (default is 10).

        Returns:
        -------
        pd.Series
            A Series containing the RSI values. The first `periods - 1` entries
            are NaN because the rolling window is not yet full.
        """
        delta = self.data['close'].diff()
        # Non-positive changes (and the leading NaN) count as zero gain;
        # non-negative changes count as zero loss.
        gain = delta.where(delta > 0, 0).rolling(window=periods).mean()
        loss = -delta.where(delta < 0, 0).rolling(window=periods).mean()
        relative_strength = gain / loss
        rsi = 100 - (100 / (1 + relative_strength))
        return rsi

    def calculate_macd(self, slow: int = 52, fast: int = 24, signal: int = 10) -> pd.Series:
        """
        Calculate the Moving Average Convergence Divergence (MACD).

        Parameters:
        ----------
        slow : int, optional
            The period for the slow EMA (default is 52).
        fast : int, optional
            The period for the fast EMA (default is 24).
        signal : int, optional
            The period for the signal line (default is 10). Currently unused:
            only the MACD line is computed; the parameter is kept so existing
            callers keep working.

        Returns:
        -------
        pd.Series
            A Series containing the MACD line (fast EMA minus slow EMA).
        """
        fast_ema = self.data['close'].ewm(span=fast, adjust=False).mean()
        slow_ema = self.data['close'].ewm(span=slow, adjust=False).mean()
        macd = fast_ema - slow_ema
        return macd

    def calculate_bollinger_bands(self, window: int = 40, num_std: int = 2) -> tuple:
        """
        Calculate Bollinger Bands (upper and lower).

        Parameters:
        ----------
        window : int, optional
            The rolling window size for calculating the moving average (default is 40).
        num_std : int, optional
            The number of standard deviations for the bands (default is 2).

        Returns:
        -------
        tuple
            A tuple `(upper_band, lower_band)` of Series. The first `window - 1`
            entries of each are NaN because the rolling window is not yet full.
        """
        rolling_mean = self.data['close'].rolling(window=window).mean()
        rolling_std = self.data['close'].rolling(window=window).std()
        upper_band = rolling_mean + (num_std * rolling_std)
        lower_band = rolling_mean - (num_std * rolling_std)
        return upper_band, lower_band

    def calculate_momentum(self, periods: int = 10) -> pd.Series:
        """
        Calculate the Momentum indicator.

        Parameters:
        ----------
        periods : int, optional
            The lookback period for calculating momentum (default is 10).

        Returns:
        -------
        pd.Series
            A Series containing the Momentum values: the close minus the close
            `periods` rows earlier (NaN for the first `periods` rows).
        """
        return self.data['close'].diff(periods=periods)

    def handle_technical_indicator_nans(self) -> pd.DataFrame:
        """
        Handle NaN values in the technical indicators.

        Expects the indicator columns to already exist on `self.data`.

        Strategies:
        ----------
        - RSI: Fill missing values with forward fill; pad remaining NaNs with 50 (neutral sentiment).
        - MACD: Use linear interpolation to maintain trend continuity.
        - Bollinger Bands: Use backward fill for initial missing values.
          Note that a backward fill copies values from later rows into earlier
          ones, so the warm-up rows contain information from the future.
        - Momentum: Replace NaNs with 0 (neutral price movement).

        Returns:
        -------
        pd.DataFrame
            The dataset with missing values handled.
        """
        # Handle RSI NaNs
        self.data['RSI'] = self.data['RSI'].ffill().fillna(50)

        # Handle MACD NaNs
        self.data['MACD'] = self.data['MACD'].interpolate(method='linear')

        # Handle Bollinger Bands NaNs
        self.data['Bollinger_Upper'] = self.data['Bollinger_Upper'].bfill()
        self.data['Bollinger_Lower'] = self.data['Bollinger_Lower'].bfill()

        # Handle Momentum NaNs
        self.data['Momentum'] = self.data['Momentum'].fillna(0)

        return self.data

In [3]:
class CryptoFeatureEngineeringPipeline:
    """
    A pipeline for feature engineering cryptocurrency data across multiple exchanges.

    Attributes:
    ----------
    historical_data : pd.DataFrame
        The input historical data containing OHLCV and exchange information.
    exchanges : list
        List of exchanges to process.
    """

    def __init__(self, historical_data_path, exchanges=None):
        """
        Initializes the pipeline with historical data and exchanges.

        Parameters:
        ----------
        historical_data_path : str
            Path to the CSV file containing historical data. The file must have a
            'timestamp' column (parsed as dates); `process_data` additionally
            relies on 'exchange' and 'close' columns.
        exchanges : list, optional
            List of exchange names to filter and process (default includes popular exchanges).
        """
        self.historical_data = pd.read_csv(historical_data_path, parse_dates=['timestamp'])
        if exchanges is None:
            exchanges = [
                'binanceus', 'coinbase', 'kraken', 'bitfinex', 'kucoin', 'okx',
                'gateio', 'bitstamp', 'poloniex', 'huobi', 'hitbtc',
                'bitget', 'exmo', 'phemex', 'upbit'
            ]
        self.exchanges = exchanges

    def process_data(self):
        """
        Process historical data by filtering for each exchange, engineering features,
        and combining the results.

        Each exchange's rows are sorted by 'timestamp' before the indicators are
        computed, so rolling windows never span two exchanges. Exchanges with no
        rows in the data are skipped.

        Returns:
        -------
        pd.DataFrame
            A single DataFrame containing feature-engineered data for all exchanges,
            in the order given by `self.exchanges`, with a fresh 0..n-1 index.
            If no requested exchange has any rows (or `self.exchanges` is empty),
            an empty DataFrame with the same columns is returned.
        """
        processed_data = []

        for exchange in self.exchanges:
            # Filter data for the current exchange, in chronological order
            exchange_mask = self.historical_data["exchange"] == exchange
            exchange_data = self.historical_data[exchange_mask].sort_values('timestamp')

            # Nothing to engineer for this exchange; also keeps empty frames
            # out of the final concat.
            if exchange_data.empty:
                continue

            # Apply feature engineering
            feature_engineer = FeatureEngineer(exchange_data)
            processed_data.append(feature_engineer.generate_technical_indicators())

        if not processed_data:
            # pd.concat raises on an empty list; return an empty frame that
            # still carries the indicator columns so downstream code sees a
            # consistent schema.
            empty_input = self.historical_data.iloc[0:0].copy()
            empty_result = FeatureEngineer(empty_input).generate_technical_indicators()
            return empty_result.reset_index(drop=True)

        # Combine feature-engineered data from all exchanges
        return pd.concat(processed_data, ignore_index=True)

In [4]:
# Load 'btc_data.csv' and compute the technical-indicator columns for every
# exchange in the pipeline's default exchange list.
pipeline = CryptoFeatureEngineeringPipeline('btc_data.csv')
featured_data = pipeline.process_data()

In [5]:
# Preview the final rows of the combined, feature-engineered frame.
featured_data.tail()

Unnamed: 0,timestamp,open,high,low,close,volume,exchange,RSI,MACD,Bollinger_Upper,Bollinger_Lower,Momentum
23503,2024-11-30 05:00:00,96759.95,96759.95,96759.95,96759.95,0.000732,upbit,67.71293,-98.164213,96956.016755,96034.259245,594.12
23504,2024-11-30 05:05:00,96759.95,96759.96,96759.95,96759.95,0.00242,upbit,66.090529,-84.917704,96963.352293,96032.012207,513.88
23505,2024-11-30 05:15:00,96759.96,96759.97,96759.95,96759.95,0.002099,upbit,54.580684,-72.934439,96970.583763,96029.870237,109.22
23506,2024-11-30 05:20:00,96759.95,96759.95,96759.95,96759.95,0.006031,upbit,57.006439,-62.105679,96977.713058,96027.830442,159.95
23507,2024-11-30 05:30:00,96905.29,96929.55,96800.0,96800.0,0.004783,upbit,58.463817,-50.638994,96991.050238,96024.493262,200.0


In [6]:
# Make sure 'timestamp' is a datetime column so the date_format below is applied on export.
featured_data['timestamp'] = pd.to_datetime(featured_data['timestamp'])
# Write the result without the row index; timestamps are rendered to second precision.
featured_data.to_csv('featured_btc_data.csv',  index=False, date_format='%Y-%m-%d %H:%M:%S')