# Construct a custom Environment for Pair Trading

Some examples on the market
* [custom env example](https://colab.research.google.com/github/araffin/rl-tutorial-jnrr19/blob/sb3/5_custom_gym_env.ipynb#scrollTo=RqxatIwPOXe_)
* [StockTradingEnv by Adam King](https://github.com/notadamking/Stock-Trading-Environment)
* [FinRL](https://github.com/AI4Finance-Foundation/FinRL)

Target is to construct a custom Env for pair trading
This env restrict the behaviour of RL learner to pair trading only

In [1]:
import warnings
warnings.filterwarnings('ignore')

import os
import csv
import numpy as np
import pandas as pd
import gymnasium as gym
import statsmodels.api as sm

from gymnasium import spaces
from datetime import date
from envs.env_gridsearch import kellycriterion
from sklearn.model_selection import train_test_split
from stable_baselines3.common.vec_env import DummyVecEnv
from utils.read2df import read2df
from params import *

os.makedirs("result/rl-restrict", exist_ok=True)

for root, dirs, files in os.walk(f"result/rl-restrict/"):
    for file in files:
        os.remove(os.path.join(root, file))

PERIOD = 150 # Only look at the current price
CASH = 10000
ISKELLY = True
OPEN_THRE = 4.0
CLOS_THRE = 0.3

Load data from `preliminaries.ipynb`

In [2]:
import pickle

with open('result/cointncorr.pickle', 'rb') as pk:
    data = pickle.load(pk)

dfs = read2df(symbols=data[0], freqs={data[1]: freqs[data[1]]})

df0 = dfs[0][dfs[0]['tic']==data[0][0]].reset_index(drop=True)
df1 = dfs[0][dfs[0]['tic']==data[0][1]].reset_index(drop=True)

Set data before `trade_data` as training data, after `trade_data` is trade_data

In [3]:
train0 = df0[df0['datetime'] < trade_date]
train1 = df1[df1['datetime'] < trade_date]

test0 = df0[df0['datetime'] >= trade_date]
test1 = df1[df1['datetime'] >= trade_date]

print(f"The length of our training data: {len(train0)}")

The length of our training data: 1589703


# Define the custom Environment

The behaviour of RL learner is restricted. 

The action is defined as discrete actions -1, 0, 1

-1 means short df0 long df1, 0 means close position, +1 means long df0 short df1

> Baseline3 does not allow negative descrete space

In [4]:
# The lookback period for the observation space

class PairTradingEnv(gym.Env):
    metadata = {'render.modes': ['console']}

    # for pair trading, we need to feed in two OHLCV dataframes
    def __init__(self, df0, df1, tc=0.001, period=PERIOD, cash=CASH, isKelly=ISKELLY, model=""):
        super().__init__()

        if not df0['time'].equals(df1['time']):
            raise ValueError("Two dataframe must have same time index")

        self.cash = cash
        self.period = period
        self.model = model

        # transaction cost
        self.tc = tc
        # Whether to use Kelly or not
        self.isKelly = isKelly

        self.df0 = df0[['close', 'datetime']]
        self.df1 = df1[['close', 'datetime']]

        self.reward_range = (-np.inf, np.inf)

        # Baseline 3 does not support Dict/Tuple action spaces....only Box Discrete MultiDiscrete MultiBinary
        self.action_space = spaces.Discrete(4) # actions: {0: short p0 long p1, 1: close, 2: long p0 short p1, 3: do nothing}

        self.observation_space = spaces.Dict({
            "compare_open_thre": spaces.Discrete(3), # {0: above positive thres, 1: in between, 2: below negative thres}
            "compare_clos_thre": spaces.Discrete(3), # {0: above positive thres, 1: in between, 2: below negative thres}
            "zscore":     spaces.Box(low=-np.inf, high=np.inf, dtype=np.float64),
            "position":   spaces.Discrete(3), # {0: short leg0 long leg1, 1: none, 2: long leg0 short leg1}
        })

        # if the length is 35, then the index shall be 0~34
        self.max_steps = len(df0)-1
    
    def _kellycriterion(self, direct):
        # direct is +1 or -1
        spreads = pd.Series(self.distance[-self.period:-1]) * direct   
        kc_f = kellycriterion(spreads)

        return kc_f

    def _next_observation(self):
        # The current step is always higher than the PERIOD as defined in the 

        prices0 = self.df0['close'].iloc[self.current_step-self.period: self.current_step]
        prices1 = self.df1['close'].iloc[self.current_step-self.period: self.current_step]

        self.distance = [x - y for x, y in zip(prices0, prices1)]
        zscore = (self.distance[-1] - np.mean(self.distance)) / np.std(self.distance)

        '''The OPEN_THRE and CLOS_THRE comes from trade_gridsearch'''
        open_thre = OPEN_THRE
        clos_thre = CLOS_THRE
        compare_open_thre = 0 if zscore > open_thre else 2 if zscore < -open_thre else 1
        compare_clos_thre = 0 if zscore > clos_thre else 2 if zscore < -open_thre else 1
        
        obs = {
            "compare_open_thre": compare_open_thre,
            "compare_clos_thre": compare_clos_thre,
            "zscore": np.array([zscore]),
            "position": self.position,
        }
        
        return obs

    def _close_position(self):
        order_amount0 = -self.holding0
        order_amount1 = -self.holding1

        order_value0 = order_amount0 * self.curr_price0
        order_value1 = order_amount1 * self.curr_price1
        tc_cost = (abs(order_value0) + abs(order_value1)) * self.tc

        self.cash -= order_value0 + order_value1 + tc_cost
        self.holding0 = 0
        self.holding1 = 0
        self.position = 1

        self.order_amount0 = order_amount0
        self.order_amount1 = order_amount1

    def _open_position(self):

        # evaluate purchasing power 
        max_amount0 = self.cash/self.curr_price0
        max_amount1 = self.cash/self.curr_price1

        direction = self.action-1
        kc = self._kellycriterion(direct=direction) if self.isKelly else 1
        order_amount0 = direction * max_amount0 * kc
        order_amount1 = -direction * max_amount1 * kc

        order_value0 = order_amount0 * self.curr_price0
        order_value1 = order_amount1 * self.curr_price1
        tc_cost = (np.abs(order_value0) + np.abs(order_value1)) * self.tc

        # Open a new position
        self.cash -= order_value0 + order_value1 + tc_cost
        self.holding0 = order_amount0
        self.holding1 = order_amount1
        self.position = 1 if kc==0 else self.action

        self.kc = kc
        self.order_amount0 = order_amount0
        self.order_amount1 = order_amount1

    def _reverse_position(self):
        max_amount0 = self.cash/self.curr_price0
        max_amount1 = self.cash/self.curr_price1

        direction = self.action-1
        kc = self._kellycriterion(direct=direction) if self.isKelly else 1
        order_amount0 = direction * max_amount0 * kc - self.holding0
        order_amount1 = -direction * max_amount1 * kc - self.holding1

        order_value0 = order_amount0 * self.curr_price0
        order_value1 = order_amount1 * self.curr_price1
        tc_cost = (np.abs(order_value0) + np.abs(order_value1)) * self.tc

        self.cash -= order_value0 + order_value1 + tc_cost
        self.holding0 += order_amount0
        self.holding1 += order_amount1
        self.position = 1 if kc==0 else self.action

        # for debugging
        self.kc = kc
        self.order_amount0 = order_amount0
        self.order_amount1 = order_amount1

    def _take_action(self, action):

        # Record current net_worth to prev_net_worth
        self.prev_net_worth = self.net_worth

        self.curr_price0 = self.df0['close'].iloc[self.current_step]
        self.curr_price1 = self.df1['close'].iloc[self.current_step]

        self.action = action
        
        if self.action == 1:
            self._close_position()

        elif self.action == 0:
            if self.position  == 1:
                self._open_position()
            
            elif self.position == 2:
                self._reverse_position()
    
        elif self.action == 2:
            if self.position == 0:
                self._reverse_position()

            elif self.position == 1:
                self._open_position()

        # We record the net_worth from previous period to prev_net_worth
        self.net_worth = self.cash + self.holding0 * self.curr_price0 + self.holding1*self.curr_price1

    def step(self, action):
        self._take_action(action)
        self.current_step += 1

        self.observation = self._next_observation()
        reward = self.net_worth - self.prev_net_worth
        terminated = bool(self.current_step >= self.max_steps)
        truncated = bool(self.net_worth <= 0)
        info = {}

        return self.observation, reward, terminated, truncated, info

    def reset(self, seed=None):
        np.random.seed(seed)
        
        self.cash = self.cash
        self.net_worth = self.cash
        self.prev_net_worth = self.cash
        self.position = 0
        self.render_step = 0
        
        self.holding0 = 0
        self.holding1 = 0
        self.order_amount0 = 0
        self.order_amount1 = 0
        self.kc = 0

        self.current_step = np.random.randint(self.period, self.max_steps)

        return self._next_observation(), {}
    
    def render(self):
        profit = self.net_worth - self.cash
        
        # print(
        #     # f"direction: {self.action-1} "
        #     f"networth: {self.net_worth}, " 
        #     f"action: {self.action}, position: {self.position}, kc: {self.kc} "
        #     f"order_amount0: {self.order_amount0}, order_amount1: {self.order_amount1} "
        #     f"holding0: {self.holding0}, holding1: {self.holding1} "
        #     f"cash: {self.cash}, curr_price0: {self.curr_price0}, curr_price1: {self.curr_price1} "
        # )
            
        with open(f"result/rl-restrict/networth_{self.model}.csv", mode='a', newline='') as csv_f:
            writer = csv.writer(csv_f)
            writer.writerow(
                [self.df0['datetime'].iloc[self.current_step], 
                self.net_worth]
            )

## Check with baselin3 `env_checker`

Check if the env meets the requirements of `stable_baseline3`

In [5]:
from stable_baselines3.common.env_checker import check_env
# > UserWarning: The action space is not based off a numpy array. Typically this means it's either a Dict or Tuple space. This type of action space is currently not supported by Stable Baselines 3. You should try to flatten the action using a wrapper.
# Baseline 3 does not support Dict/Tuple action spaces....only Box Discrete MultiDiscrete MultiBinary
# Is there another way to achieve the same functionality?

env = PairTradingEnv(train0, train1)
check_env(env)

## Do a test run with random generated actions

In [6]:
env = PairTradingEnv(train0, train1, tc=0, model="test")
obs, _ = env.reset()

print(f"observation_space: {env.observation_space}")
print(f"action_space: {env.action_space}")
print(f"action_space.sample: {env.action_space.sample()}")

n_steps = 20

for step in range(n_steps):
    obs, reward, terminated, truncated, info = env.step(action=env.action_space.sample())
    done = terminated or truncated
    env.render()
    if done:
        break

observation_space: Dict('compare_clos_thre': Discrete(3), 'compare_open_thre': Discrete(3), 'position': Discrete(3), 'zscore': Box(-inf, inf, (1,), float64))
action_space: Discrete(4)
action_space.sample: 2


## Models from stable_baselines3

Train with training data

In [7]:
'''PPO'''

from stable_baselines3 import PPO

env = PairTradingEnv(train0, train1, tc=0, model="ppo")

model_ppo = PPO("MultiInputPolicy", env, verbose=0, tensorboard_log="logs")
model_ppo.learn(total_timesteps=2000000)
model_ppo.save("result/rl-restrict/ppo_pairtrading")

In [8]:
'''A2C'''

from stable_baselines3 import A2C

env = PairTradingEnv(train0, train1, tc=0, model="a2c")

model_a2c = A2C("MultiInputPolicy", env, verbose=0)
model_a2c.learn(total_timesteps=1000)
model_a2c.save("result/rl-restrict/a2c_pairtrading")

In [9]:
'''DQN'''

from stable_baselines3 import DQN

env = PairTradingEnv(train0, train1, tc=0, model="dqn")

model_dqn = DQN("MultiInputPolicy", env, verbose=0)
model_dqn.learn(total_timesteps=10000)
model_dqn.save("result/rl-restrict/dqn_pairtrading")

## Use the model on Test data

In [10]:
# del model_ppo, model_a2c, model_dqn

# model_ppo = PPO.load("result/rl-restrict/ppo_pairtrading.zip")
# model_a2c = A2C.load("result/rl-restrict/a2c_pairtrading.zip")
model_dqn = DQN.load("result/rl-restrict/dqn_pairtrading.zip")

In [20]:
env = PairTradingEnv(test0, test1, tc=0, model="ppo", isKelly=True)

env.reset()
while True:
    action, _states = model_ppo.predict(obs)
    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    env.render()
    if terminated:
        print("Test Finished!")
        break
    elif truncated:
        print("bankrupted!")
        break

Test Finished!


In [12]:
env = PairTradingEnv(test0, test1, tc=0, model="a2c")

env.reset()
while True:
    action, _states = model_a2c.predict(obs)
    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    env.render()
    if terminated:
        print("Test Finished!")
        break
    elif truncated:
        print("bankrupted!")
        break

Test Finished!


In [13]:
env = PairTradingEnv(test0, test1, tc=0, model="dqn")

env.reset()
while True:
    action, _states = model_dqn.predict(obs)
    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    env.render()
    if terminated:
        print("Test Finished!")
        break
    elif truncated:
        print("bankrupted!")
        break

Test Finished!


### Analyze with PyFolio

In [22]:
folder_path = f"result/rl-restrict/"
os.remove(f"{folder_path}networth_test.csv") if os.path.exists(f"{folder_path}networth_test.csv") else None
csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]

best_res, best_model = None, None
for file_name in csv_files:
    file_path = os.path.join(folder_path, file_name)
    
    with open(file_path, 'r') as csv_file:
        csv_reader = csv.reader(csv_file)
        
        # Loop through the lines in the CSV file
        last_line = None
        for row in csv_reader:
            last_line = row  # Update last_line with the current row
    
    if best_res is None or float(best_res) < float(last_line[1]):
        best_res = last_line[1]
        best_model = file_name

    print(f"The ending capital of {file_name} is {last_line[0:2]}")

print(f"The best model is {best_model}")

The ending capital of networth_a2c.csv is ['2023-10-31 23:59:59.999000', '9931.931795015245']
The ending capital of networth_dqn.csv is ['2023-10-31 23:59:59.999000', '9940.087414121654']
The ending capital of networth_ppo.csv is ['2023-10-31 23:59:59.999000', '9970.694830722687']
The best model is networth_ppo.csv


In [15]:
def get_return(networthcsv):
    returns = pd.read_csv(networthcsv, names=['datetime', 'returns', "action", "position", "order0", "order1"])
    returns['datetime'] = pd.to_datetime(returns['datetime'])
    returns.set_index('datetime', inplace=True)
    res_daily = returns.resample('D').mean()
    res_daily['returns'] = res_daily['returns'].pct_change()
    res_daily = res_daily.dropna()
    return res_daily

best_return = get_return(f'result/rl-restrict/{best_model}')

In [16]:
best_df = pd.read_csv(f'result/rl-restrict/{best_model}', names=["datetime", "networth"])

In [17]:
# import matplotlib.pyplot as plt

# plt.plot(best_df['datetime'], best_df['networth'])

In [18]:
# # Calculate total orders count
# total_orders_count = best_df.shape[0]

# # Calculate won orders count
# won_orders_count = best_df[(best_df['order1'] == 1) & (best_df['position'] == 0)].shape[0]

# # Calculate lost orders count
# lost_orders_count = best_df[(best_df['order1'] == 2) & (best_df['position'] == 0)].shape[0]

# # Calculate Win/Loss order ratio
# win_loss_order_ratio = won_orders_count / lost_orders_count if lost_orders_count != 0 else np.inf

# # Calculate Avg order pnl
# avg_order_pnl = best_df['order0'].mean()

# # Calculate Avg order pnl won
# avg_order_pnl_won = best_df[(best_df['order1'] == 1) & (best_df['position'] == 0)]['order0'].mean()

# # Calculate Avg order pnl lost
# avg_order_pnl_lost = best_df[(best_df['order1'] == 2) & (best_df['position'] == 0)]['order0'].mean()

# # Calculate Avg long order pnl
# avg_long_order_pnl = best_df[(best_df['order1'] == 1) & (best_df['position'] == 2)]['order0'].mean()

# # Calculate Avg short order pnl
# avg_short_order_pnl = best_df[(best_df['order1'] == 1) & (best_df['position'] == 0)]['order1'].mean()

# # Print the calculated indices
# print("Total orders count:", total_orders_count)
# print("Won orders count:", won_orders_count)
# print("Lost orders count:", lost_orders_count)
# print("Win/Loss order ratio:", win_loss_order_ratio)
# print("Avg order pnl:", avg_order_pnl)
# print("Avg order pnl won:", avg_order_pnl_won)
# print("Avg order pnl lost:", avg_order_pnl_lost)
# print("Avg long order pnl:", avg_long_order_pnl)
# print("Avg short order pnl:", avg_short_order_pnl)


In [19]:
# import pyfolio

# pyfolio.tears.create_full_tear_sheet(best_return['returns'])