# Mean Reversion

Mean reversion is a trading strategy based on a simple idea:

    Prices tend to return to their average value after straying too far from it.

In other words, if the price rises too high above its average, a downward correction is expected, and if it falls too low, a rebound towards the average is expected.

# Importing assets

In [2]:
import sys
from statsmodels.tsa.stattools import adfuller
import pandas as pd
import numpy as np


sys.path.append("../../src/")
sys.path.append("../../src/data_manager/")

In [None]:
# ANSI escape sequences used to colour console output in this notebook.
GREEN = "\033[01;32m"
WHITE = "\033[00m"  # SGR 0: resets all attributes, used to end a coloured span
RED = "\033[01;31m"

# Smoke-test the colours; reset afterwards so red does not bleed into later output.
print(f'{RED}ok{WHITE}')


In [None]:


# Project-local loader; presumably found via the sys.path entries appended earlier.
from data_manager import data_manager

# Load the "crypto" securities starting 2022-11-01.
# NOTE(review): the rest of the notebook treats the result as a dict of
# ticker -> DataFrame with a "close" column — confirm against data_manager.
crypto_dic = data_manager.get_securities("crypto", start="2022-11-01")

In [None]:
print(crypto_dic.keys())

In [6]:
# Add the period-over-period percentage change of the close to every frame.
# The column is called "hourly_return"; that the bars are hourly is assumed — TODO confirm.
for frame in crypto_dic.values():
    frame["hourly_return"] = frame["close"].pct_change()


In [None]:
import plotly.graph_objects as go


def plot_dict_unormalized(dic_sec : dict, y_col = "close", template="plotly_dark", x_title = "Date", y_title="price", title="Price non normalized"):
    """Draw one line per security on a shared Plotly figure, without rescaling.

    Args:
        dic_sec: Mapping of ticker -> DataFrame.
        y_col: Column plotted on the y axis.
        template: Plotly template name.
        x_title: X-axis title.
        y_title: Y-axis title.
        title: Figure title.

    The x values come from the frame's ``date`` column when it has one,
    otherwise from its index. The figure is shown; nothing is returned.
    """
    figure = go.Figure()

    for name, frame in dic_sec.items():
        x_values = frame['date'] if 'date' in frame.columns else frame.index
        # ``name`` is what appears in the legend.
        trace = go.Scatter(x=x_values, y=frame[y_col], mode='lines', name=name)
        figure.add_trace(trace)

    # Figure-level styling.
    layout_options = dict(
        title=title,
        xaxis_title=x_title,
        yaxis_title=y_title,
        hovermode="x unified",
        template=template,
    )
    figure.update_layout(**layout_options)

    figure.show()


plot_dict_unormalized(crypto_dic)

In [None]:
#print(crypto_dic['BTCUSDT']['close'].diff().cumsum().dropna())

In [None]:
import pandas as pd
import plotly.express as px



def plot_dict_normalized(dic_sec : dict, y_col = "close", template="plotly_dark", x_title = "Date", y_title="Price Base 100", title="Price normalized"):
    """Plot every security rebased to 100 on a single Plotly figure.

    Args:
        dic_sec: Mapping of ticker -> DataFrame.
        y_col: Column to plot for each security.
        template: Plotly template name.
        x_title: X-axis title.
        y_title: Y-axis title.
        title: Figure title.

    The figure is shown; nothing is returned.
    """
    # Align all series on a common index, one column per ticker.
    merged = pd.concat(
        {ticker: df[y_col] for ticker, df in dic_sec.items()},
        axis=1
    )
    merged.columns = merged.columns.get_level_values(0)  # keep only the tickers

    # Rebase each series to 100 at its own first available observation.
    # Dividing by ``merged.iloc[0]`` would turn an entire column into NaN
    # whenever that ticker has no value on the first row of the merged index.
    base = merged.bfill().iloc[0]
    normalized = merged / base * 100

    fig = px.line(
        normalized,
        title=title,
        labels={"value": y_title, "index": x_title, "variable": "Ticker"}
    )

    fig.update_layout(
        # Set the axis titles explicitly so x_title/y_title are honoured even
        # when the index carries its own name.
        xaxis_title=x_title,
        yaxis_title=y_title,
        hovermode="x unified",
        template=template,
        legend_title_text="Tickers"
    )

    fig.show()

plot_dict_normalized(crypto_dic)

# Stationarity check

