## Strategy Detail

In [2]:
@dataclass
class Context:
    """Per-run state carried between successive ``on_interval`` calls.

    Every attribute that ``on_interval`` reads or writes is declared here so
    that a freshly constructed ``Context()`` can be passed to it directly.
    New fields are appended after the original ones, so positional
    construction keeps its meaning.
    """
    # True until the first ``on_interval`` call has built the price grid.
    first_run: bool = True

    # Grid price levels; filled by ``on_interval`` on the first run.
    shoot_list: list = field(default_factory=list)

    # Indices into ``shoot_list`` of the next buy level / next sell level.
    buy_counter: int = 0
    sell_counter: int = 0

    # Switch for the one-time balance snapshot at the top of ``on_interval``.
    # Off by default: that snapshot also needs ``pct_open`` and a
    # ``get_margin_fix`` helper which this class does not provide, and the
    # grid logic never uses its result.
    get_initial_balance: bool = False

    # Index of the entry price inside ``shoot_list`` (written on first run).
    active: int = 0

## Works like the "on Init" event in FxDreema
def on_init_data(algo: ea.Algo) -> pd.DataFrame:
    """Attach the constant grid parameters to the kline frame.

    Per the original note, this is called after the data has been read from
    CSV or fetched from the exchange.

    The grid inputs ``floor``, ``ceiling``, ``spread_percentage`` and
    ``initial_money`` are taken from module-level globals when those hold real
    numbers (``objective_function`` publishes each optimizer candidate there)
    and otherwise fall back to the built-in defaults 30_000 / 70_000 / 1 /
    1500. Previously the defaults were always used, so the optimizer's
    candidates never reached the backtest.

    Returns:
        A frame with the index of ``algo.get_klines("BTC/USDT:USDT", "5m")``
        and four constant columns: ``lowest_layer``, ``highest_layer``,
        ``GAP`` and ``money_per_layer``.

    Raises:
        ValueError: if the inputs describe a grid with no spacing or with
            fewer than one layer.
    """
    # Step 3: input parameters (global override first, default second).
    floor = _grid_setting("floor", 30_000)  # lowest price of the grid
    ceiling = _grid_setting("ceiling", 70_000)  # highest price of the grid
    spread_percentage = _grid_setting("spread_percentage", 1)  # 10 means 10%
    spread = spread_percentage / 100

    # Distance between two neighbouring layers, as a fraction of the ceiling.
    GAP = spread * ceiling
    if not GAP > 0:
        raise ValueError(
            f"grid spacing must be positive (ceiling={ceiling}, "
            f"spread_percentage={spread_percentage})"
        )

    # Number of layers strictly between ceiling and floor.
    amount = round((ceiling - floor) / GAP) - 1
    if amount < 1:
        raise ValueError(
            f"grid has no layers (floor={floor}, ceiling={ceiling}, GAP={GAP})"
        )

    # Step 5: split the capital evenly across the layers.
    initial_money = _grid_setting("initial_money", 1500)
    money_per_layer = initial_money / amount

    # Step 4: layer k (1-based) trades at ceiling - GAP * k; only the first
    # and the last layer are needed downstream.
    highest_layer = ceiling - (GAP * 1)
    lowest_layer = ceiling - (GAP * amount)

    data_df = algo.get_klines("BTC/USDT:USDT", '5m')
    data_df["highest_layer"] = highest_layer
    data_df["lowest_layer"] = lowest_layer
    data_df['GAP'] = GAP
    data_df['money_per_layer'] = money_per_layer

    return data_df[["lowest_layer", "highest_layer", "GAP", "money_per_layer"]]


def _grid_setting(name, default):
    """Return the numeric module-level value of ``name``, else ``default``."""
    from numbers import Real

    value = globals().get(name)
    # Ignore non-numeric globals (e.g. a ``floor`` function imported from
    # ``math``) and booleans, which are technically ints.
    if isinstance(value, Real) and not isinstance(value, bool):
        return value
    return default


