In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn

from sklearn.metrics import mean_absolute_error

In [None]:
"""
Functions for model training
"""

def df_to_Xy(df, window_size=10):
  df_as_np = df.to_numpy()
  X = []
  y = []
  for i in range(len(df_as_np)-window_size):
    row = [r for r in df_as_np[i:i+window_size]]
    X.append(row)
    label = df_as_np[i+window_size][3]
    y.append(label)
  return np.array(X), np.array(y)

def df_to_Xy_future(df, window_size=10, future_step=2):
    """
    Converts a DataFrame into sequences of input data (X) and labels (y).
    
    Parameters:
    - df: DataFrame containing the data.
    - window_size: Number of past data points to include in each input sequence.
    - future_step: Number of steps ahead to predict.

    Returns:
    - X: Input features as numpy array.
    - y: Target labels as numpy array.
    """
    df_as_np = df.to_numpy()
    X = []
    y = []
    for i in range(len(df_as_np) - window_size - future_step):
        # Extract the sequence of features
        row = df_as_np[i:i + window_size]
        X.append(row)
        # Extract the target label future_step ahead
        label = df_as_np[i + window_size + future_step - 1][3]  # Index 3 is 'Close'
        y.append(label)
    return np.array(X), np.array(y)

# standardize the data with training set mean and standard deviation                 
def preprocess(X, mean, std, i):
  X[:, :, i] = (X[:, :, i] - mean) / std
  return X

class CNNModel(nn.Module):
  def __init__(self):
      super(CNNModel, self).__init__()
      # Conv1D layer
      self.conv1d = nn.Conv1d(in_channels=7, out_channels=64, kernel_size=2)
      # Flatten layer is implicit in PyTorch
      self.fc1 = nn.Linear(64 * 9, 8)  # Adjusted input size to 64 * 4 = 256
      self.relu = nn.ReLU()
      self.fc2 = nn.Linear(8, 1)  # Final output layer

  def forward(self, x):
      x = self.conv1d(x)  # Apply Conv1D
      #print(f"After Conv1D: {x.shape}")  # Debugging: Print shape after Conv1D
      x = x.view(x.size(0), -1)  # Flatten (equivalent to Flatten layer in Keras)
      #print(f"After Flatten: {x.shape}")  # Debugging: Print shape after flatten
      x = self.relu(self.fc1(x))  # Fully connected + ReLU
      x = self.fc2(x)  # Final linear layer
      return x

# Function to plot predictions
def plot_predictions(model, X, y, title, plot_dir, filename, mean, std):
    model.eval()  # Ensure the model is in evaluation mode
    with torch.no_grad():
        # Transpose X to match Conv1D input format (Batch Size, Channels, Sequence Length)
        X_tensor = torch.tensor(X, dtype=torch.float32).transpose(1, 2)
        predictions = model(X_tensor).flatten().numpy()  # Get predictions

    # Create a DataFrame to compare predictions with actuals
    df = pd.DataFrame(data={"Predictions": predictions, "Actuals": y})

    # Plot predictions vs. actuals
    plt.figure(figsize=(10, 6))
    plt.plot(df["Predictions"], label="Predictions")
    plt.plot(df["Actuals"], label="Actuals")
    plt.ylabel("Stock Price ($)")
    plt.title(title)
    plt.legend()

    # Save the plot in the date-based directory
    plt.savefig(os.path.join(plot_dir, filename))
    plt.close()

    return df, mean_absolute_error(y, predictions)


In [None]:
# Function to plot predictions
def plot_whole(model, X, y, title, show_plot):
    model.eval()  # Ensure the model is in evaluation mode
    with torch.no_grad():
      # Transpose X to match Conv1D input format (Batch Size, Channels, Sequence Length)
      X_tensor = torch.tensor(X, dtype=torch.float32).transpose(1, 2)
      predictions = model(X_tensor).flatten().numpy()  # Get predictions

    # Create a DataFrame to compare predictions with actuals
    df = pd.DataFrame(data={"Predictions": predictions, "Actuals": y})
    
    # Plot predictions vs. actuals
    plt.figure(figsize=(10, 6))
    plt.plot(df["Predictions"], label="Predictions")
    plt.plot(df["Actuals"], label="Actuals")
    plt.ylabel("Stock Price ($)")
    plt.title(title)
    plt.legend()
    
    if show_plot:
      plt.show()
    plt.close()
    return df

