# Reinforcement Learning Environment for Economic Market (Time Series)
This notebook contains the RL environment for finance. We start with a base class for general finance problems, and then define a concrete environment class that is used for training.

In [1]:
import gym
import numpy as np
import pandas as pd
from gym import spaces
import random
import sys
sys.path.append('../module')

from data_handler import get_econ_predictors, get_quarterly_date_format, get_monthly_date_format

# RL Environment Base Class for General Finance
We first introduce a general-purpose base class. The design is inspired by [gym-anytrading](https://github.com/AminHP/gym-anytrading/blob/296bae49e77d08a7c7349b542cdd00ea7ca23af0/gym_anytrading/envs/trading_env.py).

In [2]:
class TimeSeriesEnvBase(gym.Env):
    '''
    Abstract base class for reinforcement-learning environments driven by time series data.

    Description
    -----------
    The class fixes the overall shape of an episode (``reset`` followed by
    repeated ``step`` calls) and delegates the problem-specific parts to four
    hooks that every subclass must implement:

    - ``_trans_func(action)``      : advance the environment and return the next state
    - ``_calculate_reward(action)``: return the reward for the action
    - ``_is_terminated()``         : return True when the episode is over
    - ``_get_info()``              : return a dict with auxiliary information

    Subclasses are also expected to set ``action_space``, ``start_tick`` and
    ``end_tick`` in their own ``__init__``.
    '''
    # The Gym API documents "render_modes" as a list of mode names, not a bare string.
    metadata = {"render_modes": ["human"]}

    def __init__(self):
        '''
        Initialize the environment.
        
        Description
        -----------
        We use the OpenAI gym.Env class as the parent class of our environment.
        Here we initialize some key properties of the environment.
        Every property starts as a placeholder; subclasses replace them with
        concrete spaces and tick boundaries.
        '''
        self.action_space = None
        # NOTE(review): Gym itself reads `observation_space`, not `state_space`;
        # subclasses that are passed to Gym tooling should define that attribute as well.
        self.state_space = None
        self.state = None
        self.reward = 0
        self.terminated = False
        self.info = {}

        self.start_tick = None
        self.end_tick = None
        self.current_tick = None

    def reset(self, seed=None):
        '''
        Reset the environment's state.

        Parameters
        ----------
        seed : int, optional
            Seed forwarded to ``gym.Env.reset``, by default None.

        Returns
        -------
        state : None
            Placeholder; the base class has no data to build a state from.
        info : dict
            An empty dictionary.

        Notes
        -----
        Set the random seed once at the beginning of the experiment.
        Then keep the same seed for the whole experiment.
        The tick pointer is moved back to ``start_tick`` and the reward,
        termination flag and info dictionary are cleared.
        '''
        super().reset(seed=seed)
        self.current_tick = self.start_tick
        self.state = None  # the base class has no data; subclasses provide the real initial state
        self.reward = 0
        self.terminated = False
        self.info = {}  # an (empty) dict rather than None, so callers can always index it

        return self.state, self.info

    def step(self, action):
        '''
        Take an action and return the next state, reward, terminated flag and info.

        Parameters
        ----------
        action : object
            The action chosen by the agent; it is passed unchanged to the hooks.

        Returns
        -------
        tuple
            ``(state_prime, reward, terminated, info)`` as produced by the four hooks.

        Notes
        -----
        We use the term "state" instead of "observation" in this function.
        The observation is the state plus some other information.
        The state is the representation of the environment.
        For research purposes, we want to keep the setting as simple as possible.
        Therefore, we only use the state as the input.
        The hooks are called in this order: transition, reward, termination, info.
        '''
        state_prime = self._trans_func(action)
        reward = self._calculate_reward(action)
        terminated = self._is_terminated()
        info = self._get_info()

        return state_prime, reward, terminated, info
    
    def _trans_func(self, action):
        '''
        Move to the next state based on the current state and action.
        In the time series data, the transition is represented by the transition of the timestep.
        We get the timestep of current state and action value to calculate the next state.

        Must be implemented by subclasses.
        '''
        raise NotImplementedError

    def _calculate_reward(self, action):
        '''
        Since this class is built for time series data, the transition is represented by the transition of the timestep.
        Then we only need the timestep of state and action value to calculate the reward.

        Must be implemented by subclasses.
        '''
        raise NotImplementedError
    
    def _is_terminated(self):
        '''
        Check if the episode is terminated.

        Must be implemented by subclasses.
        '''
        raise NotImplementedError
    
    def _get_info(self):
        '''
        Return the auxiliary information reported by ``step``.

        Must be implemented by subclasses.
        '''
        raise NotImplementedError
        
    

# RL Environment for Economic Market

The RL problem is summarized as follows:
\begin{equation}
\begin{aligned}
S &= \{ECON_1, ECON_2, \dots, ECON_{15}\} \\
A &= \{(w_1, w_2) \mid w_1, w_2 \in [-1, 1]\} \\
R &= \log(PR)
\end{aligned}
\end{equation}

In [2]:
class EconMarketEnv(gym.Env):
    """
    This class defines an OpenAI Gym environment for simulating a simple trading market.
    The market is modeled as a time series of econ factors.
    The environment is used for training trading agents to learn how to select and manage a portfolio of equities.

    The EconMarketEnv class is an implementation of the OpenAI Gym Env class.
    It has two main functions:
        - reset: reset the environment to its initial state and return the first state
        - step: take an action, update the state of the environment, and return the new state, reward, terminated flag, and info.

    The environment's state is the row of econ factors at the current tick.
    The action is a vector with one portfolio weight per equity, each in [-1, 1].
    The reward is log(1 + action . equity_returns[t + 1]), i.e. the log return of the portfolio over the next tick.
    The terminated flag is evaluated before the tick advances: it is True for the step
    taken from ``end_tick``, which is the step that returns the last row of the data.
    """
    # The Gym API documents "render_modes" as a list of mode names, not a bare string.
    metadata = {"render_modes": ["human"]}

    def __init__(self, data: pd.DataFrame, portfolio: pd.DataFrame):
        """
        Initialize the environment.

        Parameters
        ----------
        data : pandas.DataFrame
            The input time series data of econ factors (one row per tick).
        portfolio : pandas.DataFrame
            The portfolio time series of each individual equity return
            (one row per tick, one column per equity).

        Description
        -----------
        We set the observation space and action space for our environment using OpenAI Gym's space API.
        We also convert the data and portfolio to numpy arrays for performance reasons.
        """
        super().__init__()

        # Column-wise bounds of the factors. The axis is explicit because
        # `np.min(DataFrame)` forwards axis=None, which recent pandas versions
        # reduce to a single scalar instead of one value per column.
        low = data.min(axis=0).to_numpy(dtype=np.float64)
        high = data.max(axis=0).to_numpy(dtype=np.float64)
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float64)

        # One weight in [-1, 1] per equity column, so the action always matches
        # the row it is multiplied with in `_calculate_reward`.
        n_assets = portfolio.shape[1]
        self.action_space = spaces.Box(
            low=-np.ones(n_assets), high=np.ones(n_assets), dtype=np.float64
        )

        self.data = data.values
        self.index = data.index
        self.portfolio = portfolio.values

        self.start_tick = 0
        # The reward at tick t needs the returns at t + 1, so the last tick an
        # action can be taken from is the second-to-last row.
        self.end_tick = data.shape[0] - 2
        self.current_tick = None  # set by reset()

        self.total_reward = 0
        self.port_ret = 0
        self.terminated = False
        self.info = {}
        self.reward = 0

    def reset(self, seed=None):
        '''
        Reset the environment's state. 
    
        Parameters
        ----------
        seed : int, optional
            Random seed for the experiment, by default None.
        
        Returns
        -------
        state : ndarray
            The initial state of the environment (the row of `data` at `start_tick`).

        Notes
        -----
        Set the random seed once at the beginning of the experiment. 
        Then keep the same seed for the whole experiment.
        All per-episode bookkeeping (cumulative reward, last reward, last
        portfolio return, termination flag, info) is cleared.
        '''
        super().reset(seed=seed)
        self.current_tick = self.start_tick

        self.total_reward = 0
        self.port_ret = 0
        self.reward = 0
        self.terminated = False
        self.done = False  # legacy attribute, kept for code that still reads `env.done`
        self.info = {}

        return self.data[self.current_tick]
    
    def step(self, action):
        '''
        Takes an action and returns the next state, reward, terminated flag, info.

        Parameters:
        -----------
        action : array-like
            The portfolio weights chosen by the agent, one per equity.

        Returns:
        --------
        state_prime : numpy.ndarray
            The next state of the environment.
        reward : float
            The reward received by the agent.
        terminated : bool
            True if the episode is over, False otherwise.
        info : dict
            A dictionary containing any additional information about the transition.

        Raises:
        -------
        RuntimeError
            If called before `reset()` or after the episode has terminated.

        Notes:
        ------
        We use the term "state" instead of "observation" in this function.
        The observation is the state plus some other information.
        The state is the representation of the environment.
        For research purposes, we want to keep the setting as simple as possible.
        Therefore, we only use the state as the input.
        '''
        if self.current_tick is None:
            raise RuntimeError("reset() must be called before step().")
        if self.current_tick > self.end_tick:
            # Going on would index one row past the end of the data.
            raise RuntimeError("The episode has terminated; call reset() before calling step() again.")

        # Termination is decided from the tick the action is taken at, i.e.
        # before `_trans_func` advances the tick.
        self.terminated = self._is_terminated()
        reward = self._calculate_reward(action)
        state_prime = self._trans_func(action)
        info = self._get_info()

        self.reward = reward
        self.total_reward += reward

        return state_prime, reward, self.terminated, info

    
    def _trans_func(self, action):
        '''
        Move to the next state.

        Parameters:
        ----------
        action : array-like
            The action taken by the agent. It is not used: the next state is
            simply the next row of the data.

        Returns:
        -------
        state_prime : ndarray
            A numpy array that represents the next state of the environment.

        Description:
        ------------
        In the time series data, the transition is represented by the transition of the timestep.
        The current tick is advanced by one and the row at the new tick is returned.
        '''
        self.current_tick += 1

        return self.data[self.current_tick]
        
    def _calculate_reward(self, action):
        '''
        Calculate the reward for the action taken at the current tick.
        
        Parameters
        ----------
        action : array-like
            The portfolio weights chosen by the agent.

        Returns
        -------
        float
            log(1 + portfolio return), where the portfolio return is the dot
            product of the weights with the equity returns of the next tick.
        
        Description:
        ------------
        For time series data, the transition is represented by the transition of the timestep.
        We need to wait until the next timestep to get the equity return.
        Then we calculate the reward from the portfolio weights and the equity return.
        The portfolio return is also stored in `self.port_ret`.
        '''
        weights = np.asarray(action, dtype=np.float64)
        self.port_ret = weights @ self.portfolio[self.current_tick + 1]

        return np.log(self.port_ret + 1)

    def _is_terminated(self):
        '''
        Check if the episode is terminated.
        
        Returns
        -------
        bool
            True if the episode is terminated, False otherwise.
        
        Description:
        ------------
            An episode is terminated if the current tick is equal to the end tick.
        '''
        return self.current_tick == self.end_tick

    def _get_info(self):
        '''
        Return the auxiliary information for the transition (currently an empty dict).
        '''
        return {}