## Runs on every tick, like the "on Tick" event in FxDreema
def on_interval(algo: ea.Algo, ctx: Context):
    """Run one tick of the long-only grid strategy on BTC/USDT:USDT.

    Per the original note, this is called once per interval.

    First call (``ctx.first_run``): opens an initial long position worth one
    layer, then builds ``ctx.shoot_list`` -- price levels spaced ``GAP`` apart
    below and above the entry price -- and points ``ctx.buy_counter`` /
    ``ctx.sell_counter`` at the levels directly below / above the entry.

    Every call (including the first):
      * price at or below the current buy level -> buy one layer and move
        both counters one level down;
      * price at or above the current sell level -> sell one layer (capped at
        the open long quantity) or, when no long is open, buy one layer; both
        counters then move one level up.

    A counter that has run off either end of the grid simply disables that
    side. It is never used as a negative (wrap-around) list index, which
    previously made the bot compare against the top of the grid and buy on
    every tick once the price fell below the lowest level.

    Args:
        algo: trading API object; this function uses ``get_last_price``,
            ``get_positions``, ``get_data``, ``buy``, ``sell``, ``log_info``
            (and ``get_balance`` only inside the optional snapshot step).
        ctx: mutable state object persisted between calls.

    Raises:
        ValueError: on the first run, if the stored ``GAP`` is not positive
            (the grid-building loops would never terminate).
    """
    # Optional one-time balance snapshot. Read with getattr so that a context
    # object without this flag just skips the step instead of crashing.
    if getattr(ctx, "get_initial_balance", False):
        ctx.balance = algo.get_balance().available
        ctx.margin_fix = get_margin_fix(balance=ctx.balance, pct_open=ctx.pct_open)
        algo.log_info(f"balance = {ctx.balance} | pct_open = {ctx.pct_open}")
        ctx.get_initial_balance = False

    # Market and position snapshot for this tick.
    symbol = "BTC/USDT:USDT"
    close = algo.get_last_price(symbol)
    positions = algo.get_positions()
    position_long = positions[symbol]["LONG"]
    position_short = positions[symbol]["SHORT"]

    # Grid constants published by on_init_data.
    lowest_layer = algo.get_data('lowest_layer')
    highest_layer = algo.get_data('highest_layer')
    GAP = algo.get_data('GAP')
    money_per_layer = algo.get_data('money_per_layer')

    algo.log_info(f"close = {close}| long_q = {position_long.quantity}| short_q = {position_short.quantity} ")

    if ctx.first_run:
        if not GAP > 0:
            raise ValueError(f"GAP must be positive to build the grid, got {GAP}")

        # Open the initial position: one layer's worth at the current price.
        algo.buy(quantity=money_per_layer / close, symbol=symbol, position_side="LONG")

        # Levels below the entry, stepping down until one lies under
        # lowest_layer; collected top-down, then flipped to ascending order.
        shoot_buy_list = []
        level = close
        while level >= lowest_layer:
            level -= GAP
            shoot_buy_list.append(level)
        shoot_buy_list.reverse()

        # Levels above the entry, stepping up until one lies over highest_layer.
        shoot_sell_list = []
        level = close
        while level <= highest_layer:
            level += GAP
            shoot_sell_list.append(level)

        # The entry price sits right after the buy levels.
        entry_index = len(shoot_buy_list)
        ctx.shoot_list = shoot_buy_list + [close] + shoot_sell_list
        ctx.buy_counter = entry_index - 1
        ctx.sell_counter = entry_index + 1
        ctx.active = entry_index

        ctx.first_run = False

    # --- buy side: price dropped to the next lower level ---
    buy_level = _grid_level(ctx.shoot_list, ctx.buy_counter)
    if buy_level is not None and close <= buy_level:
        if _try_order(algo, "buy", money_per_layer / close, symbol):
            ctx.buy_counter -= 1
            ctx.sell_counter -= 1

    # --- sell side: price rose to the next higher level ---
    # position_long is the snapshot from the start of the tick, so an order
    # placed above does not change which branch runs here. The level is
    # looked up after the buy side because that may have moved sell_counter.
    sell_level = _grid_level(ctx.shoot_list, ctx.sell_counter)
    if sell_level is not None and close >= sell_level:
        quantity = money_per_layer / close
        if position_long.quantity > 0:
            # Never sell more than is actually held.
            quantity = min(quantity, position_long.quantity)
            filled = _try_order(algo, "sell", quantity, symbol)
        elif position_long.quantity == 0:
            # Nothing to sell: re-enter with one layer instead.
            filled = _try_order(algo, "buy", quantity, symbol)
        else:
            filled = False
        if filled:
            ctx.buy_counter += 1
            ctx.sell_counter += 1

    return


def _grid_level(levels, index):
    """Return ``levels[index]``, or None when index is off either end."""
    if 0 <= index < len(levels):
        return levels[index]
    return None


def _try_order(algo, side, quantity, symbol):
    """Place a LONG-side order; log a failure and return False, never raise."""
    place = algo.buy if side == "buy" else algo.sell
    try:
        place(quantity=quantity, symbol=symbol, position_side="LONG")
    except Exception as exc:  # best effort, but report the real cause
        algo.log_info(f"{side} order failed at quantity {quantity}: {exc!r}")
        return False
    return True

