### Libraries

In [1]:
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt

from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dropout, Dense, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from itertools import product

### Data loading and preparation

In [2]:
def get_tickers(path = r'Data\Real\Financial\microprocessor_stocks_2023-25.csv'):
    """
    Return the distinct ticker symbols listed in a CSV file.

    :param path: location of a CSV file that has a 'ticker' column. The default
        is a raw string so its backslashes are literal characters instead of
        invalid escape sequences (which newer Python versions warn about);
        the resulting path value is unchanged.
    :return: numpy array with the unique values of the 'ticker' column, in
        order of first appearance.
    """
    listing = pd.read_csv(path)
    return listing['ticker'].unique()

def load_ticker(ticker,
                include_NLP = False
                ):
    """
    Load one ticker's parsed CSV and build hourly features plus the target.

    Steps: parse the 'Unnamed: 0' column as the datetime index, map the textual
    sentiment to {-1, 0, 1}, resample close/score/sentiment to hourly means,
    set the target to the next hourly bar's close, and add two time-based
    rolling means of the close (5h 'FMA', 10h 'SMA').

    Rows with any missing value are dropped before the feature columns are
    selected, so hours without a score/sentiment are removed even when
    include_NLP is False (both variants therefore cover the same rows).

    :param ticker: ticker symbol; used for the file name and for the
        '<ticker>_score' / '<ticker>_sentiment' column names.
    :param include_NLP: when True the score and sentiment columns are returned
        alongside FMA, SMA and close.
    :return: (X, y) where X is the feature DataFrame and y is a one-column
        DataFrame named 'target'; both share exactly the same index.
    """
    score_col = f'{ticker}_score'
    sentiment_col = f'{ticker}_sentiment'

    # read data and set date index
    # (raw f-string: the backslashes are literal, not escape sequences)
    X = pd.read_csv(rf'Data\ParsedDataForModel\{ticker}.csv')
    X['date'] = pd.to_datetime(X['Unnamed: 0'])
    X.index = X['date']

    # map the sentiment
    sentiment_mapping = {'Positive': 1, 'Neutral': 0, 'Negative': -1}
    X[sentiment_col] = X[sentiment_col].map(sentiment_mapping)

    # resample hourly
    X = X[['close', score_col, sentiment_col]].resample('1h').mean()

    # target variable: close of the following hourly bar
    X.loc[:, 'target'] = X['close'].shift(-1)
    X = X.dropna()

    # additional variables
    X.loc[:, 'FMA'] = X['close'].rolling(window = '5h').mean()
    X.loc[:, 'SMA'] = X['close'].rolling(window = '10h').mean()
    X = X.dropna()

    # Take the target only after the last dropna so X and y can never get
    # out of step, whatever rows the feature engineering removes.
    y = X[['target']].copy()

    feature_cols = ['FMA', 'SMA', 'close']
    if include_NLP:
        feature_cols = feature_cols + [score_col, sentiment_col]

    return X[feature_cols], y

### Model - standard

In [3]:
def _build_and_train_standard_model(
    X_train, y_train,
    X_val,   y_val,
    dropout_rate=0.2,
    learning_rate=0.001,
    epochs=10,
    batch_size=32,
    verbose=0
):
    """
    Builds and trains an LSTM model.
    Returns the trained model and the RMSE on the validation set.

    The network is LSTM(128) -> Dropout -> LSTM(64) -> Dropout -> LSTM(32)
    -> Dense(1), compiled with Adam and mean squared error.

    :param X_train: 3D array (samples, timesteps, features) used for fitting.
    :param y_train: training targets.
    :param X_val: 3D validation inputs with the same trailing dimensions.
    :param y_val: validation targets.
    :param dropout_rate: rate used by both Dropout layers.
    :param learning_rate: Adam learning rate.
    :param epochs: number of training epochs.
    :param batch_size: mini-batch size passed to fit.
    :param verbose: Keras verbosity, applied to both fit and predict.
    :return: (model, rmse_val)
    """
    # 1) Build the model
    model = Sequential()
    model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))  # explicit input layer
    model.add(LSTM(128, return_sequences = True))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(64, return_sequences = True))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(32))
    model.add(Dense(1))  # single numeric output

    # 2) Compile
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(loss='mean_squared_error', optimizer=optimizer)

    # 3) Fit
    model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        verbose=verbose
    )

    # 4) Compute validation RMSE (honour `verbose` so verbose=0 is really silent)
    y_val_pred = model.predict(X_val, verbose=verbose)
    rmse_val = np.sqrt(mean_squared_error(y_val, y_val_pred))

    return model, rmse_val


# Public name kept for backward compatibility. A later cell of this notebook
# defines another `build_and_train_model` with a different signature and return
# value, which replaces this alias once that cell runs; gridsearch_lstm therefore
# calls the private name above so it keeps working regardless of cell order.
build_and_train_model = _build_and_train_standard_model


