In [4]:
from datetime import datetime
import polars as pl
df = pl.read_csv("/home/sida/YIPENG/Task2_FactorTiming/data/gold_futures_comex/td_comex_xau_cnhusd_210104.csv")
df = df.with_columns(
    td_close_USD = pl.col("td_close") * 31.1035 * pl.col("CNHUSD_close")
)
df = df.with_columns(
    com_td_diff = pl.col("td_close_USD") - pl.col("comex_close"),
    xau_td_diff = pl.col("td_close_USD") - pl.col("xau_close"),
    xau_com_diff = pl.col("td_close_USD") - pl.col("xau_close")
)
df = df.head(100000)
df = df.to_pandas()
df.index = df["date"].apply(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
df = df.resample("10T").first()

In [11]:
import os
import sys
from typing import List
PROJ_DIR = "/home/sida/YIPENG/Task2_FactorTiming/src/UnifArb"
sys.path.append(PROJ_DIR)
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.isotonic import IsotonicRegression
from typing import Union, Callable
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

from lib.helper.DataLoader import DataLoader
from pykalman import KalmanFilter
from typing import Union, Tuple
import pandas as pd
from pykalman import KalmanFilter

class Detrendor:
    def __init__(self, col: str, window: int = 120, step: int = 3,
                 d_method = "istonic",
                 data_path = "metadata/PredictorLSretWide.csv",
                 start_date: int = 20000101,
                 end_date: int = 20240101,
                 data: pd.Series = pd.Series()) -> None:
        self._col = col
        self._window = window
        self._step = step
        self._data_path = data_path
        self._start_date = str(start_date)
        self._end_date = str(end_date)
        self._data: pd.Series = data.copy()
        
        self.d_method = d_method
    
    @property
    def group_data(self):
        if not hasattr(self, "_group_data"):
            self._group_data = self.create_rolling_datasets(
                self.data, self._window, self._step
            )
        
        return self._group_data

    @property
    def preds(self) -> pd.Series:
        """Results of rolling detrending."""
        if not hasattr(self, "_preds"):
            self.batch_detrend()
        
        return self._preds # type: ignore
    
    @property
    def data(self) -> pd.Series:
        """Original data."""
        if hasattr(self, "_updated"):
            return self._data
        
        if self._data.empty:
            d = DataLoader(path = self._data_path)
            col_data = d.get_full_col(self._col)
            self._data = col_data[
                col_data.first_valid_index() : col_data.last_valid_index()
            ]
        s_time = pd.to_datetime(self._start_date).strftime("%Y-%m-%d")
        e_time = pd.to_datetime(self._end_date).strftime("%Y-%m-%d")
        self._data = self._data[s_time : e_time]
        self._updated = True
        
        return self._data
    
    @property
    def col(self):
        return self._col

    @staticmethod
    def create_rolling_datasets(series: pd.Series,
                                window_size: int = 120, step_size = 6):
        """
            Create rolling datasets for training
        """
        rolling_datasets = []
        assert len(series) >= window_size, "Series is too short."
        for i in range(0, len(series) - window_size + 1, step_size):
            window = series.iloc[i : i + window_size]
            rolling_datasets.append(window)

        return rolling_datasets
    
    @property
    def result(self) -> pd.DataFrame:
        df: pd.DataFrame = pd.concat([self.preds, self.data.cumsum()], axis = 1).sort_index()
        df = df.loc[df.first_valid_index() : df.last_valid_index()]
        df.columns = [self.detrend_col, self.col]

        return df
    
    def batch_detrend(self) -> pd.Series:
        """
            Use a part of data training, predict the future periods.
        """
        preds = []
        for data in self.group_data:
            data: pd.Series
            train_size = len(data) - self._step
            train_data = data.head(train_size)

            if self.d_method == "isotonic":
                test_data = data.cumsum().tail(self._step)
                model = detrend_series_isotonic(train_data, "model")
                pred = model.predict(list(range(train_size, len(data))))
                pred = pd.Series(pred, index = test_data.index)
                pred = test_data - pred  # Detrend.
                preds.append(pred)

            elif self.d_method == "kf":
                res = []
                test_data = data.tail(self._step)
                state, model = detrend_series_kalman(train_data, "model")
                for _d in test_data:
                    state, _ = model.filter_update(state, _d)
                    pred = _d - state[-1]
                    res.append(pred)
                res = pd.Series(res, index = test_data.index)
                preds.append(res)

        self._preds: pd.Series = pd.concat(preds)
        self.detrend_col = f"{self._col}_{self.d_method}_detrend_p"
        self._preds.name = self.detrend_col

        return self.preds
    
    def full_detrend(self):
        """
            Use all data to train, predict the future periods.
        """
        if self.d_method == "isotonic":
            pred: pd.Series = detrend_series_isotonic(self.data, "detrend")
        elif self.d_method == "kf":
            pred: pd.Series = detrend_series_kalman(self.data, "detrend")
        pred.name = f"{self._col}_{self.d_method}_detrend_insample"

        return pred
    
    def plot_detrend(self):
        sns.set_style("darkgrid")
        plt.figure(figsize = (24, 12))
        self.det_df = pd.concat([
            self.preds, self.full_detrend(), self.data.cumsum()
        ], axis = 1).sort_index()
        self.det_df.index = self.det_df.index.map(pd.to_datetime)
        for col in self.det_df:
            sns.lineplot(self.det_df[col], label = col)
        plt.legend()

        return plt.gcf()
    
"""Detrend Cell."""
def detrend_series_ma(
        col: pd.Series, p: int = 12, 
        return_type: str = "detrend",
) -> pd.Series:
    """
        1. For a given factor return, drop the data until the first valid factor return
        2. Fill the following NaN with 0 and calculate the cumulative return
        3. Detrend the cumulative return with moving average
        4. Concat the NaN and detrended cumulative return, return.
    """
    col_use = col[col.first_valid_index() : col.last_valid_index()]
    
    y = col_use.fillna(0).cumsum()
    y_ = y.rolling(p, min_periods = 2).mean().\
        fillna(0).values

    if return_type == "detrend":
        return pd.Series(y - y_, index = col_use.index, 
                         name = f"{col.name}_isotonic_detrend")
    elif return_type == "predict":
        return pd.Series(y_, index = col_use.index, 
                    name = f"{col.name}_isotonic_predict")
    else:
        raise ValueError("return_type must be detrend or predict")

def detrend_series_linear(
        col: pd.Series,
        return_type: str = "detrend",
) -> pd.Series:
    """
        1. For a given factor return, drop the data until the first valid factor return
        2. Fill the following NaN with 0 and calculate the cumulative return
        3. Detrend the cumulative return with linear regression
        4. Concat the NaN and detrended cumulative return.
    """
    col_use = col[col.first_valid_index() : col.last_valid_index()]

    x = np.arange(len(col_use))
    x = np.array([np.ones(len(x)), x]).T
    y = col_use.fillna(0).cumsum()
    reg = LinearRegression().fit(x, y)
    y_ = reg.predict(x)

    if return_type == "detrend":
        return pd.Series(y - y_, index = col_use.index, 
                         name = f"{col.name}_linear_detrend")
    elif return_type == "predict":
        return pd.Series(y_, index = col_use.index, 
                    name = f"{col.name}_linear_predict")
    else:
        raise ValueError("return_type must be detrend or predict")

def detrend_series_isotonic(
        col: pd.Series,   # DO NOT USE CUMSUM
        return_type: str = "detrend",
) -> Union[pd.Series, Callable]:
    """
        col: a series of factor returns
    """
    col_use = col[col.first_valid_index() : col.last_valid_index()]
    x = np.arange(len(col_use))
    y = col_use.fillna(0).cumsum().values
    ir = IsotonicRegression(out_of_bounds = "clip", increasing = "auto").\
        fit(x, y) # type: ignore
    y_ = ir.predict(x) 

    if return_type == "detrend":
        return pd.Series(y - y_, index = col_use.index, 
                    name = f"{col.name}_isotonic_detrend")
    elif return_type == "predict":
        return pd.Series(y_, index = col_use.index, 
                    name = f"{col.name}_isotonic_predict")
    elif return_type == "model":
        return ir # type: ignore
    else:
        raise ValueError("return_type must be detrend or predict")

def detrend_series_kalman(col: pd.Series, return_type: str = "detrend") \
    -> Union[pd.Series, Tuple[float, KalmanFilter]]:
    kf = KalmanFilter(initial_state_mean=0, n_dim_obs=1)
    filtered_state_means, _ = kf.filter(col.values)
    filtered_data = pd.Series(filtered_state_means.flatten(), index=col.index)
    if return_type == "detrend":
        return col - filtered_data
    elif return_type == "model":
        return filtered_state_means[-1], kf
    else:
        raise ValueError("Invalid return_type. Must be one of 'detrend', 'predict', or 'model'.")
    
dt = Detrendor(
    col = "com_td_diff",
    window = 300,
    step = 30,
    d_method = "kf",
    start_date = 20210105,
    end_date = 20210120,
    data = df["com_td_diff"], # type: ignore
)
dt.batch_detrend()

date
2021-01-06 21:00:00   -1.950556
2021-01-06 21:10:00    4.391503
2021-01-06 21:20:00    7.637599
2021-01-06 21:30:00    6.834512
2021-01-06 21:40:00    4.571063
                         ...   
2021-01-20 19:10:00    0.417766
2021-01-20 19:20:00    0.877609
2021-01-20 19:30:00    0.537844
2021-01-20 19:40:00    1.437844
2021-01-20 19:50:00    0.997688
Freq: 10T, Name: com_td_diff_kf_detrend_p, Length: 2010, dtype: float64

In [13]:
from lib.evaluate.PolicyGenerator import PolicyGenerator
pg = PolicyGenerator(dt)
pg.bayes_search(n_trials = 10)

[I 2023-12-14 11:49:44,388] A new study created in memory with name: no-name-b8156387-86dd-49f5-9577-c03cd8f6a104
[I 2023-12-14 11:49:44,978] Trial 0 finished with value: 1.0767541880925176e+35 and parameters: {'a': 0.10773713773535576, 'b': 3.2974600210617773, 'stop_loss': 4.234508578450988, 'max_earning': 0.060000000000000005}. Best is trial 0 with value: 1.0767541880925176e+35.
[I 2023-12-14 11:49:45,692] Trial 1 finished with value: 2.089511204394917e+52 and parameters: {'a': 0.8319851061307144, 'b': 1.5901958765305686, 'stop_loss': 3.6778771613675314, 'max_earning': 0.07}. Best is trial 1 with value: 2.089511204394917e+52.
[I 2023-12-14 11:49:46,493] Trial 2 finished with value: 5.568901026332437e+108 and parameters: {'a': -1.9293347603171727, 'b': 1.5147859041118341, 'stop_loss': 3.9489101905147512, 'max_earning': 0.17}. Best is trial 2 with value: 5.568901026332437e+108.
[I 2023-12-14 11:49:47,292] Trial 3 finished with value: 2.9731428689917115e+115 and parameters: {'a': -0.788

In [15]:
pg.best_result.keys()

dict_keys(['a', 'b', 'stop_loss', 'max_earning', 'start_date', 'end_date', 'return', 'hold_return', 'b_date', 's_date', 'pnl_list', 'stop_loss_date', 'stop_earning_date', 'winning_rate', 'avg_hold_p', 'avg_return', 'ann_return'])

In [16]:
pg.best_result["pnl_list"]

[-253.36553539264946,
 -147.20884156624925,
 -86.73212900530234,
 -110.9131454836006,
 191.25518085869953,
 578.8881151780006,
 282.0544062093991,
 66.9417946287499,
 -225.7464549487513,
 -319.9456641750021,
 -374.06245580094856,
 -567.8721984010463,
 138.19098389310147,
 -57.25251383729983,
 3.1250197570007003,
 106.64252206539913,
 89.71384983924963,
 89.09735178555025,
 -17.167971374899253,
 -70.60701029789811,
 53.75651258000153,
 184.3311336306997,
 126.4401112770006,
 67.04055051120167,
 67.04055051120167,
 38.59580022720115,
 2.618200501202409,
 -223.79401085189966,
 23.199333268499686,
 2.07014957445017,
 31.53172837250031,
 10.90429649394946,
 26.113585561100535,
 -111.62924937519983]