<a href="https://colab.research.google.com/github/RandomForestPanda/.github/blob/main/agent_playground.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install stable_baselines3 -q

In [3]:

# Not sure why this should be hidden ...
import sys as _pyjuter_sys
# ... nevertheless, consistency

class _pyjuter_ModuleShim:
    """
    Plain attribute container that is registered in `sys.modules`.

    Notebook cells copy the globals they define onto an instance of
    this class, under a module name of their choosing.
    """

    @classmethod
    def get (cls, name):
        """
        Return the `sys.modules` entry for `name`, registering a fresh
        shim under that name first if there is no entry yet.
        """
        registry = _pyjuter_sys.modules
        try:
            return registry [name]
        except KeyError:
            registry [name] = cls ()
            return registry [name]

    def populate (self, old_global_names, new_globals):
        """
        Copy onto this shim every entry of `new_globals` whose name is
        neither in `old_global_names` nor prefixed with `_pyjuter_`.
        """
        for name, value in new_globals.items ():
            if name.startswith ("_pyjuter_") or name in old_global_names:
                continue
            setattr (self, name, value)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('preproc')
_pyjuter_old_global_names = set (globals ().keys ())

#!/bin/python3

"""
Module for common pre-processing tasks

These are convenience wrappers around functionality already
implemented in libraries like Pandas. The purpose of this
module is to ensure consistency and code re-use across the
repository.

In most cases, the only thing you'll want to use from this
module is `read`.
"""


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('preproc')
_pyjuter_old_global_names = set (globals ().keys ())

from pandas import (
    DataFrame,
    read_csv,
)
from numpy import datetime64, arctan, pi
from datetime import timedelta, datetime


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('preproc')
_pyjuter_old_global_names = set (globals ().keys ())

class MalformedDataError (Exception):
    """Exception type for input data that is not of the expected form."""


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('preproc')
_pyjuter_old_global_names = set (globals ().keys ())

def verify (data: DataFrame):
    """
    Verify that `data` is of the correct form. That means
    * Its columns are exactly close, high, low, open, volume,
      in that order
    * It uses timestamps (`datetime64`) as indices
    * Its second timestamp comes after its first

    Frames with fewer than two rows have nothing to compare, so
    the ordering check is skipped for them.

    Raises `AssertionError`, with a message naming the failed
    check, if `data` is not of the correct form.
    """
    expected_columns = ["close", "high", "low", "open", "volume"]
    actual_columns = list (data.columns)
    assert actual_columns == expected_columns, (
        f"expected columns {expected_columns}, got {actual_columns}"
    )
    assert data.index.dtype.type == datetime64, (
        f"expected a datetime64 index, got dtype {data.index.dtype}"
    )
    # Only the first two rows are compared; indexing them on a
    # shorter frame would raise IndexError instead of validating.
    if len (data.index) >= 2:
        assert data.index [1] > data.index [0], (
            "index must be ascending: "
            f"{data.index [1]} does not come after {data.index [0]}"
        )

def scale (data: DataFrame | float, *, no_verify = False):
    """
    Scale OHCLV values to fit in the range [0, 1).
    This is achieved with the help of the inverse tangent function.

    In order to ensure a good gradient, inputs are divided by 1000
    before being passed to the expit. Without this division, values
    saturate to 1 very quickly.

    This function can be used on individual floating-point
    numbers as well as DataFrames.
    """
    if isinstance (data, DataFrame):
        if not no_verify: verify (data)

    return arctan (data / 1000) * (2 / pi)

def fill_missing (data: DataFrame, no_verify = False):
    """
    Put `data` on a regular 1-minute grid and fill the gaps.

    A minute with no row is assumed to be a minute without any
    price change or trading activity: its open, high, low and close
    are all set to the most recent known close, and its volume to 0.

    `data` is checked with `verify` first, unless `no_verify` is true.
    """
    if not no_verify:
        verify (data)
    # TODO: Test
    last_close = data ['close'].resample ("1Min").ffill ()
    on_grid = data.resample ("1Min").asfreq ()
    replacements = {
        price_column: last_close
        for price_column in ("open", "high", "low", "close")
    }
    replacements ["volume"] = 0
    return on_grid.fillna (replacements)

