In [1]:
import datetime
import glob
import os
import warnings
from pathlib import Path

import gymnasium as gym
import numpy as np
import pandas as pd
import ray
from gym_trading_env.utils.history import History
from gym_trading_env.utils.portfolio import TargetPortfolio
from gymnasium import spaces
from ray.rllib.env import EnvContext
from gym_trading_env.environments import MultiDatasetTradingEnv

from preprocess import preprocess

ModuleNotFoundError: No module named 'ray'

In [None]:
# Start the Ray runtime used by RLlib below.
# ignore_reinit_error=True keeps this cell safe to re-run in a live kernel:
# a second plain ray.init() call in the same process raises a RuntimeError.
ray.init(ignore_reinit_error=True)

In [None]:
def reward_only_position_changed(history):
    """Reward that pays the portfolio return only on steps where the position changed.

    Args:
        history: episode history supporting ``len(history)``, ``history[i]``
            (a mapping with a ``"position"`` entry) and ``history[column, i]``.

    Returns:
        * position changed on the last step: the relative change of
          ``portfolio_valuation`` between the two most recent records;
        * position unchanged and equal to 0: a negative holding cost of 0.01
          per trailing record whose position equals the previous position
          (the scan stops one record short of the oldest entry);
        * position unchanged and non-zero: 0.
    """
    fee_per_step = 0.01
    previous = history[-2]["position"]
    current = history[-1]["position"]

    # Walk backwards from the newest record while the position matches the
    # previous one, charging the fee once per matching record.
    accumulated_cost = 0
    lookback = 1
    n_records = len(history)
    while True:
        if lookback >= n_records or not (history["position", -lookback] == previous):
            break
        lookback += 1
        accumulated_cost -= fee_per_step

    if previous == current:
        return accumulated_cost if current == 0 else 0

    latest_value = history["portfolio_valuation", -1]
    prior_value = history["portfolio_valuation", -2]
    return latest_value / prior_value - 1  # / sqrt(lookback)

In [None]:
# Turn every warning into an exception from here on (the "error" action),
# so e.g. numerical warnings raised inside the environment are not silent.
warnings.filterwarnings("error")


def basic_reward_function(history: History):
    """Default reward: natural log of the ratio between the two most recent portfolio valuations."""
    latest_value = history["portfolio_valuation", -1]
    prior_value = history["portfolio_valuation", -2]
    return np.log(latest_value / prior_value)


def dynamic_feature_last_position_taken(history):
    """Dynamic feature: the ``position`` value of the most recent history record."""
    last_position = history["position", -1]
    return last_position


def dynamic_feature_real_position(history):
    """Dynamic feature: the ``real_position`` value of the most recent history record."""
    last_real_position = history["real_position", -1]
    return last_real_position


