In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# Inject a CSS rule that forces elements with class `container` to 100% width,
# so the notebook page is not limited to a narrow centred column in front-ends
# that use that class.
from IPython.display import display, HTML
display(HTML("<style>.container { width: 100% !important; }</style>"))

# Import Dependencies

In [None]:
from typing import *
import pandas as pd
import numpy as np
from datetime import datetime
from datetime import timedelta
from scipy import stats

import plotly.graph_objects as go
import plotly.express as px
import cufflinks as cf
from plotly.offline import (download_plotlyjs, init_notebook_mode, plot, iplot)
init_notebook_mode(connected=True)
cf.go_offline()

from utils import calculate_drawdown, calculate_sharpe_ratio, nearcorr

# Contour plot of IC weighting for two signals

In [None]:
# Scenario grid: signal B's IC runs along the x-axis, the A/B correlation along the y-axis.
perfs = np.linspace(-0.2, 0.2, 100)
corrs = np.linspace(-0.2, 0.9, 100)

# weights[row, col] holds the weight given to signal B when the correlation is
# corrs[row] and B's IC is perfs[col]; signal A's IC is pinned at 0.1.
weights = np.full((100, 100), np.nan)
for col, ic_b in enumerate(perfs):
    ic_vec = np.array([[0.1, ic_b]])
    for row, rho in enumerate(corrs):
        corr_mat = np.array([[1, rho], [rho, 1]])
        # IC vector times the (pseudo-)inverse correlation matrix; keep B's entry
        weights[row, col] = (ic_vec @ np.linalg.pinv(corr_mat))[0][1]


fig = go.Figure(
    data=go.Contour(
        z=weights,
        x=perfs,  # horizontal axis
        y=corrs,  # vertical axis
        contours={
            "size": 0.05,
            "start": -2,
            "end": 0.5,
            "showlabels": True,
            "labelfont": {"color": "white"},
        },
    )
)
fig.update_xaxes(title="Signal B IC")
fig.update_yaxes(title="Signal Correlation")
fig.update_layout(
    title="Two Signals (A,B) - Contour Plot of Weight Assigned to Signal B. Signal A IC Fixed at 0.1",
    height=600,
    width=900,
)
fig.show()

In [None]:
# Example: both signals have positive IC and are strongly correlated (0.8).
ics = np.array([[0.1, 0.18]])
corrs = np.array([[1, 0.8], [0.8, 1]])
# Signal weights = IC vector times the pseudo-inverse of the correlation matrix
np.matmul(ics, np.linalg.pinv(corrs))

In [None]:
# Example: signal B has a negative IC while the signals are strongly correlated (0.8).
ics = np.array([[0.1, -0.18]])
corrs = np.array([[1, 0.8], [0.8, 1]])
# Signal weights = IC vector times the pseudo-inverse of the correlation matrix
np.matmul(ics, np.linalg.pinv(corrs))

In [None]:
# Example: signal B has a negative IC and the signals are mildly anti-correlated (-0.1).
ics = np.array([[0.1, -0.18]])
corrs = np.array([[1, -0.1], [-0.1, 1]])
# Signal weights = IC vector times the pseudo-inverse of the correlation matrix
np.matmul(ics, np.linalg.pinv(corrs))

In [None]:
# Example: both signals have positive IC and are mildly anti-correlated (-0.1).
ics = np.array([[0.1, 0.18]])
corrs = np.array([[1, -0.1], [-0.1, 1]])
# Signal weights = IC vector times the pseudo-inverse of the correlation matrix
np.matmul(ics, np.linalg.pinv(corrs))

# Define helper functions

