<a href="https://colab.research.google.com/github/Mahi-611/Technical-Indicators-Project/blob/main/prob_signal_with_profit_based_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# technical indicators with profit based evaluation
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
from datetime import timedelta
from collections import Counter # New import for calculating probable signal

# --- Common Utility Functions ---

def load_and_clean_data(file_path, date_column='Date', price_column='Price',
                        volume_column='Vol.', change_percent_column='Change %',
                        open_column=None, high_column=None, low_column=None):
    """
    Load a price-history CSV and normalise it for the indicator functions.

    The caller's column names are renamed to the standard names 'Date',
    'Price', 'Vol.', 'Change %' (and 'Open'/'High'/'Low' when given), so all
    downstream code can rely on those standard names.

    Args:
        file_path: Path of the CSV file to read.
        date_column, price_column, volume_column, change_percent_column:
            Names of the corresponding columns in the CSV.
        open_column, high_column, low_column: Optional OHLC column names.

    Returns:
        A DataFrame sorted ascending by 'Date' with a fresh 0..n-1 index, or
        None when the file cannot be read, an essential column is missing, or
        no row survives cleaning. Failures are reported with print().
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"Error loading file: {e}")
        return None

    # Caller's name -> standard name.
    column_mapping = {
        date_column: 'Date',
        price_column: 'Price',
        volume_column: 'Vol.',
        change_percent_column: 'Change %'
    }
    for source_name, standard_name in ((open_column, 'Open'),
                                       (high_column, 'High'),
                                       (low_column, 'Low')):
        if source_name:
            column_mapping[source_name] = standard_name

    # Rename only the columns that are actually present in the file.
    present = {src: dst for src, dst in column_mapping.items() if src in df.columns}
    df = df.rename(columns=present)

    if 'Date' not in df.columns:
        print(f"Error: Date column '{date_column}' (mapped to 'Date') not found in the dataset. Exiting.")
        return None
    if 'Price' not in df.columns:
        print(f"Error: Price column '{price_column}' (mapped to 'Price') not found in the dataset. Exiting.")
        return None

    # Prices may arrive as text with thousands separators ("1,234.5").
    df['Price'] = pd.to_numeric(
        df['Price'].astype(str).str.replace(',', '', regex=False), errors='coerce'
    )

    if 'Vol.' in df.columns:
        # Turn magnitude suffixes into exponents so to_numeric can parse them:
        # "1.5M" -> "1.5e6". 'B' (billions) used to fall through to NaN.
        volume_text = df['Vol.'].astype(str).str.replace(',', '', regex=False)
        for suffix, exponent in (('K', 'e3'), ('M', 'e6'), ('B', 'e9')):
            volume_text = volume_text.str.replace(suffix, exponent, regex=False)
        df['Vol.'] = pd.to_numeric(volume_text, errors='coerce')

    if 'Change %' in df.columns:
        # "1.25%" -> 0.0125 (stored as a fraction, not a percentage).
        change_text = df['Change %'].astype(str).str.replace('%', '', regex=False)
        df['Change %'] = pd.to_numeric(change_text, errors='coerce') / 100

    # Unparseable dates become NaT and are dropped together with bad prices.
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

    df_cleaned = (
        df.dropna(subset=['Date', 'Price'])
          .sort_values(by='Date')
          .reset_index(drop=True)
    )

    if df_cleaned.empty:
        print("Warning: DataFrame is empty after initial cleaning. Cannot proceed with analysis.")
        return None

    return df_cleaned

def calculate_true_action(df_temp, buy_threshold, sell_threshold, price_col='Price'):
    """
    Label each row 'Buy', 'Sell' or 'Hold' from the next row's price change.

    Adds three columns to ``df_temp`` in place and returns it:
      * 'Future_Price'    - the price one row later,
      * 'Next_Day_Return' - percentage change from this row to the next,
      * 'True_Action'     - 'Buy' if the return is above ``buy_threshold``,
                            'Sell' if it is below ``sell_threshold``,
                            'Hold' otherwise, and None where the return is
                            undefined (e.g. the last row).

    If ``price_col`` is missing, an error is printed and 'True_Action' is set
    to None for every row.
    """
    if price_col not in df_temp.columns:
        print(f"Error: Price column '{price_col}' not found for True Action calculation.")
        df_temp['True_Action'] = None
        return df_temp

    df_temp['Future_Price'] = df_temp[price_col].shift(-1)
    df_temp['Next_Day_Return'] = ((df_temp['Future_Price'] - df_temp[price_col]) / df_temp[price_col]) * 100

    next_return = df_temp['Next_Day_Return']

    # Vectorised labelling instead of a Python-level call per row.
    # Order matters: 'Buy' is written last so it wins if both thresholds
    # match, mirroring an if/elif that tests the buy condition first.
    labels = np.full(len(df_temp), 'Hold', dtype=object)
    labels[(next_return < sell_threshold).to_numpy()] = 'Sell'
    labels[(next_return > buy_threshold).to_numpy()] = 'Buy'
    labels[next_return.isna().to_numpy()] = None

    df_temp['True_Action'] = labels
    return df_temp

def get_train_test_split(df_data, test_samples_count=None, test_ratio=0.2):
    """
    Split ``df_data`` positionally into (train_df, test_df); the test set is the tail.

    Args:
        df_data: DataFrame to split; row order is preserved.
        test_samples_count: If given, the number of trailing rows used as the
            test set. When it exceeds the number of rows a warning is printed
            and the whole frame is returned as the test set with an empty
            train set.
        test_ratio: Fraction of rows used for testing when
            ``test_samples_count`` is None.

    Returns:
        Tuple ``(train_df, test_df)`` of independent copies.
    """
    total_rows = len(df_data)

    if test_samples_count is not None:
        if total_rows < test_samples_count:
            print(f"Warning: Dataset size ({len(df_data)}) is less than requested test samples ({test_samples_count}). Using entire dataset as test set.")
            return pd.DataFrame(), df_data.copy()
        # Use an explicit index rather than negative slicing: with a count of
        # 0, iloc[:-0] is empty and iloc[-0:] is the whole frame, which would
        # invert the split.
        split_index = total_rows - test_samples_count
    else:
        split_index = int(total_rows * (1 - test_ratio))

    train_df = df_data.iloc[:split_index].copy()
    test_df = df_data.iloc[split_index:].copy()
    return train_df, test_df

def run_indicator_optimization(df_base, indicator_type, param_ranges, signal_calculation_func,
                               fixed_buy_threshold, fixed_sell_threshold, test_ratio=0.2, price_col='Price'):
    """
    Grid-search an indicator's parameters by test-set accuracy against True_Action.

    Every combination of the values in ``param_ranges`` is tried. For each one,
    ``signal_calculation_func(df, params, price_col)`` must return a DataFrame
    containing a ``f'{indicator_type}_Signal'`` column; that column is compared
    with 'True_Action' on the trailing ``test_ratio`` share of the rows where
    both are non-null.

    Args:
        df_base: Cleaned price data (not modified; a copy is used).
        indicator_type: Prefix of the signal column, e.g. 'MACD'.
        param_ranges: Mapping of parameter name -> iterable of candidate values.
        signal_calculation_func: Callable producing the signal column.
        fixed_buy_threshold, fixed_sell_threshold: Thresholds used to derive
            'True_Action'.
        test_ratio: Fraction of rows used as the test set.
        price_col: Name of the price column.

    Returns:
        ``(best_accuracy, best_params)``. ``best_accuracy`` is -1 and
        ``best_params`` is ``{}`` when no combination could be scored.
        On ties the first combination encountered wins. ``best_params`` also
        carries the two True_Action thresholds for reporting.
    """
    # Local import: the file's import header does not provide itertools.
    from itertools import product

    best_accuracy = -1
    best_params = {}

    # True_Action does not depend on the indicator parameters: compute it once.
    df_base_with_true_action = calculate_true_action(df_base.copy(), fixed_buy_threshold, fixed_sell_threshold, price_col)

    print(f"\n--- Optimizing {indicator_type} ---")
    signal_col_name = f'{indicator_type}_Signal'
    param_keys = list(param_ranges.keys())

    # product() walks the grid in the same order nested loops would, with the
    # last key varying fastest.
    for combination in product(*(param_ranges[key] for key in param_keys)):
        current_params = dict(zip(param_keys, combination))

        df_temp = signal_calculation_func(df_base_with_true_action.copy(), current_params, price_col)

        # Only rows with both a label and a signal can be scored.
        scorable = df_temp.dropna(subset=['True_Action', signal_col_name])
        if len(scorable) == 0:
            continue

        _, test_df = get_train_test_split(scorable, test_samples_count=None, test_ratio=test_ratio)
        if test_df.empty:
            continue

        accuracy = accuracy_score(test_df['True_Action'], test_df[signal_col_name])
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params = current_params.copy()
            best_params['buy_threshold_for_True_Action'] = fixed_buy_threshold
            best_params['sell_threshold_for_True_Action'] = fixed_sell_threshold

    return best_accuracy, best_params

# --- Indicator Specific Calculation Functions ---

def calculate_bollinger_signal(df_in, params, price_col='Price'):
    """
    Compute Bollinger Bands and derive crossover signals.

    Adds 'Middle_Band', 'Std_Dev', 'Upper_Band', 'Lower_Band' and
    'Bollinger_Signal' to a copy of ``df_in`` (the input is not modified).

    Signal rules, evaluated on consecutive rows:
      * 'Buy'  - the previous price was at or below the previous lower band
                 and the current price is above the current lower band.
      * 'Sell' - the previous price was at or above the previous upper band
                 and the current price is below the current upper band.
      * 'Hold' - everything else, including rows where the bands are not yet
                 defined.
    If both rules match on the same row, 'Sell' wins (it is assigned last).

    Args:
        df_in: DataFrame containing ``price_col``.
        params: Mapping with 'window' (rolling length) and 'num_std_dev'
            (band width in standard deviations).
        price_col: Name of the price column.

    Returns:
        The augmented copy. If ``price_col`` is missing, an error is printed
        and 'Bollinger_Signal' is set to None.
    """
    df = df_in.copy()
    if price_col not in df.columns:
        print(f"Error: Price column '{price_col}' not found for Bollinger Bands calculation.")
        df['Bollinger_Signal'] = None
        return df

    window = params['window']
    num_std_dev = params['num_std_dev']

    # Always read prices through price_col; a hard-coded 'Price' lookup breaks
    # for any other column name.
    price = df[price_col]

    df['Middle_Band'] = price.rolling(window=window).mean()
    df['Std_Dev'] = price.rolling(window=window).std()
    df['Upper_Band'] = df['Middle_Band'] + (df['Std_Dev'] * num_std_dev)
    df['Lower_Band'] = df['Middle_Band'] - (df['Std_Dev'] * num_std_dev)

    # Previous-row values, kept as local Series so no temporary columns are
    # added to (and later dropped from) the frame.
    prev_price = price.shift(1)
    prev_upper = df['Upper_Band'].shift(1)
    prev_lower = df['Lower_Band'].shift(1)

    # Comparisons involving NaN evaluate to False, so rows where the bands or
    # the previous values are undefined can never trigger a signal.
    crossed_up_through_lower = (prev_price <= prev_lower) & (price > df['Lower_Band'])
    crossed_down_through_upper = (prev_price >= prev_upper) & (price < df['Upper_Band'])

    df['Bollinger_Signal'] = 'Hold'
    df.loc[crossed_up_through_lower, 'Bollinger_Signal'] = 'Buy'
    df.loc[crossed_down_through_upper, 'Bollinger_Signal'] = 'Sell'

    return df

def calculate_macd_signal(df_in, params, price_col='Price'):
    """
    Compute MACD and its signal line, then mark crossovers.

    Works on a copy of ``df_in`` whose index is reset to 0..n-1. Adds
    'EMA_Fast', 'EMA_Slow', 'MACD', 'Signal_Line' and 'MACD_Signal'.
    'MACD_Signal' is 'Buy' on the row where MACD moves from below to above
    the signal line, 'Sell' on the opposite move, and 'Hold' otherwise.

    ``params`` must provide 'fast_period', 'slow_period' and 'signal_period'
    (EMA spans). If ``price_col`` is missing, an error is printed and
    'MACD_Signal' is set to None.
    """
    frame = df_in.copy()
    if price_col not in frame.columns:
        print(f"Error: Price column '{price_col}' not found for MACD calculation.")
        frame['MACD_Signal'] = None
        return frame

    frame = frame.reset_index(drop=True)

    fast_span, slow_span, signal_span = (
        params[key] for key in ('fast_period', 'slow_period', 'signal_period')
    )

    frame['EMA_Fast'] = frame[price_col].ewm(span=fast_span, adjust=False).mean()
    frame['EMA_Slow'] = frame[price_col].ewm(span=slow_span, adjust=False).mean()
    frame['MACD'] = frame['EMA_Fast'] - frame['EMA_Slow']
    frame['Signal_Line'] = frame['MACD'].ewm(span=signal_span, adjust=False).mean()

    macd_now = frame['MACD']
    line_now = frame['Signal_Line']
    macd_before = macd_now.shift(1)
    line_before = line_now.shift(1)

    both_defined = macd_now.notna() & line_now.notna()
    # A crossover needs the two lines on opposite sides on consecutive rows.
    bullish_cross = both_defined & (macd_before < line_before) & (macd_now > line_now)
    bearish_cross = both_defined & (macd_before > line_before) & (macd_now < line_now)

    frame['MACD_Signal'] = 'Hold'
    frame.loc[bullish_cross, 'MACD_Signal'] = 'Buy'
    frame.loc[bearish_cross, 'MACD_Signal'] = 'Sell'
    return frame

def calculate_rsi_signal(df_in, params, price_col='Price'):
    """
    Compute an EMA-smoothed RSI and derive threshold-crossing signals.

    Works on a copy of ``df_in``; rows whose price cannot be parsed as a
    number are dropped. Adds 'RSI' and 'RSI_Signal'.

    Signal rules, evaluated on consecutive rows:
      * 'Buy'  - RSI rises from below ``oversold_threshold`` to above it.
      * 'Sell' - RSI falls from above ``overbought_threshold`` to below it.
      * 'Hold' - everything else.

    RSI edge cases:
      * average loss is 0 while average gain is positive -> RSI is 100
        (previously this case collapsed to 0, i.e. "maximally oversold",
        which produced spurious 'Buy' signals after a run of gains);
      * both averages are 0 (e.g. the first row) -> RSI is NaN and the row
        cannot trigger a signal.

    Args:
        df_in: DataFrame containing ``price_col``.
        params: Mapping with 'window' (EMA span), 'oversold_threshold' and
            'overbought_threshold'.
        price_col: Name of the price column.

    Returns:
        The augmented copy. If ``price_col`` is missing, an error is printed
        and 'RSI_Signal' is set to None.
    """
    df = df_in.copy()
    if price_col not in df.columns:
        print(f"Error: Price column '{price_col}' not found for RSI calculation.")
        df['RSI_Signal'] = None
        return df

    window = params['window']
    oversold_threshold = params['oversold_threshold']
    overbought_threshold = params['overbought_threshold']

    df[price_col] = pd.to_numeric(df[price_col], errors='coerce')
    df = df.dropna(subset=[price_col])

    delta = df[price_col].diff()
    # Split moves into non-negative gain/loss magnitudes; the undefined first
    # difference counts as "no move". Negating before where() avoids -0.0.
    gain = delta.where(delta > 0, 0.0)
    loss = (-delta).where(delta < 0, 0.0)

    avg_gain = gain.ewm(span=window, adjust=False).mean()
    avg_loss = loss.ewm(span=window, adjust=False).mean()

    # pandas division yields inf for x/0 and NaN for 0/0 without raising, and
    # 100 / (1 + inf) == 0, so "no losses" maps to RSI 100 and "no movement"
    # stays NaN.
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    rsi = df['RSI']
    prev_rsi = rsi.shift(1)

    # NaN comparisons are False, so undefined RSI values never trigger.
    left_oversold = (prev_rsi < oversold_threshold) & (rsi > oversold_threshold)
    left_overbought = (prev_rsi > overbought_threshold) & (rsi < overbought_threshold)

    df['RSI_Signal'] = 'Hold'
    df.loc[left_oversold, 'RSI_Signal'] = 'Buy'
    df.loc[left_overbought, 'RSI_Signal'] = 'Sell'
    return df

def calculate_sma_signal(df_in, params, price_col='Price'):
    """
    Compute a fast and a slow simple moving average and mark their crossovers.

    Works on a copy of ``df_in``. Adds 'SMA_Fast', 'SMA_Slow' and
    'SMA_Signal', where the signal is 'Buy' on the row the fast average moves
    from below to above the slow one, 'Sell' on the opposite move, and 'Hold'
    otherwise.

    ``params`` must provide 'fast_window' and 'slow_window'. If ``price_col``
    is missing, an error is printed and 'SMA_Signal' is set to None.
    """
    result = df_in.copy()
    if price_col not in result.columns:
        print(f"Error: Price column '{price_col}' not found for SMA calculation.")
        result['SMA_Signal'] = None
        return result

    fast_len = params['fast_window']
    slow_len = params['slow_window']

    result['SMA_Fast'] = result[price_col].rolling(window=fast_len).mean()
    result['SMA_Slow'] = result[price_col].rolling(window=slow_len).mean()

    fast_avg = result['SMA_Fast']
    slow_avg = result['SMA_Slow']
    fast_prev = fast_avg.shift(1)
    slow_prev = slow_avg.shift(1)

    # Both averages must exist on the current row before a cross can count.
    averages_ready = fast_avg.notna() & slow_avg.notna()
    golden_cross = averages_ready & (fast_prev < slow_prev) & (fast_avg > slow_avg)
    death_cross = averages_ready & (fast_prev > slow_prev) & (fast_avg < slow_avg)

    result['SMA_Signal'] = 'Hold'
    result.loc[golden_cross, 'SMA_Signal'] = 'Buy'
    result.loc[death_cross, 'SMA_Signal'] = 'Sell'

    return result

def calculate_ema_signal(df_in, params, price_col='Price'):
    """
    Compute a fast and a slow exponential moving average and mark their crossovers.

    Works on a copy of ``df_in``. Adds 'EMA_Fast', 'EMA_Slow' and
    'EMA_Signal', where the signal is 'Buy' on the row the fast EMA moves from
    below to above the slow EMA, 'Sell' on the opposite move, and 'Hold'
    otherwise.

    ``params`` must provide 'fast_window' and 'slow_window' (EMA spans). If
    ``price_col`` is missing, an error is printed and 'EMA_Signal' is set to
    None.
    """
    data = df_in.copy()
    if price_col not in data.columns:
        print(f"Error: Price column '{price_col}' not found for EMA calculation.")
        data['EMA_Signal'] = None
        return data

    fast_span = params['fast_window']
    slow_span = params['slow_window']

    data['EMA_Fast'] = data[price_col].ewm(span=fast_span, adjust=False).mean()
    data['EMA_Slow'] = data[price_col].ewm(span=slow_span, adjust=False).mean()

    quick = data['EMA_Fast']
    steady = data['EMA_Slow']
    quick_prev = quick.shift(1)
    steady_prev = steady.shift(1)

    usable = quick.notna() & steady.notna()
    # Fast EMA overtakes the slow EMA between the previous and current row.
    upward_cross = usable & (quick_prev < steady_prev) & (quick > steady)
    # Fast EMA drops under the slow EMA between the previous and current row.
    downward_cross = usable & (quick_prev > steady_prev) & (quick < steady)

    data['EMA_Signal'] = 'Hold'
    data.loc[upward_cross, 'EMA_Signal'] = 'Buy'
    data.loc[downward_cross, 'EMA_Signal'] = 'Sell'

    return data

# --- Profit Simulation Function ---
def simulate_trading_returns(df: pd.DataFrame, predicted_col: str, price_col: str, stop_loss: float = 0.05) -> float:
    """
    Simulate an all-in / all-out strategy driven by a signal column.

    Starting with a fund of 100000, the simulation walks the rows in order:

    * On a 'Buy' signal at row ``i`` the whole fund is invested at the price
      of row ``i - 1``.
    * The position is closed at the price of row ``j - 1`` on the first later
      row ``j`` that either carries a 'Sell' signal or whose preceding price
      has fallen more than ``stop_loss`` (as a fraction) below the buy price.
    * A position still open at the end is closed at the last price.

    Signals other than 'Buy'/'Sell' (including missing ones) count as 'Hold'.
    A 'Buy' signal whose reference price is zero, negative or NaN is skipped.

    Args:
        df: Rows in chronological order.
        predicted_col: Column holding 'Buy' / 'Sell' / 'Hold' labels.
        price_col: Column holding prices.
        stop_loss: Maximum tolerated fractional drop before a forced exit.

    Returns:
        float: ``(final_fund - initial_fund) / initial_fund``.
    """
    # Numeric encoding of the labels: Buy=1, Sell=-1, Hold=0.
    signal_map = {'Buy': 1, 'Sell': -1, 'Hold': 0}
    signals = df[predicted_col].map(signal_map).fillna(0).values
    prices = df[price_col].values
    n_rows = len(signals)

    init_fund = 100000.0
    fund = init_fund
    quantity = 0

    # Trades execute at the previous row's price, so row 0 can never trade.
    i = 1
    while i < n_rows:
        buy_price = prices[i - 1]

        # Skip rows that cannot open a position. `not (x > 0)` also covers
        # NaN. Advancing here matters: without it, a 'Buy' signal after an
        # unusable price would leave `i` unchanged and loop forever.
        if signals[i] != 1 or not (fund > 0) or not (buy_price > 0):
            i += 1
            continue

        # Invest the entire fund.
        quantity_to_buy = fund / buy_price
        fund -= quantity_to_buy * buy_price
        quantity += quantity_to_buy

        # Look for the first exit condition after the entry row.
        j = i + 1
        while j < n_rows:
            exit_price = prices[j - 1]
            is_sell_signal = (signals[j] == -1)
            # Fractional change since entry; a drop of S shows up as -S.
            price_drop_ratio = (exit_price - buy_price) / buy_price
            is_stop_loss_hit = (price_drop_ratio < -stop_loss)

            if is_sell_signal or is_stop_loss_hit:
                fund += quantity * exit_price
                quantity = 0
                i = j  # resume scanning from the exit row
                break
            j += 1
        else:
            # No exit before the data ran out: stop scanning; the open
            # position is liquidated below.
            i = n_rows

    # Liquidate whatever is still held at the last available price.
    if quantity > 0 and n_rows > 0:
        fund += quantity * prices[n_rows - 1]
        quantity = 0

    return (fund - init_fund) / init_fund

# --- Main Orchestration Function ---

def _most_common_signal(row, signal_cols):
    """Return the most frequent non-null signal in ``row``; 'Hold' if there is none."""
    votes = [row[col] for col in signal_cols if pd.notna(row[col])]
    if not votes:
        return 'Hold'
    # On a tie, Counter.most_common keeps first-encountered order, so the
    # signal from the earliest column in signal_cols wins.
    return Counter(votes).most_common(1)[0][0]


def _print_results_summary(all_results):
    """Print per-indicator accuracy/profit and the best performer for each metric."""
    print("\n" + "="*50)
    print("--- Overall Optimization Results Summary ---")
    print("="*50)
    best_overall_accuracy = -1
    best_overall_profit_rate = -float('inf')
    best_overall_indicator_accuracy = "N/A"
    best_overall_indicator_profit = "N/A"

    for indicator, data in all_results.items():
        accuracy = data['Accuracy']
        profit_rate = data['Profit Rate']
        params = data['Params']
        if accuracy == -1:
            # -1 is the "no valid evaluation" sentinel.
            print(f"\n{indicator}: No valid results found. Check data and parameters.")
            continue

        print(f"\n{indicator}:")
        print(f"   Max Test Set Accuracy: {accuracy:.2%}")
        print(f"   Simulated Profit Rate: {profit_rate:.2%}")
        if params != 'N/A':
            print(f"   Best Parameters: {params}")

        if accuracy > best_overall_accuracy:
            best_overall_accuracy = accuracy
            best_overall_indicator_accuracy = indicator
        if profit_rate > best_overall_profit_rate:
            best_overall_profit_rate = profit_rate
            best_overall_indicator_profit = indicator

    print("\n" + "="*50)
    if best_overall_accuracy != -1:
        print(f"Overall Maximum Accuracy: {best_overall_accuracy:.2%}")
        print(f"Achieved by Indicator (Accuracy): {best_overall_indicator_accuracy}")
    else:
        print("Could not determine overall maximum accuracy due to no valid results.")

    if best_overall_profit_rate != -float('inf'):
        print(f"Overall Maximum Profit Rate: {best_overall_profit_rate:.2%}")
        print(f"Achieved by Indicator (Profit Rate): {best_overall_indicator_profit}")
    else:
        print("Could not determine overall maximum profit rate due to no valid results.")
    print("="*50)


def run_all_indicator_optimizations(
    file_path,
    date_column='Date',
    price_column='Price',
    volume_column='Vol.',
    change_percent_column='Change %',
    open_column=None, high_column=None, low_column=None,
    fixed_buy_threshold=0.5,
    fixed_sell_threshold=-0.5,
    test_ratio=0.2
):
    """
    Load a price CSV, evaluate every indicator, and report accuracy and simulated profit.

    For each of Bollinger Bands, MACD, RSI, dual SMA and dual EMA the
    parameter grid is searched with ``run_indicator_optimization``; the best
    parameters are then used to simulate trading on the test split. A combined
    'prob_signal' (the most frequent of the individual signals per row) is
    evaluated the same way. Results are printed and returned.

    Args:
        file_path: CSV file to analyse.
        date_column, price_column, volume_column, change_percent_column,
        open_column, high_column, low_column: Column names in the CSV; they
            are only used for loading, after which the standard names apply.
        fixed_buy_threshold, fixed_sell_threshold: Next-row return thresholds
            (in percent) that define the reference 'True_Action'.
        test_ratio: Trailing fraction of rows used for evaluation.

    Returns:
        ``(all_results, test_df_all_signals)`` where ``all_results`` maps an
        indicator name to ``{'Accuracy', 'Params', 'Profit Rate'}`` (accuracy
        -1 means "could not be evaluated") and ``test_df_all_signals`` is the
        test split with all signal columns. ``({}, None)`` if loading failed.
    """
    print(f"--- Starting All Technical Indicator Optimizations for {file_path} ---")

    df_base = load_and_clean_data(
        file_path, date_column, price_column, volume_column, change_percent_column,
        open_column, high_column, low_column
    )

    if df_base is None:
        print("Data loading or initial cleaning failed. Aborting optimizations.")
        return {}, None

    # load_and_clean_data renames the caller's price column to 'Price', so
    # everything downstream must use the standard name, not `price_column`.
    price_col = 'Price'

    all_results = {}
    print(f"\nTotal data points available for analysis: {len(df_base)}")
    print(f"Using {test_ratio*100:.0f}% of data for testing in optimizations.")

    # (result key, signal-column prefix, signal function, parameter grid)
    indicator_specs = [
        ('Bollinger Bands', 'Bollinger', calculate_bollinger_signal,
         {'window': [20], 'num_std_dev': [2.0]}),
        ('MACD', 'MACD', calculate_macd_signal,
         {'fast_period': [12], 'slow_period': [26], 'signal_period': [9]}),
        ('RSI', 'RSI', calculate_rsi_signal,
         {'window': [14], 'oversold_threshold': [30], 'overbought_threshold': [70]}),
        ('SMA', 'SMA', calculate_sma_signal,
         {'fast_window': [20], 'slow_window': [50]}),
        ('EMA', 'EMA', calculate_ema_signal,
         {'fast_window': [12], 'slow_window': [26]}),
    ]

    # --- Per-indicator optimisation and profit simulation ---
    for result_key, indicator_type, signal_func, param_ranges in indicator_specs:
        accuracy, best_params = run_indicator_optimization(
            df_base.copy(), indicator_type, param_ranges,
            signal_func, fixed_buy_threshold, fixed_sell_threshold,
            test_ratio, price_col
        )

        profit_rate = 0.0
        # With no valid optimisation result best_params is empty and the
        # signal function would fail on its first parameter lookup.
        if accuracy != -1:
            signal_col = f'{indicator_type}_Signal'
            df_signals = signal_func(df_base.copy(), best_params, price_col)
            _, test_df = get_train_test_split(
                df_signals.dropna(subset=[signal_col, price_col]),
                test_samples_count=None, test_ratio=test_ratio
            )
            if not test_df.empty:
                profit_rate = simulate_trading_returns(test_df, signal_col, price_col)

        all_results[result_key] = {'Accuracy': accuracy, 'Params': best_params, 'Profit Rate': profit_rate}

    # --- One DataFrame carrying True_Action plus every usable signal ---
    print("\n--- Generating Final DataFrame with All Best Individual Signals ---")
    df_final_all_signals = calculate_true_action(
        df_base.copy(), fixed_buy_threshold, fixed_sell_threshold, price_col
    )
    for result_key, _, signal_func, _ in indicator_specs:
        if all_results[result_key]['Accuracy'] != -1:
            df_final_all_signals = signal_func(
                df_final_all_signals, all_results[result_key]['Params'], price_col
            )

    # --- Combined signal: per-row majority vote over the individual signals ---
    print("\n--- Calculating Probable Signal (Most Frequent of All Individual Signals) ---")
    potential_signal_cols = [f'{indicator_type}_Signal' for _, indicator_type, _, _ in indicator_specs]
    active_individual_signal_cols = [col for col in potential_signal_cols if col in df_final_all_signals.columns]

    if not active_individual_signal_cols:
        print("Warning: No individual indicator signal columns found to calculate 'prob_signal'. 'prob_signal' column will be set to 'Hold'.")
        df_final_all_signals['prob_signal'] = 'Hold'
    else:
        df_final_all_signals['prob_signal'] = df_final_all_signals.apply(
            lambda row: _most_common_signal(row, active_individual_signal_cols), axis=1
        )

    # --- Accuracy and profit of the combined signal ---
    prob_signal_accuracy = -1
    prob_signal_profit_rate = 0.0
    if 'True_Action' in df_final_all_signals.columns and 'prob_signal' in df_final_all_signals.columns:
        df_for_prob_eval = df_final_all_signals.dropna(subset=['True_Action', 'prob_signal', price_col])
        _, test_df_prob = get_train_test_split(df_for_prob_eval, test_samples_count=None, test_ratio=test_ratio)

        if not test_df_prob.empty:
            prob_signal_accuracy = accuracy_score(test_df_prob['True_Action'], test_df_prob['prob_signal'])
            prob_signal_profit_rate = simulate_trading_returns(test_df_prob, 'prob_signal', price_col)
            print(f"Accuracy for 'prob_signal': {prob_signal_accuracy:.2%}")
            print(f"Profit Rate for 'prob_signal': {prob_signal_profit_rate:.2%}")
        else:
            print("Warning: Not enough non-null 'True_Action' and 'prob_signal' values for combined signal evaluation.")
    else:
        print("Error: 'True_Action' or 'prob_signal' column missing for combined signal evaluation.")

    all_results['Probable Signal (Combined)'] = {'Accuracy': prob_signal_accuracy, 'Params': 'N/A', 'Profit Rate': prob_signal_profit_rate}

    _print_results_summary(all_results)

    # --- Test split of the combined frame, for display and for the caller ---
    signal_cols_for_final_dropna = ['True_Action', 'prob_signal'] + active_individual_signal_cols
    existing_cols_for_final_dropna = [col for col in signal_cols_for_final_dropna if col in df_final_all_signals.columns]
    df_final_cleaned_for_display = df_final_all_signals.dropna(subset=existing_cols_for_final_dropna)
    _, test_df_all_signals = get_train_test_split(df_final_cleaned_for_display, test_samples_count=None, test_ratio=test_ratio)

    print(f"\n--- Test Set Dates (Based on data used for final display) ---")
    if not test_df_all_signals.empty:
        print(f"Test set dates: {test_df_all_signals['Date'].min().strftime('%d-%m-%Y')} to {test_df_all_signals['Date'].max().strftime('%d-%m-%Y')}")
    else:
        print("Test set is empty. No dates to display.")

    print(f"\n--- Test Set Data with All Indicator Signals & Probable Signal ({len(test_df_all_signals)} samples) ---")
    if not test_df_all_signals.empty:
        desired_order_base = ['Date', 'Price', 'Open', 'High', 'Low', 'Vol.', 'Change %', 'Future_Price', 'Next_Day_Return']
        final_display_order = [col for col in desired_order_base if col in test_df_all_signals.columns]
        final_display_order += [col for col in active_individual_signal_cols if col not in final_display_order]
        if 'prob_signal' in test_df_all_signals.columns:
            final_display_order.append('prob_signal')
        if 'True_Action' in test_df_all_signals.columns:
            final_display_order.append('True_Action')

        existing_display_cols = [col for col in final_display_order if col in test_df_all_signals.columns]
        display_df = test_df_all_signals[existing_display_cols]
        try:
            print(display_df.to_markdown(index=False, numalign="left", stralign="left"))
        except ImportError:
            # to_markdown depends on the optional 'tabulate' package; fall
            # back to a plain-text table instead of aborting after all the
            # work is done.
            print(display_df.to_string(index=False))
    else:
        print("Test set is empty. No data to display.")

    return all_results, test_df_all_signals

# --- Example Usage ---
if __name__ == "__main__":
    # Example run against a sample CSV file.
    file_path_to_use = "GDFR Historical Data.csv"

    # Column names and evaluation settings for the example file. Add
    # 'open_column' / 'high_column' / 'low_column' entries here if the file
    # has OHLC columns that should be carried through.
    run_settings = {
        'date_column': 'Date',
        'price_column': 'Price',
        'volume_column': 'Vol.',
        'change_percent_column': 'Change %',
        'fixed_buy_threshold': 0.5,
        'fixed_sell_threshold': -0.5,
        'test_ratio': 0.2,
    }

    results, final_test_data_df = run_all_indicator_optimizations(
        file_path=file_path_to_use, **run_settings
    )

    # `results` holds the per-indicator metrics and `final_test_data_df` the
    # test split with all signals, e.g.:
    # print("\nFull Optimization Results Dictionary:")
    # print(results)
    #
    # print("\nFinal Test Data DataFrame:")
    # print(final_test_data_df.head())

--- Starting All Technical Indicator Optimizations for GDFR Historical Data.csv ---

Total data points available for analysis: 280
Using 20% of data for testing in optimizations.

--- Optimizing Bollinger ---

--- Optimizing MACD ---

--- Optimizing RSI ---

--- Optimizing SMA ---

--- Optimizing EMA ---

--- Generating Final DataFrame with All Best Individual Signals ---

--- Calculating Probable Signal (Most Frequent of All Individual Signals) ---
Accuracy for 'prob_signal': 12.50%
Profit Rate for 'prob_signal': 0.00%

--- Overall Optimization Results Summary ---

Bollinger Bands:
   Max Test Set Accuracy: 14.29%
   Simulated Profit Rate: 0.00%
   Best Parameters: {'window': 20, 'num_std_dev': 2.0, 'buy_threshold_for_True_Action': 0.5, 'sell_threshold_for_True_Action': -0.5}

MACD:
   Max Test Set Accuracy: 17.86%
   Simulated Profit Rate: 2.04%
   Best Parameters: {'fast_period': 12, 'slow_period': 26, 'signal_period': 9, 'buy_threshold_for_True_Action': 0.5, 'sell_threshold_for_Tru

In [3]:
# technical indicators with profit based evaluation

import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
from datetime import timedelta # Kept for potential future use or consistency

# --- Common Utility Functions ---

def load_and_clean_data(file_path, date_column='Date', price_column='Price',
                        volume_column='Vol.', change_percent_column='Change %',
                        open_column=None, high_column=None, low_column=None):
    """
    Load a price-history CSV and normalise it for the indicator functions.

    The caller's column names are renamed to the standard names 'Date', 'Price',
    'Vol.' and 'Change %' (plus 'Open', 'High', 'Low' when given); names that are
    not present in the file are skipped. 'Price' has thousands separators removed,
    'Vol.' has separators removed and its K/M/B suffixes expanded, and 'Change %'
    has the '%' sign removed and is divided by 100. Anything that still cannot be
    parsed becomes NaN/NaT.

    Args:
        file_path: Path of the CSV file to read.
        date_column: Name of the date column in the file.
        price_column: Name of the price column in the file.
        volume_column: Name of the volume column in the file (optional in the file).
        change_percent_column: Name of the percentage-change column (optional in the file).
        open_column, high_column, low_column: Optional OHLC column names.

    Returns:
        A DataFrame holding only the rows whose 'Date' and 'Price' parsed, sorted
        by 'Date' ascending with a fresh 0..n-1 index. Returns None (after printing
        a message) when the file cannot be read, when 'Date' or 'Price' is missing
        after renaming, or when no rows survive cleaning.
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        # Deliberately broad: any parse/IO failure is reported and turned into None.
        print(f"Error loading file: {e}")
        return None

    # Map the caller's column names onto the standard names used by the rest of the file
    column_mapping = {
        date_column: 'Date',
        price_column: 'Price',
        volume_column: 'Vol.',
        change_percent_column: 'Change %'
    }
    # Add OHLC columns if provided
    if open_column: column_mapping[open_column] = 'Open'
    if high_column: column_mapping[high_column] = 'High'
    if low_column: column_mapping[low_column] = 'Low'

    # Only map columns that exist in the DataFrame
    existing_columns_to_map = {k: v for k, v in column_mapping.items() if k in df.columns}
    df.rename(columns=existing_columns_to_map, inplace=True)

    # Basic checks for essential columns after renaming
    if 'Date' not in df.columns:
        print(f"Error: Date column '{date_column}' (mapped to 'Date') not found in the dataset. Exiting.")
        return None
    if 'Price' not in df.columns:
        print(f"Error: Price column '{price_column}' (mapped to 'Price') not found in the dataset. Exiting.")
        return None

    # Convert 'Price' column to numeric, handling commas if present, and coercing errors
    df['Price'] = pd.to_numeric(df['Price'].astype(str).str.replace(',', '', regex=False), errors='coerce')

    # Handle 'Vol.' column if it exists
    if 'Vol.' in df.columns:
        # Rewrite magnitude suffixes as exponents so to_numeric can parse them
        # ("1.5M" -> "1.5e6"). 'B' (billions) is included so such rows are no
        # longer silently coerced to NaN.
        volume_suffixes = {'K': 'e3', 'M': 'e6', 'B': 'e9'}
        volume_text = df['Vol.'].astype(str).str.replace(',', '', regex=False)
        for suffix, exponent in volume_suffixes.items():
            volume_text = volume_text.str.replace(suffix, exponent, regex=False)
        df['Vol.'] = pd.to_numeric(volume_text, errors='coerce')

    # Handle 'Change %' column if it exists: "1.25%" -> 0.0125
    if 'Change %' in df.columns:
        df['Change %'] = df['Change %'].astype(str).str.replace('%', '', regex=False)
        df['Change %'] = pd.to_numeric(df['Change %'], errors='coerce') / 100

    # Convert 'Date' column to datetime objects (flexible format inference)
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

    # Drop rows with NaN values in essential columns ('Date', 'Price') after conversion
    df_cleaned = df.dropna(subset=['Date', 'Price']).copy()

    # Ensure the DataFrame is sorted chronologically by Date
    df_cleaned = df_cleaned.sort_values(by='Date').reset_index(drop=True)

    if df_cleaned.empty:
        print("Warning: DataFrame is empty after initial cleaning. Cannot proceed with analysis.")
        return None

    return df_cleaned

