In [None]:
import numpy as np
import pandas as pd
import sys
sys.path.append("../src/")
import utils
from functools import partial
from itertools import product
from copy import deepcopy
from tqdm.autonotebook import tqdm
import warnings
from scipy.stats import ttest_1samp
from statsmodels.stats import weightstats as stests
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
from collections import defaultdict

In [None]:
def align_y_pred(y, y_pred, tolerance=4):
    """
    y_pred can be a different timestamp
    """
    left = pd.DataFrame(index=y.index, data={"y_true": y})
    right = pd.DataFrame(index=y_pred.index, data={"y_pred": y_pred})
    tmp = pd.merge_asof(left, right, left_index=True, right_index=True,
                        tolerance=pd.Timedelta(days=tolerance)).dropna()
    return tmp


def generate_constraints(daily_ret, config, dn_threshold=0.01, up_threshold=0.99, var_window=260):
    def up_cvar(x):
        return x[x > 0].quantile(up_threshold)
    def dn_cvar(x):
        return x[x < 0].quantile(dn_threshold)
    
    cvar = config["CVAR"]
    constraints = pd.DataFrame()
    constraints["var_dn_1d"] = daily_ret.expanding(min_periods=var_window).apply(dn_cvar)
    constraints["var_up_1d"] = daily_ret.expanding(min_periods=var_window).apply(up_cvar)
    constraints["var_dn_1w"] = constraints["var_dn_1d"] * np.sqrt(5)
    constraints["var_up_1w"] = constraints["var_up_1d"] * np.sqrt(5)
    constraints["max_total_size_long"] = cvar / constraints["var_dn_1w"].abs()
    constraints["max_total_size_short"] = cvar / constraints["var_up_1w"].abs()
    constraints["max_trades"] = config["max_trades"]
    constraints["social_size"] = config["social_size"]
    constraints["cost"] = config["cost"]
    return constraints


def generate_positions(signals, constraints):
    positions = [0]    
    prev_position = 0
    prev_signal = signals.iloc[0]

    for i in range(1, len(signals)):
        signal = signals.iloc[i]
        max_long = constraints['max_total_size_long'].iloc[i]
        max_short = constraints['max_total_size_short'].iloc[i]
        max_trades = constraints["max_trades"].iloc[i]
        social_size = constraints["social_size"].iloc[i]
        curr_position = 0
        if signal ==0:
            if prev_position == 0:
                curr_position = 0
            elif prev_position > 0:
                curr_position = max(0, prev_position - (max_trades * social_size))
            elif prev_position < 0:
                curr_position = min(0, prev_position + (max_trades * social_size))
            else:
                raise ValueError(f"invalid value::prev_position {prev_position} at index: {i}")
        elif signal > 0:
            curr_position = min(max_long * signal, prev_position + max_trades * social_size)  
        elif signal < 0:
            curr_position = max(max_short * signal, prev_position - max_trades * social_size)  
        else:
            raise ValueError(f"invalid signla value::signal {signal}  at index: {i}")
        
        prev_position = curr_position

        positions.append(curr_position)
    return pd.Series(index=signals.index, data=positions)


def get_sharpe_ratio(daily_pl):
    return daily_pl.mean() / (1e-10 + daily_pl.std()) * np.sqrt(252)


def generate_daily_states(y, y_pred, constraints, delay=0):
    tmp = align_y_pred(y, y_pred)
    y_pred = tmp["y_pred"]
    y_true = tmp["y_true"]
    positions = generate_positions(y_pred, constraints)
    daily_pl = positions * (y_true - constraints["cost"]).shift(-delay).fillna(0)
    return daily_pl, positions


def sharpe_score(y, y_pred, constraints, delay=0):
    daily_pl, daily_pos = generate_daily_states(y, y_pred, constraints, delay)
    return get_sharpe_ratio(daily_pl)


def sharpe_score_no_cost(y, y_pred, delay=0):
    tmp = align_y_pred(y, y_pred)
    y_pred = tmp["y_pred"]
    y_true = tmp["y_true"]
    daily_pl = y_pred * y_true.shift(-delay).fillna(0)
    return get_sharpe_ratio(daily_pl)

