In [1]:
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

# Select the compute backend once at import time: prefer Apple's MPS,
# then CUDA, and fall back to the CPU when neither is available.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

################################################################################
# 1) StockTrading environment
################################################################################
class StockTradingEnv:
    """Single-asset trading environment driven by a fixed price series.

    An observation is the slice of ``window_size`` consecutive prices that
    starts at the current step. An action chooses the position held over the
    next price move: ``0`` -> short (-1), ``2`` -> long (+1), anything else ->
    flat (0). The reward returned by ``step(action)`` is the profit or loss of
    exactly that position between the last price of the observed window and
    the price that follows it, so every transition pairs an action with the
    reward it caused.

    Args:
        prices: Indexable and sliceable sequence of prices (list or array).
        window_size: Number of prices in one observation.
        initial_capital: Stored as ``capital`` on reset; it does not enter
            the reward computation.
        max_steps: Upper bound on steps per episode; clipped to
            ``len(prices) - window_size - 1``.

    Raises:
        ValueError: If ``prices`` does not hold more than ``window_size``
            entries, i.e. there is no price after the first window to score
            an action against.
    """

    def __init__(self, prices, window_size=30, initial_capital=10000, max_steps=1000):
        if len(prices) <= window_size:
            raise ValueError(
                f"need more than window_size={window_size} prices, got {len(prices)}"
            )
        self.prices = prices
        self.window_size = window_size
        self.initial_capital = initial_capital
        self.max_steps = min(max_steps, len(prices) - window_size - 1)
        self.reset()

    def reset(self):
        """Rewind to the start of the series and return the first observation."""
        self.current_step = 0
        self.done = False
        self.position = 0
        self.capital = self.initial_capital
        self.last_price = self.prices[self.window_size - 1]
        return self._get_observation()

    def _get_observation(self):
        """Return the price window that starts at ``current_step``."""
        start = self.current_step
        end = self.current_step + self.window_size
        return self.prices[start:end]

    def step(self, action):
        """Apply ``action`` and advance one price.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is the
            chosen position times the next price change, and ``info`` is an
            empty dict. Once the episode is done, further calls return a zero
            reward without changing any state.
        """
        if self.done:
            return self._get_observation(), 0.0, True, {}

        if action == 0:
            new_position = -1
        elif action == 2:
            new_position = 1
        else:
            new_position = 0

        # Last price the agent has seen, and the one it has not seen yet.
        last_seen_idx = self.current_step + self.window_size - 1
        current_price = self.prices[last_seen_idx]
        next_price = self.prices[last_seen_idx + 1]

        # Credit the action with the move it is exposed to, not the previous
        # action's move; the per-episode reward total is unchanged.
        reward = 0.0
        if new_position != 0:
            reward += (next_price - current_price) * new_position

        self.position = new_position
        self.last_price = current_price
        self.current_step += 1

        if self.current_step >= self.max_steps:
            self.done = True

        return self._get_observation(), reward, self.done, {}

################################################################################
# 2) Transformer-based Q-Network
################################################################################
class TransformerQNetwork(nn.Module):
    """Transformer encoder that maps a window of scalar values to Q-values.

    Each scalar in the input window is embedded into ``d_model`` dimensions,
    a fixed sinusoidal positional encoding is added, the sequence is passed
    through a Transformer encoder, and the encoder output at the last time
    step is projected to one Q-value per action.

    Args:
        window_size: Longest sequence the positional encoding covers.
        d_model: Embedding width (must be divisible by ``nhead``).
        nhead: Number of attention heads.
        num_layers: Number of encoder layers.
        num_actions: Size of the output Q-value vector.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, window_size=30, d_model=64, nhead=4, num_layers=2, num_actions=3, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(1, d_model)
        # Registered as a buffer so it follows the module across devices
        # instead of being copied on every forward pass. It is non-persistent
        # so the state_dict keys stay exactly as they were.
        self.register_buffer(
            "positional_encoding",
            self._generate_positional_encoding(window_size, d_model),
            persistent=False,
        )
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dropout=dropout,
            batch_first=True,  # inputs are laid out as (batch, seq, feature)
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, num_actions)

    def _generate_positional_encoding(self, length, d_model):
        """Return a ``(1, length, d_model)`` sinusoidal positional encoding."""
        position = torch.arange(0, length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe = torch.zeros(length, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the frequencies to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        return pe.unsqueeze(0)

    def forward(self, x):
        """Compute Q-values for a batch of windows.

        Args:
            x: Tensor of shape ``(batch, seq)`` or ``(batch, seq, 1)`` with
                ``seq`` no longer than the ``window_size`` given at
                construction.

        Returns:
            Tensor of shape ``(batch, num_actions)``.
        """
        if x.dim() == 2:
            x = x.unsqueeze(-1)
        x = self.embedding(x)
        # Slice so that sequences shorter than window_size are accepted too.
        x = x + self.positional_encoding[:, : x.size(1)]
        x = self.transformer(x)
        x = x[:, -1, :]  # encoder output at the last time step
        return self.fc(x)


################################################################################
# 3) Replay Buffer
################################################################################
class ReplayBuffer:
    """Bounded store of transitions with uniform random sampling.

    Transitions live in a ``deque`` created with ``maxlen=capacity``, so
    appending to a full buffer discards the oldest entry.
    """

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``,
            each element being a tuple of length ``batch_size``.
        """
        picked = random.sample(self.buffer, batch_size)
        columns = tuple(zip(*picked))
        # Unpacking enforces that there are exactly five columns.
        states, actions, rewards, next_states, dones = columns
        return columns

    def __len__(self):
        """Number of transitions currently stored."""
        stored = self.buffer
        return len(stored)

