# In [None]:  (Jupyter cell marker left by the notebook export; commented out so the file parses as Python)
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class ScalpingBacktest:
    def __init__(self, tickers, start_date, end_date, initial_capital=10000):
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.initial_capital = initial_capital
        self.trades = []
        
    def calculate_atr(self, df, period=14):
        high_low = df['High'] - df['Low']
        high_close = np.abs(df['High'] - df['Close'].shift())
        low_close = np.abs(df['Low'] - df['Close'].shift())
        ranges = pd.concat([high_low, high_close, low_close], axis=1)
        true_range = np.max(ranges, axis=1)
        return true_range.rolling(period).mean()
    
    def calculate_proper_rsi(self, series, period=14):
        delta = series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi
    
    def check_entry_signal(self, df, idx, params=None):
        """Check if entry conditions are met at given index"""
        if idx < 20:
            return False
        
        # Default parameters (can be optimized in walk-forward)
        if params is None:
            params = {
                'rsi_low': 35,
                'rsi_high': 80,
                'volume_mult': 1.1,
                'momentum_threshold': -1.0,
                'vwap_tolerance': 0.995,
                'atr_threshold': 0.15
            }
        
        try:
            current = df.iloc[idx]
            
            # Calculate indicators up to current point
            df_subset = df.iloc[:idx+1].copy()
            df_subset['EMA9'] = df_subset['Close'].ewm(span=9, adjust=False).mean()
            df_subset['EMA20'] = df_subset['Close'].ewm(span=20, adjust=False).mean()
            df_subset['VWAP'] = (df_subset['Close'] * df_subset['Volume']).cumsum() / df_subset['Volume'].cumsum()
            df_subset['RSI'] = self.calculate_proper_rsi(df_subset['Close'], 14)
            df_subset['ATR'] = self.calculate_atr(df_subset)
            
            latest = df_subset.iloc[-1]
            
            # Extract scalar values
            ema9_val = float(latest['EMA9'])
            ema20_val = float(latest['EMA20'])
            close_val = float(latest['Close'])
            vwap_val = float(latest['VWAP'])
            rsi_val = float(latest['RSI'])
            atr_val = float(latest['ATR'])
            volume_val = float(latest['Volume'])
            
            # Check if we have valid data
            if pd.isna(rsi_val) or pd.isna(atr_val):
                return False
            
            # VERY RELAXED ENTRY CONDITIONS (to ensure some trades)
            
            # 1. Trend confirmation
            ema_bullish = ema9_val > ema20_val
            
            # 2. Price above VWAP (very relaxed)
            above_vwap = close_val > (vwap_val * params['vwap_tolerance'])
            
            # 3. RSI in sweet spot (very wide range)
            rsi_ok = params['rsi_low'] < rsi_val < params['rsi_high']
            
            # 4. Recent momentum (very relaxed)
            if len(df_subset) >= 4:
                momentum_20min = (df_subset['Close'].iloc[-1] - df_subset['Close'].iloc[-4]) / df_subset['Close'].iloc[-4] * 100
                positive_momentum = momentum_20min > params['momentum_threshold']
            else:
                positive_momentum = False
            
            # 5. Volume (very relaxed)
            avg_vol = df_subset['Volume'].tail(20).mean()
            rel_vol = volume_val / avg_vol if avg_vol > 0 else 0
            volume_surge = rel_vol >= params['volume_mult']
            
            # 6. Volatility (very relaxed)
            atr_pct = (atr_val / close_val) * 100
            sufficient_volatility = atr_pct > params['atr_threshold']
            
            # All conditions must be met
            return (ema_bullish and above_vwap and rsi_ok and positive_momentum 
                    and volume_surge and sufficient_volatility)
        
        except (KeyError, ValueError, TypeError, IndexError):
            return False
    
    def simulate_trade(self, df, entry_idx):
        """Simulate a trade from entry point"""
        try:
            entry_bar = df.iloc[entry_idx]
            
            # Calculate ATR at entry
            df_subset = df.iloc[:entry_idx+1].copy()
            df_subset['ATR'] = self.calculate_atr(df_subset)
            entry_atr = float(df_subset['ATR'].iloc[-1])
            
            entry_price = float(entry_bar['Close'])
            stop_loss = entry_price - (1.5 * entry_atr)
            target = entry_price + (2 * entry_atr)
            
            # Look forward for exit (max 20 minutes = 4 bars)
            exit_idx = None
            exit_price = None
            exit_reason = None
            bars_held = 0
            
            for i in range(entry_idx + 1, min(entry_idx + 5, len(df))):
                bar = df.iloc[i]
                bars_held += 1
                
                bar_low = float(bar['Low'])
                bar_high = float(bar['High'])
                bar_close = float(bar['Close'])
                
                # Check stop loss
                if bar_low <= stop_loss:
                    exit_idx = i
                    exit_price = stop_loss
                    exit_reason = 'STOP'
                    break
                
                # Check target
                if bar_high >= target:
                    exit_idx = i
                    exit_price = target
                    exit_reason = 'TARGET'
                    break
                
                # Time exit after 20 minutes (4 bars)
                if bars_held >= 4:
                    exit_idx = i
                    exit_price = bar_close
                    exit_reason = 'TIME'
                    break
            
            # If no exit found (end of data), exit at last available price
            if exit_idx is None:
                exit_idx = len(df) - 1
                exit_price = float(df.iloc[exit_idx]['Close'])
                exit_reason = 'EOD'
                bars_held = exit_idx - entry_idx
            
            # Calculate trade results
            pnl = exit_price - entry_price
            pnl_pct = (pnl / entry_price) * 100
            
            return {
                'entry_time': df.index[entry_idx],
                'exit_time': df.index[exit_idx],
                'entry_price': entry_price,
                'exit_price': exit_price,
                'stop_loss': stop_loss,
                'target': target,
                'pnl': pnl,
                'pnl_pct': pnl_pct,
                'bars_held': bars_held,
                'exit_reason': exit_reason,
                'atr': entry_atr
            }
        
        except (KeyError, ValueError, TypeError, IndexError):
            return None
    
    def run_backtest_on_data(self, df, ticker, params=None):
        """Run backtest on a single ticker's data"""
        trades = []
        
        # Scan for entry signals (properly skip ahead after trade entry)
        idx = 20
        while idx < len(df) - 5:
            if self.check_entry_signal(df, idx, params):
                trade = self.simulate_trade(df, idx)
                if trade:
                    trade['ticker'] = ticker
                    trades.append(trade)
                    # Skip ahead past this trade to avoid overlapping trades
                    idx += 5
                else:
                    idx += 1
            else:
                idx += 1
        
        return trades
    
    def run_simple_backtest(self, params=None):
        """Run simple backtest (no walk-forward)"""
        print(f"\n{'='*100}")
        print(f"üîÑ RUNNING SIMPLE BACKTEST: {self.start_date} to {self.end_date}")
        print(f"{'='*100}")
        
        all_trades = []
        tickers_tested = 0
        tickers_with_data = 0
        
        print(f"Testing {len(self.tickers)} tickers...\n")
        
        for ticker in self.tickers:
            try:
                tickers_tested += 1
                print(f"[{tickers_tested}/{len(self.tickers)}] Testing {ticker:6}...", end='\r')
                
                # Download data
                df = yf.download(ticker, start=self.start_date, end=self.end_date, 
                               interval='5m', progress=False)
                
                if df.empty or len(df) < 30:
                    continue
                
                tickers_with_data += 1
                
                # Run backtest on this ticker
                trades = self.run_backtest_on_data(df, ticker, params)
                all_trades.extend(trades)
                
            except Exception as e:
                continue
        
        print(f"\n\n‚úì Tested {tickers_tested} tickers | {tickers_with_data} had sufficient data | {len(all_trades)} trades found\n")
        
        self.trades = all_trades
        return self.analyze_results()
    
    def walk_forward_analysis(self, num_windows=3, train_pct=0.7):
        """Perform walk-forward analysis"""
        print(f"\n{'='*100}")
        print(f"üîÑ RUNNING WALK-FORWARD ANALYSIS")
        print(f"{'='*100}")
        print(f"Date Range: {self.start_date} to {self.end_date}")
        print(f"Windows: {num_windows} | Train/Test Split: {int(train_pct*100)}%/{int((1-train_pct)*100)}%")
        print(f"{'='*100}\n")
        
        # Calculate date range
        start_dt = pd.to_datetime(self.start_date)
        end_dt = pd.to_datetime(self.end_date)
        total_days = (end_dt - start_dt).days
        
        if total_days < num_windows * 2:
            print("‚ùå Date range too short for walk-forward analysis")
            print("   Falling back to simple backtest...\n")
            return self.run_simple_backtest()
        
        window_days = total_days // num_windows
        train_days = int(window_days * train_pct)
        test_days = window_days - train_days
        
        all_window_results = []
        all_trades = []
        
        for window_num in range(num_windows):
            print(f"\n{'='*100}")
            print(f"üìä WINDOW {window_num + 1}/{num_windows}")
            print(f"{'='*100}")
            
            # Calculate window dates
            window_start = start_dt + timedelta(days=window_num * window_days)
            train_end = window_start + timedelta(days=train_days)
            test_end = window_start + timedelta(days=window_days)
            
            # Ensure we don't go past end date
            if test_end > end_dt:
                test_end = end_dt
            
            train_start_str = window_start.strftime('%Y-%m-%d')
            train_end_str = train_end.strftime('%Y-%m-%d')
            test_start_str = train_end.strftime('%Y-%m-%d')
            test_end_str = test_end.strftime('%Y-%m-%d')
            
            print(f"\nüìà In-Sample (Training): {train_start_str} to {train_end_str}")
            print(f"üìâ Out-of-Sample (Testing): {test_start_str} to {test_end_str}")
            
            # Run on training data to "optimize" (using default params for now)
            # In a real implementation, you'd test multiple parameter combinations here
            best_params = {
                'rsi_low': 35,
                'rsi_high': 80,
                'volume_mult': 1.1,
                'momentum_threshold': -1.0,
                'vwap_tolerance': 0.995,
                'atr_threshold': 0.15
            }
            
            print(f"\nüéØ Testing on out-of-sample data...")
            
            # Test on out-of-sample data
            test_trades = []
            tickers_tested = 0
            
            for ticker in self.tickers:
                try:
                    tickers_tested += 1
                    print(f"[{tickers_tested}/{len(self.tickers)}] Testing {ticker:6}...", end='\r')
                    
                    # Download test period data
                    df = yf.download(ticker, start=test_start_str, end=test_end_str, 
                                   interval='5m', progress=False)
                    
                    if df.empty or len(df) < 30:
                        continue
                    
                    # Run backtest with optimized parameters
                    trades = self.run_backtest_on_data(df, ticker, best_params)
                    for trade in trades:
                        trade['window'] = window_num + 1
                    test_trades.extend(trades)
                    
                except Exception as e:
                    continue
            
            print(f"\n\n‚úì Window {window_num + 1} complete: {len(test_trades)} trades found")
            
            if len(test_trades) > 0:
                df_window = pd.DataFrame(test_trades)
                win_rate = (len(df_window[df_window['pnl'] > 0]) / len(df_window)) * 100
                avg_pnl = df_window['pnl_pct'].mean()
                total_pnl = df_window['pnl_pct'].sum()
                
                window_result = {
                    'window': window_num + 1,
                    'trades': len(test_trades),
                    'win_rate': win_rate,
                    'avg_pnl': avg_pnl,
                    'total_pnl': total_pnl,
                    'train_period': f"{train_start_str} to {train_end_str}",
                    'test_period': f"{test_start_str} to {test_end_str}"
                }
                all_window_results.append(window_result)
                all_trades.extend(test_trades)
                
                print(f"   Win Rate: {win_rate:.1f}% | Avg P&L: {avg_pnl:.2f}% | Total P&L: {total_pnl:.2f}%")
        
        # Summary
        print(f"\n{'='*100}")
        print(f"üìä WALK-FORWARD SUMMARY")
        print(f"{'='*100}\n")
        
        if all_window_results:
            df_windows = pd.DataFrame(all_window_results)
            print("Window-by-Window Results:")
            print("-" * 100)
            for _, row in df_windows.iterrows():
                print(f"Window {int(row['window'])}: {int(row['trades'])} trades | "
                      f"Win Rate: {row['win_rate']:.1f}% | "
                      f"Avg P&L: {row['avg_pnl']:.2f}% | "
                      f"Total P&L: {row['total_pnl']:.2f}%")
            
            print(f"\n{'='*100}")
            print(f"Overall Statistics Across All Windows:")
            print(f"Total Trades: {df_windows['trades'].sum():.0f}")
            print(f"Average Win Rate: {df_windows['win_rate'].mean():.1f}%")
            print(f"Average P&L per Trade: {df_windows['avg_pnl'].mean():.2f}%")
            print(f"Cumulative P&L: {df_windows['total_pnl'].sum():.2f}%")
            print(f"Consistency (Win Rate StdDev): {df_windows['win_rate'].std():.1f}%")
            print(f"{'='*100}\n")
        
        self.trades = all_trades
        return self.analyze_results()
    
    def analyze_results(self):
        """Analyze backtest results"""
        if not self.trades:
            print("\n" + "="*100)
            print("‚ùå NO TRADES GENERATED")
            print("="*100)
            print("\nüîç POSSIBLE REASONS:")
            print("  1. Date range is in the future or too far back (>60 days)")
            print("  2. Market was closed during this period")
            print("  3. No stocks met all entry criteria (too restrictive)")
            print("  4. Insufficient intraday data available")
            print("\nüí° SUGGESTIONS:")
            print("  ‚Ä¢ Use dates within the last 30 days for best data")
            print("  ‚Ä¢ Try loosening filters (lower RSI threshold, reduce volume multiplier)")
            print("  ‚Ä¢ Test during market hours data (9:30 AM - 4:00 PM ET)")
            print("  ‚Ä¢ Increase the number of tickers tested")
            print("="*100)
            return None
        
        df_trades = pd.DataFrame(self.trades)
        
        # Calculate statistics
        total_trades = len(df_trades)
        winning_trades = len(df_trades[df_trades['pnl'] > 0])
        losing_trades = len(df_trades[df_trades['pnl'] <= 0])
        win_rate = (winning_trades / total_trades) * 100
        
        avg_win = df_trades[df_trades['pnl'] > 0]['pnl_pct'].mean() if winning_trades > 0 else 0
        avg_loss = df_trades[df_trades['pnl'] <= 0]['pnl_pct'].mean() if losing_trades > 0 else 0
        
        avg_pnl = df_trades['pnl_pct'].mean()
        total_pnl_pct = df_trades['pnl_pct'].sum()
        
        # Risk metrics
        sharpe_ratio = (df_trades['pnl_pct'].mean() / df_trades['pnl_pct'].std()) * np.sqrt(252) if len(df_trades) > 1 else 0
        max_drawdown = df_trades['pnl_pct'].cumsum().cummax() - df_trades['pnl_pct'].cumsum()
        max_dd = max_drawdown.max()
        
        # Profit factor
        gross_profit = df_trades[df_trades['pnl'] > 0]['pnl_pct'].sum()
        gross_loss = abs(df_trades[df_trades['pnl'] <= 0]['pnl_pct'].sum())
        profit_factor = gross_profit / gross_loss if gross_loss != 0 else 0
        
        # Exit reason breakdown
        exit_reasons = df_trades['exit_reason'].value_counts()
        
        # Display results
        print("\n" + "="*100)
        print("üìä DETAILED BACKTEST RESULTS")
        print("="*100)
        print(f"\nüéØ PERFORMANCE METRICS")
        print(f"  Total Trades: {total_trades}")
        print(f"  Winners: {winning_trades} ({win_rate:.1f}%)")
        print(f"  Losers: {losing_trades} ({100-win_rate:.1f}%)")
        print(f"  Average Win: {avg_win:.2f}%")
        print(f"  Average Loss: {avg_loss:.2f}%")
        print(f"  Average Trade: {avg_pnl:.2f}%")
        print(f"  Profit Factor: {profit_factor:.2f}")
        print(f"  Sharpe Ratio: {sharpe_ratio:.2f}")
        print(f"  Max Drawdown: {max_dd:.2f}%")
        
        print(f"\nüí∞ P&L ANALYSIS")
        print(f"  Cumulative P&L: {total_pnl_pct:.2f}%")
        print(f"  Best Trade: {df_trades['pnl_pct'].max():.2f}%")
        print(f"  Worst Trade: {df_trades['pnl_pct'].min():.2f}%")
        
        print(f"\n‚è±Ô∏è EXIT BREAKDOWN")
        for reason, count in exit_reasons.items():
            pct = (count / total_trades) * 100
            print(f"  {reason}: {count} ({pct:.1f}%)")
        
        print(f"\nüèÜ TOP 10 TRADES")
        print("-"*100)
        top_trades = df_trades.nlargest(10, 'pnl_pct')[['ticker', 'entry_time', 'pnl_pct', 'exit_reason', 'bars_held']]
        for idx, trade in top_trades.iterrows():
            print(f"  {trade['ticker']:6} | {trade['entry_time']} | P&L: {trade['pnl_pct']:+6.2f}% | {trade['exit_reason']:6} | {trade['bars_held']} bars")
        
        print("\n" + "="*100)
        
        # Save results
        filename = f"backtest_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df_trades.to_csv(filename, index=False)
        print(f"\nüíæ Full trade log saved to: {filename}")
        
        return df_trades