# Test the environment

Prepare the data used to construct the environment.

In [3]:
# Choose the sampling frequency once, so the predictors and the portfolio file
# are always loaded at the same frequency.
data_freq = 'quarterly'

data = get_econ_predictors(data_freq=data_freq)
data.pop('Equity Premium')  # remove this column from the predictors used as state

# Maps each frequency to (portfolio csv path, date-format helper).
# NOTE(review): the two paths use different relative prefixes ('./../data' vs
# '../../data'); confirm the monthly path against the repository layout.
date_freq_to_data_func_map = {'monthly': ('./../data/portfolio_market_rf_month_1871_2021.csv',
                                            get_monthly_date_format), 
                            'quarterly': ('../../data/portfolio_market_rf_quarter_1871_2021.csv',
                                        get_quarterly_date_format)}

data_path, date_format_func = date_freq_to_data_func_map[data_freq]
portfolio = pd.read_csv(data_path, index_col=0)
# NOTE(review): `date_format_func` is looked up but never applied; the index is
# converted to monthly-frequency periods directly. Confirm this is intended.
portfolio.index = [pd.Period(str(x), freq='M') for x in portfolio.index]
# Keep only the portfolio rows that have a matching row of predictors.
portfolio = portfolio.loc[data.index]

In [5]:
# Environment instance used by the inspection cells below.
env_test = EconMarketEnv(data=data, portfolio=portfolio)

