In [None]:
import gym
import ray
import numpy as np
import pandas as pd
from tqdm import tqdm
ray.init()

Load the current S&P 500 ticker list; we will only trade these stocks.

In [None]:

# Read one ticker per line. The context manager closes the file handle, and
# blank lines (e.g. from a trailing newline) are skipped so that no empty-string
# ticker ends up in the list.
with open('s&p500_tickers.dat', 'r') as ticker_file:
    tickers = [line.strip() for line in ticker_file if line.strip()]
print(tickers)

Set the data start and end dates, and build the ticker-to-Finnhub-ID mapping for the tickers present on the last day.

In [None]:
# Step of one calendar day, used for date arithmetic on the daily files.
one_day = pd.Timedelta(days=1)

# Day counter, starts at zero.
i = 0

# First day of data, and the day right after the last day of data.
cur_day = pd.to_datetime('1992-06-15', format=r'%Y-%m-%d')
end_day = pd.to_datetime('2020-01-01', format=r'%Y-%m-%d')

# Read the file of the last day and keep only the rows of the listed tickers.
last_day_path = 'equity_data/' + (end_day - one_day).strftime(r'%Y%m%d') + '.csv'
end_df = pd.read_csv(last_day_path)
ticker_df = end_df.loc[end_df.symbol.isin(tickers)]

# Mapping: ticker symbol -> finnhub_id (for a repeated symbol the last row wins).
ticker_dict = dict(zip(ticker_df.symbol.values, ticker_df.finnhub_id.values))

For every date in the start–end range, load that day's file into `ticker_dfs`, a dictionary keyed by ticker whose values are DataFrames indexed by day.

In [None]:

# The column layout is taken from the first day's file. Every ticker gets an
# empty frame with one row per calendar day in [cur_day, end_day).
df_columns = pd.read_csv('equity_data/' + cur_day.strftime(r'%Y%m%d') + '.csv').columns
all_days = pd.date_range(cur_day, end_day - one_day, freq='D')
ticker_dfs = { ticker : pd.DataFrame(index=all_days, columns=df_columns) for ticker in tickers }

# True: rebuild the frames from the daily CSV files (slow).
# False: skip the rebuild; the frames are filled from storage in a later cell.
save_df = False
if save_df:
    # Iterate over the fixed day range instead of advancing the global cur_day,
    # so the start date stays intact and this cell can be re-run. The context
    # manager closes the progress bar even if the loop raises.
    with tqdm(all_days) as pbar:
        for day in pbar:
            i += 1
            try:
                day_df = pd.read_csv('equity_data/' + day.strftime(r'%Y%m%d') + '.csv')
            except FileNotFoundError:
                # No file for this day (presumably a non-trading day): leave the row empty.
                continue

            # Build the membership set once per day rather than once per ticker.
            symbols_today = set(day_df.symbol.values)
            for ticker, finnhub_id in ticker_dict.items():
                if ticker not in symbols_today:
                    continue
                row = day_df.loc[day_df.finnhub_id == finnhub_id]
                if row.shape[0] > 1:
                    # Duplicate rows for one id: report them, then use the first one.
                    print(ticker)
                    print(row)
                if not row.shape[0] == 0:
                    ticker_dfs[ticker].loc[day] = row.values[0, :]

In [None]:
# Store the freshly built frames, or load previously stored ones, because
# rebuilding them from the daily CSV files is slow.
if save_df:
    for ticker, ticker_frame in ticker_dfs.items():
        # reset_index() returns a copy whose former date index becomes the
        # 'index' column (feather output needs a default index). The frame kept
        # in ticker_dfs retains its date index, which the gap check and the
        # index intersection in the following cells depend on.
        ticker_frame.reset_index().to_feather('equity_data/stored/' + ticker.lower() + '.feather')
else:
    print('Loading from storage...')
    for symbol in ticker_dict.keys():
        # Restore the stored 'index' column as the frame index.
        ticker_dfs[symbol] = pd.read_feather('equity_data/stored/' + symbol.lower() + '.feather').set_index('index', drop=True)
    print(ticker_dfs)

