# Stock Trading Bot

The goal is to build a stock trading bot that learns a trading policy with reinforcement learning (Deep Q-Learning).

In [None]:
# Importing necessary libraries for the code

import numpy as np                    # Library for numerical operations
import pandas as pd                   # Library for data manipulation and analysis
import tensorflow as tf               # Library for deep learning
from datetime import datetime         # Library for working with dates and times
import itertools                      # Library for efficient looping and combining elements
import argparse                       # Library for parsing command-line arguments
import re                             # Library for regular expressions
import os                             # Library for interacting with the operating system
import pickle                         # Library for serializing and deserializing Python objects
import matplotlib.pyplot as plt       # Library for creating visualizations
from sklearn.preprocessing import StandardScaler  # Library for standardizing features

%matplotlib inline
# Magic command to display matplotlib plots inline in Jupyter notebooks

In [None]:
def get_data():
    """
    Loads the price history from 'stock_data.csv' in the working directory.

    Returns:
        numpy.ndarray: The CSV contents (header row excluded) as a 2-D array.
    """
    return pd.read_csv('stock_data.csv').values

## Environment

In [None]:
class MultiStockEnvironment:
    """
    Trading environment over the price history of several stocks.

    The observation is a vector of length ``2 * n_stock + 1`` holding, in
    order: shares owned per stock, current price per stock, and cash in hand.

    An action is an integer index into ``action_list``. Each entry of
    ``action_list`` holds one code per stock: 0 = sell every share held,
    1 = hold, 2 = buy.
    """

    def __init__(self, data, initial_investment=20000):
        """
        Initializes the multi-stock environment.

        Args:
            data (numpy.ndarray): Stock price history of shape (n_step, n_stock).
            initial_investment (float): Initial investment amount (default: 20000).
        """

        self.stock_price_history = data
        self.n_step, self.n_stock = self.stock_price_history.shape

        self.initial_investment = initial_investment
        self.cur_step = None
        self.stock_owned = None
        self.stock_price = None
        self.cash_in_hand = None

        # One integer action per combination of (sell, hold, buy) across all stocks.
        self.action_space = np.arange(3**self.n_stock)

        # action_list[k] is the per-stock code vector for integer action k.
        self.action_list = list(map(list, itertools.product([0, 1, 2], repeat=self.n_stock)))

        # Shares owned per stock + price per stock + cash in hand.
        self.state_dim = self.n_stock * 2 + 1

        self.reset()

    def reset(self):
        """
        Resets the environment to the first time step with no shares held.

        Returns:
            numpy.ndarray: Initial observation/state.
        """

        self.cur_step = 0
        self.stock_owned = np.zeros(self.n_stock)
        self.stock_price = self.stock_price_history[self.cur_step]
        self.cash_in_hand = self.initial_investment
        return self._get_obs()

    def step(self, action):
        """
        Executes the given action and advances the environment by one step.

        Prices move to the next time step first, and the trade is then
        executed at those new prices. The reward is the change in portfolio
        value between the previous and the new step.

        Args:
            action (int): Index into ``action_list``.

        Returns:
            tuple: (observation, reward, done, info) where ``done`` is True once
            the last row of the price history is reached and ``info`` holds the
            current portfolio value under the key 'cur_val'.

        Raises:
            IndexError: If called again after ``done`` was returned as True,
                because the next price row does not exist.
        """

        assert action in self.action_space

        prev_val = self._get_val()

        self.cur_step = self.cur_step + 1
        self.stock_price = self.stock_price_history[self.cur_step]

        self._trade(action)

        cur_val = self._get_val()
        reward = cur_val - prev_val
        done = self.cur_step == self.n_step - 1
        info = {'cur_val': cur_val}

        return self._get_obs(), reward, done, info

    def _get_obs(self):
        """
        Builds the current observation/state vector.

        Returns:
            numpy.ndarray: [shares owned..., prices..., cash in hand].
        """

        obs = np.empty(self.state_dim)
        obs[:self.n_stock] = self.stock_owned
        obs[self.n_stock:2 * self.n_stock] = self.stock_price
        obs[-1] = self.cash_in_hand
        return obs

    def _get_val(self):
        """
        Calculates the current portfolio value (holdings at current prices plus cash).

        Returns:
            float: Current portfolio value.
        """

        return self.stock_owned.dot(self.stock_price) + self.cash_in_hand

    def _trade(self, action):
        """
        Performs the trades encoded by the given action at the current prices.

        All sells are executed first. Buying then proceeds round-robin, one
        share per buy-flagged stock per pass, until some stock in a pass cannot
        be afforded.

        Args:
            action (int): Index into ``action_list``.
        """

        action_vector = self.action_list[action]

        sell_index = []
        buy_index = []
        for i, a in enumerate(action_vector):
            if a == 0:
                sell_index.append(i)
            elif a == 2 and not (self.stock_price[i] <= 0):
                # A stock priced at zero or below always looks affordable and
                # never reduces cash, so the buy loop below would never end.
                # Such stocks are therefore never bought. NaN prices still
                # reach the affordability check, which is False for them.
                buy_index.append(i)

        # Sell: liquidate the full position of every sell-flagged stock.
        for i in sell_index:
            self.cash_in_hand = self.cash_in_hand + self.stock_price[i] * self.stock_owned[i]
            self.stock_owned[i] = 0

        # Buy: keep cycling through the buy list while every purchase succeeds.
        can_buy = bool(buy_index)
        while can_buy:
            for i in buy_index:
                if self.cash_in_hand > self.stock_price[i]:
                    self.stock_owned[i] = self.stock_owned[i] + 1
                    self.cash_in_hand = self.cash_in_hand - self.stock_price[i]
                else:
                    # Finish the current pass, then stop buying.
                    can_buy = False

