# MT5 AI Trading System - Complete Implementation
## XAUUSD Price Prediction & Signal Generation

This notebook implements a complete AI trading system with:
- MT5 data integration
- Technical indicators (BB, MACD, RSI, ADX, ATR)
- ONNX price prediction model
- Signal generation with confidence scoring
- Backtesting and performance analysis

## 1. Setup and Installation

In [None]:
# Install required packages
!pip install MetaTrader5 pandas numpy scikit-learn onnx onnxruntime ta matplotlib seaborn plotly skl2onnx

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

print("✅ Packages installed successfully")

## 2. Data Generation (Simulated MT5 Data)

In [None]:
# Generate realistic XAUUSD data for demonstration
def generate_xauusd_data(n_bars=5000):
    np.random.seed(42)
    
    # Base parameters for XAUUSD
    base_price = 2000.0
    volatility = 0.02
    
    # Generate time series
    dates = pd.date_range(start='2023-01-01', periods=n_bars, freq='5min')
    
    # Generate price movements with trend and volatility
    returns = np.random.normal(0, volatility/100, n_bars)
    
    # Add some trend components
    trend = np.sin(np.arange(n_bars) * 2 * np.pi / 1000) * 0.001
    returns += trend
    
    # Calculate prices
    prices = [base_price]
    for i in range(1, n_bars):
        new_price = prices[-1] * (1 + returns[i])
        prices.append(new_price)
    
    # Generate OHLC data
    data = []
    for i in range(n_bars):
        close = prices[i]
        open_price = close * (1 + np.random.normal(0, 0.0002))
        high = max(open_price, close) * (1 + abs(np.random.normal(0, 0.0005)))
        low = min(open_price, close) * (1 - abs(np.random.normal(0, 0.0005)))
        volume = np.random.randint(100, 1000)
        
        data.append({
            'time': dates[i],
            'open': open_price,
            'high': high,
            'low': low,
            'close': close,
            'tick_volume': volume
        })
    
    return pd.DataFrame(data)

# Generate data
df = generate_xauusd_data(5000)
print(f"✅ Generated {len(df)} bars of XAUUSD data")
print(f"Price range: ${df['close'].min():.2f} - ${df['close'].max():.2f}")

# Display first few rows
df.head()

## 3. Technical Indicators Implementation

In [None]:
import ta

class TechnicalIndicators:
    @staticmethod
    def add_bollinger_bands(df, period=20, std=2):
        df['bb_upper'] = ta.volatility.bollinger_hband(df['close'], window=period, window_dev=std)
        df['bb_middle'] = ta.volatility.bollinger_mavg(df['close'], window=period)
        df['bb_lower'] = ta.volatility.bollinger_lband(df['close'], window=period, window_dev=std)
        df['bb_signal'] = 0
        df.loc[df['close'] > df['bb_upper'], 'bb_signal'] = -1
        df.loc[df['close'] < df['bb_lower'], 'bb_signal'] = 1
        df.loc[(df['close'].shift(1) < df['bb_middle']) & (df['close'] > df['bb_middle']), 'bb_signal'] = 1
        df.loc[(df['close'].shift(1) > df['bb_middle']) & (df['close'] < df['bb_middle']), 'bb_signal'] = -1
        return df
    
    @staticmethod
    def add_macd(df, fast=12, slow=26, signal=9):
        df['macd'] = ta.trend.macd_diff(df['close'], window_fast=fast, window_slow=slow, window_sign=signal)
        df['macd_signal'] = ta.trend.macd_signal(df['close'], window_fast=fast, window_slow=slow, window_sign=signal)
        df['macd_histogram'] = ta.trend.macd_diff(df['close'], window_fast=fast, window_slow=slow, window_sign=signal)
        df['macd_buy'] = ((df['macd'] > df['macd_signal']) & (df['macd'].shift(1) <= df['macd_signal'].shift(1))).astype(int)
        df['macd_sell'] = ((df['macd'] < df['macd_signal']) & (df['macd'].shift(1) >= df['macd_signal'].shift(1))).astype(int)
        return df
    
    @staticmethod
    def add_rsi(df, period=14):
        df['rsi'] = ta.momentum.rsi(df['close'], window=period)
        df['rsi_overbought'] = (df['rsi'] > 70).astype(int)
        df['rsi_oversold'] = (df['rsi'] < 30).astype(int)
        return df
    
    @staticmethod
    def add_adx(df, period=14):
        df['adx'] = ta.trend.adx(df['high'], df['low'], df['close'], window=period)
        df['adx_strong'] = (df['adx'] > 25).astype(int)
        return df
    
    @staticmethod
    def add_atr(df, period=14):
        df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=period)
        return df
    
    @staticmethod
    def add_trading_sessions(df):
        df['hour'] = df['time'].dt.hour
        df['london_session'] = ((df['hour'] >= 8) & (df['hour'] < 16)).astype(int)
        df['newyork_session'] = ((df['hour'] >= 13) & (df['hour'] < 21)).astype(int)
        df['overlap_session'] = ((df['hour'] >= 13) & (df['hour'] < 16)).astype(int)
        return df
    
    @staticmethod
    def calculate_all_indicators(df):
        df = TechnicalIndicators.add_bollinger_bands(df)
        df = TechnicalIndicators.add_macd(df)
        df = TechnicalIndicators.add_rsi(df)
        df = TechnicalIndicators.add_adx(df)
        df = TechnicalIndicators.add_atr(df)
        df = TechnicalIndicators.add_trading_sessions(df)
        return df

