# ü§ñ PredictX: The Trinity Training (Tier 7)
**Reinforcement Learning (PPO) + CNN Pattern Recognition + LSTM Trend Following**

**‚ö†Ô∏è IMPORTANT: Validate that you have run the SETUP cell below before running any other cells!**


## 1. Environment Setup
Run this cell first to install dependencies and create the necessary folder structure.


In [18]:
# Install dependencies
!pip install torch pandas numpy ccxt scikit-learn stable-baselines3 shimmy gymnasium matplotlib

# Create Directory Structure (Robust)
import os
dirs = ['backend/services', 'backend/models', 'backend/logs']
for d in dirs:
    os.makedirs(d, exist_ok=True)
    print(f'‚úÖ Created {d}')

import sys
sys.path.append(os.path.abspath('backend'))

‚úÖ Created backend/services
‚úÖ Created backend/models
‚úÖ Created backend/logs


## 2. Upload Codebase
We write the Python files to the Colab environment.


In [19]:
import os

def safe_write_file(path, content):
    # Ensure directory exists
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # Write file
    with open(path, 'w') as f:
        f.write(content)
    print(f'üìÑ Written: {path}')

In [20]:
safe_write_file('backend/services/data_service.py', r"""
import ccxt
import pandas as pd
from datetime import datetime
import time

def get_historical_data(symbol: str, period: str = "1mo", interval: str = "1h", limit: int = 1000) -> dict:
    \"\"\"
    Fetch historical market data from Binance using CCXT.
    
    Args:
        symbol (str): Ticker symbol (e.g., "BTC-USD", "BTC/USDT")
        period (str): Ignored in CCXT (calculated from limit/since), kept for API compatibility.
        interval (str): Data interval (default: "1h")
        limit (int): Number of candles to fetch (default: 1000)
        
    Returns:
        dict: Processed data including candles and metadata
    \"\"\"
    print(f"Fetching data for {symbol} (Interval: {interval}) using CCXT/Binance...")

    try:
        # Initialize Binance Exchange
        exchange = ccxt.binance()
        
        # Normalize symbol: CCXT expects "BTC/USDT", Frontend might send "BTC-USD" or "BTCUSDT"
        # 1. Replace first hyphen with slash if exists (BTC-USD -> BTC/USD)
        if "-" in symbol:
            symbol = symbol.replace("-", "/")
        # 2. If no slash, assumes it might be raw (BTCUSDT), let CCXT try or manually fix if needed.
        # But for "BTC-USD" from frontend, it usually means BTC/USDT in Binance terms.
        if "USD" in symbol and "USDT" not in symbol:
             symbol = symbol.replace("USD", "USDT")
        
        # Ensure it has a slash for CCXT
        if "/" not in symbol and len(symbol) > 6: 
             # Rough heuristic: insert slash before last 4 chars (USDT) or 3 chars (BTC)
             # Better: just force standard map if known.
             if symbol.endswith("USDT"):
                 symbol = symbol[:-4] + "/USDT"
        
        print(f"Normalized symbol for Binance: {symbol}")

        # Map 'period' to limit/since if needed, but for now we'll fetch a fixed amount suitable for training
        # If period is "1mo", 1h candles = 24 * 30 = 720 candles.
        # Binance call limit is 1000.
        fetch_limit = min(limit, 1000)
        
        # Fetch OHLCV
        # timestamp, open, high, low, close, volume
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe=interval, limit=fetch_limit)
        
        if not ohlcv:
             return {"error": f"No data found for symbol {symbol}"}

        # Convert to DataFrame
        df = pd.DataFrame(ohlcv, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
        
        # Process timestamps (ms to datetime string)
        df['time'] = pd.to_datetime(df['time'], unit='ms').astype(str)
        
        return {
            "symbol": symbol,
            "count": len(df),
            "data": df.to_dict(orient="records")
        }
        
    except Exception as e:
        print(f"CCXT Error: {e}")
        return {"error": str(e)}
""")

üìÑ Written: backend/services/data_service.py


In [21]:
safe_write_file('backend/services/lstm_service.py', r"""
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np

class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class LSTMModel(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

def train_model(model, train_loader, num_epochs=10, learning_rate=0.001):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    model.train()
    history = {'loss': []}
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        for inputs, targets in train_loader:
            outputs = model(inputs)
            loss = criterion(outputs, targets.unsqueeze(1)) # Ensure targets have correct shape
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_loss += loss.item()
        
        avg_loss = epoch_loss / len(train_loader)
        history['loss'].append(avg_loss)
        if (epoch+1) % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.6f}')
            
    return history

def predict(model, data):
    model.eval()
    with torch.no_grad():
        # data shape expected: (1, seq_len, input_size) or (seq_len, input_size)
        if isinstance(data, list):
            data = np.array(data)
        
        inputs = torch.tensor(data, dtype=torch.float32)
        if inputs.dim() == 2:
            inputs = inputs.unsqueeze(0) # Add batch dimension if missing
            
        output = model(inputs)
        return output.item()
""")

üìÑ Written: backend/services/lstm_service.py


In [22]:
safe_write_file('backend/services/cnn_service.py', r"""
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNPatternModel(nn.Module):
    \"\"\"
    CNN for Candlestick Pattern Recognition
    Input: 20√ó4 matrix (20 candles √ó OHLC)
    Output: Binary probability (Bullish/Bearish)
    \"\"\"
    def __init__(self, sequence_length=20, input_features=4):
        super(CNNPatternModel, self).__init__()
        
        # Conv1D layers for pattern extraction
        self.conv1 = nn.Conv1d(in_channels=input_features, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(32)
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(64)
        self.pool2 = nn.MaxPool1d(kernel_size=2)
        
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(128)
        
        # Calculate flattened size after convolutions
        # sequence_length=20 -> pool1(10) -> pool2(5)
        self.flat_size = 128 * 5
        
        # Fully connected layers
        self.fc1 = nn.Linear(self.flat_size, 64)
        self.dropout1 = nn.Dropout(0.3)
        self.fc2 = nn.Linear(64, 32)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(32, 1)  # Binary output
        
    def forward(self, x):
        # x shape: (batch, features, sequence)
        # Conv layers
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = F.relu(self.bn3(self.conv3(x)))
        
        # Flatten
        x = x.view(-1, self.flat_size)
        
        # Dense layers
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = torch.sigmoid(self.fc3(x))  # Output probability 0-1
        
        return x

def train_cnn_model(model, train_loader, num_epochs=30, learning_rate=0.001):
    \"\"\"
    Train CNN pattern recognition model
    \"\"\"
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    model.train()
    history = {'loss': [], 'accuracy': []}
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        correct = 0
        total = 0
        
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            # Metrics
            epoch_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
        
        avg_loss = epoch_loss / len(train_loader)
        accuracy = 100 * correct / total
        
        history['loss'].append(avg_loss)
        history['accuracy'].append(accuracy)
        
        if (epoch + 1) % 5 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")
    
    return history

def predict_pattern(model, candle_window):
    \"\"\"
    Predict bullish/bearish pattern from candle window
    Args:
        candle_window: numpy array (20, 4) - OHLC data
    Returns:
        probability: float (0-1, where >0.5 = bullish)
    \"\"\"
    model.eval()
    with torch.no_grad():
        # Convert to tensor and reshape for CNN
        # Shape: (1, features=4, sequence=20)
        x = torch.FloatTensor(candle_window.T).unsqueeze(0)
        prob = model(x).item()
    return prob
""")

