<a href="https://colab.research.google.com/github/YoungnohLee/fin/blob/main/%5Bfin%5D_DRL_optuna_ideas%20and%20preparation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install stockstats

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting stockstats
  Downloading stockstats-0.5.2-py2.py3-none-any.whl (20 kB)
Installing collected packages: stockstats
Successfully installed stockstats-0.5.2


In [2]:
import pandas as pd
from stockstats import wrap

# data = pd.read_csv('stock.csv')
# df = wrap(data)

source code from `FinRL/finrl/config.py`

`INDICATORS`부분에 원하는 보조지표를 추가하여 강화학습에 적용시켜 볼 수 있음. `config.py`를 rolling period 에 따른 함수로 만들어 load하면 rolling period 에 대해 `optuna`로 hyper parameter tuning 이 가능할 듯. 다만 데이터의 용량때문에 global 변수말고 lcoal 변수로 부른 다음에 함수 내에서 강화학습 알고리즘 돌리고 return 값만 반환하게 하는 방식을 택해보면 어떨까 하는 생각?
(global 하게 해볼까?)

In [5]:
# directory
from __future__ import annotations

# customized config from finrl.config.py
class config_rolling(sma=30, lma=60): 
    
    DATA_SAVE_DIR = "datasets"
    TRAINED_MODEL_DIR = "trained_models"
    TENSORBOARD_LOG_DIR = "tensorboard_log"
    RESULTS_DIR = "results"

    # date format: '%Y-%m-%d'
    TRAIN_START_DATE = "2014-01-06"  # bug fix: set Monday right, start date set 2014-01-01 ValueError: all the input array dimensions for the concatenation axis must match exactly, but along dimension 0, the array at index 0 has size 1658 and the array at index 1 has size 1657
    TRAIN_END_DATE = "2020-07-31"

    TEST_START_DATE = "2020-08-01"
    TEST_END_DATE = "2021-10-01"

    TRADE_START_DATE = "2021-11-01"
    TRADE_END_DATE = "2021-12-01"

    # stockstats technical indicator column names
    # check https://pypi.org/project/stockstats/ for different names
    def INDICATORS(sma=30, lma=60):
      INDICATORS = [
            "macd",
            "boll_ub",
            "boll_lb",
            "rsi_30",
            "cci_30",
            "dx_30",
            f"close_{sma}_sma",
            f"close_{lma}_sma",
      ]
      return INDICATORS


    # Model Parameters
    A2C_PARAMS = {"n_steps": 5, "ent_coef": 0.01, "learning_rate": 0.0007}
    PPO_PARAMS = {
        "n_steps": 2048,
        "ent_coef": 0.01,
        "learning_rate": 0.00025,
        "batch_size": 64,
    }
    DDPG_PARAMS = {"batch_size": 128, "buffer_size": 50000, "learning_rate": 0.001}
    TD3_PARAMS = {"batch_size": 100, "buffer_size": 1000000, "learning_rate": 0.001}
    SAC_PARAMS = {
        "batch_size": 64,
        "buffer_size": 100000,
        "learning_rate": 0.0001,
        "learning_starts": 100,
        "ent_coef": "auto_0.1",
    }
    ERL_PARAMS = {
        "learning_rate": 3e-5,
        "batch_size": 2048,
        "gamma": 0.985,
        "seed": 312,
        "net_dimension": 512,
        "target_step": 5000,
        "eval_gap": 30,
        "eval_times": 64,  # bug fix:KeyError: 'eval_times' line 68, in get_model model.eval_times = model_kwargs["eval_times"]
    }
    RLlib_PARAMS = {"lr": 5e-5, "train_batch_size": 500, "gamma": 0.99}


    # Possible time zones
    TIME_ZONE_SHANGHAI = "Asia/Shanghai"  # Hang Seng HSI, SSE, CSI
    TIME_ZONE_USEASTERN = "US/Eastern"  # Dow, Nasdaq, SP
    TIME_ZONE_PARIS = "Europe/Paris"  # CAC,
    TIME_ZONE_BERLIN = "Europe/Berlin"  # DAX, TECDAX, MDAX, SDAX
    TIME_ZONE_JAKARTA = "Asia/Jakarta"  # LQ45
    TIME_ZONE_SELFDEFINED = "xxx"  # If neither of the above is your time zone, you should define it, and set USE_TIME_ZONE_SELFDEFINED 1.
    USE_TIME_ZONE_SELFDEFINED = 0  # 0 (default) or 1 (use the self defined)

    # parameters for data sources
    ALPACA_API_KEY = "xxx"  # your ALPACA_API_KEY
    ALPACA_API_SECRET = "xxx"  # your ALPACA_API_SECRET
    ALPACA_API_BASE_URL = "https://paper-api.alpaca.markets"  # alpaca url
    BINANCE_BASE_URL = "https://data.binance.vision/"  # binance url