def gridsearch_lstm(
    X, y,
    train_prop=0.7, val_prop=0.15, test_prop=0.15,
    param_grid=None, verbose = 1
):
    """
    Splits (X, y) chronologically into train/val/test, grid-searches an LSTM
    over param_grid on (train, val), and predicts the test set with the
    combination that achieved the lowest validation RMSE.

    :param X: 2D numpy array, shape (N, num_features).
    :param y: 1D numpy array, shape (N,).
    :param train_prop: float, proportion for training (e.g. 0.7).
    :param val_prop: float, proportion for validation (e.g. 0.15).
    :param test_prop: float, kept for interface symmetry; the test set is
        simply everything after the validation slice.
    :param param_grid: dict of lists, e.g. {
         'dropout_rate': [0.0, 0.2],
         'learning_rate': [0.01, 0.001],
         'epochs': [5, 10],
         'batch_size': [32, 64]
      }
      Keys that are absent fall back to 0.2 / 0.001 / 10 / 32 respectively.
    :param verbose: Keras verbosity forwarded to training and prediction.

    :return: tuple, in this order:
      - test_preds: 1D predictions of the best model on the test set
      - y_test: true target values for the test set
      - best_params: dict of hyperparams that gave the best val RMSE
      - best_model: the Keras model that had the best val RMSE
    :raises ValueError: if the grid yields no combination, or no combination
        produces a comparable (non-NaN) validation RMSE.
    """
    if param_grid is None:
        # A small default param grid
        param_grid = {
            'dropout_rate': [0.0, 0.2],
            'learning_rate': [0.001],
            'epochs': [5],
            'batch_size': [32]
        }
    if not param_grid:
        raise ValueError("param_grid must contain at least one hyperparameter list")

    # 1) Split indices (chronological, no shuffling)
    n = len(X)
    train_end = int(n * train_prop)
    val_end   = int(n * (train_prop + val_prop))  # test is [val_end..]

    # 2) Slice data
    X_train_raw, y_train_raw = X[:train_end], y[:train_end]
    X_val_raw,   y_val_raw   = X[train_end:val_end], y[train_end:val_end]
    X_test_raw,  y_test      = X[val_end:], y[val_end:]

    # 3) Reshape to (samples, timesteps=1, features) as the LSTM expects
    X_train = X_train_raw.reshape(X_train_raw.shape[0], 1, X_train_raw.shape[1])
    X_val   = X_val_raw.reshape(X_val_raw.shape[0], 1, X_val_raw.shape[1])
    X_test  = X_test_raw.reshape(X_test_raw.shape[0], 1, X_test_raw.shape[1])

    # 4) Grid search: keep the combination with the lowest validation RMSE
    best_rmse = float('inf')
    best_model = None
    best_params = None

    param_keys, param_values = zip(*param_grid.items())
    for combo in product(*param_values):
        current_params = dict(zip(param_keys, combo))

        model, rmse_val = _build_and_train_standard_model(
            X_train, y_train_raw,
            X_val,   y_val_raw,
            dropout_rate=current_params.get('dropout_rate', 0.2),
            learning_rate=current_params.get('learning_rate', 0.001),
            epochs=current_params.get('epochs', 10),
            batch_size=current_params.get('batch_size', 32),
            verbose = verbose
        )

        if rmse_val < best_rmse:
            best_rmse = rmse_val
            best_model = model
            best_params = current_params

    if best_model is None:
        # Either a hyperparameter list was empty or every RMSE was NaN.
        raise ValueError("grid search produced no usable model; check param_grid and the data")

    # 5) Predict on test with best model
    test_preds = best_model.predict(X_test, verbose=verbose).ravel()

    return test_preds, y_test, best_params, best_model

### Model - Walk Forward