In [None]:
class Optimizer:
    def __init__(self, space, score_func, random_state=0, n_rounds=10, n_calls=10):
        self.space = space
        self.score_func = score_func
        self.random_state = random_state
        self.n_rounds = n_rounds
        self.n_calls = n_calls
    
    def optimize(self, estimator, X, y):
        best_params = None
        best_val = -100
        for values in product(*self.space.values()):
            params = {key: val for key, val in zip(self.space.keys(), values)}
            estimator.set_params(**params)
            score = self.score_func(y, estimator.predict(X))
            if score > best_val:
                best_val = score
                best_params = params
        estimator.set_params(**best_params)
        return estimator, best_val

In [None]:
class WalkForwardSplit:
    """
    To create split to online update models.

    expanding window:
    |-------train-----------|---gap---|----test-----|
    
    rolling window:
                |---train---|---gap---|----test-----|
    
    In situations where the "y" is observed with a delay of `gap`, one can
    only train model and deploy it on test set with a gap.
    
    Parameter
    ----------------------

    train_size: int/float
    gap: int, The delay in deploying the trained model.
    test_size: int/float
    expanding: bool, (default = False)
    """

    def __init__(self, train_size, test_size, gap, expanding=False):
        self.train_size = train_size
        self.test_size = test_size
        self.gap = gap
        self.expanding = expanding
    
    def get_size(self, sz, y):
        if isinstance(sz, int):
            return sz
        elif isinstance(sz, float):
            assert sz <= 1 and sz >= 0
            return int(sz * len(y))
        else:
            raise ValueError(f"sz must be an integer or float in [0, 1], it is {sz} now.")

    def split(self, y, verbose=True):
        train_size = self.get_size(self.train_size, y)
        test_size = self.get_size(self.test_size, y)
        gap = self.gap
        splits = self._split(y, train_size, test_size, gap)

        if verbose:
            return tqdm(splits, total=np.ceil((len(y) - train_size - gap) / test_size))
        else:
            return splits
    
    def _split(self, y, train_size, test_size, gap):
        min_test_start = train_size + gap
        
        for test_start in range(min_test_start, len(y), test_size):
            if self.expanding:
                train_start = 0
            else:
                train_start = test_start - min_test_start
            train_end = test_start - gap
            test_end = min(test_start + self.test_size, len(y))
            yield np.arange(train_start, train_end), np.arange(test_start, test_end)

            
def walkforward_training(X, y, model, splitter):
    y_pred = pd.Series(index=y.index, data=np.nan)
    models = []
    for train_idx, test_idx in splitter.split(y):
        mdl = deepcopy(model)
        mdl.fit(X, y[train_idx])
        y_pred_tmp = mdl.predict(X)
        
        # after fitting on train, we write down the train/test prediction on the batch
        tmp = align_y_pred(y[test_idx], y_pred_tmp)
        y_pred_test = tmp["y_pred"]

        tmp = align_y_pred(y[train_idx], y_pred_tmp)
        y_pred_train = tmp["y_pred"]
        
        y_pred[test_idx] = y_pred_test
        models.append([mdl, train_idx, test_idx, y_pred_train, y_pred_test])

    return models, y_pred


print("checking the splitter is right")

print("rolling window: ")
splitter = WalkForwardSplit(train_size=0.5, test_size=10, gap=1)
y = np.arange(100)
for train, test in splitter.split(y):
    print(f"train set: {train[0]} -- {train[-1]}, test set: {test[0]} -- {test[-1]}")
    

print("expanding window: ")
splitter = WalkForwardSplit(train_size=0.5, test_size=10, gap=1, expanding=True)
y = np.arange(100)
for train, test in splitter.split(y):
    print(f"train set: {train[0]} -- {train[-1]}, test set: {test[0]} -- {test[-1]}")
    

print("rounding off: ")
splitter = WalkForwardSplit(train_size=0.3, test_size=15, gap=3, expanding=True)
y = np.arange(100)
for train, test in splitter.split(y):
    print(f"train set: {train[0]} -- {train[-1]}, test set: {test[0]} -- {test[-1]}")


