In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import torch.optim as optim
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import ta
from collections import Counter

In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# print(device)

In [None]:
def prepare_gru_data(data, feature_columns, target_column, sequence_length=50):
    """
    Prepares data for GRU by creating sequences of features and corresponding targets.

    Each sample is a window of `sequence_length` consecutive rows of the
    min-max scaled feature columns; its target is the `target_column` value of
    the row immediately following that window.

    Args:
        data: DataFrame containing the feature and target columns. It is not
            modified: scaling is applied to a NumPy copy of the features.
        feature_columns: List of column names used as model inputs.
        target_column: Name of the column holding the value to predict.
        sequence_length: Number of consecutive rows in each input sequence.

    Returns:
        Tuple (X, y). X has shape (n_samples, sequence_length, n_features) and
        y has shape (n_samples,), with n_samples = len(data) - sequence_length.
        When `data` has no more than `sequence_length` rows both arrays are
        empty, and X keeps its 3-D shape so that X.shape[2] is still valid.

    Note:
        The scaler is fitted on every row of `data`. If a later hold-out
        split must not influence the scaling, pass only the training rows.
    """
    # Scale a NumPy copy so the caller's DataFrame keeps its original values.
    scaler = MinMaxScaler()
    scaled_features = scaler.fit_transform(data[feature_columns].to_numpy())
    targets = data[target_column].to_numpy()

    n_samples = len(scaled_features) - sequence_length
    if n_samples <= 0:
        # Not enough rows for a single window: return correctly shaped empties.
        empty_X = np.empty(
            (0, sequence_length, scaled_features.shape[1]),
            dtype=scaled_features.dtype,
        )
        return empty_X, np.empty((0,), dtype=targets.dtype)

    # Slice windows from the array once instead of re-indexing the DataFrame
    # on every iteration.
    X = np.stack([scaled_features[i:i + sequence_length] for i in range(n_samples)])
    # The target of window i is the row right after it, i.e. row i + sequence_length.
    y = targets[sequence_length:sequence_length + n_samples].copy()

    return X, y

In [None]:
class GRUModel(nn.Module):
    """
    Stacked GRU followed by a linear head applied to the last time step.

    The GRU is built with batch_first=True, so inputs are laid out as
    (batch, sequence, feature). The head maps the GRU output at the final
    sequence position to `output_size` values per sample.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return the linear head's output for the final time step of `x`."""
        sequence_output, _final_hidden = self.gru(x)
        # Only the output at the last sequence position feeds the head.
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

In [None]:
# Function from classifier

def create_labels(df, look_ahead_period=1, threshold=0.01):
    """
    Returns a labelled copy of df with an "Action" column.
    2 = buy, 0 = sell, 1 = hold (non-negative class indices, as used for the GRU)

    The label compares each row's "Close" with the "Close" `look_ahead_period`
    rows later:
      * relative change >  threshold  -> 2 (buy)
      * relative change < -threshold  -> 0 (sell)
      * otherwise                     -> 1 (hold)

    Args:
        df: DataFrame with a "Close" column. It is not modified.
        look_ahead_period: Number of rows to look ahead for the future price.
        threshold: Minimum absolute relative price change that triggers a
            buy or sell label.

    Returns:
        A new DataFrame with the added columns "Future Price", "Price Change"
        and "Action". Rows without a future price (the last
        `look_ahead_period` rows) are dropped.
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    labelled = df.copy()

    labelled["Future Price"] = labelled["Close"].shift(-look_ahead_period)
    labelled["Price Change"] = (
        labelled["Future Price"] - labelled["Close"]
    ) / labelled["Close"]

    # Default to hold, then overwrite rows whose move exceeds the threshold.
    labelled["Action"] = 1
    labelled.loc[labelled["Price Change"] > threshold, "Action"] = 2  # We buy
    labelled.loc[labelled["Price Change"] < -threshold, "Action"] = 0  # We sell

    # Rows with no future price cannot be labelled.
    return labelled.dropna(subset=["Future Price"])


def add_technical_indicators(df, sma_short=20, sma_long=50, bollinger_window=20, 
                             macd_fast=12, macd_slow=26, macd_signal=9, rsi_window=14):
    """
    Add technical indicators as features and return the result as a new df.

    All indicators are computed from the "Close" column. Added columns:
      * SMA_{sma_short}, SMA_{sma_long}: rolling means of Close
      * BB_High, BB_Low: Bollinger bands over `bollinger_window`
      * MACD, MACD_Signal: MACD line and its signal line
      * RSI: relative strength index over `rsi_window`

    Args:
        df: DataFrame with a "Close" column. It is not modified.
        sma_short: Window of the short simple moving average.
        sma_long: Window of the long simple moving average.
        bollinger_window: Window of the Bollinger bands.
        macd_fast: Fast window of the MACD.
        macd_slow: Slow window of the MACD.
        macd_signal: Signal-line window of the MACD.
        rsi_window: Window of the RSI.

    Returns:
        A new DataFrame containing the original columns plus the indicator
        columns, with missing values back-filled. The back-fill is applied to
        every column of the frame, not only to the indicators.
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    df = df.copy()
    close = df['Close']

    # Short and Long SMA
    df[f'SMA_{sma_short}'] = close.rolling(window=sma_short).mean()
    df[f'SMA_{sma_long}'] = close.rolling(window=sma_long).mean()

    # Bollinger Bands
    df['BB_High'] = ta.volatility.bollinger_hband(close, window=bollinger_window)
    df['BB_Low'] = ta.volatility.bollinger_lband(close, window=bollinger_window)

    # MACD (Moving Average Convergence Divergence)
    df['MACD'] = ta.trend.macd(close, window_slow=macd_slow, window_fast=macd_fast)
    df['MACD_Signal'] = ta.trend.macd_signal(
        close, window_slow=macd_slow, window_fast=macd_fast, window_sign=macd_signal
    )

    # RSI (Relative Strength Index)
    df['RSI'] = ta.momentum.rsi(close, window=rsi_window)

    # Backfill missing values (e.g. the warm-up rows of the rolling windows).
    # DataFrame.bfill() replaces the deprecated fillna(method='bfill').
    return df.bfill()