################################################################################
# 4) Utility Functions
################################################################################
def get_epsilon(it, max_it, min_epsilon=0.01, max_epsilon=1.0):
    """Linearly anneal epsilon from ``max_epsilon`` towards ``min_epsilon``.

    The value falls by a constant amount per iteration so that it would reach
    ``min_epsilon`` at ``it == max_it``; it never goes below ``min_epsilon``.
    """
    change_per_iteration = -(max_epsilon - min_epsilon) / max_it
    annealed = max_epsilon + change_per_iteration * it
    if annealed > min_epsilon:
        return annealed
    return min_epsilon

def process_state_transformer(state):
    """
    Turn a single environment state into a batched tensor.

    A NumPy array is converted to a float tensor and moved to the module-level
    ``device``. Any other input is used as it is and must already provide
    ``unsqueeze``. A leading batch dimension of size 1 is added in both cases.
    """
    if not isinstance(state, np.ndarray):
        # Already tensor-like: only the batch dimension is missing.
        return state.unsqueeze(0)
    as_tensor = torch.FloatTensor(state)
    return as_tensor.to(device).unsqueeze(0)


################################################################################
# 5) DQN Training Loop
################################################################################
def train_dqn(env, num_episodes=100, window_size=30, gamma=0.99,
              lr=1e-4, batch_size=32, max_steps_per_episode=1000):
    """Train a ``TransformerQNetwork`` on ``env`` with one-step Q-learning.

    Actions are chosen epsilon-greedily, with epsilon annealed linearly over
    ``num_episodes * max_steps_per_episode`` iterations. Every transition is
    stored in a replay buffer, and once the buffer holds at least
    ``batch_size`` transitions one gradient step on the MSE TD error is taken
    per environment step. The bootstrap target uses the same network (there
    is no separate target network).

    Args:
        env: Environment exposing ``reset()`` and ``step(action)`` that
            returns ``(next_state, reward, done, info)``.
        num_episodes: Number of training episodes.
        window_size: Sequence length passed to the network constructor.
        gamma: Discount factor for the bootstrap target.
        lr: Adam learning rate.
        batch_size: Number of transitions per gradient step.
        max_steps_per_episode: Step cap for a single episode.

    Returns:
        ``(q_net, episode_rewards)``: the trained network and the list of
        summed rewards, one entry per episode.
    """
    q_net = TransformerQNetwork(window_size=window_size, d_model=64, nhead=4, num_layers=2, num_actions=3).to(device)
    optimizer = optim.Adam(q_net.parameters(), lr=lr)
    loss_fn = nn.MSELoss()  # built once instead of on every update
    replay_buffer = ReplayBuffer(capacity=10000)
    episode_rewards = []
    max_iterations = num_episodes * max_steps_per_episode
    iteration = 0
    # Bound up front so the per-episode report below still works when an
    # episode runs zero steps (max_steps_per_episode == 0).
    epsilon = 1.0

    # NOTE(review): q_net stays in training mode for the whole function, so
    # dropout is active during greedy action selection and target computation.

    for episode in range(num_episodes):
        state = env.reset()  # Initial state from the environment (NumPy array)
        episode_reward = 0.0

        for _ in range(max_steps_per_episode):
            iteration += 1
            epsilon = get_epsilon(iteration, max_iterations)

            # Epsilon-greedy action selection
            if random.random() < epsilon:
                action = random.choice([0, 1, 2])
            else:
                s_t = process_state_transformer(state)  # Convert state to tensor
                with torch.no_grad():
                    q_values = q_net(s_t)
                    action = q_values.argmax(dim=1).item()

            # Take an action in the environment
            next_state, reward, done, _ = env.step(action)
            replay_buffer.push(state, action, reward, next_state, done)

            # Update state and accumulate rewards
            state = next_state
            episode_reward += reward

            # Train as soon as a full batch can be drawn from the buffer.
            if len(replay_buffer) >= batch_size:
                states_b, actions_b, rewards_b, next_states_b, dones_b = replay_buffer.sample(batch_size)
                states_b_t = torch.cat([process_state_transformer(s) for s in states_b])
                actions_b_t = torch.LongTensor(actions_b).to(device)
                rewards_b_t = torch.FloatTensor(rewards_b).to(device)
                next_states_b_t = torch.cat([process_state_transformer(ns) for ns in next_states_b])
                dones_b_t = torch.FloatTensor(dones_b).to(device)

                # Q-value of the action that was actually taken
                q_values_b = q_net(states_b_t)
                q_values_chosen = q_values_b.gather(1, actions_b_t.unsqueeze(1)).squeeze(1)

                # One-step TD target; (1 - done) removes the bootstrap term
                # on terminal transitions.
                with torch.no_grad():
                    q_next = q_net(next_states_b_t)
                    q_next_max = q_next.max(dim=1)[0]
                    q_target = rewards_b_t + gamma * q_next_max * (1 - dones_b_t)

                # Backpropagation
                loss = loss_fn(q_values_chosen, q_target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if done:
                break

        episode_rewards.append(episode_reward)
        print(f"Episode {episode+1}/{num_episodes}, Reward: {episode_reward:.2f}, Eps: {epsilon:.3f}")

    return q_net, episode_rewards


################################################################################
# 6) Run Trained Agent
################################################################################
def run_trained_agent(env, q_net, max_steps=1000):
    """Roll out the greedy policy of ``q_net`` on ``env``.

    The network is switched to evaluation mode for the rollout so dropout
    does not perturb the Q-values; its previous training/eval mode is
    restored afterwards, even if the rollout raises.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)`` that
            returns ``(next_state, reward, done, info)``.
        q_net: Network mapping a batched state to per-action Q-values.
        max_steps: Maximum number of environment steps to take.

    Returns:
        The sum of rewards collected during the rollout.
    """
    was_training = q_net.training
    q_net.eval()
    try:
        state = torch.FloatTensor(env.reset()).to(device)
        total_reward = 0.0
        done = False
        steps = 0

        while not done and steps < max_steps:
            s_t = process_state_transformer(state)
            with torch.no_grad():
                q_values = q_net(s_t)
                action = q_values.argmax(dim=1).item()
            next_state, reward, done, _ = env.step(action)
            state = torch.FloatTensor(next_state).to(device)
            total_reward += reward
            steps += 1
    finally:
        # Hand the network back in the mode the caller left it in.
        q_net.train(was_training)

    return total_reward

################################################################################
# 7) Main Function
################################################################################
if __name__ == "__main__":
    def generate_synthetic_prices(T=3000, s0=100, mu=0.0005, sigma=0.01):
        """Simulate ``T`` prices starting at ``s0``.

        Each price is the previous one multiplied by
        ``exp((mu - sigma**2 / 2) + sigma * z)`` with ``z`` drawn from a
        standard normal distribution. The result is a float32 NumPy array.
        """
        path = [s0]
        for _ in range(T - 1):
            log_return = (mu - 0.5 * sigma**2) + sigma * random.gauss(0, 1)
            path.append(path[-1] * math.exp(log_return))
        return np.array(path, dtype=np.float32)

    # Build the environment on a freshly simulated price series.
    window_size = 30
    prices_array = generate_synthetic_prices(T=3000, s0=100)
    env = StockTradingEnv(
        prices_array,
        window_size=window_size,
        initial_capital=10000,
        max_steps=1000,
    )

    # Train, then evaluate the greedy policy on the same environment.
    trained_qnet, rewards_history = train_dqn(
        env,
        num_episodes=5,
        window_size=window_size,
        gamma=0.99,
        lr=1e-3,
        batch_size=32,
        max_steps_per_episode=1000,
    )
    print("\nTraining complete!\n")
    test_reward = run_trained_agent(env, trained_qnet)
    print(f"Test reward with trained Transformer policy: {test_reward:.2f}")

Using device: mps
Episode 1/5, Reward: -31.24, Eps: 0.802
Episode 2/5, Reward: -17.67, Eps: 0.604
Episode 3/5, Reward: 64.03, Eps: 0.406
Episode 4/5, Reward: 15.50, Eps: 0.208
Episode 5/5, Reward: 14.88, Eps: 0.010

Training complete!

Test reward with trained Transformer policy: 114.73
