<h1> Custom Environment </h1>
<h4> -An attempt at creating it from scratch </h4>
<i> "You either die a hero, or live long enough to see yourself become a villain."</i>

class
<br>
initializer
<br>
step
<br>
reset
<br>
render
<br>
close
<br>
<br>
Validate through check_env()

https://colab.research.google.com/github/araffin/rl-tutorial-jnrr19/blob/master/5_custom_gym_env.ipynb#scrollTo=9DOpP_B0-LXm

In [1]:
import numpy as np
import pandas as pd

import gymnasium as gym
from gymnasium import spaces

from stable_baselines3.common.env_checker import check_env

from stable_baselines3 import SAC
from stable_baselines3 import PPO

from stable_baselines3.common.evaluation import evaluate_policy

from RewardFunctions import (
    sharpe_ratio,
    sortino_ratio,
    calculate_drawdown,
    sterling_ratio,
    return_ratio,
    penalise_reward
)

In [2]:
# --- Environment settings -------------------------------------------------
history_usage = 30            # number of past return rows shown to the agent
rolling_reward_window = 10    # number of most recent portfolio returns scored by the reward

# Reward objective: "Return", "Sharpe" or "Sortino"; any other value selects Sterling.
objective = "Sterling"

# When True the reward is passed through penalise_reward with the portfolio ESG score.
esg_compliancy = True

# --- Data -------------------------------------------------------------------
return_data = pd.read_csv("../../Data/StockReturns.csv")

# One ESG score per stock (24 values).
# NOTE(review): the order presumably follows the columns of StockReturns.csv — confirm.
esg_data = np.array([
    36.6, 35.3, 17.9, 18, 18, 21.2,
    18.7, 29.2, 15.7, 25.6, 25.6, 18.4,
    19.8, 13.8, 18.1, 19, 17.2, 14,
    17.2, 19.5, 19.7, 21.2, 26.8, 19.3,
])

In [3]:
split_size = 0.8

# Split by row position, without shuffling: the leading rows train, the rest test.
n_train_rows = int(split_size * len(return_data))

train_data = return_data.iloc[:n_train_rows]
# Re-index the hold-out rows from zero.
test_data = return_data.iloc[n_train_rows:].reset_index(drop=True)

In [4]:
class PortfolioEnvironment(gym.Env):
    """
    Gymnasium environment in which an agent re-weights a stock portfolio each step.

    The observation is a flattened window of the last ``history_usage`` rows of
    ``return_data`` (zero-padded on the left while fewer rows are available).
    The action holds one value in [-1, 1] per stock; it is rescaled to [0, 1] and
    normalised so the resulting weights sum to one. The reward is the configured
    objective evaluated on the most recent ``rolling_reward_window`` portfolio
    returns, optionally passed through ``penalise_reward`` together with the
    weighted ESG score of the portfolio.
    """

    def __init__(self,
                 history_usage, rolling_reward_window,
                 return_data, esg_data,
                 objective, esg_compliancy):
        """
        Store the configuration and define the action/observation spaces.

        Parameters
        ----------
        history_usage : int
            Number of past rows of ``return_data`` included in each observation.
        rolling_reward_window : int
            Maximum number of most recent portfolio returns fed to the reward function.
        return_data : pandas.DataFrame
            Table whose ``.values`` has one row per time step and one column per stock.
        esg_data : array-like
            One ESG score per stock; its length defines the number of stocks.
        objective : str
            ``"Return"``, ``"Sharpe"`` or ``"Sortino"``; any other value selects
            the Sterling ratio.
        esg_compliancy : bool
            When true, the reward is passed through ``penalise_reward``.
        """
        super().__init__()

        self.return_data = return_data.values
        self.esg_data: np.ndarray = np.asarray(esg_data)
        self.history_usage: int = history_usage
        self.rolling_reward_window: int = rolling_reward_window
        self.n_stocks = len(esg_data)

        self.objective: str = objective
        self.esg_compliancy: bool = esg_compliancy

        # float32 is the Box default; spelled out so the dtype contract is visible.
        self.action_space = spaces.Box(low=-1,
                                       high=1,
                                       shape=(self.n_stocks,),
                                       dtype=np.float32)
        self.observation_space = spaces.Box(low=-np.inf,
                                            high=np.inf,
                                            shape=(self.n_stocks * self.history_usage,),
                                            dtype=np.float32)

        self._clear_history()

    def _clear_history(self):
        """Rewind the step counter and empty the per-episode weight/return histories."""
        self.current_step: int = 0
        self.weights_list: list = []
        self.returns_list: list = []
        # ``weights`` and ``portfolio_returns`` are the names reset()/render()
        # historically used. They alias the very same list objects that step()
        # appends to, so every attribute name reflects the current episode.
        self.weights = self.weights_list
        self.portfolio_returns = self.returns_list

    def reset(self, seed=None, options=None):
        """
        Start a new episode.

        Parameters
        ----------
        seed : int, optional
            Forwarded to ``gym.Env.reset``. The default ``None`` matches the
            previous behaviour, where the seed was never forwarded.
        options : dict, optional
            Accepted for Gymnasium API compatibility; not used.

        Returns
        -------
        tuple
            ``(observation, info)`` where ``info`` holds ``"time_step"`` and
            ``"cumulative_geo_return"`` (an empty array right after a reset).
        """
        super().reset(seed=seed)

        # Without clearing these, the rolling reward window of a new episode
        # would still contain returns from the previous episode.
        self._clear_history()

        observation = self.get_observation()
        additional_info = {
            "time_step": self.current_step,
            "cumulative_geo_return": np.cumprod(self.portfolio_returns)
        }

        return observation, additional_info

    def get_observation(self):
        """
        Build the observation for the current step.

        Returns
        -------
        numpy.ndarray
            float32 vector of length ``n_stocks * history_usage``: the rows
            ``[current_step - history_usage, current_step)`` of ``return_data``,
            transposed to stock-major order and left-padded with zeros.
        """
        start_idx = max(0, self.current_step - self.history_usage)
        end_idx = self.current_step
        # NOTE(review): the slice stops before ``current_step`` while step() earns the
        # return of row ``current_step + 1``, so row ``current_step`` is not yet
        # visible when the agent acts — confirm this one-row gap is intended.
        window = self.return_data[start_idx:end_idx].T

        missing = self.history_usage - window.shape[1]
        if missing > 0:
            padding = np.zeros((self.n_stocks, missing))
            window = np.hstack([padding, window])

        return window.flatten().astype(np.float32)

    def step(self, action):
        """
        Apply one allocation decision and advance the environment by one row.

        Parameters
        ----------
        action : array-like
            One value per stock in [-1, 1].

        Returns
        -------
        tuple
            ``(observation, reward, terminated, truncated, info)``. ``truncated``
            is always ``False`` and ``info`` is an empty dict.
        """
        # Map the action from [-1, 1] to [0, 1], then normalise to sum to one.
        # The epsilon keeps the division defined when every action is -1.
        current_weights = (np.asarray(action) + 1) / 2
        current_weights = (current_weights + 1e-8) / (np.sum(current_weights) + 1e-8)
        self.weights_list.append(current_weights)

        # The episode ends once there is no next row left to earn a return on.
        terminated = self.current_step >= len(self.return_data) - 1
        truncated = False

        if terminated:
            # No next row exists: book a zero return for this final step.
            portfolio_return = 0.0
        else:
            next_returns = self.return_data[self.current_step + 1]
            portfolio_return = np.dot(current_weights, next_returns)
        self.returns_list.append(portfolio_return)

        # Weighted ESG score of the chosen portfolio.
        esg_score = np.dot(current_weights, self.esg_data)

        # Negative slicing already yields the whole list while it is shorter than
        # the window, so no separate branch is needed for the warm-up phase.
        recent_returns = np.array(self.returns_list[-self.rolling_reward_window:])

        # Pick the reward function; unknown objectives fall back to Sterling.
        reward_functions = {
            "Return": return_ratio,
            "Sharpe": sharpe_ratio,
            "Sortino": sortino_ratio,
        }
        reward_function = reward_functions.get(self.objective, sterling_ratio)
        new_reward = reward_function(recent_returns)

        # Use the instance flag, not the notebook-level global of the same name.
        if self.esg_compliancy:
            new_reward = penalise_reward(new_reward, esg_score)

        self.current_step += 1

        # Observation the agent will act on next.
        next_window = self.get_observation()

        return next_window, new_reward, terminated, truncated, {}

    def render(self, mode="human"):
        """
        Print the current step and the cumulative product of the episode's returns.

        NOTE(review): ``np.cumprod`` is applied to the stored returns as-is; if the
        data holds simple returns, ``1 + r`` is probably intended — confirm.
        """
        print(f"Current step: {self.current_step}, and geometric return: {np.cumprod(self.portfolio_returns)}")

