**This Jupyter notebook was originally run on Google Colab. If you are not running it on Google Colab, skip the following Drive-mounting step.**

In [None]:
# Colab-only setup: expose Google Drive inside the runtime at /content/gdrive.
# The google.colab package only exists on Colab, so skip this cell elsewhere.
from google.colab import drive
drive.mount('/content/gdrive')

%cd gdrive/MyDrive/Deep-RL-Stock-Trading

In [4]:
from indicators import get_indicators, sma, macd, rsi, cci
from environment import StockEnvironment

import csv
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.nn.utils
import torch.optim as optim
import numpy as np
import random

Read the stock data from a CSV file that was originally downloaded from Kaggle. The file stores one row of values for each day the market is open: the date and the high, low, open, close, and adjusted close prices for that day.

In [5]:
# Column-oriented storage: every key maps to a list holding one entry per CSV row.
data_dict = {'Date': [], 'Open': [], 'Close': [], 'High': [], 'Low': [], 'Adj Close': []}

# Every column except the date is parsed as a float.
price_columns = [column for column in data_dict if column != 'Date']

with open('./data/stocks/AAPL.csv', newline = '') as csvfile:
  for row in csv.DictReader(csvfile):
    # The date stays a string; the price columns are converted to floats.
    data_dict['Date'].append(row['Date'])
    for column in price_columns:
      data_dict[column].append(float(row[column]))

# Derive the indicator series from the raw price columns.
indicators = get_indicators(data_dict)

Actor Critic architecture

In [6]:
# Make float64 the default floating-point dtype, so the networks and tensors
# created below use double precision.
# NOTE(review): presumably chosen to match the float64 values coming out of the
# environment — confirm against StockEnvironment.
torch.set_default_dtype(torch.float64)

class Critic(nn.Module):
  """State-action value network: maps a (state, action) pair to one scalar.

  Args:
    state_dim: number of features in a state vector.
    action_dim: length of the action vector that is concatenated to the state.
    hidden_dims: sizes of the two hidden layers.

  The defaults (8 + 9 inputs, 256 and 128 hidden units) reproduce the sizes
  that were previously hard-coded. The layer attribute names are unchanged so
  that previously saved state_dicts still load.
  """
  def __init__(self, state_dim=8, action_dim=9, hidden_dims=(256, 128)):
    super().__init__()
    first_hidden, second_hidden = hidden_dims
    self.cfc1 = nn.Linear(state_dim + action_dim, first_hidden)
    self.cfc2 = nn.Linear(first_hidden, second_hidden)
    self.cfc3 = nn.Linear(second_hidden, 1)

  def forward(self, state, action):
    """Return the estimated value of taking `action` in `state`.

    `state` and `action` are joined along their last dimension, so batched
    (batch, features) inputs behave exactly as before and unbatched 1-D
    inputs are accepted as well.
    """
    x = torch.cat((state, action), dim=-1)
    x = F.relu(self.cfc1(x))
    x = F.relu(self.cfc2(x))
    # No activation on the output: the value estimate is unbounded.
    x = self.cfc3(x)
    return x

  
class Actor(nn.Module):
  """Policy network: maps a state to one unnormalized score per action.

  Args:
    state_dim: number of features in a state vector.
    n_actions: number of discrete actions (size of the output layer).
    hidden_dims: sizes of the two hidden layers.

  The defaults (8 inputs, 256 and 128 hidden units, 9 outputs) reproduce the
  sizes that were previously hard-coded. The layer attribute names are
  unchanged so that previously saved state_dicts still load.
  """
  def __init__(self, state_dim=8, n_actions=9, hidden_dims=(256, 128)):
    super().__init__()
    first_hidden, second_hidden = hidden_dims
    self.afc1 = nn.Linear(state_dim, first_hidden)
    self.afc2 = nn.Linear(first_hidden, second_hidden)
    self.afc3 = nn.Linear(second_hidden, n_actions)

  def forward(self, state):
    """Return raw action scores (logits); callers apply softmax/argmax."""
    x = F.relu(self.afc1(state))
    x = F.relu(self.afc2(x))
    x = self.afc3(x)
    return x

