In [None]:
%pip install numpy pandas tensorflow sklearn matplotlib math

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, GlobalAveragePooling1D, LayerNormalization, MultiHeadAttention
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import math

# Positional encoding function
def positional_encoding(seq_length, d_model):
    """Create the standard sinusoidal transformer positional encoding.

    Even feature indices receive ``sin(pos / 10000^(2i / d_model))`` and odd
    indices the matching ``cos`` term, where ``i = index // 2``.

    Args:
        seq_length: Number of positions (time steps) to encode.
        d_model: Width of the encoding (size of the last axis).

    Returns:
        A ``tf.float32`` tensor of shape ``(1, seq_length, d_model)``; the
        leading axis of size one lets it broadcast over a batch.
    """
    positions = np.arange(seq_length)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]

    # Per-dimension angular frequency. The numerator must be 1, not the
    # dimension index: scaling by the index zeroes column 0 at every position
    # and distorts the remaining frequencies.
    angle_rates = 1.0 / np.power(10000.0, 2 * (dims // 2) / d_model)
    angles = positions * angle_rates

    pos_encoding = np.zeros((seq_length, d_model))
    pos_encoding[:, 0::2] = np.sin(angles[:, 0::2])
    pos_encoding[:, 1::2] = np.cos(angles[:, 1::2])

    return tf.cast(pos_encoding[np.newaxis, ...], tf.float32)

# Create sequences for time series data
def create_sequences(X, y, seq_length):
    """Slice aligned sliding windows out of a feature series.

    Window ``k`` holds rows ``k .. k + seq_length - 1`` of ``X`` and is paired
    with ``y[k + seq_length]``, i.e. the target on the row right after the
    window.

    Args:
        X: Sliceable sequence of feature rows.
        y: Indexable sequence of targets, same length as ``X``.
        seq_length: Number of rows per window.

    Returns:
        ``(windows, targets)`` as NumPy arrays. Both are empty when ``X`` has
        no more than ``seq_length`` rows.
    """
    n_windows = len(X) - seq_length
    windows = [X[start:start + seq_length] for start in range(n_windows)]
    targets = [y[start + seq_length] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

# Data preparation function
def prepare_data(price_data, target_col='return_forward', seq_length=24, test_size=0.2):
    """
    Turn a feature/target frame into scaled, windowed train and test arrays.

    Rows containing NaN are dropped, every column except ``target_col`` is
    treated as a feature, windows are built with ``create_sequences``, and the
    windows are split in order (no shuffling) with the trailing ``test_size``
    fraction held out. The scaler is fitted on the training windows only and
    then applied to the test windows.

    Args:
        price_data: DataFrame holding the feature columns and ``target_col``.
        target_col: Name of the column to predict.
        seq_length: Rows per input window.
        test_size: Fraction of windows reserved for the test split.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler, feature_names)``.
    """
    def scale_windows(windows, transform):
        # Flatten (samples, steps) into rows so scaling is done per feature
        # column, then restore the original 3-D layout.
        n_samples, n_timesteps, n_features = windows.shape
        flat = windows.reshape(n_samples * n_timesteps, n_features)
        return transform(flat).reshape(n_samples, n_timesteps, n_features)

    clean = price_data.dropna()
    targets = clean[target_col].values
    features = clean.drop(columns=[target_col])
    feature_names = features.columns.tolist()

    X_seq, y_seq = create_sequences(features.values, targets, seq_length)

    # Chronological split: the first part trains, the tail is held out.
    split_at = int(len(X_seq) * (1 - test_size))
    X_train, y_train = X_seq[:split_at], y_seq[:split_at]
    X_test, y_test = X_seq[split_at:], y_seq[split_at:]

    scaler = StandardScaler()
    X_train = scale_windows(X_train, scaler.fit_transform)
    X_test = scale_windows(X_test, scaler.transform)

    return X_train, X_test, y_train, y_test, scaler, feature_names

# Build transformer model
def build_transformer_model(seq_length, n_features, n_heads=4, d_model=64, dff=128, n_layers=2, dropout_rate=0.1):
    """
    Assemble and compile a transformer-encoder regressor for windowed series.

    The encoder works directly at ``n_features`` width (there is no input
    projection); ``d_model`` is only used to derive the per-head ``key_dim``
    as ``d_model // n_heads``.

    Args:
        seq_length: Time steps per input window.
        n_features: Features per time step.
        n_heads: Number of attention heads.
        d_model: Divided by ``n_heads`` to obtain the attention ``key_dim``.
        dff: Hidden width of the feed-forward sub-layer.
        n_layers: Number of stacked encoder blocks.
        dropout_rate: Dropout applied after attention and feed-forward.

    Returns:
        A Keras ``Model`` with one output unit, compiled with the Adam
        optimizer, MSE loss and an MAE metric.
    """
    def encoder_block(tensor):
        # Self-attention sub-layer, then residual add and layer norm.
        attended = MultiHeadAttention(
            num_heads=n_heads, key_dim=d_model // n_heads
        )(tensor, tensor)
        attended = Dropout(dropout_rate)(attended)
        tensor = LayerNormalization(epsilon=1e-6)(tensor + attended)

        # Feed-forward sub-layer, projected back to n_features so the
        # residual addition lines up.
        hidden = Dense(dff, activation='relu')(tensor)
        hidden = Dense(n_features)(hidden)
        hidden = Dropout(dropout_rate)(hidden)
        return LayerNormalization(epsilon=1e-6)(tensor + hidden)

    inputs = Input(shape=(seq_length, n_features))

    # Inject order information before the first encoder block.
    encoded = inputs + positional_encoding(seq_length, n_features)
    for _ in range(n_layers):
        encoded = encoder_block(encoded)

    # Collapse the time axis, then regress through a small dense head.
    pooled = GlobalAveragePooling1D()(encoded)
    head = Dense(64, activation='relu')(pooled)
    head = Dropout(0.2)(head)
    outputs = Dense(1)(head)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])

    return model

# Train the model
def train_transformer_model(X_train, y_train, X_test, y_test, seq_length, n_features,
                           epochs=100, batch_size=32, patience=20):
    """
    Fit the transformer on the training windows and print fit metrics.

    Training uses early stopping on ``val_loss`` (restoring the best weights)
    and writes the best model to ``transformer_model_best.h5``.

    NOTE(review): the test arrays are also handed to ``fit`` as validation
    data, so early stopping and the checkpoint are selected on the same data
    the reported test metrics are computed on.

    Args:
        X_train, y_train: Training windows and targets.
        X_test, y_test: Held-out windows and targets.
        seq_length: Time steps per window, forwarded to the model builder.
        n_features: Features per time step, forwarded to the model builder.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size.
        patience: Epochs without ``val_loss`` improvement before stopping.

    Returns:
        ``(model, history, train_preds, test_preds)``.
    """
    model = build_transformer_model(seq_length, n_features)

    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=patience,
            restore_best_weights=True
        ),
        ModelCheckpoint(
            'transformer_model_best.h5',
            monitor='val_loss',
            save_best_only=True,
            verbose=1
        ),
    ]

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=1
    )

    train_preds = model.predict(X_train)
    test_preds = model.predict(X_test)

    # Table-driven report: every metric is evaluated for both splits first,
    # then printed metric by metric (train line before test line).
    splits = (('Train', y_train, train_preds), ('Test', y_test, test_preds))
    metrics = (
        ('RMSE', lambda actual, predicted: math.sqrt(mean_squared_error(actual, predicted))),
        ('MAE', mean_absolute_error),
        ('R²', r2_score),
    )
    report = [
        (f'{split_name} {metric_name}', metric_fn(actual, predicted))
        for metric_name, metric_fn in metrics
        for split_name, actual, predicted in splits
    ]
    for label, value in report:
        print(f'{label}: {value:.4f}')

    return model, history, train_preds, test_preds