In [5]:
# Environment backed by the training slice of the return data.
train_env = PortfolioEnvironment(
    history_usage=history_usage,
    rolling_reward_window=rolling_reward_window,
    return_data=train_data,
    esg_data=esg_data,
    objective=objective,
    esg_compliancy=esg_compliancy,
)

# Let stable-baselines3 verify the environment against the Gym API, with warnings enabled.
check_env(train_env, warn=True)

In [6]:
# Train a Soft Actor-Critic agent on the training environment.
# A fixed seed makes the run reproducible under Restart & Run All.
model = SAC(policy="MlpPolicy",
            env=train_env,
            buffer_size=100_000,
            seed=42,
            verbose=1
            ).learn(total_timesteps=500)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [7]:
# Environment backed by the hold-out slice of the return data.
test_env = PortfolioEnvironment(
    history_usage=history_usage,
    rolling_reward_window=rolling_reward_window,
    return_data=test_data,
    esg_data=esg_data,
    objective=objective,
    esg_compliancy=esg_compliancy,
)

In [8]:
# Scratch check: a length-4 vector with a leading axis added has shape (1, 4).
np.repeat([3], 4).reshape(1, -1).shape

(1, 4)

In [9]:
# Roll the trained policy through the test environment once, recording the
# portfolio weights it chooses at every step.
obs, info = test_env.reset()
weights_history = []
finished = False

while not finished:
    action, _ = model.predict(obs, deterministic=True)

    # Same action -> weight mapping as PortfolioEnvironment.step: rescale from
    # [-1, 1] to [0, 1], then normalise. The epsilon avoids a division by zero
    # when every action component is -1.
    weights = (action + 1) / 2
    weights = (weights + 1e-8) / (np.sum(weights) + 1e-8)
    # Record the weights (not the raw action), as the history's name promises.
    weights_history.append(weights)

    obs, reward, terminated, truncated, info = test_env.step(action)
    finished = terminated or truncated

# One row per step, one column per stock.
df = pd.DataFrame(weights_history)

In [10]:
# mean_reward, std_reward = evaluate_policy(model, train_env, n_eval_episodes=10)
# print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [11]:
# mean_reward, std_reward = evaluate_policy(model, test_env, n_eval_episodes=10)
# print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")