In [None]:
def backtest_strategy(df, initial_capital=10000, transaction_cost=0.0, 
                      price_change_threshold=0.002, rolling_window=40, 
                      cooldown_period=4, trend_window=4, initial_stock_allocation=0.5, 
                      trade_proportion=0.50):
    """
    Backtests a refined trading strategy using predictions and actual prices.
    
    Parameters:
    - df: DataFrame with 'Predictions' and 'Actuals' columns.
    - initial_capital: Starting capital for the backtest.
    - transaction_cost: Proportional transaction cost (e.g., 0.001 for 0.1% cost).
    - price_change_threshold: Minimum price change percentage to trigger a trade (e.g., 0.05 for 5%).
    - rolling_window: Window size for rolling averages.
    - cooldown_period: Minimum steps between trades.
    - trend_window: Minimum consecutive steps for a trend.
    - initial_stock_allocation: Fraction of the initial portfolio to allocate to stock holdings (default: 50%).
    - trade_proportion: Fraction of the position or capital to trade (default: 25%).

    Returns:
    - portfolio: List of portfolio values over time.
    - final_value: Final portfolio value.
    - trade_counts: Tuple of total buys and sells (Buys, Sells).
    - final_capital: Final free cash flow.
    - final_stock_value: Value of stocks held at the end.
    """
    # Calculate initial stock holdings
    starting_stock_value = initial_capital * initial_stock_allocation
    initial_stock_price = df['Actuals'].iloc[0]
    position = starting_stock_value // initial_stock_price  # Number of shares
    capital = initial_capital - position * initial_stock_price  # Remaining free cash after purchase

    portfolio = [capital + position * initial_stock_price]  # Start with total portfolio value
    last_trade = -cooldown_period  # Initialise cooldown tracker
    max_price_since_buy = 0  # Track highest price since last buy

    # Trade counters
    buy_count = 0
    sell_count = 0
    
    # Calculate rolling averages for predictions and actuals
    df['Smoothed_Predictions'] = df['Predictions'].rolling(rolling_window).mean()
    df['Smoothed_Actuals'] = df['Actuals'].rolling(rolling_window).mean()
    
    # Calculate moving averages for technical indicator
    df['Short_MA'] = df['Actuals'].rolling(window=5).mean()
    df['Long_MA'] = df['Actuals'].rolling(window=20).mean()

    for i in range(max(rolling_window, 20), len(df)):
        # Update live portfolio value
        portfolio_value = capital + position * df['Actuals'].iloc[i]

        stop_loss_triggered = False
        
        # Skip if cooldown period not met
        if i - last_trade < cooldown_period:
            portfolio.append(portfolio_value)
            continue

        # Trend indicators
        is_uptrend = all(df['Smoothed_Predictions'].iloc[i - j] > df['Smoothed_Predictions'].iloc[i - j - 1] for j in range(1, trend_window + 1))
        is_downtrend = all(df['Smoothed_Predictions'].iloc[i - j] < df['Smoothed_Predictions'].iloc[i - j - 1] for j in range(1, trend_window + 1))
        steep_drop = df['Smoothed_Predictions'].iloc[i] < df['Smoothed_Predictions'].iloc[i - trend_window] * (1 - price_change_threshold)
        recent_momentum = (df['Actuals'].iloc[i] - df['Actuals'].iloc[i - trend_window]) / df['Actuals'].iloc[i - trend_window]
        steep_decline = recent_momentum < -price_change_threshold
        slow_growth = all(df['Smoothed_Predictions'].iloc[i - j] < df['Smoothed_Predictions'].iloc[i - j + 1] for j in range(trend_window, trend_window + 5))

        # Dynamic trade proportion
        trend_strength = abs(df['Smoothed_Predictions'].iloc[i] - df['Smoothed_Predictions'].iloc[i - trend_window]) / df['Smoothed_Predictions'].iloc[i - trend_window]
        trade_proportion = min(0.5, max(0.1, trend_strength))

        # Moving averages
        short_ma = df['Short_MA'].iloc[i]
        long_ma = df['Long_MA'].iloc[i]
        ma_crossover_up = short_ma > long_ma
        ma_crossover_down = short_ma < long_ma

        # Stop-loss tracking
        if position > 0:
            max_price_since_buy = max(max_price_since_buy, df['Actuals'].iloc[i])
            stop_loss_triggered = df['Actuals'].iloc[i] < max_price_since_buy * (1 - 0.1)  # 10% drop

        # Trading logic
        if (is_uptrend or slow_growth) and ma_crossover_up:  # Buy
            max_shares_to_buy = capital // df['Actuals'].iloc[i]
            shares_to_buy = int(max_shares_to_buy * trade_proportion)
            if shares_to_buy > 0:
                capital -= shares_to_buy * df['Actuals'].iloc[i] * (1 + transaction_cost)
                position += shares_to_buy
                max_price_since_buy = df['Actuals'].iloc[i]  # Update after buying
                last_trade = i
                buy_count += 1
        elif (is_downtrend or steep_drop or steep_decline or ma_crossover_down or stop_loss_triggered):  # Sell
            shares_to_sell = int(position * trade_proportion)
            if shares_to_sell > 0:
                capital += shares_to_sell * df['Actuals'].iloc[i] * (1 - transaction_cost)
                position -= shares_to_sell
                last_trade = i
                sell_count += 1

        # Append updated portfolio value
        portfolio_value = capital + position * df['Actuals'].iloc[i]
        portfolio.append(portfolio_value)

    # Final portfolio components
    final_stock_value = position * df['Actuals'].iloc[-1]
    final_value = capital + final_stock_value

    # Return buy and sell counts as a tuple
    trade_counts = (buy_count, sell_count)
    
    return portfolio, final_value, trade_counts, capital, final_stock_value