In [None]:
def generate_asset_returns(seed: int, n_assets: int, n_days: int) -> pd.DataFrame:
    """Simulate daily returns for a universe of correlated, fat-tailed assets.

    The draw is fully determined by ``seed``:

    1. A random correlation matrix is built from a Gaussian matrix, clipped to
       ``[-0.2, 0.9]`` off the diagonal and passed through ``nearcorr``.
    2. Per-asset volatilities are drawn (in annualised percent), clipped to
       ``[20, 140]`` and converted to a daily fraction using 365 days per year.
    3. Returns are sampled from a multivariate Student-t (10 degrees of freedom)
       with small random means and clipped to ``[-0.2, 0.2]``.

    Parameters
    ----------
    seed : int
        Base seed. ``seed=0`` is supported (it used to raise ``ZeroDivisionError``).
    n_assets : int
        Number of assets (columns ``asset_0`` .. ``asset_{n_assets-1}``).
    n_days : int
        Number of daily observations, indexed from 2025-01-01.

    Returns
    -------
    pd.DataFrame
        ``n_days`` x ``n_assets`` frame of simulated daily returns.
    """
    # --- random correlation matrix -------------------------------------------
    np.random.seed(seed + 69)
    a = np.random.normal(0.5, 1, (n_assets, n_assets))
    m = a.T @ a
    d = np.zeros_like(m)
    np.fill_diagonal(d, np.sqrt(np.diag(m)))
    d_inv = np.linalg.inv(d)  # computed once, reused on both sides
    corr = d_inv @ m @ d_inv
    corr = np.clip(corr, -0.2, 0.9)
    np.fill_diagonal(corr, 1)
    corr = 0.5 * (corr + corr.T)
    # Clipping can break validity of the matrix; nearcorr is used to repair it.
    corr = nearcorr(corr)

    # --- per-asset daily volatilities ----------------------------------------
    np.random.seed(seed + 42)
    asset_stds = np.random.normal(60, 30, n_assets)
    asset_stds = np.clip(asset_stds, 20, 140)
    asset_stds = asset_stds / (100 * np.sqrt(365))

    # cov[i, j] = corr[i, j] * std[i] * std[j]; broadcasting keeps the same
    # left-to-right multiplication order as an explicit double loop.
    covs = np.asarray(corr) * asset_stds[:, None] * asset_stds[None, :]

    # --- expected returns and the final draw ---------------------------------
    # `8008135 // seed` is undefined for seed == 0; use a value that no
    # non-zero seed can produce so the zero seed gets its own stream.
    mu_seed = 8008135 // seed if seed != 0 else 8008135 + 1
    np.random.seed(mu_seed)
    mu = np.random.normal(0.001, 0.001, n_assets)
    asset_returns = pd.DataFrame(
        stats.multivariate_t(df=10, loc=mu, shape=covs).rvs(size=n_days, random_state=seed + 7),
        index=pd.date_range("2025-01-01", periods=n_days, freq="D"),
        columns=[f"asset_{i}" for i in range(n_assets)]
    ).clip(-0.2, 0.2)
    return asset_returns


def generate_signal_weights(
    seed: int,
    asset_returns: pd.DataFrame,
) -> pd.DataFrame:
    """Build a noisy signal from the asset returns themselves.

    Each return is multiplied by an independent N(0.01, 0.2) draw, then every
    row is de-meaned and scaled so that its absolute values sum to 1.
    The random draw is fixed by ``seed + 111``.
    """
    np.random.seed(seed + 111)
    noise = np.random.normal(0.01, 0.2, asset_returns.shape)
    noisy_signal = asset_returns * noise
    # centre each day's cross-section at zero ...
    centred = noisy_signal.sub(noisy_signal.mean(axis=1), axis=0)
    # ... and scale each day to unit gross exposure
    gross_exposure = centred.abs().sum(axis=1)
    return centred.div(gross_exposure, axis=0)

In [None]:
def _rescale_weights(weights: pd.DataFrame) -> pd.DataFrame:
    """Re-scale weights so they are positive and sum to 1."""
    weights = weights.sub(weights.mean(axis=1), axis=0)
    weights = weights.div(weights.abs().sum(axis=1), axis=0)
    weights = 0.5 + weights
    weights = weights.div(weights.abs().sum(axis=1), axis=0)
    return weights


def get_ic_corr_weight(ics: pd.DataFrame) -> pd.DataFrame:
    """Get Type 1 weights.

    For every date after the first 28 rows, take the trailing 365-day window
    of ICs, multiply the window's mean IC vector by the pseudo-inverse of the
    window's IC correlation matrix, and stamp the result with the *next* day.
    The stacked rows are then passed through ``_rescale_weights``.
    """
    lookback = timedelta(days=365)
    one_day = timedelta(days=1)

    rows = []
    for dt in ics.index[28:]:
        window = ics.loc[dt - lookback: dt]
        mean_ic = window.mean().values.reshape(1, -1)
        inv_corr = np.linalg.pinv(window.corr().values)
        # label with dt + 1 day so the weight is only used after it is known
        # (avoids lookahead bias)
        rows.append(
            pd.DataFrame(mean_ic @ inv_corr, columns=window.columns, index=[dt + one_day])
        )

    return _rescale_weights(weights=pd.concat(rows, axis=0))


