In [14]:
import torch
import torch.nn as nn
import gym
from gym.spaces import Box
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd
from stable_baselines3.common.policies import BasePolicy
from sklearn.preprocessing import MinMaxScaler
from stable_baselines3.common.utils import get_device

In [10]:
# Load the raw price/indicator table. Missing cells are zero-filled, and volume
# is cast to float so every numeric column shares a floating dtype.
df = pd.read_csv("smh_stocks_max.csv")
df = df.fillna(0)
df["volume"] = df["volume"].astype(float)

# Number of observations per seasonal cycle used by the decomposition.
SEASONAL_PERIOD = 180

close = df["close"].to_numpy()
# extrapolate_trend="freq" makes statsmodels extend the trend to both ends of
# the series. Without it the leading/trailing rows of `trend` and `resid` are
# NaN, and zero-filling them breaks trend + seasonal + residual == close on
# exactly those rows (and distorts any min-max scaling fitted afterwards).
tsa = seasonal_decompose(
    close,
    model="additive",
    period=SEASONAL_PERIOD,
    extrapolate_trend="freq",
)
df["trend"] = tsa.trend
df["seasonal"] = tsa.seasonal
df["residual"] = tsa.resid

# Sanity checks: the additive components must be complete and must rebuild
# the close price they were derived from.
decomposed = df[["trend", "seasonal", "residual"]]
assert not decomposed.isna().any().any(), "decomposition left NaN components"
assert np.allclose(decomposed.sum(axis=1).to_numpy(), close), \
    "trend + seasonal + residual does not reproduce close"

In [11]:
# Model inputs (technical indicators) and outputs (additive decomposition
# components of the close price).
FEATURE_COLUMNS = [
    "volume", "macd_hist", "macd", "signal_line", "%K", "%D", "ema_200",
    "rsi", "roc", "bb_upper", "bb_lower",
]
TARGET_COLUMNS = ["trend", "seasonal", "residual"]

features = df[FEATURE_COLUMNS]
targets = df[TARGET_COLUMNS]

# Chronological 80/20 split: no shuffling, the test set is the most recent data.
train_size = int(0.8 * len(features))

# Features and targets each get their own scaler. Re-fitting one shared scaler
# on the targets would discard the feature scaling parameters, making it
# impossible to transform new feature rows consistently later on.
feature_scaler = MinMaxScaler((0, 1))
target_scaler = MinMaxScaler((0, 1))

# Both scalers are fitted on the training portion only and then applied to the
# held-out portion, so no test-set statistics leak into the scaling.
X_train_set = feature_scaler.fit_transform(features.iloc[:train_size, :].to_numpy())
X_test_set = feature_scaler.transform(features.iloc[train_size:, :].to_numpy())

y_train_set = target_scaler.fit_transform(targets.iloc[:train_size, :].to_numpy())
y_test_set = target_scaler.transform(targets.iloc[train_size:, :].to_numpy())

# Backward-compatible alias: after this cell `scaler` has always been the
# scaler fitted on the targets (it is what inverse-transforms predictions).
scaler = target_scaler

assert len(X_train_set) + len(X_test_set) == len(features)
assert len(y_train_set) + len(y_test_set) == len(targets)

