In [None]:
import sys
sys.path.append('../..')
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import matplotlib.pyplot as plt
import ta
import numpy as np
import datetime
from IPython.display import clear_output
from utilities.data_manager import ExchangeDataManager
from utilities.custom_indicators import get_n_columns
from utilities.bt_analysis import get_metrics, backtest_analysis
from utilities.plot_analysis import plot_equity_vs_asset, plot_futur_simulations, plot_bar_by_month
import nest_asyncio
nest_asyncio.apply()

In [None]:
class Strategy():
    def __init__(
        self,
        df_list,
        oldest_pair,
        type=["long"],
        params={},
    ):
        self.df_list = df_list
        self.oldest_pair = oldest_pair
        self.use_long = True if "long" in type else False
        self.use_short = True if "short" in type else False
        self.params = params

        
    def populate_indicators(self):
        for pair in self.df_list:
            params = self.params[pair]
            df = self.df_list[pair]
            df.drop(
                columns=df.columns.difference(['open','high','low','close','volume']), 
                inplace=True
            )
            
            # -- Populate indicators --    
            if params["src"] == "close":
                src = df["close"]
            elif params["src"] == "ohlc4":
                src = (df["close"] + df["high"] + df["low"] + df["open"]) / 4

            df['ma_base'] = ta.trend.sma_indicator(close=src, window=params["ma_base_window"]).shift(1)
            high_envelopes = [round(1/(1-e)-1, 3) for e in params["envelopes"]]
            for i in range(1, len(params["envelopes"]) + 1):
                df[f'ma_high_{i}'] = df['ma_base'] * (1 + high_envelopes[i-1])
                df[f'ma_low_{i}'] = df['ma_base'] * (1 - params["envelopes"][i-1])
        
            self.df_list[pair] = df
                
        return self.df_list[self.oldest_pair]
    
    def populate_buy_sell(self): 
        data_open_long = []
        data_close_long = []
        data_open_short = []
        data_close_short = []

        for pair in self.df_list:
            params = self.params[pair]
            df = self.df_list[pair]
            # -- Initiate populate --
            df["close_long"] = False
            df["close_short"] = False
            for i in range(1, len(params["envelopes"]) + 1):
                df[f"open_short_{i}"] = False
                df[f"open_long_{i}"] = False
            df["pair"] = pair
            df["null"] = np.nan
            
            
            if self.use_long:
                for i in range(1, len(params["envelopes"]) + 1):
                    df.loc[
                        (df['low'] <= df[f'ma_low_{i}'])
                        , f"open_long_{i}"
                    ] = True
                
                # -- Populate close long --
                df.loc[
                    (df['high'] >= df['ma_base'])
                    , "close_long"
                ] = True
                
            
            if self.use_short:
                for i in range(1, len(params["envelopes"]) + 1):
                    df.loc[
                        (df['high'] >= df[f'ma_high_{i}'])
                        , f"open_short_{i}"
                    ] = True
                
                df.loc[
                    (df['low'] <= df['ma_base'])
                    , "close_short"
                ] = True
                
                
            # -- Populate pair list per date (do not touch)--
            data_open_long.append(
                df.loc[
                (df['open_long_1']  == True) 
                ]['pair']
            )
            data_close_long.append(
                df.loc[
                (df['close_long']  == True) 
                ]['pair']
            )
            data_open_short.append(
                df.loc[
                (df['open_short_1']  == True) 
                ]['pair']
            )
            data_close_short.append(
                df.loc[
                (df['close_short']  == True) 
                ]['pair']
            )
        
        del df["pair"]
        del df["null"]
        self.df_list[pair] = df

        data_open_long.append(self.df_list[self.oldest_pair]['null'])
        data_close_long.append(self.df_list[self.oldest_pair]['null'])
        data_open_short.append(self.df_list[self.oldest_pair]['null'])
        data_close_short.append(self.df_list[self.oldest_pair]['null'])
        df_open_long = pd.concat(data_open_long, axis=1)
        df_open_long['combined']= df_open_long.values.tolist()
        df_open_long['combined'] = [[i for i in j if i == i] for j in list(df_open_long['combined'])]
        df_close_long = pd.concat(data_close_long, axis=1)
        df_close_long['combined']= df_close_long.values.tolist()
        df_close_long['combined'] = [[i for i in j if i == i] for j in list(df_close_long['combined'])]
        df_open_short = pd.concat(data_open_short, axis=1)
        df_open_short['combined']= df_open_short.values.tolist()
        df_open_short['combined'] = [[i for i in j if i == i] for j in list(df_open_short['combined'])]
        df_close_short = pd.concat(data_close_short, axis=1)
        df_close_short['combined']= df_close_short.values.tolist()
        df_close_short['combined'] = [[i for i in j if i == i] for j in list(df_close_short['combined'])]
        self.open_long_obj = df_open_long['combined']
        self.close_long_obj = df_close_long['combined']
        self.open_short_obj = df_open_short['combined']
        self.close_short_obj = df_close_short['combined']
        
        
        return self.df_list[self.oldest_pair]
        
    def run_backtest(self, initial_wallet=1000, leverage=1):
        params = self.params
        df_ini = self.df_list[self.oldest_pair][:]
        wallet = initial_wallet
        long_exposition = 0
        short_exposition = 0
        maker_fee = 0.0002
        taker_fee = 0.0006
        trades = []
        days = []
        current_day = 0
        previous_day = 0
        current_positions = {}
        
        for index, row in df_ini.iterrows():
            # -- Add daily report --
            current_day = index.day
            if previous_day != current_day:
                temp_wallet = wallet
                for pair in current_positions:
                    actual_row = self.df_list[pair].loc[index]
                    if current_positions[pair]['side'] == "LONG":
                        close_price = actual_row['open']
                        trade_result = (close_price - current_positions[pair]['price']) / current_positions[pair]['price']
                        close_size = current_positions[pair]['size'] + current_positions[pair]['size']  * trade_result
                        fee = close_size * taker_fee
                        temp_wallet += close_size - current_positions[pair]['size'] - fee
                    elif current_positions[pair]['side'] == "SHORT":
                        close_price = actual_row['open']
                        trade_result = (current_positions[pair]['price'] - close_price) / current_positions[pair]['price']
                        close_size = current_positions[pair]['size'] + current_positions[pair]['size']  * trade_result
                        fee = close_size * taker_fee
                        temp_wallet += close_size - current_positions[pair]['size'] - fee
                    
                days.append({
                    "day":str(index.year)+"-"+str(index.month)+"-"+str(index.day),
                    "wallet":temp_wallet,
                    "price":row['open'],
                    "long_exposition":0,
                    "short_exposition":0,
                    "risk":0,
                })
            previous_day = current_day 
            
            
            close_long_row = self.close_long_obj.loc[index]
            close_short_row = self.close_short_obj.loc[index]
            closed_pair = []
            if len(current_positions) > 0:
                # -- Close LONG --
                long_position_to_close = set({k: v for k,v in current_positions.items() if v['side'] == "LONG"}).intersection(set(close_long_row))
                for pair in long_position_to_close:
                    actual_row = self.df_list[pair].loc[index]
                    close_price = actual_row['ma_base']
                    trade_result = (close_price - current_positions[pair]['price']) / current_positions[pair]['price']
                    close_size = current_positions[pair]['size'] + current_positions[pair]['size'] * trade_result
                    fee = close_size * maker_fee
                    wallet += close_size - current_positions[pair]['size'] - fee
                    trades.append({
                        "pair": pair,
                        "open_date": current_positions[pair]['date'],
                        "close_date": index,
                        "position": current_positions[pair]['side'],
                        "open_reason": current_positions[pair]['reason'],
                        "close_reason": "Market",
                        "open_price": current_positions[pair]['price'],
                        "close_price": close_price,
                        "open_fee": current_positions[pair]['fee'],
                        "close_fee": fee,
                        "open_trade_size":current_positions[pair]['size'],
                        "close_trade_size":close_size,
                        "wallet": wallet,
                    })
                    del current_positions[pair]
                    closed_pair.append(pair)
                    
                # -- Close SHORT market --
                short_position_to_close = set({k: v for k,v in current_positions.items() if v['side'] == "SHORT"}).intersection(set(close_short_row))
                for pair in short_position_to_close:
                    actual_row = self.df_list[pair].loc[index]
                    close_price = actual_row['ma_base']
                    trade_result = (current_positions[pair]['price'] - close_price) / current_positions[pair]['price']
                    close_size = current_positions[pair]['size'] + current_positions[pair]['size'] * trade_result
                    fee = close_size * taker_fee
                    wallet += close_size - current_positions[pair]['size'] - fee
                    trades.append({
                        "pair": pair,
                        "open_date": current_positions[pair]['date'],
                        "close_date": index,
                        "position": current_positions[pair]['side'],
                        "open_reason": current_positions[pair]['reason'],
                        "close_reason": "Market",
                        "open_price": current_positions[pair]['price'],
                        "close_price": close_price,
                        "open_fee": current_positions[pair]['fee'],
                        "close_fee": fee,
                        "open_trade_size":current_positions[pair]['size'],
                        "close_trade_size":close_size,
                        "wallet": wallet,
                    })
                    del current_positions[pair]
                    closed_pair.append(pair)
                    
            # -- Check for opening position --
            # -- Open LONG market --
            open_long_row = self.open_long_obj.loc[index]
            for pair in open_long_row:
                actual_position = None
                actual_row = self.df_list[pair].loc[index]
                for i in range(1, len(params[pair]["envelopes"]) + 1):
                    if pair in current_positions:
                        actual_position = current_positions[pair]
                    if (actual_position and actual_position["side"] == "SHORT") or (actual_row[f"open_long_{i}"] == False) or (pair in closed_pair):
                        break
                    if actual_position and actual_position["envelope"] >= i:
                        continue
                    if actual_row[f"open_long_{i}"]:
                        open_price = actual_row[f'ma_low_{i}']
                        pos_size = (params[pair]["size"] * wallet * leverage) / len(params[pair]["envelopes"])
                        fee = pos_size * maker_fee
                        pos_size -= fee
                        wallet -= fee
                        if actual_position:
                            actual_position["price"] = (actual_position["size"] * actual_position["price"] + open_price * pos_size) / (actual_position["size"] + pos_size)
                            actual_position["size"] = actual_position["size"] + pos_size
                            actual_position["fee"] = actual_position["fee"] + fee
                            actual_position["envelope"] = i
                            actual_position["reason"] = f"Limit Envelop {i}"
                        else:
                            current_positions[pair] = {
                                "size": pos_size,
                                "date": index,
                                "price": open_price,
                                "fee":fee,
                                "reason": f"Limit Envelop {i}",
                                "side": "LONG",
                                "envelope": i,
                            }
                        long_exposition += 0
            # -- Open SHORT market --
            open_short_row = self.open_short_obj.loc[index]
            for pair in open_short_row:
                actual_position = None
                actual_row = self.df_list[pair].loc[index]
                for i in range(1, len(params[pair]["envelopes"]) + 1):
                    if pair in current_positions:
                        actual_position = current_positions[pair]
                    if (actual_position and actual_position["side"] == "LONG") or actual_row[f"open_short_{i}"] == False or (pair in closed_pair):
                        break
                    if actual_position and actual_position["envelope"] >= i:
                        continue
                    if actual_row[f"open_short_{i}"]:
                        open_price = actual_row[f'ma_high_{i}']
                        pos_size = (params[pair]["size"] * wallet * leverage) / len(params[pair]["envelopes"])
                        fee = pos_size * maker_fee
                        pos_size -= fee
                        wallet -= fee
                        if actual_position:
                            actual_position["price"] = (actual_position["size"] * actual_position["price"] + open_price * pos_size) / (actual_position["size"] + pos_size)
                            actual_position["size"] = actual_position["size"] + pos_size
                            actual_position["fee"] = actual_position["fee"] + fee
                            actual_position["envelope"] = i
                            actual_position["reason"] = f"Limit Envelop {i}"
                        else:
                            current_positions[pair] = {
                                "size": pos_size,
                                "date": index,
                                "price": open_price,
                                "fee":fee,
                                "reason": f"Limit Envelop {i}",
                                "side": "SHORT",
                                "envelope": i,
                            }
                        short_exposition += 0             
                        
        df_days = pd.DataFrame(days)
        df_days['day'] = pd.to_datetime(df_days['day'])
        df_days = df_days.set_index(df_days['day'])

        df_trades = pd.DataFrame(trades)
        df_trades['open_date'] = pd.to_datetime(df_trades['open_date'])
        df_trades = df_trades.set_index(df_trades['open_date'])   
        
        return get_metrics(df_trades, df_days) | {
            "wallet": wallet,
            "trades": df_trades,
            "days": df_days
        }      