## Replay Buffer

In [None]:
class ReplayBuffer:
    """
    A fixed-size circular replay buffer for storing and sampling transitions.

    Once ``size`` transitions have been stored, the oldest entries are
    overwritten.

    Args:
        obs_dim (int): Dimension of the observation space.
        act_dim (int): Dimension of the action space. Accepted for interface
            compatibility; actions are stored as a single integer index, so
            this value is not used.
        size (int): Maximum size of the replay buffer.
    """

    def __init__(self, obs_dim, act_dim, size):
        """
        Initializes the replay buffer.

        Args:
            obs_dim (int): Dimension of the observation space.
            act_dim (int): Dimension of the action space (unused).
            size (int): Maximum size of the replay buffer.
        """

        self.obs1_buf = np.zeros([size, obs_dim], dtype=np.float32)   # Current observations
        self.obs2_buf = np.zeros([size, obs_dim], dtype=np.float32)   # Next observations
        # Action indices can exceed 255 (e.g. 3**6 = 729 actions for six
        # stocks), so a wide integer type is used instead of uint8.
        self.acts_buf = np.zeros(size, dtype=np.int64)                # Action indices
        self.rews_buf = np.zeros(size, dtype=np.float32)              # Rewards
        # Done flags are stored as 0/1 integers; convert to bool before using
        # them as a mask, otherwise NumPy treats them as integer indices.
        self.done_buf = np.zeros(size, dtype=np.uint8)                # Done flags
        self.ptr = 0                                                  # Next write position
        self.size = 0                                                 # Number of stored transitions
        self.max_size = size                                          # Capacity

    def store(self, obs, act, rew, next_obs, done):
        """
        Stores a transition in the replay buffer.

        Args:
            obs (numpy.ndarray): Current observation.
            act (int): Action taken.
            rew (float): Reward received.
            next_obs (numpy.ndarray): Next observation.
            done (bool): Done flag indicating if the episode terminated.
        """

        self.obs1_buf[self.ptr] = obs
        self.obs2_buf[self.ptr] = next_obs
        self.acts_buf[self.ptr] = act
        self.rews_buf[self.ptr] = rew
        self.done_buf[self.ptr] = done

        self.ptr = (self.ptr + 1) % self.max_size     # Wrap around when the end is reached
        self.size = min(self.size + 1, self.max_size) # Never report more than the capacity

    def sample_batch(self, batch_size=32):
        """
        Samples a batch of transitions uniformly at random, with replacement.

        Args:
            batch_size (int): Number of transitions to sample (default: 32).

        Returns:
            dict: The sampled batch with keys 's1', 's2', 'a', 'r', 'd':
                  current observations, next observations, actions, rewards
                  and done flags respectively.

        Raises:
            ValueError: If the buffer is empty (raised by ``np.random.randint``).
        """

        idxs = np.random.randint(0, self.size, size=batch_size)
        return dict(
            s1=self.obs1_buf[idxs],
            s2=self.obs2_buf[idxs],
            a=self.acts_buf[idxs],
            r=self.rews_buf[idxs],
            d=self.done_buf[idxs],
        )