# ============================================================================
# MAIN EXECUTION
# ============================================================================

if __name__ == "__main__":
    # Script entry point: configure the run, print a summary of the settings,
    # then execute either the walk-forward analysis or the simple backtest.
    # Everything stays at module level so the names remain inspectable after
    # the cell/script has run.

    # ========== CUSTOMIZE YOUR SETTINGS HERE ==========

    # Select tickers to backtest
    test_tickers = [
        "NVDA", "AAPL", "MSFT", "TSLA", "AMD", "GOOGL", "META", "AMZN",
        "NFLX", "PLTR", "COIN", "HOOD", "RBLX", "SNOW", "CRWD", "AVGO",
        "QCOM", "AMAT", "INTC", "MU", "CRM", "NOW", "ORCL", "ADBE"
    ]

    # Set your backtest date range (MUST BE WITHIN LAST 60 DAYS!)
    # Using automatic recent dates - always in the past
    today = datetime.now()
    END_DATE = (today - timedelta(days=1)).strftime('%Y-%m-%d')  # Yesterday
    START_DATE = (today - timedelta(days=30)).strftime('%Y-%m-%d')  # 30 days ago

    # Or manually set specific dates (uncomment to use):
    # START_DATE = '2024-10-20'
    # END_DATE = '2024-11-17'

    # Initial capital
    INITIAL_CAPITAL = 10000

    # Walk-forward analysis settings
    USE_WALK_FORWARD = True  # Set to False for simple backtest
    NUM_WINDOWS = 3  # Number of walk-forward windows
    TRAIN_PCT = 0.7  # Percentage of each window for training (70%)

    # ========== END CUSTOMIZATION ==========

    print("\n" + "="*100)
    print("🚀 ADVANCED BACKTESTING WITH WALK-FORWARD ANALYSIS")
    print("="*100)
    print(f"\n📅 Date Range: {START_DATE} to {END_DATE}")

    # Calculate and display date info
    try:
        start_dt = datetime.strptime(START_DATE, '%Y-%m-%d')
        end_dt = datetime.strptime(END_DATE, '%Y-%m-%d')
    except ValueError as exc:
        # Only a malformed date string is expected here; say so instead of
        # hiding every possible error behind a bare except.
        print(f"⚠️  Could not parse the date range: {exc}")
    else:
        days_range = (end_dt - start_dt).days
        days_ago = (datetime.now() - end_dt).days

        print(f"📊 Period: {days_range} days | Ending {days_ago} days ago")

        if days_ago > 60:
            print("⚠️  WARNING: Data may be limited (>60 days back)")

    print(f"🎯 Testing {len(test_tickers)} tickers")
    print(f"💰 Initial Capital: ${INITIAL_CAPITAL:,}")

    if USE_WALK_FORWARD:
        # round() rather than int(): (1 - x) * 100 can land just below a whole
        # number and would be truncated (e.g. 0.9 -> "90% train / 9% test").
        train_share = int(round(TRAIN_PCT * 100))
        print(f"🔄 Walk-Forward: {NUM_WINDOWS} windows | {train_share}% train / {100 - train_share}% test")
    else:
        print("📊 Mode: Simple Backtest (no walk-forward)")

    print("="*100)

    # Initialize and run backtest
    bt = ScalpingBacktest(
        tickers=test_tickers,
        start_date=START_DATE,
        end_date=END_DATE,
        initial_capital=INITIAL_CAPITAL
    )

    # Run walk-forward analysis or simple backtest
    if USE_WALK_FORWARD:
        results = bt.walk_forward_analysis(num_windows=NUM_WINDOWS, train_pct=TRAIN_PCT)
    else:
        results = bt.run_simple_backtest()

    if results is not None:
        print("\n" + "="*100)
        print("✅ BACKTEST COMPLETE")
        print("="*100)
        print("\n💡 NEXT STEPS:")
        print("  1. Review win rate and profit factor across all windows")
        print("  2. Check consistency - low variance in win rates = more robust")
        print("  3. Analyze which exit reasons are most profitable")
        print("  4. Compare in-sample vs out-of-sample performance")
        print("  5. Paper trade the strategy before going live")
        print("\n📝 TO CUSTOMIZE:")
        print("  • Adjust NUM_WINDOWS for more/fewer test periods")
        print("  • Modify TRAIN_PCT to change training/testing split")
        print("  • Set USE_WALK_FORWARD=False for simple backtest")
        print("  • Adjust entry parameters in check_entry_signal()")
        print("="*100)
    else:
        print("\n⚠️  Backtest failed. Please check the error messages above.")