In [None]:
def check_stationarity(dic_sec, col="close", p_value_threshold=0.05):
    """Run the Augmented Dickey-Fuller test on one column of every security.

    Args:
        dic_sec: Mapping of ticker -> DataFrame.
        col: Column tested for stationarity.
        p_value_threshold: A series is reported as stationary when its ADF
            p-value is strictly below this value.

    Returns:
        dict: The subset of ``dic_sec`` (same DataFrame objects, not copies)
        whose ``col`` passed the test.

    A coloured verdict is printed for each ticker.
    """
    ans = {}
    for ticker, df in dic_sec.items():
        # adfuller does not accept missing values, so test only the observed
        # part of the column (e.g. skip the NaN left by diff()/rolling()).
        series = df[col].dropna()
        p_value = adfuller(series)[1]
        print(f"with a p-value of {p_value}, ", end="")
        if p_value < p_value_threshold:
            print(f'{GREEN}{ticker}{WHITE} is stationary')
            ans[ticker] = df
        else:
            print(f'{RED}{ticker}{WHITE} is not stationary')
    return ans

station_dic = check_stationarity(crypto_dic)


Only two **pairs** are already stationary on their raw close prices.

In [None]:
plot_dict_unormalized(station_dic)

# Detrend methods

Let's try to remove the trends manually

In [None]:
# Pairs that did not pass the stationarity check on their raw closes.
# The frames are the same objects held by crypto_dic (no copy is made).
remaining_sec = {
    ticker: frame
    for ticker, frame in crypto_dic.items()
    if ticker not in station_dic
}

remaining_sec.keys()

## Detrend with log returns

In [20]:
def detrend_logs_ret(dic_sec : dict):
    """Detrend each security by taking log returns of its close.

    Args:
        dic_sec: Mapping of ticker -> DataFrame with a ``close`` column.

    Returns:
        dict: ``"<ticker>_logs_ret"`` -> a copy of the input frame with two
        extra columns, ``log_close`` and ``log_returns``.
    """
    ans = {}

    for ticker, df in dic_sec.items():
        # Work on a copy: the input frames are shared with other dicts in the
        # notebook, and writing columns in place would alter all of them.
        out = df.copy()
        out["log_close"] = np.log(out["close"])
        # diff() leaves the first row empty; bfill() fills it with the next
        # row's return so the series contains no NaN.
        out["log_returns"] = out["log_close"].diff().bfill()
        ans[ticker+'_logs_ret'] = out
    return ans

detrended_logs = detrend_logs_ret(remaining_sec)

In [None]:
# Quick look at the first log returns of every detrended frame.
for frame in detrended_logs.values():
    print(frame['log_returns'].head())

In [None]:
station_logs = check_stationarity(detrended_logs, 'log_returns')

## Detrend With Linear regression

In [None]:
from sklearn.linear_model import LinearRegression

def detrend_lin_reg(dic_sec : dict):
    """Detrend each security by subtracting a straight line fitted to its close.

    Args:
        dic_sec: Mapping of ticker -> DataFrame with a ``close`` column.

    Returns:
        dict: ``"<ticker>_lin_reg"`` -> a copy of the input frame with an
        extra ``detrended`` column (close minus the fitted line).

    The line is fitted on the whole sample, so each detrended value depends
    on later observations as well as earlier ones.
    """
    ans = {}

    for ticker, df in dic_sec.items():
        # Work on a copy: the input frames are shared with other dicts in the
        # notebook, and writing columns in place would alter all of them.
        out = df.copy()

        # Regress the close on the row number 0..n-1.
        X = np.arange(len(out)).reshape(-1, 1)
        y = out["close"].values
        trend = LinearRegression().fit(X, y).predict(X)

        out["detrended"] = out["close"] - trend
        ans[ticker+'_lin_reg'] = out
    return ans

# Detrend the pairs left in remaining_sec with the linear-regression method,
# then run the stationarity check on the resulting "detrended" column.
detrended_lin_reg = detrend_lin_reg(remaining_sec)
station_lin_reg = check_stationarity(detrended_lin_reg, 'detrended')

## Detrend with a high-pass filter

### Manually

In [None]:
def detrend_pass_high(dic_sec : dict, window=50):
    """High-pass each close by subtracting its rolling mean.

    Args:
        dic_sec: Mapping of ticker -> DataFrame with a ``close`` column.
        window: Length of the rolling mean used as the trend estimate.

    Returns:
        dict: ``"<ticker>_pass_high"`` -> a copy of the input frame with two
        extra columns: ``trend`` (rolling mean) and ``high_pass``
        (close minus trend).
    """
    ans = {}

    for ticker, df in dic_sec.items():
        # Work on a copy: the other high-pass variants write a column with the
        # same name, and in-place writes on shared frames would overwrite
        # this result.
        out = df.copy()
        out["trend"] = out["close"].rolling(window=window).mean()
        # The first window-1 rows have no rolling mean; bfill() fills them
        # with the first available value so the series contains no NaN.
        out["high_pass"] = (out["close"] - out["trend"]).bfill()
        ans[ticker+'_pass_high'] = out
    return ans