TypeError: ignored

## Before We Start...

Concept Ideas from baseline

- 기간마다 최고의 수익을 내는 알고리즘이 달랐음. 그래서 특정 기간마다 maximize reward 하는 알고리즘이 다를수 있다

- 1. 그렇다면 전체 데이터에 대해 하나의 알고리즘으로 train하는게 아니라, 기간별로 다르게 train하면 좋지 않을까?

- 2. 또한, 기간별로 같은 알고리즘을 쓸게 아니라, 기간별로 다른 알고리즘을 쓴다면 좋지 않을까?

- 3. 이걸 더 확장해서 기간별로 데이터를 학습하되, 각각의 기간마다 여러 알고리즘을 앙상블 한다면, 일반 알고리즘을 선택하는 걸 넘어서 최적의 알고리즘이 쓰일 수 있지 않을까?

- 그런데 과연 앙상블된 이 모델이, 실전에 적용된다고 했을때 최적의 일반화 성능을 낸다는 것을 보장할 수 있을까?

- 만약 안된다면 우리는 최적의 일반화 성능을 도출해 내기 위해서 어떤 (앙상블)알고리즘을 택해야 하는걸까? 또한 어떤 방법(기간별 vs 전체 학습)을 적용해야 최고의 수익률을 내는데에 근접할 수 있을까?

- 또한, 알고리즘의 근본이 다른데, 애초에 이것들을 앙상블 한다는 것이 가당키나 한걸까? 각각의 장단점을 지닌 모델들을 앙상블 한다는 것이, 각각의 장점을 살려서 단점을 보완하는 방식이 아니라, 각각의 단점을 살리지 못하고 단점을 극대화 시키는 방식이 아닐까? 
\
예를 들어서 Env를 근사하는 방식에 있어서 TD와 MC 방식을 살펴보았을때, TD는 부정확하지만 전체 데이터를 모으지 않고도 sequential 하게 데이터를 사용할 수 있고 random outlier에 error가 robust하다는 장점이 있지만 초반에 학습이 부정확하다는 단점이 있다. MC는 전체 데이터를 한번에 학습해서 Env를 추정하기 때문에 정확하다는 장점이 있지만 학습시간이 오래 걸리고 random outlier에 sensitive 하다는 단점이 있다. 이 둘을 앙상블 했을때, TD만 사용한다면 학습시간의 단축이라는 장점을 죽이고 MC가 모든 trajectiory를 모을때까지 기다려야 되는 것이다. 이런 상황에서 강화학습의 여러 알고리즘들을 앙상블 한다는 것이 좋다고 할 수 있을까?


- 또한 해당 FInRL의 `StockTradingEnv` class는 risk에 대한 대응을 다 팔고 가만히 있는 것으로 대응하는데, volatility가 높아지면 헷징을 하거나 단기투자로 바꾸는 등의 시도도 가능하지 않을까? 단순히 action_space를 `[ -hmax, hmax ] 로 하는게 아니라, 헷징이나 단기투자로 변환, 또는 공매도 포지션을 취하게 하는 등 여러가지 action이 가능하다면 훨씬 다양한 전략의 구현이 가능하지 않을까?


## 해야되는 것

- `finrl.config.py` 에 INDICATORS = [
    "macd",
    "boll_ub",
    "boll_lb",
    "rsi_30",
    "cci_30",
    "dx_30",
    "close_30_sma",
    "close_60_sma",
]` 라고 정의되어 있음.