def resample (data: DataFrame, interval: timedelta):
    """
    Aggregate OHLCV data into buckets of length `interval`.

    Per bucket: close is the last value, high the maximum, low the
    minimum, open the first value and volume the sum.
    `interval` must be longer than one minute.
    """
    assert interval > timedelta (minutes = 1)
    how_to_combine = {
        "close": "last",
        "high": "max",
        "low": "min",
        "open": "first",
        "volume": "sum",
    }
    buckets = data.resample (interval)
    return buckets.aggregate (how_to_combine)


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('preproc')
_pyjuter_old_global_names = set (globals ().keys ())

def read (
    dataset: str, *,
    period: tuple[datetime, datetime] = None,
    passes = (fill_missing,)
):
    """
    Load a CSV dataset, index it by its `datetime` column, and run
    every function in `passes` over it, in order.

    `period`, if supplied, must be specified as `(start, end)`;
    only rows whose timestamp lies between the two (both ends
    included) are kept.
    """
    # Read the file and index it by time, oldest row first
    frame = (
        read_csv (dataset, parse_dates = ['datetime'])
        .set_index ('datetime')
        .sort_index ()
    )

    # Restrict to the requested time frame
    if period is not None:
        not_too_early = frame.index >= period [0]
        not_too_late = frame.index <= period [1]
        frame = frame [not_too_early & not_too_late]

    # Fix the column selection and order, then apply the passes
    frame = frame [['close', 'high', 'low', 'open', 'volume']]
    for apply_pass in passes:
        frame = apply_pass (frame)
    return frame

_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('environment')
_pyjuter_old_global_names = set (globals ().keys ())

from gymnasium import Env
from gymnasium.spaces import Box
import numpy as np
from math import isclose
from datetime import (
    datetime,
    timedelta,
)
from pandas import DataFrame

from preproc import scale