# Rolling-mean high-pass on the remaining pairs, followed by the stationarity check.
# NOTE(review): the three high-pass variants in this notebook all write a column
# named "high_pass"; unless each detrend function works on copies of the frames,
# a later variant overwrites the column these results point to.
detrended_high_pass = detrend_pass_high(remaining_sec)
station_high_pass = check_stationarity(detrended_high_pass, 'high_pass')

### With Scipy (filtfilt)

In [None]:
from scipy import signal

def detrend_pass_high_scipy(dic_sec : dict, cutoff_period=50, order=4):
    """High-pass each close with a Butterworth filter applied via ``filtfilt``.

    Args:
        dic_sec: Mapping of ticker -> DataFrame with a ``close`` column.
        cutoff_period: Cut-off expressed in samples; the cut-off frequency is
            ``1 / cutoff_period``. Must be greater than 2 so that the
            normalised cut-off stays below 1.
        order: Order passed to ``signal.butter``.

    Returns:
        dict: ``"<ticker>_pass_high_scipy"`` -> a copy of the input frame with
        an extra ``high_pass`` column.

    ``filtfilt`` runs the filter forward and backward, so every output value
    depends on later samples as well as earlier ones.
    """
    sampling_freq = 1                    # one point per time step
    cutoff_freq = 1 / cutoff_period      # cut-off frequency of the filter
    normalized_cutoff = cutoff_freq / (sampling_freq / 2)  # relative to Nyquist

    # The filter is the same for every ticker: design it once.
    b, a = signal.butter(order, normalized_cutoff, 'high')

    ans = {}
    for ticker, df in dic_sec.items():
        # Work on a copy: the other high-pass variants write a column with the
        # same name, and in-place writes on shared frames would overwrite
        # this result.
        out = df.copy()
        out["high_pass"] = signal.filtfilt(b, a, out["close"].values)
        ans[ticker+'_pass_high_scipy'] = out
    return ans

# Butterworth/filtfilt high-pass on the remaining pairs, followed by the stationarity check.
# NOTE(review): this variant writes the same "high_pass" column name as the other
# high-pass variants; if the frames are not copied, the results alias each other.
detrended_high_pass_scipy = detrend_pass_high_scipy(remaining_sec)
station_high_pass_scipy = check_stationarity(detrended_high_pass_scipy, 'high_pass')

### With Scipy (lfilter)

In [None]:
from scipy import signal

def detrend_pass_high_scipy_left(dic_sec : dict, cutoff_period=50, order=4):
    """High-pass each close with a Butterworth filter applied via ``lfilter``.

    Args:
        dic_sec: Mapping of ticker -> DataFrame with a ``close`` column.
        cutoff_period: Cut-off expressed in samples; the cut-off frequency is
            ``1 / cutoff_period``. Must be greater than 2 so that the
            normalised cut-off stays below 1.
        order: Order passed to ``signal.butter``.

    Returns:
        dict: ``"<ticker>_pass_high_scipy_left"`` -> a copy of the input frame
        with an extra ``high_pass`` column.

    ``lfilter`` is a single forward pass: each output depends only on current
    and past samples, at the cost of a phase delay (unlike ``filtfilt``).
    """
    sampling_freq = 1                    # one point per time step
    cutoff_freq = 1 / cutoff_period      # cut-off frequency of the filter
    normalized_cutoff = cutoff_freq / (sampling_freq / 2)  # relative to Nyquist

    # The filter is the same for every ticker: design it once.
    b, a = signal.butter(order, normalized_cutoff, 'high')

    ans = {}
    for ticker, df in dic_sec.items():
        # Work on a copy: the other high-pass variants write a column with the
        # same name, and in-place writes on shared frames would overwrite
        # this result.
        out = df.copy()
        # Forward-only filtering from a zero initial state.
        out["high_pass"] = signal.lfilter(b, a, out["close"].values)
        ans[ticker+'_pass_high_scipy_left'] = out
    return ans