def calculate_true_action(df_temp, buy_threshold, sell_threshold, price_col='Price'):
    """
    Label each row with the action that the next row's price move would have justified.

    Adds three columns to `df_temp` in place and returns the same object:
      - 'Future_Price': the next row's price (NaN on the last row).
      - 'Next_Day_Return': percentage change from this row's price to the next.
      - 'True_Action': 'Buy' when the return is above `buy_threshold`, 'Sell' when
        it is below `sell_threshold`, 'Hold' otherwise, and None when the return
        is NaN (e.g. the last row).

    Args:
        df_temp: DataFrame containing `price_col`; it is modified in place.
        buy_threshold: Return, in percent, above which the label is 'Buy'.
        sell_threshold: Return, in percent, below which the label is 'Sell'.
        price_col: Name of the price column.

    Returns:
        `df_temp` with the columns above. If `price_col` is missing, a message is
        printed and only an all-None 'True_Action' column is added.
    """
    if price_col not in df_temp.columns:
        print(f"Error: Price column '{price_col}' not found for True Action calculation.")
        df_temp['True_Action'] = None
        return df_temp

    df_temp['Future_Price'] = df_temp[price_col].shift(-1)
    df_temp['Next_Day_Return'] = ((df_temp['Future_Price'] - df_temp[price_col]) / df_temp[price_col]) * 100

    # Vectorised labelling: much faster than a row-wise apply and also well defined
    # on an empty frame. Comparisons against NaN are False, so NaN rows start as
    # 'Hold' and are then overwritten with None.
    returns = df_temp['Next_Day_Return'].to_numpy(dtype=float)
    actions = np.full(len(df_temp), 'Hold', dtype=object)
    actions[returns < sell_threshold] = 'Sell'
    # 'Buy' is written last so it wins if both conditions hold, matching an
    # if/elif chain that tests the buy condition first.
    actions[returns > buy_threshold] = 'Buy'
    actions[np.isnan(returns)] = None
    df_temp['True_Action'] = actions
    return df_temp