In [4]:
class CustomCheckpoint(Callback):
    """
    Keras callback that checkpoints the model with the best validation
    information ratio (annualised return / annualised volatility of a
    sign-following strategy built from the model's predictions).

    After every epoch the validation set is predicted, both predictions and
    targets are mapped back to price space with `scaler`, and the model is
    saved to `filepath` whenever the score improves.
    """

    def __init__(self, validation_data, scaler, filepath="best_custom_model.keras"):
        """
        :param validation_data: tuple (X_val, y_val) scored after each epoch.
        :param scaler: fitted object exposing inverse_transform, used to turn
            scaled targets/predictions back into prices.
        :param filepath: where the best model is saved.
        """
        # Initialise the Keras base class first; attributes are set afterwards
        # so nothing the base initialiser assigns can overwrite them.
        super().__init__()

        self.validation_data = validation_data
        self.filepath = filepath
        self.best_score = -np.inf  # Higher is better for information ratio
        self.scaler = scaler
        # True once this instance has written a checkpoint to `filepath`.
        self._checkpoint_written = False

    def information_ratio_metric(self, y_val, y_val_pred):
        """
        Information ratio of a strategy that goes long/short/flat according to
        the sign of the predicted price change and earns the realised change.

        :param y_val: scaled true targets (array-like).
        :param y_val_pred: scaled predictions (array-like, same length).
        :return: annualised return divided by annualised standard deviation,
            both using a factor of 252; NaN when fewer than two observations
            are available.
        """
        true_prices = self.scaler.inverse_transform(np.asarray(y_val).reshape(-1, 1)).flatten()
        pred_prices = self.scaler.inverse_transform(np.asarray(y_val_pred).reshape(-1, 1)).flatten()

        df = pd.DataFrame(
            {'target' : true_prices, 'prediction' : pred_prices}
        )

        def sign_func(x, threshold = 0):
            if x > threshold:
                return 1
            elif x < -threshold:
                return -1
            else:
                return 0

        df['prediction'] = df['prediction'].pct_change()
        df['target'] = df['target'].pct_change() # r_t
        df = df.dropna()

        if df.empty:
            # Not enough points to form a single return; no meaningful score.
            return np.nan

        df['prediction'] = df['prediction'].apply(sign_func) # 1 if r_t >0, -1 if r_t <0
        df['prediction'] = df['prediction'] * df['target'] # (1 or -1 or 0) * r_t
        aSD = np.sqrt(252) * (df['prediction'].std())
        df['prediction'] = df['prediction'] + 1
        df['prediction'] = df['prediction'].cumprod() # K_t -> equity curve

        aRC = (df['prediction'].values[-1]) ** (252 / (len(df))) - 1

        return aRC/aSD

    def on_epoch_end(self, epoch, logs=None):
        """Score the validation set and checkpoint the model when warranted."""
        X_val, y_val = self.validation_data
        y_val_pred = self.model.predict(X_val, verbose=0).flatten()

        # Compute custom metric
        score = self.information_ratio_metric(y_val, y_val_pred)

        # NaN never compares greater, so a NaN score is never an improvement.
        improved = score > self.best_score

        if improved:
            self.best_score = score
            self.model.save(self.filepath)
            self._checkpoint_written = True
            print(f"\nEpoch {epoch + 1}: Information Ratio improved to {score:.5f}, saving model.\n")
        elif not self._checkpoint_written:
            # Guarantee that `filepath` holds a model from THIS training run
            # even if the score is undefined on every epoch; otherwise a later
            # load would fail or pick up a file left behind by another run.
            self.model.save(self.filepath)
            self._checkpoint_written = True
            print(f"\nEpoch {epoch + 1}: Information Ratio undefined ({score}), saving model as fallback.\n")

def build_and_train_model(

    # Data
    X_train, y_train,
    X_val,   y_val,

    # StandardScaler to transform preds into prices for inner IR optimization
    scaler_y,

    # Grid search params
    dropout_rate=0.2, learning_rate=0.001, epochs=10, batch_size=32,

    # For comments in console
    verbose=0
):
    """
    Train the stacked LSTM and return the epoch-checkpoint that CustomCheckpoint
    selected on the validation set.

    :param X_train: 3D array (samples, timesteps, features) for fitting.
    :param y_train: scaled training targets.
    :param X_val: 3D validation inputs.
    :param y_val: scaled validation targets.
    :param scaler_y: fitted target scaler handed to CustomCheckpoint so it can
        score predictions in price space.
    :param dropout_rate: rate used by both Dropout layers.
    :param learning_rate: Adam learning rate.
    :param epochs: number of training epochs.
    :param batch_size: mini-batch size.
    :param verbose: Keras verbosity for fit.
    :return: the model reloaded from the checkpoint file, or the in-memory
        model after the final epoch if no checkpoint was written during this
        call (for example when epochs is 0).
    """
    import os

    # 1) Build the model
    model = Sequential()
    model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))  # explicit input layer
    model.add(LSTM(128, return_sequences = True))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(64, return_sequences = True))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(32))
    model.add(Dense(1))  # single numeric output

    # 2) Compile
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(loss='mean_squared_error', optimizer=optimizer)

    # 3) Define custom checkpoint callback
    custom_checkpoint = CustomCheckpoint(validation_data=(X_val, y_val), scaler = scaler_y)
    # Single source of truth for the checkpoint location.
    checkpoint_path = custom_checkpoint.filepath

    # Remove any checkpoint left by a previous call (other ticker, block or
    # hyperparameter combination) so it can never be mistaken for this run's.
    if os.path.isfile(checkpoint_path):
        os.remove(checkpoint_path)

    # 4) Train the model with the custom checkpoint
    model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        verbose=verbose,
        callbacks=[custom_checkpoint]
    )

    # 5) Load the best model (selected based on information_ratio_metric)
    if os.path.isfile(checkpoint_path):
        return load_model(checkpoint_path)

    # No checkpoint was produced in this run: fall back to the trained model
    # instead of failing on a missing file.
    return model

