## Runtime Configuration

In [1]:
# User-configurable parameters
symbols_to_predict = ['EURUSD=X', 'GBPUSD=X']
date_start = '2015-01-01'
date_end = '2024-01-01'
lookback_window = 20  # Number of days for the rolling input window


In [2]:
for symbol_to_predict in symbols_to_predict:
    print(f"\n🔁 Processing symbol: {symbol_to_predict}")



🔁 Processing symbol: EURUSD=X

🔁 Processing symbol: GBPUSD=X


# 📈 CNN + LSTM Forex Prediction using RCS + Technical Indicators
This notebook builds a CNN+LSTM hybrid model to predict EURUSD price direction using Relative Currency Strength and common technical indicators.

## Step 1: Install and Import Required Libraries

In [8]:
    !pip install yfinance ta scikit-learn pandas numpy matplotlib seaborn

    import yfinance as yf
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    import ta
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import train_test_split, TimeSeriesSplit
    from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
    import tensorflow as tf
    from tensorflow.keras.models import Model
    from tensorflow.keras.layers import Input, Conv1D, LSTM, Dense, Dropout, BatchNormalization, concatenate



ModuleNotFoundError: No module named 'tensorflow'

## Optional: Download Data from Polygon.io (if API key is available)

In [9]:
    import os
    import requests
    from datetime import datetime, timedelta

    POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")  # set in your environment

    def get_polygon_data(symbol, multiplier=1, timespan="day", from_date="2015-01-01", to_date="2024-01-01", limit=5000):
        url = f"https://api.polygon.io/v2/aggs/ticker/C:{symbol}/range/{multiplier}/{timespan}/{from_date}/{to_date}"
        params = {
            "adjusted": "true",
            "sort": "asc",
            "limit": limit,
            "apiKey": POLYGON_API_KEY
        }
        r = requests.get(url, params=params)
        if r.status_code != 200:
            print(f"Failed to download {symbol}: {r.status_code}")
            return None
        data = r.json().get("results", [])
        df = pd.DataFrame(data)
        df['t'] = pd.to_datetime(df['t'], unit='ms')
        df.set_index('t', inplace=True)
        df = df.rename(columns={'c': 'close'})
        return df[['close']]

    # Example: download EURUSD if API key is present
    if POLYGON_API_KEY:
        print("🔄 Using Polygon.io for data...")
        symbols = ['EURUSD', 'GBPUSD', 'USDJPY', 'AUDUSD', 'USDCAD', 'EURJPY', 'GBPJPY']
        data = {}
        for sym in symbols:
            df = get_polygon_data(sym, from_date="2015-01-01", to_date="2024-01-01")
            if df is not None:
                data[sym] = df.rename(columns={'close': sym})
        if len(data) >= 3:
            prices = pd.concat(data.values(), axis=1).dropna()
        else:
            print("⚠️ Not enough Polygon data, falling back to Yahoo Finance.")
    else:
        print("ℹ️ POLYGON_API_KEY not set, using Yahoo Finance.")

ℹ️ POLYGON_API_KEY not set, using Yahoo Finance.


## Step 2: Download OHLC Data from Yahoo Finance

In [10]:
    symbols = ['EURUSD=X', 'GBPUSD=X', 'USDJPY=X', 'AUDUSD=X', 'USDCAD=X', 'EURJPY=X', 'GBPJPY=X']
    data = {}
    for sym in symbols:
        df = yf.download(sym, start='2015-01-01', end='2024-01-01')
        df = df[['Close']].rename(columns={'Close': sym})
        data[sym] = df
    prices = pd.concat(data.values(), axis=1)
    prices.columns = symbols
    prices.dropna(inplace=True)
    prices.head()

Failed to get ticker 'EURUSD=X' reason: Expecting value: line 1 column 1 (char 0)
[*********************100%%**********************]  1 of 1 completed

1 Failed download:
['EURUSD=X']: YFTzMissingError('$%ticker%: possibly delisted; No timezone found')
Failed to get ticker 'GBPUSD=X' reason: Expecting value: line 1 column 1 (char 0)
[*********************100%%**********************]  1 of 1 completed