## Neural Network Model

In [None]:
def action_NN(input_dim, n_action, n_hidden_layers=1, hidden_dim=32):
    """
    Creates and compiles a fully connected network mapping a state to one
    output value per action.

    Args:
        input_dim (int): Dimension of the input.
        n_action (int): Number of possible actions.
        n_hidden_layers (int): Number of hidden layers (default: 1).
        hidden_dim (int): Dimension of the hidden layers (default: 32).

    Returns:
        tf.keras.models.Model: Compiled model (MSE loss, Adam optimizer).
    """

    # Input layer
    inputs = tf.keras.layers.Input(shape=(input_dim,))
    x = inputs

    # Stack the requested number of ReLU hidden layers
    for _ in range(n_hidden_layers):
        x = tf.keras.layers.Dense(hidden_dim, activation='relu')(x)

    # Linear output layer: one value per action
    outputs = tf.keras.layers.Dense(n_action)(x)

    model = tf.keras.models.Model(inputs=inputs, outputs=outputs)

    model.compile(
        loss='mse',
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False),
    )

    return model

## Agent

In [None]:
class DQNAgent:
    """
    Deep Q-Network agent with epsilon-greedy exploration and experience replay.

    Args:
        state_size (int): Dimension of the state space.
        action_size (int): Number of discrete actions.
    """

    def __init__(self, state_size, action_size):
        """
        Initializes the DQNAgent.

        Args:
            state_size (int): Dimension of the state space.
            action_size (int): Number of discrete actions.
        """

        self.state_size = state_size
        self.action_size = action_size
        self.memory = ReplayBuffer(state_size, action_size, size=500)
        self.gamma = 0.95                                # Discount factor for future rewards
        self.epsilon = 1.0                               # Exploration rate
        self.epsilon_min = 0.01                          # Minimum exploration rate
        self.epsilon_decay = 0.995                       # Multiplicative decay applied per replay step
        self.model = action_NN(state_size, action_size)  # Q-network

    def update_replay_buffer(self, state, action, reward, next_state, done):
        """
        Stores a new transition in the replay buffer.

        Args:
            state (numpy.ndarray): Current state.
            action (int): Action taken.
            reward (float): Reward received.
            next_state (numpy.ndarray): Next state.
            done (bool): Done flag indicating if the episode terminated.
        """

        self.memory.store(state, action, reward, next_state, done)

    def act(self, state):
        """
        Selects an action for the current state using an epsilon-greedy policy.

        Args:
            state (numpy.ndarray): Current state as a batch of one row; only
                the first row of the model's prediction is used.

        Returns:
            int: Selected action.
        """

        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)  # Exploration: random action

        # verbose=0 keeps Keras from printing a progress bar on every step.
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])  # Exploitation: action with the highest Q-value

    def replay(self, batch_size=32):
        """
        Samples a minibatch from the replay buffer and performs one training
        step on the Q-network, then decays the exploration rate.

        Args:
            batch_size (int): Number of transitions to sample (default: 32).
        """

        if self.memory.size < batch_size:
            return  # Not enough samples in the replay buffer, skip replay

        minibatch = self.memory.sample_batch(batch_size)
        states = minibatch['s1']
        next_states = minibatch['s2']
        actions = minibatch['a']
        rewards = minibatch['r']
        # The buffer stores done flags as 0/1 integers. Indexing with an
        # integer array selects positions 0 and 1 rather than masking, so the
        # flags must be converted to a boolean mask first.
        done = minibatch['d'].astype(bool)

        # Bootstrapped target: r + gamma * max_a' Q(s', a')
        next_q = self.model.predict(next_states, verbose=0)
        target = rewards + self.gamma * np.max(next_q, axis=1)

        # Terminal transitions have no future value: target is the reward only.
        target[done] = rewards[done]

        # Only the taken action's output is moved towards the target; the
        # other outputs keep their current predictions and so add no error.
        target_full = self.model.predict(states, verbose=0)
        target_full[np.arange(batch_size), actions] = target

        self.model.train_on_batch(states, target_full)

        if self.epsilon > self.epsilon_min:
            self.epsilon = self.epsilon * self.epsilon_decay  # Decay the exploration rate

    def load(self, name):
        """
        Loads the model weights from a file.

        Args:
            name (str): Name of the file to load the weights from.
        """

        self.model.load_weights(name)

    def save(self, name):
        """
        Saves the model weights to a file.

        Args:
            name (str): Name of the file to save the weights to.
        """

        self.model.save_weights(name)