In [None]:
class CustomLSTMPolicy(BasePolicy):
    """Actor-critic policy with a shared LSTM encoder.

    The LSTM consumes a batch-first sequence of observations; the hidden
    output at the last time step feeds two small fully connected heads:
    an actor head with ``output_size`` outputs and a critic head with a
    single output.

    Args:
        observation_space: Space whose last dimension is the number of
            features per time step (used as the LSTM input size).
        action_space: Action space forwarded to ``BasePolicy``.
        lstm_hidden_size: Hidden size of the LSTM.
        n_layers: Number of stacked LSTM layers.
        output_size: Number of outputs of the actor head.
        activation_fn: Activation class instantiated between the two linear
            layers of each head.
        **kwargs: Forwarded unchanged to ``BasePolicy.__init__``.
    """

    def __init__(self, observation_space, action_space, lstm_hidden_size, n_layers, output_size=1, activation_fn=nn.ReLU, **kwargs):
        super().__init__(observation_space, action_space, **kwargs)

        self.n_layers = n_layers
        self.hidden_size = lstm_hidden_size

        self.lstm = nn.LSTM(
            input_size=observation_space.shape[-1],
            hidden_size=lstm_hidden_size,
            num_layers=n_layers,
            batch_first=True,
        )

        self.actor = self.build_fc(lstm_hidden_size, output_size, activation_fn=activation_fn)
        self.critic = self.build_fc(lstm_hidden_size, 1, activation_fn=activation_fn)

        # `device` is a read-only property on BasePolicy (derived from the
        # module's parameters), so it must not be assigned. Moving the whole
        # module instead also keeps the LSTM and both heads on the same device.
        self.to(get_device())

    def build_fc(self, input_dim, output_dim, activation_fn):
        """Build a two-layer MLP head (input_dim -> 64 -> output_dim) on the policy's device."""
        layers = [
            nn.Linear(input_dim, 64),
            activation_fn(),
            nn.Linear(64, output_dim),
        ]
        return nn.Sequential(*layers).to(self.device)

    def forward(self, obs):
        """Run the encoder and both heads on a batch of observation sequences.

        Args:
            obs: Batch-first sequences, indexed as (batch, time, features).
                Tensors and array-likes are both accepted; the input is
                converted to a float32 tensor on the policy's device.

        Returns:
            Tuple ``(actor_out, critic_out)`` where ``actor_out`` is the actor
            head output with an extra leading dimension of size 1 and
            ``critic_out`` is the critic head output.
        """
        obs = torch.as_tensor(obs, dtype=torch.float32, device=self.device)
        out, _ = self.lstm(obs)
        last_step = out[:, -1, :]  # only the final time step feeds the heads

        actor_out = self.actor(last_step)
        critic_out = self.critic(last_step)
        return actor_out.unsqueeze(0), critic_out

    def _predict(self, observation, deterministic=False):
        """Return the actor output for ``observation``.

        Required by ``BasePolicy`` (it is abstract there); without it the
        class cannot be instantiated. The network has no sampling step, so
        ``deterministic`` has no effect.
        """
        actions, _ = self.forward(observation)
        return actions

    def evaluate_actions(self, obs, hidden_state, actions):
        """Score previously taken discrete actions.

        Args:
            obs: Batch-first observation sequences.
            hidden_state: ``(h_0, c_0)`` tuple passed to the LSTM.
            actions: One action index per batch element, used to select a
                column of the actor output.

        Returns:
            Tuple ``(values, log_probs, hidden_state)``: critic outputs,
            log-probability of each given action, and the LSTM's final state.
        """
        lstm_out, hidden_state = self.lstm(obs, hidden_state)
        last_step = lstm_out[:, -1, :]
        logits = self.actor(last_step)
        values = self.critic(last_step)

        # The actor head ends in a plain Linear layer, so its outputs are
        # unnormalised scores that may be negative. log_softmax turns them
        # into valid log-probabilities; taking log() of them directly yields NaN.
        # NOTE(review): this treats the actor outputs as categorical logits,
        # matching the original gather-by-index logic — confirm against the
        # training algorithm that calls this method.
        log_probs = torch.log_softmax(logits, dim=1).gather(1, actions.long().unsqueeze(1)).squeeze(1)

        return values, log_probs, hidden_state

    def reset_hidden_state(self, batch_size=1):
        """Return a zero ``(h_0, c_0)`` pair on the policy's device.

        Args:
            batch_size: Batch dimension of the state tensors (default 1).
        """
        shape = (self.n_layers, batch_size, self.hidden_size)
        return (
            torch.zeros(shape, device=self.device),
            torch.zeros(shape, device=self.device),
        )
        

In [None]:
# NOTE(review): `en` is not assigned in any visible cell, so this cell raises
# NameError on a fresh "Restart Kernel -> Run All". The recorded output below
# has the same form as the (1, seq_length, n_features) observation shape with
# 11 features — presumably this is a truncated expression; TODO restore the
# intended expression or delete the cell.
en

(1, 20, 11)

