# Import Packages

In [None]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
import GPUtil
import pandas as pd
import yfinance as y
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Add, Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.optimizers import SGD  
import matplotlib.pyplot as plt
from IPython.display import Image
from datetime import timedelta
from scipy.spatial.distance import euclidean
import warnings

# Set Seeds

In [None]:
# Set environment variables for reproducibility
os.environ['TF_DETERMINISTIC_OPS'] = '1'
os.environ['CUDA_VISIBLE_DEVICES'] = ''
os.environ['PYTHONHASHSEED'] = '0'
os.environ['TF_CUDNN_DETERMINISTIC'] = '1'

# Set seeds for reproducibility
random.seed(314)
np.random.seed(314)
tf.random.set_seed(1618)

# Set TensorFlow threading
tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)

# Establish Link to GPU

In [None]:
# List all available GPUs
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

# Check if GPUs are available
if gpus:
    for gpu in gpus:
        gpu_details = tf.config.experimental.get_device_details(gpu)
        
    # Use GPUtil to get detailed GPU information
    gpus_info = GPUtil.getGPUs()
    for gpu in gpus_info:
        print(f"GPU Model: {gpu.name}")
        print(f"VRAM Total: {gpu.memoryTotal} MB")
        print(f"VRAM Free: {gpu.memoryFree} MB")
        print(f"VRAM Used: {gpu.memoryUsed} MB")
else:
    print("No GPUs found.")

# Set Model Hyperparameters

In [None]:
H = 120 #Number of days for matching procedure
time_step = 20 
holding_period = 10
epochs = 64 

stock = 'AAPL'

# Euclidian Distance
$d(\vec{p},\vec{q}) = \sqrt{ \sum_{i=1}^{H} (q_i - p_i)^2 }$

# Euclidian Matching Algorithm on Price Action
This function will take as input an element of $p = [0,1]^{H}$ and return $(x,y)$, a historical window of H days (from date $x$ to date $y$) which matches the price action of the test point the most.

In essence, $$ r = \underset{i}{\arg \min} \quad d(p,Tr_{i})  $$ Where $Tr = \{Tr_1, \dots, Tr_N\}$ and each $Tr_1, \dots, Tr_N \in [0,1]^{H}$

In [None]:
def match(test_block, historical_blocks):

    # Unpack the test block data
    test_start_date, test_end_date, test_data_scaled = test_block

    # Initialize variables to keep track of the minimum distance and the best matching block
    min_distance = float('inf')
    best_match = None
    best_match_data = None

    # Iterate over all historical blocks
    for start_date, end_date, historical_data_scaled in historical_blocks:
        # Compute the Euclidean distance between the test block and the current historical block
        distance = euclidean(test_data_scaled.flatten(), historical_data_scaled.flatten())

        # Update the minimum distance and best match if the current distance is smaller
        if distance < min_distance:
            min_distance = distance
            best_match = (start_date, end_date)
            best_match_data = historical_data_scaled
    return best_match

# Retrieve Historical Data

In [None]:
train_block = yf.download(stock, start = '1990-01-01', end = '2014-12-31')[['Close']]
train_sub_blocks = []

for i in range(len(train_block) - H + 1):
    datum = train_block['Close'][i:i+H].values.reshape(-1, 1)
    scaler = MinMaxScaler()
    scaler.fit(datum)
    datum_scaled = scaler.transform(datum)

    train_sub_blocks.append((train_block.index[i], train_block.index[i + H -1 ], datum_scaled))

# Retrieve Test Data

In [None]:
test_block = yf.download(stock, start = '2016-01-01', end = '2024-02-01')[['Close']] #1 year gap between train and test
test_sub_blocks = []

for i in range(len(test_block) - H +1):
    datum = test_block['Close'][i:i+H].values.reshape(-1, 1)
    scaler = MinMaxScaler()
    scaler.fit(datum)
    datum_scaled = scaler.transform(datum)

    test_sub_blocks.append((test_block.index[i], test_block.index[i + H - 1], datum_scaled))

