In [None]:
import sys
sys.path.append('../..')
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import matplotlib.pyplot as plt
import ta
import numpy as np
import datetime
from IPython.display import clear_output
from utilities.data_manager import ExchangeDataManager
from utilities.custom_indicators import get_n_columns
from utilities.bt_analysis import get_metrics, backtest_analysis
from utilities.plot_analysis import plot_equity_vs_asset, plot_futur_simulations, plot_bar_by_month
import nest_asyncio
nest_asyncio.apply()

In [None]:
class Strategy:
    def __init__(
        self,
        df_list,
        oldest_pair,
        type=["long"],
        params={},
    ):
        self.df_list = df_list
        self.oldest_pair = oldest_pair
        self.use_long = True if "long" in type else False
        self.use_short = True if "short" in type else False
        self.params = params

    def populate_indicators(self):
        for pair in self.df_list:
            params = self.params[pair]
            df = self.df_list[pair]
            df.drop(
                columns=df.columns.difference(
                    ["open", "high", "low", "close", "volume"]
                ),
                inplace=True,
            )

            # -- Populate indicators --
            if params["src"] == "close":
                src = df["close"]
            elif params["src"] == "ohlc4":
                src = (df["close"] + df["high"] + df["low"] + df["open"]) / 4

            df["ma_base"] = ta.trend.sma_indicator(
                close=src, window=params["ma_base_window"]
            ).shift(1)
            high_envelopes = [round(1 / (1 - e) - 1, 3) for e in params["envelopes"]]
            for i in range(1, len(params["envelopes"]) + 1):
                df[f"ma_high_{i}"] = df["ma_base"] * (1 + high_envelopes[i - 1])
                df[f"ma_low_{i}"] = df["ma_base"] * (1 - params["envelopes"][i - 1])
                df = get_n_columns(df, [f"ma_high_{i}", f"ma_low_{i}"], 1)

            df = get_n_columns(df, ["low", "high"], 1)

            self.df_list[pair] = df

        return self.df_list[self.oldest_pair]

    def populate_buy_sell(self):
        full_list = []

        for pair, df in self.df_list.items():
            params = self.params[pair]
            df["pair"] = pair
            df["open_long_any"] = False
            df["open_short_any"] = False
            for i in range(1, len(params["envelopes"]) + 1):
                df[f"open_long_{i}"] = np.where(
                    (df["low"] <= df[f"ma_low_{i}"])
                    & (df["n1_low"] > df[f"n1_ma_low_{i}"]),
                    True,
                    False,
                )
                df[f"open_short_{i}"] = np.where(
                    (df['high'] >= df[f'ma_high_{i}'])
                    & (df['n1_high'] < df[f'n1_ma_high_{i}']),
                    True,
                    False,
                )

            df["open_long_any"] = df[[f"open_long_{i}" for i in range(1, len(params["envelopes"]) + 1)]].any(axis=1)
            df["open_short_any"] = df[[f"open_short_{i}" for i in range(1, len(params["envelopes"]) + 1)]].any(axis=1)
            df[f"close_long"] = np.where(
                (df['high'] >= df['ma_base']),
                True,
                False,
            )
            df[f"close_short"] = np.where(
                (df['low'] <= df['ma_base']),
                True,
                False,
            )
            self.df_list[pair] = df
            full_list.append(df)
        
        df_full = pd.concat(full_list)
        df_full = df_full.sort_index()
        self.open_long_obj = df_full[df_full['open_long_any']].groupby('date')['pair'].apply(list).to_dict() if self.use_long else {}
        self.close_long_obj = df_full[df_full['close_long']].groupby('date')['pair'].apply(list).to_dict() if self.use_long else {}
        self.open_short_obj = df_full[df_full['open_short_any']].groupby('date')['pair'].apply(list).to_dict() if self.use_short else {}
        self.close_short_obj = df_full[df_full['close_short']].groupby('date')['pair'].apply(list).to_dict() if self.use_short else {}

        return self.df_list[self.oldest_pair]

    def run_backtest(self, initial_wallet=1000, leverage=1, sl=0.3):
        params = self.params
        df_ini = self.df_list[self.oldest_pair][:]
        wallet = initial_wallet
        long_exposition = 0
        short_exposition = 0
        maker_fee = 0.0002
        taker_fee = 0.0006
        trades = []
        days = []
        current_day = 0
        previous_day = 0
        positions = {}

        for index, ini_row in df_ini.iterrows():
            # -- Add daily report --
            current_day = index.day
            if previous_day != current_day:
                temp_wallet = wallet
                for pair in positions:
                    row = self.df_list[pair].loc[index]
                    position = positions[pair]
                    if position["side"] == "LONG":
                        close_price = row["open"]
                        trade_result = (close_price - position["price"]) / position[
                            "price"
                        ]
                        close_size = position["size"] + position["size"] * trade_result
                        fee = close_size * taker_fee
                        temp_wallet += close_size - position["size"] - fee
                    elif position["side"] == "SHORT":
                        close_price = row["open"]
                        trade_result = (position["price"] - close_price) / position[
                            "price"
                        ]
                        close_size = position["size"] + position["size"] * trade_result
                        fee = close_size * taker_fee
                        temp_wallet += close_size - position["size"] - fee

                days.append(
                    {
                        "day": str(index.year)
                        + "-"
                        + str(index.month)
                        + "-"
                        + str(index.day),
                        "wallet": temp_wallet,
                        "price": ini_row["open"],
                        "long_exposition": 0,
                        "short_exposition": 0,
                        "risk": 0,
                    }
                )
            previous_day = current_day

            close_long_row = self.close_long_obj[index] if index in self.close_long_obj else []
            close_short_row = self.close_short_obj[index] if index in self.close_short_obj else []
            closed_pair = []
            if len(positions) > 0:
                # -- Close SL --
                for pair in positions.copy():
                    row = self.df_list[pair].loc[index]
                    position = positions[pair]
                    if (
                        position["side"] == "LONG"
                        and row["low"] <= position["stop_loss_price"]
                    ):
                        close_price = min(
                            position["stop_loss_price"], row["open"]
                        )
                        trade_result = (close_price - position["price"]) / position[
                            "price"
                        ]
                        close_size = position["size"] + position["size"] * trade_result
                        fee = close_size * maker_fee
                        wallet += close_size - position["size"] - fee
                        trades.append(
                            {
                                "pair": pair,
                                "open_date": position["date"],
                                "close_date": index,
                                "position": position["side"],
                                "open_reason": position["reason"],
                                "close_reason": "SL",
                                "open_price": position["price"],
                                "close_price": close_price,
                                "open_fee": position["fee"],
                                "close_fee": fee,
                                "open_trade_size": position["size"],
                                "close_trade_size": close_size,
                                "wallet": wallet,
                            }
                        )
                        del positions[pair]
                        closed_pair.append(pair)
                    elif (
                        position["side"] == "SHORT"
                        and row["high"] >= position["stop_loss_price"]
                    ):
                        close_price = max(
                            position["stop_loss_price"], row["open"]
                        )
                        trade_result = (position["price"] - close_price) / position[
                            "price"
                        ]
                        close_size = position["size"] + position["size"] * trade_result
                        fee = close_size * taker_fee
                        wallet += close_size - position["size"] - fee
                        trades.append(
                            {
                                "pair": pair,
                                "open_date": position["date"],
                                "close_date": index,
                                "position": position["side"],
                                "open_reason": position["reason"],
                                "close_reason": "SL",
                                "open_price": position["price"],
                                "close_price": close_price,
                                "open_fee": position["fee"],
                                "close_fee": fee,
                                "open_trade_size": position["size"],
                                "close_trade_size": close_size,
                                "wallet": wallet,
                            }
                        )
                        del positions[pair]
                        closed_pair.append(pair)
                # -- Close LONG --
                long_position_to_close = set(
                    {k: v for k, v in positions.items() if v["side"] == "LONG"}
                ).intersection(set(close_long_row))
                for pair in long_position_to_close:
                    if pair in closed_pair:
                        continue
                    row = self.df_list[pair].loc[index]
                    position = positions[pair]
                    close_price = row["ma_base"]
                    trade_result = (close_price - position["price"]) / position["price"]
                    close_size = position["size"] + position["size"] * trade_result
                    fee = close_size * maker_fee
                    wallet += close_size - position["size"] - fee
                    trades.append(
                        {
                            "pair": pair,
                            "open_date": position["date"],
                            "close_date": index,
                            "position": position["side"],
                            "open_reason": position["reason"],
                            "close_reason": "Market",
                            "open_price": position["price"],
                            "close_price": close_price,
                            "open_fee": position["fee"],
                            "close_fee": fee,
                            "open_trade_size": position["size"],
                            "close_trade_size": close_size,
                            "wallet": wallet,
                        }
                    )
                    del positions[pair]
                    closed_pair.append(pair)

                # -- Close SHORT market --
                short_position_to_close = set(
                    {k: v for k, v in positions.items() if v["side"] == "SHORT"}
                ).intersection(set(close_short_row))
                for pair in short_position_to_close:
                    if pair in closed_pair:
                        continue
                    row = self.df_list[pair].loc[index]
                    position = positions[pair]
                    close_price = row["ma_base"]
                    trade_result = (position["price"] - close_price) / position["price"]
                    close_size = position["size"] + position["size"] * trade_result
                    fee = close_size * taker_fee
                    wallet += close_size - position["size"] - fee
                    trades.append(
                        {
                            "pair": pair,
                            "open_date": position["date"],
                            "close_date": index,
                            "position": position["side"],
                            "open_reason": position["reason"],
                            "close_reason": "Market",
                            "open_price": position["price"],
                            "close_price": close_price,
                            "open_fee": position["fee"],
                            "close_fee": fee,
                            "open_trade_size": position["size"],
                            "close_trade_size": close_size,
                            "wallet": wallet,
                        }
                    )
                    del positions[pair]
                    closed_pair.append(pair)

            # -- Check for opening position --
            # -- Open LONG limit --
            open_long_row = self.open_long_obj[index] if index in self.open_long_obj else []
            for pair in open_long_row:
                position = None
                row = self.df_list[pair].loc[index]

                for i in range(1, len(params[pair]["envelopes"]) + 1):
                    if pair in positions:
                        position = positions[pair]
                    if (position and position["side"] == "SHORT") or (
                        pair in closed_pair
                    ):
                        break
                    if position and position["envelope"] >= i:
                        continue
                    if row[f"open_long_{i}"]:
                        open_price = min(row[f"ma_low_{i}"], row["open"])
                        pos_size = (params[pair]["size"] * wallet * leverage) / len(
                            params[pair]["envelopes"]
                        )
                        fee = pos_size * maker_fee
                        pos_size -= fee
                        wallet -= fee
                        if position:
                            position["price"] = (
                                position["size"] * position["price"]
                                + open_price * pos_size
                            ) / (position["size"] + pos_size)
                            position["size"] = position["size"] + pos_size
                            position["fee"] = position["fee"] + fee
                            position["envelope"] = i
                            position["reason"] = f"Limit Envelop {i}"
                        else:
                            stop_loss_price = open_price * (1 - sl)
                            positions[pair] = {
                                "size": pos_size,
                                "date": index,
                                "price": open_price,
                                "fee": fee,
                                "reason": f"Limit Envelop {i}",
                                "side": "LONG",
                                "envelope": i,
                                "stop_loss_price": stop_loss_price,
                            }
            # -- Open SHORT limit --
            open_short_row = self.open_short_obj[index] if index in self.open_short_obj else []
            for pair in open_short_row:
                position = None
                row = self.df_list[pair].loc[index]
                for i in range(1, len(params[pair]["envelopes"]) + 1):
                    if pair in positions:
                        position = positions[pair]
                    # if (position and position["side"] == "LONG") or row[f"open_short_{i}"] == False or (pair in closed_pair):
                    if (position and position["side"] == "LONG") or (
                        pair in closed_pair
                    ):
                        break
                    if position and position["envelope"] >= i:
                        continue
                    if row[f"open_short_{i}"]:
                        open_price = max(row[f"ma_high_{i}"], row["open"])
                        pos_size = (params[pair]["size"] * wallet * leverage) / len(
                            params[pair]["envelopes"]
                        )
                        fee = pos_size * maker_fee
                        pos_size -= fee
                        wallet -= fee

                        if position:
                            position["price"] = (
                                position["size"] * position["price"]
                                + open_price * pos_size
                            ) / (position["size"] + pos_size)
                            position["size"] = position["size"] + pos_size
                            position["fee"] = position["fee"] + fee
                            position["envelope"] = i
                            position["reason"] = f"Limit Envelop {i}"
                        else:
                            stop_loss_price = open_price * (1 + sl)
                            positions[pair] = {
                                "size": pos_size,
                                "date": index,
                                "price": open_price,
                                "fee": fee,
                                "reason": f"Limit Envelop {i}",
                                "side": "SHORT",
                                "envelope": i,
                                "stop_loss_price": stop_loss_price,
                            }

        df_days = pd.DataFrame(days)
        df_days["day"] = pd.to_datetime(df_days["day"])
        df_days = df_days.set_index(df_days["day"])

        df_trades = pd.DataFrame(trades)
        df_trades["open_date"] = pd.to_datetime(df_trades["open_date"])
        df_trades = df_trades.set_index(df_trades["open_date"])

        return get_metrics(df_trades, df_days) | {
            "wallet": wallet,
            "trades": df_trades,
            "days": df_days,
        }