In [None]:
class StockTradingEnv(gym.Env):
    """Sliding-window price-prediction environment.

    At every step the environment feeds the current window of feature rows to
    the model, converts the model's scaled output back to a price, and rewards
    it with the negative squared error against the price at the current step.
    The classic gym API is used: ``reset()`` returns an observation and
    ``step()`` returns ``(observation, reward, done, info)``.

    Args:
        data: Row-sliceable 2-D array of scaled features, one row per time step.
        targets: Integer-indexable targets aligned with ``data``. A row with
            several values is treated as scaled components that ``scaler``
            can invert; a single value is treated as the price itself.
        seq_length: Number of consecutive rows in one observation window.
        n_features: Number of feature columns in ``data``.
        policy: Model whose ``forward(obs)`` returns ``(actor_out, critic_out)``.
            When ``None`` a default ``CustomLSTMPolicy`` is created.
        scaler: Fitted scaler whose ``inverse_transform`` maps the model
            output back to unscaled components.

    Raises:
        ValueError: If ``data`` has fewer than ``seq_length`` rows.
    """

    def __init__(self, data, targets, seq_length, n_features, policy, scaler):
        super().__init__()

        if seq_length > len(data):
            raise ValueError(
                f"seq_length ({seq_length}) exceeds the number of data rows ({len(data)})"
            )

        self.action_space = Box(low=0, high=np.inf, shape=(1, 1, 3))
        self.observation_space = Box(low=0, high=1, shape=(1, seq_length, n_features), dtype=np.float32)

        self.data = data
        self.targets = targets
        self.scaler = scaler
        # seq_length must be stored before anything reads self.seq_length.
        self.seq_length = seq_length

        # Use the model supplied by the caller; build the default network only
        # when none is given, so `policy` and `model` always refer to the same
        # object that actually produces the predictions.
        if policy is None:
            policy = CustomLSTMPolicy(self.observation_space, self.action_space, 128, 3, 3, nn.ReLU)
        self.policy = policy
        self.model = policy

        self.current_step = self.seq_length
        self.done = False
        self.observation = self._window()

    def _window(self):
        """Return the ``seq_length`` rows ending just before ``current_step``.

        A leading axis of size 1 is added so the result matches the declared
        ``(1, seq_length, n_features)`` observation space and can be fed to a
        batch-first model as a batch of one.
        """
        start = self.current_step - self.seq_length
        rows = np.asarray(self.data[start:self.current_step], dtype=np.float32)
        return rows[np.newaxis, ...]

    def _to_price(self, scaled_components):
        """Undo the scaling of one set of components and sum them into a price."""
        row = np.asarray(scaled_components, dtype=np.float64).reshape(1, -1)
        return float(self.scaler.inverse_transform(row).sum())

    def step(self, action):
        """Advance one time step.

        ``action`` is accepted for gym API compatibility but is not used: the
        prediction is produced by the environment's own model.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is the
            negative squared error between predicted and actual price, and
            ``info`` carries both prices.
        """
        # Already at the last usable row: report termination without advancing.
        if self.current_step >= len(self.data) - 1:
            self.done = True
            return self.observation, 0, self.done, {}

        with torch.no_grad():
            obs_tensor = torch.as_tensor(
                self.observation,
                dtype=torch.float32,
                device=getattr(self.model, "device", None),
            )
            actor_out, _ = self.model.forward(obs_tensor)
        predicted_price = self._to_price(actor_out.detach().cpu().numpy())

        target_row = np.asarray(self.targets[self.current_step], dtype=np.float64)
        if target_row.size == 1:
            actual_price = target_row.item()
        else:
            actual_price = self._to_price(target_row)

        # Higher reward must mean a better prediction, hence the negation.
        reward = -(predicted_price - actual_price) ** 2

        self.current_step += 1
        self.observation = self._window()
        self.done = self.current_step >= len(self.data) - 1

        info = {"predicted_price": predicted_price, "actual_price": actual_price}
        return self.observation, reward, self.done, info

    def reset(self):
        """Rewind to the first full window and return its observation."""
        self.current_step = self.seq_length
        self.done = False
        self.observation = self._window()
        return self.observation