class TradingEnvironment (Env):
    """
    Basic trading environment based on an OHLCV timeseries.

    The environment walks through `data` one row per `step`. Each
    action either sells a fraction of the shares currently held or
    spends a fraction of the available capital on shares, always at
    the `close` value of the current row.
    """

    action_space = Box (-1.0, 1.0)
    """
    Action space: positive is sell, negative is buy.
    Absolute value determines fraction of capital
    to spend or fraction of shares to sell.
    """

    # NOTE(review): `get_observation` returns a row of `data` as-is,
    # so the order of the five values is the column order of `data`,
    # which need not be the order listed below -- confirm.
    observation_space = Box (0, np.inf, shape = (5,))
    """
    Observation space: open, high, low, close and volume.
    Each is a positive real number.
    """

    def __init__ (self, initial_capital: float, data: DataFrame):
        """
        `initial_capital`: capital available after every reset.
        `data`: the timeseries to trade on; its index must have
        dtype `datetime64`.
        """
        # `data` is a pandas.DataFrame, and is expected to hold
        # OHLCV parameters and timestamps for a single stock.
        self.data = data
        assert self.data.index.dtype.type == np.datetime64

        # Initial capital at the trader's disposal
        self.initial_capital = initial_capital

        # State of the trader. Will be set by `reset ()`
        self.data_pos = None     # Current position in the timeseries
        self.shares_held = None
        self.capital = None
        self.last_trade = {
            # Time and type of last executed trade
            "datetime": None,
            "purchase?": None,
        }

    def get_observation (self):
        """
        Return one row of OHLCV data: the row of `data` at the
        current position.
        """
        return self.data.iloc [self.data_pos]

    def get_shareprice (self):
        """
        Return the current share price.
        This is kept independent of `get_observation` so that
        the latter can be overloaded without any complications.
        """
        return self.data.iloc [self.data_pos]['close']

    def get_information (self):
        """
        Return extra information associated with the current observation.
        This information is not used in the agent's decision-making process.
        """
        return {
            'datetime': self.data.index [self.data_pos],
            'nshares': self.shares_held,
            'capital': self.capital,
        }

    def reset_state (self):
        """
        Put the trader back at the first row of `data`, holding no
        shares and `initial_capital` in capital.
        """
        self.data_pos = 0
        self.shares_held = 0
        self.capital = self.initial_capital
        # The "last trade" starts out as a non-purchase at the first
        # timestamp, so the first sell is never treated as too soon.
        self.last_trade ["datetime"] = self.data.index [0]
        self.last_trade ["purchase?"] = False

    def reset (self, seed = None):
        """
        Reset the trader's state and return `(observation, information)`
        for the first row of `data`.
        """
        # NOTE(review): Gymnasium's `Env.reset` also takes an `options`
        # keyword, which this override does not accept -- confirm that
        # no caller passes it.
        super().reset(seed=seed)
        self.reset_state ()
        observation = self.get_observation ()
        information = self.get_information ()
        return observation, information

    def step (self, action):
        """
        Step the agent forward in time by executing `action`.
        If `action` is a positive number, its magnitude is taken as
        the fraction of shares currently held to sell. If negative,
        its magnitude is taken as a fraction of capital to spend
        on shares.

        A penalty is awarded for a sell that follows a buy in quick
        succession.

        Returns `(observation, reward, terminated, truncated,
        information)`. The observation and information describe the
        row that follows the one the action was executed on.
        """
        assert self.shares_held >= 0
        assert self.capital >= 0

        # `action` arrives as a sequence; only its first element is used
        action = action [0]

        # Share price is approximated as the last seen closing price
        share_price = self.get_shareprice ()

        # NOTE: Ensure that these two are set in the following block
        capital_diff, shares_diff = None, None
        if action > 0:
            # Sell `action` * shares currently held
            shares_to_sell = int(self.shares_held * action)
            capital_diff = shares_to_sell * share_price
            shares_diff = -shares_to_sell
        else:
            # Buy shares, spending `action` * current capital
            # (an action of exactly 0 also lands here and buys nothing)
            target_capital_loss = action * self.capital
            # Only whole shares are traded, hence the truncation
            shares_diff = int (-target_capital_loss / share_price)
            capital_diff = - shares_diff * share_price
        assert (capital_diff is not None) and (shares_diff is not None)
        assert isclose (capital_diff, - shares_diff * share_price)

        # Calculate reward
        # The reward is the change in capital: positive for a sell,
        # zero or negative for a buy.
        short_span = timedelta (hours = 2)  # TODO: Parametrise
        reward = capital_diff
        dt = self.data.index [self.data_pos]
        if shares_diff != 0:
            if shares_diff < 0:
                # This is a sell
                gap = dt - self.last_trade ["datetime"]
                if self.last_trade ["purchase?"] and gap < short_span:
                    # This sell comes too soon after a buy
                    reward = -reward
                self.last_trade ["purchase?"] = False
            elif shares_diff > 0:
                # This is a buy
                self.last_trade ["purchase?"] = True
            # Trades of zero shares do not count as the last trade
            self.last_trade ["datetime"] = dt

        # Update trader's state
        self.capital += capital_diff
        self.shares_held += shares_diff

        # Step forward in time
        self.data_pos += 1
        return (
            self.get_observation (),              # Observation
            reward,                               # Reward
            self.data_pos >= len (self.data) - 1, # Terminated?
            False,                                # Truncated?
            self.get_information (),              # Information
        )


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('environment')
_pyjuter_old_global_names = set (globals ().keys ())