def normal_init(m):
  """Initialize an nn.Linear layer in place; intended for `module.apply`.

  Weights are drawn from a normal distribution with mean 0 and standard
  deviation 0.02, and the bias (when the layer has one) is set to zero.
  Modules that are not nn.Linear are left untouched.
  """
  if isinstance(m, nn.Linear):
    nn.init.normal_(m.weight, mean=0.0, std=.02)
    # Layers built with bias=False have m.bias set to None, so the check must
    # be on m.bias itself; testing m.bias.data raised AttributeError there.
    if m.bias is not None:
      nn.init.zeros_(m.bias)

Helper trainer class that simplifies training. It provides methods to select actions with an epsilon-greedy exploration strategy, to optimize the networks on a transition sampled from the replay memory, to add transitions to the replay memory, and to clear the replay memory.

In [None]:
class Trainer():
  """Owns the actor and critic networks, their optimizers and the replay memory.

  Args:
    actor_lr: Adam learning rate for the actor.
    critic_lr: Adam learning rate for the critic.
    eps: probability that get_action picks a uniformly random action.
    gamma: discount applied to the critic's next-state value in optimize.
    grad_clip: maximum gradient norm applied to each network before its step.

  The defaults reproduce the values that were previously hard-coded.
  """
  def __init__(self, actor_lr=.00001, critic_lr=.0001, eps=.15, gamma=.4, grad_clip=0.5):
    self.actor = Actor()
    self.critic = Critic()

    self.actor.apply(normal_init)
    self.critic.apply(normal_init)

    self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), actor_lr)
    self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), critic_lr)

    self.replay = []
    self.eps = eps
    self.gamma = gamma
    self.grad_clip = grad_clip
    # Kept as a public attribute for any external code that still reads it.
    self.softmax = nn.Softmax(dim=1)

  @staticmethod
  def _one_hot(index, n_actions):
    """Return a (1, n_actions) tensor that is 1 at `index` and 0 elsewhere."""
    action = torch.zeros((1, n_actions))
    action[0][index] = 1
    return action

  def get_action(self, state):
    """Pick an action for `state` with an epsilon-greedy policy.

    Args:
      state: a single state with a leading batch dimension of 1.

    Returns:
      A (1, n_actions) one-hot tensor, where n_actions is the width of the
      actor's output.
    """
    # Action selection needs no gradients, so no autograd graph is built.
    with torch.no_grad():
      logits = self.actor(state)
    n_actions = logits.shape[1]

    if random.random() < self.eps:
      ## explore
      index = random.randint(0, n_actions - 1)
    else:
      ## exploit (softmax is monotonic, so the argmax of the logits is the
      ## most probable action)
      index = torch.argmax(logits, dim=1).item()

    return self._one_hot(index, n_actions)

  def optimize(self):
    """Run one critic update and one actor update on a random stored transition.

    Raises:
      ValueError: if the replay memory is empty.
    """
    if not self.replay:
      raise ValueError('cannot optimize: the replay memory is empty')

    ## sample a random (s, a, r, s) tuple from the replay memory
    s1, a1, r1, s2 = self.replay[random.randint(0, len(self.replay)-1)]

    ## updating the critic
    with torch.no_grad():
      next_logits = self.actor(s2)
      # The critic is only ever trained on one-hot actions (a1), so the
      # bootstrap value must be queried with a one-hot action as well, not
      # with the actor's raw logits.
      a2 = self._one_hot(torch.argmax(next_logits, dim=1).item(), next_logits.shape[1])
      next_val = self.critic(s2, a2)
      expected_val = r1 + self.gamma * next_val

    predicted_val = self.critic(s1, a1)

    # Detached: the TD error only scales the actor's gradient below.
    td_error = (expected_val - predicted_val).detach()
    critic_loss = torch.square(expected_val - predicted_val).sum()

    self.critic_optimizer.zero_grad()
    critic_loss.backward()
    torch.nn.utils.clip_grad_norm_(self.critic.parameters(), self.grad_clip)
    self.critic_optimizer.step()

    ## updating the actor
    action_index = torch.argmax(a1, dim=1).item()
    # log_softmax avoids log(0) when a probability underflows.
    log_prob = F.log_softmax(self.actor(s1), dim=1)[0, action_index]
    actor_loss = (-td_error * log_prob).sum()

    self.actor_optimizer.zero_grad()
    actor_loss.backward()
    torch.nn.utils.clip_grad_norm_(self.actor.parameters(), self.grad_clip)
    self.actor_optimizer.step()

  def clear_replay(self):
    """Drop every stored transition."""
    self.replay = []

  def add_replay(self, x):
    """Append one (state, action, reward, next_state) transition."""
    self.replay.append(x)