1 Failed download:
['GBPUSD=X']: YFTzMissingError('$%ticker%: possibly delisted; No timezone found')
Failed to get ticker 'USDJPY=X' reason: Expecting value: line 1 column 1 (char 0)
[*********************100%%**********************]  1 of 1 completed

1 Failed download:
['USDJPY=X']: YFTzMissingError('$%ticker%: possibly delisted; No timezone found')
Failed to get ticker 'AUDUSD=X' reason: Expecting value: line 1 column 1 (char 0)
[*********************100%%**********************]  1 of 1 completed

1 Failed download:
['AUDUSD=X']: YFTzMissingError('$%ticker%: possibly delisted; No timez

Unnamed: 0_level_0,EURUSD=X,GBPUSD=X,USDJPY=X,AUDUSD=X,USDCAD=X,EURJPY=X,GBPJPY=X
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1


## Step 3: Calculate Log Returns and Relative Currency Strength (RCS)

In [11]:
    def compute_rcs(logrets):
        currencies = list(set([s[:3] for s in logrets.columns] + [s[3:6] for s in logrets.columns]))
        rcs_data = {c: [] for c in currencies}
        for i in range(len(logrets)):
            row = logrets.iloc[i]
            daily_strength = {c: 0 for c in currencies}
            counts = {c: 0 for c in currencies}
            for pair, ret in row.items():
                base, quote = pair[:3], pair[3:]
                daily_strength[base] += ret
                daily_strength[quote] -= ret
                counts[base] += 1
                counts[quote] += 1
            for c in currencies:
                avg = daily_strength[c] / counts[c] if counts[c] else 0
                rcs_data[c].append(avg)
        return pd.DataFrame(rcs_data, index=logrets.index)

    log_returns = np.log(prices / prices.shift(1)).dropna()
    rcs = compute_rcs(log_returns)
    rcs.head()

Unnamed: 0_level_0,JPY,USD,AUD,GBP,EUR,CAD
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1


## Step 4: Calculate Technical Indicators

In [None]:
    tech_df = prices.copy()
    target = tech_df['EURUSD=X'].shift(-1) > tech_df['EURUSD=X']
    target = target.astype(int).iloc[20:-1]

    indicators = pd.DataFrame(index=tech_df.index)
    indicators['rsi'] = ta.momentum.RSIIndicator(close=tech_df['EURUSD=X']).rsi()
    indicators['macd'] = ta.trend.MACD(tech_df['EURUSD=X']).macd()
    indicators['momentum'] = ta.momentum.ROCIndicator(close=tech_df['EURUSD=X']).roc()
    indicators['cci'] = ta.trend.CCIIndicator(high=tech_df['EURUSD=X'], low=tech_df['EURUSD=X'], close=tech_df['EURUSD=X']).cci()
    indicators = indicators.join(rcs).dropna()

    features = indicators.iloc[20:-1]
    features = StandardScaler().fit_transform(features)

    X = np.array([features[i-20:i] for i in range(20, len(features))])
    y = target.values
    X.shape, y.shape

In [None]:
# Additional Feature Engineering
from ta.volatility import BollingerBands, AverageTrueRange
from ta.trend import ADXIndicator
from ta.momentum import StochasticOscillator, ROCIndicator
from ta.utils import dropna

# Ensure indicators have no NaNs
data = dropna(indicators)

# Expanded Technical Indicators
data['atr'] = AverageTrueRange(high=data['high'], low=data['low'], close=data['close']).average_true_range()
data['adx'] = ADXIndicator(high=data['high'], low=data['low'], close=data['close']).adx()
data['stoch_k'] = StochasticOscillator(high=data['high'], low=data['low'], close=data['close']).stoch()
data['stoch_d'] = StochasticOscillator(high=data['high'], low=data['low'], close=data['close']).stoch_signal()
data['roc'] = ROCIndicator(close=data['close']).roc()
data['bbw'] = BollingerBands(close=data['close']).bollinger_wband()

# Lagged/Engineered Features
data['return_1d'] = data['close'].pct_change(1)
data['return_3d'] = data['close'].pct_change(3)
data['rolling_mean_5'] = data['close'].rolling(window=5).mean()
data['rolling_std_5'] = data['close'].rolling(window=5).std()
data['momentum_slope'] = data['close'].diff(1)

# Placeholder Macro & Cross-Asset Features
data['dxy'] = prices.get('DXY', pd.Series(index=data.index)).ffill()
data['vix'] = prices.get('^VIX', pd.Series(index=data.index)).ffill()
data['spx'] = prices.get('^GSPC', pd.Series(index=data.index)).ffill()
data['gold_oil_ratio'] = prices.get('GC=F', pd.Series(index=data.index)) / prices.get('CL=F', pd.Series(index=data.index))

# Time Features
data['day_of_week'] = data.index.dayofweek
data['month'] = data.index.month

# Recombine indicators
indicators = data.dropna()


## Step 5: Train/Test Split

In [None]:
    split = int(len(X) * 0.8)
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

## Step 6: Build CNN + LSTM Model

In [None]:
    input_layer = Input(shape=(X.shape[1], X.shape[2]))
    x = Conv1D(filters=64, kernel_size=3, activation='relu')(input_layer)
    x = BatchNormalization()(x)
    x = Conv1D(filters=32, kernel_size=3, activation='relu')(x)
    x = LSTM(64, return_sequences=False)(x)
    x = Dropout(0.3)(x)
    output = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=input_layer, outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    model.summary()

## Step 7: Train the Model

In [None]:
    history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.1)