class TEWrapper (Env):
    """
    Base class for building wrappers around other trading environments.

    A wrapper class can be written to wrap an environment (the
    "inner environment" henceforth) in order to
    make slight modifications. These modifications are typically
    extensions of the base class' behaviour that are not suitable
    for adding to the base class directly. Experimental features
    and features not relevant to all users of the base class can
    be implemented via this mechanism.

    Wrappers using this class are meant specifically to wrap
    `TradingEnvironment` objects. Alternatively, they can also
    wrap other wrappers, which in turn wrap `TradingEnvironment`.

    Note that no method in the wrapper class should have to
    access the inner trading environment directly.
    If access to any of the inner's members is necessary,
    this base wrapper class must provide a means to do so
    without accessing `self.inner`.
    """

    # The annotations that mention `TEWrapper` itself are quoted, so
    # no forward declaration of the class is needed.
    inner: "TradingEnvironment | TEWrapper"
    """
    The environment that is being wrapped. Wrappers will typically
    intercept and modify data going to/from the inner environment.
    """

    def __init__ (self, inner: "TradingEnvironment | TEWrapper"):
        """
        Subclasses that override `__init__` **should** call this constructor.

        `inner`: the environment to wrap

        The wrapper's own spaces are derived from the inner's spaces
        through `get_action_space` and `get_observation_space`, so
        anything those hooks read from `self` must be set before
        this constructor is called.
        """
        self.inner = inner
        self.action_space = self.get_action_space (self.inner.action_space)
        self.observation_space = self.get_observation_space (self.inner.observation_space)

    def get_observation_space (self, inner_space: Box) -> Box:
        """Overload this to modify the observation space"""
        return inner_space

    # This hook used to be defined under a misspelt name, which made
    # `__init__` fail for every wrapper that did not overload it.
    # The old name is kept as an alias for any existing caller.
    get_obsevation_space = get_observation_space

    def get_action_space (self, inner_space: Box) -> Box:
        """Overload this to modify the action space"""
        return inner_space

    def reset (self, seed = None):
        """Forwarded to the inner by default"""
        # Passed by keyword so that inner environments which declare
        # `seed` as keyword-only are handled as well.
        return self.inner.reset (seed = seed)

    def step (self, action):
        """Forwarded to the inner by default"""
        return self.inner.step (action)


_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
_pyjuter_module = _pyjuter_ModuleShim.get ('environment')
_pyjuter_old_global_names = set (globals ().keys ())

class SelfAwareTE (TEWrapper):
    """
    Wrapper that makes the trader's own position part of the input:
    the number of shares held and the current capital are added to
    every observation.
    """
    def get_observation_space (self, inner_space):
        """The inner bounds, followed by two entries bounded by [0, inf]"""
        lower = [*inner_space.low, 0, 0]
        upper = [*inner_space.high, np.inf, np.inf]
        return Box (np.array (lower), np.array (upper))

    @staticmethod
    def wrap_observation (obs, info):
        """
        Return a copy of `obs` extended with the `nshares` and
        `capital` entries of `info`.
        Called internally by `step` and `reset`.
        """
        extended = obs.copy ()
        for key in ('nshares', 'capital'):
            extended [key] = info [key]
        return extended

    def reset (self, seed = None):
        """Reset the inner environment and extend its observation"""
        inner_obs, info = super ().reset (seed)
        return self.wrap_observation (inner_obs, info), info

    def step (self, action):
        """Step the inner environment and extend its observation"""
        inner_obs, reward, terminated, truncated, info = super ().step (action)
        extended = self.wrap_observation (inner_obs, info)
        return extended, reward, terminated, truncated, info