In [None]:
class HybridEstimator:
    def __init__(self, base_estimator, optimizer):
        self.base_estimator = base_estimator
        self.optimizer = optimizer
    
    def fit(self, X, y):
        best_model, _ = self.optimizer.optimize(self.base_estimator, X, y)
        self.base_estimator = best_model
        return self.base_estimator

    def predict(self, X):
        return self.base_estimator.predict(X)


class HybridBaseEstimator:
    def __init__(self, **params):
        self.params = dict()
        self.set_params(**params)
        
    def set_params(self, **params):
        self.params.update(params)
    
    def get_params(self):
        return self.params
    
    def predict(self, X):
        raise NotImplementedError("no")

    def zscore(self, x, **kwargs):
        grp = x.ewm(**kwargs)
        return (x - grp.mean()) / (1e-10 + grp.std())

    def get_weighted_sum(self, x, weights):
        # get the weighted sum of an arithmatic weight series
        # compute x.rolling(len(weights)).apply(lambda x: sum(y * w for y, w in zip([x, weights])))
        n = len(weights)
        cx = x.cumsum()
        gap = weights[-1] - weights[-2]
        return weights[-1] * cx - gap * cx.rolling(n-1).sum().shift() - weights[0] * cx.shift(n)


class DiffReturn(HybridBaseEstimator):
    def predict(self, X):
        return self.get_params()["direction"] * np.sign(X.diff(self.get_params()["window"])).fillna(0)
        

class WeightedReturn(HybridBaseEstimator):
    def predict(self, X):
        n = self.get_params()["window"]
        signal = n * X - X.rolling(n).sum().shift()
        return self.get_params()["direction"] * np.sign(signal).fillna(0)


class ZScoreTrend(HybridBaseEstimator):
    def predict(self, X):
        zscore = self.zscore(X, halflife=self.get_params()["window"])
        return self.get_params()["direction"] * np.sign(zscore).fillna(0)


class OLSTrendSignal(HybridBaseEstimator):
    def predict(self, X):
        n = self.params["window"]
        weights = np.arange(n)
        cov_xy = self.get_weighted_sum(X, weights)
        cov_xmu = X.rolling(n).sum() * weights.sum() / n
        beta = (cov_xy - cov_xmu) / np.var(weights) / n
        return self.get_params()["direction"] * np.sign(beta).fillna(0)


# trend/reversion

In [None]:
configs = {
    "ADXY": {
        "CVAR": 1e7,
        "social_size": 1e6,
        "max_trades": 5000,
        "cost": 0.00666 * 1e-2
    },
    "TRY": {
        "CVAR": 1e7,
        "social_size": 1e6,
        "max_trades": 600,
        "cost": 0.05
    }
}

estimators = {
    "zscore": ZScoreTrend(),
    "past-diff": DiffReturn(),
    "ols-beta": OLSTrendSignal(),
    "weighted-gap": WeightedReturn()
}


spaces = {
    "short": range(3, 10),
    "medium": range(15, 30),
    "long": range(50, 150, 5)
}


index = "TRY"

config = configs[index]
tri = utils.get_total_return_index(index)
constraints = generate_constraints(tri["tri"].diff(), config).ffill().bfill()
X = utils.get_total_return_index(index)["tri"]
y = X.diff().shift(-1)


stats = []
score_func = partial(sharpe_score, delay=0, constraints=constraints)

scores = {
    "sharpe-cost": score_func,
    "sharpe-cost-delay-1": partial(sharpe_score, delay=1, constraints=constraints),
    "sharpe-no-cost": partial(sharpe_score_no_cost, delay=0),
    "sharpe-no-cost-delay-1": partial(sharpe_score_no_cost, delay=1),
}