In [None]:
params = {
    "BTC/USDT:USDT":{
        "src": "close",
        "ma_base_window": 7,
        "envelopes": [0.07, 0.1, 0.15],
        "size": 0.1,
    },
    "ETH/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15],
        "size": 0.1,
    },
    "ADA/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.09, 0.12, 0.15],
        "size": 0.1,
    },
    "AVAX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.09, 0.12, 0.15],
        "size": 0.1,
    },
    "EGLD/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "KSM/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "OCEAN/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "REN/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ACH/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "APE/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "CRV/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "DOGE/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ENJ/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "FET/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "ICP/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "IMX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "LDO/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "MAGIC/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "REEF/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "SAND/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "TRX/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
    "XTZ/USDT:USDT":{
        "src": "close",
        "ma_base_window": 5,
        "envelopes": [0.07, 0.1, 0.15, 0.2],
        "size": 0.05,
    },
}

pair_list = list(params.keys())
exchange_name = "bitget"
tf = '1h'
oldest_pair = "BTC/USDT:USDT"

exchange = ExchangeDataManager(
    exchange_name=exchange_name, 
    path_download="../database/exchanges"
)

df_list = {}
for pair in pair_list:
    df = exchange.load_data(pair, tf)
    df_list[pair] = df.loc[:]

