In [102]:
import numpy as np
import pandas as pd
import yaml
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy

In [104]:
def load_config(config_path):
    with open(config_path, 'r') as file:
        config = yaml.safe_load(file)
    return config

In [106]:
def prepare_data_for_cnn(df, window_size):
    data = df.values
    num_samples = (len(data) - window_size) // window_size
    reshaped_data = []

    for i in range(num_samples):
        start = i * window_size
        end = start + window_size
        reshaped_data.append(data[start:end])

    reshaped_data = np.array(reshaped_data)
    reshaped_data = reshaped_data.reshape(
        (reshaped_data.shape[0], window_size, data.shape[1], 1))  # Add channel dimension

    return reshaped_data

In [108]:
def prepare_labels(df, window_size):
    # Shift 'Close' prices to create labels
    y_labels = df['Close'].shift(-1).values

    # Remove the last `window_size` labels which will be NaN after shifting
    y_labels = y_labels[:-window_size]

    # Remove the first `window_size` entries from 'Close' to align with labels
    close_prices = df['Close'].values[window_size:]

    # Create binary labels: 1 if the next price is higher, else 0
    y_labels = np.where(y_labels > close_prices, 1, 0).astype(np.float32)

    return y_labels

In [120]:
def build_functional_model(input_shape, num_filters, kernel_size, pool_size, dropout_rate, activation, output_activation, num_classes):
    input_layer = Input(shape=input_shape)

    # First Convolutional Block
    x = Conv2D(num_filters[0], kernel_size, activation=activation, padding='same')(input_layer)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=pool_size)(x)

    # Second Convolutional Block
    x = Conv2D(num_filters[1], kernel_size, activation=activation, padding='same')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=pool_size)(x)

    # Third Convolutional Block
    x = Conv2D(num_filters[2], kernel_size, activation=activation, padding='same')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=pool_size)(x)

    # Additional Convolutional Block
    x = Conv2D(256, kernel_size, activation=activation, padding='same')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=pool_size)(x)

    # Flatten and Dense layers
    x = Flatten()(x)
    x = Dense(512, activation=activation, kernel_regularizer='l2')(x)  # L2 regularization
    x = Dropout(dropout_rate)(x)
    x = Dense(128, activation=activation, kernel_regularizer='l2')(x)

    # Output layer
    output_layer = Dense(num_classes, activation=output_activation)(x)

    # Create the model
    model = Model(inputs=input_layer, outputs=output_layer)

    return model  # Ensure that the model is returned

In [124]:
def main():
    # Load configuration
    config = load_config(r"C:\Users\Arun2\Documents\Project\Trading Strat\Config\config.yaml")

    # Load preprocessed data
    df = pd.read_csv(config['data']['data_path'])

    # Prepare data for CNN
    window_size = config['data']['sequence_length']
    cnn_data = prepare_data_for_cnn(df, window_size)

    # Prepare labels
    y_labels = prepare_labels(df, window_size)

    # Adjust labels to match data length
    y_labels = y_labels[:cnn_data.shape[0]]

    # Split data into training and testing sets
    validation_split = config['data']['validation_split']
    X_train, X_test, y_train, y_test = train_test_split(cnn_data, y_labels, test_size=validation_split,
                                                        random_state=config['misc']['random_seed'])

    # Build the CNN model
    input_shape = (window_size, cnn_data.shape[2], 1)  # (time steps, features, channels)
    num_classes = 1  # Binary classification
    model = build_functional_model(
        input_shape=input_shape,
        num_filters=config['keras']['model']['cnn']['num_filters'],
        kernel_size=tuple(config['keras']['model']['cnn']['kernel_size']),
        pool_size=tuple(config['keras']['model']['cnn']['pool_size']),
        dropout_rate=config['keras']['model']['cnn']['dropout_rate'],
        activation=config['keras']['model']['cnn']['activation'],
        output_activation='sigmoid',  # Sigmoid for binary classification
        num_classes=num_classes
    )

    # Compile the model with the optimizer, loss function, and metrics from config
    optimizer = Adam(learning_rate=config['keras']['training']['learning_rate'])
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=[BinaryAccuracy()])

    # Train the model
    model.fit(X_train, y_train, epochs=30,
              batch_size=config['keras']['training']['batch_size'], validation_split=validation_split)
main()

Epoch 1/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m134s[0m 52ms/step - binary_accuracy: 0.5356 - loss: 1.6341 - val_binary_accuracy: 0.5435 - val_loss: 0.6905
Epoch 2/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m130s[0m 55ms/step - binary_accuracy: 0.5407 - loss: 0.6906 - val_binary_accuracy: 0.5435 - val_loss: 0.6897
Epoch 3/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m121s[0m 51ms/step - binary_accuracy: 0.5319 - loss: 0.6913 - val_binary_accuracy: 0.5435 - val_loss: 0.6895
Epoch 4/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m113s[0m 47ms/step - binary_accuracy: 0.5383 - loss: 0.6905 - val_binary_accuracy: 0.5435 - val_loss: 0.6896
Epoch 5/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m113s[0m 47ms/step - binary_accuracy: 0.5337 - loss: 0.6910 - val_binary_accuracy: 0.5435 - val_loss: 0.6898
Epoch 6/10
[1m2380/2380[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m111s[0m 46ms/step - bi

In [126]:
# Generate predictions
predictions = model.predict(X_test)

# Translate model outputs into signals
buy_threshold = 0.6   # Set threshold for a strong buy
sell_threshold = 0.4  # Set threshold for a strong sell

buy_signals = predictions > buy_threshold
sell_signals = predictions < sell_threshold


[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step


In [128]:
initial_balance = 10000  # Starting with $10,000
balance = initial_balance
positions = 0  # Tracks how many units are held
trade_log = []  # Log to store trade details

for i in range(len(X_test)):
    price = df['Close'].iloc[i + len(X_train)]  # Get the current close price

    # Buy signal
    if buy_signals[i]:
        units = balance // price  # Buy as many units as possible
        balance -= units * price
        positions += units
        trade_log.append(f"Buy at {price}, Units: {units}")

    # Sell signal
    elif sell_signals[i] and positions > 0:
        balance += positions * price  # Sell all units
        trade_log.append(f"Sell at {price}, Profit: {positions * price}")
        positions = 0

# Final balance and positions after backtest
if positions > 0:
    final_value = balance + positions * df['Close'].iloc[-1]
else:
    final_value = balance
profit = final_value - initial_balance

In [130]:
total_trades = len(trade_log)
successful_trades = sum([1 for trade in trade_log if "Profit" in trade])
accuracy = successful_trades / total_trades * 100

ZeroDivisionError: division by zero