# define horizon
studies = list(product(spaces.items(), estimators.items()))
for idx, ((term, search), (estimator_name, base_estimator)) in tqdm(enumerate(studies)):
    space = {"window": search, "direction": [-1, 1]}
    optimizer = Optimizer(space, score_func)
    hybrid = HybridEstimator(base_estimator, optimizer)
    splitter = WalkForwardSplit(train_size=0.5, test_size=252, gap=0, expanding=True)
    models, y_pred = walkforward_training(X, y, hybrid, splitter)

    f = y_pred.notnull() & y.notnull()
    cnt = f.sum()
    directions = np.array([x[0].base_estimator.get_params()["direction"] for x in models])
    best_window = np.array([x[0].base_estimator.get_params()["window"] for x in models])
    zstat = stests.ztest(y[f] * y_pred[f], x2=None, alternative="larger")

    xx = defaultdict(list)
    for model, train_idx, test_idx, y_pred_train, y_pred_test in models:
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        for func_name, func in scores.items():
            xx[func_name].append([func(y_train, y_pred_train), func(y_test, y_pred_test)])

    d = dict(index=index, estimator_name=estimator_name, term=term, cnt=cnt,
             zstat=zstat[0], pval=zstat[1], directions=directions,
             window=best_window)
    yy = {func_name: np.array(vals) for func_name, vals in xx.items()}
    d.update(yy)
    d["y_true"] = y
    d["y_pred"] = y_pred
    
    for score_name, func in scores.items():
        d[score_name + "_all"] = func(y, y_pred)
    stats.append(d)

    daily_pl, daily_pos = generate_daily_states(y, y_pred, constraints, delay=0)
    d["daily_pl"] = daily_pl
    d["position"] = daily_pos 
stats = pd.DataFrame(stats)

In [None]:
from data_analysis_toolbox import gscatter, statx

In [None]:
stats

In [None]:
stats["pval"].le(0.05).mean()

In [None]:
statx(stats.iloc[5]["sharpe-cost"])    


In [None]:
plt.plot(stats.iloc[5]["daily_pl"].cumsum())

In [None]:
score_name = "sharpe-no-cost"
xx = stats[score_name].values
xx = np.vstack(xx)
plt.scatter(xx[:, 0], xx[:, 1])
plt.xlabel("train score")
plt.ylabel("val score")
plt.title(score_name)
plt.show()

In [None]:
y_pred_random = pd.Series(index=y.index, data=np.sign(np.random.randn(len(y))))
print("with cost:", sharpe_score(y, y_pred_random, constraints=constraints))
print("with no cost:", sharpe_score_no_cost(y, y_pred_random))

In [None]:
stats = []
tris = [
    "SGDINR",
    "TRY",
    "CNHTWD",
    "TWD1m",
    "CNH1m",
    "ADXY",
    "CNYCNH",
    "CNH2y",
    "CNH3m",
    "HUFCZK",
    "RUBvsEUR",
    "CNH3m12m",
    "IDR1m3m",
    "CNH12m",
    "CNH6m",
    "CNH",
    "TWD1m3m",
    "AUDJPY",
    "TWD3m12m"
]

score_func = partial(sharpe_score, delay=0)
score_func_1 = partial(sharpe_score, delay=1)

estimators = {
    "zscore": ZScoreTrend(),
    "past-diff": DiffReturn(),
    "ols-beta": OLSTrendSignal(),
    "weighted-gap": WeightedReturn()
}


spaces = {
    "short": range(3, 10),
    "medium": range(15, 30),
    "long": range(50, 150)
}


for index in tqdm(tris):
    X = utils.get_total_return_index(index=index)["tri"]
    
    y = X.diff().shift(-1)
    # define horizon
    for term, search in spaces.items():
        # define the concrete base estimator
        for name, base_estimator in estimators.items():
            space = {"window": search, "direction": [-1, 1]}
            optimizer = Optimizer(space, score_func)
            hybrid = HybridEstimator(base_estimator, optimizer)
            splitter = WalkForwardSplit(train_size=0.5, test_size=252, gap=0, expanding=True)
            models, y_pred = walkforward_training(X, y, hybrid, splitter)
            f = y_pred.notnull() & y.notnull()
            cnt = f.sum()
            direction = np.mean([x[0].base_estimator.get_params()["direction"] for x in models])
            best_window = np.mean([x[0].base_estimator.get_params()["window"] for x in models])
            zstat =stests.ztest(y[f] * y_pred[f], x2=None, alternative="larger")
            score = score_func(y, y_pred)
            score_delay = score_func_1(y, y_pred)
            d = dict(index=index, name=name, term=term, sharpe=score, sharpe_delay1=score_delay,
                     cnt=cnt, zstat=zstat[0], pval=zstat[1], direction=direction, window=best_window)
            stats.append(d)
            