def get_train_test_split(df_data, test_samples_count=None, test_ratio=0.2):
    """
    Split `df_data` positionally into a leading train part and a trailing test part.

    Args:
        df_data: DataFrame to split; row order is preserved (no shuffling).
        test_samples_count: Number of trailing rows to use as the test set. When
            None, `test_ratio` decides the split instead. A count of 0 (or less)
            yields an empty test set.
        test_ratio: Fraction of rows placed in the test set when
            `test_samples_count` is None.

    Returns:
        (train_df, test_df) as independent copies. If `test_samples_count`
        exceeds the number of rows, a warning is printed and the result is an
        empty DataFrame plus a copy of the whole input as the test set.
    """
    total_rows = len(df_data)
    if test_samples_count is not None:
        if total_rows < test_samples_count:
            print(f"Warning: Dataset size ({len(df_data)}) is less than requested test samples ({test_samples_count}). Using entire dataset as test set.")
            return pd.DataFrame(), df_data.copy()
        # Use an explicit split position: slicing with a negative count breaks for
        # 0, because iloc[:-0] is empty and iloc[-0:] is the whole frame.
        split_index = total_rows - test_samples_count
    else: # Default to ratio if samples_count is None
        split_index = int(total_rows * (1 - test_ratio))

    train_df = df_data.iloc[:split_index].copy()
    test_df = df_data.iloc[split_index:].copy()
    return train_df, test_df