In [None]:
params = {
    "BTC/USDT:USDT":{
        "src": "close",
        "ma_base_window": 7,
        "envelopes": [0.07, 0.1, 0.15],
        "size": 0.1,
    },
    "ETH/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15],
        "size": 0.1,
    },
    "ADA/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.09, 0.12, 0.15],
        "size": 0.1,
    },
    "AVAX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.09, 0.12, 0.15],
        "size": 0.1,
    },
    "EGLD/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "KSM/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "OCEAN/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ACH/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "APE/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "CRV/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "DOGE/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ENJ/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "FET/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ICP/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "IMX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "LDO/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "MAGIC/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "SAND/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "TRX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "XTZ/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
}

pair_list = list(params.keys())
exchange_name = "bitget"
tf = '1h'
# Restrict backtest to 2024
start_date = "2024-01-01 00:00:00"
end_date = "2025-01-01 00:00:00"
oldest_pair = "BTC/USDT:USDT"

exchange = ExchangeDataManager(
    exchange_name=exchange_name, 
    path_download="../database"
)

df_list = {}
for pair in pair_list:
    try:
        df = exchange.load_data(pair, tf)
        df = df.loc[start_date:end_date]
        df_list[pair] = df.loc[:]
    except FileNotFoundError:
        print(f"Warning: Data file for {pair} not found, skipping...")
        continue

