In [None]:
import numpy as np
import pandas as pd

# ------------------ PARAMETERS ------------------ #
initial_investment = 1000  # Starting portfolio value for each moving average's backtest
transaction_cost = 0.005  # Flat amount per share: added to the buy fill and subtracted from the sell fill (price units, not a percentage)
slippage = 0.001          # 0.1% of the open price, applied against the trader on every fill
atr_period = 30           # ATR lookback period (rolling window, in rows)
base_atr_multiplier = 1.5 # Base multiplier for dynamic trailing stop (stop distance = multiplier * ATR)

# Scaling out base parameters
base_scaling_threshold = 0.05  # 5% profit threshold for partial exit
base_scaling_ratio = 0.5       # Sell 50% of shares when threshold is hit

DEBUG = False  # Toggle debug output (per-bar prints inside the simulation)

# ------------------ WALK-FORWARD SETTINGS ------------------ #
window_test = 252   # Rows per walk-forward segment, e.g., 252 trading days (~1 year)
step = window_test  # Rows between segment starts; equal to window_test, so segments do not overlap

# ------------------ HELPER FUNCTIONS ------------------ #

def calculate_atr(df, atr_period):
    """Add true-range and Average True Range (ATR) columns to ``df``.

    The true range of a row is the largest of:
      * ``TQQQ High - TQQQ Low``
      * ``|TQQQ High - previous close|``
      * ``|TQQQ Low - previous close|``
    where the previous close is ``TQQQ Price`` shifted down by one row. Rows
    without a previous close (the first row) use 0 for the two gap terms.
    ATR is the rolling mean of the true range over ``atr_period`` rows, with
    ``min_periods=1`` so early rows average whatever history is available.

    Args:
        df: DataFrame with 'TQQQ Price', 'TQQQ High' and 'TQQQ Low' columns,
            ordered so that the preceding row is the preceding bar.
            Modified in place: 'Previous_Close', 'TR' and 'ATR' are added.
        atr_period: Rolling window length, in rows.

    Returns:
        The same ``df`` object, with the three columns added.
    """
    previous_close = df['TQQQ Price'].shift(1)
    df['Previous_Close'] = previous_close

    high = df['TQQQ High']
    low = df['TQQQ Low']
    has_previous = previous_close.notna()

    # Gap terms are only meaningful when a previous close exists; otherwise
    # they contribute 0, exactly as the row-wise formulation did.
    gap_from_high = (high - previous_close).abs().where(has_previous, 0)
    gap_from_low = (low - previous_close).abs().where(has_previous, 0)

    # Element-wise maximum of the three candidates, computed column-wise
    # instead of calling a Python lambda once per row.
    df['TR'] = np.maximum(high - low, np.maximum(gap_from_high, gap_from_low))
    df['ATR'] = df['TR'].rolling(window=atr_period, min_periods=1).mean()
    return df

def select_adaptive_parameters(segment, global_median_atr,
                               base_atr_multiplier=base_atr_multiplier,
                               base_scaling_threshold=base_scaling_threshold,
                               base_scaling_ratio=base_scaling_ratio):
    """
    Pick the trailing-stop and scale-out parameters for one segment.

    The segment's mean ATR is compared with ``global_median_atr``. When the
    segment is more volatile (mean ATR strictly above the median), the ATR
    multiplier is raised by 20% and the scaling threshold by 10%; otherwise
    the base values are returned untouched. The scaling ratio is never
    adjusted.

    Returns a tuple ``(atr_multiplier, scaling_threshold, scaling_ratio)``.
    """
    # Start from the base values and only override for high-volatility segments.
    multiplier = base_atr_multiplier
    threshold = base_scaling_threshold
    ratio = base_scaling_ratio

    is_high_volatility = segment['ATR'].mean() > global_median_atr
    if is_high_volatility:
        multiplier = base_atr_multiplier * 1.2
        threshold = base_scaling_threshold * 1.1

    if DEBUG:
        print(f"Adaptive parameters: ATR Multiplier={multiplier}, "
              f"Scaling Threshold={threshold}, "
              f"Scaling Ratio={ratio}")

    return multiplier, threshold, ratio