def gridsearch_lstm_walk_forward(

    # data
    X_train, y_train,
    X_val, y_val,
    X_test,

    # Gridsearch
    param_grid,

    # parameters
    verbose = 1
):
    """
    Standardise one walk-forward window, grid-search the LSTM on it and return
    test-set predictions in the original target units.

    Features and target are scaled with StandardScaler objects fitted on the
    training slice only. Every hyperparameter combination is trained through
    build_and_train_model; the combination with the lowest validation RMSE is
    the one used for the test predictions.

    :param X_train: training features (2D, samples x features).
    :param y_train: training target; must expose `.values` (pandas object).
    :param X_val: validation features.
    :param y_val: validation target; must expose `.values`.
    :param X_test: test features.
    :param param_grid: dict of lists; recognised keys are 'dropout_rate',
        'learning_rate', 'epochs' and 'batch_size'. Keys that are absent are
        left at build_and_train_model's own defaults.
    :param verbose: Keras verbosity forwarded to training and test prediction.
    :return: 1D numpy array of test predictions, inverse-transformed to the
        scale of y_train.
    :raises ValueError: if param_grid is empty or yields no combination.
    """
    if not param_grid:
        raise ValueError("param_grid must be a non-empty dict of hyperparameter lists")

    # StandardScaler to transform the data (fitted on the training slice only)
    scaler_X = StandardScaler()
    X_train = scaler_X.fit_transform(X_train)
    X_val = scaler_X.transform(X_val)
    X_test = scaler_X.transform(X_test)

    scaler_y = StandardScaler()
    y_train = scaler_y.fit_transform(y_train.values.reshape(-1, 1)).flatten()
    y_val = scaler_y.transform(y_val.values.reshape(-1, 1)).flatten()

    # Reshape to (samples, timesteps=1, features) for the LSTM
    X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
    X_val   = X_val.reshape(X_val.shape[0], 1, X_val.shape[1])
    X_test  = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])

    # Perform gridsearch for given data and params grid
    tunable = ('dropout_rate', 'learning_rate', 'epochs', 'batch_size')
    param_keys, param_values = zip(*param_grid.items())

    best_model = None
    best_val_rmse = np.inf

    for combo in product(*param_values):

        current_params = dict(zip(param_keys, combo))
        # Forward only the keys that are present so a missing key does not
        # override the callee's default with None.
        overrides = {key: current_params[key] for key in tunable if key in current_params}

        model = build_and_train_model(
            X_train, y_train,
            X_val,   y_val,
            scaler_y,
            verbose = verbose,
            **overrides
        )

        # Score this combination on the validation slice (scaled units; the
        # ranking is the same as in price units because the scaling is linear).
        val_pred = np.asarray(model.predict(X_val, verbose=0)).ravel()
        val_rmse = np.sqrt(np.mean((val_pred - y_val) ** 2))

        # The first model is always kept as a fallback; afterwards only a
        # strictly better (and comparable, i.e. non-NaN) RMSE replaces it.
        if best_model is None or val_rmse < best_val_rmse:
            best_model = model
            best_val_rmse = val_rmse if np.isfinite(val_rmse) else np.inf

    if best_model is None:
        raise ValueError("param_grid produced no hyperparameter combination")

    # Predict on test with best model
    test_preds = np.asarray(best_model.predict(X_test, verbose=verbose)).ravel()
    test_preds = scaler_y.inverse_transform(test_preds.reshape(-1, 1)).flatten()

    return test_preds

def walk_forward_block_optimization(
    X, y,
    n_blocks = 10,
    train_blocks = 5, val_blocks = 1, test_blocks = 1,
    param_grid=None,
    verbose=1
):
    """
    Walk-forward evaluation over equally sized, consecutive blocks of rows.

    The data is cut into n_blocks blocks of len(X) // n_blocks rows (trailing
    rows that do not fill a block are unused). A window of
    train_blocks + val_blocks + test_blocks blocks slides forward one block at
    a time; on each position the model is grid-searched on train/val and the
    test slice is predicted.

    NOTE(review): the window advances by one block, so with test_blocks > 1
    consecutive test slices overlap and timestamps repeat in the output.

    :param X: feature DataFrame, ordered in time.
    :param y: DataFrame with a 'target' column, row-aligned with X.
    :param n_blocks: number of blocks the data is divided into.
    :param train_blocks: blocks used for training in each window.
    :param val_blocks: blocks used for validation in each window.
    :param test_blocks: blocks used for testing in each window.
    :param param_grid: dict of hyperparameter lists forwarded to
        gridsearch_lstm_walk_forward; when None a small default grid is used.
    :param verbose: when truthy, prints the row ranges of every window; also
        forwarded as the Keras verbosity.
    :return: DataFrame indexed by timestamp with columns 'true_value' and
        'predicted_value'; 0 (after printing a message) when the requested
        blocks exceed n_blocks.
    :raises ValueError: if there are fewer rows than blocks.
    """
    window_blocks = train_blocks + val_blocks + test_blocks

    if window_blocks > n_blocks:
        print('Incorrect number of blocks specified')
        return 0

    if param_grid is None:
        # The downstream grid search needs a real dict; fall back to a small grid.
        param_grid = {
            'dropout_rate': [0.0, 0.2],
            'learning_rate': [0.001],
            'epochs': [5],
            'batch_size': [32]
        }

    n = len(X)
    block_size = n // n_blocks  # Number of samples per block
    if block_size == 0:
        raise ValueError(f"not enough samples ({n}) to form {n_blocks} blocks")

    n_iterations = n_blocks - window_blocks + 1
    all_results = []

    for start_block in range(n_iterations):

        # Define block indices
        train_start = start_block * block_size
        train_end = train_start + train_blocks * block_size
        val_end = train_end + val_blocks * block_size
        test_end = val_end + test_blocks * block_size

        # Extract rolling block window data
        X_train, y_train = X.iloc[train_start:train_end], y.iloc[train_start:train_end]
        X_val, y_val = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
        X_test, y_test = X.iloc[val_end:test_end], y.iloc[val_end:test_end]

        if verbose:
            print(f"Iteration {start_block+1}/{n_iterations}: Train [{train_start}:{train_end}], Val [{train_end}:{val_end}], Test [{val_end}:{test_end}]")

        # Run gridsearch on this block-wise set
        test_preds = gridsearch_lstm_walk_forward(
            X_train, y_train,
            X_val, y_val,
            X_test,
            param_grid,
            verbose
        )

        # Store results. Plain arrays are used so rows are matched by position
        # and never silently re-aligned on index labels.
        segment_results = pd.DataFrame({
            "timestamp": X_test.index,  # Keep correct timestamps
            "true_value": y_test['target'].to_numpy(),
            "predicted_value": test_preds
        })
        all_results.append(segment_results)

    # Concatenate results from all splits
    final_results = pd.concat(all_results, ignore_index=True)
    final_results.index = final_results['timestamp']
    final_results = final_results[['true_value', 'predicted_value']]
    return final_results


