In [13]:
import yfinance as yf
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema
import talib
from abc import ABC, abstractmethod
from sklearn.preprocessing import StandardScaler, MinMaxScaler

In [14]:
# Step 1: Define the Product interface
class Indicator(ABC):
    @abstractmethod
    def compute(self, data=None, *args, **kwargs):
        pass

# todo: there's some wrong in calculate trend ways
# Step 2: Implement Concrete Products


class TrendIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        method = kwargs.get('method', 'MA')
        ma_days = kwargs.get('ma_days', 20)
        oder_days = kwargs.get('oder_days', 20)
        trend_days = kwargs.get('trend_days', 5)

        if method == 'MA':
            return self.calculate_trend_MA(data, ma_days=ma_days, trend_days=trend_days)
        elif method == 'LocalExtrema':
            return self.calculate_trend_LocalExtrema(data, oder_days=oder_days)
        else:
            raise ValueError(f"Invalid trend calculation method: {method}")

    def calculate_trend_MA(self, data, ma_days=20, trend_days=5):
        data['MA'] = data['Close'].rolling(window=ma_days).mean()
        data['Trend'] = np.nan
        n = len(data)

        for i in range(n - trend_days + 1):
            if all(data['MA'].iloc[i + j] < data['MA'].iloc[i + j + 1] for j in range(trend_days - 1)):
                data['Trend'].iloc[i:i + trend_days] = 0
            elif all(data['MA'].iloc[i + j] > data['MA'].iloc[i + j + 1] for j in range(trend_days - 1)):
                data['Trend'].iloc[i:i + trend_days] = 1
        data['Trend'].fillna(method='ffill', inplace=True)
        return data.drop(columns=['MA'])

    def calculate_trend_LocalExtrema(self, data, oder_days=20):
        local_max_indices = argrelextrema(
            data['Close'].values, np.greater_equal, order=oder_days)[0]
        local_min_indices = argrelextrema(
            data['Close'].values, np.less_equal, order=oder_days)[0]
        data['Local Max'] = data.iloc[local_max_indices]['Close']
        data['Local Min'] = data.iloc[local_min_indices]['Close']
        data['Trend'] = np.nan
        prev_idx = None
        prev_trend = None
        prev_type = None

        for idx in sorted(np.concatenate([local_max_indices, local_min_indices])):
            if idx in local_max_indices:
                current_type = "max"
            else:
                current_type = "min"

            if prev_trend is None:
                if current_type == "max":
                    prev_trend = 1
                else:
                    prev_trend = 0
            else:
                if prev_type == "max" and current_type == "min":
                    data.loc[prev_idx:idx, 'Trend'] = 1
                    prev_trend = 1
                elif prev_type == "min" and current_type == "max":
                    data.loc[prev_idx:idx, 'Trend'] = 0
                    prev_trend = 0
                else:
                    if current_type == "max":
                        data.loc[prev_idx:idx, 'Trend'] = 0
                        prev_trend = 0
                    else:
                        data.loc[prev_idx:idx, 'Trend'] = 1
                        prev_trend = 1

            prev_idx = idx
            prev_type = current_type
        data['Trend'].fillna(method='ffill', inplace=True)
        return data.drop(columns=['Local Max', 'Local Min'])


class MACDIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        fastperiod = kwargs.get('fastperiod', 5)
        slowperiod = kwargs.get('slowperiod', 10)
        signalperiod = kwargs.get('signalperiod', 9)
        data['MACD'], _, _ = talib.MACD(
            data['Close'], fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod)
        return data


class ROCIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        trend_days = kwargs.get('trend_days', 5)
        data['ROC'] = talib.ROC(data['Close'], timeperiod=trend_days)
        return data

# Step 2: Implement Concrete Products (Continued)


class StochasticOscillatorIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        trend_days = kwargs.get('trend_days', 5)
        data['StoK'], data['StoD'] = talib.STOCH(
            data['High'], data['Low'], data['Close'], fastk_period=trend_days, slowk_period=3, slowd_period=3)
        return data


class CCIIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        timeperiod = kwargs.get('timeperiod', 14)
        data['CCI'] = talib.CCI(data['High'], data['Low'],
                                data['Close'], timeperiod=timeperiod)
        return data


class RSIIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        timeperiod = kwargs.get('timeperiod', 14)
        data['RSI'] = talib.RSI(data['Close'], timeperiod=timeperiod)
        return data


class VMAIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        timeperiod = kwargs.get('timeperiod', 20)
        data['VMA'] = talib.MA(data['Volume'], timeperiod=timeperiod)
        return data


class PctChangeIndicator(Indicator):
    def compute(self, data, *args, **kwargs):
        data['pctChange'] = data['Close'].pct_change() * 100
        return data


class ThreeMonthTreasuryYield(Indicator):
    def compute(self, data, *args, **kwargs):
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')
        three_month_treasury_yield = yf.download(
            "^IRX", start_date, end_date)["Close"]
        data['3M Treasury Yield'] = three_month_treasury_yield
        return data


class FiveYearTreasuryYield(Indicator):
    def compute(self, data, *args, **kwargs):
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')
        five_year_treasury_yield = yf.download(
            "^FVX", start_date, end_date)["Close"]
        data['5Y Treasury Yield'] = five_year_treasury_yield
        return data


class TenYearTreasuryYield(Indicator):
    def compute(self, data, *args, **kwargs):
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')
        ten_year_treasury_yield = yf.download(
            "^TNX", start_date, end_date)["Close"]
        data['10Y Treasury Yield'] = ten_year_treasury_yield
        return data


class ThirtyYearTreasuryYield(Indicator):
    def compute(self, data, *args, **kwargs):
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')
        thirty_year_treasury_yield = yf.download(
            "^TYX", start_date, end_date)["Close"]
        data['30Y Treasury Yield'] = thirty_year_treasury_yield
        return data

# Add other indicators here as needed


# Step 3: Define the Factory
class IndicatorFactory:
    @staticmethod
    def get_indicator(indicator_type):
        indicators = {
            "Trend": TrendIndicator,
            "MACD": MACDIndicator,
            "ROC": ROCIndicator,
            "Stochastic Oscillator": StochasticOscillatorIndicator,
            "CCI": CCIIndicator,
            "RSI": RSIIndicator,
            "VMA": VMAIndicator,
            "PctChange": PctChangeIndicator,
            "3M Treasury Yield": ThreeMonthTreasuryYield,
            "5Y Treasury Yield": FiveYearTreasuryYield,
            "10Y Treasury Yield": TenYearTreasuryYield,
            "30Y Treasury Yield": ThirtyYearTreasuryYield,
            # Add other indicators here as needed
        }
        indicator = indicators.get(indicator_type)
        if indicator is None:
            raise ValueError(f"Invalid indicator type: {indicator_type}")
        return indicator()


class DataCleaner(ABC):
    """Abstract base class for data processors."""

    @abstractmethod
    def check(self, data):
        """Method to check the data for issues."""
        pass

    @abstractmethod
    def clean(self, data):
        """Method to clean the data from identified issues."""
        pass


class MissingDataCleaner(DataCleaner):
    """Concrete class for checking and handling missing data."""

    def check(self, data):
        """Check for missing data in the dataframe."""
        return data.isnull().sum()

    def clean(self, data, strategy='auto'):
        """Handle missing data based on the chosen strategy."""
        if strategy == 'auto':
            # Step 1: Handle missing data at the beginning
            while data.iloc[0].isnull().any():
                data = data.iloc[1:]

            # Step 2: Fill missing data in the middle with the previous row's values
            data.fillna(method='ffill', inplace=True)

        elif strategy == 'drop':
            data.dropna(inplace=True)

        elif strategy == 'fillna':
            data.fillna(method='ffill', inplace=True)

        elif strategy == 'none':
            # Do nothing and return the original data
            pass

        else:
            raise ValueError("Invalid strategy provided.")

        return data