def run_segment(segment, starting_value, ma, global_median_atr):
    """
    Run the trading simulation on one segment (walk-forward window) using a given moving average.

    Signals are generated on one bar and filled at the next bar's open,
    adjusted by the module-level ``transaction_cost`` and ``slippage``.
    A buy signal is raised while flat when 'Nasdaq Open' is above the ``ma``
    column. While in a trade the function maintains an ATR-based trailing
    stop, sells a fraction of the position once (scale-out) when the close is
    far enough above the entry price, and schedules an exit when the low
    breaks the trailing stop or, after ``hold_period`` calendar days from the
    signal date, the close falls more than 2% below the moving average.
    Stop multiplier, scale-out threshold and ratio come from
    ``select_adaptive_parameters``.

    All trade state is local to the call: a position still open on the last
    row is valued at that row's close and is not carried into another call.

    Args:
        segment: Non-empty DataFrame with 'Date', 'TQQQ Open', 'TQQQ Price',
            'TQQQ Low', 'TQQQ High', 'ATR', 'Nasdaq Open' and the ``ma``
            column. Modified in place: 'Portfolio_Value', 'Signal',
            'Trade_Price', 'Trailing_Stop' and 'Trade_Return' are written.
        starting_value: Portfolio value at the start of the segment.
        ma: Name of the moving-average column used for entries and exits.
        global_median_atr: Reference ATR level passed to
            ``select_adaptive_parameters``.

    Returns:
        Tuple ``(segment, ending_value)`` where ``ending_value`` is the
        'Portfolio_Value' of the last row.
    """
    # Set starting portfolio value.
    segment.at[segment.index[0], 'Portfolio_Value'] = starting_value
    # Initialize required columns.
    segment['Signal'] = 0           # 1 = Buy, -1 = Sell, 0 = Hold
    segment['Trade_Price'] = np.nan # Entry/Exit Price
    segment['Trailing_Stop'] = np.nan
    # Created up front so every segment has the same columns, even when no
    # exit happens inside it.
    segment['Trade_Return'] = np.nan

    # Select adaptive parameters for this segment.
    adaptive_atr_multiplier, adaptive_scaling_threshold, adaptive_scaling_ratio = \
        select_adaptive_parameters(segment, global_median_atr)

    # Input columns are read-only during the loop; look them up once.
    dates = segment['Date']
    opens = segment['TQQQ Open']
    closes = segment['TQQQ Price']
    lows = segment['TQQQ Low']
    highs = segment['TQQQ High']
    atrs = segment['ATR']
    nasdaq_opens = segment['Nasdaq Open']
    ma_values = segment[ma]

    # Initialize state variables.
    in_trade = False
    pending_buy = False
    pending_buy_idx = None
    pending_exit = False
    trade_price = 0
    shares_held = 0
    trailing_stop = 0
    entry_date = None
    partial_exit_done = False
    cash_on_hand = 0
    hold_period = 3  # Days to hold before considering MA exit.
    # Running portfolio value as of the end of the previous bar. It is only
    # changed by an exit fill or by marking an open position to market, so a
    # flat bar simply carries it forward.
    portfolio_value = starting_value

    for i, row_label in enumerate(segment.index):
        current_date = dates.iloc[i]
        current_open = opens.iloc[i]
        current_close = closes.iloc[i]
        current_low = lows.iloc[i]
        current_high = highs.iloc[i]
        current_atr = atrs.iloc[i]
        current_ma = ma_values.iloc[i]

        if DEBUG:
            print("\n---")
            print(f"Index: {i}, Date: {current_date}")
            print(f"Open: {current_open}, Close: {current_close}, Low: {current_low}, High: {current_high}")
            print(f"ATR: {current_atr}")

        # ----- Execute Pending Buy Order -----
        if pending_buy:
            # Invest everything the portfolio was worth at the previous bar.
            current_notional = portfolio_value
            trade_price = current_open + transaction_cost + (current_open * slippage)
            shares_held = current_notional / trade_price
            trailing_stop = trade_price - (adaptive_atr_multiplier * current_atr)
            partial_exit_done = False
            cash_on_hand = 0
            # The holding period is counted from the bar that raised the signal.
            entry_date = dates.iloc[pending_buy_idx]
            segment.at[row_label, 'Signal'] = 1
            segment.at[row_label, 'Trade_Price'] = trade_price
            in_trade = True
            pending_buy = False
            if DEBUG:
                print(f"BUY EXECUTED: Trade Price = {trade_price}, Shares Held = {shares_held}, "
                      f"Trailing Stop = {trailing_stop}")

        # ----- Execute Pending Exit Order -----
        if pending_exit:
            exit_price = current_open - transaction_cost - (current_open * slippage)
            trade_return = (exit_price - trade_price) / trade_price
            segment.at[row_label, 'Signal'] = -1
            segment.at[row_label, 'Trade_Price'] = exit_price
            segment.at[row_label, 'Trade_Return'] = trade_return
            final_value = cash_on_hand + (shares_held * exit_price)
            # Realized value of the closed trade. It must survive this bar:
            # carrying the previous bar's mark-to-market forward instead would
            # drop the exit fill, costs and slippage from the equity curve.
            portfolio_value = final_value
            if DEBUG:
                print(f"SELL EXECUTED: Exit Price = {exit_price}, Trade Return = {trade_return}, "
                      f"Final Value = {final_value}")
            shares_held = 0
            cash_on_hand = 0
            in_trade = False
            pending_exit = False

        # ----- Signal Generation -----
        # Buy signal while flat: 'Nasdaq Open' above the moving average.
        if (not in_trade) and (not pending_buy):
            if nasdaq_opens.iloc[i] > current_ma:
                pending_buy = True
                pending_buy_idx = i
                if DEBUG:
                    print(f"BUY SIGNAL GENERATED at {current_date} for {ma}")

        # ----- In-Trade Processing -----
        if in_trade:
            # Update dynamic trailing stop (it only ever ratchets upwards).
            trailing_stop = max(trailing_stop, current_high - (adaptive_atr_multiplier * current_atr))
            segment.at[row_label, 'Trailing_Stop'] = trailing_stop

            # Scaling out: if profit exceeds threshold, sell a portion (once per trade).
            if not partial_exit_done:
                current_gain = (current_close - trade_price) / trade_price
                if current_gain >= adaptive_scaling_threshold:
                    shares_to_sell = adaptive_scaling_ratio * shares_held
                    realized_value = shares_to_sell * current_close
                    cash_on_hand += realized_value
                    shares_held -= shares_to_sell
                    partial_exit_done = True
                    if DEBUG:
                        print(f"Partial exit triggered at gain={current_gain*100:.2f}%. Sold {shares_to_sell} shares. "
                              f"Cash now = {cash_on_hand}")

            # Mark-to-market: update portfolio value.
            portfolio_value = cash_on_hand + (shares_held * current_close)

            # Exit conditions (the order fills at the next bar's open).
            if current_low < trailing_stop and not pending_exit:
                pending_exit = True
                if DEBUG:
                    print(f"STOP-LOSS condition met on {current_date}")
            elif (entry_date is not None and
                  (current_date - entry_date).days >= hold_period and
                  current_close < current_ma * 0.98 and not pending_exit):
                pending_exit = True
                if DEBUG:
                    print(f"MA SELL condition met on {current_date}")

        # Flat bars carry the previous value forward; exit bars keep the
        # realized value; in-trade bars hold the mark-to-market value.
        segment.at[row_label, 'Portfolio_Value'] = portfolio_value

    ending_value = segment['Portfolio_Value'].iloc[-1]
    return segment, ending_value