def get_perf_corr_weight(perf_metric: pd.DataFrame) -> pd.DataFrame:
    """Get Type 2 weights.

    For every date after the first 28 rows, take the trailing 365-day window
    of the performance metric, multiply the window's most recent row by the
    pseudo-inverse of the correlation matrix of the metric over that window,
    and stamp the result with the *next* day. The stacked rows are then
    passed through ``_rescale_weights``.
    """
    lookback = timedelta(days=365)
    one_day = timedelta(days=1)

    rows = []
    for dt in perf_metric.index[28:]:
        window = perf_metric.loc[dt - lookback: dt]
        latest_perf = window.iloc[-1].values.reshape(1, -1)
        inv_corr = np.linalg.pinv(window.corr().values)
        # label with dt + 1 day so the weight is only used after it is known
        # (avoids lookahead bias)
        rows.append(
            pd.DataFrame(latest_perf @ inv_corr, columns=window.columns, index=[dt + one_day])
        )

    return _rescale_weights(weights=pd.concat(rows, axis=0))


def get_perf_corr_returns_weight(perf_metric: pd.DataFrame, signal_returns: pd.DataFrame) -> pd.DataFrame:
    """Get Type 3 weights.

    For every date after the first 28 rows of ``perf_metric``, multiply that
    date's performance row by the pseudo-inverse of the correlation matrix of
    ``signal_returns`` over the trailing 365-day window, and stamp the result
    with the *next* day. The stacked rows are then passed through
    ``_rescale_weights``.
    """
    lookback = timedelta(days=365)
    one_day = timedelta(days=1)

    rows = []
    for dt in perf_metric.index[28:]:
        returns_window = signal_returns.loc[dt - lookback: dt]
        perf_today = perf_metric.loc[dt].values.reshape(1, -1)
        inv_corr = np.linalg.pinv(returns_window.corr().values)
        # label with dt + 1 day so the weight is only used after it is known
        # (avoids lookahead bias)
        rows.append(
            pd.DataFrame(perf_today @ inv_corr, columns=returns_window.columns, index=[dt + one_day])
        )

    return _rescale_weights(weights=pd.concat(rows, axis=0))


def calculate_metrics(
    signal_weights: pd.DataFrame,
    asset_returns: pd.DataFrame,
    signal_returns: pd.DataFrame,
    weighting_name: str,
) -> pd.DataFrame:
    """Summarise one weighting scheme as a single-row metrics table.

    Computes the Sharpe ratio (via ``calculate_sharpe_ratio`` with scale 365),
    the minimum of ``calculate_drawdown`` in percent, the annualised volatility
    in percent (365 days), and the mean row-wise correlation between
    ``signal_weights`` and ``asset_returns`` in percent.

    The result has two-level columns ``(metric, weighting_name)`` so tables for
    several schemes can be concatenated side by side.
    """
    sharpe_ratio = calculate_sharpe_ratio(returns=signal_returns, scale=365, geometric=True)
    max_drawdown_pct = calculate_drawdown(returns=signal_returns).min() * 100
    annual_vol_pct = signal_returns.std() * np.sqrt(365) * 100
    # cross-sectional correlation of weights and returns, averaged over dates
    mean_ic_pct = signal_weights.corrwith(asset_returns, axis=1).mean() * 100

    metric_labels = ["Sharpe", "IC (%)", "MDD (%)", "Ann. Vol. (%)"]
    metric_values = [sharpe_ratio, mean_ic_pct, max_drawdown_pct, annual_vol_pct]

    return pd.DataFrame(
        [metric_values],
        columns=pd.MultiIndex.from_tuples(
            [(label, weighting_name) for label in metric_labels]
        ),
    )


