In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import itertools
import random
import logging
from datetime import datetime
import ta
import yaml
import os

ModuleNotFoundError: No module named 'torch'

In [None]:
# Logging setup: records at INFO level and above go to the file
# "backtesting.log" and to a stream handler at the same time.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("backtesting.log"), logging.StreamHandler()],
    level=logging.INFO,
)

NameError: name 'logging' is not defined

In [None]:
# Configuration is kept in a YAML file. When the file is missing, the defaults
# below are used and written out so that later runs pick up the same settings.
config_file = 'config.yaml'

if os.path.exists(config_file):
    # An existing file wins over the built-in defaults
    with open(config_file, 'r') as f:
        config = yaml.safe_load(f)
else:
    # Built-in defaults; list values are the candidates explored by the search
    config = dict(
        data_path='btcusd_30m.csv',
        initial_balance=250,
        broker_fee=0.002,
        slippage=0.00005,
        risk_per_trade=[0.005, 0.01, 0.02],
        trailing_stop_pct=[0.01, 0.02, 0.03],
        slice_sizes=[500, 1000, 1500, 2000, 2500],
        indicators=dict(
            SMA=dict(periods=[10, 20]),
            EMA=dict(periods=[10, 20]),
            RSI=dict(periods=[14], overbought=[70], oversold=[30]),
            MACD=dict(fast_periods=[12], slow_periods=[26], signal_periods=[9], thresholds=[0]),
            BB=dict(periods=[20], std_devs=[2]),
            ATR=dict(periods=[14]),
        ),
        strategies=dict(
            BreakoutStrategy=dict(
                period=[5, 10],
                max_length=[100, 200],
                threshold_rate=[1, 2],
                min_tests=[2, 3],
                indicator_threshold=[2, 3],
                weights=dict(
                    breakout_weight=1,
                    ma_weight=1,
                    rsi_weight=1,
                    macd_weight=1,
                    bb_weight=1,
                ),
            ),
        ),
        stop_loss_pct=[0.01, 0.02],
        take_profit_pct=[0.02, 0.04],
    )
    # Persist the defaults for the next run
    with open(config_file, 'w') as f:
        yaml.dump(config, f)

# Read the price data; the "date" column becomes the (parsed) index
data_path = config['data_path']
data = pd.read_csv(data_path, index_col="date", parse_dates=True)

In [None]:
# Compute technical indicators based on configuration
def compute_indicators(data, indicators_config):
    """Add the indicator columns requested in ``indicators_config`` to ``data``.

    The frame is modified in place and also returned. Indicator names other
    than SMA, EMA, RSI, MACD, BB and ATR are ignored.

    Columns created (one per parameter combination):
        SMA_<period>, EMA_<period>, RSI_<period>,
        MACD_diff_<fast>_<slow>_<signal>,
        BB_high_<period>_<std_dev>, BB_low_<period>_<std_dev>,
        ATR_<period>

    Args:
        data: DataFrame with a 'close' column (plus 'high' and 'low' for ATR).
        indicators_config: mapping of indicator name to its parameter lists.

    Returns:
        The same DataFrame object that was passed in.
    """
    for name, settings in indicators_config.items():
        if name == 'SMA':
            # Simple moving average of the close
            for window in settings['periods']:
                data[f'SMA_{window}'] = data['close'].rolling(window=window).mean()
        elif name == 'EMA':
            # Exponential moving average of the close (recursive form)
            for window in settings['periods']:
                data[f'EMA_{window}'] = data['close'].ewm(span=window, adjust=False).mean()
        elif name == 'RSI':
            for window in settings['periods']:
                data[f'RSI_{window}'] = ta.momentum.rsi(data['close'], window=window)
        elif name == 'MACD':
            fast_options = settings['fast_periods']
            slow_options = settings['slow_periods']
            signal_options = settings['signal_periods']
            # Every fast/slow/signal combination gets its own column
            for fast in fast_options:
                for slow in slow_options:
                    for signal in signal_options:
                        data[f'MACD_diff_{fast}_{slow}_{signal}'] = ta.trend.macd_diff(
                            data['close'],
                            window_slow=slow,
                            window_fast=fast,
                            window_sign=signal,
                        )
        elif name == 'BB':
            window_options = settings['periods']
            deviation_options = settings['std_devs']
            for window in window_options:
                for deviation in deviation_options:
                    bands = ta.volatility.BollingerBands(
                        data['close'], window=window, window_dev=deviation
                    )
                    data[f'BB_high_{window}_{deviation}'] = bands.bollinger_hband()
                    data[f'BB_low_{window}_{deviation}'] = bands.bollinger_lband()
        elif name == 'ATR':
            for window in settings['periods']:
                data[f'ATR_{window}'] = ta.volatility.average_true_range(
                    data['high'], data['low'], data['close'], window=window
                )
    return data

