In [1]:
# DQN test for cover trading
# Reference: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/lightning_examples/reinforce-learning-DQN.html

In [25]:
import os
from collections import OrderedDict, deque, namedtuple
from typing import List, Tuple
import os, sys
sys.path.append('/mnt/data/projects/wankun01/workdir/playground/dqn/venv/lib/python3.9/site-packages')
import gym
from gym import spaces
import numpy as np
import pandas as pd
import torch
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.utilities import DistributedType
from torch import Tensor, nn
from torch.optim import Adam, Optimizer
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset

# Folder that data files are read from; override with the PATH_DATASETS env var.
PATH_DATASETS = os.getenv("PATH_DATASETS", ".")
# Train on at most one GPU; 0 when CUDA reports no devices.
AVAIL_GPUS = 1 if torch.cuda.device_count() >= 1 else 0

# !pip install git+https://github.com/PytorchLightning/lightning-bolts.git@master --upgrade

# Model setup and PyTorch Lightning preparation

In [3]:
class DQN(nn.Module):
    """Small MLP that maps an observation to one Q-value per action.

    Architecture: Linear(obs_size -> hidden_size) -> ReLU -> Linear(hidden_size -> n_actions).

    Args:
        obs_size: observation/state size of the environment (input width of the first layer)
        n_actions: number of discrete actions available in the environment (output width)
        hidden_size: width of the single hidden layer
    """

    def __init__(self, obs_size: int, n_actions: int, hidden_size: int = 128):
        super().__init__()
        layers = [
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        ]
        # Kept under the attribute name `net` so parameter names stay `net.0.*` / `net.2.*`.
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Cast ``x`` to float32 and return the network's Q-values for it."""
        as_float = x.float()
        return self.net(as_float)

In [4]:
# One transition gathered during training: (state, action, reward, done, new_state).
Experience = namedtuple("Experience", "state action reward done new_state")

In [5]:
class ReplayBuffer:
    """Replay Buffer for storing past experiences allowing the agent to learn from them.

    Backed by ``deque(maxlen=capacity)``, so once full the oldest experience is
    dropped whenever a new one is appended.

    Args:
        capacity: size of the buffer
    """

    def __init__(self, capacity: int) -> None:
        self.buffer = deque(maxlen=capacity)

    def __len__(self) -> int:
        return len(self.buffer)

    def append(self, experience: "Experience") -> None:
        """Add experience to the buffer.

        Args:
            experience: tuple (state, action, reward, done, new_state)
        """
        self.buffer.append(experience)

    def sample(self, batch_size: int) -> Tuple:
        """Draw ``batch_size`` distinct experiences uniformly at random.

        Args:
            batch_size: number of experiences to draw (without replacement)

        Returns:
            tuple of arrays ``(states, actions, rewards, dones, next_states)``;
            ``rewards`` is float32 and ``dones`` is bool.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored experiences.
        """
        if batch_size > len(self.buffer):
            raise ValueError(
                f"cannot sample {batch_size} experiences from a buffer holding {len(self.buffer)}"
            )
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*(self.buffer[idx] for idx in indices))

        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            # builtin bool: the np.bool alias was deprecated in NumPy 1.20 and removed in 1.24
            np.array(dones, dtype=bool),
            np.array(next_states),
        )


In [6]:
class RLDataset(IterableDataset):
    """Iterable view over a replay buffer that draws a fresh random sample each pass.

    Every time the dataset is iterated it samples ``sample_size`` experiences from
    the buffer and yields them one by one as
    ``(state, action, reward, done, new_state)`` tuples. Experiences appended since
    the previous pass can therefore be picked up.

    Args:
        buffer: replay buffer
        sample_size: number of experiences to sample at a time
    """

    def __init__(self, buffer: ReplayBuffer, sample_size: int = 200) -> None:
        self.sample_size = sample_size
        self.buffer = buffer

    def __iter__(self):
        sampled = self.buffer.sample(self.sample_size)
        # sample() returns five equally long arrays; zip turns those columns into rows.
        yield from zip(*sampled)