class MovingAvgCenteredTE (TEWrapper):
    """
    Wrapper around a trading environment that centers input
    data around its moving average.

    The average is taken over the last `window_size` observations
    of the inner environment. Both `reset` and `step` return
    centered observations.
    """

    data_queue: np.ndarray
    """The queue used to calculate the moving average"""
    data_queue_pos: int

    def __init__ (self, *pargs, window_size: int, **kargs):
        """
        `window_size`: number of observations to average over.
        All other arguments are passed on to `TEWrapper`.
        """
        super ().__init__ (*pargs, **kargs)
        self.window_size = window_size

    def get_observation_space (self, inner_space):
        """Change the bounds to [-inf, +inf]; retain the shape"""
        low = inner_space.low - np.inf
        high = inner_space.high + np.inf
        return Box (low, high)

    def _moving_average (self):
        """Mean of the queued observations, taken per component"""
        return self.data_queue.mean (axis = 0)

    def reset (self, seed = None):
        """
        Reset the inner environment and restart the moving average.

        The queue is filled with copies of the first observation, so
        the average starts out at that observation.
        """
        obs, info = super ().reset (seed)
        self.data_queue = np.reshape (
            np.tile (obs, self.window_size),
            (self.window_size, *obs.shape)
        )
        self.data_queue_pos = 0
        # Center the first observation too, so that it is on the same
        # scale as everything `step` returns afterwards.
        return obs - self._moving_average (), info

    def step (self, action):
        """Center the observation around moving average"""
        obs, reward, term, trunc, info = super ().step (action)
        # The queue is a ring buffer: the new observation overwrites
        # the oldest one.
        self.data_queue [self.data_queue_pos] = obs
        self.data_queue_pos += 1
        self.data_queue_pos %= self.window_size
        return obs - self._moving_average (), reward, term, trunc, info

class FrameStackingTE (TEWrapper):
    """
    Wrapper that implements frame-stacking.

    Frame-stacking causes parameters from the last n
    timesteps to be returned as a single observation.
    The observation is a list of `n_frames` inner observations,
    oldest first.
    """
    def __init__ (self, *pargs, n_frames: int, **kargs):
        """
        `n_frames`: the number of observation frames to stack;
        must be at least 1. Raises `ValueError` otherwise.
        """
        if n_frames < 1:
            raise ValueError (f"n_frames must be at least 1, got {n_frames}")
        # Set before the base constructor runs: that constructor calls
        # `get_observation_space`, which reads `self.n_frames`.
        self.n_frames = n_frames
        super ().__init__ (*pargs, **kargs)

    def get_observation_space (self, inner_space):
        """
        That of inner, repeated `n_frames` times
        """
        low = np.array ([inner_space.low] * self.n_frames)
        high = np.array ([inner_space.high] * self.n_frames)
        return Box (low, high)

    def reset (self, seed = None):
        """
        Reset the inner environment; the stack starts out as
        `n_frames` repetitions of the first observation.
        """
        obs, info = super ().reset (seed)
        self.obs_framestack = [obs] * self.n_frames
        # Hand out a copy, so that the caller's observation does not
        # change when the stack advances.
        return list (self.obs_framestack), info

    def step (self, action):
        """
        Step the inner environment; the oldest frame is dropped and
        the new observation becomes the newest frame.
        """
        obs, rew, term, trunc, info = super ().step (action)
        # Build a new list rather than editing the old one in place:
        # observations returned earlier must stay as they were.
        self.obs_framestack = [*self.obs_framestack [1:], obs]
        return list (self.obs_framestack), rew, term, trunc, info

class FreqConstrainedTE (TEWrapper):
    """
    Wrapper that consults the agent only at select times, instead
    of once for every observation that is generated.

    Intended uses: letting the agent trade at a lower frequency
    than the data it observes, and imposing operational constraints
    such as trading hours.

    Placeholder: nothing is overridden yet, so this currently
    behaves exactly like `TEWrapper`.
    """

_pyjuter_new_globals = globals ()
_pyjuter_module.populate (_pyjuter_old_global_names, _pyjuter_new_globals)


In [None]:
#!/bin/env python3

"""
Very early prototype of an RL agent.

In train mode, a dataset is loaded, the agent is trained
on it, and the resulting model is saved to disk.

In test mode, the model is loaded from disk, agent
is initialised from it, and a backtest is run on
the same dataset. The results of this test are dumped
to `testlog.csv`.

Expect this program to fail, change behaviour frequently,
and even be removed some time in the future. It is meant
only as a prototype, and is currently checked into Git
only for knowledge sharing purposes.
"""