- `f'close_{sma}_sma, close_{lma}_sma'` 꼴로 변환시켜서 sma, lma 를 optuna로 empirical 하게 변현시켜 rollin period 가 다른 데이터를 생성하려고 함.

- 아래 코드를 보면 config.py에서 불러온 string element로 이루어진 INDICATORS 리스트를 FeatureEngineer module로 import 하고 있음.

- FeatureEngineer module은 `finrl.meta.preprocessor.preprocessors`에서 불러옴.

- 아래 `FeatureEngineer` class에서 `tech_indicator_list` 를 보면 `for indicator in self.tech_indicator_list:` 라고 되어있으면서, 그 아래에는 `stock = Sdf.retype(df.copy())` 에 `stock[stock.tic == unique_ticker[i]][indicator]` 로 되어있어서, stockstats/stockstats.py안에 **`StockDataFrame`** class 내부 `def retype(value):`에서 value에 `df.copy()`를 넣음으로써, 맨 마지막에 밑에서 4번째 줄에 있는 `ret = StockDataFrame(value)`에 `df.copy`가 들어가면서 indicator계산이 되는 형식임.

- (질문) source code를 보니 `StockDataFrame(pd.DataFrame):`으로 구현이 되어있던데, 정작 안에서는 `pd.DataFrame`이 없었음. 그럼 input이 어떤 원리로 구현이 되는건지 잘 모르겠음. 

  optuna로 empirical 하게 구현하기 위해서는 rolling period를 def 나 class의 default argument로 할당해줘야 하는데 그렇게 하기 위해서 input을 어떻게 할당해주는 지 알 필요가 있다. 이 부분이 미해결로 남아있음.

Supported statistics/indicators are:

- change (in percent)
- delta
- permutation (zero-based)
- log return
- max in range
- min in range
- middle = (close + high + low) / 3
- compare: le, ge, lt, gt, eq, ne
- count: both backward(c) and forward(fc)
- cross: including upward cross and downward cross
- SMA: Simple Moving Average
- EMA: Exponential Moving Average
- MSTD: Moving Standard Deviation
- MVAR: Moving Variance
- RSV: Raw Stochastic Value
- RSI: Relative Strength Index
- KDJ: Stochastic Oscillator
- Bolling: Bollinger Band
- MACD: Moving Average Convergence Divergence
- CR: Energy Index (Intermediate Willingness Index)
- WR: Williams Overbought/Oversold index
- CCI: Commodity Channel Index
- TR: True Range
- ATR: Average True Range
- DMA: Different of Moving Average (10, 50)
- DMI: Directional Moving Index, including
- +DI: Positive Directional Indicator
- -DI: Negative Directional Indicator
- ADX: Average Directional Movement Index
- ADXR: Smoothed Moving Average of ADX
- TRIX: Triple Exponential Moving Average
- TEMA: Another Triple Exponential Moving Average
- VR: Volume Variation Index
- MFI: Money Flow Index
- VWMA: Volume Weighted Moving Average
- CHOP: Choppiness Index
- KAMA: Kaufman's Adaptive Moving Average
- PPO: Percentage Price Oscillator
- StochRSI: Stochastic RSI
- WT: LazyBear's Wave Trend
- Supertrend: with the Upper Band and Lower Band