In [None]:
# Add every indicator column requested in the configuration
data = compute_indicators(data, config['indicators'])

# Drop the rows left with NaN values by the indicator calculations (in place)
data.dropna(inplace=True)

# Backtest-wide constants, read once from the configuration
INITIAL_BALANCE, BROKER_FEE, SLIPPAGE, SLICE_SIZES = (
    config[key]
    for key in ('initial_balance', 'broker_fee', 'slippage', 'slice_sizes')
)


In [None]:
# Define the Strategy class
class Strategy:
    """Breakout strategy that scores each bar by summing weighted signal votes.

    A rolling high/low breakout vote is combined with optional moving-average,
    RSI, MACD and Bollinger-band votes. Each vote is multiplied by its
    configured weight (1 when no weight is configured); the summed score is
    turned into a final +1 / -1 / 0 signal by ``indicator_threshold``.
    """

    def __init__(self, config, strategy_params):
        """Store the sampled parameters for one backtest run.

        Args:
            config: full configuration mapping; the first entry under
                ``config['strategies']`` supplies the default ``weights``.
            strategy_params: one sampled parameter combination. The breakout,
                threshold and risk keys are required; indicator keys are
                optional and read with ``.get``.
        """
        self.config = config
        self.strategy_params = strategy_params
        strategy_name = list(config['strategies'].keys())[0]
        strategy_config = config['strategies'][strategy_name]
        self.period = strategy_params['period']
        self.max_length = strategy_params['max_length']
        self.threshold_rate = strategy_params['threshold_rate'] / 100
        self.min_tests = strategy_params['min_tests']
        self.indicator_threshold = strategy_params['indicator_threshold']
        # A strategy without a 'weights' section falls back to unit weights
        self.weights = strategy_config.get('weights') or {}
        # Indicators
        self.ma_type = strategy_params.get('ma_type')
        self.ma_period = strategy_params.get('ma_period')
        self.rsi_period = strategy_params.get('rsi_period')
        self.rsi_overbought = strategy_params.get('rsi_overbought')
        self.rsi_oversold = strategy_params.get('rsi_oversold')
        self.macd_fast_period = strategy_params.get('macd_fast_period')
        self.macd_slow_period = strategy_params.get('macd_slow_period')
        self.macd_signal_period = strategy_params.get('macd_signal_period')
        self.macd_threshold = strategy_params.get('macd_threshold')
        self.bb_period = strategy_params.get('bb_period')
        self.bb_std_dev = strategy_params.get('bb_std_dev')
        self.stop_loss_pct = strategy_params['stop_loss_pct']
        self.take_profit_pct = strategy_params['take_profit_pct']
        self.risk_per_trade = strategy_params['risk_per_trade']
        self.trailing_stop_pct = strategy_params['trailing_stop_pct']
        self.atr_period = strategy_params.get('atr_period')
        self.use_atr_stop = strategy_params.get('use_atr_stop', False)
        self.atr_multiplier = strategy_params.get('atr_multiplier', 1)

    def _weight(self, name):
        """Return the multiplier for one signal component.

        A value sampled into ``strategy_params`` takes precedence over the
        strategy's configured ``weights``; a component without either gets 1.
        """
        if name in self.strategy_params:
            return self.strategy_params[name]
        return self.weights.get(name, 1)

    def find_breakouts(self, data):
        """Return a Series of breakout votes aligned with ``data.index``.

        +1 where the close is above the previous bar's rolling high, -1 where
        it is below the previous bar's rolling low, 0 otherwise. The rolling
        window length is ``self.period``.
        """
        highs = data['high'].rolling(window=self.period).max()
        lows = data['low'].rolling(window=self.period).min()
        close_prices = data['close']
        signals = pd.Series(0, index=data.index)

        # shift(1) compares each close with the window that ended one bar earlier
        signals.loc[close_prices > highs.shift(1)] = 1
        signals.loc[close_prices < lows.shift(1)] = -1

        return signals

    def apply_trading_strategy(self, data):
        """Combine the weighted votes into final trading signals.

        An indicator only votes when its column is present in ``data``.

        Returns:
            numpy array with one entry per row: 1 when the weighted score is
            at least ``indicator_threshold``, -1 when it is at most the
            negative threshold, 0 otherwise.
        """
        score = self._weight('breakout_weight') * self.find_breakouts(data)

        # Moving average: close above the average votes +1, otherwise -1
        ma_column = f'{self.ma_type}_{self.ma_period}'
        if ma_column in data.columns:
            ma_signal = np.where(data['close'] > data[ma_column], 1, -1)
            score = score + self._weight('ma_weight') * ma_signal

        # RSI: oversold votes +1, overbought votes -1
        rsi_column = f'RSI_{self.rsi_period}'
        if rsi_column in data.columns:
            rsi_series = data[rsi_column]
            rsi_signal = np.where(rsi_series < self.rsi_oversold, 1,
                                  np.where(rsi_series > self.rsi_overbought, -1, 0))
            score = score + self._weight('rsi_weight') * rsi_signal

        # MACD histogram above the threshold votes +1, otherwise -1
        macd_column = f'MACD_diff_{self.macd_fast_period}_{self.macd_slow_period}_{self.macd_signal_period}'
        if macd_column in data.columns:
            macd_signal = np.where(data[macd_column] > self.macd_threshold, 1, -1)
            score = score + self._weight('macd_weight') * macd_signal

        # Bollinger bands: close below the lower band votes +1, above the upper band -1
        bb_high_column = f'BB_high_{self.bb_period}_{self.bb_std_dev}'
        bb_low_column = f'BB_low_{self.bb_period}_{self.bb_std_dev}'
        if bb_high_column in data.columns and bb_low_column in data.columns:
            bb_signal = np.where(data['close'] < data[bb_low_column], 1,
                                 np.where(data['close'] > data[bb_high_column], -1, 0))
            score = score + self._weight('bb_weight') * bb_signal

        # Threshold the weighted score into buy / sell / hold
        final_signals = np.where(score >= self.indicator_threshold, 1,
                                 np.where(score <= -self.indicator_threshold, -1, 0))
        return final_signals