üìÑ Written: backend/services/cnn_service.py


In [23]:
safe_write_file('backend/services/chart_generator.py', r"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

def generate_chart_windows(df, window_size=20):
    \"\"\"
    Generate sliding windows of OHLC data for CNN training
    
    Args:
        df: DataFrame with columns ['open', 'high', 'low', 'close']
        window_size: Number of candles per window
    
    Returns:
        windows: numpy array (n_windows, window_size, 4)
        labels: numpy array (n_windows, 1) - 1 if next candle is bullish, 0 if bearish
    \"\"\"
    windows = []
    labels = []
    
    # Normalize OHLC data
    scaler = MinMaxScaler()
    ohlc_data = df[['open', 'high', 'low', 'close']].values
    
    for i in range(len(df) - window_size - 1):
        # Extract window
        window = ohlc_data[i:i+window_size]
        
        # Normalize window (0-1 range)
        window_normalized = scaler.fit_transform(window)
        
        # Label: Is next candle bullish?
        next_close = df.iloc[i + window_size]['close']
        current_close = df.iloc[i + window_size - 1]['close']
        label = 1.0 if next_close > current_close else 0.0
        
        windows.append(window_normalized)
        labels.append(label)
    
    return np.array(windows), np.array(labels).reshape(-1, 1)

def detect_candlestick_patterns(window):
    \"\"\"
    Detect common candlestick patterns in a window
    
    Args:
        window: numpy array (n, 4) - OHLC data
    
    Returns:
        patterns: dict with pattern names and confidence scores
    \"\"\"
    patterns = {}
    
    # Get last candle
    if len(window) < 1:
        return patterns
    
    last = window[-1]
    o, h, l, c = last[0], last[1], last[2], last[3]
    
    body = abs(c - o)
    range_hl = h - l
    
    if range_hl == 0:
        return patterns
    
    # Doji: Small body relative to range
    if body / range_hl < 0.1:
        patterns['doji'] = 0.8
    
    # Hammer: Long lower shadow, small body at top
    lower_shadow = min(o, c) - l
    upper_shadow = h - max(o, c)
    if lower_shadow > 2 * body and upper_shadow < body:
        patterns['hammer'] = 0.7
    
    # Engulfing (need 2 candles)
    if len(window) >= 2:
        prev = window[-2]
        prev_o, prev_c = prev[0], prev[3]
        
        # Bullish engulfing
        if prev_c < prev_o and c > o and c > prev_o and o < prev_c:
            patterns['bullish_engulfing'] = 0.9
        
        # Bearish engulfing
        if prev_c > prev_o and c < o and c < prev_o and o > prev_c:
            patterns['bearish_engulfing'] = 0.9
    
    return patterns

def prepare_cnn_input(candles, window_size=20):
    \"\"\"
    Prepare candle data for CNN input
    
    Args:
        candles: list of dicts with OHLC data
        window_size: Number of candles to use
    
    Returns:
        numpy array (window_size, 4) ready for CNN
    \"\"\"
    if len(candles) < window_size:
        return None
    
    # Extract last window_size candles
    recent_candles = candles[-window_size:]
    
    # Convert to OHLC array
    ohlc = np.array([
        [c['open'], c['high'], c['low'], c['close']]
        for c in recent_candles
    ])
    
    # Normalize
    scaler = MinMaxScaler()
    ohlc_normalized = scaler.fit_transform(ohlc)
    
    return ohlc_normalized
""")

üìÑ Written: backend/services/chart_generator.py


In [24]:
safe_write_file('backend/ai_engine.py', r"""
import torch
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader
from services.lstm_service import LSTMModel, train_model, predict, TimeSeriesDataset
from services.data_service import get_historical_data
from services.db_service import db_service
import os
import logging
import time
from sklearn.preprocessing import StandardScaler
import joblib

# RL Integration
try:
    from stable_baselines3 import PPO
    RL_AVAILABLE = True
except ImportError:
    RL_AVAILABLE = False
    print("‚ö†Ô∏è stable-baselines3 not installed. RL features disabled.")

# Configure Logging
logging.basicConfig(
    filename='training.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def add_indicators(df):
    \"\"\"
    Feature Engineering: Adds Log Returns (Stationary), RSI, EMA Trend Difference, and EMA 200
    \"\"\"
    # 1. Log Returns (The Target)
    # Log Change = ln(Pt / Pt-1)
    # This makes the data stationary, which is crucial for LSTM
    df['log_return'] = np.log(df['close'] / df['close'].shift(1))
    
    # RSI (Relative Strength Index)
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))
    
    # EMA (Exponential Moving Average) - Trend Divergence
    df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean()
    df['ema_diff'] = (df['close'] - df['ema_20']) / df['ema_20'] # Percentage distance to trend
    
    # EMA 200 - Major Trend Filter (Tier 5)
    df['ema_200'] = df['close'].ewm(span=200, adjust=False).mean()
    
    # ATR (Average True Range) - Volatility
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    ranges = pd.concat([high_low, high_close, low_close], axis=1)
    true_range = np.max(ranges, axis=1)
    df['atr'] = true_range.rolling(14).mean()

    # Fill NaN from rolling calculations
    df.fillna(0, inplace=True) 
    return df