def run_indicator_optimization(df_base, indicator_type, param_ranges, signal_calculation_func,
                               fixed_buy_threshold, fixed_sell_threshold, test_ratio=0.2, price_col='Price'):
    """
    Grid-search an indicator's parameters for the best test-set accuracy.

    Every combination of the values in `param_ranges` is evaluated: the signal
    column produced by `signal_calculation_func` is compared against 'True_Action'
    on the trailing `test_ratio` share of the rows where both are present.

    Args:
        df_base: Price DataFrame; it is copied, never modified.
        indicator_type: Prefix of the signal column; the function under test must
            write a column named f'{indicator_type}_Signal'.
        param_ranges: Mapping of parameter name -> iterable of candidate values.
        signal_calculation_func: Callable taking (df, params, price_col) and
            returning a DataFrame with the signal column added.
        fixed_buy_threshold: Buy threshold passed to calculate_true_action.
        fixed_sell_threshold: Sell threshold passed to calculate_true_action.
        test_ratio: Fraction of the cleaned rows used as the test set.
        price_col: Name of the price column.

    Returns:
        (best_accuracy, best_params). `best_params` holds the winning combination
        plus the two thresholds under 'buy_threshold_for_True_Action' and
        'sell_threshold_for_True_Action'. When no combination could be scored the
        result is (-1, {}). On ties the first combination evaluated wins.
    """
    from itertools import product  # stdlib; imported locally so the block is self-contained

    best_accuracy = -1
    best_params = {}

    # Calculate True_Action once for the base DataFrame for all iterations
    df_base_with_true_action = calculate_true_action(df_base.copy(), fixed_buy_threshold, fixed_sell_threshold, price_col)

    print(f"\n--- Optimizing {indicator_type} ---") # Keep this line
    param_keys = list(param_ranges.keys())
    signal_col_name = f'{indicator_type}_Signal'

    # product() walks the grid with the first parameter varying slowest, which is
    # the order a nested loop over param_keys would produce. An empty grid yields
    # a single empty combination.
    for combination in product(*(param_ranges[key] for key in param_keys)):
        current_params = dict(zip(param_keys, combination))

        # Calculate indicator and signal using the specific function and current parameters
        df_temp = signal_calculation_func(df_base_with_true_action.copy(), current_params, price_col)

        # Accuracy is only defined where both the label and the signal exist
        df_cleaned_for_accuracy = df_temp.dropna(subset=['True_Action', signal_col_name])
        if df_cleaned_for_accuracy.empty:
            continue # Skip if no valid data for accuracy calculation

        _, test_df = get_train_test_split(df_cleaned_for_accuracy, test_samples_count=None, test_ratio=test_ratio)
        if test_df.empty:
            continue

        accuracy = accuracy_score(test_df['True_Action'], test_df[signal_col_name])
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params = {
                **current_params,
                'buy_threshold_for_True_Action': fixed_buy_threshold,
                'sell_threshold_for_True_Action': fixed_sell_threshold,
            }

    return best_accuracy, best_params