In [None]:
def backtest(data, signals, strategy_params,
             initial_balance=None, broker_fee=None, slippage=None):
    """Simulate a long-only strategy bar by bar and summarise its performance.

    On every bar an open position is first checked against the stop-loss,
    take-profit and trailing-stop levels (in that order); if none fires, a -1
    signal closes it. A +1 signal opens a position when none is held, sized
    from ``risk_per_trade`` and capped at what the balance can afford. All
    fills use the bar's close, reduced by fee and slippage.

    Args:
        data: DataFrame with a 'close' column, plus 'ATR_<atr_period>' when
            ``strategy_params['use_atr_stop']`` is true.
        signals: per-bar signals (+1 buy, -1 sell, 0 hold), indexed by
            position; must have at least ``len(data)`` entries.
        strategy_params: mapping with 'trailing_stop_pct', 'risk_per_trade'
            and either 'stop_loss_pct' / 'take_profit_pct' or, for ATR stops,
            'atr_period' / 'atr_multiplier'.
        initial_balance: starting cash; defaults to ``INITIAL_BALANCE``.
        broker_fee: fee fraction per fill; defaults to ``BROKER_FEE``.
        slippage: slippage fraction per fill; defaults to ``SLIPPAGE``.

    Returns:
        (portfolio, trade_log, metrics) where ``portfolio`` is the per-bar
        account value, ``trade_log`` holds (index, action, price, balance,
        position) tuples and ``metrics`` has the keys 'final_value', 'profit'
        (realised result of closed trades only), 'sharpe_ratio',
        'max_drawdown', 'win_rate' and 'trades'.
    """
    start_balance = INITIAL_BALANCE if initial_balance is None else initial_balance
    fee = BROKER_FEE if broker_fee is None else broker_fee
    slip = SLIPPAGE if slippage is None else slippage
    net_factor = 1 - fee - slip  # fraction kept after fee and slippage on a fill

    # Positional arrays: avoids label-based lookups and per-bar .iloc overhead
    signals = np.asarray(signals)
    closes = data['close'].to_numpy()
    use_atr_stop = strategy_params.get('use_atr_stop', False)
    atr_values = (
        data[f'ATR_{strategy_params["atr_period"]}'].to_numpy() if use_atr_stop else None
    )

    exit_log_prefix = {
        "STOP LOSS SELL": "Stop-loss triggered",
        "TAKE PROFIT SELL": "Take-profit triggered",
        "TRAILING STOP SELL": "Trailing stop-loss triggered",
        "SELL": "Sold",
    }

    balance = start_balance
    position = 0
    entry_price = 0
    entry_cost = 0  # cash spent to open the current position
    trailing_stop_price = 0
    peak = start_balance
    portfolio = []
    trade_log = []
    drawdowns = []
    wins = 0
    losses = 0
    profit = 0

    for i in range(len(closes)):
        signal = signals[i]
        close_price = closes[i]

        # Decide whether an open position has to be closed on this bar
        exit_action = None
        if position > 0:
            # The trailing stop only ever moves up
            potential_trailing_stop = close_price * (1 - strategy_params['trailing_stop_pct'])
            trailing_stop_price = max(trailing_stop_price, potential_trailing_stop)

            if use_atr_stop:
                # Stop distance follows the ATR of the current bar
                stop_distance = atr_values[i] * strategy_params['atr_multiplier']
                stop_loss_price = entry_price - stop_distance
                take_profit_price = entry_price + stop_distance
            else:
                stop_loss_price = entry_price * (1 - strategy_params['stop_loss_pct'])
                take_profit_price = entry_price * (1 + strategy_params['take_profit_pct'])

            if close_price <= stop_loss_price:
                exit_action = "STOP LOSS SELL"
            elif close_price >= take_profit_price:
                exit_action = "TAKE PROFIT SELL"
            elif close_price <= trailing_stop_price:
                exit_action = "TRAILING STOP SELL"
            elif signal == -1:
                exit_action = "SELL"

        if exit_action is not None:
            proceeds = position * close_price * net_factor
            # Each trade is judged on its own result, so realised profit is the
            # sum of per-trade results and a profitable trailing-stop exit
            # counts as a win.
            trade_pnl = proceeds - entry_cost
            balance += proceeds
            profit += trade_pnl
            position = 0
            if trade_pnl > 0:
                wins += 1
            else:
                losses += 1
            trade_log.append((i, exit_action, close_price, balance, position))
            logging.info(f"{exit_log_prefix[exit_action]} at index {i} at price {close_price}")

        # Open a position on a buy signal (also on the bar of a risk exit)
        if signal == 1 and position == 0 and balance > 0:
            if use_atr_stop:
                stop_loss_price = close_price - atr_values[i] * strategy_params['atr_multiplier']
            else:
                stop_loss_price = close_price * (1 - strategy_params['stop_loss_pct'])
            risk_amount = balance * strategy_params['risk_per_trade']
            risk_per_unit = close_price - stop_loss_price
            max_affordable = balance / close_price
            if risk_per_unit > 0:
                position_size = min(risk_amount / risk_per_unit, max_affordable)
            else:
                # No usable stop distance (zero, negative or NaN): the risk
                # rule gives no limit, so only the affordability cap applies
                position_size = max_affordable
            entry_cost = position_size * close_price
            position = position_size * net_factor
            balance -= entry_cost
            entry_price = close_price
            trailing_stop_price = close_price * (1 - strategy_params['trailing_stop_pct'])
            trade_log.append((i, "BUY", close_price, balance, position))
            logging.info(f"Bought at index {i} at price {close_price} with position size {position}")

        total_portfolio_value = balance + position * close_price if position > 0 else balance
        portfolio.append(total_portfolio_value)

        # Drawdown relative to the running peak of the account value
        peak = max(peak, total_portfolio_value)
        drawdowns.append((peak - total_portfolio_value) / peak if peak > 0 else 0)

    # Performance metrics
    sharpe_ratio = 0
    portfolio_array = np.asarray(portfolio, dtype=float)
    if portfolio_array.size > 1:
        previous_values = portfolio_array[:-1]
        usable = previous_values != 0  # a return from a zero base is undefined
        returns = np.diff(portfolio_array)[usable] / previous_values[usable]
        if returns.size > 0:
            volatility = np.std(returns)
            if volatility != 0:
                # NOTE(review): sqrt(252) annualises daily returns; the default
                # data file name suggests 30-minute bars - confirm the factor.
                sharpe_ratio = (np.mean(returns) / volatility) * np.sqrt(252)
    max_drawdown = max(drawdowns) if drawdowns else 0
    win_rate = wins / (wins + losses) if (wins + losses) > 0 else 0

    metrics = {
        'final_value': portfolio[-1] if len(portfolio) > 0 else start_balance,
        'profit': profit,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
        'win_rate': win_rate,
        'trades': wins + losses
    }

    return portfolio, trade_log, metrics