## Helpers

In [None]:
def make_dir(directory):
    """
    Creates the directory (including missing parents) if it does not exist.

    Args:
        directory (str): Path of the directory to create.
    """
    # exist_ok=True avoids the race between checking for the directory and
    # creating it.
    os.makedirs(directory, exist_ok=True)

In [None]:
def plot_rewards(choice):
    """
    Prints summary statistics and shows a histogram of the values saved in
    '<rewards_folder>/<choice>.npy'.

    Args:
        choice (str): Name of the saved run (used as file stem and plot title).
    """
    saved_path = f'{rewards_folder}/{choice}.npy'
    a = np.load(saved_path)

    print(f"Average Reward: {a.mean():.2f} | Min: {a.min():.2f} | Max: {a.max():.2f}")

    # Distribution of the saved values across episodes
    plt.hist(a, bins=30)
    plt.title(choice)
    plt.show()

## Standardizing / Normalizing

In [None]:
def get_scaler(env):
    """
    Fits a scaler on the states visited during one episode of random actions.

    The environment is reset before the rollout, so the result does not depend
    on where a previous rollout left it. The environment is left at the end of
    the episode afterwards.

    Args:
        env (MultiStockEnvironment): Environment object.

    Returns:
        sklearn.preprocessing.StandardScaler: Scaler for normalizing state values.
    """

    # Start from the first time step; stepping an environment that is already
    # at its last step would index past the end of the price history.
    env.reset()

    states = []
    for _ in range(env.n_step):
        action = np.random.choice(env.action_space)
        state, _reward, done, _info = env.step(action)
        states.append(state)
        if done:
            break

    scaler = StandardScaler()
    scaler.fit(states)
    return scaler

## Perform and Learn 

In [None]:
def play_one_episode(agent, env, is_train):
    """
    Runs a single episode from reset until the environment reports done.

    Reads the module-level ``scaler`` to normalize states and the module-level
    ``batch_size`` for replay.

    Args:
        agent (DQNAgent): DQN agent.
        env (MultiStockEnvironment): Environment object.
        is_train (str): Mode flag; the agent only learns when this is 'train'.

    Returns:
        float: The final portfolio value achieved in the episode.
    """

    learning = is_train == 'train'

    # Normalized starting state, shaped as a batch of one row
    state = scaler.transform([env.reset()])

    while True:
        action = agent.act(state)
        raw_next_state, reward, done, info = env.step(action)
        next_state = scaler.transform([raw_next_state])

        if learning:
            # Record the transition, then take one training step
            agent.update_replay_buffer(state, action, reward, next_state, done)
            agent.replay(batch_size)

        state = next_state

        if done:
            return info['cur_val']

## Training

In [None]:
# Training configuration. These are module-level names: plot_rewards reads
# rewards_folder, and play_one_episode reads batch_size and scaler.
# NOTE(review): both folder paths end in '/', and the f-strings below add
# another '/', so the resulting paths contain '//' — presumably harmless, confirm.
models_folder = '../root/models/'
rewards_folder = '../root/rewards/'
num_episodes = 500
batch_size = 32
initial_investment = 50000