## Functions Above!

In [None]:
# Read the processed hourly CSV for the chosen ticker, indexed by its Datetime column
ticker = "AAPL"
data_path = f"../data/{ticker}_processed_hourly_data.csv"
raw_data = pd.read_csv(data_path, index_col='Datetime', parse_dates=True)

# Label every row first, then append the technical-indicator features
data = raw_data.pipe(create_labels).pipe(add_technical_indicators)

In [None]:
# Columns used as model inputs (replace with your actual features)
feature_columns = ['SMA_20', 'SMA_50', 'RSI', 'MACD', 'Volume']
# Print the dtype of every feature column as a quick sanity check
feature_dtypes = data[feature_columns].dtypes
print(feature_dtypes)

In [None]:
# Inputs for building the (sequence, target) samples
feature_columns = ['SMA_20', 'SMA_50', 'RSI', 'MACD', 'Volume']
target_column = 'Action'
sequence_length = 50

# X holds the feature windows, y the label that follows each window
X, y = prepare_gru_data(
    data=data,
    feature_columns=feature_columns,
    target_column=target_column,
    sequence_length=sequence_length,
)

In [None]:
# Model parameters
input_size = X.shape[2]  # Number of features
hidden_size = 64
num_layers = 2
output_size = 3  # One logit per Action class (0 = sell, 1 = hold, 2 = buy)

# Seed the RNG before building the model so the initial weights (and hence
# the training results) are reproducible across kernel restarts.
SEED = 42
torch.manual_seed(SEED)

model = GRUModel(input_size, hidden_size, num_layers, output_size)

In [None]:
# Split data and then convert them to tensors.
# shuffle=False keeps the original row order: the last 20% of samples form the test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
# Targets are class indices, so store them as int64 (the dtype CrossEntropyLoss expects).
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Define loss and optimizer. Classes are weighted by inverse frequency so the
# model is not rewarded for always predicting the dominant "hold" class.
# Counts come from the training labels, so they stay correct when the ticker,
# threshold or split changes. Order: Sell (0), Hold (1), Buy (2).
class_counts = torch.bincount(y_train_tensor, minlength=output_size).to(torch.float32)
# clamp avoids a division by zero (infinite weight) for a class absent from the training set
class_weights = 1.0 / class_counts.clamp(min=1.0)
class_weights = class_weights / class_weights.sum()  # Normalize weights

# Use the weights in CrossEntropyLoss
criterion = nn.CrossEntropyLoss(weight=class_weights.to(y_train_tensor.device))
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training loop
epochs = 20
batch_size = 32

for epoch in range(epochs):
    model.train()
    running_loss = 0.0  # sum of batch losses, each weighted by its batch size
    samples_seen = 0

    # Mini-batches are taken in order (no shuffling)
    for i in range(0, len(X_train_tensor), batch_size):
        X_batch = X_train_tensor[i:i + batch_size]
        y_batch = y_train_tensor[i:i + batch_size].long()  # Ensure target is Long type

        optimizer.zero_grad()
        outputs = model(X_batch)  # Logits
        loss = criterion(outputs, y_batch)  # CrossEntropyLoss
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * len(X_batch)
        samples_seen += len(X_batch)

    # Report the epoch average rather than only the last (possibly partial)
    # batch; NaN if there was nothing to train on.
    epoch_loss = running_loss / samples_seen if samples_seen else float("nan")
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}")

In [None]:
# Evaluate the model
model.eval()
with torch.no_grad():
    # Get predictions
    y_pred = model(X_test_tensor)  # Logits
    y_pred_class = torch.argmax(y_pred, dim=1)  # Predicted classes (0, 1, 2)

# Calculate overall accuracy
accuracy = (y_pred_class == y_test_tensor).float().mean().item()
print(f"Overall Test Accuracy: {accuracy * 100:.2f}%")

# Calculate accuracy for Buy (2) and Sell (0) only
mask = (y_test_tensor == 0) | (y_test_tensor == 2)  # Filter for 0 (Sell) and 2 (Buy)
y_test_filtered = y_test_tensor[mask]
y_pred_filtered = y_pred_class[mask]
if mask.any():
    buy_sell_accuracy = (y_test_filtered == y_pred_filtered).float().mean().item()
    print(f"Accuracy for Buy/Sell (0 and 2): {buy_sell_accuracy * 100:.2f}%")
else:
    # The mean of an empty selection would be NaN; report it explicitly instead.
    buy_sell_accuracy = float("nan")
    print("Accuracy for Buy/Sell (0 and 2): n/a (no Buy/Sell samples in the test set)")

# Plot predicted vs actual classes for every test sample
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(y_test_tensor.cpu().numpy(), label="Actual", linestyle="--", alpha=0.7)
ax.plot(y_pred_class.cpu().numpy(), label="Predicted", alpha=0.7)
ax.set_xlabel("Sample Index")
ax.set_ylabel("Class (0 = Sell, 1 = Hold, 2 = Buy)")
ax.set_title("Actual vs Predicted Actions")
ax.legend()
ax.grid()
plt.show()