In [None]:
# Keep only tickers with at least 2000 populated days and no gap between
# consecutive populated days larger than 7 days.
to_delete = []
for ticker, frame in ticker_dfs.items():
    # NOTE(review): the anchor is the last index entry *before* the empty rows are
    # dropped, so the first gap is measured from that day to the second-to-last
    # remaining day - confirm this is intended.
    anchor_day = frame.index[-1]
    frame.dropna(axis='index', how='all', inplace=True)

    if frame.empty or len(frame.index) < 2000:
        to_delete.append(ticker)
        continue

    # Remaining days in descending order, without the last one; each is paired
    # with the day that follows it in time (the anchor for the first pair).
    earlier_days = list(frame.index[-2::-1])
    later_days = [anchor_day] + earlier_days[:-1]
    if any((later - earlier).days > 7 for later, earlier in zip(later_days, earlier_days)):
        to_delete.append(ticker)

for ticker in to_delete:
    del ticker_dfs[ticker]
    print(f'Deleting ticker: {ticker}')
print(len(ticker_dfs))

In [None]:
# Intersect the day indexes of all frames, starting from the first frame's index.
first_ticker = list(ticker_dfs.keys())[0]
index_intersection = ticker_dfs[first_ticker].index
print(index_intersection)
for ticker in ticker_dfs:
    index_intersection = index_intersection.intersection(ticker_dfs[ticker].index)
    # Running size of the intersection after including this ticker.
    print(f'{ticker}: {len(index_intersection)}')
print(index_intersection)

In [None]:
# Restrict every frame to the rows of the shared day index.
for ticker, ticker_frame in ticker_dfs.items():
    ticker_dfs[ticker] = ticker_frame.loc[index_intersection]
print(ticker_dfs)

In [None]:
# Reference template of the environment config; all values are placeholders.
n_assets = 0
n_features = 0
config = dict(
    initial_balance=0,
    initial_portfolio=[0 for _ in range(n_assets)],
    # Tickers to trade; they must correspond to tickers in the dataframe dict.
    # Implicitly defines the number of assets.
    tickers=['' for _ in range(n_assets)],
    # Indicator functions/classes computing the features of each stock;
    # implicitly defines the number of features.
    # TODO: Support multidimensional indicators
    indicators=[None for _ in range(n_features)],
    # Number of days after which all indicators can compute proper values.
    max_indicator_lookback=0,
    trading_days=0,
    start_day_offset=None,
)

