In [73]:
import gym
import numpy as np
from gym import spaces
import pandas as pd
import math


def encode_wealth(wealth, wealth_bins):
    return(pd.cut(x=[wealth], bins=wealth_bins, right=False, retbins=True)[0][0])
    
#def decode_wealth(discr_wealth, wealth_bins):
#    return [wealth_bins[discr_wealth], wealth_bins[discr_wealth + 1]]
    
def encode_action(action, actions):
    return(int(np.where(action == actions)[0][0]))

def decode_action(action, actions):
    return(actions[action])
    

class BSEnv(gym.Env):
    '''Custom discrete-time Black-Scholes environment with one risky-asset and bank account'''
    metadata = {'render.modes': ['human']}
    
    def __init__(self, mu, sigma, r, T, dt, V_0, actions, wealth_bins, U_2 = math.log):
        '''
        Args:
            :params mu (float):         expected risky asset return
            :params sigma (float):      risky asset standard deviation
            :params r (float):          risk-less rate of return
            :params T (float):          investment horizon
            :params dt (float):         time-step size
            :params V_0 (float, tuple): initial wealth, if tuple (v_d, v_u) draws initial wealth V(0) uniformly from [v_d, v_u]
            :params actions (np.array): possible investment fractions into risky asset
            :params wealth_bins (np.array): contains the limits of each wealth bin in ascending order
            :params U_2 (callable):     utility function for terminal wealth
        '''
        
        assert divmod(T, dt)[1] == 0        # To-Do: change to ValueError, is T 'ganzzahlig' divisible
        super().__init__()
        self.mu    = mu                        # risky asset return
        self.sigma = sigma                     # risky asset volatility
        self.r = r                             # risk-free rate (bank account return, riskless)
        self.T = T                             # Termination time
        self.dt = dt                           # time-step size
        self.num_timesteps = T//dt
        self.V_0 = V_0                         # Initial wealth
        self.actions = actions                 # possible actions, fraction of wealth invested in risky aset
        self.num_actions = len(self.actions)   # number of possible actions
        self.wealth_bins = wealth_bins
        self.U_2 = U_2
        
        self.reset()
        
        # Action space
        self.action_space = spaces.Discrete(self.num_actions)
        
        # Observation space
        self.observation_space = spaces.Tuple((
            spaces.Discrete(self.num_timesteps),
            spaces.Discrete(len(self.wealth_bins))))
        
    
    def step(self, action):
        '''Execute one time step within the environment'''
        assert self.action_space.contains(action)
        
        # Decode the discrete action to a float
        pi_t = decode_action(action, self.actions)
        
        # Update Wealth (see wealth dynamicy, Inv. Strategies script (by Prof. Zagst) Theorem 2.18):
        # 1) Sample BM increment for one step
        dW_t = np.random.normal(loc=0, scale=math.sqrt(self.dt))
        # 2) Wealth process update via simulation of the exponent
        self.V_t *= np.exp( (self.r + pi_t*(self.mu - self.r) - 0.5*(pi_t**2)*(self.sigma**2)) * self.dt + pi_t*self.sigma*dW_t ) 
        
        
        # Old
        #self.V_t *= pi_t * (np.random.normal(loc=self.dt * self.mu, scale=math.sqrt(self.dt) * self.sigma) - self.dt*self.r) + (1 + self.dt*self.r)  # Update Wealth (see notes)
        #self.V_t *= pi_t * self.dt * (self.mu  - self.r) + (1 + self.dt * self.r)    # stock pays deterministic higher return
        
        
        self.time_state += 1      # updating time-step
        #self.wealth_state = encode_wealth(self.V_t, self.wealth_bins)
        
        done = self.time_state == self.num_timesteps           # Episode is finished if termination time is reached
        
        reward = 0                                        # Reward is zero for each time step t<T
        if done:                                          # Reward at termination time R_T = U(V_T)
            reward = self.U_2(self.V_t)
            
        return self._get_obs(), reward, done, {}          # {} empty info
    
    def _get_obs(self):
        print(encode_wealth(self.V_t, self.wealth_bins))
        return (self.time_state, encode_wealth(self.V_t, self.wealth_bins))
            
    def reset(self):
        '''Reset the state of the environment to an initial state'''
        self.time_state   = 0                                          # setting time to zero
        self.V_t          = self.V_0                                   # setting wealth to V_0
        #self.wealth_state = encode_wealth(self.V_t, self.wealth_bins)  # encoding wealth V_0 
        return self._get_obs()
    
    
    #def render(self, mode='human', close=False):
    # Render the environment to the screen
    #...

In [74]:
#actions = np.arange(0, 1, step=0.1)   #vector of actions
actions = np.array([0, 0.1, 1])
lower = 90
upper = 110
delta_bin = 5
wealth_bins = [0] + np.arange(lower, upper+1, delta_bin).tolist() + [float('Inf')]  # +1 as upper limit is not included

In [75]:
model = BSEnv(mu=0.06, sigma=0.2, r=0.02, T=2, dt=0.5, V_0=100, actions=actions, wealth_bins=wealth_bins)

[100.0, 105.0)


In [77]:
print(model.action_space)
print(model.observation_space)
print(model._get_obs(), model.V_t)
new_state, reward, done, info = model.step(1)
print(new_state, reward, done, next_Wealth, model.V_t)
new_state, reward, done, info = model.step(1)
print(new_state, reward, done, next_Wealth, model.V_t)
new_state, reward, done, info = model.step(1)
print(new_state, reward, done, next_Wealth, model.V_t)
new_state, reward, done, info = model.step(1)
print(new_state, reward, done, next_Wealth, model.V_t)

model._get_obs()

Discrete(3)
Tuple(Discrete(4), Discrete(7))
[100.0, 105.0)
(1, Interval(100.0, 105.0, closed='left')) 103.06997853444643
[105.0, 110.0)
(2, Interval(105.0, 110.0, closed='left')) 0 False {} 105.40387315600938
[105.0, 110.0)
(3, Interval(105.0, 110.0, closed='left')) 0 False {} 107.89562333970801
[110.0, inf)
(4, Interval(110.0, inf, closed='left')) 4.70527158194037 True {} 110.52829836147174
[110.0, inf)
(5, Interval(110.0, inf, closed='left')) 0 False {} 110.77357777210784
[110.0, inf)


(5, Interval(110.0, inf, closed='left'))