In [None]:
# stockstats/stockstats.py 내 class STockDataFrame(): 내 def retype 메소드 source_code
    @staticmethod
    def retype(value, index_column=None):
        """ if the input is a `DataFrame`, convert it to this class.

        :param index_column: name of the index column, default to `date`
        :param value: value to convert
        :return: this extended class
        """
        if index_column is None:
            index_column = 'date'

        if isinstance(value, StockDataFrame):
            return value
        elif isinstance(value, pd.DataFrame):
            name = value.columns.name
            # use all lower case for column name
            value.columns = map(lambda c: c.lower(), value.columns)

            if index_column in value.columns:
                value.set_index(index_column, inplace=True)
            ret = StockDataFrame(value)
            ret.columns.name = name
            return ret
        return value

In [None]:
from finrl import config
from finrl import config_tickers
import os
from finrl.main import check_and_make_directories
from finrl.config import (
    DATA_SAVE_DIR,
    TRAINED_MODEL_DIR,
    TENSORBOARD_LOG_DIR,
    RESULTS_DIR,
    INDICATORS,
    TRAIN_START_DATE,
    TRAIN_END_DATE,
    TEST_START_DATE,
    TEST_END_DATE,
    TRADE_START_DATE,
    TRADE_END_DATE,
)
check_and_make_directories([DATA_SAVE_DIR, TRAINED_MODEL_DIR, TENSORBOARD_LOG_DIR, RESULTS_DIR])

In [None]:
fe = FeatureEngineer(
    use_technical_indicator = True,
    tech_indicator_list = INDICATORS,
    use_vix=True,
    use_turbulence=True,
    user_defined_feature=False
)
preprocessed = fe.preprocess_data(df)

In [None]:
from finrl.meta.preprocessor.preprocessors import FeatureEngineer, data_split

In [None]:
# FinRL/finrl/meta/preprocessor/preprocessors.py

from __future__ import annotations

import datetime
from multiprocessing.sharedctypes import Value

import numpy as np
import pandas as pd
from stockstats import StockDataFrame as Sdf

from finrl import config
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader


def load_dataset(*, file_name: str) -> pd.DataFrame:
    """
    load csv dataset from path
    :return: (df) pandas dataframe
    """
    # _data = pd.read_csv(f"{config.DATASET_DIR}/{file_name}")
    _data = pd.read_csv(file_name)
    return _data


def data_split(df, start, end, target_date_col="date"):
    """
    split the dataset into training or testing using date
    :param data: (df) pandas dataframe, start, end
    :return: (df) pandas dataframe
    """
    data = df[(df[target_date_col] >= start) & (df[target_date_col] < end)]
    data = data.sort_values([target_date_col, "tic"], ignore_index=True)
    data.index = data[target_date_col].factorize()[0]
    return data


def convert_to_datetime(time):
    time_fmt = "%Y-%m-%dT%H:%M:%S"
    if isinstance(time, str):
        return datetime.datetime.strptime(time, time_fmt)