print("Data load 100%")
df_list[oldest_pair]

In [None]:
strat = Strategy(
    df_list=df_list,
    oldest_pair=oldest_pair,
    type=["long","short"],
    params=params,
)

strat.populate_indicators()
strat.populate_buy_sell()
bt_result = strat.run_backtest(initial_wallet=1000, leverage=4, sl=0.4)

df_trades, df_days = backtest_analysis(
    trades=bt_result['trades'], 
    days=bt_result['days'],
    general_info=True,
    trades_info=True,
    days_info=True,
    long_short_info=True,
    entry_exit_info=True,
    exposition_info=True,
    pair_info=True,
    indepedant_trade=True
)

In [None]:
df_trades.loc[df_trades["close_reason"] == "SL"]

In [None]:
plot_equity_vs_asset(df_days=df_days.loc[:])

In [None]:
plot_bar_by_month(df_days=df_days)

In [None]:
df_trades.sort_values(by=["trade_result_pct"])

In [None]:
plot_futur_simulations(
    df_trades=df_trades,
    trades_multiplier=3,
    trades_to_forecast=500,
    number_of_simulations=100,
    true_trades_to_show=100,
    # show_all_simulations=True,
)

In [None]:
# from lightweight_charts import Chart
# from lightweight_charts import JupyterChart
# pair = "AVAX/USDT:USDT"
# dt = df_list[pair].copy().loc["2023-11"]
# dt_trades = df_trades.copy().loc["2023-11"]
# dt_trades = dt_trades.loc[dt_trades["pair"] == pair]
# chart = JupyterChart(width=900, height=400)