# ResNet Block for Skip Connections

In [None]:
def res_block(x, filters, kernel_size, strides=(1, 1), activation='relu', padding='same'):
    """
    A ResNet block with two convolutional layers and a skip connection.
    """
    # Main path
    y = Conv2D(filters, kernel_size, strides=strides, activation=activation, padding=padding)(x)
    y = Conv2D(filters, kernel_size, strides=strides, activation=None, padding=padding)(y)
    
    # Skip connection
    if x.shape[-1] != filters:
        x = Conv2D(filters, kernel_size=(1, 1), strides=strides, activation=None, padding=padding)(x)
    
    # Add skip connection
    out = Add()([x, y])
    out = tf.keras.layers.Activation(activation)(out)
    return out

# Build Structure of the CNN Model

In [None]:
def create_model():
    # Input layer
    input_layer = Input(shape=(time_step, 6, 1))
    
    # ResNet blocks
    x = res_block(input_layer, 64, (5, 6))
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = res_block(x, 128, (3, 3))
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = res_block(x, 256, (1, 1))
    x = MaxPooling2D(pool_size=(1, 1))(x)
    
    # Flatten the output of the pooling layer to feed into the dense layer
    x = Flatten()(x)
    
    # Dense layers
    x = Dense(128, activation='relu')(x)
    x = Dense(64, activation='relu')(x)
    x = Dense(32, activation='relu')(x)
    x = Dense(10, activation='relu')(x)
    
    # Output layer
    output_layer = Dense(1, activation='sigmoid')(x)  # For binary classification
    
    # Create model
    model = Model(inputs=input_layer, outputs=output_layer)
    
    # Compile the model
    model.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

model = create_model()

# Iteratively Train The Model
1. Take in an input testing block, $\vec{p} \in [0,1]^H$.
2. Find the historical dates $x,y$ corresponding to a signature vector $\vec{q}^* \in [0,1]^H$, where $\vec{q}^* = \underset{\vec{q} \in Tr}{\arg \min}  \quad d(\vec{p}, \vec{q})$.
3. Retrieve Open, High, Low, Close, Volume, 14d Momentum of the period $x-50$ to $y+50$, call this $M$.
4. Roll a window over $M$ to split it into many $20\times6$ matricies.
5. Add these many $20 \times 6$ matricies to our sample of training instances.
6. Feed this new data into the existing model for **more training**.
7. Form a $20 \times 6$ matrix over the period of the tail end of the $\vec{p}$, containing Open, High, Low, Close, Volume, 14d Momentum.
8. Predict the Label of this testing datum.

In [None]:
warnings.filterwarnings('ignore')

actuals = []
rounded_preds= []
returns = []
temp = []