class FeatureEngineer:
    """Provides methods for preprocessing the stock price data

    Attributes
    ----------
        use_technical_indicator : boolean
            we technical indicator or not
        tech_indicator_list : list
            a list of technical indicator names (modified from neofinrl_config.py)
        use_turbulence : boolean
            use turbulence index or not
        user_defined_feature:boolean
            use user defined features or not

    Methods
    -------
    preprocess_data()
        main method to do the feature engineering

    """

    def __init__(
        self,
        use_technical_indicator=True,
        tech_indicator_list=config.INDICATORS,
        use_vix=False,
        use_turbulence=False,
        user_defined_feature=False,
    ):
        self.use_technical_indicator = use_technical_indicator
        self.tech_indicator_list = tech_indicator_list
        self.use_vix = use_vix
        self.use_turbulence = use_turbulence
        self.user_defined_feature = user_defined_feature

    def preprocess_data(self, df):
        """main method to do the feature engineering
        @:param config: source dataframe
        @:return: a DataMatrices object
        """
        # clean data
        df = self.clean_data(df)

        # add technical indicators using stockstats
        if self.use_technical_indicator:
            df = self.add_technical_indicator(df)
            print("Successfully added technical indicators")

        # add vix for multiple stock
        if self.use_vix:
            df = self.add_vix(df)
            print("Successfully added vix")

        # add turbulence index for multiple stock
        if self.use_turbulence:
            df = self.add_turbulence(df)
            print("Successfully added turbulence index")

        # add user defined feature
        if self.user_defined_feature:
            df = self.add_user_defined_feature(df)
            print("Successfully added user defined features")

        # fill the missing values at the beginning and the end
        df = df.fillna(method="ffill").fillna(method="bfill")
        return df

    def clean_data(self, data):
        """
        clean the raw data
        deal with missing values
        reasons: stocks could be delisted, not incorporated at the time step
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df = df.sort_values(["date", "tic"], ignore_index=True)
        df.index = df.date.factorize()[0]
        merged_closes = df.pivot_table(index="date", columns="tic", values="close")
        merged_closes = merged_closes.dropna(axis=1)
        tics = merged_closes.columns
        df = df[df.tic.isin(tics)]
        # df = data.copy()
        # list_ticker = df["tic"].unique().tolist()
        # only apply to daily level data, need to fix for minute level
        # list_date = list(pd.date_range(df['date'].min(),df['date'].max()).astype(str))
        # combination = list(itertools.product(list_date,list_ticker))

        # df_full = pd.DataFrame(combination,columns=["date","tic"]).merge(df,on=["date","tic"],how="left")
        # df_full = df_full[df_full['date'].isin(df['date'])]
        # df_full = df_full.sort_values(['date','tic'])
        # df_full = df_full.fillna(0)
        return df

    def add_technical_indicator(self, data):
        """
        calculate technical indicators
        use stockstats package to add technical inidactors
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df = df.sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in self.tech_indicator_list:
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                try:
                    temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                    temp_indicator = pd.DataFrame(temp_indicator)
                    temp_indicator["tic"] = unique_ticker[i]
                    temp_indicator["date"] = df[df.tic == unique_ticker[i]][
                        "date"
                    ].to_list()
                    # indicator_df = indicator_df.append(
                    #     temp_indicator, ignore_index=True
                    # )
                    indicator_df = pd.concat(
                        [indicator_df, temp_indicator], axis=0, ignore_index=True
                    )
                except Exception as e:
                    print(e)
            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        return df
        # df = data.set_index(['date','tic']).sort_index()
        # df = df.join(df.groupby(level=0, group_keys=False).apply(lambda x, y: Sdf.retype(x)[y], y=self.tech_indicator_list))
        # return df.reset_index()

    def add_user_defined_feature(self, data):
        """
         add user defined features
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df["daily_return"] = df.close.pct_change(1)
        # df['return_lag_1']=df.close.pct_change(2)
        # df['return_lag_2']=df.close.pct_change(3)
        # df['return_lag_3']=df.close.pct_change(4)
        # df['return_lag_4']=df.close.pct_change(5)
        return df

    def add_vix(self, data):
        """
        add vix from yahoo finance
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df_vix = YahooDownloader(
            start_date=df.date.min(), end_date=df.date.max(), ticker_list=["^VIX"]
        ).fetch_data()
        vix = df_vix[["date", "close"]]
        vix.columns = ["date", "vix"]

        df = df.merge(vix, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def add_turbulence(self, data):
        """
        add turbulence index from a precalcualted dataframe
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def calculate_turbulence(self, data):
        """calculate turbulence index based on dow 30"""
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.date.unique()
        # start after a year
        start = 252
        turbulence_index = [0] * start
        # turbulence_index = [0]
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use one year rolling window to calcualte covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - 252])
            ]
            # Drop tickers which has number missing values more than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
                filtered_hist_price, axis=0
            )
            # cov_temp = hist_price.cov()
            # current_temp=(current_price - np.mean(hist_price,axis=0))

            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outlier because of the calculation just begins
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)
        try:
            turbulence_index = pd.DataFrame(
                {"date": df_price_pivot.index, "turbulence": turbulence_index}
            )
        except ValueError:
            raise Exception("Turbulence information could not be added.")
        return turbulence_index