# --- Indicator Specific Calculation Functions ---

def calculate_bollinger_signal(df_in, params, price_col='Price'):
    """
    Calculates Bollinger Bands and generates signals from band re-entry crossovers.

    'Buy' when the previous price was at or below the previous Lower Band and the
    current price is above the current Lower Band.
    'Sell' when the previous price was at or above the previous Upper Band and the
    current price is below the current Upper Band.
    Every other row is 'Hold'.

    Args:
        df_in: DataFrame containing `price_col`; it is not modified.
        params: Dict with 'window' (rolling window length) and 'num_std_dev'
            (band width in rolling standard deviations).
        price_col: Name of the price column used for the bands and the crossovers.

    Returns:
        A copy of `df_in` with 'Middle_Band', 'Std_Dev', 'Upper_Band',
        'Lower_Band' and 'Bollinger_Signal' added. If `price_col` is missing, a
        message is printed and only an all-None 'Bollinger_Signal' is added.
    """
    df = df_in.copy()
    if price_col not in df.columns:
        print(f"Error: Price column '{price_col}' not found for Bollinger Bands calculation.")
        df['Bollinger_Signal'] = None
        return df

    window = params['window']
    num_std_dev = params['num_std_dev']

    # Always read the price through price_col: hard-coding 'Price' here would
    # break (or compare the wrong series) for any other column name.
    price = df[price_col]
    rolling_price = price.rolling(window=window)

    df['Middle_Band'] = rolling_price.mean()
    df['Std_Dev'] = rolling_price.std()
    df['Upper_Band'] = df['Middle_Band'] + (df['Std_Dev'] * num_std_dev)
    df['Lower_Band'] = df['Middle_Band'] - (df['Std_Dev'] * num_std_dev)

    # Previous row's price and bands for crossover detection; kept as local
    # Series so no temporary columns need to be added and dropped.
    price_prev = price.shift(1)
    upper_prev = df['Upper_Band'].shift(1)
    lower_prev = df['Lower_Band'].shift(1)

    # A crossover is only defined once both the current and previous bands exist
    valid_crossover_conditions = (
        price.notna() & price_prev.notna() &
        df['Upper_Band'].notna() & upper_prev.notna() &
        df['Lower_Band'].notna() & lower_prev.notna()
    )

    df['Bollinger_Signal'] = 'Hold'

    # Buy signal: Price crosses UP through the Lower Band
    crossed_up = valid_crossover_conditions & (price_prev <= lower_prev) & (price > df['Lower_Band'])
    df.loc[crossed_up, 'Bollinger_Signal'] = 'Buy'

    # Sell signal: Price crosses DOWN through the Upper Band
    crossed_down = valid_crossover_conditions & (price_prev >= upper_prev) & (price < df['Upper_Band'])
    df.loc[crossed_down, 'Bollinger_Signal'] = 'Sell'

    return df