def apply_weights(weights: pd.DataFrame, signals: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Blend per-signal asset weights into one portfolio using time-varying signal weights.

    ``weights`` has one column per signal name; ``signals`` maps each of those
    names to that signal's date x asset weight frame. Each signal frame is
    scaled row by row with its weight, the scaled frames are added up, and the
    blend is de-meaned and normalised to unit absolute sum per date. Dates on
    which every asset weight is NaN are dropped.
    """
    blended = None
    for name in weights.columns:
        contribution = signals[name].mul(weights[name], axis=0)
        blended = contribution if blended is None else blended + contribution

    blended = blended / len(weights.columns)
    # make each date's portfolio dollar-neutral with unit gross exposure
    centred = blended.sub(blended.mean(axis=1), axis=0)
    normalised = centred.div(centred.abs().sum(axis=1), axis=0)
    return normalised.dropna(how="all")

In [None]:
def generate_all(rng: int, n_assets: int, n_days: int, n_sigs: int):
    """Generate one synthetic universe: asset returns plus a chain of signals.

    ``signal_0`` comes from ``generate_signal_weights``. Every following signal
    is the previous one multiplied by fresh N(0.4, 0.35) noise (seeded with
    ``rng * (i + 2)``), then de-meaned and normalised to unit absolute sum per
    row.

    Returns
    -------
    tuple
        ``(sig_weights, sig_rets, ics, asset_returns)`` where ``sig_weights``
        maps signal name to its weight frame, ``sig_rets`` holds each signal's
        daily return, and ``ics`` holds each signal's daily row-wise
        correlation with the asset returns.
    """
    asset_returns = generate_asset_returns(seed=rng, n_assets=n_assets, n_days=n_days)
    base_sig_weight = generate_signal_weights(seed=rng + 1, asset_returns=asset_returns)

    sig_weights = {"signal_0": base_sig_weight}
    previous = base_sig_weight
    for i in range(1, n_sigs):
        np.random.seed(rng * (i + 2))
        noisy = previous * np.random.normal(0.4, 0.35, base_sig_weight.shape)
        noisy = noisy.sub(noisy.mean(axis=1), axis=0)
        noisy = noisy.div(noisy.abs().sum(axis=1), axis=0)
        sig_weights[f"signal_{i}"] = noisy
        previous = noisy

    # daily cross-sectional correlation of each signal with the asset returns
    ics = pd.concat(
        [w.corrwith(asset_returns, axis=1).to_frame(name) for name, w in sig_weights.items()],
        axis=1,
    )
    # daily return of holding each signal's weights
    sig_rets = pd.concat(
        [(w * asset_returns).sum(axis=1).to_frame(name) for name, w in sig_weights.items()],
        axis=1,
    )

    return sig_weights, sig_rets, ics, asset_returns

In [None]:
# One illustrative universe: 200 assets, 5 years of daily data, 20 signals.
n_assets = 200
n_days = 365 * 5
n_sigs = 20
seed = 212

# generate_all returns (signal weights, signal returns, ICs, asset returns); only the
# two middle items are needed here. Index the tuple instead of unpacking into `_`:
# assigning `_` at notebook top level shadows IPython's last-output variable.
sig_rets, ics = generate_all(rng=seed, n_assets=n_assets, n_days=n_days, n_sigs=n_sigs)[1:3]

# Keep signals 2..11 and relabel them signal_0..signal_9.
sig_rets = sig_rets.iloc[:, 2:12]
sig_rets.columns = [f"signal_{i}" for i in range(sig_rets.shape[1])]
ics = ics.iloc[:, 2:12]
ics.columns = [f"signal_{i}" for i in range(ics.shape[1])]

# Pairwise correlation of the signals' daily returns, in percent.
fig = px.imshow(sig_rets.corr().mul(100).round(1), text_auto=True)
fig.update_layout(title="Signal Returns Correlation (%)", height=700, width=700)
fig.show()

# Pairwise correlation of the signals' daily ICs, in percent.
fig = px.imshow(ics.corr().mul(100).round(1), text_auto=True)
fig.update_layout(title="Signal IC's Correlation (%)", height=700, width=700)
fig.show()

In [None]:
# Compounded cumulative return of every signal over the sample.
fig = px.line((1 + sig_rets).cumprod() - 1)
fig.show()

# Run the full simulation.

In [None]:
# Monte-Carlo comparison of the signal-weighting schemes.
# Every accepted seed contributes one row of metrics per scheme.
all_metrics = []
n_sim = 1000
n_assets = 200
n_days = 365 * 5
n_sigs = 20
seed = 0


def _score_weighting(weights, signals, asset_returns, weighting_name, start=datetime(2025, 2, 1)):
    """Blend the signals with ``weights`` and score the resulting portfolio.

    Returns ``(portfolio_weights, portfolio_returns, metrics)``. Returns are cut
    at ``start`` so every scheme is scored after the 28-row warm-up used by the
    weighting functions.
    NOTE(review): calculate_metrics computes the IC from the full, unsliced
    weight history while the returns are sliced at ``start`` - confirm intended.
    """
    portfolio_weights = apply_weights(weights=weights, signals=signals)
    portfolio_returns = (portfolio_weights * asset_returns).sum(axis=1).loc[start:]
    metrics = calculate_metrics(
        signal_weights=portfolio_weights,
        asset_returns=asset_returns,
        signal_returns=portfolio_returns,
        weighting_name=weighting_name,
    )
    return portfolio_weights, portfolio_returns, metrics


# `<` (not `<=`) so exactly n_sim simulations are collected.
while len(all_metrics) < n_sim:
    if seed % 50 == 0:
        print(f"On iter={seed}")
    seed += 1
    try:
        sig_weights, sig_rets, ics, asset_returns = generate_all(rng=seed, n_assets=n_assets, n_days=n_days, n_sigs=n_sigs)

        # Keep signals 2..11 and relabel them signal_0..signal_9. The weight frames
        # must be re-keyed the same way; otherwise the scheme weights (computed
        # from signals 2..11) would be applied to the original signals 0..9.
        sig_rets = sig_rets.iloc[:, 2:12]
        sig_weights = {
            f"signal_{i}": sig_weights[orig_name] for i, orig_name in enumerate(sig_rets.columns)
        }
        sig_rets.columns = [f"signal_{i}" for i in range(sig_rets.shape[1])]
        ics = ics.iloc[:, 2:12]
        ics.columns = [f"signal_{i}" for i in range(ics.shape[1])]

        # Reject universes in which a signal is unrealistically strong.
        if (sig_rets.mean() > 0.003).any() or (ics.mean() > 0.06).any():
            print(f"{seed=}, Average return too high, continuing")
            continue

        # Type 1: mean IC x inverse IC correlation.
        ic_weight = get_ic_corr_weight(ics)
        weight_type1, pr1, m1 = _score_weighting(ic_weight, sig_weights, asset_returns, "Type 1")

        # Rolling annualised Sharpe per signal (at least 28, at most 365 observations).
        rol = sig_rets.rolling(min_periods=28, window=365)
        sharpes = np.sqrt(365) * rol.mean() / rol.std()

        # Type 2: latest Sharpe x inverse correlation of the Sharpe series.
        sharpe_weight = get_perf_corr_weight(perf_metric=sharpes)
        weight_type2, pr2, m2 = _score_weighting(sharpe_weight, sig_weights, asset_returns, "Type 2")

        # Type 3: latest Sharpe x inverse correlation of the signal returns.
        sharpe_ret_weight = get_perf_corr_returns_weight(perf_metric=sharpes, signal_returns=sig_rets)
        weight_type3, pr3, m3 = _score_weighting(sharpe_ret_weight, sig_weights, asset_returns, "Type 3")

        # Type 4: rescaled Sharpe only, lagged one row to avoid lookahead.
        sharpes_scale = _rescale_weights(weights=sharpes).shift(1)
        weight_type4, pr4, m4 = _score_weighting(sharpes_scale, sig_weights, asset_returns, "Type 4")

        # Benchmark: equal weight on every signal.
        eq_weight = pd.DataFrame(
            1.0 / sig_rets.shape[1], index=sig_rets.index, columns=sig_rets.columns
        )
        weight_eq, preq, meq = _score_weighting(eq_weight, sig_weights, asset_returns, "Type Eq.")

        # Benchmark: random positive weights, normalised per date.
        rnd_weight = pd.DataFrame(
            np.random.uniform(0.1, 1, sig_rets.shape), index=sig_rets.index, columns=sig_rets.columns
        )
        rnd_weight = rnd_weight.div(rnd_weight.sum(axis=1), axis=0)
        weight_rnd, prrnd, mernd = _score_weighting(rnd_weight, sig_weights, asset_returns, "Type Rnd.")

        curr_sim_metrics = pd.concat(
            [
                m1, m2, m3, m4, meq, mernd
            ], axis=1
        )
        # An absurd Sharpe indicates a broken draw; stop so it can be inspected.
        if (curr_sim_metrics["Sharpe"] > 100).any().any():
            print(f"Breaking - {seed=}")
            break

        all_metrics.append(curr_sim_metrics)
    except Exception as exc:
        # Best effort: skip seeds whose simulation fails, but report why.
        # Catching Exception (not a bare except) keeps the loop interruptible.
        print(f"{seed=}, simulation failed: {exc!r}")
        continue

all_metrics = pd.concat(all_metrics, axis=0).reset_index(drop=True)

In [None]:
# Type 1 signal weights over time, taken from the last simulation that computed them.
fig = (
    px.line(ic_weight)
    .update_yaxes(title="Weight w_i")
    .update_xaxes(title="")
    .update_layout(title="IC weighting (Type 1)")
)
fig.show()

In [None]:
# Type 4 signal weights over time, taken from the last simulation that computed them;
# rows containing NaN (warm-up period) are dropped before plotting.
fig = (
    px.line(sharpes_scale.dropna())
    .update_yaxes(title="Weight w_i")
    .update_xaxes(title="")
    .update_layout(title="Sharpe weighting (Type 4)")
)
fig.show()

In [None]:
# Distribution of IC per weighting scheme across all simulations.
ic_by_scheme = all_metrics["IC (%)"]
# Reference line: median of the equal-weight benchmark.
mu = ic_by_scheme["Type Eq."].median()
# Paired t-test of equal weighting vs Type 1 (each simulation provides one pair).
# Read `.pvalue` instead of unpacking into `_`, which would shadow IPython's
# last-output variable for the rest of the session.
p = stats.ttest_rel(a=ic_by_scheme["Type Eq."], b=ic_by_scheme["Type 1"]).pvalue
fig = px.box(ic_by_scheme)
fig.add_hline(y=mu, line_dash="dash")
fig.update_yaxes(title="IC (%)")
fig.update_xaxes(title="")
fig.update_layout(title=f"IC of Different Weighting Schemes. Difference in Type1 and Equal p={round(p, 4)}", width=800)
fig.show()

In [None]:
# Distribution of Sharpe ratio per weighting scheme across all simulations.
sharpe_by_scheme = all_metrics["Sharpe"]
# Reference line: median of the equal-weight benchmark.
mu = sharpe_by_scheme["Type Eq."].median()
# Paired t-test of equal weighting vs Type 1 (each simulation provides one pair).
# Read `.pvalue` instead of unpacking into `_`, which would shadow IPython's
# last-output variable for the rest of the session.
p = stats.ttest_rel(a=sharpe_by_scheme["Type Eq."], b=sharpe_by_scheme["Type 1"]).pvalue
fig = px.box(sharpe_by_scheme)
fig.add_hline(y=mu, line_dash="dash")
fig.update_yaxes(title="Sharpe")
fig.update_xaxes(title="")
fig.update_layout(title=f"Sharpe of Different Weighting Schemes. Difference in Type1 and Equal p={round(p, 4)}", width=800)
fig.show()

In [None]:
# Distribution of maximum drawdown per weighting scheme across all simulations.
mdd_by_scheme = all_metrics["MDD (%)"]
# Reference line: median of the equal-weight benchmark.
mu = mdd_by_scheme["Type Eq."].median()
# Paired t-test of equal weighting vs Type 1 (each simulation provides one pair).
# Read `.pvalue` instead of unpacking into `_`, which would shadow IPython's
# last-output variable for the rest of the session.
p = stats.ttest_rel(a=mdd_by_scheme["Type Eq."], b=mdd_by_scheme["Type 1"]).pvalue
fig = px.box(mdd_by_scheme)
fig.add_hline(y=mu, line_dash="dash")
fig.update_yaxes(title="MDD (%)")
fig.update_xaxes(title="")
fig.update_layout(title=f"MDD of Different Weighting Schemes. Difference in Type1 and Equal p={round(p, 4)}", width=800)
fig.show()