### Data

In [5]:
# Unique ticker symbols, read from the default CSV path of get_tickers()
TICKERS = get_tickers()

# All stocks

In [None]:
# One [ticker, base_results, nlp_results] entry is appended per stock
results = []

# Hyperparameter grid (identical for every ticker and both feature sets)
hyperparams_grid = {
'dropout_rate': [0.0],
'learning_rate': [0.01],
'epochs': [5],
'batch_size' : [32]}

# Walk-forward settings shared by the base and the NLP run
walk_forward_settings = dict(
    n_blocks = 15,
    train_blocks = 10,
    val_blocks = 1,
    test_blocks = 1,
    param_grid = hyperparams_grid,
    verbose = 0,
)

for ticker in TICKERS:

    # Features with and without the NLP columns
    print(f'Loading data for {ticker}...')
    X_nlp, y_nlp = load_ticker(ticker, include_NLP = True)
    X_base, y_base = load_ticker(ticker, include_NLP = False)
    print(f'Finished loading data for {ticker}')

    # Base scenario first, then the NLP scenario
    final_results_base = walk_forward_block_optimization(X = X_base, y = y_base, **walk_forward_settings)
    final_results_nlp = walk_forward_block_optimization(X = X_nlp, y = y_nlp, **walk_forward_settings)

    results.append([ticker, final_results_base, final_results_nlp])

### Backtest

In [18]:
def is_trading_hour(idx):
    """
    Tell whether a timestamp falls inside the trading session.

    The session is Monday to Friday, from 09:30 (inclusive) up to 16:30
    (exclusive).

    :param idx: pandas Timestamp (anything offering weekday() and time()).
    :return: True inside the session, False otherwise.
    """
    session_open = datetime.time(9, 30)
    session_close = datetime.time(16, 30)

    # weekday(): Monday is 0, so 5 and 6 are Saturday and Sunday
    is_weekday = idx.weekday() < 5
    return is_weekday and session_open <= idx.time() < session_close

def sign_func(x, threshold=0.0):
    """
    Three-way sign with a dead zone.

    :param x: value to classify.
    :param threshold: half-width of the band treated as "no signal".
    :return: 1 if x > threshold, otherwise -1 if x < -threshold, otherwise 0
        (NaN compares false both times and therefore yields 0).
    """
    if x > threshold:
        return 1
    return -1 if x < -threshold else 0

def backtest_equity_with_mask(df):
    """
    Backtest a long/short sign strategy that only holds positions during
    trading hours.

    :param df: DataFrame with 'true_value' and 'predicted_value' columns and
        a timestamp index; it is not modified.
    :return: copy of df with the added columns actual_return, pred_return,
        signal, in_market, strategy_return, strategy_equity and buy_and_hold.
    """
    frame = df.copy()

    # Realised and forecast bar-to-bar changes
    realised = frame['true_value'].pct_change()
    forecast = frame['predicted_value'].pct_change()
    frame['actual_return'] = realised
    frame['pred_return'] = forecast

    # Direction of the forecast, muted outside the trading session
    frame['signal'] = forecast.apply(sign_func)
    frame['in_market'] = frame.index.map(is_trading_hour).astype(int)
    frame['signal'] = frame['signal'] * frame['in_market']

    # The previous bar's position earns the current bar's return
    frame['strategy_return'] = frame['signal'].shift(1) * realised

    # Compounded equity curves (missing returns count as 0)
    frame['strategy_equity'] = frame['strategy_return'].fillna(0).add(1).cumprod()
    frame['buy_and_hold'] = realised.fillna(0).add(1).cumprod()

    return frame

In [84]:
COSTS = 0.0