In [7]:
class Agent:
    """Base Agent class handling the interaction with the environment."""

    def __init__(self, env: gym.Env, replay_buffer: ReplayBuffer) -> None:
        """
        Args:
            env: training environment
            replay_buffer: replay buffer storing experiences
        """
        self.env = env
        self.replay_buffer = replay_buffer
        # reset() already stores the first observation in self.state; resetting the
        # env a second time here would just throw that episode start away.
        self.reset()

    def reset(self) -> None:
        """Resets the environment and updates the state."""
        self.state = self.env.reset()

    def get_action(self, net: nn.Module, epsilon: float, device: str) -> int:
        """Using the given network, decide what action to carry out using an epsilon-greedy policy.

        Args:
            net: DQN network
            epsilon: value to determine likelihood of taking a random action
            device: current device (a device string such as "cpu"/"cuda:0", an index,
                or a torch.device)

        Returns:
            action: a sample from ``env.action_space`` when exploring, otherwise the
            index of the largest Q-value.
        """
        if np.random.random() < epsilon:
            # explore: randomly take an action
            return self.env.action_space.sample()

        # Exploit: add a batch dimension and move the state to the model's device.
        # np.asarray + as_tensor avoids torch's slow list-of-ndarrays conversion, and
        # .to() handles "cpu" as well as CUDA devices (the old `.cuda(device)` path
        # failed for torch.device("cpu")).
        state = torch.as_tensor(np.asarray(self.state)).unsqueeze(0).to(device)
        q_values = net(state)
        # NOTE(review): assumes q_values is (1, n_actions). A multi-dimensional
        # observation such as FXTradingEnv's (6, 6) leaves more than one element
        # after the max, and .item() will raise.
        _, action = torch.max(q_values, dim=1)  # take the largest q-value action
        return int(action.item())

    @torch.no_grad()
    def play_step(
        self,
        net: nn.Module,
        epsilon: float = 0.0,
        device: str = "cpu",
    ) -> Tuple[float, bool]:
        """Carries out a single interaction step between the agent and the environment.

        The transition is appended to the replay buffer, and the environment is
        reset when the episode ends.

        Args:
            net: DQN network
            epsilon: value to determine likelihood of taking a random action
            device: current device

        Returns:
            reward, done
        """
        action = self.get_action(net, epsilon, device)

        # The env advances and returns (new_state, reward, done, info) -- the 4-tuple gym API.
        new_state, reward, done, _ = self.env.step(action)

        self.replay_buffer.append(Experience(self.state, action, reward, done, new_state))

        # Move on to the new state; start a fresh episode when this one ended.
        self.state = new_state
        if done:
            self.reset()
        return reward, done

In [31]:
class DQNLightning(LightningModule):
    """Basic DQN model.

    Work in progress: ``__init__`` currently validates ``env``, saves the
    hyperparameters and prints the observation/action sizes derived from the
    env's spaces. The network, replay buffer, agent and training step are not
    defined in this cell yet.
    """
    def __init__(
        self,
        batch_size: int = 16,
        lr: float = 1e-2,
        env = None,
        gamma: float = 0.99,
        sync_rate: int = 10,
        replay_size: int = 1000,
        warm_start_size: int = 1000,
        eps_last_frame: int = 1000,
        eps_start: float = 1.0,
        eps_end: float = 0.01,
        episode_length: int = 200,
        warm_start_steps: int = 1000,
    ) -> None:
        """
        Args:
            batch_size: size of the batches
            lr: learning rate
            env: gym environment instance (required despite the ``None`` default)
            gamma: discount factor
            sync_rate: how many frames do we update the target network
            replay_size: capacity of the replay buffer
            warm_start_size: how many samples do we use to fill our buffer at the start of training
            eps_last_frame: what frame should epsilon stop decaying
            eps_start: starting value of epsilon
            eps_end: final value of epsilon
            episode_length: max length of an episode
            warm_start_steps: not used in this cell yet; presumably the number of
                steps used to pre-fill the replay buffer (TODO confirm)

        Raises:
            ValueError: if ``env`` is None.
        """
        super().__init__()
        if env is None:
            raise ValueError("DQNLightning requires a gym environment instance via `env`")
        self.save_hyperparameters()

        # self.env = gym.make(self.hparams.env) # if use env tag
        self.env = env  # if directly passing an env
        # NOTE(review): FXTradingEnv observations are (6, 6), so shape[0] is only the
        # row count. It happens to equal the last-axis width that DQN's first
        # nn.Linear would see. TODO decide whether observations should be flattened.
        obs_size = self.env.observation_space.shape[0]
        action_space = self.env.action_space
        if isinstance(action_space, spaces.Discrete):
            n_actions = action_space.n
        else:
            # For a Box this is the action dimensionality, not a count of discrete
            # actions; DQN's argmax over outputs does not map onto a continuous Box.
            n_actions = action_space.shape[0]
        print(f'obs_size: {obs_size}')
        print(f'n_actions: {n_actions}')