Test the constructor of the environment class

In [6]:
# Shapes of the two underlying arrays first ...
for array_name in ("data", "portfolio"):
    print(getattr(env_test, array_name).shape)

# ... then the index and the bookkeeping fields, in the order they are set up.
for field_name in ("index", "start_tick", "end_tick", "current_tick",
                   "total_reward", "terminated", "reward", "info"):
    print(getattr(env_test, field_name))

(232, 16)
(232, 2)
PeriodIndex(['1947-06', '1947-09', '1947-12', '1948-03', '1948-06', '1948-09',
             '1948-12', '1949-03', '1949-06', '1949-09',
             ...
             '2002-12', '2003-03', '2003-06', '2003-09', '2003-12', '2004-03',
             '2004-06', '2004-09', '2004-12', '2005-03'],
            dtype='period[M]', length=232)
0
230
None
0
False
0
{}


Test the reset() function

In [7]:
# Reset with an explicit seed and check that the bookkeeping is back at its initial values.
print(env_test.reset(seed=100))
print(env_test.current_tick)
print(env_test.total_reward)
print(env_test.terminated)
# Read the RNG state through the public `bit_generator.state` accessor rather
# than the private `__getstate__` hook.
print(env_test.np_random.bit_generator.state)
# Resetting without a seed must leave the RNG state untouched.
print(env_test.reset())
print(env_test.np_random.bit_generator.state)