In [None]:
for i in range(0, 50):
    print('Prediction: ', i+1)
    current_entry = test_sub_blocks[i]
    most_similar_block_dates = match(current_entry, train_sub_blocks)
    
    # Gather data for most similar historical period
    highest_similarity = yf.download(stock, 
                                     start = most_similar_block_dates[0].date() - timedelta(days = 50), 
                                     end = most_similar_block_dates[1].date() + timedelta(days = 50))[['Open', 'High', 'Low', 'Close', 'Volume']]
    highest_similarity['Momentum'] = highest_similarity['Close'].diff(14) #14d momentum

    # Find the labels for each day in the historical period
    labels = []
    buys = []
    sells = []
    for i in range(len(highest_similarity)-holding_period):
        buy_price = highest_similarity['Open'][i+1]
        sell_price = highest_similarity['Close'][i+holding_period]
        buys.append(buy_price)
        sells.append(sell_price)
        if sell_price >= buy_price:
            labels.append(1)
        elif sell_price < buy_price:
            labels.append(0)

    # Match shape of lists
    for i in range(len(highest_similarity) - len(labels)):
        labels.append(None)
        buys.append(None)
        sells.append(None)

    # Add lists to dataframe
    highest_similarity['Label'] = labels
    highest_similarity['Buy'] = buys
    highest_similarity['Sell'] = sells

    # Remove NAs and reset index
    highest_similarity = highest_similarity.dropna()
    highest_similarity.reset_index(drop=True, inplace=True)


    # Format training and testing split 
    X_train = []
    y_train = []
    for i in range(len(highest_similarity)-time_step):
        datum = np.array(highest_similarity[['Open', 'High', 'Low', 'Close', 'Volume', 'Momentum']][i:i+time_step])
        scaler = MinMaxScaler()
        scaler.fit(datum)
        datum_scaled = scaler.transform(datum)
        datum_scaled = datum_scaled.reshape(time_step, 6, 1)  # Reshape to match the input shape expected by the model
        X_train.append(datum_scaled)
        y_train.append(highest_similarity['Label'][i+time_step-1])

    
    X_train = np.array(X_train)
    y_train = np.array(y_train)

    # Fit the model (two choices)
    # model = create_model() #[you can choose to fit a new model each day]
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=64, verbose=0)

    # Get the financial data for the current period
    current = yf.download(stock, 
                          start =  current_entry[1].date() - timedelta(days=70),  
                          end =  current_entry[1].date() +timedelta(days =20) )[['Open', 'High', 'Low', 'Close', 'Volume']]
    current['Momentum'] = current['Close'].diff(14)

    # Find the labels for each day in the current period
    labels = []
    buys = []
    sells = []
    for i in range(len(current)-holding_period):
        buy_price = current['Open'][i+1]
        sell_price = current['Close'][i+holding_period]
        
        buys.append(buy_price)
        sells.append(sell_price)
        
        if sell_price >= buy_price:
            labels.append(1)
        elif sell_price < buy_price:
            labels.append(0)

    # Match shape of lists
    for i in range(len(current) - len(labels)):
        labels.append(None)
        buys.append(None)
        sells.append(None)

    # Add lists to dataframe
    current['Label'] = labels
    current['Buy'] = buys
    current['Sell'] = sells

    # Drop NA values
    current = current.dropna()

    # Ensure the index is a DatetimeIndex
    current.index = pd.to_datetime(current.index)
    
    # Obtain the target date from current_entry and convert to pd.Timestamp
    target_date = pd.Timestamp(current_entry[1].date())
    
    # Check if the date exists in the index to avoid KeyError
    target_index = current.index.get_loc(target_date)
    
    # Calculate the start index for slicing
    start_index = max(0, target_index - 20)
    
    # Slice the dataframe
    subset_df = current.iloc[start_index + 1:target_index + 1]

    # Get the matrix of the last 20 days from today
    X_test = []
    y_test = []
    datum = np.array(subset_df[['Open', 'High', 'Low', 'Close', 'Volume', 'Momentum']])
    scaler = MinMaxScaler()
    scaler.fit(datum)
    datum_scaled = scaler.transform(datum)
    datum_scaled = datum_scaled.reshape((time_step, 6, 1))  # Ensure correct reshaping
    X_test.append(datum_scaled)
    y_test.append(subset_df['Label'][-1])

    # Store the returns 
    returns.append( (subset_df['Sell'][-1] - subset_df['Buy'][-1])/subset_df['Buy'][-1] )

    # Make prediction
    pred = model.predict(X_test[-1].reshape(1, 20, 6, 1))
    print(pred)

    # Store data for backtesting
    temp.append( (current_entry[1].date(), pred, subset_df['Label'][-1]) )

    # Store actual label
    actuals.append(y_test[-1])

# Make the Rounded Predictions over a certain Threshold

In [None]:
threshold_buy = 0.9
stock_date_label = []
rounded_preds = []

for i in range (len(temp)):
    if temp[i][1][0][0] >= threshold_buy:
        rounded_pred = 1
    else:
        rounded_pred =0
    rounded_preds.append(rounded_pred)
    stock_date_label.append( (temp[i][0], rounded_pred, temp[i][-1]) )