def backtest_long_only(df: pd.DataFrame, cost_rate: float = COSTS) -> pd.DataFrame:
    """
    Long-only backtest: hold one unit while the forecast points up, else stay flat.

    Pipeline:
      - actual_return / pred_return are the percentage changes of
        true_value / predicted_value
      - raw_signal is sign_func(pred_return); negative signals become 0
      - signals are zeroed outside trading hours
      - strategy_return[t] = signal[t-1] * actual_return[t] - transaction cost,
        where the cost is cost_rate * |change in signal|
      - strategy_equity and buy_and_hold are the compounded curves

    :param df: frame with 'true_value' and 'predicted_value' and a timestamp index.
    :param cost_rate: proportional cost charged per unit of position change.
    :return: copy of df extended with the intermediate and result columns.
    """
    bt = df.copy()

    # Returns of the realised and the predicted series
    realised = bt['true_value'].pct_change()
    bt['actual_return'] = realised
    bt['pred_return'] = bt['predicted_value'].pct_change()

    # Direction signal; shorts are removed by flooring at 0
    bt['raw_signal'] = bt['pred_return'].apply(sign_func)
    bt['signal'] = bt['raw_signal'].clip(lower=0)

    # No exposure outside the trading session
    bt['in_market'] = bt.index.map(is_trading_hour).astype(int)
    bt['signal'] = bt['signal'] * bt['in_market']

    # Yesterday's position earns today's return
    gross_return = bt['signal'].shift(1) * realised
    bt['strategy_return'] = gross_return

    # Simplified costs: proportional to the absolute position change
    bt['pos_change'] = bt['signal'].diff().fillna(bt['signal'])
    bt['transaction_cost'] = cost_rate * bt['pos_change'].abs()
    bt['strategy_return'] = gross_return - bt['transaction_cost'].fillna(0)

    # Compounded equity curves
    bt['strategy_equity'] = bt['strategy_return'].fillna(0).add(1).cumprod()
    bt['buy_and_hold'] = realised.fillna(0).add(1).cumprod()

    return bt

def backtest_leveraged_long_only(df: pd.DataFrame, cost_rate: float = COSTS) -> pd.DataFrame:
    """
    Pyramiding long-only backtest.

    Every consecutive +1 raw signal adds one more unit to the position; any
    other raw signal resets the position to 0. The accumulated position is
    then zeroed outside trading hours, lagged by one bar and applied to the
    realised return, net of cost_rate * |position change|.

    :param df: frame with 'true_value' and 'predicted_value' and a timestamp index.
    :param cost_rate: proportional cost charged per unit of position change.
    :return: copy of df extended with the intermediate and result columns.
    """
    bt = df.copy()

    # Returns of the realised and the predicted series
    realised = bt['true_value'].pct_change()
    bt['actual_return'] = realised
    bt['pred_return'] = bt['predicted_value'].pct_change()

    # Direction signal in {+1, 0, -1}
    bt['raw_signal'] = bt['pred_return'].apply(sign_func)

    # Length of the current run of +1 signals = number of units held
    units_held = 0
    stacked_position = []
    for raw in bt['raw_signal'].to_numpy():
        units_held = units_held + 1 if raw == 1 else 0
        stacked_position.append(units_held)
    bt['signal'] = stacked_position

    # No exposure outside the trading session
    bt['in_market'] = bt.index.map(is_trading_hour).astype(int)
    bt['signal'] = bt['signal'] * bt['in_market']

    # Previous bar's position earns the current bar's return
    gross_return = bt['signal'].shift(1) * realised
    bt['strategy_return'] = gross_return

    # Costs proportional to the absolute position change
    bt['pos_change'] = bt['signal'].diff().fillna(bt['signal'])
    bt['transaction_cost'] = cost_rate * bt['pos_change'].abs()
    bt['strategy_return'] = gross_return - bt['transaction_cost'].fillna(0)

    # Compounded equity curves
    bt['strategy_equity'] = bt['strategy_return'].fillna(0).add(1).cumprod()
    bt['buy_and_hold'] = realised.fillna(0).add(1).cumprod()

    return bt

def backtest_both_ways(df: pd.DataFrame, cost_rate: float = COSTS) -> pd.DataFrame:
    """
    One-lot long/short backtest.

    The position equals the raw signal: +1 long, -1 short, 0 flat. It is
    zeroed outside trading hours, lagged by one bar and applied to the
    realised return, net of cost_rate * |position change|.

    :param df: frame with 'true_value' and 'predicted_value' and a timestamp index.
    :param cost_rate: proportional cost charged per unit of position change.
    :return: copy of df extended with the intermediate and result columns.
    """
    bt = df.copy()

    # Returns of the realised and the predicted series
    realised = bt['true_value'].pct_change()
    bt['actual_return'] = realised
    bt['pred_return'] = bt['predicted_value'].pct_change()

    # sign_func already limits the signal to -1 / 0 / +1, so it is used as is
    bt['raw_signal'] = bt['pred_return'].apply(sign_func)
    bt['signal'] = bt['raw_signal']

    # No exposure outside the trading session
    bt['in_market'] = bt.index.map(is_trading_hour).astype(int)
    bt['signal'] = bt['signal'] * bt['in_market']

    # Previous bar's position earns the current bar's return
    gross_return = bt['signal'].shift(1) * realised
    bt['strategy_return'] = gross_return

    # Costs proportional to the absolute position change
    bt['pos_change'] = bt['signal'].diff().fillna(bt['signal'])
    bt['transaction_cost'] = cost_rate * bt['pos_change'].abs()
    bt['strategy_return'] = gross_return - bt['transaction_cost'].fillna(0)

    # Compounded equity curves
    bt['strategy_equity'] = bt['strategy_return'].fillna(0).add(1).cumprod()
    bt['buy_and_hold'] = realised.fillna(0).add(1).cumprod()

    return bt