In [None]:
class TradingEnv(gym.Env):
    """Gym environment that rebalances a cash-plus-assets portfolio once per day.

    Every step moves to the next entry of the day index, turns the action into
    portfolio weights with a softmax (the first weight is cash), rebalances at
    that day's open prices and returns the indicator features of all assets
    followed by the cash balance.

    Keys read from ``env_config``:
        tickers: tickers to trade; must be keys of ``df_dict``.
        indicators: callables mapping a dataframe slice to one feature value.
        max_indicator_lookback: number of index rows made available before the
            start day so that all indicators can compute proper values.
        df_dict: ticker -> dataframe with ``open`` prices, indexed by day; the
            day index of the first ticker is used for all tickers.
        trading_days: number of steps per episode.
        start_day_offset: position of the start day in the day index, or None
            to draw a new random start on every reset.
        initial_balance: starting cash.
        initial_portfolio: starting holdings per asset (None means all zero).
    """

    def __init__(self, env_config):
        super().__init__()

        self._env_config = env_config
        self._tickers = env_config['tickers']
        self._indicator_funcs = env_config['indicators']
        self._max_indicator_lookback = env_config['max_indicator_lookback']

        self._n_assets = len(self._tickers)
        self._n_features = len(self._indicator_funcs)

        assert self._n_assets != 0, 'Number of assets must not be zero!'
        assert self._n_features != 0, 'Number of features must not be zero!'

        self._df_dict = env_config['df_dict']

        self._days = self._df_dict[self._tickers[0]].index
        self._trading_days = env_config['trading_days']
        self._start_day_idx = env_config['start_day_offset']
        # Remember whether the start is random, so that reset() can draw a new
        # start day for every episode instead of only for the first one.
        self._random_start = self._start_day_idx is None

        # The last step of an episode reads day index start + trading_days, so
        # both bounds below have to be strict.
        if not self._random_start:
            assert self._start_day_idx >= self._max_indicator_lookback, 'start_day_offset must be at least max_indicator_lookback in order to properly initialise all indicators'
            assert self._start_day_idx + self._trading_days < len(self._days), 'start_day_offset + trading_days must be lower than the number of days'

        assert self._trading_days + self._max_indicator_lookback < len(self._days), 'The sum of trading_days + max_indicator_lookback must be lower than the number of days in the dataframe'

        self._initial_balance = env_config['initial_balance']
        initial_portfolio = env_config.get('initial_portfolio')
        self._initial_portfolio = initial_portfolio if initial_portfolio is not None else [0] * self._n_assets

        assert len(self._initial_portfolio) == self._n_assets, 'Size of initial portfolio must equal the number of assets!'

        action_shape = (self._n_assets + 1,)
        obs_shape = (self._n_features*self._n_assets + 1,)

        # float32 instead of float16: float16 tops out at 65504 and therefore
        # cannot hold balances such as 1e6.
        self.action_space = gym.spaces.Box(low=0.0, high=1.0, shape=action_shape, dtype=np.float32) # One entry for cash plus one per asset
        self.observation_space = gym.spaces.Box(low=0.0, high=np.inf, shape=obs_shape, dtype=np.float32) # All features of every asset + cash
        self.max_episode_steps = self._trading_days

    def reset(self):
        """Start a new episode and return the observation of its start day."""
        self._balance = self._initial_balance
        # Copy, so that the configured initial portfolio is never shared with episode state.
        self._portfolio = np.array(self._initial_portfolio, dtype=float)

        if self._random_start:
            # Upper bound is exclusive, which keeps start + trading_days inside the day index.
            self._start_day_idx = np.random.randint(self._max_indicator_lookback, len(self._days) - self._trading_days)
        self._cur_day_idx = self._start_day_idx
        self._cur_day = self._days[self._cur_day_idx]
        self._cur_day_idx += 1 # The first step trades on the day after the start day

        return self._observation()

    def _observation(self):
        """Return the indicator features of the current day followed by the cash balance."""
        features = self._compute_indicators(self._cur_day)
        return np.append(features, self._balance).astype(np.float32)

    def _compute_indicators(self, day):
        """Evaluate every indicator for every ticker on data up to and including ``day``.

        Returns a flat array ordered ticker by ticker, with the indicators of
        one ticker next to each other.
        """
        # The window starts max_indicator_lookback index rows before the start day.
        window_start = self._days[self._start_day_idx - self._max_indicator_lookback]
        features = np.empty((self._n_features*self._n_assets,))
        for (i, ticker) in enumerate(self._tickers):
            # Label slices with .loc include both end points, so the window must
            # end at `day` itself; ending it one day later would expose the next
            # day's prices to the agent.
            ticker_frame_slice = self._df_dict[ticker].loc[window_start:day]
            for (j, indicator) in enumerate(self._indicator_funcs):
                features[i*self._n_features + j] = indicator(ticker_frame_slice)
        return features

    def _asset_prices(self, day):
        """Return the open price of every asset on ``day``."""
        prices = np.empty((self._n_assets,))
        for i, ticker in enumerate(self._tickers):
            prices[i] = self._df_dict[ticker].loc[day].open
        return prices

    def _portfolio_val(self, portfolio, balance, day):
        """Return the value of ``portfolio`` at the open prices of ``day``, plus ``balance``."""
        return np.dot(self._asset_prices(day), portfolio) + balance

    def _rebalance(self, actions): # TODO: Test this more to see if it makes sense
        """Redistribute the current portfolio value according to ``actions``.

        Returns a tuple ``(asset holdings, cash)`` in which the total value at
        the current day's open prices is unchanged.
        """
        weightings = self._softmax(actions) # First weight is for cash

        prices = self._asset_prices(self._cur_day)
        portfolio_val = np.dot(prices, self._portfolio) + self._balance
        return (portfolio_val*np.divide(weightings[1:], prices), portfolio_val*weightings[0])

    def _reward(self):
        """Return the relative change of the portfolio value since the start day.

        Positive when the portfolio gained value, negative when it lost value.
        """
        current_val = self._portfolio_val(self._portfolio, self._balance, self._cur_day)
        initial_val = self._portfolio_val(self._initial_portfolio, self._initial_balance, self._days[self._start_day_idx])
        return current_val / initial_val - 1

    def step(self, action):
        """Trade on the next day and return ``(obs, reward, done, info)``."""
        self._cur_day = self._days[self._cur_day_idx]
        (self._portfolio, self._balance) = self._rebalance(action)

        obs = self._observation()
        rw = self._reward()
        done = (self._cur_day_idx - self._start_day_idx) >= self._trading_days
        info = {} # TODO: Add info here

        self._cur_day_idx += 1 # Advance one day
        return obs, rw, done, info

    def _softmax(self, x):
        """Compute softmax values for each sets of scores in x."""
        # Subtracting the maximum leaves the result unchanged and avoids overflow in exp.
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum()