## Step 8: Evaluate the Model

In [None]:
    y_pred = (model.predict(X_test) > 0.5).astype(int)
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred))

## Step 9: Export Trained Model

In [None]:
    # Save model as .h5
    model.save("cnn_lstm_model.h5")

    # Export to ONNX
    import keras2onnx
    onnx_model = keras2onnx.convert_keras(model, model.name)
    import onnx
    onnx.save_model(onnx_model, "cnn_lstm_model.onnx")
    print("✅ Model exported to cnn_lstm_model.h5 and cnn_lstm_model.onnx")

In [None]:
    # Save model
    model.save(f"model_{symbol_to_predict}.h5")

    # Export to ONNX
    import keras2onnx
    onnx_model = keras2onnx.convert_keras(model, model.name)
    import onnx
    onnx.save_model(onnx_model, f"model_{symbol_to_predict}.onnx")
    print(f"✅ Exported: model_{symbol_to_predict}.h5 and .onnx")


In [None]:
    # Simple backtest
    probs = model.predict(X_test).flatten()
    threshold = 0.5
    signals = np.where(probs > threshold, 1, -1)
    ret = np.log(prices[symbol_to_predict].shift(-1) / prices[symbol_to_predict]).iloc[-len(signals):]
    strategy_returns = ret.values * signals

    cum_ret = np.cumsum(strategy_returns)
    cum_bh = np.cumsum(ret.values)

    plt.figure(figsize=(12,5))
    plt.plot(cum_ret, label=f'{symbol_to_predict} Strategy')
    plt.plot(cum_bh, label='Buy & Hold', linestyle='--')
    plt.title(f"Cumulative Returns: {symbol_to_predict}")
    plt.legend()
    plt.grid(True)
    plt.show()


In [None]:
print("✅ All models completed.")

## Step 11: Feature Importance via Permutation

In [None]:
import copy
from sklearn.metrics import accuracy_score

def compute_permutation_importance(model, X_val, y_val, feature_names):
    base_preds = (model.predict(X_val) > 0.5).astype(int)
    base_acc = accuracy_score(y_val, base_preds)
    importances = []

    for i in range(X_val.shape[2]):
        X_permuted = copy.deepcopy(X_val)
        np.random.shuffle(X_permuted[:, :, i])
        perm_preds = (model.predict(X_permuted) > 0.5).astype(int)
        perm_acc = accuracy_score(y_val, perm_preds)
        importance = base_acc - perm_acc
        importances.append((feature_names[i], importance))

    return sorted(importances, key=lambda x: x[1], reverse=True)