class AIEngine:
    def __init__(self):
        self.models_loaded = False
        print("Initializing AI Engine (Tier 5 - The Trend Surfer)...")
        logging.info("Initializing AI Engine (Tier 5 - The Trend Surfer)...")
        
        # New Feature Set: Close, RSI, EMA_Diff
        self.input_size = 3 
        self.seq_length = 60
        self.hidden_size = 128 
        self.num_layers = 3    
        
        self.scaler = StandardScaler()
        self.scaler_path = 'models/scaler_v2.pkl'
        self.model_path = 'models/predictx_v2.pth'
        
        # RL Agent (Tier 6)
        self.rl_agent = None
        self.rl_enabled = False
        
        # CNN Model (Tier 7)
        self.cnn_model = None
        self.cnn_enabled = False
        
        # Initialize LSTM Model
        try:
            self.lstm_model = LSTMModel(input_size=self.input_size, 
                                      hidden_size=self.hidden_size, 
                                      num_layers=self.num_layers)
            self.load_model()
            self.load_rl_agent()
            self.load_cnn_model()
                
        except Exception as e:
            print(f"Failed to initialize LSTM: {e}")
            self.models_loaded = False

    def load_model(self):
        if os.path.exists(self.model_path):
            try:
                self.lstm_model.load_state_dict(torch.load(self.model_path))
                self.lstm_model.eval()
                
                # Load Scaler if exists
                if os.path.exists(self.scaler_path):
                    self.scaler = joblib.load(self.scaler_path)
                    
                self.models_loaded = True
                print(f"LSTM Model (Tier 5) Loaded from {self.model_path}")
            except Exception as e:
                print(f"Error loading model: {e}")
        else:
            print("LSTM Model Initialized (Random Weights) - Waiting for training")

    def prepare_data(self, data, seq_length):
        xs, ys = [], []
        for i in range(len(data) - seq_length):
            x = data[i:(i + seq_length)]
            # Predict only Close price (index 0) for next step
            y = data[i + seq_length, 0] 
            xs.append(x)
            ys.append(y)
        return np.array(xs), np.array(ys)

    def train(self, symbol="BTC-USD", epochs=50, interval="1h"):
        start_time = time.time()
        print(f"Starting Tier 5 training for {symbol} ({interval})...")
        logging.info(f"Starting Tier 5 training session for {symbol} ({interval}) with {epochs} epochs")
        
        # 1. Fetch Data
        raw_data = get_historical_data(symbol, period="1y", interval=interval)
        if "error" in raw_data:
            return {"status": "error", "message": raw_data["error"]}
            
        df = pd.DataFrame(raw_data["data"])
        if df.empty:
             return {"status": "error", "message": "No data received"}

        # 2. Feature Engineering
        df = add_indicators(df)
        
        # Select features: Log Return, RSI, EMA_Diff
        # We replace 'close' with 'log_return' to prevent non-stationarity issues
        features = df[['log_return', 'rsi', 'ema_diff']].values
        
        # 3. Scaling (Fit only on training data)
        # For simplicity in this pipeline, we fit on the whole fetched dataset 
        # (assuming it's a batch update)
        scaled_data = self.scaler.fit_transform(features)
        
        # Save scaler for inference
        if not os.path.exists('models'):
            os.makedirs('models')
        joblib.dump(self.scaler, self.scaler_path)
        
        # 4. Prepare Sequences
        X, y = self.prepare_data(scaled_data, self.seq_length)
        
        # Split train/test
        train_size = int(len(X) * 0.8)
        X_train, X_test = X[:train_size], X[train_size:]
        y_train, y_test = y[:train_size], y[train_size:]
        
        train_dataset = TimeSeriesDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        
        # 5. Train
        self.lstm_model.train()
        history = train_model(self.lstm_model, train_loader, num_epochs=epochs)
        
        # 6. Save Model
        torch.save(self.lstm_model.state_dict(), self.model_path)
        self.models_loaded = True
        
        final_loss = history['loss'][-1] if history['loss'] else 0
        duration = time.time() - start_time
        
        print(f"Training complete. Final Loss: {final_loss:.6f}")
        
        # Log to DB
        db_service.log_training_session({
            "symbol": symbol,
            "epochs": epochs,
            "final_loss": float(final_loss),
            "status": "SUCCESS",
            "duration_seconds": round(duration, 2),
            "metadata": {"algorithm": "LSTM_Tier5_TrendSurfer"}
        })
        
        return {
            "status": "success", 
            "message": "Tier 5 Model trained successfully", 
            "final_loss": final_loss,
            "epochs": epochs
        }

    def predict_next_move(self, candles: list):
        \"\"\"
        Returns probability of Uptrend next candle (0.0 - 1.0)
        \"\"\"
        # Need enough data for lag features (EMA 200 needs 200 candles)
        if not self.models_loaded or len(candles) < 205:
            return 0.5
            
        try:
            df = pd.DataFrame(candles)
            df = add_indicators(df)

            # Take last seq_length features
            current_features = df[['log_return', 'rsi', 'ema_diff']].tail(self.seq_length).values
            
            # Use saved scaler
            scaled_input = self.scaler.transform(current_features)
            
            # Predict
            self.lstm_model.eval()
            with torch.no_grad():
                # [1, seq, feat]
                input_tensor = torch.FloatTensor(scaled_input).unsqueeze(0) 
                pred_scaled_return = self.lstm_model(input_tensor).item()
            
            # LOGIC: Return-based Probability
            # pred_scaled_return is the predicted Z-Score of the next log return
            # A positive value means price increase, negative means decrease
            
            # Normalize to 0-1 probability
            # Sigmoid(x) centers at 0. Positive x -> >0.5
            # We add a gain factor (e.g., 3) to make it more decisive
            prob = 1 / (1 + np.exp(-pred_scaled_return * 3)) 
            
            # --- TIER 5: TREND SURFER LOGIC ---
            current_close = df['close'].iloc[-1]
            ema_200 = df['ema_200'].iloc[-1]
            current_rsi = df['rsi'].iloc[-1]
            
            # 1. Trend Filter (EMA 200)
            # LONG ONLY if Price > EMA 200
            if current_close > ema_200:
                # Allow Bullish Prob. Suppress Bearish Prob.
                if prob < 0.5:
                    prob = 0.5 + (prob - 0.5) * 0.5 # Dampen bearish signal
                # Boost Bullish Confidence slightly if strong trend
                prob = min(0.99, prob * 1.05)
            
            # SHORT ONLY if Price < EMA 200
            elif current_close < ema_200:
                # Allow Bearish Prob. Suppress Bullish Prob.
                if prob > 0.5:
                    prob = 0.5 + (prob - 0.5) * 0.5 # Dampen bullish signal
                # Boost Bearish Confidence slightly
                prob = max(0.01, prob * 0.95)
            
            # 2. RSI Sanity Check (Relaxed)
            # Prevent longing the absolute top (>85) or shorting absolute bottom (<15)
            if current_rsi > 85: 
                prob = min(prob, 0.45) # Force Sell/Hold
            if current_rsi < 15: 
                prob = max(prob, 0.55) # Force Buy/Hold
            
            return float(prob)
            
        except Exception as e:
            print(f"Prediction error: {e}")
            return 0.5

    def load_rl_agent(self):
        \"\"\"
        Load trained PPO agent for Tier 6 hybrid decision-making
        \"\"\"
        if not RL_AVAILABLE:
            print("‚ö†Ô∏è RL agent not available (stable-baselines3 not installed)")
            return
            
        rl_model_path = "models/ppo_agent.zip"
        if os.path.exists(rl_model_path):
            try:
                self.rl_agent = PPO.load(rl_model_path)
                self.rl_enabled = True
                print(f"‚úÖ RL Agent (PPO) Loaded from {rl_model_path}")
            except Exception as e:
                print(f"‚ö†Ô∏è Failed to load RL agent: {e}")
        else:
            print(f"‚ö†Ô∏è RL model not found at {rl_model_path}. Run train_rl_agent.py first.")
    
    def get_rl_recommendation(self, state_vector):
        \"\"\"
        Get action recommendation from RL agent
        Returns: (action_name, leverage, confidence)
        \"\"\"
        if not self.rl_enabled or self.rl_agent is None:
            return None
            
        try:
            action, _states = self.rl_agent.predict(state_vector, deterministic=True)
            
            # Map action to trading decision
            action_map = {
                0: ("HOLD", 1, 50),
                1: ("BUY", 1, 70),
                2: ("BUY", 3, 85),
                3: ("BUY", 5, 95),
                4: ("SELL", 1, 80)
            }
            
            return action_map.get(action, ("HOLD", 1, 50))
        except Exception as e:
            print(f"RL prediction error: {e}")
            return None
    
    def load_cnn_model(self):
        \"\"\"
        Load trained CNN model for Tier 7 ensemble
        \"\"\"
        from services.cnn_service import CNNPatternModel
        
        cnn_model_path = "models/cnn_pattern_v1.pth"
        if os.path.exists(cnn_model_path):
            try:
                self.cnn_model = CNNPatternModel(sequence_length=20, input_features=4)
                self.cnn_model.load_state_dict(torch.load(cnn_model_path))
                self.cnn_model.eval()
                self.cnn_enabled = True
                print(f"‚úÖ CNN Pattern Model Loaded from {cnn_model_path}")
            except Exception as e:
                print(f"‚ö†Ô∏è Failed to load CNN model: {e}")
        else:
            print(f"‚ö†Ô∏è CNN model not found at {cnn_model_path}. Run train_cnn.py first.")
    
    def get_cnn_prediction(self, candles):
        \"\"\"
        Get pattern prediction from CNN model
        Returns: probability (0-1, where >0.5 = bullish pattern)
        \"\"\"
        if not self.cnn_enabled or self.cnn_model is None:
            return None
        
        try:
            from services.chart_generator import prepare_cnn_input
            
            # Prepare 20-candle window
            window = prepare_cnn_input(candles, window_size=20)
            if window is None:
                return None
            
            # Get CNN prediction
            from services.cnn_service import predict_pattern
            prob = predict_pattern(self.cnn_model, window)
            return prob
            
        except Exception as e:
            print(f"CNN prediction error: {e}")
            return None

    def decide_action(self, trend_prob: float, state_vector=None, candles=None):
        \"\"\"
        Tier 7 (Trinity): LSTM + CNN + RL Ensemble
        - LSTM: Trend Direction (Base)
        - CNN: Pattern Recognition (Confirmation)
        - RL: Strategic Decision (Entry/Exit + Leverage)
        \"\"\"
        # 1. Base Score (LSTM)
        ensemble_score = trend_prob
        
        # 2. Add CNN Influence (if available)
        cnn_prob = None
        if self.cnn_enabled and candles is not None:
            cnn_prob = self.get_cnn_prediction(candles)
            if cnn_prob is not None:
                # Weighted Fusion: LSTM 60%, CNN 40%
                ensemble_score = (trend_prob * 0.6) + (cnn_prob * 0.4)
        
        # 3. Get RL Recommendation (if available)
        rl_action = "HOLD"
        rl_leverage = 1
        rl_conf = 0
        
        if self.rl_enabled and state_vector is not None:
            rl_decision = self.get_rl_recommendation(state_vector)
            if rl_decision:
                rl_action, rl_leverage, rl_conf = rl_decision
        
        # --- TRINITY FUSION LOGIC ---
        
        confidence = abs(ensemble_score - 0.5) * 2 * 100
        
        # CASE A: BUY SIGNAL
        if ensemble_score > 0.55: # Bullish Trend
            # If RL agrees (BUY), confidence boosted
            if "BUY" in rl_action:
                final_leverage = rl_leverage
                return f"BUY_{final_leverage}x", min(99, confidence + 15)
            # If RL says HOLD, weak buy (1x)
            elif rl_action == "HOLD":
                return "BUY_1x", confidence
            # If RL says SELL, conflict -> HOLD
            elif rl_action == "SELL":
                return "HOLD", confidence
                
        # CASE B: SELL SIGNAL
        elif ensemble_score < 0.45: # Bearish Trend
            # If RL agrees (SELL), strong exit
            if rl_action == "SELL":
                return "SELL", min(99, confidence + 15)
            # If RL says HOLD/BUY, but Trend is Bearish -> Weak Exit
            else:
                return "SELL", confidence
                
        # CASE C: NEUTRAL CHART (Side-ways)
        else:
            # RL is the Tie-Breaker
            if "BUY" in rl_action and confidence > 40:
                return f"BUY_{rl_leverage}x", 50 # Speculative Buy
            elif rl_action == "SELL":
                return "SELL", 50 # Speculative Exit
                
        return "HOLD", round(confidence, 1)