def backtest_both_ways_leveraged(df: pd.DataFrame, cost_rate: float = COSTS) -> pd.DataFrame:
    """
    Pyramiding long/short backtest.

    Position rules, applied bar by bar to the raw signal:
      - repeated +1 signals add one long unit each bar
      - repeated -1 signals add one short unit each bar
      - a flip of direction closes everything and opens a single unit the
        other way (position becomes +1 or -1)
      - a 0 signal goes flat

    The position is then zeroed outside trading hours, lagged by one bar and
    applied to the realised return, net of cost_rate * |position change|.

    :param df: frame with 'true_value' and 'predicted_value' and a timestamp index.
    :param cost_rate: proportional cost charged per unit of position change.
    :return: copy of df extended with the intermediate and result columns.
    """
    bt = df.copy()

    # Returns of the realised and the predicted series
    realised = bt['true_value'].pct_change()
    bt['actual_return'] = realised
    bt['pred_return'] = bt['predicted_value'].pct_change()

    # Direction signal in {+1, 0, -1}
    bt['raw_signal'] = bt['pred_return'].apply(sign_func)

    # Accumulate exposure while the direction persists, restart on a flip
    exposure = 0
    exposure_path = []
    for raw in bt['raw_signal'].to_numpy():
        if raw > 0:
            exposure = exposure + 1 if exposure >= 0 else 1
        elif raw < 0:
            exposure = exposure - 1 if exposure <= 0 else -1
        else:
            exposure = 0
        exposure_path.append(exposure)
    bt['signal'] = exposure_path

    # No exposure outside the trading session
    bt['in_market'] = bt.index.map(is_trading_hour).astype(int)
    bt['signal'] = bt['signal'] * bt['in_market']

    # Previous bar's position earns the current bar's return
    gross_return = bt['signal'].shift(1) * realised
    bt['strategy_return'] = gross_return

    # Costs proportional to the absolute position change
    bt['pos_change'] = bt['signal'].diff().fillna(bt['signal'])
    bt['transaction_cost'] = cost_rate * bt['pos_change'].abs()
    bt['strategy_return'] = gross_return - bt['transaction_cost'].fillna(0)

    # Compounded equity curves
    bt['strategy_equity'] = bt['strategy_return'].fillna(0).add(1).cumprod()
    bt['buy_and_hold'] = realised.fillna(0).add(1).cumprod()

    return bt


In [87]:
# Basket-level equity curves: every strategy is backtested per ticker and the
# per-ticker curves are averaged (equal weight). Everything is aligned on the
# timestamps of the first ticker's base results.
reference_index = results[0][1].index

# Strategy label -> backtest function. The order fixes the column order of
# `combined` (and therefore the row order of any table built from it).
strategy_backtests = {
    'leveraged_long_only': backtest_leveraged_long_only,
    'Long_only': backtest_long_only,
    'both_ways_leveraged': backtest_both_ways_leveraged,
    'Both_ways': backtest_both_ways,
}

# final column name -> list of per-ticker equity curves (base block first, then nlp)
equity_curves = {
    f'final_{name}_{variant}': []
    for variant in ('base', 'nlp')
    for name in strategy_backtests
}
buy_and_hold_curves = []

for ticker, base, nlp in results:

    # Benchmark: buy-and-hold of this ticker, same construction as in the backtests
    ticker_buy_and_hold = (1 + base['true_value'].pct_change().fillna(0)).cumprod()
    buy_and_hold_curves.append(ticker_buy_and_hold.reindex(reference_index))

    # Collect all models
    for name, run_backtest in strategy_backtests.items():
        equity_curves[f'final_{name}_base'].append(
            run_backtest(base)['strategy_equity'].reindex(reference_index)
        )
        equity_curves[f'final_{name}_nlp'].append(
            run_backtest(nlp)['strategy_equity'].reindex(reference_index)
        )

# Dataframe to store all backtests
combined = pd.DataFrame(index=reference_index)
for final_column, curves in equity_curves.items():
    combined[final_column] = pd.concat(curves, axis=1).mean(axis = 1)

# The benchmark is averaged over all tickers as well, so it is comparable with
# the basket strategies (previously it was the first ticker's curve only).
combined['final_BH'] = pd.concat(buy_and_hold_curves, axis=1).mean(axis = 1)

### Basket of all

In [None]:
def plot_comparison(combined):
    """
    Plot the basket equity curves of every strategy against buy-and-hold.

    :param combined: DataFrame holding 'final_<strategy>_base' and
        'final_<strategy>_nlp' columns for the four strategies plus 'final_BH'.
        Base variants are drawn in greens, NLP variants in blues and
        buy-and-hold in red.
    """
    # strategy name -> (colour of the base variant, colour of the nlp variant)
    palette = {
        'Long_only': ('limegreen', 'dodgerblue'),
        'leveraged_long_only': ('forestgreen', 'royalblue'),
        'Both_ways': ('seagreen', 'cornflowerblue'),
        'both_ways_leveraged': ('darkgreen', 'navy'),
    }

    fig, ax1 = plt.subplots(figsize=(15, 8))

    for strategy, (base_color, nlp_color) in palette.items():
        for variant, color in (('base', base_color), ('nlp', nlp_color)):
            ax1.plot(
                combined[f'final_{strategy}_{variant}'],
                label=f'{strategy}_{variant}',
                color=color
            )

    ax1.plot(combined['final_BH'], label = 'Buy And Hold', color = 'red')

    ax1.set_title('Comparison of B&H, LSTM_NLP and LSTM_Base')
    ax1.legend()
    ax1.grid(True)
    plt.show()

