In [6]:
import ccxt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import List, Dict, Optional
from scipy.stats import percentileofscore

# ================== SETUP ==================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define the absolute path to config.json
CONFIG_PATH = r"C:\Users\drewr\OneDrive\Escritorio\Google Cloud Training\config.json"

# Load configuration from the JSON file
try:
    with open(CONFIG_PATH, 'r') as f:
        config = json.load(f)
except FileNotFoundError:
    logging.error(f"config.json file not found at {CONFIG_PATH}. Please check the path.")
    exit(1)

# Best-performing parameter configuration
BEST_PARAMS = {
    "MACD": [14, 28, 12],
    "MIN_TOTAL_SCORE": 4,
    "MIN_VOLUME_PERCENTILE": 30.0,
    "MIN_VOLATILITY_PERCENTILE": 30.0,
    "BASE_RISK_PER_TRADE": 0.03,
    "ATR_MULTIPLIER_SL": 1.0,
    "ADX_TREND_THRESHOLD": 30,
    "BB_PERIOD": 25
}

# ================== ENUMS AND DATA CLASSES ==================
class SignalType(Enum):
    LONG = 'LONG'
    SHORT = 'SHORT'
    EXIT = 'EXIT'
    NO_SIGNAL = 'NO_SIGNAL'

@dataclass
class TradingParams:
    MIN_TOTAL_SCORE: int = BEST_PARAMS["MIN_TOTAL_SCORE"]
    MIN_VOLUME_PERCENTILE: float = BEST_PARAMS["MIN_VOLUME_PERCENTILE"]
    MIN_VOLATILITY_PERCENTILE: float = BEST_PARAMS["MIN_VOLATILITY_PERCENTILE"]
    BASE_RISK_PER_TRADE: float = BEST_PARAMS["BASE_RISK_PER_TRADE"]
    ATR_MULTIPLIER_SL: float = BEST_PARAMS["ATR_MULTIPLIER_SL"]
    ADX_TREND_THRESHOLD: float = BEST_PARAMS["ADX_TREND_THRESHOLD"]
    BB_PERIOD: int = BEST_PARAMS["BB_PERIOD"]
    MAX_HOLDING_DAYS: int = 5  # Add this line to include MAX_HOLDING_DAYS

@dataclass
class TradeSignal:
    timestamp: datetime
    signal_type: SignalType
    entry_price: float
    stop_loss: float
    take_profits: List[float]
    position_size: float
    market_score: int
    market_condition: str
    risk_amount: float
    trade_id: str
    volume_percentile: float
    volatility_percentile: float
    macd_status: str
    atr: float
    ema_fast: float
    ema_slow: float
    macd_value: float
    signal_value: float
    adx_value: float
    bb_upper: float
    bb_lower: float