class ScalerFactory:
    """
    Factory class dedicated to creating scalers.
    """
    @staticmethod
    def get_scaler(method):
        if method == 'StandardScaler':
            return StandardScaler()
        elif method == 'MinMaxScaler':
            return MinMaxScaler()
        else:
            raise ValueError(f"Invalid scaler method: {method}.")


class DataProcessorFactory:
    """Factory class to create data processors."""

    @staticmethod
    def create_cleaner(clean_type, *args, **kwargs):
        """Create a data processor based on the provided type."""
        if clean_type == "MissingData":
            return MissingDataCleaner(*args, **kwargs)
        else:
            raise ValueError(f"Processor type {clean_type} not recognized.")

    @staticmethod
    def standardize_data(data, method='StandardScaler'):
        """
        Standardize the data based on the chosen method.

        Parameters:
        - data: The data to be standardized.
        - method: The standardization method.

        Returns:
        - The standardized data.
        """
        scaler = ScalerFactory.get_scaler(method)
        return scaler.fit_transform(data)

    # add a new method that can split data by date
    @staticmethod
    def standardize_and_split_data(data, split_ratio=0.7, target_col="Trend", feature_cols=None):
        """Standardize the data and split it into training and testing sets."""
        if not feature_cols:
            feature_cols = data.columns.to_list()

        x_data = data[feature_cols]

        # Generate the one-hot encoding
        y_data = pd.get_dummies(data[target_col], prefix='Trend')

        # something wrong here
        # # Ensure y_data has two columns
        # missing_columns = set(["Trend_0", "Trend_1"]) - set(y_data.columns)
        # for col in missing_columns:
        #     y_data[col] = 0

        # # Ensure columns are in the correct order
        # y_data = y_data[["Trend_0", "Trend_1"]]

        # Check if the split index is valid
        split_idx = int(len(x_data) * split_ratio)
        if split_idx < 1 or split_idx >= len(x_data):
            raise ValueError(
                "Invalid split ratio leading to incorrect data partitioning.")

        X_test = x_data.iloc[split_idx:]
        y_test = y_data.iloc[split_idx:]
        X_train = x_data.iloc[:split_idx]
        y_train = y_data.iloc[:split_idx]

        return X_train, y_train, X_test, y_test

    @staticmethod
    def prepare_multistep_data(x_data, y_data, look_back, predict_steps, slide_steps=1):
        """
        Prepare the data for multi-step prediction and apply standardization within each sliding window.
        """
        x_date = []
        y_date = []
        x_data_multistep = []
        y_data_multistep = []

        for i in range(0, len(x_data) - look_back - predict_steps + 1, slide_steps):
            x_date.append(x_data.index[i:i + look_back])

            # For y_date extraction
            y_date.append(
                x_data.index[i + look_back:i + look_back + predict_steps])

            # Extract data for the current window
            x_window = x_data.iloc[i:i + look_back].values
            y_window = y_data.iloc[i + look_back:i +
                                   look_back + predict_steps].values

            # Standardize the data within the window
            scaler_x = StandardScaler()
            x_window_standardized = scaler_x.fit_transform(x_window)

            x_data_multistep.append(x_window_standardized)
            y_data_multistep.append(y_window)

        return np.array(x_data_multistep), np.array(y_data_multistep), np.array(x_date), np.array(y_date)