# Apply technical indicators
df = TechnicalIndicators.calculate_all_indicators(df)
print(f"✅ Technical indicators added")
print(f"Columns: {list(df.columns)}")

# Display sample with indicators
df[['time', 'close', 'bb_upper', 'bb_middle', 'bb_lower', 'rsi', 'macd', 'atr']].tail()

## 4. ONNX Price Prediction Model

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import onnx
import onnxruntime as ort
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import pickle
import os

# Create directories
os.makedirs('onnx_models', exist_ok=True)

class ONNXPricePredictor:
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.session = None
        
    def prepare_data(self, df, target_col='close', lookback=10):
        # Create features for prediction
        features = []
        targets = []
        
        for i in range(lookback, len(df)):
            # Use previous lookback periods as features
            feature_row = []
            for j in range(lookback):
                idx = i - lookback + j
                feature_row.extend([
                    df.iloc[idx]['open'], df.iloc[idx]['high'], 
                    df.iloc[idx]['low'], df.iloc[idx]['close'],
                    df.iloc[idx]['rsi'], df.iloc[idx]['macd'],
                    df.iloc[idx]['bb_signal'], df.iloc[idx]['atr']
                ])
            
            features.append(feature_row)
            targets.append(df.iloc[i][target_col])
        
        return np.array(features), np.array(targets)
    
    def train_model(self, df, test_size=0.2):
        X, y = self.prepare_data(df)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
        
        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # Train model
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.model.fit(X_train_scaled, y_train)
        
        # Evaluate
        y_pred = self.model.predict(X_test_scaled)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        
        print(f"Model Performance - MSE: {mse:.6f}, R2: {r2:.4f}")
        
        # Convert to ONNX
        self.convert_to_onnx(X_train_scaled)
        
        return mse, r2
    
    def convert_to_onnx(self, X_sample):
        # Define input type
        initial_type = [('float_input', FloatTensorType([None, X_sample.shape[1]]))]
        
        # Convert to ONNX
        onnx_model = convert_sklearn(self.model, initial_types=initial_type)
        
        # Save ONNX model
        with open('onnx_models/price_predictor.onnx', 'wb') as f:
            f.write(onnx_model.SerializeToString())
        
        # Save scaler
        with open('onnx_models/scaler.pkl', 'wb') as f:
            pickle.dump(self.scaler, f)
        
        # Create ONNX runtime session
        self.session = ort.InferenceSession('onnx_models/price_predictor.onnx')
        
        print("Model converted to ONNX successfully")
    
    def predict(self, features):
        if self.session is None:
            self.load_onnx_model()
        
        # Scale features
        features_scaled = self.scaler.transform(features.reshape(1, -1))
        
        # Predict using ONNX
        input_name = self.session.get_inputs()[0].name
        prediction = self.session.run(None, {input_name: features_scaled.astype(np.float32)})
        
        # Ensure scalar return value
        result = prediction[0]
        if isinstance(result, (list, np.ndarray)):
            if hasattr(result, '__len__') and len(result) > 0:
                return float(result[0])
            else:
                return float(result)
        return float(result)
    
    def load_onnx_model(self):
        self.session = ort.InferenceSession('onnx_models/price_predictor.onnx')
        with open('onnx_models/scaler.pkl', 'rb') as f:
            self.scaler = pickle.load(f)
    
    def get_prediction_confidence(self, features, current_price):
        prediction = self.predict(features)
        # Ensure scalar values
        if isinstance(prediction, np.ndarray):
            prediction = float(prediction[0]) if len(prediction) > 0 else float(prediction)
        else:
            prediction = float(prediction)
        
        current_price = float(current_price)
        confidence = abs(prediction - current_price) / current_price
        return prediction, float(1 - confidence)  # Higher confidence for closer predictions