ai_engine = AIEngine()
""")

üìÑ Written: backend/ai_engine.py


In [25]:
safe_write_file('backend/rl_trading_env.py', r"""
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from ai_engine import add_indicators

class TradingEnv(gym.Env):
    \"\"\"
    Custom Trading Environment for RL Agent (PPO)
    Optimized for Rp 250k capital with leverage support
    \"\"\"
    metadata = {'render_modes': ['human']}
    
    def __init__(self, df, initial_balance=250000, fee_rate=0.001):
        super(TradingEnv, self).__init__()
        
        self.df = df
        self.initial_balance = initial_balance
        self.fee_rate = fee_rate
        
        # State: [Close_norm, RSI, EMA_Diff, LSTM_Prob, Position, Balance_norm, Leverage]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, -1, 0, -1, 0, 0]),
            high=np.array([1, 100, 1, 1, 1, 10, 5]),
            dtype=np.float32
        )
        
        # Action: 0=Hold, 1=Buy_1x, 2=Buy_3x, 3=Buy_5x, 4=Sell
        self.action_space = spaces.Discrete(5)
        
        self.reset()
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        
        self.current_step = 60  # Start after enough data for indicators
        self.balance = self.initial_balance
        self.position = 0.0
        self.entry_price = 0.0
        self.leverage = 1
        self.total_profit = 0
        self.trades = []
        self.equity_curve = [self.initial_balance]
        
        return self._get_observation(), {}
    
    def _get_observation(self):
        \"\"\"
        Returns normalized state vector
        \"\"\"
        row = self.df.iloc[self.current_step]
        
        # Normalize close price (0-1 range based on recent window)
        recent_prices = self.df['close'].iloc[max(0, self.current_step-100):self.current_step+1]
        close_norm = (row['close'] - recent_prices.min()) / (recent_prices.max() - recent_prices.min() + 1e-8)
        
        # RSI (already 0-100)
        rsi = row['rsi']
        
        # EMA Diff (already percentage)
        ema_diff = row['ema_diff']
        
        # LSTM Probability (placeholder - will be filled by ai_engine)
        lstm_prob = 0.5  # Neutral default
        
        # Position (-1=short, 0=flat, 1=long)
        position_state = 1 if self.position > 0 else 0
        
        # Balance normalized (relative to initial)
        balance_norm = self.balance / self.initial_balance
        
        # Current leverage
        leverage_state = self.leverage
        
        return np.array([
            close_norm,
            rsi / 100.0,  # Normalize to 0-1
            ema_diff,
            lstm_prob,
            position_state,
            balance_norm,
            leverage_state / 5.0  # Normalize to 0-1
        ], dtype=np.float32)
    
    def step(self, action):
        \"\"\"
        Execute action and return (observation, reward, done, truncated, info)
        \"\"\"
        # Convert numpy array to int if needed
        if hasattr(action, 'item'):
            action = action.item()
        
        current_price = self.df.iloc[self.current_step]['close']
        done = False
        reward = 0
        
        # Action Mapping
        # 0 = Hold
        # 1 = Buy 1x
        # 2 = Buy 3x
        # 3 = Buy 5x
        # 4 = Sell
        
        # Execute Action
        if action in [1, 2, 3] and self.position == 0 and self.balance > 10000:  # Min Rp 10k per trade
            # BUY with leverage
            leverage_map = {1: 1, 2: 3, 3: 5}
            self.leverage = leverage_map[action]
            
            # Calculate position size (use 20% of balance for risk management)
            trade_amount = self.balance * 0.2 * self.leverage
            fee = trade_amount * self.fee_rate
            net_amount = trade_amount - fee
            
            self.position = net_amount / current_price
            self.entry_price = current_price
            self.balance -= (trade_amount / self.leverage)  # Deduct margin
            
            self.trades.append({
                'type': 'BUY',
                'price': current_price,
                'leverage': self.leverage,
                'step': self.current_step
            })
            
        elif action == 4 and self.position > 0:
            # SELL
            sell_value = self.position * current_price
            fee = sell_value * self.fee_rate
            net_value = sell_value - fee
            
            # Calculate P&L
            profit = net_value - (self.entry_price * self.position)
            self.total_profit += profit
            
            # Return margin + profit
            self.balance += (self.entry_price * self.position / self.leverage) + profit
            
            self.trades.append({
                'type': 'SELL',
                'price': current_price,
                'profit': profit,
                'step': self.current_step
            })
            
            # Calculate reward based on profit percentage
            profit_pct = profit / (self.entry_price * self.position)
            reward = profit_pct * 100  # Scale reward
            
            # Reset position
            self.position = 0
            self.entry_price = 0
            self.leverage = 1
        
        # Check Liquidation (if price moves against us by 1/leverage)
        if self.position > 0:
            liquidation_price = self.entry_price * (1 - 0.9 / self.leverage)
            if current_price <= liquidation_price:
                # LIQUIDATED
                self.balance = 0
                self.position = 0
                reward = -100  # Heavy penalty
                done = True
        
        # Update equity curve
        current_equity = self.balance + (self.position * current_price if self.position > 0 else 0)
        self.equity_curve.append(current_equity)
        
        # Move to next step
        self.current_step += 1
        
        # Episode end conditions
        if self.current_step >= len(self.df) - 1:
            done = True
            # Final reward based on total return
            final_return = (current_equity - self.initial_balance) / self.initial_balance
            reward += final_return * 100
        
        # Bankruptcy check
        if self.balance < 5000:  # Below minimum tradeable amount
            done = True
            reward = -50
        
        observation = self._get_observation()
        info = {
            'balance': self.balance,
            'position': self.position,
            'total_profit': self.total_profit,
            'trades': len(self.trades)
        }
        
        return observation, reward, done, False, info
    
    def render(self, mode='human'):
        current_equity = self.balance + (self.position * self.df.iloc[self.current_step]['close'] if self.position > 0 else 0)
        print(f"Step: {self.current_step} | Balance: Rp {self.balance:,.0f} | Equity: Rp {current_equity:,.0f} | Trades: {len(self.trades)}")