print("Data load 100%")
df_list[oldest_pair]

In [None]:
strat = Strategy(
    df_list=df_list,
    oldest_pair=oldest_pair,
    type=["long","short"],
    params=params,
)

strat.populate_indicators()
strat.populate_buy_sell()
bt_result = strat.run_backtest(initial_wallet=1000, leverage=5)

df_trades, df_days = backtest_analysis(
    trades=bt_result['trades'], 
    days=bt_result['days'],
    general_info=True,
    trades_info=True,
    days_info=True,
    long_short_info=True,
    entry_exit_info=True,
    exposition_info=True,
    pair_info=True,
    indepedant_trade=True
)

In [None]:
plot_equity_vs_asset(df_days=df_days.loc[:])

In [None]:
plot_bar_by_month(df_days=df_days)

In [None]:
plot_futur_simulations(
    df_trades=df_trades,
    trades_multiplier=3,
    trades_to_forecast=500,
    number_of_simulations=100,
    true_trades_to_show=100,
    # show_all_simulations=True,
)

In [None]:
from lightweight_charts import Chart
from lightweight_charts import JupyterChart
pair = "AVAX/USDT:USDT"
dt = df_list[pair].copy().loc["2023"]
dt_trades = df_trades.copy().loc["2023"]
dt_trades = dt_trades.loc[dt_trades["pair"] == pair]
chart = JupyterChart(width=900, height=400)

dt["time"] = dt.index
chart.set(dt)

lines = ["ma_low_1", "ma_high_1", "ma_base", "ma_low_2", "ma_low_3", "ma_high_2", "ma_high_3"]
for line in lines:
    line_object = chart.create_line(line, width=1)
    line_data = pd.DataFrame({"time": dt.index, line: dt[line]})
    line_object.set(line_data)

def place_buy_order(key):
    print(f'Buy {key} shares.')

for trades in dt_trades.iterrows():
    if trades[1]['position'] == "LONG":
        chart.marker(time=trades[1]['open_date'], position="below", shape="arrow_up", color="green", text="Long")
        chart.marker(time=trades[1]['close_date'], position="above", shape="arrow_down", color="white", text="Close")
    elif trades[1]['position'] == "SHORT":
        chart.marker(time=trades[1]['open_date'], position="above", shape="arrow_down", color="red", text="Short")
        chart.marker(time=trades[1]['close_date'], position="below", shape="arrow_up", color="white", text="Close")

# chart.show(block=True)
chart.load()

