In [3]:
# Adapted from https://medium.com/coinmonks/deep-reinforcement-learning-for-trading-cryptocurrencies-5b5502b1ece1
import numpy as np
import yfinance as yf
from sklearn.preprocessing import StandardScaler

In [4]:
STATE_SPACE = 28
ACTION_SPACE = 3

ACTION_LOW = -1
ACTION_HIGH = 1

GAMMA = 0.9995
TAU = 1e-3
EPS_START = 1.0
EPS_END = 0.1
EPS_DECAY = 0.9

MEMORY_LEN = 10000
MEMORY_THRESH = 500
BATCH_SIZE = 200

LR_DQN = 5e-4

LEARN_AFTER = MEMORY_THRESH
LEARN_EVERY = 3
UPDATE_EVERY = 9

COST = 3e-4
CAPITAL = 100000
NEG_MUL = 2

In [None]:
class DataGetter:

  def __init__(self, asset="BTC-USD", start_date=None, end_date=None, freq="1d", timeframes=[1, 2, 5, 10, 20, 40]):
    self.asset = asset
    self.sd = start_date
    self.ed = end_date
    self.freq = freq

    self.timeframes = timeframes
    self.getData()

    self.scaler = StandardScaler()
    self.scaler.fit(self.data[:, 1:])

  def getData(self):
    asset = self.asset  
    if self.sd is not None and self.ed is not None:
      df =  yf.download([asset], start=self.sd, end=self.ed, interval=self.freq)
      df_spy = yf.download(["BTC-USD"], start=self.sd, end=self.ed, interval=self.freq)
    elif self.sd is None and self.ed is not None:
      df =  yf.download([asset], end=self.ed, interval=self.freq)
      df_spy = yf.download(["BTC-USD"], end=self.ed, interval=self.freq)
    elif self.sd is not None and self.ed is None:
      df =  yf.download([asset], start=self.sd, interval=self.freq)
      df_spy = yf.download(["BTC-USD"], start=self.sd, interval=self.freq)
    else:
      df = yf.download([asset], period="max", interval=self.freq)
      df_spy = yf.download(["BTC-USD"], interval=self.freq)
    
    # Reward - Not included in Observation Space.
    df["rf"] = df["Adj Close"].pct_change().shift(-1)

    # Returns and Trading Volume Changes
    for i in self.timeframes:
      df_spy[f"spy_ret-{i}"] = df_spy["Adj Close"].pct_change(i)
      df_spy[f"spy_v-{i}"] = df_spy["Volume"].pct_change(i)

      df[f"r-{i}"] = df["Adj Close"].pct_change(i)      
      df[f"v-{i}"] = df["Volume"].pct_change(i)
    
    # Volatility
    for i in [5, 10, 20, 40]:
      df[f'sig-{i}'] = np.log(1 + df["r-1"]).rolling(i).std()

    # Moving Average Convergence Divergence (MACD)
    df["macd_lmw"] = df["r-1"].ewm(span=26, adjust=False).mean()
    df["macd_smw"] = df["r-1"].ewm(span=12, adjust=False).mean()
    df["macd_bl"] = df["r-1"].ewm(span=9, adjust=False).mean()
    df["macd"] = df["macd_smw"] - df["macd_lmw"]

    # Relative Strength Indicator (RSI)
    rsi_lb = 5
    pos_gain = df["r-1"].where(df["r-1"] > 0, 0).ewm(rsi_lb).mean()
    neg_gain = df["r-1"].where(df["r-1"] < 0, 0).ewm(rsi_lb).mean()
    rs = np.abs(pos_gain/neg_gain)
    df["rsi"] = 100 * rs/(1 + rs)

    # Bollinger Bands
    bollinger_lback = 10
    df["bollinger"] = df["r-1"].ewm(bollinger_lback).mean()
    df["low_bollinger"] = df["bollinger"] - 2 * df["r-1"].rolling(bollinger_lback).std()
    df["high_bollinger"] = df["bollinger"] + 2 * df["r-1"].rolling(bollinger_lback).std()

    # SP500
    df = df.merge(df_spy[[f"spy_ret-{i}" for i in self.timeframes] + [f"spy_sig-{i}" for i in [5, 10, 20, 40]]], 
                  how="left", right_index=True, left_index=True)

    # Filtering
    for c in df.columns:
      df[c].interpolate('linear', limit_direction='both', inplace=True)
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)

    self.frame = df
    self.data = np.array(df.iloc[:, 6:])
    return
  
  def scaleData(self):
    self.scaled_data = self.scaler.fit_transform(self.data[:, 1:])

  def __len__(self):
    return len(self.data)


  def __getitem__(self, idx, col_idx=None):
    if col_idx is None:
      return self.data[idx]
    elif col_idx < len(list(self.data.columns)):
      return self.data[idx][col_idx]
    else:
      raise IndexError