# ================== DATA FETCHER ==================
class DataFetcher:
    def __init__(self, symbol: str, timeframe: str = '1d'):
        self.symbol = symbol
        self.timeframe = timeframe

    def fetch_data(self, since: Optional[int] = None, limit: int = 1000) -> pd.DataFrame:
        """Fetch OHLCV data from Binance."""
        exchange = ccxt.binance()
        try:
            data = exchange.fetch_ohlcv(self.symbol, self.timeframe, since, limit)
            logging.info(f"Fetched {len(data)} rows of data for {self.symbol} ({self.timeframe})")
        except (ccxt.NetworkError, ccxt.ExchangeError) as e:
            logging.error(f"Error fetching data: {e}")
            return None

        df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
        df.set_index('timestamp', inplace=True)

        # Calculate technical indicators
        df['volume_usd'] = df['volume'] * df['close']
        df['ema_fast'] = df['close'].ewm(span=BEST_PARAMS["MACD"][0], adjust=False).mean()
        df['ema_slow'] = df['close'].ewm(span=BEST_PARAMS["MACD"][1], adjust=False).mean()
        df['macd'] = df['ema_fast'] - df['ema_slow']
        df['signal'] = df['macd'].ewm(span=BEST_PARAMS["MACD"][2], adjust=False).mean()
        df['histogram'] = df['macd'] - df['signal']
        df['prev_close'] = df['close'].shift(1)
        df['TR'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['prev_close']), abs(df['low'] - df['prev_close'])))
        df['ATR'] = df['TR'].rolling(window=14).mean()
        df['volume_pct'] = df['volume_usd'].rolling(30).apply(lambda x: percentileofscore(x, x[-1]))
        df['atr_pct'] = df['ATR'].rolling(30).apply(lambda x: percentileofscore(x, x[-1]))
        df['EMA_20'] = df['close'].ewm(span=20, adjust=False).mean()
        df['EMA_50'] = df['close'].ewm(span=50, adjust=False).mean()

        # Calculate ADX
        df['plus_dm'] = df['high'].diff().apply(lambda x: x if x > 0 else 0)
        df['minus_dm'] = -df['low'].diff().apply(lambda x: x if x < 0 else 0)
        df['plus_di'] = 100 * (df['plus_dm'].ewm(span=14, adjust=False).mean() / df['ATR'])
        df['minus_di'] = 100 * (df['minus_dm'].ewm(span=14, adjust=False).mean() / df['ATR'])
        df['dx'] = 100 * abs(df['plus_di'] - df['minus_di']) / (df['plus_di'] + df['minus_di'])
        df['ADX'] = df['dx'].ewm(span=14, adjust=False).mean()

        # Calculate Bollinger Bands
        df['BB_middle'] = df['close'].rolling(window=BEST_PARAMS["BB_PERIOD"]).mean()
        df['BB_std'] = df['close'].rolling(window=BEST_PARAMS["BB_PERIOD"]).std()
        df['BB_upper'] = df['BB_middle'] + (2 * df['BB_std'])
        df['BB_lower'] = df['BB_middle'] - (2 * df['BB_std'])

        logging.info(f"Data sample with indicators:\n{df.tail()}")
        return df