In [None]:
def run_single_test(strategy_params, data):
    """Backtest one parameter set on a randomly placed window of ``data``.

    The window length is drawn from ``SLICE_SIZES`` and limited to the number
    of available rows; its start position is drawn uniformly.

    Returns:
        (strategy_params, portfolio, trade_log, metrics); the last three are
        None when there are no rows to test on.
    """
    # Draw a window length and never ask for more rows than exist
    window = min(random.choice(SLICE_SIZES), len(data))

    if window <= 0:
        logging.warning("Not enough data for the selected slice_size and indicator periods.")
        return strategy_params, None, None, None

    first_row = random.randint(0, len(data) - window)
    sample = data.iloc[first_row:first_row + window].copy()

    # Generate signals for the window, then simulate trading on it
    signals = Strategy(config, strategy_params).apply_trading_strategy(sample)
    portfolio, trade_log, metrics = backtest(sample, signals, strategy_params)

    logging.info(f"Tested params: {strategy_params} | Metrics: {metrics}")
    return strategy_params, portfolio, trade_log, metrics


In [None]:
def run_tests(n_combinations=100, seed=None):
    """Random-search the parameter space and backtest every sampled combination.

    Candidate values come from the module-level ``config``; each sampled
    combination is passed to ``run_single_test`` together with the
    module-level ``data``.

    Args:
        n_combinations: number of random parameter sets to draw.
        seed: optional seed. When given, ``random.seed(seed)`` is called
            first, which makes the parameter draws repeatable (and the slice
            draws too, as ``run_single_test`` uses the same ``random`` module).

    Returns:
        List of (params, portfolio, trade_log, metrics) tuples, one per
        combination whose test produced a portfolio.
    """
    if seed is not None:
        random.seed(seed)

    strategy_name = list(config['strategies'].keys())[0]
    strategy_config = config['strategies'][strategy_name]
    indicators_config = config['indicators']

    # Candidate values per parameter
    param_grid = {
        'period': strategy_config['period'],
        'max_length': strategy_config['max_length'],
        'threshold_rate': strategy_config['threshold_rate'],
        'min_tests': strategy_config['min_tests'],
        'indicator_threshold': strategy_config['indicator_threshold'],
        'stop_loss_pct': config['stop_loss_pct'],
        'take_profit_pct': config['take_profit_pct'],
        'risk_per_trade': config['risk_per_trade'],
        'trailing_stop_pct': config['trailing_stop_pct'],
    }

    # Indicator parameters
    if 'RSI' in indicators_config:
        param_grid['rsi_period'] = indicators_config['RSI']['periods']
        param_grid['rsi_overbought'] = indicators_config['RSI']['overbought']
        param_grid['rsi_oversold'] = indicators_config['RSI']['oversold']
    if 'MACD' in indicators_config:
        param_grid['macd_fast_period'] = indicators_config['MACD']['fast_periods']
        param_grid['macd_slow_period'] = indicators_config['MACD']['slow_periods']
        param_grid['macd_signal_period'] = indicators_config['MACD']['signal_periods']
        param_grid['macd_threshold'] = indicators_config['MACD']['thresholds']
    if 'BB' in indicators_config:
        param_grid['bb_period'] = indicators_config['BB']['periods']
        param_grid['bb_std_dev'] = indicators_config['BB']['std_devs']
    if 'ATR' in indicators_config:
        param_grid['atr_period'] = indicators_config['ATR']['periods']
        param_grid['use_atr_stop'] = [True, False]
        param_grid['atr_multiplier'] = config.get('atr_multiplier', [1, 1.5, 2])

    # Signal weights (configured as scalars by default)
    param_grid.update(strategy_config.get('weights', {}))

    # Moving-average type and period are drawn as a pair, so a sample can only
    # name a combination that is configured for that type.
    ma_choices = [
        (ma_type, period)
        for ma_type in ('SMA', 'EMA')
        for period in indicators_config.get(ma_type, {}).get('periods', [])
    ]

    # random.choice needs a non-empty sequence: wrap scalar settings in a
    # one-element list and leave out parameters without any candidate.
    choices = {}
    for name, values in param_grid.items():
        if not isinstance(values, (list, tuple)):
            values = [values]
        if len(values) == 0:
            logging.warning(f"No candidate values configured for '{name}'; parameter skipped.")
            continue
        choices[name] = list(values)

    # Random search keeps the number of tested combinations bounded
    param_combinations = []
    for _ in range(n_combinations):
        params = {name: random.choice(values) for name, values in choices.items()}
        if ma_choices:
            params['ma_type'], params['ma_period'] = random.choice(ma_choices)
        param_combinations.append(params)

    logging.info(f"Total parameter combinations to test: {len(param_combinations)}")
    results = []

    for i, params in enumerate(param_combinations):
        logging.info(f"Testing combination {i + 1}/{len(param_combinations)}")
        result = run_single_test(params, data)
        # A None portfolio marks a test that could not be run
        if result[1] is not None:
            results.append(result)

    return results