# Extract last 20 days of indicators used (same as model inputs)
feature_names = indicators.columns.tolist()

# Compute and display importances
importances = compute_permutation_importance(model, X_test, y_test, feature_names)
importance_df = pd.DataFrame(importances, columns=["Feature", "Importance"])

plt.figure(figsize=(10,5))
sns.barplot(data=importance_df, x="Importance", y="Feature")
plt.title(f"Feature Importance via Permutation: {symbol_to_predict}")
plt.grid(True)
plt.show()


## Step 12: Feature Set Comparison

In [None]:
# --- SWITCHABLE FEATURE SELECTION ENGINE ---

import shap
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance

# Select strategy
feature_selection_strategy = "manual"  # Options: "manual", "permutation", "shap"

# Define feature sets to test (used if manual)
manual_feature_sets = {
    "RCS only": rcs.columns.tolist(),
    "Indicators only": ['rsi', 'macd', 'momentum', 'cci'],
    "RCS + RSI + MACD": rcs.columns.tolist() + ['rsi', 'macd'],
    "All features": indicators.columns.tolist()
}

# Prepare unified data matrix
feature_matrix = indicators.copy()
target = (tech_df[symbol_to_predict].shift(-1) > tech_df[symbol_to_predict]).astype(int).iloc[20:-1]
feature_matrix = feature_matrix.iloc[20:-1]
scaled_features = StandardScaler().fit_transform(feature_matrix)
X = np.array([scaled_features[i-lookback_window:i] for i in range(lookback_window, len(scaled_features))])
y = target.values[lookback_window:]

selected_feature_sets = {}

if feature_selection_strategy == "manual":
    selected_feature_sets = manual_feature_sets

elif feature_selection_strategy == "permutation":
    print("🔁 Running permutation-based feature importance...")
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(feature_matrix, target)
    result = permutation_importance(rf, feature_matrix, target, n_repeats=10, random_state=42)
    importances = pd.Series(result.importances_mean, index=feature_matrix.columns).sort_values(ascending=False)
    top_feats = importances.head(10).index.tolist()
    selected_feature_sets = {"Top 10 Permutation": top_feats}

elif feature_selection_strategy == "shap":
    print("📊 Running SHAP-based feature importance...")
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(feature_matrix, target)
    explainer = shap.TreeExplainer(rf)
    shap_values = explainer.shap_values(feature_matrix)
    shap_sum = np.abs(shap_values[1]).mean(axis=0)
    shap_importance = pd.Series(shap_sum, index=feature_matrix.columns).sort_values(ascending=False)
    top_feats = shap_importance.head(10).index.tolist()
    selected_feature_sets = {"Top 10 SHAP": top_feats}

print("✅ Selected feature sets:")
print(selected_feature_sets)


In [None]:
# Identify best by accuracy
best_row = results_df.sort_values(by='Accuracy', ascending=False).iloc[0]
results_df['Selected Features'] = list(feature_sets.values())

# Save all results and best row to CSV
results_df.to_csv(f'feature_set_results_{symbol_to_predict}.csv', index=False)
best_row.to_frame().T.to_csv(f'best_feature_set_{symbol_to_predict}.csv', index=False)

print(f"📁 Saved results to feature_set_results_{symbol_to_predict}.csv")
print(f"🏆 Best set saved to best_feature_set_{symbol_to_predict}.csv")


In [None]:
import argparse

def cli_main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["train", "export"], default="train", help="Run training or export")
    parser.add_argument("--feature_strategy", choices=["manual", "permutation", "shap"], default="manual")
    parser.add_argument("--symbol", type=str, default="EURUSD")
    args = parser.parse_args()

    global feature_selection_strategy, symbol_to_predict
    feature_selection_strategy = args.feature_strategy
    symbol_to_predict = args.symbol

    if args.mode == "train":
        print(f"🔁 Running training for {symbol_to_predict} using '{feature_selection_strategy}' features...")
        # Main block is already under __name__ guard
        if __name__ == "__main__":
            pass  # triggers main loop
    elif args.mode == "export":
        print("🧠 Exporting model (not implemented here). Use inference.py.")