(array([ 1.68678312e-03, -2.99638995e+00, -2.99375664e+00, -2.35730999e+00,
       -6.39079959e-01,  6.81122530e-03,  7.41116751e-01,  2.64727428e-02,
        3.80000000e-03,  2.16000000e-02,  5.87377800e-04,  1.78000000e-02,
        6.60000000e-03,  3.81822380e-03,  4.56621000e-03,  3.48415583e-02]), None)
0
0
False
{'bit_generator': 'PCG64', 'state': {'state': 241834680195789509926839563169936010333, 'inc': 30008503642980956324491363429807189605}, 'has_uint32': 0, 'uinteger': 0}
(array([ 1.68678312e-03, -2.99638995e+00, -2.99375664e+00, -2.35730999e+00,
       -6.39079959e-01,  6.81122530e-03,  7.41116751e-01,  2.64727428e-02,
        3.80000000e-03,  2.16000000e-02,  5.87377800e-04,  1.78000000e-02,
        6.60000000e-03,  3.81822380e-03,  4.56621000e-03,  3.48415583e-02]), None)
{'bit_generator': 'PCG64', 'state': {'state': 241834680195789509926839563169936010333, 'inc': 30008503642980956324491363429807189605}, 'has_uint32': 0, 'uinteger': 0}


Test the step() function

In [395]:

# Equal weight on both equities.
action = np.full(2, 0.5)
step_result = env_test.step(action)
print(step_result)

# Bookkeeping after a single step.
for field_name in ("port_ret", "current_tick", "total_reward", "terminated"):
    print(getattr(env_test, field_name))


(array([-0.00752462, -2.95107911, -2.95767544, -2.27710185, -0.67397726,
        0.00418734,  0.7403234 ,  0.02405855,  0.008     ,  0.0213    ,
        0.00998745,  0.0133    ,  0.0062    , -0.02813465,  0.04545455,
        0.0338689 ]), -0.0028162729560917994, False, None)
-0.0028123109796186997
1
-0.0028162729560917994
False


Test the environment as a whole

In [4]:
SEED = 1234

env = EconMarketEnv(data, portfolio)  # create an instance of your environment
np.random.seed(SEED)  # seed NumPy's global RNG
random.seed(SEED)  # seed Python's global RNG
# `action_space.sample()` draws from the space's own RNG, which the two global
# seeds above do not touch; seed it explicitly so the rollout is reproducible.
env.action_space.seed(SEED)

# reset the environment to get the initial state
state = env.reset(seed=SEED)
done = False  # whether the episode is done
total_reward = 0  # the total reward earned during the episode

while not done:
    # randomly select an action from the action space
    action = env.action_space.sample()

    # take a step in the environment with the selected action
    next_state, reward, done, info = env.step(action)

    total_reward += reward  # accumulate the reward

print(f'Total reward: {total_reward}')


Total reward: 0.008367524574711686


Make the environment compatible with stable-baselines3

In [6]:
from stable_baselines3.common.env_checker import check_env

In [7]:
# Run the stable-baselines3 environment checker on the environment.
check_env(env=env)

Train SB3 agent on the environment

In [5]:
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3 import PPO, A2C, TD3

# Create environment
env = EconMarketEnv(data, portfolio)

# Instantiate the agent; the seed makes the training run reproducible
model = PPO("MlpPolicy", env, verbose=1, seed=1234,
            tensorboard_log="../../log/tensorboard_econmarket/")

# Train the agent (training statistics are printed because verbose=1)
model.learn(total_timesteps=int(2e5))


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ../../log/tensorboard_econmarket/PPO_2
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 231      |
|    ep_rew_mean     | -0.503   |
| time/              |          |
|    fps             | 7824     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 231         |
|    ep_rew_mean          | -0.376      |
| time/                   |             |
|    fps                  | 4796        |
|    iterations           | 2           |
|    time_elapsed         | 0           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.007834452 |
|    clip_fraction        | 0.0805      |
|    clip_ra

<stable_baselines3.ppo.ppo.PPO at 0x7fb9145c5890>