In [None]:
# Execute the hyperparameter testing
logging.info("Executing hyperparameter testing...")
start_time = datetime.now()
results = run_tests()
end_time = datetime.now()

# Check if we have results
if results:
    # One row per tested combination, in the same order as `results`.
    # New dicts are built so the metrics stored in `results` stay untouched.
    metrics_list = [{**result[3], 'params': result[0]} for result in results]
    results_df = pd.DataFrame(metrics_list).sort_values(by="final_value", ascending=False)

    # Extract best params and portfolio. Sorting keeps the original row
    # labels, so the label of the top row is the position of that run in
    # `results`. Looking the run up by position (not by parameter equality)
    # stays correct when the random search drew the same parameters twice.
    best_result = results_df.iloc[0]
    best_params = best_result['params']
    best_position = results_df.index[0]
    best_portfolio = results[best_position][1]
    best_trade_log = results[best_position][2]

    # Detailed logging of best trade log
    logging.info("Best Trade Log:")
    for entry in best_trade_log:
        logging.info(f"Index: {entry[0]}, Action: {entry[1]}, Price: {entry[2]}, Balance: {entry[3]}, Position: {entry[4]}")

    # Plot best portfolio performance
    plt.figure(figsize=(14, 7))
    plt.plot(best_portfolio, label="Best Portfolio")
    plt.title("Best Portfolio Performance")
    plt.xlabel("Time")
    plt.ylabel("Portfolio Value (USD)")
    plt.legend()
    plt.grid()
    plt.show()

    # Save results and log completion time
    results_df.to_csv("backtesting_results.csv", index=False)
    logging.info(f"Hyperparameter testing completed in {end_time - start_time}.")
    logging.info(f"Best Parameters: {best_params}")
    logging.info("Results saved to 'backtesting_results.csv'.")
else:
    logging.warning("No results to display.")