# Backtest on Stocks

In [None]:
# Initial balance and tracking variables
initial_balance = 10000
strategy_balance = [initial_balance]
baseline_balance = [initial_balance]

# Track strategy earnings
bought = -10
for i in range(len(rounded_preds)):
    if rounded_preds[i] == 1 and i > bought + 9:
        bought = i
        strategy_balance.append(strategy_balance[-1] * (1 + returns[i]))
    else:
        strategy_balance.append(strategy_balance[-1])

# Obtain baseline close prices and calculate returns
stock_data = yf.download(stock, start=start_date, end=end_date)
stock_data['Returns'] = stock_data['Close'].pct_change().fillna(0)
baseline_balance = [initial_balance * (1 + stock_data['Returns'].cumsum().iloc[i]) for i in range(len(stock_data))]

# Plotting strategy vs. baseline
plt.figure(figsize=(14, 7))
plt.plot(strategy_balance, label='Strategy Earnings')
plt.plot(baseline_balance, label='Baseline Earnings', linestyle='--')
plt.title('Strategy vs. Baseline Earnings')
plt.xlabel('Days')
plt.ylabel('Balance')
plt.legend()
plt.show()

# Calculating maximum drawdown
def max_drawdown(equity_curve):
    """Calculate the maximum drawdown."""
    drawdowns = []
    peak = equity_curve[0]
    for value in equity_curve:
        if value > peak:
            peak = value
        drawdown = (peak - value) / peak
        drawdowns.append(drawdown)
    return max(drawdowns)

print('Maximum Drawdown: ', round(max_drawdown(strategy_balance) * 100, 2), '%')

# Precision of the strategy
tp = 0
fp = 0
tn = 0
fn = 0

for i in range(len(rounded_preds)):
    if rounded_preds[i] == 1 and actuals[i] == 1:
        tp += 1
    elif rounded_preds[i] == 1 and actuals[i] == 0:
        fp += 1
    elif rounded_preds[i] == 0 and actuals[i] == 0:
        tn += 1
    elif rounded_preds[i] == 0 and actuals[i] == 1:
        fn += 1

precision = tp / (tp + fp) if (tp + fp) != 0 else 0
print('Strategy Precision: ', round(precision * 100, 2), '%')

# Store baseline performance
baseline_returns = stock_data['Returns']
baseline_tp = actuals.count(1)
baseline_fp = len(actuals)-actuals.count(1)
baseline_precision = baseline_tp / (baseline_tp + baseline_fp) if (baseline_tp + baseline_fp) != 0 else 0

print('Baseline Precision: ', round(baseline_precision * 100, 2), '%')

In [None]:
# Counters for correct, incorrect and total trades
correct = 0
incorrect = 0
trades = 0

# Backtesting logic
bought = -10
initial = 10000
balance = initial
for i in range(len(rounded_preds)):
    if rounded_preds[i] ==1 and i> bought + 9:
        trades+= 1
        bought = i
        balance = balance + balance * returns[i]
        
        if returns[i]>0:
            correct += 1
            percents_win.append(returns[i])
        elif returns[i]<= 0:
            incorrect +=1
            percents_loss.append(returns[i])

print('Starting Balance: $', initial)
print('End Balance: $', round(balance,2))
print('ROI: ', round( ((balance-initial)/initial)*100,2),'%')
print('Total Trades: ', trades)
print('Resultant Precision: ', round( (correct/(correct+incorrect))*100,2), '%')
print('######################')
print('Overall Precision : ', round((tp/(tp+fp))*100,2), '%')
print('Overall Accuracy : ', round(( (tp +tn)/(tp + fp + tn + fn))*100,2), '%')
print('Average % Change When Win ', np.mean(percents_win))
print('Average % Change When Lose ', np.mean(percents_loss))
win_rate = tp/(tp+fp)
print('Average Expected Value: ', win_rate*np.mean(percents_win) + (1-win_rate)*np.mean(percents_loss))