In [None]:
class SingleAssetTradingEnvironment:
  """
  Trading Environment for trading a single asset.
  The Agent interacts with the environment class through the step() function.
  Action Space: {-1: Sell, 0: Do Nothing, 1: Buy}
  """

  def __init__(self, asset_data,
               initial_money=CAPITAL, trans_cost=COST, store_flag=1, asset_ph=0, 
               capital_frac=0.2, running_thresh=0.1, cap_thresh=0.3):
    
    self.past_holding = asset_ph
    self.capital_frac = capital_frac
    self.cap_thresh = cap_thresh
    self.running_thresh = running_thresh
    self.trans_cost = trans_cost

    self.asset_data = asset_data
    self.terminal_idx = len(self.asset_data) - 1
    self.scaler = self.asset_data.scaler    

    self.initial_cap = initial_money

    self.capital = self.initial_cap
    self.running_capital = self.capital
    self.asset_inv = self.past_holding

    self.pointer = 0
    self.next_return, self.current_state = 0, None
    self.prev_act = 0
    self.current_act = 0
    self.current_reward = 0
    self.current_price = self.asset_data.frame.iloc[self.pointer, :]['Adj Close']
    self.done = False

    self.store_flag = store_flag
    if self.store_flag == 1:
      self.store = {"action_store": [],
                    "reward_store": [],
                    "running_capital": [],
                    "port_ret": []}


  def reset(self):
    self.capital = self.initial_cap
    self.running_capital = self.capital
    self.asset_inv = self.past_holding

    self.pointer = 0
    self.next_return, self.current_state = self.get_state(self.pointer)
    self.prev_act = 0
    self.current_act = 0
    self.current_reward = 0
    self.current_price = self.asset_data.frame.iloc[self.pointer, :]['Adj Close']
    self.done = False
    
    if self.store_flag == 1:
      self.store = {"action_store": [],
                    "reward_store": [],
                    "running_capital": [],
                    "port_ret": []}

    return self.current_state


  def step(self, action):
    self.current_act = action
    self.current_price = self.asset_data.frame.iloc[self.pointer, :]['Adj Close']
    self.current_reward = self.calculate_reward()
    self.prev_act = self.current_act
    self.pointer += 1
    self.next_return, self.current_state = self.get_state(self.pointer)
    self.done = self.check_terminal()

    if self.done:
      reward_offset = 0
      ret = (self.store['running_capital'][-1]/self.store['running_capital'][-0]) - 1
      if self.pointer < self.terminal_idx:
        reward_offset += -1 * max(0.5, 1 - self.pointer/self.terminal_idx)
      if self.store_flag:
        reward_offset += 10 * ret
      self.current_reward += reward_offset

    if self.store_flag:
      self.store["action_store"].append(self.current_act)
      self.store["reward_store"].append(self.current_reward)
      self.store["running_capital"].append(self.capital)
      info = self.store
    else:
      info = None
    
    return self.current_state, self.current_reward, self.done, info


  def calculate_reward(self):
    investment = self.running_capital * self.capital_frac
    reward_offset = 0

    # Buy Action
    if self.current_act == 1: 
      if self.running_capital > self.initial_cap * self.running_thresh:
        self.running_capital -= investment
        asset_units = investment/self.current_price
        self.asset_inv += asset_units
        self.current_price *= (1 - self.trans_cost)

    # Sell Action
    elif self.current_act == -1:
      if self.asset_inv > 0:
        self.running_capital += self.asset_inv * self.current_price * (1 - self.trans_cost)
        self.asset_inv = 0

    # Do Nothing
    elif self.current_act == 0:
      if self.prev_act == 0:
        reward_offset += -0.1
      pass
    
    # Reward to give
    prev_cap = self.capital
    self.capital = self.running_capital + (self.asset_inv) * self.current_price
    reward = 100*(self.next_return) * self.current_act - np.abs(self.current_act - self.prev_act) * self.trans_cost
    if self.store_flag==1:
      self.store['port_ret'].append((self.capital - prev_cap)/prev_cap)
    
    if reward < 0:
      reward *= NEG_MUL  # To make the Agent more risk averse towards negative returns.
    reward += reward_offset

    return reward


  def check_terminal(self):
    if self.pointer == self.terminal_idx:
      return True
    elif self.capital <= self.initial_cap * self.cap_thresh:
      return True
    else:
      return False


  def get_state(self, idx):
    state = self.asset_data[idx][1:]
    state = self.scaler.transform(state.reshape(1, -1))

    state = np.concatenate([state, [[self.capital/self.initial_cap,
                                     self.running_capital/self.capital,
                                     self.asset_inv * self.current_price/self.initial_cap,
                                     self.prev_act]]], axis=-1)
    
    next_ret = self.asset_data[idx][0]
    return next_ret, state