In [None]:
def initialize_backtest(company, checkpoint_path, show_plot=False, future=False):# change company dataset
    company = company.lower().replace(' ', '_')
    
    df = pd.read_csv(f"data/processed/{company}_processed.csv")
    df = df[['Open', 'High', 'Low', 'Close', 'Volume', 'Time', 'Finbert_score']]

    df.fillna(method='ffill', inplace=True)  # Replace NaNs with the previous row's values
    df.fillna(0, inplace=True)
    
    if future:
        X, y = df_to_Xy_future(df)
    else:
        X, y = df_to_Xy(df)
    
    
    X_train = X[:2000]

    for i in range(2, len(X_train[0,0])):
        mean = np.mean(X_train[:, :, i]) # mean of training temp
        std = np.std(X_train[:, :, i]) # std of training temp
        preprocess(X, mean, std, i)

        # Instantiate the model
    backtest_model = CNNModel()

    # change checkpoint path
    #nvidia_path = 'checkpoints/20241125/intel/checkpoint_epoch_474.pth'
    backtest_checkpoint = torch.load(checkpoint_path)
    backtest_model.load_state_dict(backtest_checkpoint['model_state_dict'])

    # change plot title
    df = plot_whole(backtest_model, X, y, company, show_plot)
    
    return df


def backtest(df, initial_capital=100000, show_plot=True):
    # Example usage:
    portfolio, final_value, num_trades, free_cash_flow, held_value = backtest_strategy(df, initial_capital=initial_capital)
    print(f"Final Portfolio Value: {final_value}")
    print(f'Number of Trades: {num_trades}')
    print("Realized Gain: ", final_value - initial_capital)
    print(f"Final Free Cash Flow (Capital): {free_cash_flow}")
    print(f"Final Value Held in Stocks: {held_value}")

    plt.figure(figsize=(12, 6))
    plt.plot(portfolio, label='Portfolio Value')
    plt.xlabel('Time Steps')
    plt.ylabel('Portfolio Value')
    plt.title('Backtest Portfolio Performance')
    plt.legend()
    plt.grid()
    
    if show_plot:
        plt.show()
    plt.close()
    return portfolio, final_value, num_trades, free_cash_flow, held_value

In [None]:
def extract_company_and_checkpoint(file_path):
    checkpoint_paths = []
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip():  # Skip empty lines
                company = line.split(", Best Epoch")[0].replace("Company: ", "").strip()

                checkpoint_path = line.split("Checkpoint: ")[1].strip().replace("\\", "/")

                checkpoint_paths.append((company, checkpoint_path))
    return checkpoint_paths


In [None]:
# Compute Sharpe Ratio
def compute_sharpe_ratio(portfolio, risk_free_rate, trading_days=252):
    # Calculate daily returns from portfolio values
    returns = np.diff(portfolio) / portfolio[:-1]

    # Convert risk-free rate to daily (assuming 252 trading days per year)
    daily_risk_free_rate = (1 + risk_free_rate) ** (1 / trading_days) - 1

    # Calculate excess returns
    excess_returns = returns - daily_risk_free_rate

    # Compute Sharpe Ratio
    sharpe_ratio = np.mean(excess_returns) / np.std(excess_returns)

    # Annualize Sharpe Ratio (if needed)
    annualized_sharpe = sharpe_ratio * np.sqrt(trading_days)

    return sharpe_ratio, annualized_sharpe