def walk_forward_backtest(df, ma_list):
    """
    Perform walk-forward backtesting for each moving average in ma_list.

    The data is sorted by 'Date', ATR is computed once, and the rows are cut
    into consecutive windows of ``window_test`` rows starting every ``step``
    rows. Each window is simulated with ``run_segment``; the ending portfolio
    value of one window becomes the starting value of the next. Every moving
    average starts again from ``initial_investment``.

    Args:
        df: Price DataFrame containing 'Date', the columns required by
            ``calculate_atr`` and ``run_segment``, and one column per entry
            of ``ma_list``. Not modified.
        ma_list: Names of the moving-average columns to test.

    Returns:
        Dictionary keyed by moving-average name; each value is the
        concatenation of that moving average's simulated segments.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    if len(df) == 0:
        # Fail with a clear message instead of an opaque error from
        # pd.concat on an empty list of segments.
        raise ValueError("walk_forward_backtest requires a non-empty DataFrame")

    # sort_values returns a new frame, so the caller's df is left untouched
    # and no separate copy is needed before adding the ATR columns.
    df_copy = calculate_atr(df.sort_values(by='Date'), atr_period)
    # Compute the global median ATR over the entire dataset.
    global_median_atr = df_copy['ATR'].median()

    results = {}
    for ma in ma_list:
        segment_results = []
        current_portfolio_value = initial_investment

        # Walk-forward loop: split the dataset into segments.
        for start in range(0, len(df_copy), step):
            # Each segment is copied because run_segment writes into it.
            segment = df_copy.iloc[start:start + window_test].copy()
            if len(segment) == 0:
                break

            seg_result, current_portfolio_value = run_segment(
                segment, current_portfolio_value, ma, global_median_atr)
            segment_results.append(seg_result)

        results[ma] = pd.concat(segment_results)
    return results

# ------------------ MAIN EXECUTION ------------------ #
# NOTE: `df` is not defined in this cell; it must already exist in the session.
# The code above reads the columns 'Date', 'TQQQ Open', 'TQQQ High', 'TQQQ Low',
# 'TQQQ Price', 'Nasdaq Open' and every moving-average column named in ma_list.
ma_list = ['MA_10', 'MA_20', 'MA_50', 'MA_100', 'MA_200']
# results maps each moving-average name to its concatenated per-segment DataFrame.
results = walk_forward_backtest(df, ma_list)
print("Walk-forward simulation with adaptive parameters completed successfully.")


In [None]:
import numpy as np
import pandas as pd

# ------------------ PARAMETERS ------------------ #
initial_investment = 1000  # Starting portfolio value for each moving average's backtest
transaction_cost = 0.005  # Flat amount per share: added to the buy fill and subtracted from the sell fill (price units, not a percentage)
slippage = 0.001          # 0.1% of the open price, applied against the trader on every fill
atr_period = 30           # ATR lookback period (rolling window, in rows)
atr_multiplier = 1.5      # Multiplier for dynamic trailing stop (stop distance = multiplier * ATR)

# Scaling out parameters
scaling_threshold = 0.05  # 5% profit threshold for partial exit
scaling_ratio = 0.5       # Sell 50% of shares when threshold is hit

# Walk-Forward Settings
window_test = 252  # Rows per walk-forward segment
step = window_test  # Rows between segment starts; equal to window_test, so segments do not overlap

DEBUG = False  # Toggle debug output (NOTE(review): not read by any code visible in this cell)


# ------------------ HELPER FUNCTIONS ------------------ #

def calculate_atr(df, atr_period):
    """Calculate the Average True Range (ATR) and return the DataFrame.

    The true range of a row is the largest of:
      * ``TQQQ High - TQQQ Low``
      * ``|TQQQ High - previous close|``
      * ``|TQQQ Low - previous close|``
    where the previous close is ``TQQQ Price`` shifted down by one row. Rows
    without a previous close (the first row) use 0 for the two gap terms.
    ATR is the rolling mean of the true range over ``atr_period`` rows, with
    ``min_periods=1`` so early rows average whatever history is available.

    Args:
        df: DataFrame with 'TQQQ Price', 'TQQQ High' and 'TQQQ Low' columns,
            ordered so that the preceding row is the preceding bar.
            Modified in place: 'Previous_Close', 'TR' and 'ATR' are added.
        atr_period: Rolling window length, in rows.

    Returns:
        The same ``df`` object, with the three columns added.
    """
    previous_close = df['TQQQ Price'].shift(1)
    df['Previous_Close'] = previous_close

    high = df['TQQQ High']
    low = df['TQQQ Low']
    has_previous = previous_close.notna()

    # Gap terms are only meaningful when a previous close exists; otherwise
    # they contribute 0, exactly as the row-wise formulation did.
    gap_from_high = (high - previous_close).abs().where(has_previous, 0)
    gap_from_low = (low - previous_close).abs().where(has_previous, 0)

    # Element-wise maximum of the three candidates, computed column-wise
    # instead of calling a Python lambda once per row.
    df['TR'] = np.maximum(high - low, np.maximum(gap_from_high, gap_from_low))
    df['ATR'] = df['TR'].rolling(window=atr_period, min_periods=1).mean()
    return df

def execute_buy(segment, i, current_notional):
    """Compute the fill for a buy at row position ``i`` of ``segment``.

    The fill price is the row's 'TQQQ Open' plus the module-level
    ``transaction_cost`` and plus ``slippage`` times the open. The whole
    ``current_notional`` is converted to shares at that price, and the
    initial trailing stop sits ``atr_multiplier`` times the row's 'ATR'
    below the fill price.

    Returns a tuple ``(trade_price, shares_held, trailing_stop)``.
    """
    row_label = segment.index[i]
    open_price = segment.at[row_label, 'TQQQ Open']
    atr_value = segment.at[row_label, 'ATR']

    fill_price = open_price + transaction_cost + (open_price * slippage)
    position_size = current_notional / fill_price
    initial_stop = fill_price - (atr_multiplier * atr_value)

    return fill_price, position_size, initial_stop

def execute_sell(segment, i, shares_held, cash_on_hand, trade_price=None):
    """Compute the fill for a full exit at row position ``i`` of ``segment``.

    The exit price is the row's 'TQQQ Open' minus the module-level
    ``transaction_cost`` and minus ``slippage`` times the open.

    Args:
        segment: DataFrame holding the 'TQQQ Open' column.
        i: Positional row index of the bar on which the exit fills.
        shares_held: Number of shares being sold.
        cash_on_hand: Cash already realized by an earlier partial exit.
        trade_price: Entry price of the trade being closed. It is an explicit
            argument because the entry price lives in the caller's local
            state and is not visible from this function. When omitted, the
            return cannot be computed and NaN is returned for it.

    Returns:
        Tuple ``(exit_price, trade_return, final_value)`` where
        ``trade_return`` is the fractional price change from entry to exit
        and ``final_value`` is cash plus the proceeds of the sale.
    """
    open_price = segment.at[segment.index[i], 'TQQQ Open']
    exit_price = open_price - transaction_cost - (open_price * slippage)
    if trade_price is None:
        trade_return = np.nan
    else:
        trade_return = (exit_price - trade_price) / trade_price
    final_value = cash_on_hand + (shares_held * exit_price)
    return exit_price, trade_return, final_value

def walk_forward_backtest(df, ma_list):
    """Runs the walk-forward analysis for a list of moving averages.

    The data is sorted by 'Date', ATR is computed once, and the rows are cut
    into consecutive windows of ``window_test`` rows starting every ``step``
    rows. Within a window, a buy signal is raised while flat when
    'Nasdaq Open' is above the moving average; orders fill at the next bar's
    open via ``execute_buy`` / ``execute_sell``. While in a trade an
    ATR-based trailing stop is maintained, part of the position is sold once
    when the close is ``scaling_threshold`` above the entry price, and an
    exit is scheduled when the low breaks the stop or, after the holding
    period, the close falls more than 2% below the moving average.

    Trade state is reset at every window: a position still open on the last
    row of a window is valued at that row's close and not carried over. The
    ending value of one window is the starting value of the next; every
    moving average starts again from ``initial_investment``.

    Args:
        df: Price DataFrame with 'Date', 'TQQQ Open', 'TQQQ High',
            'TQQQ Low', 'TQQQ Price', 'Nasdaq Open' and one column per entry
            of ``ma_list``. Not modified.
        ma_list: Names of the moving-average columns to test.

    Returns:
        Dictionary keyed by moving-average name; each value is the
        concatenation of that moving average's simulated segments, with
        'Portfolio_Value', 'Signal', 'Trade_Price', 'Trailing_Stop' and
        'Trade_Return' columns added.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    if len(df) == 0:
        # Fail with a clear message instead of an opaque error from
        # pd.concat on an empty list of segments.
        raise ValueError("walk_forward_backtest requires a non-empty DataFrame")

    hold_period = 3  # Calendar days to hold before the MA exit is considered.

    # ATR does not depend on the moving average, so it is computed once.
    # Rows are put in chronological order first because both the shift-based
    # ATR and the next-bar fills rely on it; mergesort keeps the original
    # order of rows sharing a date. sort_values returns a new frame, so the
    # caller's df is left untouched.
    df_copy = calculate_atr(df.sort_values(by='Date', kind='mergesort'), atr_period)

    results = {}

    for ma in ma_list:
        overall_results_list = []
        current_portfolio_value = initial_investment

        # Walk-forward loop
        for start in range(0, len(df_copy), step):
            # Copy the slice: the simulation writes result columns into it.
            segment = df_copy.iloc[start:start + window_test].copy()
            if len(segment) == 0:
                break

            segment.at[segment.index[0], 'Portfolio_Value'] = current_portfolio_value
            segment['Signal'] = 0
            segment['Trade_Price'] = np.nan
            segment['Trailing_Stop'] = np.nan
            # Created up front so every segment has the same columns, even
            # when no exit happens inside it.
            segment['Trade_Return'] = np.nan

            # Strategy state variables
            in_trade, pending_buy, pending_exit = False, False, False
            pending_buy_idx = None
            trade_price, shares_held, trailing_stop, entry_date = 0, 0, 0, None
            partial_exit_done, cash_on_hand = False, 0
            # Running portfolio value as of the end of the previous bar. Only
            # an exit fill or marking an open position to market changes it,
            # so a flat bar simply carries it forward.
            portfolio_value = current_portfolio_value

            for i, row_label in enumerate(segment.index):
                current_close = segment.at[row_label, 'TQQQ Price']
                current_low = segment.at[row_label, 'TQQQ Low']
                current_high = segment.at[row_label, 'TQQQ High']
                current_atr = segment.at[row_label, 'ATR']
                current_ma = segment.at[row_label, ma]
                current_date = segment.at[row_label, 'Date']

                # Buy execution
                if pending_buy:
                    # Size the position with the portfolio's current value,
                    # not the value the segment started with, so results of
                    # earlier trades in this segment are reinvested.
                    trade_price, shares_held, trailing_stop = execute_buy(segment, i, portfolio_value)
                    # Fresh trade: allow one scale-out again and start with no cash.
                    partial_exit_done, cash_on_hand = False, 0
                    # The holding period is counted from the bar that raised
                    # the signal; without this the MA exit could never fire.
                    entry_date = segment.at[segment.index[pending_buy_idx], 'Date']
                    segment.at[row_label, 'Signal'] = 1
                    segment.at[row_label, 'Trade_Price'] = trade_price
                    in_trade, pending_buy = True, False

                # Sell execution
                if pending_exit:
                    exit_price, trade_return, final_value = execute_sell(
                        segment, i, shares_held, cash_on_hand, trade_price)
                    segment.at[row_label, 'Signal'] = -1
                    segment.at[row_label, 'Trade_Price'] = exit_price
                    segment.at[row_label, 'Trade_Return'] = trade_return
                    # Realized value of the closed trade. It must survive
                    # this bar: carrying the previous bar's mark-to-market
                    # forward instead would drop the exit fill, costs and
                    # slippage from the equity curve.
                    portfolio_value = final_value
                    shares_held, cash_on_hand, in_trade, pending_exit = 0, 0, False, False

                # Signal generation
                if not in_trade and not pending_buy and segment.at[row_label, 'Nasdaq Open'] > current_ma:
                    pending_buy, pending_buy_idx = True, i

                # In-Trade Processing
                if in_trade:
                    # The trailing stop only ever ratchets upwards.
                    trailing_stop = max(trailing_stop, current_high - (atr_multiplier * current_atr))
                    segment.at[row_label, 'Trailing_Stop'] = trailing_stop

                    # Scaling out (once per trade)
                    if not partial_exit_done and (current_close - trade_price) / trade_price >= scaling_threshold:
                        shares_to_sell = scaling_ratio * shares_held
                        cash_on_hand += shares_to_sell * current_close
                        shares_held -= shares_to_sell
                        partial_exit_done = True

                    # Mark-to-market
                    portfolio_value = cash_on_hand + (shares_held * current_close)

                    # Exit conditions (the order fills at the next bar's open)
                    if current_low < trailing_stop:
                        pending_exit = True
                    elif (entry_date is not None
                          and (current_date - entry_date).days >= hold_period
                          and current_close < current_ma * 0.98):
                        pending_exit = True

                # Flat bars carry the previous value forward; exit bars keep
                # the realized value; in-trade bars hold the mark-to-market value.
                segment.at[row_label, 'Portfolio_Value'] = portfolio_value

            # Update portfolio for next window
            current_portfolio_value = segment['Portfolio_Value'].iloc[-1]
            overall_results_list.append(segment)

        # Store results
        results[ma] = pd.concat(overall_results_list)

    return results

# ------------------ EXECUTE BACKTEST ------------------ #
# NOTE: `df` is not defined in this cell; it must already exist in the session.
# The code above reads the columns 'Date', 'TQQQ Open', 'TQQQ High', 'TQQQ Low',
# 'TQQQ Price', 'Nasdaq Open' and every moving-average column named in ma_list.
ma_list = ['MA_10', 'MA_20', 'MA_50', 'MA_100', 'MA_200']
# results maps each moving-average name to its concatenated per-segment DataFrame.
results = walk_forward_backtest(df, ma_list)
print("Walk-forward simulation completed successfully.")
