# %% Notebook cell (In [None]): GP-trend + LSTM signal backtest with threshold grid search
import yfinance as yf
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout
from gpflow.kernels import Matern32
from gpflow.models import GPR
from gpflow import set_trainable
from sklearn.preprocessing import StandardScaler
import vectorbt as vbt

# Render floats with six fixed decimal places whenever pandas displays them.
pd.set_option('display.float_format', lambda x: '%.6f' % x)

def fetch_and_process_data(ticker, start_date="2024-01-01", end_date="2024-12-31", train_ratio=0.8):
    """Download daily prices for ``ticker`` and return standardised returns.

    The returns are standardised with a ``StandardScaler`` that is fitted on
    the training rows only, so test-period statistics never influence the
    scaling.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start date passed to ``yf.download``.
        end_date: End date passed to ``yf.download``.
        train_ratio: Fraction of the downloaded price rows used for training.

    Returns:
        A tuple ``(close, std_returns, split_index)``: the price series
        restricted to the dates that have a return, the standardised returns
        as a ``pd.Series`` on the same index, and the integer position that
        separates training rows from test rows.

    Raises:
        RuntimeError: If the download or any processing step fails. The
            original exception is attached as ``__cause__``.
    """
    try:
        data = yf.download(ticker, start=start_date, end=end_date)
        if data is None or data.empty:
            raise ValueError(f"No valid data found for {ticker}")

        # Prefer "Adj Close"; yfinance versions that adjust prices by default
        # only provide "Close", so fall back to it instead of failing.
        price_column = next((name for name in ("Adj Close", "Close") if name in data), None)
        if price_column is None:
            raise ValueError(f"No valid data found for {ticker}")

        close = data[price_column]
        # With (field, ticker) MultiIndex columns the selection above is a
        # one-column DataFrame; reduce it to a Series so the scalar
        # comparisons below are unambiguous.
        if isinstance(close, pd.DataFrame):
            close = close.iloc[:, 0]

        # Derived from the number of prices. The return series below is at
        # least one row shorter (the first return is undefined and dropped),
        # so the training share ends up marginally above ``train_ratio``.
        split_index = int(len(close) * train_ratio)

        mean_price = close.mean()
        price_scale = 1000 if mean_price < 0.1 else 1
        scaled_close = close * price_scale

        # Use percentage returns if price is high enough, else use log returns
        if mean_price > 0.1:
            returns = scaled_close.pct_change().dropna()
        else:
            returns = np.log(scaled_close / scaled_close.shift(1)).dropna()

        train_returns = returns.iloc[:split_index]
        if train_returns.empty:
            raise ValueError(f"Not enough data to build a training set for {ticker}")

        scaler = StandardScaler()
        scaler.fit(train_returns.values.reshape(-1, 1))

        # The transform is element-wise, so scaling the whole series at once
        # equals scaling the train and test parts separately and
        # concatenating them; it also tolerates an empty test split.
        std_returns = scaler.transform(returns.values.reshape(-1, 1)).flatten()

        return close[returns.index], pd.Series(std_returns, index=returns.index), split_index

    except Exception as e:
        # Chain the cause so the original traceback stays visible to callers.
        raise RuntimeError(f"Error processing {ticker}: {str(e)}") from e