# ================== STRATEGY ENGINE ==================
class StrategyEngine:
    def __init__(self, params: TradingParams, timeframe: str = '1d'):
        self.params = params
        self.timeframe = timeframe
        self._adjust_parameters_for_timeframe()

    def _adjust_parameters_for_timeframe(self):
        """Dynamically adjust parameters based on timeframe."""
        if self.timeframe == '1h':
            self.params.BB_PERIOD = 20
            self.params.ADX_TREND_THRESHOLD = 20
            self.params.ATR_MULTIPLIER_SL = 1.0
        elif self.timeframe == '4h':
            self.params.BB_PERIOD = 20
            self.params.ADX_TREND_THRESHOLD = 22
            self.params.ATR_MULTIPLIER_SL = 1.2

    def calculate_stop_loss(self, entry_price: float, atr: float, signal_type: SignalType) -> float:
        """Calculate stop-loss based on ATR multiplier."""
        if signal_type == SignalType.LONG:
            return entry_price - (atr * self.params.ATR_MULTIPLIER_SL)
        return entry_price + (atr * self.params.ATR_MULTIPLIER_SL)

    def calculate_take_profits(self, entry_price: float, stop_loss: float, signal_type: SignalType) -> List[float]:
        """Calculate take-profit levels based on risk-reward ratios."""
        risk = abs(entry_price - stop_loss)
        if signal_type == SignalType.LONG:
            return [entry_price + (risk * r) for r in [1.5, 2.5, 3.5]]
        return [entry_price - (risk * r) for r in [1.5, 2.5, 3.5]]

    def calculate_position_size(self, account_size: float, entry_price: float, stop_loss: float, risk_percentage: float) -> float:
        """Calculate position size based on risk percentage."""
        risk_amount = account_size * risk_percentage
        risk_per_unit = abs(entry_price - stop_loss)
        return risk_amount / risk_per_unit if risk_per_unit != 0 else 0

    def generate_trade_signal(self, df: pd.DataFrame, account_size: float = 100000) -> Optional[TradeSignal]:
        """Generate a trade signal based on the strategy logic."""
        current = df.iloc[-1]
        logging.info(f"Current data:\n{current}")

        # Check volume/volatility thresholds
        if (current['volume_pct'] < self.params.MIN_VOLUME_PERCENTILE or
            current['atr_pct'] < self.params.MIN_VOLATILITY_PERCENTILE):
            logging.info("Volume or volatility below threshold. No signal generated.")
            return None

        # Determine market condition
        is_sideways = current['ADX'] < self.params.ADX_TREND_THRESHOLD
        is_bullish_trend = current['EMA_20'] > current['EMA_50'] and current['ADX'] >= self.params.ADX_TREND_THRESHOLD
        is_bearish_trend = current['EMA_20'] < current['EMA_50'] and current['ADX'] >= self.params.ADX_TREND_THRESHOLD

        # Initialize variables
        signal_type = None
        stop_loss = None
        risk_percentage = None
        near_support = False
        near_resistance = False
        macd_turning_up = False
        macd_turning_down = False

        # --- Sideways Market Strategy ---
        if is_sideways:
            macd_turning_up = (current['histogram'] > 0) and (df['histogram'].iloc[-2] <= 0)
            macd_turning_down = (current['histogram'] < 0) and (df['histogram'].iloc[-2] >= 0)

            buffer = current['ATR'] * 0.5
            near_support = current['close'] <= (current['BB_lower'] + buffer)
            near_resistance = current['close'] >= (current['BB_upper'] - buffer)

        if near_support and macd_turning_up:
            signal_type = SignalType.LONG
            stop_loss = current['BB_lower'] - (current['ATR'] * 1.0)
            risk_percentage = self.params.BASE_RISK_PER_TRADE
        elif near_resistance and macd_turning_down:
            signal_type = SignalType.SHORT
            stop_loss = current['BB_upper'] + (current['ATR'] * 1.0)
            risk_percentage = self.params.BASE_RISK_PER_TRADE

        # --- Trend-Following Strategy ---
        elif is_bullish_trend and current['macd'] > current['signal']:
            signal_type = SignalType.LONG
            stop_loss = self.calculate_stop_loss(current['close'], current['ATR'], signal_type)
            risk_percentage = self.params.BASE_RISK_PER_TRADE
        elif is_bearish_trend and current['macd'] < current['signal']:
            signal_type = SignalType.SHORT
            stop_loss = self.calculate_stop_loss(current['close'], current['ATR'], signal_type)
            risk_percentage = self.params.BASE_RISK_PER_TRADE

        if not signal_type:
            return None

        # Position sizing
        position_size = self.calculate_position_size(account_size, current['close'], stop_loss, risk_percentage)
        take_profits = self.calculate_take_profits(current['close'], stop_loss, signal_type)

        # Generate unique trade ID
        trade_id = f"{signal_type.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Calculate market score based on conditions
        market_score = sum([
            1 if current['volume_pct'] > self.params.MIN_VOLUME_PERCENTILE else 0,
            1 if current['atr_pct'] > self.params.MIN_VOLATILITY_PERCENTILE else 0,
            1 if is_sideways and (near_support or near_resistance) else 0,
            1 if (is_bullish_trend or is_bearish_trend) else 0
        ])

        # Determine market condition string
        if is_sideways:
            market_condition = "Sideways"
        elif is_bullish_trend:
            market_condition = "Bullish"
        else:
            market_condition = "Bearish"

        # Create and return the TradeSignal
        return TradeSignal(
            timestamp=df.index[-1],
            signal_type=signal_type,
            entry_price=current['close'],
            stop_loss=stop_loss,
            take_profits=take_profits,
            position_size=position_size,
            market_score=market_score,
            market_condition=market_condition,
            risk_amount=account_size * risk_percentage,
            trade_id=trade_id,
            volume_percentile=current['volume_pct'],
            volatility_percentile=current['atr_pct'],
            macd_status="Bullish" if current['macd'] > current['signal'] else "Bearish",
            atr=current['ATR'],
            ema_fast=current['ema_fast'],
            ema_slow=current['ema_slow'],
            macd_value=current['macd'],
            signal_value=current['signal'],
            adx_value=current['ADX'],
            bb_upper=current['BB_upper'],
            bb_lower=current['BB_lower']
        )