# Custom environment setup

In [9]:
# Scaling caps: FXTradingEnv divides observation features by these values.
MAX_ACCOUNT_BALANCE = 2**31 - 1  # largest signed 32-bit integer
MAX_NUM_SHARES = 2**31 - 1
MAX_SHARE_PRICE = 5_000
MAX_OPEN_POSITIONS = 5  # not referenced in the cells shown here
MAX_STEPS = 20_000  # scales the step-dependent reward modifier

# Cash balance each episode starts with.
INITIAL_ACCOUNT_BALANCE = 10_000

In [28]:
class FXTradingEnv(gym.Env):
    """A prop trading env for FX.

    Observation: a (6, 6) array.
        - Rows 0-4 hold the Open, High, Low, Close and Volume values of data-frame
          rows ``current_step`` .. ``current_step + 5``. Prices are scaled by
          MAX_SHARE_PRICE and volume by MAX_NUM_SHARES.
        - Row 5 holds the scaled account state.

    Action: a length-2 Box.
        - ``action[0]`` in [0, 3] selects buy (< 1), sell (< 2) or hold (otherwise).
        - ``action[1]`` in [0, 1] is the fraction of the balance to spend (buy) or of
          the held shares to sell.

    NOTE(review): the trade executes at a price drawn from row ``current_step``.
    The observation the agent acted on already contains the following five rows,
    so the agent sees future prices (look-ahead bias).
    """
    metadata = {'render.modes': ['human']}

    # Number of consecutive data-frame rows covered by one observation.
    _WINDOW = 6

    def __init__(self, df):
        super().__init__()
        # Rows are addressed by label with .loc, which assumes a default RangeIndex
        # (what pd.read_csv produces without index_col).
        self.df = df
        self.reward_range = (0, MAX_ACCOUNT_BALANCE)

        # spaces.Box is a continuous n-dimensional box with per-dimension low/high bounds.
        # action = [action_type, amount]. float16 bounds (instead of integer arrays)
        # should avoid gym's "Box bound precision lowered" cast warning.
        self.action_space = spaces.Box(
            low=np.array([0, 0], dtype=np.float16),
            high=np.array([3, 1], dtype=np.float16),
            dtype=np.float16,
        )

        # 5 rows of scaled OHLCV over the 6-row window + 1 row of account state.
        self.observation_space = spaces.Box(low=0, high=1, shape=(6, 6), dtype=np.float16)

    def reset(self):
        """Reset the account to its initial state and jump to a random start row.

        Returns:
            the first observation of the new episode.
        """
        self.balance = INITIAL_ACCOUNT_BALANCE
        self.net_worth = INITIAL_ACCOUNT_BALANCE
        self.max_net_worth = INITIAL_ACCOUNT_BALANCE
        self.shares_held = 0
        self.cost_basis = 0
        self.total_shares_sold = 0
        self.total_sales_value = 0

        # Random start in [0, len(df) - 6] so a full window fits. Uses NumPy's global
        # RNG; seed it with np.random.seed for reproducible episodes.
        self.current_step = int(np.random.randint(0, self._max_start_step() + 1))
        return self._next_observation()

    def _max_start_step(self):
        """Largest row index at which a full observation window still fits."""
        return len(self.df) - self._WINDOW

    def _next_observation(self):
        """Build the (6, 6) observation for the window starting at ``current_step``."""
        # .loc label slicing is end-inclusive, so start..start+5 yields 6 rows.
        # TODO: try a min-max scaler instead of fixed caps.
        window = self.df.loc[self.current_step: self.current_step + self._WINDOW - 1]
        frame = np.array([
            window['Open'].values / MAX_SHARE_PRICE,
            window['High'].values / MAX_SHARE_PRICE,
            window['Low'].values / MAX_SHARE_PRICE,
            window['Close'].values / MAX_SHARE_PRICE,
            # Volume is a share count: scaling it by MAX_SHARE_PRICE produced values
            # far above the observation space's upper bound of 1.
            window['Volume'].values / MAX_NUM_SHARES,
        ])
        account_state = [[
            self.balance / MAX_ACCOUNT_BALANCE,
            self.max_net_worth / MAX_ACCOUNT_BALANCE,
            self.shares_held / MAX_NUM_SHARES,
            self.cost_basis / MAX_SHARE_PRICE,
            self.total_shares_sold / MAX_NUM_SHARES,
            self.total_sales_value / (MAX_NUM_SHARES * MAX_SHARE_PRICE),
        ]]
        return np.append(frame, account_state, axis=0)

    def step(self, action):
        """Apply ``action``, advance one row and return ``(obs, reward, done, info)``."""
        self._take_action(action)
        self.current_step += 1

        # Wrap around to the first row once a full window no longer fits.
        if self.current_step > self._max_start_step():
            self.current_step = 0

        # The balance was updated by _take_action; later steps weight it more heavily.
        delay_modifier = self.current_step / MAX_STEPS
        reward = self.balance * delay_modifier
        done = self.net_worth <= 0

        # The observation is built after advancing, so it is the new state.
        obs = self._next_observation()
        return obs, reward, done, {}

    def _take_action(self, action):
        """Buy, sell or hold at a random price between the current row's Open and Close."""
        # TODO: separate ask and bid prices instead of a single mid-range draw.
        open_price = self.df.loc[self.current_step, 'Open']
        close_price = self.df.loc[self.current_step, 'Close']
        current_price = np.random.uniform(min(open_price, close_price), max(open_price, close_price))

        action_type = action[0]
        amount = action[1]

        if action_type < 1:
            # Buy `amount` (fraction) of what the balance can afford.
            total_possible = self.balance / current_price
            shares_bought = total_possible * amount
            # Skip empty buys so the cost-basis average never divides by zero.
            if shares_bought > 0:
                prev_cost = self.cost_basis * self.shares_held
                additional_cost = shares_bought * current_price
                self.balance -= additional_cost
                self.cost_basis = (prev_cost + additional_cost) / (self.shares_held + shares_bought)
                self.shares_held += shares_bought
        elif action_type < 2:
            # Sell `amount` (fraction) of the shares currently held.
            shares_sold = self.shares_held * amount
            self.balance += shares_sold * current_price
            self.shares_held -= shares_sold
            self.total_shares_sold += shares_sold
            self.total_sales_value += shares_sold * current_price
        # Otherwise: hold, no trade.

        self.net_worth = self.balance + self.shares_held * current_price
        if self.net_worth > self.max_net_worth:
            self.max_net_worth = self.net_worth

        if self.shares_held == 0:
            self.cost_basis = 0

    def render(self, mode='human', close=False):
        """Print the current account state to stdout."""
        profit = self.net_worth - INITIAL_ACCOUNT_BALANCE

        print(f'Step: {self.current_step}')
        print(f'Balance: {self.balance}')
        print(f'Shares held: {self.shares_held} (Total sold: {self.total_shares_sold})')
        print(f'Avg cost for held shares: {self.cost_basis} (Total sales value: {self.total_sales_value})')
        print(f'Net worth: {self.net_worth} (Max net worth: {self.max_net_worth})')
        print(f'Profit: {profit}')
        
        