def generate_signals(returns, split_index, epochs=50):
    """Turn standardised returns into position signals in ``[-1, 1]``.

    A Gaussian-process regression (Matern-3/2 kernel) of the returns on their
    integer time index is fitted on the training rows, and its posterior mean
    is used as a "trend" feature. An LSTM is then trained on the training
    rows with ``[return, trend]`` as input and the return as target, and is
    finally evaluated on every row.

    Args:
        returns: ``pd.Series`` of (standardised) returns.
        split_index: Number of leading rows used for training.
        epochs: Number of LSTM training epochs.

    Returns:
        1-D ``np.ndarray`` with one value per row of ``returns``, clipped to
        ``[-1, 1]``.
    """
    # Imported locally because the module only imports selected gpflow names.
    from gpflow.optimizers import Scipy

    y = returns.values.reshape(-1, 1)
    X = np.arange(len(returns), dtype=np.float64).reshape(-1, 1)
    X_train, y_train = X[:split_index], y[:split_index]

    gpr = GPR(data=(X_train, y_train), kernel=Matern32())
    # Keep the noise variance at its initial value; only the kernel
    # hyperparameters are fitted.
    set_trainable(gpr.likelihood.variance, False)
    # Actually fit the model. Without an optimisation step the predictions
    # come from the untouched default hyperparameters and freezing the
    # likelihood variance above has no effect.
    Scipy().minimize(gpr.training_loss, gpr.trainable_variables)

    # The posterior mean is evaluated point-wise, so a single call over all
    # rows matches separate train/test predictions.
    trend = gpr.predict_f(X)[0].numpy().flatten()

    # Combine original returns and trend as features
    features = np.hstack([y, trend.reshape(-1, 1)])
    # Keras LSTM layers expect (samples, timesteps, features); each row is a
    # sequence of length one.
    all_features = features.reshape((features.shape[0], 1, features.shape[1]))
    train_features = all_features[:split_index]
    # NOTE(review): the label equals the first input feature (the same-row
    # return), so the network can learn a near-identity mapping. Confirm
    # whether a next-period target was intended.
    train_labels = y_train.flatten()

    model = Sequential([
        Input(shape=(1, 2)),
        LSTM(64, return_sequences=True),
        Dropout(0.2),
        LSTM(32),
        Dense(1, activation="tanh")
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss="mse")
    model.fit(
        train_features,
        train_labels,
        epochs=epochs,
        batch_size=32,
        verbose=0
    )

    # Silence the progress bar so inference is as quiet as training.
    predicted_signals = model.predict(all_features, verbose=0).flatten()
    # Clip the output to be between -1 and 1
    return np.clip(predicted_signals, -1, 1)

def backtest_strategy(close, positions, entry_threshold=0.2, exit_threshold=-0.2):
    """Build a vectorbt portfolio from thresholded position signals.

    Args:
        close: Price data passed to ``vbt.Portfolio.from_signals`` as ``close``.
        positions: Signal values; compared against the thresholds and, in
            absolute value, passed as ``size``.
        entry_threshold: An entry is flagged where ``positions`` is strictly
            above this value.
        exit_threshold: An exit is flagged where ``positions`` is strictly
            below this value.

    Returns:
        The object returned by ``vbt.Portfolio.from_signals``.
    """
    # Boolean masks: strictly above the entry level / strictly below the exit level.
    go_long = positions > entry_threshold
    close_out = positions < exit_threshold
    order_size = np.abs(positions)

    return vbt.Portfolio.from_signals(
        close=close,
        entries=go_long,
        exits=close_out,
        size=order_size,
        freq="1D",
    )

def optimize_parameters(ticker="BTC-USD"):
    """Grid-search LSTM epochs and entry/exit thresholds for ``ticker``.

    Signals are generated once per epoch setting and then backtested for
    every entry/exit threshold pair. The configuration with the highest
    total return is reported and returned.

    Args:
        ticker: Symbol passed to ``fetch_and_process_data``.

    Returns:
        The best configuration as a dict (parameters plus ``'Total Return'``,
        ``'Sharpe Ratio'`` and ``'Max Drawdown'``), or ``None`` if no
        configuration had a total return greater than ``-inf`` (for example
        when every total return is NaN).
    """
    from itertools import product

    # Define candidate parameters for grid search
    epochs_list = [30, 50, 70]
    entry_thresholds = [0.15, 0.2, 0.25]
    exit_thresholds = [-0.15, -0.2, -0.25]

    best_config = None
    best_return = -np.inf
    results = []

    close, returns, split_index = fetch_and_process_data(ticker)

    for epochs in epochs_list:
        # Training is the expensive part, so signals are generated once per
        # epoch setting and reused for every threshold pair.
        positions = generate_signals(returns, split_index, epochs=epochs)
        for entry_thresh, exit_thresh in product(entry_thresholds, exit_thresholds):
            # Backtest with current parameters
            pf = backtest_strategy(close, positions,
                                   entry_threshold=entry_thresh,
                                   exit_threshold=exit_thresh)
            tot_ret = pf.total_return()
            config = {
                'epochs': epochs,
                'entry_threshold': entry_thresh,
                'exit_threshold': exit_thresh,
                'Total Return': tot_ret,
                'Sharpe Ratio': pf.sharpe_ratio(),
                'Max Drawdown': pf.max_drawdown()
            }
            results.append(config)

            if tot_ret > best_return:
                best_return = tot_ret
                best_config = config

    results_df = pd.DataFrame(results)
    print("Grid Search Results:")
    # ``to_markdown`` needs the optional ``tabulate`` package. Fall back to a
    # plain-text table so a missing display dependency cannot discard the
    # finished search.
    try:
        table = results_df.to_markdown(index=False)
    except ImportError:
        table = results_df.to_string(index=False)
    print(table)

    print("\nBest Configuration:")
    print(best_config)
    return best_config

def main():
    """Backtest BTC-USD with default settings, then run the parameter grid search.

    Any exception raised along the way is caught and printed instead of
    propagating.
    """
    symbol = "BTC-USD"
    print(f"\nBacktesting {symbol} with default parameters...")
    try:
        prices, std_returns, split_at = fetch_and_process_data(symbol)
        signal_values = generate_signals(std_returns, split_at, epochs=50)
        pf = backtest_strategy(prices, signal_values)

        print(f"Total Return: {pf.total_return():.2%}")
        print(f"Sharpe Ratio: {pf.sharpe_ratio():.2f}")
        print(f"Max Drawdown: {pf.max_drawdown():.2%}")

        print("\nStarting parameter optimization...")
        # The search prints its own report; the returned config is not used here.
        optimize_parameters(symbol)

    except Exception as exc:
        print(f"Error in backtesting {symbol}: {str(exc)}")


if __name__ == "__main__":
    main()


: 

: 

# %% Notebook cell (In [None]): multi-strategy comparison (Long Only, MACD, TSMOM, LSTM w/ CPD)
import yfinance as yf
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout
from gpflow.kernels import Matern32
from gpflow.models import GPR
from gpflow import set_trainable
from sklearn.preprocessing import StandardScaler
import vectorbt as vbt

# Render floats with six fixed decimal places whenever pandas displays them.
pd.set_option('display.float_format', lambda x: '%.6f' % x)

def fetch_and_process_data(ticker, start_date="2024-01-01", end_date="2024-12-31", train_ratio=0.8):
    """Download daily prices for ``ticker`` and return standardised returns.

    The returns are standardised with a ``StandardScaler`` that is fitted on
    the training rows only, so test-period statistics never influence the
    scaling.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start date passed to ``yf.download``.
        end_date: End date passed to ``yf.download``.
        train_ratio: Fraction of the downloaded price rows used for training.

    Returns:
        A tuple ``(close, std_returns, split_index)``: the price series
        restricted to the dates that have a return, the standardised returns
        as a ``pd.Series`` on the same index, and the integer position that
        separates training rows from test rows.

    Raises:
        RuntimeError: If the download or any processing step fails. The
            original exception is attached as ``__cause__``.
    """
    try:
        data = yf.download(ticker, start=start_date, end=end_date)
        if data is None or data.empty:
            raise ValueError(f"No valid data found for {ticker}")

        # Prefer "Adj Close"; yfinance versions that adjust prices by default
        # only provide "Close", so fall back to it instead of failing.
        price_column = next((name for name in ("Adj Close", "Close") if name in data), None)
        if price_column is None:
            raise ValueError(f"No valid data found for {ticker}")

        close = data[price_column]
        # With (field, ticker) MultiIndex columns the selection above is a
        # one-column DataFrame; reduce it to a Series so the scalar
        # comparisons below are unambiguous.
        if isinstance(close, pd.DataFrame):
            close = close.iloc[:, 0]

        # Derived from the number of prices. The return series below is at
        # least one row shorter (the first return is undefined and dropped),
        # so the training share ends up marginally above ``train_ratio``.
        split_index = int(len(close) * train_ratio)

        mean_price = close.mean()
        price_scale = 1000 if mean_price < 0.1 else 1
        scaled_close = close * price_scale

        # Use percentage returns if price is high enough, else use log returns
        if mean_price > 0.1:
            returns = scaled_close.pct_change().dropna()
        else:
            returns = np.log(scaled_close / scaled_close.shift(1)).dropna()

        train_returns = returns.iloc[:split_index]
        if train_returns.empty:
            raise ValueError(f"Not enough data to build a training set for {ticker}")

        scaler = StandardScaler()
        scaler.fit(train_returns.values.reshape(-1, 1))

        # The transform is element-wise, so scaling the whole series at once
        # equals scaling the train and test parts separately and
        # concatenating them; it also tolerates an empty test split.
        std_returns = scaler.transform(returns.values.reshape(-1, 1)).flatten()

        return close[returns.index], pd.Series(std_returns, index=returns.index), split_index

    except Exception as e:
        # Chain the cause so the original traceback stays visible to callers.
        raise RuntimeError(f"Error processing {ticker}: {str(e)}") from e

def backtest_strategy(close, positions, entry_threshold=0.2, exit_threshold=-0.2):
    """Run a signal-based vectorbt backtest over ``close``.

    Entries are flagged where ``positions`` is strictly greater than
    ``entry_threshold`` and exits where it is strictly less than
    ``exit_threshold``. The absolute signal value is passed as ``size``.

    Returns:
        The object returned by ``vbt.Portfolio.from_signals``.
    """
    signal_kwargs = {
        "close": close,
        "entries": positions > entry_threshold,
        "exits": positions < exit_threshold,
        "size": np.abs(positions),
        "freq": "1D",
    }
    return vbt.Portfolio.from_signals(**signal_kwargs)

# Strategy signal generators

def generate_signals_long_only(close, **kwargs):
    """Return 1.0 where the price rose versus the previous row, else 0.0.

    The first row (no previous price) and rows whose difference is NaN yield
    0.0. Extra keyword arguments are accepted and ignored.

    Returns:
        ``np.ndarray`` of dtype ``float64`` with one entry per row of ``close``.
    """
    # A NaN difference compares as False, which is the same outcome as
    # treating the missing difference as zero.
    price_rose = close.diff() > 0
    return np.asarray(price_rose, dtype=np.float64)

def generate_signals_macd(close, **kwargs):
    """Return ``tanh`` of the MACD line (12-span EMA minus 26-span EMA).

    Both EMAs use ``adjust=False``. ``tanh`` squashes the result into
    ``(-1, 1)``; it is applied to the raw MACD in price units, so large
    price levels push the output towards -1 or +1. Extra keyword arguments
    are accepted and ignored.

    Returns:
        Same pandas type and index as ``close``.
    """
    fast_ema, slow_ema = (
        close.ewm(span=span, adjust=False).mean() for span in (12, 26)
    )
    return np.tanh(fast_ema - slow_ema)

def generate_signals_tsmom(returns, w=0, **kwargs):
    """Return ``tanh(w * returns)`` as a NumPy array.

    ``tanh`` bounds the signal to ``(-1, 1)``. With the default ``w=0`` every
    signal is zero. Extra keyword arguments are accepted and ignored.
    """
    weighted_returns = returns.values * w
    return np.tanh(weighted_returns)

def generate_signals_lstm_cpd(returns, split_index, lbw=21, epochs=50, **kwargs):
    """Generate LSTM signals from returns plus a rolling-mean feature.

    The network input per row is ``[return, rolling mean of returns over
    ``lbw`` rows]``. The model is trained on the first ``split_index`` rows
    with the return as target and then evaluated on every row.

    NOTE(review): despite the name, no change-point detection is computed
    here; the lookback window only drives the rolling mean. Confirm whether
    a CPD feature was meant to be added.

    Args:
        returns: ``pd.Series`` of (standardised) returns.
        split_index: Number of leading rows used for training.
        lbw: Lookback window of the rolling mean (``min_periods=1``).
        epochs: Number of LSTM training epochs.
        **kwargs: Accepted and ignored.

    Returns:
        1-D ``np.ndarray`` with one value per row of ``returns``, clipped to
        ``[-1, 1]``.
    """
    ma_feature = returns.rolling(window=lbw, min_periods=1).mean().values
    features = np.column_stack((returns.values, ma_feature))

    # Keras LSTM layers expect (samples, timesteps, features); each row is a
    # sequence of length one.
    all_features = features.reshape((features.shape[0], 1, features.shape[1]))
    train_features = all_features[:split_index]
    # NOTE(review): the label equals the first input feature (the same-row
    # return). Confirm whether a next-period target was intended.
    train_labels = returns.values[:split_index]

    model = Sequential([
        Input(shape=(1, 2)),
        LSTM(64, return_sequences=True),
        Dropout(0.2),
        LSTM(32),
        Dense(1, activation="tanh")
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss="mse")
    model.fit(train_features, train_labels, epochs=epochs, batch_size=32, verbose=0)

    # Silence the progress bar so inference is as quiet as training.
    predicted_signals = model.predict(all_features, verbose=0).flatten()
    return np.clip(predicted_signals, -1, 1)

from itertools import product

def optimize_all_strategies(ticker="BTC-USD"):
    """Grid-search every strategy for ``ticker`` and report the best settings.

    For each strategy, signals are generated once per parameter set and then
    backtested for every entry/exit threshold pair. The configuration with
    the highest total return is kept per strategy.

    Args:
        ticker: Symbol passed to ``fetch_and_process_data``.

    Returns:
        A list with one entry per strategy, in definition order: the best
        configuration dict, or ``None`` if no configuration of that strategy
        had a total return greater than ``-inf`` (for example when every
        total return is NaN).
    """
    close, returns, split_index = fetch_and_process_data(ticker)

    # Each strategy declares the positional inputs its generator needs, so
    # the loop below does not have to dispatch on strategy names.
    strategies = {
        "Long Only": {
            "generator": generate_signals_long_only,
            "inputs": (close,),
            "params_grid": [{}]
        },
        "MACD": {
            "generator": generate_signals_macd,
            "inputs": (close,),
            "params_grid": [{}]
        },
        "TSMOM": {
            "generator": generate_signals_tsmom,
            "inputs": (returns,),
            "params_grid": [{'w': w} for w in [0, 0.5, 1]]
        },
        "LSTM w/ CPD": {
            "generator": generate_signals_lstm_cpd,
            "inputs": (returns, split_index),
            "params_grid": [{'lbw': lbw, 'epochs': 50} for lbw in [10, 21, 63, 126, 252]]
        }
    }

    # Grid for backtest thresholds
    entry_thresholds = [0.15, 0.2, 0.25]
    exit_thresholds = [-0.15, -0.2, -0.25]

    best_results = []

    for strat_name, strat_info in strategies.items():
        best_config = None
        best_return = -np.inf
        results = []
        print(f"\nOptimizing strategy: {strat_name}")

        for params in strat_info["params_grid"]:
            # Signals depend only on the strategy parameters, so they are
            # generated once and reused for every threshold pair.
            signals = strat_info["generator"](*strat_info["inputs"], **params)

            for et, xt in product(entry_thresholds, exit_thresholds):
                pf = backtest_strategy(close, signals, entry_threshold=et, exit_threshold=xt)
                tot_ret = pf.total_return()
                config = {
                    'Strategy': strat_name,
                    **params,
                    'entry_threshold': et,
                    'exit_threshold': xt,
                    'Total Return': tot_ret,
                    'Sharpe Ratio': pf.sharpe_ratio(),
                    'Max Drawdown': pf.max_drawdown()
                }
                results.append(config)
                if tot_ret > best_return:
                    best_return = tot_ret
                    best_config = config

        results_df = pd.DataFrame(results)
        print(f"\n{strat_name} Grid Search Results:")
        # ``to_markdown`` needs the optional ``tabulate`` package. Fall back
        # to a plain-text table so a missing display dependency cannot
        # abort the search.
        try:
            table = results_df.to_markdown(index=False)
        except ImportError:
            table = results_df.to_string(index=False)
        print(table)
        print(f"\nBest {strat_name} Configuration:")
        print(best_config)
        best_results.append(best_config)

    # Strategies without a usable result contribute ``None``; leave them out
    # of the ranking instead of failing on the key lookup.
    ranked = [cfg for cfg in best_results if cfg is not None]
    if ranked:
        # ``max`` returns the first maximal element, matching a stable
        # descending sort followed by taking element zero.
        best_overall = max(ranked, key=lambda cfg: cfg['Total Return'])
        print("\nBest Overall Strategy:", best_overall['Strategy'])
        print(best_overall)
    return best_results

def main():
    """Run the multi-strategy optimization for BTC-USD.

    Any exception raised by the optimization is caught and printed instead
    of propagating.
    """
    symbol = "BTC-USD"
    banner = f"\nBacktesting {symbol} with multi-strategy optimization..."
    print(banner)
    try:
        optimize_all_strategies(symbol)
    except Exception as exc:
        print(f"Error in optimization: {str(exc)}")


if __name__ == "__main__":
    main()


: 