# ================== TRADE MANAGER ==================
class TradeManager:
    def __init__(self, trading_rules: StrategyEngine):
        self.trading_rules = trading_rules
        self.active_trades: Dict[str, TradeSignal] = {}
        self.trade_history: List[Dict] = []

    def backtest(self, df: pd.DataFrame):
        """Backtest the strategy on historical data."""
        logging.info(f"Starting backtest with {len(df)} rows of data.")
        for i in range(len(df)):
            current_time = df.index[i]
            current_data = df.iloc[:i+1]
            self.update(current_data, current_time)

    def update(self, df: pd.DataFrame, current_time: datetime):
        """Update trade management system with new data."""
        self._check_exits(df, current_time)
        if len(self.active_trades) < 3:  # Maximum 3 concurrent trades
            signal = self.trading_rules.generate_trade_signal(df)
            if signal:
                self._process_new_signal(signal)

    def _check_exits(self, df: pd.DataFrame, current_time: datetime):
        """Check all active trades for exit signals."""
        current_price = df.iloc[-1]['close']
        for trade_id, trade in list(self.active_trades.items()):
            exit_signal = self._check_exit_conditions(df, trade, current_time, current_price)
            if exit_signal:
                self._close_trade(trade_id, current_price, current_time, exit_signal)

    def _check_exit_conditions(self, df: pd.DataFrame, trade: TradeSignal, current_time: datetime, current_price: float) -> Optional[str]:
        """Check various exit conditions."""
        if (trade.signal_type == SignalType.LONG and current_price <= trade.stop_loss or
            trade.signal_type == SignalType.SHORT and current_price >= trade.stop_loss):
            return "Stop Loss"
        for i, tp in enumerate(trade.take_profits):
            if (trade.signal_type == SignalType.LONG and current_price >= tp or
                trade.signal_type == SignalType.SHORT and current_price <= tp):
                return f"Take Profit {i+1}"
        if (current_time - trade.timestamp) > timedelta(days=self.trading_rules.params.MAX_HOLDING_DAYS):
            return "Max Hold Time"
        return None

    def _process_new_signal(self, signal: TradeSignal):
        """Process a new trade signal."""
        self.active_trades[signal.trade_id] = signal

    def _close_trade(self, trade_id: str, exit_price: float, exit_time: datetime, exit_reason: str):
        """Close a trade and record it in history."""
        trade = self.active_trades[trade_id]
        pnl = (exit_price - trade.entry_price) * trade.position_size if trade.signal_type == SignalType.LONG else (trade.entry_price - exit_price) * trade.position_size
        holding_days = (exit_time - trade.timestamp).days
        trade_record = {
            'trade_id': trade_id,
            'timestamp': trade.timestamp,  # Use 'timestamp' instead of 'entry_time'
            'exit_time': exit_time,
            'signal_type': trade.signal_type.value,
            'entry_price': trade.entry_price,
            'exit_price': exit_price,
            'position_size': trade.position_size,
            'pnl': pnl,
            'holding_days': holding_days,
            'exit_reason': exit_reason,
            'market_score': trade.market_score,
            'risk_amount': trade.risk_amount
        }
        self.trade_history.append(trade_record)
        del self.active_trades[trade_id]

    def calculate_performance_metrics(self, lookback_days: int) -> Dict:
        """Calculate performance metrics including requested metrics."""
        if not self.trade_history:
            return {
                "total_trades": 0,
                "win_rate": 0.0,
                "profit_factor": 0.0,
                "average_profit": 0.0,
                "max_drawdown": 0.0,
                "sharpe_ratio": 0.0,
                "avg_holding_time": "0 days",
                "aggregated_returns": 0.0,
                "best_trade": 0.0,
                "worst_trade": 0.0,
                "average_trade_pct": 0.0,
                "days_reviewed": lookback_days,
                "trade_details": []
            }

        df = pd.DataFrame(self.trade_history)
        df['pnl_pct'] = df['pnl'] / df['risk_amount'] * 100

        # Calculate cumulative PnL
        df['cumulative_pnl'] = df['pnl'].cumsum()

        # Add trade details
        trade_details = df[['timestamp', 'exit_time', 'entry_price', 'exit_price', 'pnl', 'holding_days', 'exit_reason']].to_dict('records')

        return {
            "total_trades": len(df),
            "win_rate": (len(df[df['pnl'] > 0]) / len(df)) * 100,
            "profit_factor": df[df['pnl'] > 0]['pnl'].sum() / abs(df[df['pnl'] <= 0]['pnl'].sum()),
            "average_profit": df['pnl'].mean(),
            "max_drawdown": (df['cumulative_pnl'].max() - df['cumulative_pnl'].min()),
            "sharpe_ratio": np.sqrt(252) * (df['pnl'].mean() / df['pnl'].std()),
            "avg_holding_time": str((df['exit_time'] - df['timestamp']).mean()),
            "aggregated_returns": df['pnl'].sum(),
            "best_trade": df['pnl'].max(),
            "worst_trade": df['pnl'].min(),
            "average_trade_pct": df['pnl_pct'].mean(),
            "days_reviewed": lookback_days,
            "trade_details": trade_details
        }