# Plot results 
def plot_results(history, y_train, train_preds, y_test, test_preds):
    """Draw a 2x2 diagnostic figure, save it as a PNG, and display it.

    Panels: loss curves, training actual-vs-predicted, test
    actual-vs-predicted, and a test scatter plot with an identity line.
    The figure is written to ``transformer_model_results.png``.
    """
    def draw_series(ax, actual, predicted, title):
        # Shared layout for the two actual-vs-predicted line panels.
        ax.plot(actual, label='Actual Values', color='blue', alpha=0.6)
        ax.plot(predicted, label='Predicted Values', color='red', alpha=0.6)
        ax.set_title(title)
        ax.set_xlabel('Time Step')
        ax.set_ylabel('Value')
        ax.legend(loc='upper right')
        ax.grid(True)

    fig, axs = plt.subplots(2, 2, figsize=(18, 12))
    loss_ax, train_ax = axs[0, 0], axs[0, 1]
    test_ax, scatter_ax = axs[1, 0], axs[1, 1]

    # Panel 1: training and validation loss per epoch.
    loss_ax.plot(history.history['loss'], label='Training Loss')
    loss_ax.plot(history.history['val_loss'], label='Validation Loss')
    loss_ax.set_title('Model Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss (MSE)')
    loss_ax.legend(loc='upper right')
    loss_ax.grid(True)

    # Panels 2 and 3: predictions overlaid on the actual series.
    draw_series(train_ax, y_train, train_preds, 'Training Set: Actual vs Predicted')
    draw_series(test_ax, y_test, test_preds, 'Test Set: Actual vs Predicted')

    # Panel 4: test scatter; the dashed diagonal marks perfect predictions.
    lowest, highest = min(y_test), max(y_test)
    scatter_ax.scatter(y_test, test_preds, alpha=0.5)
    scatter_ax.plot([lowest, highest], [lowest, highest], 'k--', lw=2)
    scatter_ax.set_title('Test Set: Actual vs Predicted Scatter Plot')
    scatter_ax.set_xlabel('Actual Values')
    scatter_ax.set_ylabel('Predicted Values')
    scatter_ax.grid(True)

    plt.tight_layout()
    plt.savefig('transformer_model_results.png', dpi=300)
    plt.show()