# Build the algorithm

In [22]:
# Daily AAPL OHLCV bars, resolved relative to PATH_DATASETS (defaults to ".") so the
# data location can be configured through the environment.
# NOTE(review): the "Unnamed: 0" / "Unnamed: 0.1" columns look like stale indices from
# earlier to_csv calls; FXTradingEnv only reads Open/High/Low/Close/Volume.
df = pd.read_csv(os.path.join(PATH_DATASETS, 'AAPL.csv'))
df.head()

Unnamed: 0.1,Unnamed: 0,Date,Open,High,Low,Close,Volume
0,0,1998-01-02,13.63,16.25,13.5,16.25,6411700.0
1,1,1998-01-05,16.5,16.56,15.19,15.88,5820300.0
2,2,1998-01-06,15.94,20.0,14.75,18.94,16182800.0
3,3,1998-01-07,18.81,19.0,17.31,17.5,9300200.0
4,4,1998-01-08,17.44,18.62,16.94,18.19,6910900.0


In [35]:
# Build the custom trading env and pass it to the DQN module, which prints the sizes
# it derives from the env's observation and action spaces.
# TODO: understand the meaning of the action and observation spaces
fx_trading_env = FXTradingEnv(df=df)
lightning_module = DQNLightning(env=fx_trading_env)

obs_size: 6
n_actions: 2


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [34]:
# Display the action space defined in FXTradingEnv.__init__ as this cell's output.
fx_trading_env.action_space

Box(0.0, [3. 1.], (2,), float16)