# dt["time"] = dt.index
# chart.set(dt)

# lines = ["ma_base"]
# for i in range(1, len(params[pair]["envelopes"]) + 1):
#     lines.append(f"ma_low_{i}")
#     lines.append(f"ma_high_{i}")
# for line in lines:
#     line_object = chart.create_line(line, width=1)
#     line_data = pd.DataFrame({"time": dt.index, line: dt[line]})
#     line_object.set(line_data)

# def place_buy_order(key):
#     print(f'Buy {key} shares.')

# for trades in dt_trades.iterrows():
#     if trades[1]['position'] == "LONG":
#         chart.marker(time=trades[1]['open_date'], position="below", shape="arrow_up", color="green", text="Long")
#         chart.marker(time=trades[1]['close_date'], position="above", shape="arrow_down", color="white", text=trades[1]["close_reason"])
#     elif trades[1]['position'] == "SHORT":
#         chart.marker(time=trades[1]['open_date'], position="above", shape="arrow_down", color="red", text="Short")
#         chart.marker(time=trades[1]['close_date'], position="below", shape="arrow_up", color="white", text=trades[1]["close_reason"])

# # chart.show(block=True)
# chart.load()



In [None]:
import os

def save_equity_csv(df_days, filepath="multi_envelope_equity.csv"):
    """Save equity curve to CSV.
    - df_days: DataFrame with a 'wallet' column (and optional 'day').
    - filepath: output CSV path. Defaults to project root.
    """
    if df_days is None:
        raise ValueError("df_days is None. Provide a valid DataFrame.")
    if 'wallet' not in df_days.columns:
        raise ValueError("df_days must contain a 'wallet' column.")

    columns_to_save = ['day', 'wallet'] if 'day' in df_days.columns else ['wallet']

    parent_dir = os.path.dirname(filepath)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    df_days.loc[:, columns_to_save].to_csv(filepath, index=('day' not in df_days.columns))
    print(f"Saved equity curve to {os.path.abspath(filepath)}")

save_equity_csv(df_days) 