# Optimizer PSO & Walk Forward analysis

In [None]:
# PSO objective function
def objective_function(params, start, end, l_interval, s_interval):
    """Backtest one parameter candidate and score it (higher is better).

    The candidate and the test window are published through module-level
    globals (``floor``, ``ceiling``, ``spread_percentage``, ``initial_money``,
    ``TEST_PERIOD_START``, ``TEST_PERIOD_END``, ``INTERVAL``,
    ``SMALL_INTERVAL``) before ``ea.run_backtest`` is invoked with
    ``on_interval`` / ``on_init_data`` and the global ``kline_dict``.

    Args:
        params: ``(floor, ceiling, spread_percentage)``; floor and ceiling are
            given in thousands and multiplied by 1000 here.
        start: first date of the backtest window.
        end: last date of the backtest window.
        l_interval: large (strategy) interval passed to the backtest.
        s_interval: small (simulation) interval passed to the backtest.

    Returns:
        ``(score, total_trade, profit, max_absolute_drawdown)`` where
        ``score = 0.15 * apr + 0.3 * sharpe_ratio - 0.5 * max_absolute_drawdown``.
        If the backtest or the statistics step raises, the error is printed
        and ``(0, 0, 0, 0)`` is returned so that one bad candidate does not
        abort the whole search.
        NOTE(review): a failed run therefore scores 0, which outranks any
        candidate with a genuinely negative score -- confirm this is wanted.
    """
    global TEST_PERIOD_START, TEST_PERIOD_END, INTERVAL, SMALL_INTERVAL
    global floor, ceiling, spread_percentage, initial_money
    global kline_dict

    floor_value, ceiling_value, spread_percentage_value = params

    # Publish the candidate for the strategy callbacks.
    floor = floor_value * 1000
    ceiling = ceiling_value * 1000
    spread_percentage = spread_percentage_value
    initial_money = 1000

    # Publish the test window.
    TEST_PERIOD_START = start
    TEST_PERIOD_END = end
    INTERVAL = l_interval
    SMALL_INTERVAL = s_interval

    print('start', TEST_PERIOD_START,
          'end', TEST_PERIOD_END)

    try:
        ctx = Context()
        trade_df, summary_df, _positions_df, trades_bt = ea.run_backtest(
            ctx,
            asset="USDT",
            on_interval_func=on_interval,
            large_interval=INTERVAL,
            small_interval=SMALL_INTERVAL,
            start_date=TEST_PERIOD_START,
            end_date=TEST_PERIOD_END,
            kline_dict=kline_dict,  # type:ignore
            balance_asset_initial=1_000,
            pct_fee=0.05 / 100,
            leverage=1,
            on_init_data_func=on_init_data,
            is_sl_before_tp=True,
        )

        # Reward function: only apr, sharpe_ratio and max_absolute_drawdown
        # enter the score; the other statistics are unpacked but unused.
        (_stats_df, winrate, apr, _prom, total_trade, max_absolute_drawdown,
         _romad, sharpe_ratio, _pct_positive_months, _pct_positive_week,
         _offset_total) = get_stats(trade_df=trade_df, summary_df=summary_df,
                                    trades_bt=trades_bt)
        print(f"winrate = {winrate}% | APR = {apr}% | trade count {total_trade}")
        score = (0.15 * apr) + (0.3 * sharpe_ratio) - (max_absolute_drawdown * 0.5)
        profit = trade_df['realized_profit'].sum()
    except Exception as exc:
        # Keep the search alive, but say why this candidate was zeroed
        # instead of hiding the failure. KeyboardInterrupt is not caught.
        print(f"backtest failed for params={list(params)}: {exc!r}")
        score = 0
        total_trade = 0
        profit = 0
        max_absolute_drawdown = 0
    return score, total_trade, profit, max_absolute_drawdown