make_dir(models_folder)
make_dir(rewards_folder)

data = get_data()
n_timesteps, n_stocks = data.shape

choice = 'train'

# First half of the time series is used for training, second half for testing.
n_train = n_timesteps // 2

train_data = data[:n_train]
test_data = data[n_train:]

env = MultiStockEnvironment(train_data, initial_investment)
state_size = env.state_dim
action_size = len(env.action_space)
agent = DQNAgent(state_size, action_size)
# get_scaler steps through env; play_one_episode resets env at the start of
# every episode, so the environment's position after this call does not matter.
scaler = get_scaler(env)

# Final portfolio value of each episode
portfolio_value = []

# Not taken in this cell, since choice is set to 'train' above.
if choice == 'test':
    # Load scaler and environment for testing
    with open(f'{models_folder}/standard_scaler.dat', 'rb') as f:
        scaler = pickle.load(f)

    env = MultiStockEnvironment(test_data, initial_investment)

    # Set exploration rate to a small value for testing
    agent.epsilon = 0.01

    # Load pre-trained agent model for testing
    agent.load(f'{models_folder}/nn_agent.h5')

# Run the episodes, timing each one and recording its final portfolio value.
for e in range(num_episodes):
    t0 = datetime.now()
    val = play_one_episode(agent, env, choice)
    dt = datetime.now() - t0
    print(f"Episode {e+1}/{num_episodes}\nEpisode end value: {val:.2f}  |  Duration: {dt}")
    portfolio_value.append(val)

In [None]:
# Persist the training artifacts so the testing cell can reload them.
if choice == 'train':
    # Save the trained agent model
    # NOTE(review): agent.save calls model.save_weights; newer Keras versions
    # reportedly require weight files to end in '.weights.h5' — confirm against
    # the installed version.
    agent.save(f'{models_folder}/nn_agent.h5')

    # Save the scaler
    with open(f'{models_folder}/standard_scaler.dat', 'wb') as f:
        pickle.dump(scaler, f)

# Save the portfolio values
np.save(f'{rewards_folder}/{choice}.npy', portfolio_value)

In [None]:
# Summarize and plot the values saved for the current run (named by `choice`).
plot_rewards(choice)

### Testing

In [None]:
# Testing configuration. batch_size is not used in this mode:
# play_one_episode only calls agent.replay when is_train == 'train'.
# NOTE(review): initial_investment here (30000) differs from the value used in
# the training cell (50000), with which the saved scaler was fit — confirm this
# is intended.
num_episodes = 100
batch_size = 32
initial_investment = 30000

choice = 'test'

# `env` still refers to the environment from the training cell at this point;
# it is only used here to read the state and action dimensions.
state_size = env.state_dim
action_size = len(env.action_space)
agent = DQNAgent(state_size, action_size)

# Final portfolio value of each episode
portfolio_value = []

if choice == 'test':
    # Load the saved scaler for testing
    # NOTE: pickle.load executes arbitrary code from the file; only load
    # scaler files written by the training cell.
    with open(f'{models_folder}/standard_scaler.dat', 'rb') as f:
        scaler = pickle.load(f)

    env = MultiStockEnvironment(test_data, initial_investment)

    # Set exploration rate to a small value for testing
    agent.epsilon = 0.01

    # Load pre-trained agent model for testing
    agent.load(f'{models_folder}/nn_agent.h5')

# Run the episodes, timing each one and recording its final portfolio value.
for e in range(num_episodes):
    t0 = datetime.now()
    val = play_one_episode(agent, env, choice)
    dt = datetime.now() - t0
    print(f"Episode {e+1}/{num_episodes}\nEpisode end value: {val:.2f}  |  Duration: {dt}")
    portfolio_value.append(val)

# Save the portfolio values
np.save(f'{rewards_folder}/{choice}.npy', portfolio_value)

In [None]:
# Summarize and plot the values saved for the current run (named by `choice`).
plot_rewards(choice)