In [None]:
def close_indicator(df):
    """Return the close value of the last row of ``df``."""
    # .iloc makes the lookup positional. A bare [-1] is a label lookup that only
    # works on a date index through a deprecated integer fallback, and it fails
    # on an integer-labelled index.
    return df.close.iloc[-1]

In [None]:
# Environment configuration used for training: trade every remaining ticker.
n_assets = len(ticker_dfs)
env_config = dict(
    initial_balance=1E6,
    initial_portfolio=[0 for _ in range(n_assets)],
    # Tickers to trade; they must correspond to tickers in the dataframe dict.
    # Implicitly defines the number of assets.
    tickers=list(ticker_dfs),
    # Indicator functions/classes computing the features of each stock;
    # implicitly defines the number of features.
    # TODO: Support multidimensional indicators
    indicators=[close_indicator],
    # Number of days after which all indicators can compute proper values.
    max_indicator_lookback=0,
    trading_days=100,
    start_day_offset=None,
    df_dict=ticker_dfs,
)

In [None]:
import ray.rllib.agents.ppo as ppo
import ray.rllib.models.catalog as catalog
import ray.tune as tune
from ray.tune.logger import pretty_print

# PPO trainer configuration, starting from a copy of the RLlib defaults.
config = ppo.DEFAULT_CONFIG.copy()
config["num_gpus"] = 0
config["num_workers"] = 5
config["rollout_fragment_length"] = 100
config["train_batch_size"] = 500
#config["framework"] = "torch"
config["env_config"] = env_config
config["log_level"] = "DEBUG"
config["env"] = TradingEnv

# Recurrent model settings. They only take effect once attached to
# config["model"]; without that assignment the LSTM options are never used.
model_config = catalog.MODEL_DEFAULTS.copy()
model_config["use_lstm"] = True
model_config["max_seq_len"] = 100
config["model"] = model_config

# Keep the result in a variable so it can be inspected in later cells.
analysis = tune.run(ppo.PPOTrainer, stop={"training_iteration": 100}, config=config, local_dir='ray_results')

# Alternative: manual training loop without tune (kept as comments for reference,
# so that no stray string is echoed as the cell output).
#
# trainer = ppo.PPOTrainer(config=config, env=TradingEnv)
# # Can optionally call trainer.restore(path) to load a checkpoint.
#
# for i in range(1000):
#     # Perform one iteration of training the policy with PPO
#     result = trainer.train()
#     print(pretty_print(result))
#
#     checkpoint = trainer.save()
#     print("checkpoint saved at", checkpoint)