# Step 4: The ModelDataAPI using Factory Pattern
class ModelDataAPI:
    def __init__(self, data=None, start_date=None, end_date=None):
        self.data = data
        self.start_date = start_date
        self.end_date = end_date
        self.trend_method = "MA"
        self.indicators = []
        self.processors = []
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None

    def set_seed(self, seed_value=42):
        """Set seed for reproducibility."""
        np.random.seed(seed_value)

    def fetch_stock_data(self, stock_symbol, start_date=None, end_date=None):
        """Fetch stock data from Yahoo Finance."""
        if start_date:
            self.start_date = start_date
        if end_date:
            self.end_date = end_date
        return yf.download(stock_symbol, start=self.start_date, end=self.end_date)

    def add_indicator(self, indicator_type, *args, **kwargs):
        indicator = IndicatorFactory.get_indicator(indicator_type)
        self.data = indicator.compute(self.data, *args, **kwargs)

    def add_data_cleaner(self, clean_type='MissingData', strategy='drop'):
        """Method to check and clean the data using a specific processor."""
        processor = DataProcessorFactory.create_cleaner(clean_type)
        issues = processor.check(self.data)
        self.data = processor.clean(self.data, strategy=strategy)
        return issues

    def process_data(self, split_ratio=0.7, target_col="Trend", feature_cols=None, look_back=64, predict_steps=16, train_slide_steps=1, test_slide_steps=16):
        """
        Use DataProcessorFactory to standardize and split the data, and prepare it for multi-step prediction if required.
        """
        self.X_train, self.y_train, self.X_test, self.y_test = DataProcessorFactory.standardize_and_split_data(
            self.data, split_ratio, target_col, feature_cols)

        if look_back and predict_steps:
            self.X_train, self.y_train, self.train_dates, _ = DataProcessorFactory.prepare_multistep_data(
                self.X_train, self.y_train, look_back, predict_steps, train_slide_steps)
            self.X_test, self.y_test, _, self.test_dates = DataProcessorFactory.prepare_multistep_data(
                self.X_test, self.y_test, look_back, predict_steps, test_slide_steps)


# Test the ModelDataAPI with Factory Pattern
ac = ModelDataAPI()
ac.set_seed(42)
start_date = "2001-01-01"
stop_date = "2021-01-01"
stock_symbol = "^GSPC"
ac.data = ac.fetch_stock_data(stock_symbol, start_date, stop_date)
# x = np.linspace(0, 50, 1000)  # Generate 1000 points between 0 and 50
# sin_wave = np.sin(x)  # Generate a sinusoidal wave
# ac.data = pd.DataFrame({
#     'Open': sin_wave,
#     'High': sin_wave + 0.1,  # Adding a small value to simulate the 'High' value for the day
#     'Low': sin_wave - 0.1,  # Subtracting a small value to simulate the 'Low' value for the day
#     'Close': sin_wave,
#     'Trend': [0 for i in range(1000)]
#     # 'Trend': [(i % 2) for i in range(1000)]
# })

indicators = [
    {"type": "Trend", "method": "MA", "oder_days": 20,
        "ma_days": 20, "trend_days": 5},
    {"type": "MACD", "fastperiod": 5, "slowperiod": 10, "signalperiod": 9},
    {"type": "ROC", "trend_days": 5},
    {"type": "Stochastic Oscillator", "trend_days": 5},
    {"type": "CCI", "timeperiod": 14},
    {"type": "RSI", "timeperiod": 14},
    {"type": "VMA", "timeperiod": 20},
    {"type": "PctChange"},
    {"type": "3M Treasury Yield", "start_date": "2001-01-01", "end_date": "2021-01-01"},
    {"type": "5Y Treasury Yield", "start_date": "2001-01-01", "end_date": "2021-01-01"},
    {"type": "10Y Treasury Yield", "start_date": "2001-01-01", "end_date": "2021-01-01"},
    {"type": "30Y Treasury Yield", "start_date": "2001-01-01", "end_date": "2021-01-01"},
]  # Add other indicators here as needed

for indicator_params in indicators:
    indicator_type = indicator_params["type"]
    ac.add_indicator(indicator_type, **indicator_params)

issues_detected = ac.add_data_cleaner("MissingData", strategy='auto')

# Specify data processing parameters
split_ratio = 0.7
target_col = "Trend"
# feature_cols = None  # None means use all columns
feature_cols = ['Close']
look_back = 64  # number of previous days' data to consider
predict_steps = 16  # number of days to predict in the future
slide_steps = 1  # sliding window step size