def calculate_macd_signal(df_in, params, price_col='Price'):
    """
    Compute MACD and its signal line, then label MACD/signal-line crossovers.

    A row is 'Buy' when MACD was below the Signal Line on the previous row and is
    above it now, 'Sell' for the opposite crossing, and 'Hold' otherwise.

    `params` supplies the EMA spans 'fast_period', 'slow_period' and
    'signal_period'. The input is not modified; the returned copy has a fresh
    0..n-1 index and the extra columns 'EMA_Fast', 'EMA_Slow', 'MACD',
    'Signal_Line' and 'MACD_Signal'. If `price_col` is missing, a message is
    printed and only an all-None 'MACD_Signal' column is added.
    """
    frame = df_in.copy()
    if price_col not in frame.columns:
        print(f"Error: Price column '{price_col}' not found for MACD calculation.")
        frame['MACD_Signal'] = None
        return frame

    frame = frame.reset_index(drop=True)

    fast_span = params['fast_period']
    slow_span = params['slow_period']
    signal_span = params['signal_period']

    closes = frame[price_col]
    frame['EMA_Fast'] = closes.ewm(span=fast_span, adjust=False).mean()
    frame['EMA_Slow'] = closes.ewm(span=slow_span, adjust=False).mean()
    frame['MACD'] = frame['EMA_Fast'] - frame['EMA_Slow']
    frame['Signal_Line'] = frame['MACD'].ewm(span=signal_span, adjust=False).mean()

    macd_now = frame['MACD']
    line_now = frame['Signal_Line']
    macd_before = macd_now.shift(1)
    line_before = line_now.shift(1)

    # Both series must be defined on the current row for a crossover to count
    defined = macd_now.notna() & line_now.notna()
    bullish_cross = defined & (macd_before < line_before) & (macd_now > line_now)
    bearish_cross = defined & (macd_before > line_before) & (macd_now < line_now)

    frame['MACD_Signal'] = 'Hold'
    frame.loc[bullish_cross, 'MACD_Signal'] = 'Buy'
    frame.loc[bearish_cross, 'MACD_Signal'] = 'Sell'
    return frame

def calculate_rsi_signal(df_in, params, price_col='Price'):
    """
    Calculates RSI and generates trading signals from threshold re-entries.

    'Buy' when RSI was below the oversold threshold on the previous row and is
    above it now; 'Sell' when RSI was above the overbought threshold on the
    previous row and is below it now; 'Hold' otherwise.

    RSI is computed as 100 * avg_gain / (avg_gain + avg_loss), which is the same
    quantity as 100 - 100 / (1 + avg_gain / avg_loss) but stays well defined when
    there are no losses (RSI = 100). Rows where neither a gain nor a loss has
    occurred yet (e.g. the first row) get RSI = NaN and a 'Hold' signal.
    NOTE(review): the averages use ewm(span=window); Wilder's classic RSI uses
    alpha = 1/window, so values differ from many charting tools.

    Args:
        df_in: DataFrame containing `price_col`; it is not modified.
        params: Dict with 'window', 'oversold_threshold' and 'overbought_threshold'.
        price_col: Name of the price column.

    Returns:
        A copy of `df_in`, minus rows whose price is not numeric, with 'RSI' and
        'RSI_Signal' added. If `price_col` is missing, a message is printed and
        only an all-None 'RSI_Signal' column is added.
    """
    df = df_in.copy()
    if price_col not in df.columns:
        print(f"Error: Price column '{price_col}' not found for RSI calculation.")
        df['RSI_Signal'] = None
        return df

    window = params['window']
    oversold_threshold = params['oversold_threshold']
    overbought_threshold = params['overbought_threshold']

    df[price_col] = pd.to_numeric(df[price_col], errors='coerce')
    # .copy() so the column assignments below never target a view of the filtered frame
    df = df.dropna(subset=[price_col]).copy()

    delta = df[price_col].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.ewm(span=window, adjust=False).mean()
    avg_loss = loss.ewm(span=window, adjust=False).mean()

    # Treating avg_loss == 0 as RS = 0 would report RSI = 0 ("oversold") during an
    # uninterrupted rally; this form yields the correct 100 instead.
    total_move = avg_gain + avg_loss
    df['RSI'] = (100 * (avg_gain / total_move)).where(total_move > 0)

    df['RSI_Signal'] = 'Hold'
    valid_rsi_signals = df['RSI'].notna()
    rsi_prev = df['RSI'].shift(1)

    # Buy signal: RSI crosses above oversold threshold (from below)
    df.loc[valid_rsi_signals & (rsi_prev < oversold_threshold) &
           (df['RSI'] > oversold_threshold), 'RSI_Signal'] = 'Buy'
    # Sell signal: RSI crosses below overbought threshold (from above)
    df.loc[valid_rsi_signals & (rsi_prev > overbought_threshold) &
           (df['RSI'] < overbought_threshold), 'RSI_Signal'] = 'Sell'
    return df