# Generate trading signals from model predictions
def generate_signals(model, X_data, threshold=0.0):
    """
    Map model predictions to discrete trading signals.

     1 -> buy  (prediction > threshold)
    -1 -> sell (prediction < -threshold)
     0 -> hold (everything else, i.e. inside [-threshold, threshold])

    Args:
        model: Object exposing ``predict(X_data)`` that returns an array.
        X_data: Model input forwarded unchanged to ``predict``.
        threshold: Half-width of the hold band around zero.

    Returns:
        Flattened 1-D integer array of signals, one per prediction.
    """
    predictions = model.predict(X_data)
    # np.select takes the first matching condition, so "buy" wins over "sell"
    # if both comparisons happen to hold.
    signals = np.select(
        [predictions > threshold, predictions < -threshold],
        [1, -1],
        default=0,
    )
    return signals.flatten()

# Main execution function for transformer model
def run_transformer_prediction(price_data, target_col='return_forward', seq_length=24,
                              test_size=0.2, epochs=100, batch_size=32):
    """
    Run the whole pipeline: prepare data, train, plot, and derive signals.

    The scaler returned by ``prepare_data`` is not propagated to the caller.

    Args:
        price_data: DataFrame with feature columns and ``target_col``.
        target_col: Name of the column to predict.
        seq_length: Rows per input window.
        test_size: Fraction of windows held out for testing.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size.

    Returns:
        ``(model, history, train_preds, test_preds, test_signals)``.
    """
    prepared = prepare_data(price_data, target_col, seq_length, test_size)
    X_train, X_test, y_train, y_test, _scaler, feature_names = prepared

    n_features = len(feature_names)
    model, history, train_preds, test_preds = train_transformer_model(
        X_train, y_train, X_test, y_test, seq_length, n_features,
        epochs=epochs, batch_size=batch_size,
    )

    plot_results(history, y_train, train_preds, y_test, test_preds)

    # Signals on the held-out windows, intended for a downstream backtest.
    test_signals = generate_signals(model, X_test)

    return model, history, train_preds, test_preds, test_signals

# Example usage:
if __name__ == "__main__":
    # Load the price/indicator frame used as model input.
    price_indicator = pd.read_parquet("/workspaces/fyp/bitcoin_historical_price/btcusd_hourly_price_indicators.parquet")

    # Run transformer prediction
    model, history, train_preds, test_preds, test_signals = run_transformer_prediction(
        price_indicator,
        target_col='return_forward',
        seq_length=24,  # 24 hours of data
        test_size=0.2,
        epochs=100,
        batch_size=32
    )

    # Attach timestamps to the test signals for backtesting.
    # prepare_data() drops rows containing NaN before windowing, so the
    # signals line up with the tail of the *cleaned* index, not the raw one;
    # slicing the raw index would shift every signal by the number of dropped
    # trailing rows. Each signal is stamped with the row its target came from.
    clean_index = price_indicator.dropna().index
    # Explicit start offset instead of [-n:], which would select the whole
    # index when there are no test signals.
    test_indices = clean_index[len(clean_index) - len(test_signals):]
    backtest_df = pd.DataFrame({
        'signal': test_signals
    }, index=test_indices)

    # Run backtest (if the backtest function is available)
    # run_backtest(backtest_df, freq='h')