# Call the process_data method to standardize and split the data
ac.process_data(split_ratio=0.7, target_col="Trend", feature_cols=feature_cols, look_back=look_back,
                predict_steps=predict_steps, train_slide_steps=1, test_slide_steps=predict_steps)

# Display the shapes of the training and testing datasets
ac.X_train.shape, ac.y_train.shape, ac.X_test.shape, ac.y_test.shape

[*********************100%***********************]  1 of 1 completed


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['Trend'].iloc[i:i + trend_days] = 0
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['Trend'].iloc[i:i + trend_days] = 1


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Open                   0
High                   0
Low                    0
Close                  0
Adj Close              0
Volume                 0
Trend                 19
MACD                  17
ROC                    5
StoK                   8
StoD                   8
CCI                   13
RSI                   14
VMA                   19
pctChange              1
3M Treasury Yield      7
5Y Treasury Yield      7
10Y Treasury Yield     7
30Y Treasury Yield     7
dtype: int64

In [15]:
ac.data.isnull().sum()

Open                  0
High                  0
Low                   0
Close                 0
Adj Close             0
Volume                0
Trend                 0
MACD                  0
ROC                   0
StoK                  0
StoD                  0
CCI                   0
RSI                   0
VMA                   0
pctChange             0
3M Treasury Yield     0
5Y Treasury Yield     0
10Y Treasury Yield    0
30Y Treasury Yield    0
dtype: int64

In [16]:
# Specify data processing parameters
split_ratio = 0.7
target_col = "Trend"
# feature_cols = None  # None means use all columns
feature_cols = ['Close']
look_back = 64  # number of previous days' data to consider
predict_steps = 16  # number of days to predict in the future
slide_steps = 1  # sliding window step size

# Call the process_data method to standardize and split the data
ac.process_data(split_ratio=0.7, target_col="Trend", feature_cols=feature_cols, look_back=look_back,
                predict_steps=predict_steps, train_slide_steps=1, test_slide_steps=predict_steps)

# Display the shapes of the training and testing datasets
ac.X_train.shape, ac.y_train.shape, ac.X_test.shape, ac.y_test.shape

((3430, 64, 1), (3430, 16, 2), (90, 64, 1), (90, 16, 2))

In [17]:
# Extracting the last dimension and calculating the ratio of the two classes in ac.y_train
class_0_count = np.sum(ac.data['Trend'] == 0)
class_1_count = np.sum(ac.data['Trend'] == 1)

class_ratio = {
    "Trend_0": class_0_count,
    "Trend_1": class_1_count
}

class_ratio

{'Trend_0': 3252, 'Trend_1': 1761}

In [18]:
# Extracting the last dimension and calculating the ratio of the two classes in ac.y_train
class_0_count = np.sum(ac.y_train[:, :, 0])
class_1_count = np.sum(ac.y_train[:, :, 1])

class_ratio = {
    "Trend_0": class_0_count,
    "Trend_1": class_1_count
}

class_ratio

{'Trend_0': 34900, 'Trend_1': 19980}

In [19]:
# Extracting the last dimension and calculating the ratio of the two classes in ac.y_train
class_0_count = np.sum(ac.y_test[:, :, 0])
class_1_count = np.sum(ac.y_test[:, :, 1])

class_ratio = {
    "Trend_0": class_0_count,
    "Trend_1": class_1_count
}

class_ratio

{'Trend_0': 1008, 'Trend_1': 432}

In [20]:
import unittest