plot_comparison(combined)

In [142]:
def MaximumDrawdown(equity, text = ""):
    """
    Maximum drawdown of an equity curve and the period it spans.

    The input frame is left untouched (no helper column is added to it).

    :param equity: DataFrame containing the equity curve.
    :param text: name of the column holding the equity curve.
    :return: (max_drawdown, start_index, end_index) where
        - max_drawdown is the largest value of 1 - equity / running maximum,
        - start_index is the label of the peak preceding the deepest trough,
        - end_index is the first later label at which equity is strictly
          above that peak, or the last label of the curve if it never recovers.
    """
    curve = equity[text]

    # Drawdown calculation
    drawdowns = 1 - curve / curve.cummax()
    max_drawdown = drawdowns.max()
    trough_index = drawdowns.idxmax()

    # The drawdown starts at the highest point reached up to the trough
    start_index = curve.loc[:trough_index].idxmax()
    start_equity_level = curve.loc[start_index]

    # Recovery: first point after the peak that is strictly above the peak
    # level (the peak itself equals the level, so ">" keeps it out).
    after_peak = curve.loc[start_index:]
    recovered = after_peak[after_peak > start_equity_level]

    if len(recovered) > 0:
        end_index = recovered.index.min()
    else:
        # Never recovered: the drawdown lasts until the end of the data.
        end_index = after_peak.index[-1]

    return max_drawdown, start_index, end_index

def PerformanceMetrics(df, fix = 'base'):
    """
    Risk/return summary of one equity curve.

    :param df: DataFrame of equity curves with a datetime index.
    :param fix: name of the column to evaluate.
    :return: dict with
        - 'name': the column name
        - 'ARC': annualised return compounded, in percent
        - 'aSD': annualised standard deviation of bar returns
        - 'MD': maximum drawdown, in percent
        - 'MLD': length of the maximum drawdown period (days / 252.03)
        - 'IR1': ARC / aSD
        - 'IR2': sign(ARC) * ARC^2 / (aSD * MD)
        - 'IR3': ARC^3 / (aSD * MD * MLD)

    NOTE(review): ARC converts bars to days with 24 bars per day while aSD
    scales the bar returns with sqrt(252) as if they were daily, and MLD
    divides calendar days by 252.03 — confirm the intended annualisation.
    """
    # Data (explicit copy so adding a column never touches a view of df)
    equity = df[[fix]].copy()
    equity['Daily_return'] = equity[fix].pct_change()
    equity = equity.dropna()

    # ARC
    ARC = (equity[fix].values[-1])**(252/(len(equity)/24)) - 1

    # aSD
    aSD = (equity['Daily_return'].std()) * (np.sqrt(252))

    # Maximum Drawdown and Maximum Drawdown Duration
    MD, start_drawdown, end_drawdown = MaximumDrawdown(equity, fix)
    MLD = np.abs((end_drawdown - start_drawdown).days)/252.03

    # Information Ratio *
    IR1 = ARC/aSD

    # Information Ratio **
    # Keep the sign of ARC: the former ARC**3/(aSD*ARC*MD) reduces to
    # ARC**2/(aSD*MD), which made losing strategies score positive.
    IR2 = np.sign(ARC) * ARC**2/(aSD*MD)

    # Information Ratio ***
    IR3 = ARC**3/(aSD*MD*MLD)

    metrics = {
        'name' : fix,
        'ARC': 100 * ARC,
        'aSD': aSD,
        'MD': 100 * MD,
        'MLD': MLD,
        'IR1': IR1,
        'IR2': IR2,
        'IR3': IR3
    }

    return metrics

def PerformanceMetricsTable(combined):
    """
    Build a table of performance metrics with one row per equity curve.

    :param combined: DataFrame whose columns are equity curves; each column
        is passed to PerformanceMetrics.
    :return: DataFrame indexed by curve name with the columns ARC, aSD, MD,
        MLD, IR1, IR2 and IR3, rounded to three decimals.
    """
    metric_columns = ['ARC', 'aSD', 'MD', 'MLD', 'IR1', 'IR2', 'IR3']

    # One single-row frame per curve, stacked in column order
    per_curve_rows = []
    for fix in combined.columns:
        per_curve_rows.append(pd.DataFrame(PerformanceMetrics(combined, fix), index = [0]))
    table = pd.concat(per_curve_rows)

    # Label the rows with the curve names and keep only the metric columns
    table = table.set_index('name', drop = False)
    return table[metric_columns].round(3)

In [None]:
# Metrics for every column of `combined`; the bare name on the last line
# makes the notebook render the table as the cell output.
table = PerformanceMetricsTable(combined)
table