""")

üìÑ Written: backend/rl_trading_env.py


In [26]:
safe_write_file('backend/train_cnn.py', r"""
import sys
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from services.cnn_service import CNNPatternModel, train_cnn_model
from services.chart_generator import generate_chart_windows
from services.data_service import get_historical_data
from ai_engine import add_indicators

def train_cnn_pattern_model(epochs=40):
    \"\"\"
    Train CNN model for candlestick pattern recognition using multiple assets
    \"\"\"
    symbols = ["BTC-USD", "ETH-USD", "BNB-USD", "SOL-USD", "ADA-USD"]
    print(f"üß† Training CNN Pattern Model for {len(symbols)} symbols")
    print("=" * 60)
    
    all_windows = []
    all_labels = []

    for symbol in symbols:
        # 1. Fetch Historical Data
        print(f"\n[1/5] Fetching data for {symbol}...")
        raw_data = get_historical_data(symbol, period="2y", interval="1h", limit=1000)
        
        if "error" in raw_data:
            print(f"   ‚ùå Error: {raw_data['error']}")
            continue
        
        df = pd.DataFrame(raw_data["data"])
        df = add_indicators(df)
        print(f"   ‚úÖ Loaded {len(df)} candles")
        
        # 2. Generate Training Windows
        windows, labels = generate_chart_windows(df, window_size=20)
        print(f"   ‚úÖ Generated {len(windows)} training samples")
        
        all_windows.append(windows)
        all_labels.append(labels)

    if not all_windows:
        print("‚ùå No data loaded. Aborting.")
        return

    windows = np.concatenate(all_windows, axis=0)
    labels = np.concatenate(all_labels, axis=0)
    print(f"\n‚ú® TOTAL SAMPLES: {len(windows)}")
    
    # 3. Split Train/Test
    # Shuffle first to mix symbols
    indices = np.arange(len(windows))
    np.random.shuffle(indices)
    windows = windows[indices]
    labels = labels[indices]

    split_idx = int(len(windows) * 0.8)
    train_windows = windows[:split_idx]
    train_labels = labels[:split_idx]
    test_windows = windows[split_idx:]
    test_labels = labels[split_idx:]
    
    print(f"   Train: {len(train_windows)} | Test: {len(test_windows)}")
    
    # 4. Create DataLoaders
    print("\n[3/5] Preparing data loaders...")
    
    # Convert to tensors (batch, features, sequence)
    train_x = torch.FloatTensor(train_windows).permute(0, 2, 1)  # (N, 4, 20)
    train_y = torch.FloatTensor(train_labels)
    test_x = torch.FloatTensor(test_windows).permute(0, 2, 1)
    test_y = torch.FloatTensor(test_labels)
    
    train_dataset = TensorDataset(train_x, train_y)
    test_dataset = TensorDataset(test_x, test_y)
    
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    
    # 5. Initialize Model
    print("\n[4/5] Training CNN model...")
    model = CNNPatternModel(sequence_length=20, input_features=4)
    
    # Train
    history = train_cnn_model(model, train_loader, num_epochs=epochs, learning_rate=0.001)
    
    # 6. Evaluate on Test Set
    print("\n[5/5] Evaluating on test set...")
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            outputs = model(batch_x)
            predicted = (outputs > 0.5).float()
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
    
    test_accuracy = 100 * correct / total
    print(f"‚úÖ Test Accuracy: {test_accuracy:.2f}%")
    
    # 7. Save Model
    model_path = "models/cnn_pattern_v2.pth"
    # Also overwrite the active model path if it exists or use v1 as default
    torch.save(model.state_dict(), model_path)
    torch.save(model.state_dict(), "models/cnn_pattern_v1.pth") # Active model
    print(f"\n‚úÖ Model saved to {model_path} and models/cnn_pattern_v1.pth")
    
    # 8. Summary
    print("\n" + "=" * 60)
    print("üìä TRAINING SUMMARY")
    print("=" * 60)
    print(f"Total Symbols     : {len(symbols)}")
    print(f"Total Samples     : {len(windows)}")
    print(f"Final Train Acc   : {history['accuracy'][-1]:.2f}%")
    print(f"Test Accuracy     : {test_accuracy:.2f}%")
    print(f"Epochs            : {epochs}")
    
    if test_accuracy > 60:
        print("\n‚úÖ Status: READY FOR ENSEMBLE")
    elif test_accuracy > 52:
        print("\n‚ö†Ô∏è Status: ACCEPTABLE (Noisy data)")
    else:
        print("\n‚ùå Status: NEEDS IMPROVEMENT")
    
    return model, history