class TestDataAPI(unittest.TestCase):

    def setUp(self):
        self.ac = ModelDataAPI()
        self.ac.set_seed(42)
        self.start_date = "2020-01-01"
        self.end_date = "2021-01-01"
        self.stock_symbol = "^GSPC"

        # Mock data for the tests to avoid yfinance dependency
        x = np.linspace(0, 50, 1000)  # Generate 1000 points between 0 and 50
        sin_wave = np.sin(x)  # Generate a sinusoidal wave
        self.ac.data = pd.DataFrame({
            'Open': sin_wave,
            'High': sin_wave + 0.1,  # Adding a small value to simulate the 'High' value for the day
            # Subtracting a small value to simulate the 'Low' value for the day
            'Low': sin_wave - 0.1,
            'Close': sin_wave,
            'Trend': [0 for i in range(1000)]
            # 'Trend': [(i % 2) for i in range(1000)]
        })

    def test_fetch_stock_data(self):
        self.assertIsNotNone(self.ac.data)
        self.assertFalse(self.ac.data.empty)

    def test_add_single_indicator(self):
        initial_columns = set(self.ac.data.columns)
        self.ac.add_indicator("MACD", fastperiod=5,
                              slowperiod=10, signalperiod=9)
        new_columns = set(self.ac.data.columns)
        self.assertGreater(len(new_columns), len(initial_columns))

    def test_add_multiple_indicators(self):
        initial_columns = set(self.ac.data.columns)
        self.ac.add_indicator("RSI", timeperiod=14)
        self.ac.add_indicator("CCI", timeperiod=14)
        new_columns = set(self.ac.data.columns)
        self.assertGreater(len(new_columns), len(initial_columns))

    def test_invalid_indicator(self):
        with self.assertRaises(ValueError):
            self.ac.add_indicator("InvalidIndicatorName")

    def test_data_cleaning(self):
        # Introducing missing values into the mock data
        self.ac.data.iloc[2, 1] = np.nan
        self.ac.data.iloc[4, 3] = np.nan
        initial_missing_count = self.ac.data.isnull().sum().sum()

        # Use the add_data_cleaner method to clean the data
        self.ac.add_data_cleaner("MissingData", strategy='auto')

        # Verify that missing values have been cleaned
        final_missing_count = self.ac.data.isnull().sum().sum()
        self.assertLess(final_missing_count, initial_missing_count)

    def test_data_splitting(self):
        # Populate the mock data with necessary indicators
        indicators = [
            {"type": "Trend", "method": "MA", "oder_days": 20,
                "ma_days": 20, "trend_days": 5},
            # Add other necessary indicators here
        ]

        for indicator_params in indicators:
            indicator_type = indicator_params["type"]
            self.ac.add_indicator(indicator_type, **indicator_params)

        # Call the process_data method to standardize and split the data
        split_ratio = 0.7
        target_col = "Trend"
        feature_cols = None
        look_back = 10
        predict_steps = 5
        slide_steps = 1

        self.ac.process_data(split_ratio=split_ratio, target_col=target_col,
                             look_back=look_back, predict_steps=predict_steps, slide_steps=slide_steps)

        # Test the shape of the training and testing datasets
        self.assertEqual(self.ac.X_train.shape[0], self.ac.y_train.shape[0])
        self.assertEqual(self.ac.X_test.shape[0], self.ac.y_test.shape[0])

        # Ensure sum of lengths of train and test datasets matches the length of the original dataset
        total_data_points = len(self.ac.data) - look_back - predict_steps + 1
        self.assertEqual(
            self.ac.X_train.shape[0] + self.ac.X_test.shape[0], total_data_points)

        # Ensure data split ratio is approximately maintained
        train_ratio = self.ac.X_train.shape[0] / total_data_points
        self.assertAlmostEqual(train_ratio, split_ratio, places=1)


# Running the tests
unittest_result_splitting = unittest.TextTestRunner().run(
    unittest.TestLoader().loadTestsFromTestCase(TestDataAPI))
unittest_result_splitting

...

E..
ERROR: test_data_splitting (__main__.TestDataAPI)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\e4903\AppData\Local\Temp\ipykernel_1608\3007816089.py", line 81, in test_data_splitting
    self.ac.process_data(split_ratio=split_ratio, target_col=target_col,
TypeError: ModelDataAPI.process_data() got an unexpected keyword argument 'slide_steps'

----------------------------------------------------------------------
Ran 6 tests in 0.469s

FAILED (errors=1)


<unittest.runner.TextTestResult run=6 errors=1 failures=0>