# Train the ONNX model
predictor = ONNXPricePredictor()
mse, r2 = predictor.train_model(df.dropna())

print(f"\n✅ ONNX Model trained successfully!")
print(f"📊 Performance: MSE={mse:.6f}, R²={r2:.4f}")

## 5. Signal Generation System

In [None]:
class SignalGenerator:
    def __init__(self, predictor=None):
        self.predictor = predictor
        
    def generate_signals(self, df, lookback=10):
        signals = []
        
        for i in range(lookback, len(df)):
            current_row = df.iloc[i]
            
            # Technical analysis signals
            bb_signal = self.get_bb_signal(current_row)
            macd_signal = self.get_macd_signal(current_row)
            rsi_signal = self.get_rsi_signal(current_row)
            session_signal = self.get_session_signal(current_row)
            
            # Price prediction signal
            prediction_signal = 0
            confidence = 0.5
            
            if self.predictor:
                try:
                    # Prepare features for prediction
                    features = []
                    for j in range(lookback):
                        idx = i - lookback + j
                        row = df.iloc[idx]
                        features.extend([
                            row['open'], row['high'], row['low'], row['close'],
                            row['rsi'], row['macd'], row['bb_signal'], row['atr']
                        ])
                    
                    features_array = np.array(features)
                    prediction, confidence = self.predictor.get_prediction_confidence(
                        features_array, current_row['close']
                    )
                    
                    if prediction > current_row['close'] * 1.001:  # 0.1% threshold
                        prediction_signal = 1
                    elif prediction < current_row['close'] * 0.999:
                        prediction_signal = -1
                        
                except Exception as e:
                    print(f"Prediction error: {e}")
            
            # Combine signals
            total_signal = bb_signal + macd_signal + rsi_signal + session_signal + prediction_signal
            
            # Determine action
            action = 'HOLD'
            if total_signal >= 2:
                action = 'BUY'
            elif total_signal <= -2:
                action = 'SELL'
            
            # Calculate stop loss and take profit
            atr = current_row['atr']
            if action == 'BUY':
                stop_loss = current_row['close'] - (2 * atr)
                take_profit = current_row['close'] + (3 * atr)
            elif action == 'SELL':
                stop_loss = current_row['close'] + (2 * atr)
                take_profit = current_row['close'] - (3 * atr)
            else:
                stop_loss = take_profit = current_row['close']
            
            signal = {
                'time': current_row['time'],
                'price': current_row['close'],
                'action': action,
                'confidence': confidence,
                'stop_loss': stop_loss,
                'take_profit': take_profit,
                'signals': {
                    'bb': bb_signal,
                    'macd': macd_signal,
                    'rsi': rsi_signal,
                    'session': session_signal,
                    'prediction': prediction_signal,
                    'total': total_signal
                }
            }
            
            signals.append(signal)
        
        return signals
    
    def get_bb_signal(self, row):
        if row['bb_signal'] == 1:  # Price below lower band
            return 1
        elif row['bb_signal'] == -1:  # Price above upper band
            return -1
        return 0
    
    def get_macd_signal(self, row):
        if row['macd_buy'] == 1:
            return 1
        elif row['macd_sell'] == 1:
            return -1
        return 0
    
    def get_rsi_signal(self, row):
        if row['rsi_oversold'] == 1:
            return 1
        elif row['rsi_overbought'] == 1:
            return -1
        return 0
    
    def get_session_signal(self, row):
        # Favor trading during active sessions
        if row['overlap_session'] == 1:
            return 0.5  # Slight bias during overlap
        return 0

# Generate signals
signal_gen = SignalGenerator(predictor)
signals = signal_gen.generate_signals(df.dropna())

print(f"✅ Generated {len(signals)} trading signals")

# Show signal distribution
actions = [s['action'] for s in signals]
from collections import Counter
signal_counts = Counter(actions)
print(f"📊 Signal Distribution: {dict(signal_counts)}")

# Show sample signals
buy_signals = [s for s in signals if s['action'] == 'BUY'][:5]
sell_signals = [s for s in signals if s['action'] == 'SELL'][:5]

print("\n🔵 Sample BUY Signals:")
for signal in buy_signals:
    print(f"Time: {signal['time']}, Price: {signal['price']:.2f}, Confidence: {signal['confidence']:.3f}")