# Butterworth/lfilter high-pass on the remaining pairs, followed by the stationarity check.
# NOTE(review): this variant writes the same "high_pass" column name as the other
# high-pass variants; if the frames are not copied, the results alias each other.
detrended_high_pass_scipy_left = detrend_pass_high_scipy_left(remaining_sec)
station_high_pass_scipy_left = check_stationarity(detrended_high_pass_scipy_left, 'high_pass')

# Applying mean reversion

In [12]:

from plotly.subplots import make_subplots

def plot_strategy_and_benchmark(strat : pd.Series, benchmark : pd.Series, legend_strat_title="strat", legend_benchmark_title="benchmark", width=1800, height=600, is_strat_secondary_y_axis=False
                                ,title="Sah", x_axis_title='Time', y_axis_title="Performance", y_axis_secondary_title="<b>performance</b> strat", has_to_save=False,save_path="./strat",save_extension='png', template="plotly_dark"):
    """Show a strategy curve and its benchmark on the same Plotly figure.

    Args:
        strat: Strategy series; its index is used as x values.
        benchmark: Benchmark series; its index is used as x values.
        legend_strat_title: Legend entry for the strategy trace.
        legend_benchmark_title: Legend entry for the benchmark trace.
        width: Image width, used only when the figure is saved.
        height: Image height, used only when the figure is saved.
        is_strat_secondary_y_axis: Draw the strategy on the secondary y axis.
        title: Figure title.
        x_axis_title: X-axis title.
        y_axis_title: Primary y-axis title.
        y_axis_secondary_title: Secondary y-axis title; applied only when the
            strategy is drawn on that axis.
        has_to_save: When truthy, write the figure to ``save_path``.
        save_path: Destination passed to ``write_image``.
        save_extension: Image format passed to ``write_image``.
        template: Plotly template name.

    The figure is shown; nothing is returned.
    """
    figure = make_subplots(specs=[[{"secondary_y": True}]])

    # The strategy may go on the secondary axis; the benchmark is always on the primary one.
    strat_trace = go.Scatter(x=strat.index, y=strat, name=legend_strat_title, mode='lines')
    figure.add_trace(strat_trace, secondary_y=is_strat_secondary_y_axis)

    benchmark_trace = go.Scatter(x=benchmark.index, y=benchmark, name=legend_benchmark_title)
    figure.add_trace(benchmark_trace, secondary_y=False)

    # Title and theme.
    figure.update_layout(title_text=title, template=template)

    # Axis titles.
    figure.update_xaxes(title_text=x_axis_title)
    figure.update_yaxes(title_text=y_axis_title, secondary_y=False)
    if is_strat_secondary_y_axis:
        figure.update_yaxes(title_text=y_axis_secondary_title, secondary_y=True)

    # Styling ideas not implemented yet: white benchmark, yellow strategy,
    # black or dark-grey background, white text.

    if has_to_save:
        figure.write_image(save_path, format=save_extension, width=width, height=height)
    figure.show()

## Naturally stationary

In [None]:
import numpy as np

def compute_sharpe(strat, periods_per_year=365 * 24):
    """Return the annualised Sharpe ratio of a series of per-period returns.

    Args:
        strat: Series of per-period strategy returns.
        periods_per_year: Number of periods in a year used for annualisation.
            The default, 365 * 24, matches one return per hour all year round.

    Returns:
        float: Annualised mean divided by annualised standard deviation
        (no risk-free rate is subtracted). NaN when the standard deviation
        is zero or undefined, instead of an infinite ratio.
    """
    volatility = strat.std()
    # ``not > 0`` is true for both 0 and NaN.
    if not volatility > 0:
        return float("nan")
    return (strat.mean() * periods_per_year) / (volatility * np.sqrt(periods_per_year))

def get_z_score_signal(df, col='close', window=24, upper_threshold=1, lower_threshold=-1):
    """Build a mean-reversion position signal from a rolling z-score.

    Args:
        df: DataFrame holding ``col``. It is modified in place: the columns
            ``mean``, ``std`` and ``zscore`` are added or overwritten.
        col: Column the z-score is computed on.
        window: Rolling window length, in rows.
        upper_threshold: Go short (-1) when the z-score is above this value.
        lower_threshold: Go long (+1) when the z-score is below this value.

    Returns:
        pd.Series: Positions in {-1, 0, 1}, shifted by one row so that the
        position on a row is decided from the previous row's z-score.
        The first row is NaN.
    """
    rolling = df[col].rolling(window)
    df['mean'] = rolling.mean()
    df['std'] = rolling.std()
    df['zscore'] = (df[col] - df['mean']) / df['std']

    zscore = df['zscore']
    # Vectorised thresholding. NaN z-scores (warm-up rows, zero std) fail both
    # comparisons and therefore map to 0, i.e. flat.
    raw_positions = np.where(
        zscore > upper_threshold,
        -1,                                            # price too high -> short
        np.where(zscore < lower_threshold, 1, 0),      # price too low -> long, else flat
    )
    positions = pd.Series(raw_positions, index=df.index, name='zscore')
    return positions.shift(1)