stats = pd.DataFrame(stats)

In [None]:
#### short term signal is more sensitive to delay
stats = pd.read_csv("tri_momentum_reversion_stats.csv")
print("percentage of trail that passed 5% test:", 100 * stats["pval"].le(0.05).mean(0))
stats.set_index("index").to_csv("tri_momentum_reversion_stats.csv")
stats[stats["pval"].le(0.05)].groupby("term")[["sharpe", "sharpe_delay1"]].describe().T

# sentiment

#### guolong's sentiment trading strategy
go short if both zscore and sentiment are negative, otherwise go long

In [None]:
X = utils.get_ssl_sentiment()

underlyings = [
    "SPX", "NDX", "NKY", "HSCEI", "HSI", "KOSPI2", "SX5E", "DAX",
    "HYG", "TLT", "LQD",
    "GLD", "USO",
    "USDNOK", "USDRUB", "USDMXN", "AUDJPY"
]

spaces = {
    "short": range(3, 10),
    "medium": range(15, 30, 2),
    "long": range(50, 150, 5)
}

class SentimentSignal(HybridBaseEstimator):
    def predict(self, X):
        sentiment = X["sentiment"]
        zscore = self.zscore(X["sentiment"], halflife=self.get_params()["window"])
        signal = pd.Series(index=X.index)
        signal[sentiment.le(0) & zscore.le(0)] = -1
        signal[sentiment.ge(0) | zscore.ge(0)] = 1
        signal = signal.ffill().fillna(0)
        return self.get_params()["direction"] * signal

score_func = partial(sharpe_score, delay=0)
score_func_1 = partial(sharpe_score, delay=1)
stats = []
splitter = WalkForwardSplit(train_size=0.5, test_size=252, gap=0, expanding=True)

for index in tqdm(underlyings):
    y = utils.get_spot(index)["spot"].diff().shift(-1)["2013-01-01":]
    for term, search in spaces.items():
        space = {"window": search, "direction": [-1, 1]}
        optimizer = Optimizer(space, score_func)
        hybrid = HybridEstimator(SentimentSignal(), optimizer)
        models, y_pred = walkforward_training(X, y, hybrid, splitter)
        f = y_pred.notnull() & y.notnull()
        cnt = f.sum()
        score = score_func(y, y_pred)
        score_delay = score_func_1(y, y_pred)
        direction = np.mean([x[0].base_estimator.get_params()["direction"] for x in models])
        best_window = np.mean([x[0].base_estimator.get_params()["window"] for x in models])
        zstat =stests.ztest(y[f] * y_pred[f], x2=None, alternative="larger")
        d = dict(index=index, name=name, term=term, sharpe=score, sharpe_delay1=score_delay,
                 cnt=cnt, zstat=zstat[0], pval=zstat[1], direction=direction, window=best_window)
        stats.append(d)
stats = pd.DataFrame(stats)

In [None]:
stats = pd.read_csv("sentiment_signal_study.csv")
print("percentage of trail that passed 5% test:", 100 * stats["pval"].le(0.05).mean(0))
stats.set_index("index").to_csv("sentiment_signal_study.csv")
stats[stats["pval"].le(0.05)]

#### EDF zscore diff
signal that we previously use in AUDJPY

In [None]:
class ZScoreDiff(HybridBaseEstimator):
    def predict(self, X):
        zscore = self.zscore(X["sentiment"], halflife=self.get_params()["window"])
        signal = zscore.diff(self.get_params()["diff"])
        return self.get_params()["direction"] * np.sign(signal).ffill().fillna(0)

spaces = {
    "short": {"window": range(3, 10), "diff": range(3, 40, 2)},
    "medium": {"window": range(10, 30), "diff": range(15, 120, 5)},
}


score_func = partial(sharpe_score, delay=0)
score_func_1 = partial(sharpe_score, delay=1)

splitter = WalkForwardSplit(train_size=0.5, test_size=252, gap=0, expanding=True)

X = utils.get_ssl_sentiment()

##### on total return indices