def calculate_sma_signal(df_in, params, price_col='Price'):
    """
    Label crossovers between a fast and a slow simple moving average.

    A row is 'Buy' when the fast SMA was below the slow SMA on the previous row
    and is above it now, 'Sell' for the opposite crossing, and 'Hold' otherwise.

    `params` supplies the rolling window lengths 'fast_window' and 'slow_window'.
    The input is not modified; the returned copy gains the columns 'SMA_Fast',
    'SMA_Slow' and 'SMA_Signal'. If `price_col` is missing, a message is printed
    and only an all-None 'SMA_Signal' column is added.
    """
    frame = df_in.copy()
    if price_col not in frame.columns:
        print(f"Error: Price column '{price_col}' not found for SMA calculation.")
        frame['SMA_Signal'] = None
        return frame

    fast_window = params['fast_window']
    slow_window = params['slow_window']

    closes = frame[price_col]
    fast_avg = closes.rolling(window=fast_window).mean()
    slow_avg = closes.rolling(window=slow_window).mean()
    frame['SMA_Fast'] = fast_avg
    frame['SMA_Slow'] = slow_avg

    # Relative position of the two averages on the previous row
    fast_before = fast_avg.shift(1)
    slow_before = slow_avg.shift(1)

    # Both averages must exist on the current row for a crossover to count
    both_ready = fast_avg.notna() & slow_avg.notna()
    crossed_above = both_ready & (fast_before < slow_before) & (fast_avg > slow_avg)
    crossed_below = both_ready & (fast_before > slow_before) & (fast_avg < slow_avg)

    frame['SMA_Signal'] = 'Hold'
    frame.loc[crossed_above, 'SMA_Signal'] = 'Buy'
    frame.loc[crossed_below, 'SMA_Signal'] = 'Sell'

    return frame


def calculate_ema_signal(df_in, params, price_col='Price'):
    """
    Label crossovers between a fast and a slow exponential moving average.

    A row is 'Buy' when the fast EMA was below the slow EMA on the previous row
    and is above it now, 'Sell' for the opposite crossing, and 'Hold' otherwise.

    `params` supplies the EMA spans 'fast_window' and 'slow_window'. The input is
    not modified; the returned copy gains the columns 'EMA_Fast', 'EMA_Slow' and
    'EMA_Signal'. If `price_col` is missing, a message is printed and only an
    all-None 'EMA_Signal' column is added.
    """
    result = df_in.copy()
    if price_col not in result.columns:
        print(f"Error: Price column '{price_col}' not found for EMA calculation.")
        result['EMA_Signal'] = None
        return result

    # Output column -> span, in the order the columns are added
    spans = {
        'EMA_Fast': params['fast_window'],
        'EMA_Slow': params['slow_window'],
    }
    for column, span in spans.items():
        result[column] = result[price_col].ewm(span=span, adjust=False).mean()

    fast_now, slow_now = result['EMA_Fast'], result['EMA_Slow']
    fast_before, slow_before = fast_now.shift(1), slow_now.shift(1)

    # Both averages must exist on the current row for a crossover to count
    usable = fast_now.notna() & slow_now.notna()
    # Fast EMA moved from below the slow EMA to above it
    bullish = usable & (fast_before < slow_before) & (fast_now > slow_now)
    # Fast EMA moved from above the slow EMA to below it
    bearish = usable & (fast_before > slow_before) & (fast_now < slow_now)

    result['EMA_Signal'] = 'Hold'
    result.loc[bullish, 'EMA_Signal'] = 'Buy'
    result.loc[bearish, 'EMA_Signal'] = 'Sell'

    return result

# --- Profit Simulation Function ---
def simulate_trading_returns(df: pd.DataFrame, predicted_col: str, price_col: str, stop_loss: float = 0.05) -> float:
    """
    Simulates all-in/all-out trading on predicted signals and returns the profit rate.

    Starting from a fund of 100000, the rows are scanned in order. A 'Buy' signal
    on row i invests the whole fund at the previous row's price. The position is
    closed at the previous row's price on the first later row j that either
    carries a 'Sell' signal or whose previous-row price has fallen more than
    `stop_loss` below the purchase price. A position still open after the last
    row is sold at the last price. Signals other than 'Buy'/'Sell' (including
    NaN) count as 'Hold'. A 'Buy' whose reference price is not positive is skipped.

    Args:
        df: DataFrame holding the signals and prices, in chronological order.
        predicted_col: Column with 'Buy' / 'Sell' / 'Hold' labels.
        price_col: Column with the prices trades are executed at.
        stop_loss: Fractional loss (0.05 == 5%) that forces an exit.

    Returns:
        float: (final fund - initial fund) / initial fund; 0.0 when no trade happens.
    """
    # Map string labels to numerical labels as per the algorithm: Buy=1, Sell=-1, Hold=0
    signal_map = {'Buy': 1, 'Sell': -1, 'Hold': 0}
    y = df[predicted_col].map(signal_map).fillna(0).values
    C = df[price_col].values
    N = len(y)

    init_fund = 100000.0
    fund = init_fund
    quantity = 0

    # Trades use the price at i-1, so the scan starts at the second row
    i = 1
    while i < N:
        buy_price = C[i-1]
        # Always advance when no purchase is made. This includes a Buy signal whose
        # reference price is zero, negative or NaN, which would otherwise leave
        # `i` unchanged and spin forever.
        if not (y[i] == 1 and fund > 0 and buy_price > 0):
            i += 1
            continue

        # Buy as many (fractional) shares as the fund allows
        quantity_to_buy = fund / buy_price
        fund -= quantity_to_buy * buy_price
        quantity += quantity_to_buy

        # Look for the first later row with a Sell signal or a stop-loss breach
        exit_row = None
        for j in range(i + 1, N):
            price_drop_ratio = (C[j-1] - buy_price) / buy_price
            if y[j] == -1 or price_drop_ratio < -stop_loss:
                exit_row = j
                break

        if exit_row is None:
            # No exit before the data ends; the position is liquidated below
            break

        # Sell everything at the previous row's price and resume from the exit row
        fund += quantity * C[exit_row-1]
        quantity = 0
        i = exit_row

    # If there's any remaining quantity at the end of the simulation period, sell it
    if quantity > 0 and N > 0:
        fund += quantity * C[N-1]
        quantity = 0

    return (fund - init_fund) / init_fund


# --- Main Orchestration Function ---