def backtest_strat(dic_sec, trading_fees=0.0001, sharp_treshold=0.99, vol_treshold=0.33,
                        return_treshold = 0.01, drawdown_treshold = -0.3, enable_plot=1, col='close'):
    """Backtest the z-score mean-reversion signal on every security.

    Args:
        dic_sec: Mapping of ticker -> DataFrame. Each frame must hold ``col``
            and an ``hourly_return`` column.
        trading_fees: Fee charged per unit of position change.
        sharp_treshold: Sharpe ratio above which the metric is printed in green.
        vol_treshold: Annualised volatility below which the metric is printed in green.
        return_treshold: Annualised return above which the metric is printed in green.
        drawdown_treshold: Max drawdown above which the metric is printed in green.
        enable_plot: When truthy, plot cumulative strategy vs. benchmark returns.
        col: Column the z-score signal is computed on.

    Returns:
        dict: ticker -> Series of per-period strategy returns net of fees.

    Metrics are printed for each ticker. Annualisation uses 365 * 24 periods
    per year.
    """
    green, red = "\033[92m", "\033[91m"

    def colour(is_good):
        # Chosen outside the f-strings: nesting the same quote type and
        # backslashes inside an f-string expression needs Python 3.12+.
        return green if is_good else red

    ans = {}
    for ticker, df in dic_sec.items():
        position = get_z_score_signal(df, col=col)
        strat = position * df["hourly_return"]
        # Fees are paid whenever the position changes, proportionally to the change.
        strat -= (position.diff().abs() * trading_fees)
        ans[ticker] = strat

        sharpe = compute_sharpe(strat)
        annualized_returns = (strat.mean() * 365 * 24)
        annualized_vol = (strat.std() * np.sqrt(365 * 24))

        # Drawdown is measured on the cumulative P&L curve (distance from its
        # running peak), not on the individual per-period returns.
        equity = strat.cumsum()
        drawdown = equity - equity.cummax()
        max_drawdown = drawdown.min()

        # The shifted signal starts with NaN; NaN != 0 is True, so fill it
        # first to avoid counting that row as an open position.
        nbr_position = (position.fillna(0) != 0).sum()
        # These three count periods by the sign of their net P&L.
        win_position = (strat  > 0).sum()
        loss_position = (strat < 0).sum()
        flat_position = (strat == 0).sum()

        print(f'== \033[96m{ticker} \033[0m==\n')
        print('let\'s print some metrics\n')

        print(f"number of non flat positions: {nbr_position}")
        print(f"number of winning positions: {win_position}")
        print(f"number of loosing positions (including fees after returning flat): {loss_position}")
        print(f"number of flat positions: {flat_position}")
        print(f"Annualized Return: {colour(annualized_returns > return_treshold)} {annualized_returns:.2%}\033[0m==")
        print(f"Annualized Volatility:{colour(annualized_vol < vol_treshold)} {annualized_vol:.2%}\033[0m==")
        print(f"Sharpe Ratio: {colour(sharpe > sharp_treshold)} {sharpe:.2f}\033[0m==")
        print(f"Max Drawdown: {colour(max_drawdown > drawdown_treshold)} {max_drawdown:.2%}\033[0m==\n\n")
        if enable_plot:
            plot_strategy_and_benchmark(strat.cumsum(), df["hourly_return"].cumsum(), title=ticker)

    return ans

station_strat = backtest_strat(station_dic)

## Logs stationary

In [None]:
station_logs_strat = backtest_strat(station_logs,col="log_returns")

## Linear regression stationary

In [None]:
station_lin_reg_strat = backtest_strat(station_lin_reg,col="detrended")

## High-pass filter

### Manually

In [None]:
station_pass_high_strat = backtest_strat(station_high_pass,col="high_pass")

### With Scipy filtfilt

In [None]:
station_pass_high_scipy_strat = backtest_strat(station_high_pass_scipy,col="high_pass")

### With Scipy Lfilter

In [None]:
station_pass_high_scipy_left_strat = backtest_strat(station_high_pass_scipy_left,col="high_pass")

In [None]:
station_high_pass_scipy['AAVEETH_pass_high_scipy']['high_pass']