class TradingEnv(gym.Env):
    """Gymnasium trading environment configured from a dict-like ``config``.

    Each action is an index into ``positions``; the environment moves a
    ``TargetPortfolio`` to the selected position and records every step in a
    ``History`` object that is handed to the reward function.

    Recognised ``config`` keys (defaults in parentheses):
        df: DataFrame with a ``close`` column; columns whose name contains
            ``"feature"`` form the observation. Required.
        positions ([0, 1]): list of positions the agent can choose from.
        dynamic_feature_functions: callables ``f(history) -> value`` appended
            to the observation (defaults to last position and real position).
        reward_function (basic_reward_function): callable ``f(history)``.
        windows (None): when set, observations are the last ``windows`` rows.
        trading_fees (0), borrow_interest_rate (0): forwarded to the portfolio.
        portfolio_initial_value (1000): starting valuation, cast to float.
        initial_position ("random"): a member of ``positions`` or ``"random"``.
        max_episode_duration ("max"): ``"max"`` or a number of steps.
        name ("Stock"), verbose (1), render_mode ("logs").
    """
    metadata = {'render_modes': ['logs']}

    def __init__(self, config: EnvContext):
        """Read the configuration, load the DataFrame and build the spaces."""
        self.name = config.get("name", "Stock")
        self.verbose = config.get("verbose", 1)
        # Fall back to the default advertised by the assertion message below;
        # without it a missing "positions" key fails with an opaque TypeError.
        positions = config.get("positions")
        self.positions = [0, 1] if positions is None else positions
        self.dynamic_feature_functions = config.get(
            "dynamic_feature_functions",
            [
                dynamic_feature_last_position_taken,
                dynamic_feature_real_position,
            ],
        )
        self.reward_function = config.get("reward_function", basic_reward_function)
        self.windows = config.get("windows")
        self.trading_fees = config.get("trading_fees", 0)
        self.borrow_interest_rate = config.get("borrow_interest_rate", 0)
        self.portfolio_initial_value = float(config.get("portfolio_initial_value", 1000))
        self.initial_position = config.get("initial_position", "random")
        assert (
                self.initial_position in self.positions or self.initial_position == "random"
        ), "The 'initial_position' parameter must be 'random' or a position mentionned in the 'position' (default is [0, 1]) parameter."
        self.max_episode_duration = config.get("max_episode_duration", "max")
        self.render_mode = config.get("render_mode", "logs")
        assert self.render_mode in self.metadata["render_modes"]
        self._set_df(config.get("df"))

        # One discrete action per available position.
        self.action_space = spaces.Discrete(len(self.positions))
        if self.windows is None:
            self.observation_space = spaces.Box(-np.inf, np.inf, shape=[self._nb_features])
        else:
            self.observation_space = spaces.Box(
                low=-np.inf,
                high=np.inf,
                shape=[self.windows, self._nb_features],
                dtype=np.float32,
            )

        # User-registered end-of-episode metrics, see add_metric().
        self.log_metrics = []

    def _set_df(self, df):
        """Install ``df`` as the active dataset and pre-compute the numpy views.

        Works on a copy of ``df``. Feature columns are those whose name
        contains ``"feature"``; every other column (plus ``"close"``) is an
        info column. One zero-filled ``dynamic_feature__{i}`` column is added
        per dynamic feature function and filled in lazily by ``_get_obs``.
        """
        df = df.copy()
        self._features_columns = [col for col in df.columns if "feature" in col]
        self._info_columns = list(set(list(df.columns) + ["close"]) - set(self._features_columns))
        self._nb_features = len(self._features_columns)
        self._nb_static_features = self._nb_features

        for i in range(len(self.dynamic_feature_functions)):
            df[f"dynamic_feature__{i}"] = 0
            self._features_columns.append(f"dynamic_feature__{i}")
            self._nb_features += 1

        self.df = df
        self._obs_array = np.array(self.df[self._features_columns], dtype=np.float32)
        self._info_array = np.array(self.df[self._info_columns])
        self._price_array = np.array(self.df["close"])

    def _get_ticker(self, delta=0):
        """Return the DataFrame row at the current index shifted by ``delta``."""
        return self.df.iloc[self._idx + delta]

    def _get_price(self, delta=0):
        """Return the ``close`` price at the current index shifted by ``delta``."""
        return self._price_array[self._idx + delta]

    def _get_obs(self):
        """Refresh the dynamic features of the current row and return the observation.

        Returns the current feature row, or the last ``windows`` rows when
        ``windows`` is set, as ``float32``.
        """
        for i, dynamic_feature_function in enumerate(self.dynamic_feature_functions):
            self._obs_array[self._idx, self._nb_static_features + i] = dynamic_feature_function(self.historical_info)

        if self.windows is None:
            _step_index = self._idx
        else:
            _step_index = np.arange(self._idx + 1 - self.windows, self._idx + 1)
        return self._obs_array[_step_index].astype(np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        ``info`` is the first history record. ``options`` is accepted for
        gymnasium API compatibility and is not used.

        NOTE(review): the initial position and start index are drawn from the
        global ``np.random`` stream, not from ``self.np_random``, so ``seed``
        is only forwarded to ``gym.Env.reset`` and does not by itself make
        these draws reproducible.
        """
        super().reset(seed=seed)

        self._step = 0
        self._position = np.random.choice(
            self.positions) if self.initial_position == 'random' else self.initial_position
        self._limit_orders = {}

        # With windowed observations the first usable row is windows - 1.
        self._idx = 0
        if self.windows is not None: self._idx = self.windows - 1
        if self.max_episode_duration != 'max':
            # Random start such that a full-length episode still fits in the data.
            self._idx = np.random.randint(
                low=self._idx,
                high=len(self.df) - self.max_episode_duration - self._idx
            )

        self._portfolio = TargetPortfolio(
            position=self._position,
            value=self.portfolio_initial_value,
            price=self._get_price()
        )

        self.historical_info = History(max_size=len(self.df))
        self.historical_info.set(
            idx=self._idx,
            step=self._step,
            date=self.df.index.values[self._idx],
            position_index=self.positions.index(self._position),
            position=self._position,
            real_position=self._position,
            data=dict(zip(self._info_columns, self._info_array[self._idx])),
            portfolio_valuation=self.portfolio_initial_value,
            portfolio_distribution=self._portfolio.get_portfolio_distribution(),
            reward=0,
        )

        return self._get_obs(), self.historical_info[0]

    def render(self):
        """No-op: the only render mode is ``'logs'``, printed at episode end."""
        pass

    def _trade(self, position, price=None):
        """Move the portfolio to ``position`` at ``price`` (current close when None)."""
        self._portfolio.trade_to_position(
            position,
            price=self._get_price() if price is None else price,
            trading_fees=self.trading_fees
        )
        self._position = position
        return

    def _take_action(self, position):
        """Trade to ``position`` only when it differs from the current one."""
        if position != self._position:
            self._trade(position)

    def _take_action_order_limit(self):
        """Execute every pending limit order whose limit lies within the current low/high range.

        Non-persistent orders are removed once executed.
        """
        if len(self._limit_orders) > 0:
            ticker = self._get_ticker()
            # Iterate over a snapshot: executed non-persistent orders are deleted
            # inside the loop, and deleting from a dict while iterating over it
            # raises "RuntimeError: dictionary changed size during iteration".
            for position, params in list(self._limit_orders.items()):
                if position != self._position and params['limit'] <= ticker["high"] and params['limit'] >= ticker[
                    "low"]:
                    self._trade(position, price=params['limit'])
                    if not params['persistent']: del self._limit_orders[position]

    def add_limit_order(self, position, limit, persistent=False):
        """Register (or replace) a limit order targeting ``position`` at price ``limit``."""
        self._limit_orders[position] = {
            'limit': limit,
            'persistent': persistent
        }

    def step(self, position_index=None):
        """Apply an action, advance one row and return the gymnasium 5-tuple.

        Args:
            position_index: index into ``positions``; ``None`` keeps the
                current position.

        Returns:
            ``(observation, reward, done, truncated, info)`` where ``info`` is
            the newest history record. ``done`` is set when the portfolio
            value is <= 0 (the reward is then left at 0); ``truncated`` when
            the data or ``max_episode_duration`` is exhausted.
        """
        if position_index is not None: self._take_action(self.positions[position_index])
        self._idx += 1
        self._step += 1

        self._take_action_order_limit()
        price = self._get_price()
        self._portfolio.update_interest(borrow_interest_rate=self.borrow_interest_rate)
        portfolio_value = self._portfolio.valorisation(price)
        portfolio_distribution = self._portfolio.get_portfolio_distribution()

        done, truncated = False, False

        if portfolio_value <= 0:
            done = True
        if self._idx >= len(self.df) - 1:
            truncated = True
        if isinstance(self.max_episode_duration, int) and self._step >= self.max_episode_duration - 1:
            truncated = True

        # Record the step first with a placeholder reward: the reward function
        # reads the history, including the record being added here.
        self.historical_info.add(
            idx=self._idx,
            step=self._step,
            date=self.df.index.values[self._idx],
            position_index=position_index,
            position=self._position,
            real_position=self._portfolio.real_position(price),
            data=dict(zip(self._info_columns, self._info_array[self._idx])),
            portfolio_valuation=portfolio_value,
            portfolio_distribution=portfolio_distribution,
            reward=0
        )
        if not done:
            reward = self.reward_function(self.historical_info)
            self.historical_info["reward", -1] = reward

        if done or truncated:
            self.calculate_metrics()
            self.log()
        return self._get_obs(), self.historical_info["reward", -1], done, truncated, self.historical_info[-1]

    def add_metric(self, name, function):
        """Register an extra end-of-episode metric computed as ``function(history)``."""
        self.log_metrics.append({
            'name': name,
            'function': function
        })

    def calculate_metrics(self):
        """Compute market/portfolio returns plus user metrics into ``results_metrics``."""
        self.results_metrics = {
            "Market Return": f"{100 * (self.historical_info['data_close', -1] / self.historical_info['data_close', 0] - 1):5.2f}%",
            "Portfolio Return": f"{100 * (self.historical_info['portfolio_valuation', -1] / self.historical_info['portfolio_valuation', 0] - 1):5.2f}%",
        }

        for metric in self.log_metrics:
            self.results_metrics[metric['name']] = metric['function'](self.historical_info)

    def get_metrics(self):
        """Return the metrics of the last finished episode (set by ``calculate_metrics``)."""
        return self.results_metrics

    def log(self):
        """Print the episode metrics on one line when ``verbose`` > 0."""
        if self.verbose > 0:
            text = ""
            for key, value in self.results_metrics.items():
                text += f"{key} : {value}   |   "
            print(text)

    def save_for_render(self, dir="render_logs"):
        """Pickle the DataFrame joined with the episode history into ``dir``.

        The file is named ``{name}_{timestamp}.pkl``. Requires ``open``,
        ``high``, ``low`` and ``close`` columns in the DataFrame.
        """
        assert "open" in self.df and "high" in self.df and "low" in self.df and "close" in self.df, "Your DataFrame needs to contain columns : open, high, low, close to render !"
        # NOTE(review): the excluded names use a "date_" prefix, while the
        # flattened info keys read in calculate_metrics use "data_"
        # ('data_close'); confirm which prefix is intended before changing it.
        columns = list(set(self.historical_info.columns) - set([f"date_{col}" for col in self._info_columns]))
        history_df = pd.DataFrame(
            self.historical_info[columns], columns=columns
        )
        history_df.set_index("date", inplace=True)
        history_df.sort_index(inplace=True)
        render_df = self.df.join(history_df, how="inner")

        os.makedirs(dir, exist_ok=True)
        render_df.to_pickle(f"{dir}/{self.name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.pkl")


class MultiDatasetTradingEnv(TradingEnv):
    """TradingEnv that rotates between several pickled datasets.

    Additional ``config`` keys (defaults in parentheses):
        dataset_dir: glob pattern matching the pickled DataFrames. Required.
        preprocess (identity): callable applied to each loaded DataFrame.
        episodes_between_dataset_switch (1): number of resets spent on one
            dataset before the next one is loaded.
    """

    def __init__(self, config: EnvContext):
        """Resolve the dataset list, load a first dataset and build the base env.

        Raises:
            ValueError: if ``dataset_dir`` matches no file.
        """
        self.dataset_dir = config.get("dataset_dir")
        self.preprocess = config.get("preprocess", lambda df: df)
        self.episodes_between_dataset_switch = config.get("episodes_between_dataset_switch", 1)
        # recursive=True so that "**" in the pattern spans nested directories;
        # without it glob treats "**" like a single "*".
        self.dataset_pathes = glob.glob(self.dataset_dir, recursive=True)
        if not self.dataset_pathes:
            # Fail early with a readable message instead of numpy's
            # zero-size reduction error inside next_dataset().
            raise ValueError(f"No dataset found for pattern {self.dataset_dir!r}.")
        self.dataset_nb_uses = np.zeros(shape=(len(self.dataset_pathes),))
        config["df"] = self.next_dataset()
        super().__init__(config)

    def next_dataset(self):
        """Load one of the least-used datasets and return it preprocessed.

        Resets the per-dataset episode counter, increments the use count of
        the chosen dataset and sets ``self.name`` to its file name.
        """
        self._episodes_on_this_dataset = 0
        # Indexes of the least explored datasets.
        least_used_indexes = np.where(self.dataset_nb_uses == self.dataset_nb_uses.min())[0]
        # Pick one of THOSE indexes. Drawing randint(size) and using it directly
        # as a dataset index would select among the first `size` datasets
        # instead of among the least-used ones.
        dataset_index = int(np.random.choice(least_used_indexes))
        dataset_path = self.dataset_pathes[dataset_index]
        self.dataset_nb_uses[dataset_index] += 1  # Update nb use counts

        self.name = Path(dataset_path).name
        # NOTE: unpickling can execute arbitrary code; dataset_dir must only
        # match trusted files.
        return self.preprocess(pd.read_pickle(dataset_path))

    def reset(self, seed=None, options=None):
        """Switch dataset when due, then reset the underlying environment.

        ``options`` is accepted (and forwarded) so the signature matches the
        gymnasium ``reset(seed=..., options=...)`` call convention.
        """
        self._episodes_on_this_dataset += 1
        if self._episodes_on_this_dataset % self.episodes_between_dataset_switch == 0:
            self._set_df(
                self.next_dataset()
            )
        if self.verbose > 1: print(f"Selected dataset {self.name} ...")
        return super().reset(seed=seed, options=options)


In [None]:
from ray import tune


def env_creator(ctx):
    """Build the training environment for RLlib's env registry.

    ``MultiDatasetTradingEnv`` as defined in this notebook takes a single
    dict-like ``config`` argument, so the settings are passed as one mapping
    rather than as keyword arguments (which its ``__init__`` does not accept).

    Args:
        ctx: the env config supplied by RLlib (dict-like, may be empty or
            None). Any key it contains overrides the defaults below.

    Returns:
        A configured ``MultiDatasetTradingEnv`` instance.
    """
    env_config = {
        "dataset_dir": "./data/train/month/**/*.pkl",
        "preprocess": preprocess,
        "reward_function": reward_only_position_changed,
        "positions": [-10, 0, 10],
        "trading_fees": 0.0001 / 1000,
        "borrow_interest_rate": 0.000003,
        "portfolio_initial_value": 100,
        "max_episode_duration": "max",  # 24 * 60,
        "verbose": 1,
        "windows": None,
        "render_mode": "logs",
        "name": "TRAIN",
    }
    if ctx:
        env_config.update(ctx)
    return MultiDatasetTradingEnv(env_config)


# Register the factory under the name that the DQN config below passes as `env`.
tune.register_env("MultiDatasetTradingEnv1", env_creator)

In [None]:
from ray.rllib.algorithms import DQNConfig

# Environment
# Warnings were switched to errors in an earlier cell; let DeprecationWarnings
# through so they do not abort building the algorithm.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Build a DQN algorithm on the registered environment. The environment
# settings currently live in env_creator; the env_config block below is kept
# commented out as the alternative way of supplying them.
algo = DQNConfig().environment(
    env="MultiDatasetTradingEnv1",
    disable_env_checking=True,
    # env_config={
    #     "dataset_dir": "./data/train/month/**/*.pkl",
    #     "preprocess": preprocess,
    #     "reward_function": reward_only_position_changed,
    #     "positions": [-10, 0, 10],
    #     "trading_fees": 0.0001 / 1000,
    #     "borrow_interest_rate": 0.000003,
    #     "portfolio_initial_value": 100,
    #     "max_episode_duration": "max",  # 24 * 60,
    #     "verbose": 1,
    #     "windows": None,
    #     "render_mode": "logs",
    #     "name": "TRAIN",
    # },
).build()
# .rollouts(
#     num_rollout_workers=1,
#     remote_worker_envs=False,
# )
# .resources(num_gpus=0)



In [None]:
# UserWarnings would otherwise be raised as errors (see the earlier
# filterwarnings("error") cell); ignore them during training.
warnings.filterwarnings("ignore", category=UserWarning)
# Run 10 training iterations and print the result of each one.
for i in range(10):
    result = algo.train()
    print(result)

In [None]:
# Shut down the Ray runtime started at the top of the notebook.
ray.shutdown()