if __name__ == "__main__":
    train_cnn_pattern_model(epochs=40)
""")

üìÑ Written: backend/train_cnn.py


In [27]:
safe_write_file('backend/train_rl_agent.py', r"""
import sys
import os
import pandas as pd
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from rl_trading_env import TradingEnv
from services.data_service import get_historical_data
from ai_engine import add_indicators

class TensorboardCallback(BaseCallback):
    \"\"\"
    Custom callback for logging training metrics
    \"\"\"
    def __init__(self, verbose=0):
        super(TensorboardCallback, self).__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []
    
    def _on_step(self) -> bool:
        # Log episode rewards
        if len(self.locals.get('infos', [])) > 0:
            for info in self.locals['infos']:
                if 'episode' in info:
                    self.episode_rewards.append(info['episode']['r'])
                    self.episode_lengths.append(info['episode']['l'])
                    
                    if len(self.episode_rewards) % 10 == 0:
                        avg_reward = np.mean(self.episode_rewards[-10:])
                        print(f"Episode {len(self.episode_rewards)} | Avg Reward (last 10): {avg_reward:.2f}")
        
        return True

def train_rl_agent(symbol="BTC-USD", total_timesteps=100000):
    \"\"\"
    Train PPO agent on historical trading data
    \"\"\"
    print(f"ü§ñ Starting RL Agent Training for {symbol}")
    print(f"Total Timesteps: {total_timesteps:,}")
    
    # 1. Fetch Historical Data (2 years for better training)
    print("\n[1/4] Fetching historical data...")
    raw_data = get_historical_data(symbol, period="2y", interval="1h")
    
    if "error" in raw_data:
        print(f"‚ùå Error: {raw_data['error']}")
        return
    
    df = pd.DataFrame(raw_data["data"])
    if df.empty:
        print("‚ùå No data received")
        return
    
    # 2. Add Indicators
    print("[2/4] Adding technical indicators...")
    df = add_indicators(df)
    print(f"‚úÖ Prepared {len(df)} candles with indicators")
    
    # 3. Create Environment
    print("[3/4] Creating trading environment...")
    env = TradingEnv(df, initial_balance=250000)
    env = DummyVecEnv([lambda: env])  # Vectorize for SB3
    
    # 4. Initialize PPO Agent
    print("[4/4] Training PPO agent...")
    model = PPO(
        "MlpPolicy",
        env,
        verbose=1,
        learning_rate=0.0003,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01
    )
    
    # Train
    callback = TensorboardCallback()
    model.learn(total_timesteps=total_timesteps, callback=callback)
    
    # 5. Save Model
    model_path = "models/ppo_agent"
    model.save(model_path)
    print(f"\n‚úÖ Model saved to {model_path}.zip")
    
    # 6. Quick Evaluation
    print("\n--- Quick Evaluation ---")
    obs = env.reset()
    total_reward = 0
    done = False
    steps = 0
    
    while not done and steps < 1000:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, info = env.step(action)
        total_reward += reward[0]
        steps += 1
    
    print(f"Evaluation Steps: {steps}")
    print(f"Total Reward: {total_reward:.2f}")
    print(f"Final Balance: Rp {info[0]['balance']:,.0f}")
    
    return model

if __name__ == "__main__":
    # Train with 100k timesteps (adjust based on compute power)
    # For faster testing, use 10000. For production, use 500000+
    train_rl_agent(symbol="BTC-USD", total_timesteps=50000)
""")

üìÑ Written: backend/train_rl_agent.py


In [28]:
safe_write_file('backend/services/backtest_service.py', r"""
import pandas as pd
import numpy as np
import time
from services.data_service import get_historical_data

def calculate_max_drawdown(balances):
    \"\"\"
    Menghitung persentase penurunan terdalam dari titik puncak (Peak)
    \"\"\"
    if not balances:
        return 0
    
    # Konversi ke numpy array untuk perhitungan cepat
    equity_curve = np.array(balances)
    
    # Hitung running maximum (titik tertinggi sejauh ini)
    peak = np.maximum.accumulate(equity_curve)
    
    # Hitung drawdown (selisih dari peak)
    # Avoid division by zero
    drawdown = np.zeros_like(peak)
    mask = peak > 0
    drawdown[mask] = (equity_curve[mask] - peak[mask]) / peak[mask]
    
    # Ambil nilai minimum (penurunan paling dalam)
    max_drawdown = np.min(drawdown) if len(drawdown) > 0 else 0
    
    return max_drawdown * 100 # Dalam persen