print("\n🔴 Sample SELL Signals:")
for signal in sell_signals:
    print(f"Time: {signal['time']}, Price: {signal['price']:.2f}, Confidence: {signal['confidence']:.3f}")

## 6. Backtesting System

In [None]:
class Backtester:
    def __init__(self, initial_balance=10000):
        self.initial_balance = initial_balance
        self.balance = initial_balance
        self.trades = []
        self.open_positions = []
        
    def run_backtest(self, signals, df):
        self.balance = self.initial_balance
        self.trades = []
        self.open_positions = []
        
        for i, signal in enumerate(signals):
            if signal['action'] in ['BUY', 'SELL']:
                self.open_position(signal)
            
            # Check for position closures
            self.check_position_closures(signal, df)
        
        # Close any remaining positions
        if signals:
            final_price = signals[-1]['price']
            for pos in self.open_positions[:]:
                self.close_position(pos, final_price, 'END_OF_DATA')
        
        return self.calculate_metrics()
    
    def open_position(self, signal):
        if len(self.open_positions) >= 3:  # Max 3 open positions
            return
        
        # Calculate position size based on confidence
        risk_per_trade = 0.02  # 2% risk per trade
        confidence_multiplier = max(0.5, signal['confidence'])
        position_size = (self.balance * risk_per_trade * confidence_multiplier) / abs(signal['price'] - signal['stop_loss'])
        position_size = min(position_size, self.balance * 0.1)  # Max 10% of balance per trade
        
        position = {
            'type': signal['action'],
            'entry_price': signal['price'],
            'entry_time': signal['time'],
            'size': position_size,
            'stop_loss': signal['stop_loss'],
            'take_profit': signal['take_profit'],
            'confidence': signal['confidence']
        }
        
        self.open_positions.append(position)
    
    def check_position_closures(self, current_signal, df):
        current_price = current_signal['price']
        
        for position in self.open_positions[:]:
            if position['type'] == 'BUY':
                if current_price <= position['stop_loss']:
                    self.close_position(position, current_price, 'STOP_LOSS')
                elif current_price >= position['take_profit']:
                    self.close_position(position, current_price, 'TAKE_PROFIT')
            
            elif position['type'] == 'SELL':
                if current_price >= position['stop_loss']:
                    self.close_position(position, current_price, 'STOP_LOSS')
                elif current_price <= position['take_profit']:
                    self.close_position(position, current_price, 'TAKE_PROFIT')
    
    def close_position(self, position, exit_price, reason):
        if position['type'] == 'BUY':
            pnl = (exit_price - position['entry_price']) * position['size']
        else:  # SELL
            pnl = (position['entry_price'] - exit_price) * position['size']
        
        self.balance += pnl
        
        trade = {
            'type': position['type'],
            'entry_price': position['entry_price'],
            'exit_price': exit_price,
            'entry_time': position['entry_time'],
            'size': position['size'],
            'pnl': pnl,
            'reason': reason,
            'confidence': position['confidence']
        }
        
        self.trades.append(trade)
        self.open_positions.remove(position)
    
    def calculate_metrics(self):
        if not self.trades:
            return {'error': 'No trades executed'}
        
        total_return = (self.balance - self.initial_balance) / self.initial_balance
        
        winning_trades = [t for t in self.trades if t['pnl'] > 0]
        losing_trades = [t for t in self.trades if t['pnl'] < 0]
        
        win_rate = len(winning_trades) / len(self.trades) if self.trades else 0
        
        avg_win = np.mean([t['pnl'] for t in winning_trades]) if winning_trades else 0
        avg_loss = np.mean([t['pnl'] for t in losing_trades]) if losing_trades else 0
        
        profit_factor = abs(sum(t['pnl'] for t in winning_trades) / sum(t['pnl'] for t in losing_trades)) if losing_trades else float('inf')
        
        # Calculate Sharpe ratio (simplified)
        returns = [t['pnl'] / self.initial_balance for t in self.trades]
        sharpe_ratio = np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0
        
        return {
            'total_trades': len(self.trades),
            'winning_trades': len(winning_trades),
            'losing_trades': len(losing_trades),
            'win_rate': win_rate,
            'total_return': total_return,
            'final_balance': self.balance,
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'profit_factor': profit_factor,
            'sharpe_ratio': sharpe_ratio
        }

# Run backtest
backtester = Backtester(initial_balance=10000)
results = backtester.run_backtest(signals, df)