In [None]:
stats = []
tris = [
    "SGDINR",
    "TRY",
    "CNHTWD",
    "TWD1m",
    "CNH1m",
    "ADXY",
    "CNYCNH",
    "CNH2y",
    "CNH3m",
    "HUFCZK",
    "RUBvsEUR",
    "CNH3m12m",
    "IDR1m3m",
    "CNH12m",
    "CNH6m",
    "CNH",
    "TWD1m3m",
    "AUDJPY",
    "TWD3m12m"
]



for index in tqdm(tris):
    y = utils.get_total_return_index(index)["tri"].diff().shift(-1)["2013-01-01":]
    # define horizon
    for term, search in spaces.items():
        space = search.copy()
        space.update({"direction": [-1, 1]})
        optimizer = Optimizer(space, score_func)
        hybrid = HybridEstimator(ZScoreDiff(), optimizer)
        models, y_pred = walkforward_training(X, y, hybrid, splitter)
        f = y_pred.notnull() & y.notnull()
        cnt = f.sum()
        score = score_func(y, y_pred)
        score_delay = score_func_1(y, y_pred)
        direction = np.mean([x[0].base_estimator.get_params()["direction"] for x in models])
        best_window = np.mean([x[0].base_estimator.get_params()["window"] for x in models])
        best_diff = np.mean([x[0].base_estimator.get_params()["diff"] for x in models])
        sharpe = score_func(y, y_pred)
        
        zstat =stests.ztest(y[f] * y_pred[f], x2=None, alternative="larger")
        d = dict(index=index, name="zscore-diff", term=term, sharpe=score, sharpe_delay1=score_delay,
                 cnt=cnt, zstat=zstat[0], pval=zstat[1], direction=direction, window=best_window, diff=best_diff)
        stats.append(d)
stats = pd.DataFrame(stats)

In [None]:
# stats = pd.read_csv("sentiment_tri_zscore_diff_study.csv")
print("percentage of trail that passed 5% test:", 100 * stats["pval"].le(0.05).mean(0))
stats.set_index("index").to_csv("sentiment_tri_zscore_diff_study.csv")
stats[stats["pval"].le(0.05)]

##### on various assets

In [None]:
underlyings = [
    "SPX", "NDX", "NKY", "HSCEI", "HSI", "KOSPI2", "SX5E", "DAX",
    "HYG", "TLT", "LQD",
    "GLD", "USO",
    "USDNOK", "USDRUB", "USDMXN", "AUDJPY"
]




stats = []
splitter = WalkForwardSplit(train_size=0.5, test_size=252, gap=0, expanding=True)

for index in tqdm(underlyings):
    y = utils.get_spot(index)["spot"].diff().shift(-1)["2013-01-01":]
    for term, search in spaces.items():
        space = search.copy()
        space.update({"direction": [-1, 1]})
        optimizer = Optimizer(space, score_func)
        hybrid = HybridEstimator(ZScoreDiff(), optimizer)
        models, y_pred = walkforward_training(X, y, hybrid, splitter)
        f = y_pred.notnull() & y.notnull()
        cnt = f.sum()
        score = score_func(y, y_pred)
        score_delay = score_func_1(y, y_pred)
        direction = np.mean([x[0].base_estimator.get_params()["direction"] for x in models])
        best_window = np.mean([x[0].base_estimator.get_params()["window"] for x in models])
        best_diff = np.mean([x[0].base_estimator.get_params()["diff"] for x in models])
        sharpe = score_func(y, y_pred)
        
        zstat =stests.ztest(y[f] * y_pred[f], x2=None, alternative="larger")
        d = dict(index=index, name="zscore-diff", term=term, sharpe=score, sharpe_delay1=score_delay,
                 cnt=cnt, zstat=zstat[0], pval=zstat[1], direction=direction, window=best_window, diff=best_diff)
        stats.append(d)
stats = pd.DataFrame(stats)

In [None]:
print("percentage of trail that passed 5% test:", 100 * stats["pval"].le(0.05).mean(0))
stats.set_index("index").to_csv("sentiment_zscore_diff_study.csv")
stats[stats["pval"].le(0.05)]

# cross-asset feature

In [None]:
vix = utils.get_chained_future('VIX')
vix["diff"].cumsum().plot()

In [None]:
brent = utils.get_chained_future("BRENT")

In [None]:
brent["diff"].cumsum().plot()