def calculate_detailed_metrics(item_equity, trades, df, initial_balance):
    \"\"\"
    Menghitung metrik backtest komprehensif.
    \"\"\"
    if df.empty or not trades:
        return {}

    # Basic Data
    start_price = df.iloc[0]['close']
    end_price = df.iloc[-1]['close']
    start_time = pd.to_datetime(df.iloc[0]['time'])
    end_time = pd.to_datetime(df.iloc[-1]['time'])
    duration_days = (end_time - start_time).days
    duration_years = duration_days / 365.25
    duration_months = duration_days / 30.44

    final_equity = item_equity[-1] if item_equity else initial_balance
    total_return_abs = final_equity - initial_balance
    agent_return_pct = (total_return_abs / initial_balance) * 100
    
    # 1. Buy & Hold Return
    buy_hold_return = ((end_price - start_price) / start_price) * 100
    
    # 2. Sell & Hold Return (Inverse)
    sell_hold_return = ((start_price - end_price) / start_price) * 100
    
    # 3. Process Trades for Win/Loss/Duration
    wins = []
    losses = []
    durations = []
    
    # Reconstruct paired trades
    current_buy = None
    
    for t in trades:
        if 'BUY' in t['type']:
            current_buy = t
        elif 'SELL' in t['type'] and current_buy:
            pnl = t['balance'] - current_buy['balance']
            
            # Duration
            t_time = pd.to_datetime(t['time'])
            o_time = pd.to_datetime(current_buy['time'])
            duration_mins = (t_time - o_time).total_seconds() / 60
            durations.append(duration_mins)
            
            if pnl > 0:
                wins.append(pnl)
            else:
                losses.append(abs(pnl))
            
            current_buy = None

    total_trades = len(wins) + len(losses)
    win_rate = (len(wins) / total_trades * 100) if total_trades > 0 else 0
    
    avg_win = np.mean(wins) if wins else 0
    avg_loss = np.mean(losses) if losses else 0
    
    # Risk:Reward Ratio
    risk_reward = (avg_win / avg_loss) if avg_loss > 0 else 0
    
    # Profit Factor
    gross_profit = sum(wins)
    gross_loss = sum(losses)
    profit_factor = (gross_profit / gross_loss) if gross_loss > 0 else 0
    
    # Expectancy (R)
    expectancy = (len(wins)/total_trades * avg_win) - (len(losses)/total_trades * avg_loss) if total_trades > 0 else 0

    # Avg Profit / Month
    avg_profit_month = total_return_abs / duration_months if duration_months > 0 else 0

    # Time in Market
    total_time_in_market_min = sum(durations)
    total_period_min = duration_days * 24 * 60
    time_in_market_pct = (total_time_in_market_min / total_period_min * 100) if total_period_min > 0 else 0
    
    # 4. Returns & Ratios (Sharpe, Calmar)
    equity_series = pd.Series(item_equity)
    returns = equity_series.pct_change().dropna()
    
    # Sharpe Ratio (Annualized) - Assuming hourly data approx
    n_periods = 252 * 24 
    if returns.std() != 0:
        sharpe = (returns.mean() / returns.std()) * np.sqrt(n_periods)
    else:
        sharpe = 0
        
    # Max Drawdown
    mdd_pct = calculate_max_drawdown(item_equity)
    
    # Calmar Ratio
    annualized_return_pct = ((final_equity / initial_balance) ** (365/duration_days) - 1) * 100 if duration_days > 0 else 0
    calmar = abs(annualized_return_pct / mdd_pct) if mdd_pct != 0 else 0

    return {
        "agent_return": agent_return_pct,
        "avg_profit_month": avg_profit_month,
        "buy_hold_return": buy_hold_return,
        "sell_hold_return": sell_hold_return,
        "sharpe_ratio": sharpe,
        "calmar_ratio": calmar,
        "profit_factor": profit_factor,
        "expectancy": expectancy,
        "risk_reward": risk_reward,
        "max_drawdown": mdd_pct,
        "win_rate": win_rate,
        "avg_win": avg_win,
        "avg_loss": avg_loss,
        "time_in_market": time_in_market_pct,
        "total_trades": total_trades,
        "buy_count": len([t for t in trades if 'BUY' in t['type']]),
        "sell_count": len([t for t in trades if 'SELL' in t['type']])
    }

def print_metrics_report(metrics):
    print("\n" + "="*40)
    print("BACKTEST PERFORMANCE REPORT")
    print("="*40)
    print(f"{'Agent Return':<25}: {metrics.get('agent_return', 0):.2f}%")
    print(f"{'Avg Profit/Month':<25}: ${metrics.get('avg_profit_month', 0):.2f}")
    print(f"{'Buy & Hold Return':<25}: {metrics.get('buy_hold_return', 0):.2f}%")
    print(f"{'Sell & Hold Return':<25}: {metrics.get('sell_hold_return', 0):.2f}%")
    print("-" * 40)
    print(f"{'Sharpe Ratio':<25}: {metrics.get('sharpe_ratio', 0):.2f}")
    print(f"{'Calmar Ratio':<25}: {metrics.get('calmar_ratio', 0):.2f}")
    print(f"{'Profit Factor':<25}: {metrics.get('profit_factor', 0):.2f}")
    print(f"{'Expectancy (R)':<25}: ${metrics.get('expectancy', 0):.2f}")
    print(f"{'Risk:Reward Ratio':<25}: {metrics.get('risk_reward', 0):.2f}")
    print("-" * 40)
    print(f"{'Max Drawdown':<25}: {metrics.get('max_drawdown', 0):.2f}%")
    print(f"{'Win Rate':<25}: {metrics.get('win_rate', 0):.2f}%")
    print(f"{'Avg Win':<25}: ${metrics.get('avg_win', 0):.2f}")
    print(f"{'Avg Loss':<25}: ${metrics.get('avg_loss', 0):.2f}")
    print("-" * 40)
    print(f"{'Time in Market':<25}: {metrics.get('time_in_market', 0):.2f}%")
    print(f"{'Total Trades':<25}: {metrics.get('total_trades', 0)}")
    print(f"{'Orders (BUY:SELL)':<25}: {metrics.get('buy_count', 0)} : {metrics.get('sell_count', 0)}")
    print("="*40 + "\n")

def plot_backtest_results(df, history):
    try:
        import matplotlib.pyplot as plt
        balances = [t['balance'] for t in history if 'balance' in t]
        if not balances:
            print("No balance history to plot")
            return
        plt.figure(figsize=(12, 6))
        plt.plot(range(len(balances)), balances, label='Equity ($)', color='green')
        plt.title('Backtest Equity Curve')
        plt.xlabel('Trade #')
        plt.ylabel('Balance ($)')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.savefig('backtest_equity.png')
        print("üìà Equity curve saved to 'backtest_equity.png'")
    except ImportError:
        print("‚ö†Ô∏è Matplotlib not installed. Skipping Graph.")