Training loop

In [None]:
# Build the training environment, a fresh trainer, and the initial state.
# NOTE(review): 800 and 'AC' are passed straight to StockEnvironment; presumably
# the last trading day to use and the agent type — confirm in environment.py.
env = StockEnvironment(data_dict, indicators, 800, 'AC')
# Wrap the state in a list so the tensor gets a leading batch dimension of 1.
state = torch.tensor([env.reset()])
trainer = Trainer()
terminal = False


for epoch in range(50):
  # How many times each of the 9 actions was chosen during this epoch.
  action_list = [0] * 9
  # Roll out one full episode, storing every transition in the replay memory.
  while not terminal:
    action = trainer.get_action(state)
    # get_action returns a one-hot tensor; the environment takes the index.
    action_index = torch.argmax(action, dim=1).item()
    action_list[action_index] += 1
    new_state, reward, terminal, info = env.step(action_index)
    new_state = torch.tensor([new_state])
    trainer.add_replay((state, action, reward, new_state))
    state = new_state

  # 1000 updates, each on one transition sampled at random from this episode.
  for i in range(1000):
    trainer.optimize()

  # The memory is emptied so the next epoch trains only on its own episode.
  trainer.clear_replay()

  print('Epoch', epoch)
  print(action_list)
  env.render()
  # Start the next episode from a freshly reset environment.
  state = torch.tensor([env.reset()])
  terminal = False

In [None]:
# Save only the learned parameters (state_dict) of each network, not the
# module objects themselves.
# NOTE(review): presumably requires the './trained models' directory to exist
# already — confirm before running on a fresh checkout.
torch.save(trainer.critic.state_dict(), './trained models/AAPL Critic')
torch.save(trainer.actor.state_dict(), './trained models/AAPL Actor')

In [None]:
# Rebuild the actor architecture and restore its saved parameters into a
# standalone `actor` object, separate from `trainer.actor`.
# NOTE(review): torch.load deserializes the file; only load checkpoints from a
# trusted source.
actor = Actor()
actor.load_state_dict(torch.load('./trained models/AAPL Actor'))

Computing some statistics regarding the actions taken during the first 5000 days.

In [None]:
# Roll the trained actor greedily (no exploration) through a longer stretch of
# the data, recording which actions it takes and the portfolio value per step.
# NOTE(review): 5200 and 'AC' are passed straight to StockEnvironment; presumably
# the last trading day to use and the agent type — confirm in environment.py.
env = StockEnvironment(data_dict, indicators, 5200, 'AC')
state = env.reset()
counter = 0
# Index into data_dict['Close'] used to price the holdings after each step.
# NOTE(review): 201 presumably mirrors the environment's day counter after the
# first step — confirm against StockEnvironment.
day = 201
action_list = [0] * 9
portfolio_value = []
terminal = False

# Pure inference: no gradients are needed, so no autograd graph is built.
with torch.no_grad():
  while not terminal:
    # The actor returns one score per action; the highest score is taken.
    action = actor(torch.tensor([state]))
    action_index = torch.argmax(action, dim=1).item()
    action_list[action_index] += 1
    new_state, reward, terminal, info = env.step(action_index)
    state = new_state
    # NOTE(review): assumes state[0] is the cash balance and state[1] the
    # number of shares held — confirm against StockEnvironment.
    portfolio_value.append((state[0] + state[1]*data_dict['Close'][day]).item())
    counter += 1
    day += 1

print(counter)
print('Actions: ', action_list)
env.render()
state = env.reset()
terminal = False

Creates a plot that compares the stock price to the portfolio value. The stock price has been rescaled so that its starting value equals the starting portfolio value, which allows a direct comparison.

In [None]:
# Rescale the closing prices by a fixed factor so that the price curve can be
# drawn on the same axis as the portfolio value.
other_prices = [367 * price for price in data_dict['Close']]

fig, ax = plt.subplots()
ax.plot(portfolio_value)
# Only the slice of days 200 to 5200 is drawn next to the portfolio curve.
ax.plot(other_prices[200:5200])
ax.set_ylabel('value')
ax.set_xlabel('day')
ax.legend(["portfolio", "price of stock"])
ax.set_title('Advantage Actor Critic Trading Agent')
plt.show()