from stable_baselines3 import PPO
from datetime import datetime, timedelta



In [None]:
# Call this function for train mode
def train (total_timesteps: int = int (1e6), save_path: str = "ppo_bhel"):
    """
    Train a PPO agent with an MLP policy on the global `env` and
    save the resulting model to disk.

    `total_timesteps`: training budget handed to `model.learn`.
    `save_path`: where the model is saved. The later cells load it
    from 'ppo_bhel.zip', which matches the default.
    """
    model = PPO ("MlpPolicy", env, verbose = 1)
    model.learn (total_timesteps = total_timesteps)
    model.save (save_path)



In [None]:
# Call this function for test mode
def test ():
    """
    Backtest the model saved as "ppo_bhel" on the global `env`.

    This is a generator: the backtest only advances while it is
    iterated. It yields `(observation, reward, information)` once
    per step, and stops without yielding the step that ends the
    episode.

    "testlog.csv" is overwritten with the header line only.
    TODO: write one row per step to the log.
    """
    model = PPO.load ("ppo_bhel")
    # Nothing but the header is written, so the file is closed right
    # away rather than being left open for the whole backtest.
    with open ("testlog.csv", "w") as testlog:
        testlog.write ("datetime,action,reward,capital,nshares,shareprice\n")
    obs, info = env.reset ()
    while True:
        action, _states = model.predict (obs)
        obs, reward, term, trunc, info = env.step (action)
        # An episode is over when it is terminated or truncated
        if term or trunc:
            break
        yield obs, reward, info



In [None]:
from environment import (
    TradingEnvironment,
    SelfAwareTE,
    MovingAvgCenteredTE,
    FrameStackingTE,
)
from preproc import read, resample
# Load datasets and set up the environment
# Rows are kept from the first timestamp up to and including the second
period = (
    datetime (year = 2023, month = 1, day = 1),
    datetime (year = 2023, month = 7, day = 1)
)
# `read` applies its default pass, `fill_missing`, which puts the data
# on a regular 1-minute grid; it is then aggregated into 4-hour rows.
small_df = read ("BHEL.csv", period = period)
small_df = resample (small_df, interval = timedelta (hours = 4))

# Wrappers are applied innermost first. `MovingAvgCenteredTE` therefore
# centers only the five values of the base environment; the entries
# added by `SelfAwareTE` are left as they are. `FrameStackingTE` then
# stacks the last 30 of these extended observations.
env = TradingEnvironment (initial_capital = 10_000, data = small_df)
env = MovingAvgCenteredTE (env, window_size = 30)
env = SelfAwareTE (env)
env = FrameStackingTE (env, n_frames = 30)



  gym.logger.warn(
  gym.logger.warn(


In [None]:
# Train the model, saving to 'ppo_bhel.zip'
train ()



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
|    loss                 | 4.29        |
|    n_updates            | 2610        |
|    policy_gradient_loss | -0.0144     |
|    std                  | 0.0664      |
|    value_loss           | 212         |
-----------------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 1.08e+03   |
|    ep_rew_mean          | 0          |
| time/                   |            |
|    fps                  | 254        |
|    iterations           | 263        |
|    time_elapsed         | 2116       |
|    total_timesteps      | 538624     |
| train/                  |            |
|    approx_kl            | 0.04528948 |
|    clip_fraction        | 0.301      |
|    clip_range           | 0.2        |
|    entropy_loss         | 1.29       |
|    explained_variance   | 0.0914     |
|    learning_rate        | 0.0003     |
|    loss                 |

In [None]:
# Test the model, loading it from 'ppo_bhel.zip'.
# `test` is a generator function: merely calling it runs nothing, so
# its result is iterated here and every yielded
# (observation, reward, information) tuple is collected.
test_results = list (test ())