def run_backtest_v2(engine, symbol="BTC-USD", period="3mo", interval="1h"):
    print(f"Strategy: The Trinity Hunter (Tier 7 - RL+CNN+LSTM)")
    
    # 1. Fetch Historical Data
    raw_data = get_historical_data(symbol, period=period, interval=interval)
    
    if "error" in raw_data:
        print(f"‚ùå Error fetching data: {raw_data['error']}")
        return 0, [], pd.DataFrame()

    df = pd.DataFrame(raw_data["data"])
    if df.empty:
        print("‚ùå No data received.")
        return 0, [], pd.DataFrame()

    # --- FEATURE ENGINEERING ---
    from ai_engine import add_indicators
    df = add_indicators(df)
    print(f"‚úÖ Loaded {len(df)} candles with Indicators.")

    initial_balance = 1000.0
    balance = initial_balance
    position = 0.0
    trades = []
    
    fee_rate = 0.001
    entry_price = 0.0
    
    # Buffer needed for indicators (200 for EMA200) + sequence length
    start_idx = max(engine.seq_length + 20, 205)
    
    equity_history = []
    
    # Pre-fill equity history for accurate metrics
    for _ in range(start_idx):
        equity_history.append(initial_balance)
    
    print("‚è≥ Running simulation (Trinity Ensemble)...")
    
    for i in range(start_idx, len(df)):
        current_window_df = df.iloc[:i]
        current_candles = current_window_df.to_dict('records')
        
        current_price = float(df.iloc[i]['close'])
        current_low = float(df.iloc[i]['low'])
        current_high = float(df.iloc[i]['high'])
        timestamp = df.iloc[i]['time']
        
        executed_trade = False
        
        # 1. Check Open Position (Exit Logic)
        if position > 0:
            pnl_pct = (current_price - entry_price) / entry_price
            
            # Dynamic TP/SL or Signal Exit?
            # Basic Safety TP/SL
            tp_hit = current_high >= entry_price * 1.06
            sl_hit = current_low <= entry_price * 0.95 # Looser SL for Swing
            
            # Ask AI for Exit Signal (Trinity)
            # We need to construct State Vector for RL
            prob = engine.predict_next_move(current_candles)
            
            # Construct State Vector [close_norm, rsi, ema, lstm_prob, pos, bal, lev]
            recent_prices = df['close'].iloc[max(0, i-100):i+1]
            if recent_prices.max() == recent_prices.min():
                 close_norm = 0.5
            else:
                 close_norm = (current_price - recent_prices.min()) / (recent_prices.max() - recent_prices.min())
            
            rsi_norm = df.iloc[i]['rsi'] / 100.0
            ema_diff = df.iloc[i]['ema_diff']
            pos_state = 1
            bal_norm = balance / initial_balance # Approximation
            
            # RL State
            state_vector = np.array([
               close_norm, rsi_norm, ema_diff, prob, pos_state, bal_norm, 0.2
            ], dtype=np.float32)
            
            # Fix State Vector shape issues if any
            if state_vector.shape != (7,):
                 state_vector = np.zeros(7, dtype=np.float32)

            action_code, confidence = engine.decide_action(prob, state_vector, current_candles)
            
            exit_reason = ""
            exit_price = current_price
            
            if tp_hit:
                exit_reason = "TP (+6%)"
                exit_price = entry_price * 1.06
            elif sl_hit:
                exit_reason = "SL (-5%)"
                exit_price = entry_price * 0.95
            elif "SELL" in action_code:
                # Only exit on Sell signal if confidence is decent
                if confidence > 45:
                     exit_reason = f"AI Sell ({confidence}%)"
                     exit_price = current_price
            
            if exit_reason:
                sell_val = position * exit_price
                fee = sell_val * fee_rate
                balance = sell_val - fee
                trades.append({
                    "time": timestamp, "type": "SELL", "price": exit_price,
                    "balance": balance, "reason": exit_reason
                })
                position = 0
                entry_price = 0
                executed_trade = True

        # 2. Check Entry
        if not executed_trade and position == 0:
            prob = engine.predict_next_move(current_candles)
            
            # Construct State Vector
            recent_prices = df['close'].iloc[max(0, i-100):i+1]
            if recent_prices.max() == recent_prices.min():
                 close_norm = 0.5
            else:
                 close_norm = (current_price - recent_prices.min()) / (recent_prices.max() - recent_prices.min())
            
            rsi_norm = df.iloc[i]['rsi'] / 100.0
            ema_diff = df.iloc[i]['ema_diff']
            pos_state = 0
            bal_norm = balance / initial_balance
            
            state_vector = np.array([
               close_norm, rsi_norm, ema_diff, prob, pos_state, bal_norm, 0
            ], dtype=np.float32)
            
            if state_vector.shape != (7,):
                 state_vector = np.zeros(7, dtype=np.float32)
            
            # Trinity Decision
            action_code, confidence = engine.decide_action(prob, state_vector, current_candles)
            
            if "BUY" in action_code and balance > 10:
                # Extract Leverage if present (e.g. BUY_3x)
                leverage = 1
                if "_" in action_code:
                     try:
                         leverage = int(action_code.split("_")[1].replace("x",""))
                     except:
                         leverage = 1
                
                # Filter weak confidence
                if confidence > 55:
                    buy_val = balance
                    fee = buy_val * fee_rate
                    net_buy = buy_val - fee
                    
                    position = net_buy / current_price
                    balance = 0
                    entry_price = current_price
                    
                    trades.append({
                        "time": timestamp, "type": f"BUY ({leverage}x Signal)",
                        "price": current_price, "confidence": confidence,
                        "balance": balance, "reason": f"Trinity {action_code} ({confidence}%)"
                    })

        # TRACK EQUITY
        current_val = balance + (position * current_price) if position > 0 else balance
        equity_history.append(current_val)

    # Final Value
    final_equity = balance + (position * df.iloc[-1]['close'])
    if equity_history:
        equity_history[-1] = final_equity
    
    # Calculate Detailed Metrics
    metrics = calculate_detailed_metrics(equity_history, trades, df, initial_balance)
    print_metrics_report(metrics)
    
    return final_equity, trades, df
""")

üìÑ Written: backend/services/backtest_service.py


## 3. Start Training
Train the models sequentially.


In [35]:
!python backend/train_cnn.py

UnboundLocalError: cannot access local variable 'child' where it is not associated with a value

In [36]:
!python backend/train_rl_agent.py

  File [35m"/Users/weka/Learning/predictx/backend/train_rl_agent.py"[0m, line [35m17[0m
    \[1;31m"[0m\"\"
     [1;31m^[0m
[1;35mSyntaxError[0m: [35munexpected character after line continuation character[0m


## 4. Download Trained Models
Zip and download the models to your local machine.


In [31]:
!zip -r trained_models.zip backend/models
from google.colab import files
files.download('trained_models.zip')

updating: backend/models/ (stored 0%)
updating: backend/models/predictx_v2.pth (deflated 7%)
updating: backend/models/ppo_agent.zip (stored 0%)
updating: backend/models/cnn_pattern_v2.pth (deflated 9%)
updating: backend/models/cnn_pattern_v1.pth (deflated 9%)
updating: backend/models/lstm_v1.pth (deflated 8%)
updating: backend/models/scaler_v2.pkl (deflated 24%)


ModuleNotFoundError: No module named 'google.colab'