def run_all_indicator_optimizations(
    file_path,
    date_column='Date',
    price_column='Price',
    volume_column='Vol.',
    change_percent_column='Change %',
    open_column=None, high_column=None, low_column=None,
    fixed_buy_threshold=0.5,
    fixed_sell_threshold=-0.5,
    test_ratio=0.2 # Changed to test_ratio from test_samples_count
):
    """
    Run the optimization, profit simulation and reporting for every indicator.

    The CSV is loaded and cleaned, then Bollinger Bands, MACD, RSI, dual SMA and
    dual EMA are each optimized for test-set accuracy and back-tested for profit
    on the trailing `test_ratio` share of the data. A summary and the test rows
    with every indicator's signal are printed.

    Args:
        file_path: CSV file to analyse.
        date_column, price_column, volume_column, change_percent_column,
        open_column, high_column, low_column: Column names in the CSV, forwarded
            to load_and_clean_data.
        fixed_buy_threshold: Next-day return (in percent) above which the true
            action is 'Buy'.
        fixed_sell_threshold: Next-day return (in percent) below which the true
            action is 'Sell'.
        test_ratio: Fraction of rows used as the test set.

    Returns:
        (all_results, test_df_all_signals). `all_results` maps each indicator's
        display name to {'Accuracy', 'Params', 'Profit Rate'}; an indicator that
        could not be scored has Accuracy -1, empty Params and Profit Rate 0.0.
        `test_df_all_signals` is the test slice carrying all signal columns.
        Returns ({}, None) when the data cannot be loaded.
    """
    print(f"--- Starting All Technical Indicator Optimizations for {file_path} ---")

    # --- Data Loading and Initial Preprocessing (using the robust function) ---
    df_base = load_and_clean_data(
        file_path, date_column, price_column, volume_column, change_percent_column,
        open_column, high_column, low_column
    )

    if df_base is None:
        print("Data loading or initial cleaning failed. Aborting optimizations.")
        return {}, None # Return empty dict and None for results

    # load_and_clean_data renames the caller's price column to 'Price', so every
    # later step must use the standard name, not the caller's original one.
    price_col = 'Price'

    print(f"\nTotal data points available for analysis: {len(df_base)}")
    print(f"Using {test_ratio*100:.0f}% of data for testing in optimizations.") # Updated message

    # One row per indicator:
    # (name in the results, signal-column prefix, parameter grid, signal function)
    indicator_specs = [
        ('Bollinger Bands', 'Bollinger',
         {'window': [20], 'num_std_dev': [2.0]},
         calculate_bollinger_signal),
        ('MACD', 'MACD',
         {'fast_period': [12], 'slow_period': [26], 'signal_period': [9]},
         calculate_macd_signal),
        ('RSI', 'RSI',
         {'window': [14], 'oversold_threshold': [30], 'overbought_threshold': [70]},
         calculate_rsi_signal),
        ('SMA', 'SMA',
         {'fast_window': [20], 'slow_window': [50]},
         calculate_sma_signal),
        ('EMA', 'EMA',
         {'fast_window': [12], 'slow_window': [26]},
         calculate_ema_signal),
    ]

    # --- Optimize each indicator and simulate its profit on the test slice ---
    all_results = {}
    for label, indicator_type, param_ranges, signal_func in indicator_specs:
        accuracy, params = run_indicator_optimization(
            df_base.copy(), indicator_type, param_ranges,
            signal_func, fixed_buy_threshold, fixed_sell_threshold,
            test_ratio, price_col
        )

        profit_rate = 0.0
        # A failed optimization returns empty params; calling the signal function
        # with them would raise KeyError, so the back-test is skipped.
        if accuracy != -1:
            signal_col = f'{indicator_type}_Signal'
            df_signals = signal_func(df_base.copy(), params, price_col)
            _, test_df = get_train_test_split(
                df_signals.dropna(subset=[signal_col, price_col]),
                test_samples_count=None, test_ratio=test_ratio
            )
            if not test_df.empty:
                profit_rate = simulate_trading_returns(test_df, signal_col, price_col)

        all_results[label] = {'Accuracy': accuracy, 'Params': params, 'Profit Rate': profit_rate}

    # --- Final Summary and Display ---
    print("\n" + "="*50)
    print("--- Overall Optimization Results Summary ---")
    print("="*50)
    best_overall_accuracy = -1
    best_overall_profit_rate = -float('inf') # Initialize with negative infinity for profit rate
    best_overall_indicator_accuracy = "N/A"
    best_overall_indicator_profit = "N/A"

    for indicator, data in all_results.items():
        accuracy = data['Accuracy']
        params = data['Params']
        profit_rate = data['Profit Rate']
        if accuracy == -1:
            print(f"\n{indicator}: No valid accuracy found. Check data and parameters.")
        else:
            print(f"\n{indicator}:")
            print(f"   Max Test Set Accuracy: {accuracy:.2%}")
            print(f"   Simulated Profit Rate: {profit_rate:.2%}")
            print(f"   Best Parameters: {params}")
            # Strict '>' keeps the first indicator on ties
            if accuracy > best_overall_accuracy:
                best_overall_accuracy = accuracy
                best_overall_indicator_accuracy = indicator
            if profit_rate > best_overall_profit_rate:
                best_overall_profit_rate = profit_rate
                best_overall_indicator_profit = indicator

    print("\n" + "="*50)
    if best_overall_accuracy != -1:
        print(f"Overall Maximum Accuracy: {best_overall_accuracy:.2%}")
        print(f"Achieved by Indicator (Accuracy): {best_overall_indicator_accuracy}")
    else:
        print("Could not determine overall maximum accuracy due to no valid results.")

    if best_overall_profit_rate != -float('inf'):
        print(f"Overall Maximum Profit Rate: {best_overall_profit_rate:.2%}")
        print(f"Achieved by Indicator (Profit Rate): {best_overall_indicator_profit}")
    else:
        print("Could not determine overall maximum profit rate due to no valid results.")
    print("="*50)

    # --- Generate a final DataFrame with all best signals ---
    print("\n--- Generating Final DataFrame with All Best Signals ---")
    df_final_all_signals = calculate_true_action(
        df_base.copy(), fixed_buy_threshold, fixed_sell_threshold, price_col
    )

    # Apply best parameters for each indicator whose optimization was successful
    for label, _indicator_type, _param_ranges, signal_func in indicator_specs:
        result = all_results[label]
        if result['Accuracy'] != -1:
            df_final_all_signals = signal_func(df_final_all_signals, result['Params'], price_col)

    # Prepare for final test set display: True_Action plus every signal column that exists
    signal_cols_to_check = ['True_Action'] + [f'{spec[1]}_Signal' for spec in indicator_specs]
    existing_signal_cols = [col for col in signal_cols_to_check if col in df_final_all_signals.columns]

    # Drop NaNs from the relevant signal columns AND 'True_Action' for a clean comparison set
    df_final_cleaned_for_display = df_final_all_signals.dropna(subset=existing_signal_cols)

    _, test_df_all_signals = get_train_test_split(df_final_cleaned_for_display, test_samples_count=None, test_ratio=test_ratio)

    print(f"\n--- Test Set Dates (Based on data used for final display) ---")
    if not test_df_all_signals.empty:
        print(f"Test set dates: {test_df_all_signals['Date'].min().strftime('%d-%m-%Y')} to {test_df_all_signals['Date'].max().strftime('%d-%m-%Y')}")
    else:
        print("Test set is empty. No dates to display.")

    print(f"\n--- Test Set Data with All Indicator Signals ({len(test_df_all_signals)} samples) ---")
    if not test_df_all_signals.empty:
        # Display order: original columns, derived price columns, all signals,
        # and finally True_Action
        initial_cols_to_display = ['Date', 'Price', 'Open', 'High', 'Low', 'Vol.', 'Change %']
        calculated_cols_to_display = ['Future_Price', 'Next_Day_Return']

        display_cols_final = [
            col for col in initial_cols_to_display + calculated_cols_to_display
            if col in test_df_all_signals.columns
        ]

        # Add existing signal columns (excluding True_Action for now, to add it last)
        for col in existing_signal_cols:
            if col != 'True_Action' and col not in display_cols_final:
                display_cols_final.append(col)

        # Add True_Action at the very end if it exists
        if 'True_Action' in test_df_all_signals.columns:
            display_cols_final.append('True_Action')

        # Filter for only columns that actually exist in the final test_df
        existing_display_cols = [col for col in display_cols_final if col in test_df_all_signals.columns]
        print(test_df_all_signals[existing_display_cols].to_markdown(index=False, numalign="left", stralign="left"))
    else:
        print("Test set is empty. No data to display.")

    return all_results, test_df_all_signals


# --- Example Usage ---
if __name__ == "__main__":
    # CSV export to analyse (example file name). Point this at your own file and
    # make sure it exists in the directory the script is run from.
    file_path_to_use = "GDFR Historical Data.csv"

    results, final_test_data_df = run_all_indicator_optimizations(
        file_path_to_use,
        # Column names as they appear in the CSV header (these are the defaults).
        date_column='Date',
        price_column='Price',
        volume_column='Vol.',
        change_percent_column='Change %',
        # If the data also has Open/High/Low columns, uncomment and adjust:
        # open_column='Open',
        # high_column='High',
        # low_column='Low',
        # Share of the data held out as the test set.
        test_ratio=0.2,
        # True_Action thresholds, in percent (0.5 means 0.5%, -0.5 means -0.5%).
        fixed_buy_threshold=0.5,
        fixed_sell_threshold=-0.5,
    )

    # Both return values are now available for further inspection, e.g.:
    # print("\nFull Optimization Results Dictionary:")
    # print(results)
    #
    # print("\nFinal Test Data DataFrame:")
    # print(final_test_data_df.head())

--- Starting All Technical Indicator Optimizations for GDFR Historical Data.csv ---

Total data points available for analysis: 280
Using 20% of data for testing in optimizations.

--- Optimizing Bollinger ---

--- Optimizing MACD ---

--- Optimizing RSI ---

--- Optimizing SMA ---

--- Optimizing EMA ---

--- Overall Optimization Results Summary ---

Bollinger Bands:
   Max Test Set Accuracy: 14.29%
   Simulated Profit Rate: 0.00%
   Best Parameters: {'window': 20, 'num_std_dev': 2.0, 'buy_threshold_for_True_Action': 0.5, 'sell_threshold_for_True_Action': -0.5}

MACD:
   Max Test Set Accuracy: 17.86%
   Simulated Profit Rate: 2.04%
   Best Parameters: {'fast_period': 12, 'slow_period': 26, 'signal_period': 9, 'buy_threshold_for_True_Action': 0.5, 'sell_threshold_for_True_Action': -0.5}

RSI:
   Max Test Set Accuracy: 17.86%
   Simulated Profit Rate: 0.00%
   Best Parameters: {'window': 14, 'oversold_threshold': 30, 'overbought_threshold': 70, 'buy_threshold_for_True_Action': 0.5, 'sell