# ================== MAIN FUNCTION ==================
def main():
    symbol = 'BTC/USDT'
    account_size = 100000
    timeframe = '1d'  # Use the best-performing timeframe

    # Load configuration from the JSON file
    try:
        with open(CONFIG_PATH, 'r') as f:
            config = json.load(f)
    except FileNotFoundError:
        logging.error(f"config.json file not found at {CONFIG_PATH}. Please check the path.")
        exit(1)

    lookback_days = config.get("LOOKBACK_DAYS", 180)  # Get LOOKBACK_DAYS from config

    # Fetch data
    data_fetcher = DataFetcher(symbol, timeframe)
    df = data_fetcher.fetch_data()

    if df is not None:
        # Initialize TradingParams with the best configuration
        trading_params = TradingParams()
        strategy_engine = StrategyEngine(trading_params, timeframe)
        trade_manager = TradeManager(strategy_engine)

        # Run the backtest
        trade_manager.backtest(df)

        # Calculate performance metrics
        metrics = trade_manager.calculate_performance_metrics(lookback_days)  # Pass lookback_days
        logging.info("\nPerformance Metrics:")
        for key, value in metrics.items():
            logging.info(f"{key}: {value}")

        # Export trade details to CSV
        trade_details_df = pd.DataFrame(metrics['trade_details'])
        trade_details_df.to_csv('trade_details.csv', index=False)
        logging.info("Trade details exported to trade_details.csv")

        # Export signals for successful and failed trades
        successful_trades = trade_details_df[trade_details_df['pnl'] > 0]
        failed_trades = trade_details_df[trade_details_df['pnl'] <= 0]

        successful_trades.to_csv('successful_trades.csv', index=False)
        failed_trades.to_csv('failed_trades.csv', index=False)
        logging.info("Successful trades exported to successful_trades.csv")
        logging.info("Failed trades exported to failed_trades.csv")

        # Export signals and parameters to CSV
        signals_df = pd.DataFrame([asdict(TradeSignal(
            timestamp=trade['timestamp'],  # Map 'timestamp' from trade_history
            signal_type=SignalType(trade['signal_type']),  # Convert string to SignalType enum
            entry_price=trade['entry_price'],
            stop_loss=None,  # Add missing required fields with default values
            take_profits=[],
            position_size=trade['position_size'],
            market_score=trade['market_score'],
            market_condition="",  # Add missing required fields with default values
            risk_amount=trade['risk_amount'],
            trade_id=trade['trade_id'],
            volume_percentile=0.0,  # Add missing required fields with default values
            volatility_percentile=0.0,
            macd_status="",
            atr=0.0,
            ema_fast=0.0,
            ema_slow=0.0,
            macd_value=0.0,
            signal_value=0.0,
            adx_value=0.0,
            bb_upper=0.0,
            bb_lower=0.0
        )) for trade in trade_manager.trade_history])
        signals_df.to_csv('signals_and_parameters.csv', index=False)
        logging.info("Signals and parameters exported to signals_and_parameters.csv")

if __name__ == "__main__":
    main()

2025-02-22 06:47:08,085 - INFO - Fetched 1000 rows of data for BTC/USDT (1d)
  df['volume_pct'] = df['volume_usd'].rolling(30).apply(lambda x: percentileofscore(x, x[-1]))
  df['atr_pct'] = df['ATR'].rolling(30).apply(lambda x: percentileofscore(x, x[-1]))
2025-02-22 06:47:08,275 - INFO - Data sample with indicators:
                               open      high       low     close  \
timestamp                                                           
2025-02-18 00:00:00+00:00  95780.01  96753.91  93388.09  95671.74   
2025-02-19 00:00:00+00:00  95671.74  96899.99  95029.99  96644.37   
2025-02-20 00:00:00+00:00  96644.37  98711.36  96415.09  98305.00   
2025-02-21 00:00:00+00:00  98305.01  99475.00  94871.95  96181.98   
2025-02-22 00:00:00+00:00  96181.99  96595.00  95770.49  96573.89   

                                volume    volume_usd      ema_fast  \
timestamp                                                            
2025-02-18 00:00:00+00:00  23368.19471  2.235676e+09  971