def optimize_strategy_with_symbol_and_tf_by_PSO(sym, tf):
    """Particle-swarm search plus walk-forward check for one symbol/timeframe.

    For every walk-forward set produced by ``generate_walk_forward_sets``:

    1. Run a particle swarm over the optimization period, scoring each
       particle with ``objective_function``.
    2. Write every evaluated position with its statistics to
       ``./data/strategy_optimization/<strategy>/<side>/<sym>_<tf>_sheet_<n>.xlsx``,
       sorted by score (best first).
    3. Re-run the best-ranked position on the walk-forward period and append
       the outcome to ``walk_forward_profile.xlsx`` in the same folder.

    Early stopping counts whole swarm iterations in which the global best did
    not improve, and the count restarts for every walk-forward set.
    Previously it counted individual particles and was never reset, so the
    search usually stopped after its first iteration.

    Args:
        sym: base asset, e.g. ``"BTC"``; used in the symbol and file names.
        tf: large interval passed on to ``objective_function``.

    Side effects:
        Rebinds the module-level globals ``SYMBOL``, ``LEVERAGE``,
        ``pct_open``, ``kline_dict``, ``floor``, ``ceiling``,
        ``spread_percentage`` and ``initial_money``; writes the Excel files
        described above (creating the output folder when missing).
    """
    global SYMBOL, config_list_long, config_list_short, risk_percent_of_balance, default, pct_open, LEVERAGE
    global kline_dict
    global floor, ceiling, spread_percentage, initial_money

    import os  # local import: the notebook's import cell is not part of this block

    # ----------------- Run configuration ------------------
    SYMBOL = f"{sym}/USDT:USDT"
    SYMBOL_FILE_NAME = f"OKX_{sym}-USDT-SWAP"
    INTERVAL = tf
    SMALL_INTERVAL = "5m"
    LEVERAGE = 10
    pct_open = 0.15

    # Default strategy inputs; objective_function overwrites them per candidate.
    floor = 10_000  # Starting amount
    ceiling = 40_000  # Maximum amount
    spread_percentage = 1  # %spread (e.g., 10 for 10%)
    initial_money = 1000

    # Search space: (column name, lower bound, upper bound). floor and ceiling
    # are expressed in thousands (objective_function multiplies by 1000).
    # To optimize another parameter, add a row here and unpack it in
    # objective_function.
    search_space = [
        ('floor', 5, 30),
        ('ceiling', 40, 70),
        ('spread_percentage', 1, 20),
    ]
    param_names = [name for name, _, _ in search_space]
    bounds = [(low, high) for _, low, high in search_space]
    nv = len(bounds)  # number of optimize variable

    strategy_name = "KZM_Geometric_x_PSO"
    side = "LONG"

    ### Walk Forward Setup ###
    TEST_PERIOD_START = "2023-01-01"
    TEST_PERIOD_END = "2024-05-21"
    optimization_length_months = 5
    walk_forward_length_months = 5

    # Particle Swarm Setup : For Train Period
    mm = 1  # minimize = -1 , Maximize = 1
    particle_size = 8  # num of particles
    iteration = 100  # max num of iteration

    w = 1  # Weight
    c1 = 2  # cognitive
    c2 = 4  # social

    # Stop after this many consecutive iterations without a better global best.
    no_improvement_not_exceed = 3

    path = f"./0-Load data/binance-public-data/"
    kline_dict = {
        (f"{SYMBOL}", interval): read_binancekline(
            f"{path}/{SYMBOL_FILE_NAME}_{interval}.csv")
        for interval in ("4H", "1H", "30m", "15m", "5m")
    }

    num_periods = calculate_num_periods(start_year=TEST_PERIOD_START,
                                        end_year=TEST_PERIOD_END,
                                        optimization_length_months=optimization_length_months,
                                        walk_forward_length_months=walk_forward_length_months)

    walk_forward_sets = generate_walk_forward_sets(TEST_PERIOD_START,
                                                   optimization_length_months,
                                                   walk_forward_length_months,
                                                   num_periods, TEST_PERIOD_END)

    # Worst possible fitness for the chosen direction.
    if mm == -1:
        initial_fitness = float("inf")
    if mm == 1:
        initial_fitness = -float("inf")

    #### FROM THIS IS PSO ####
    for sheet_num, walk_forward in tqdm(enumerate(walk_forward_sets)):

        TEST_PERIOD_START = walk_forward['optimization_period'][0]
        TEST_PERIOD_END = walk_forward['optimization_period'][1]

        # Fresh search state for this walk-forward set.
        fitness_global_best_particle_position = initial_fitness
        global_best_particle_position = []
        all_explore_position = []
        all_evaluate_result = []
        total_trade_list = []
        profit_list = []
        max_absolute_drawdown_list = []
        no_improvement_count = 0

        swarm_particle = [
            Particle(bounds=bounds, initial_fitness=initial_fitness, nv=nv, mm=mm,
                     w=w, c1=c1, c2=c2)
            for _ in range(particle_size)
        ]

        for i in range(iteration):
            improved_this_iteration = False

            for particle in swarm_particle:
                particle.evaluate(objective_function,
                                  start=TEST_PERIOD_START, end=TEST_PERIOD_END,
                                  l_interval=INTERVAL,
                                  s_interval=SMALL_INTERVAL)

                # Log every evaluation. The position is copied because the
                # particle may update its position list in place later
                # (presumably -- Particle is defined outside this block).
                all_explore_position.append(list(particle.particle_position))
                all_evaluate_result.append(particle.fitness_particle_position)
                total_trade_list.append(particle.total_trade)
                profit_list.append(particle.profit)
                max_absolute_drawdown_list.append(particle.max_absolute_drawdown)

                fitness = particle.fitness_particle_position
                if mm == 1:
                    is_better = fitness > fitness_global_best_particle_position
                elif mm == -1:
                    is_better = fitness < fitness_global_best_particle_position
                else:
                    is_better = False

                if is_better:
                    global_best_particle_position = list(particle.particle_position)
                    fitness_global_best_particle_position = float(fitness)
                    improved_this_iteration = True

            # Early stopping is judged once per iteration, not per particle.
            if improved_this_iteration:
                no_improvement_count = 0
            else:
                no_improvement_count += 1

            if no_improvement_count >= no_improvement_not_exceed:
                print(f"Stopping early at iteration {i} due to no improvement in global best for 3 iterations.")
                break

            for particle in swarm_particle:
                particle.update_velocity(global_best_particle_position)
                particle.update_position(bounds)

        ### LOG THE OPTIMIZATION HERE ###
        optimization_file_path = f'./data/strategy_optimization/{strategy_name}/{side}/{sym}_{tf}_sheet_{sheet_num + 1}'
        os.makedirs(os.path.dirname(optimization_file_path), exist_ok=True)

        # One row per evaluation; positional columns renamed to parameter names.
        log_df = pd.DataFrame(all_explore_position)
        log_df = log_df.rename(columns=dict(enumerate(param_names)))

        # Raw statistic followed by its min-max normalized counterpart.
        # When a statistic is constant the range is 0 and the normalized
        # column is NaN, as before.
        for column, norm_column, values in (
            ('total_trade', 'norm_total_trade', total_trade_list),
            ('profit', 'norm_profit', profit_list),
            ('max_absolute_drawdown', 'norm_max_dd', max_absolute_drawdown_list),
        ):
            log_df[column] = values
            lowest = log_df[column].min()
            log_df[norm_column] = (log_df[column] - lowest) / (log_df[column].max() - lowest)

        log_df['rank_score'] = all_evaluate_result
        log_df['norm_rank_score'] = log_df['norm_total_trade'] + log_df['norm_profit'] - log_df['norm_max_dd']

        log_df = log_df.sort_values(by='rank_score', ascending=False)
        # Context manager: the workbook is closed even if writing fails.
        with pd.ExcelWriter(f"{optimization_file_path}.xlsx", engine="xlsxwriter") as writer:
            log_df.to_excel(writer, sheet_name=f'sheet_{sheet_num + 1}')
        ### End of Optimization ###

        ### Start walk forward ###
        TEST_PERIOD_START = walk_forward['walk_forward_period'][0]
        TEST_PERIOD_END = walk_forward['walk_forward_period'][1]

        excel_df = pd.read_excel(f"{optimization_file_path}.xlsx",
                                 sheet_name=f'sheet_{sheet_num + 1}')

        # The first row is the best-ranked position (sheet is sorted by score).
        rank_1_in_the_swarn = excel_df[param_names].iloc[0].tolist()

        ### WALK FORWARD ###
        score, total_trade, profit, max_absolute_drawdown = objective_function(
            rank_1_in_the_swarn,
            start=TEST_PERIOD_START, end=TEST_PERIOD_END,
            l_interval=INTERVAL,
            s_interval=SMALL_INTERVAL)

        walk_log_data = {
            'SYMBOL': [SYMBOL],
            'TF': [INTERVAL],
            'START': [TEST_PERIOD_START],
            'END': [TEST_PERIOD_END],
            'Test_Name': ["walk-forward-profile"],
            **{name: [value] for name, value in zip(param_names, rank_1_in_the_swarn)},
            'trade_count': [total_trade],
            'max_ab_dd': [max_absolute_drawdown],
            'rank_score': [score],
            'profit': [profit],
        }
        walk_log_file = f'./data/strategy_optimization/{strategy_name}/{side}/walk_forward_profile.xlsx'
        update_df = pd.DataFrame(walk_log_data)
        try:
            history_df = pd.read_excel(walk_log_file)
        except FileNotFoundError:
            # First result for this strategy/side: start a new log file.
            history_df = None

        if history_df is None:
            update_df.to_excel(f'{walk_log_file}', index=False, )
        else:
            pd.concat([history_df, update_df]).to_excel(f'{walk_log_file}', index=False, )