print("\n📈 BACKTESTING RESULTS")
print("=" * 50)
for key, value in results.items():
    if isinstance(value, float):
        if 'rate' in key or 'return' in key or 'ratio' in key:
            print(f"{key.replace('_', ' ').title()}: {value:.2%}")
        else:
            print(f"{key.replace('_', ' ').title()}: {value:.2f}")
    else:
        print(f"{key.replace('_', ' ').title()}: {value}")

# Show sample trades
if backtester.trades:
    print("\n💰 Sample Trades:")
    for trade in backtester.trades[:5]:
        pnl_str = f"+${trade['pnl']:.2f}" if trade['pnl'] > 0 else f"-${abs(trade['pnl']):.2f}"
        print(f"{trade['type']} @ {trade['entry_price']:.2f} → {trade['exit_price']:.2f} | {pnl_str} ({trade['reason']})")

## 7. Results Visualization

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Set style
plt.style.use('seaborn-v0_8')
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. Price chart with signals
ax1 = axes[0, 0]
price_data = df.tail(500)  # Last 500 bars
ax1.plot(price_data.index, price_data['close'], label='Close Price', alpha=0.7)
ax1.plot(price_data.index, price_data['bb_upper'], label='BB Upper', alpha=0.5)
ax1.plot(price_data.index, price_data['bb_lower'], label='BB Lower', alpha=0.5)
ax1.fill_between(price_data.index, price_data['bb_upper'], price_data['bb_lower'], alpha=0.1)

# Add signal markers
buy_signals_viz = [s for s in signals[-500:] if s['action'] == 'BUY']
sell_signals_viz = [s for s in signals[-500:] if s['action'] == 'SELL']

if buy_signals_viz:
    buy_times = [s['time'] for s in buy_signals_viz]
    buy_prices = [s['price'] for s in buy_signals_viz]
    ax1.scatter(range(len(buy_times)), buy_prices, color='green', marker='^', s=50, label='Buy Signals')

if sell_signals_viz:
    sell_times = [s['time'] for s in sell_signals_viz]
    sell_prices = [s['price'] for s in sell_signals_viz]
    ax1.scatter(range(len(sell_times)), sell_prices, color='red', marker='v', s=50, label='Sell Signals')

ax1.set_title('Price Chart with Trading Signals')
ax1.legend()
ax1.grid(True, alpha=0.3)

# 2. RSI
ax2 = axes[0, 1]
ax2.plot(price_data.index, price_data['rsi'], label='RSI', color='purple')
ax2.axhline(y=70, color='r', linestyle='--', alpha=0.7, label='Overbought')
ax2.axhline(y=30, color='g', linestyle='--', alpha=0.7, label='Oversold')
ax2.set_title('RSI Indicator')
ax2.set_ylim(0, 100)
ax2.legend()
ax2.grid(True, alpha=0.3)

# 3. MACD
ax3 = axes[1, 0]
ax3.plot(price_data.index, price_data['macd'], label='MACD', color='blue')
ax3.plot(price_data.index, price_data['macd_signal'], label='Signal', color='red')
ax3.bar(price_data.index, price_data['macd_histogram'], alpha=0.3, label='Histogram')
ax3.set_title('MACD Indicator')
ax3.legend()
ax3.grid(True, alpha=0.3)

# 4. Performance metrics
ax4 = axes[1, 1]
if backtester.trades:
    cumulative_pnl = np.cumsum([t['pnl'] for t in backtester.trades])
    ax4.plot(range(len(cumulative_pnl)), cumulative_pnl, label='Cumulative P&L', color='green')
    ax4.axhline(y=0, color='black', linestyle='-', alpha=0.5)
    ax4.set_title('Cumulative P&L')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
else:
    ax4.text(0.5, 0.5, 'No trades executed', ha='center', va='center', transform=ax4.transAxes)
    ax4.set_title('No Trading Data')

plt.tight_layout()
plt.show()

# Summary statistics
print("\n📊 SYSTEM SUMMARY")
print("=" * 50)
print(f"🔹 Data Points: {len(df):,}")
print(f"🔹 Signals Generated: {len(signals):,}")
print(f"🔹 Buy Signals: {len([s for s in signals if s['action'] == 'BUY']):,}")
print(f"🔹 Sell Signals: {len([s for s in signals if s['action'] == 'SELL']):,}")
print(f"🔹 Model R² Score: {r2:.4f}")
if 'win_rate' in results:
    print(f"🔹 Backtest Win Rate: {results['win_rate']:.2%}")
    print(f"🔹 Total Return: {results['total_return']:.2%}")

print("\n✅ MT5 AI Trading System Analysis Complete!")