In [None]:
columns = ["company", "portfolio", "final_value", "num_trades", "free_cash_flow", "held_value"]

backtest_results_df = pd.DataFrame(columns=columns)

file_path = "best_epochs/20241126/best_epochs_20241126114831.txt"
checkpoint_paths = extract_company_and_checkpoint(file_path)

risk_free_rate = 0.02
for tup in checkpoint_paths:
    company = tup[0]
    checkpoint_path = tup[1]
    df = initialize_backtest(company, checkpoint_path, show_plot=False)
    
    portfolio, final_value, num_trades, free_cash_flow, held_value = backtest(df, initial_capital=100000)
    backtest_results_df = pd.concat([backtest_results_df, pd.DataFrame([{
        "company": company,
        "portfolio": portfolio,
        "final_value": final_value,
        "num_trades": num_trades,
        "free_cash_flow": free_cash_flow,
        "held_value": held_value
    }])], ignore_index=True)
    
    sharpe, annualized_sharpe = compute_sharpe_ratio(portfolio, risk_free_rate, trading_days=209)

    print(f"Sharpe Ratio: {sharpe:.2f}, Annualized Sharpe Ratio: {annualized_sharpe:.2f}")
    print()

In [None]:
import itertools
from tqdm import tqdm

# Grid search function
def grid_search_backtest(df, initial_capital=10000, risk_free_rate=0.02, trading_days=252):
    # Define the parameter grid
    param_grid = {
        'transaction_cost': [0.0],
        'price_change_threshold': [0.001, 0.002, 0.005],
        'rolling_window': [20, 40, 60],
        'cooldown_period': [2, 4, 8, 16],
        'trend_window': [2, 4, 8, 16],
        'initial_stock_allocation': [0.3, 0.5, 0.7],
        'trade_proportion': [0.25, 0.5, 0.75]
    }

    # Create all combinations of parameters
    param_combinations = list(itertools.product(*param_grid.values()))

    # Column names for the results DataFrame
    columns = list(param_grid.keys()) + ["sharpe_ratio", "annualized_sharpe"]

    # Initialize results DataFrame
    results_df = pd.DataFrame(columns=columns)

    # Perform grid search
    for params in tqdm(param_combinations, desc="Grid Search"):
        param_dict = dict(zip(param_grid.keys(), params))

        # Run backtest with the current parameter set
        portfolio, final_value, _, _, _ = backtest_strategy(
            df,
            initial_capital=initial_capital,
            transaction_cost=param_dict['transaction_cost'],
            price_change_threshold=param_dict['price_change_threshold'],
            rolling_window=param_dict['rolling_window'],
            cooldown_period=param_dict['cooldown_period'],
            trend_window=param_dict['trend_window'],
            initial_stock_allocation=param_dict['initial_stock_allocation'],
            trade_proportion=param_dict['trade_proportion']
        )

        # Compute Sharpe ratio
        sharpe, annualized_sharpe = compute_sharpe_ratio(portfolio, risk_free_rate, trading_days)

        # Store results
        result = {**param_dict, "sharpe_ratio": sharpe, "annualized_sharpe": annualized_sharpe}
        results_df = pd.concat([results_df, pd.DataFrame([result])], ignore_index=True)

    # Return sorted results by Sharpe ratio
    return results_df.sort_values(by="annualized_sharpe", ascending=False)

In [None]:
file_path = "best_epochs/20241126/best_epochs_20241126195347.txt"
checkpoint_paths = extract_company_and_checkpoint(file_path)

for tup in checkpoint_paths:
    company = tup[0]
    checkpoint_path = tup[1]
    
    print(f"Computing Best Sharpe for {company}")
    
    df = initialize_backtest(company, checkpoint_path, show_plot=False)

    # Perform grid search
    results = grid_search_backtest(df, initial_capital=100000, risk_free_rate=0.02, trading_days=209)

    backtest_path = 'backtest_results'
    if not os.path.exists(backtest_path):
        os.makedirs(backtest_path)
        
    results.to_csv(os.path.join(backtest_path, f"{company.lower().replace(' ', '_')}_grid_search_results.csv"), index=False)

    print(f"{company} - Top 5 configurations:")
    print(results.head())
    print()