if __name__ == "__main__":
    cli_main()


In [None]:
from data_loader import load_or_fetch, load_twelve_data

# Example with Twelve Data (can be switched based on CLI/config)
symbol = config['symbol']
provider = config.get('provider', 'twelvedata')
api_key = config['api_keys'][provider]

# Use selected loader function
provider_loaders = {
    "twelvedata": load_twelve_data,
    "polygon": load_polygon_data,
    "alphavantage": load_alpha_vantage,
    "currencystack": load_currencystack,
    "tiingo": load_tiingo
}

df = load_or_fetch(
    symbol=symbol,
    provider=provider,
    loader_func=provider_loaders[provider],
    api_key=api_key,
    interval=config.get("interval", "1min"),
    outputsize=config.get("outputsize", 500),
)


In [None]:
import os
import tensorflow as tf

def export_models(model, model_name="best_model"):
    # Save Keras HDF5 model
    h5_path = f"{model_name}.h5"
    model.save(h5_path)
    print(f"✅ Saved Keras model to: {h5_path}")

    # Save ONNX model
    try:
        import tf2onnx
        import onnx

        spec = (tf.TensorSpec((None, *model.input.shape[1:]), tf.float32),)
        onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature=spec, opset=13)
        onnx_path = f"{model_name}.onnx"
        onnx.save(onnx_model, onnx_path)
        print(f"✅ Saved ONNX model to: {onnx_path}")
    except Exception as e:
        print("⚠️ ONNX export failed:", e)


In [None]:
if __name__ == "__main__":
    # --- RUN TRAINING AND EVALUATION ---
    results = []

    for test_name, selected_features in selected_feature_sets.items():
        print(f"\n🔎 Testing feature set: {test_name}")
        X_selected = indicators[selected_features].dropna()
        y_target = (tech_df[symbol_to_predict].shift(-1) > tech_df[symbol_to_predict]).astype(int)
        common_index = X_selected.index.intersection(y_target.index)
        X_selected = X_selected.loc[common_index]
        y_target = y_target.loc[common_index]

        # Scale features
        X_scaled = StandardScaler().fit_transform(X_selected)

        # Create sequences for LSTM
        X_seq = np.array([X_scaled[i-lookback_window:i] for i in range(lookback_window, len(X_scaled))])
        y_seq = y_target.values[lookback_window:]

        # Train/test split
        split_index = int(len(X_seq) * 0.8)
        X_train, X_test = X_seq[:split_index], X_seq[split_index:]
        y_train, y_test = y_seq[:split_index], y_seq[split_index:]

        # Reshape for CNN+LSTM
        X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], X_train.shape[2], 1))
        X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], X_test.shape[2], 1))

        # Model
        model = create_cnn_lstm_model(X_train.shape[1:])
        history = model.fit(X_train, y_train, epochs=10, batch_size=32, verbose=0, validation_split=0.1)
        accuracy = model.evaluate(X_test, y_test, verbose=0)[1]

        # Predict returns
        preds = model.predict(X_test).flatten()
        predicted_signal = (preds > 0.5).astype(int)
        actual_returns = tech_df[symbol_to_predict].pct_change().fillna(0).values[-len(predicted_signal):]
        strategy_return = (predicted_signal * actual_returns).sum()

        print(f"✅ Accuracy: {accuracy:.4f}, Return: {strategy_return:.4f}")
        results.append((test_name, accuracy, strategy_return, selected_features))

    # Save and export results
    results_df = pd.DataFrame(results, columns=["Feature Set", "Accuracy", "Return", "Features"])
    results_df.to_csv(f'feature_set_results_{symbol_to_predict}.csv', index=False)
    best_row = results_df.sort_values(by='Accuracy', ascending=False).iloc[0]
    best_row.to_frame().T.to_csv(f'best_feature_set_{symbol_to_predict}.csv', index=False)
    print("📁 Results saved.")
