In [24]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sb

import torch as th
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm

from typing import Callable, Optional, Union, Tuple, List, Type, Dict, Any

# Utils

## European option's price

$$ C(S,t) = S\Phi(d_1)-Ke^{-\gamma T}\Phi(d_2) \\
d_1={\ln\left({S_t\over K}\right)+\left(\gamma+{\sigma^2\over2}\right)T\over \sigma\sqrt{T}} \\
~ \\
\therefore \Delta = {\partial C\over \partial S} = \Phi(d_1)
$$

In [11]:
def european_option_delta(log_moneyness, expiry, volatility):
    """
    log_moneyness : log ratio of money over execution price
    """
    s, t, v = map(th.as_tensor, (log_moneyness, expiry, volatility))
    normal = th.distributions.Normal(loc=th.tensor(0.), scale=th.tensor(1.))
    delta = normal.cdf((s+(v**2/2)*t) / (v*th.sqrt(t)))
    return delta.item()

## European option's payoff

In [41]:
def european_option_payoff(prices: np.ndarray, strike=1.0) -> np.ndarray:
    return np.clip(prices[-1] - strike, 0, np.inf)

def lookback_option_payoff(prices: np.ndarray, strike=1.03) -> np.ndarray:
    return np.clip(np.max(prices, axis=0) - strike, 0, np.inf)


## Mean Clamp

In [13]:
def clamp(x, lb, ub) -> th.Tensor:
    if not isinstance(x, th.Tensor):
        x = th.tensor(x, dtype=th.float32)
    if not isinstance(ub, th.Tensor):
        ub = th.tensor(ub, dtype=th.float32)
    if not isinstance(lb, th.Tensor):
        lb = th.tensor(lb, dtype=th.float32)

    x = th.min(th.max(x, lb), ub)
    x = th.where(lb < ub, x, (lb+ub)/2)
    return x

## Entropic Loss

$$ L = -{1\over \lambda}\log E\left[e^{-\gamma X}\right] $$

여기서 $X$ : Profit&Loss

In [14]:
def pnl_entropic_loss(pnl, aversion=1.0) -> th.Tensor:
    return -th.mean(-th.exp(-aversion*pnl), dim=-1)

def pnl_entropic_premium(pnl, aversion=1.0) -> th.Tensor:
    return -th.log(pnl_entropic_loss(pnl, aversion=aversion))/aversion.view(-1)


## Geometric Brownian Motion
$$ dS = \mu S dt + \sigma S dz \\ ~ \\
S = S_0 \exp\left[{\left(\mu-{\sigma^2\over 2}\right)t+\sigma\sqrt{t}Z_t}\right]
$$

In [12]:
class GBM:
    def __init__(self, n_assets, dt=1/365, drift=0.0, volatility=0.2, initial_price=1.0):
        self.n_assets = n_assets,
        self.dt = dt
        self.drift = drift
        self.volatility = volatility
        self.initial_price = initial_price

        self.now = 0
        self.random_processes = np.zeros(self.n_assets)

    def get_seed(self):
        return np.random.get_state()

    def move_price(self) -> np.ndarray:
        """
        generate gbm moves result for self.now and self.now ++
        """
        normal_rand = np.random.standard_normal(size=self.n_assets)
        if self.now == 0:
            normal_rand[:] = 0.0

        self.random_processes += normal_rand
        noise_term = self.volatility * (self.dt ** 0.5) * self.random_processes
        t = self.now * self.dt
        prices = np.exp((self.drift - 0.5*self.volatility**2)*t + noise_term)

        self.now += 1

        return self.initial_price * prices

# Env

In [45]:
import gym
import gym.spaces as spaces

from stable_baselines3.common.type_aliases import GymObs, GymStepReturn

class BSMarket(gym.Env):
    def __init__(self, 
                 n_assets: int, 
                 cost: float,
                 payoff: str="european",
                 payoff_kwargs: Dict[str, Any] = None,
                 maturity: int=30,
                 dt: float=1/365,
                 hedge_initialize="zero",
                 gen_name: str="gbm",
                 gen_kwargs: Dict[str, Any] = None):
        
        super(BSMarket, self).__init__()
        self.n_assets = n_assets
        self.transaction_cost = cost
        self.payoff_kwargs = payoff_kwargs
        self.payoff = self.get_payoff_fn(payoff)
        self.maturity = maturity
        self.dt = dt
        self.price_generator = self.get_price_generator(gen_name)
        self.gen_kwargs = gen_kwargs
        
        self.now = 0
        self.prices: List[np.ndarray] = []
        
        self.hedge: np.ndarray = np.empty(n_assets)
        
        self.reset(initialize=hedge_initialize)
        
        # self.observation_space = spaces.Box(shape=(n_assets, 3))
        self.observation_space = spaces.Dict({'price': spaces.Box(shape=(n_assets,)),
                                              'time_expiry': spaces.Box(shape=(n_assets, )),
                                              'volatility': spaces.Box(shape=(n_assets, )),
                                              'prev_positions': spaces.Box(shape=(n_assets, ))})
        
        self.action_space = spaces.Box(shape=(n_assets, ))
    
    def seed(self, seed=None):
        np.random.seed(seed)
        th.manual_seed(seed)
    
    def reset(self, initialize="zero") -> GymObs:
        self.now = 0
        self.prices = [np.full(self.n_assets, self.gen_kwargs.get('init_price', 1.0))]
        
        if initialize == "std":
            self.hedge = np.random.standard_normal(self.n_assets)
        elif initialize == "zero":
            self.hedge = np.zeros(self.n_assets)
    
    def get_obs(self) -> GymObs:
        price = self.prices[self.now]
        time_expiry = np.full_like(price, self.maturity - self.now)
        volatility = np.full_like(price, self.gen_kwargs['volatility'])
        prev_hedge = self.hedge.copy()
        
        obs_dict = {'price': price,
                    'time_expiry': time_expiry,
                    'volatility': volatility,
                    'prev_hedge': prev_hedge}
        
        return obs_dict        
    
    def step(self, action: np.ndarray) -> GymStepReturn:
        """
        action: hedge(t+1)
        """
        net_pnl, done, info =0, False, {}

        if self.now < self.maturity-1:
            obs = self.get_obs()
            next_price = self.price_generator.move_price()

            gain = action * (next_price - obs['price'])
            transaction_cost = self.transaction_cost * np.abs(action - self.hedge) * obs['price']
            net_pnl = gain - transaction_cost

            self.prices.append(next_price)
            self.hedge = action
            self.now += 1

        elif self.now == self.maturity - 1:
            net_pnl = -self.payoff(self.prices, **self.payoff_kwargs)
            self.now += 1

            done = True
            info['msg'] = "MAX_STEP"

        return self.get_obs(), net_pnl, done, info
        
    def get_payoff_fn(self, payoff_name):
        strike = self.payoff_kwargs.get('strike', 1.0)
        if payoff_name == "european":
            return european_option_payoff

        elif payoff_name == "lookback":
            return lookback_option_payoff

        else:
            raise ValueError(f"payoff name not found: {payoff_name}")
        
    def get_price_generator(self, gen_name):
        if gen_name == "gbm":
            self.price_generator = GBM(n_assets=self.n_assets,
                                       dt=self.dt,
                                       **self.gen_kwargs)
        else:
            raise ValueError(f"price generator name not found: {gen_name}")

In [43]:
from stable_baselines3.common.policies import BaseFeaturesExtractor, create_mlp

class NoTransactionBandNet(BaseFeaturesExtractor):
    def __init__(self,
                 observation_space: gym.Space,
                 in_features:int,
                 net_arch: List[int]=None,
                 activation_fn: Type[nn.Module] = nn.ReLU):
        super(NoTransactionBandNet, self).__init__()

        if net_arch is None:
            net_arch = [32, 32, 32, 32]

        self.mlp_layer = create_mlp(in_features, 2, net_arch, activation_fn=activation_fn)

    def forward(self, observation: th.Tensor):
        log_moneyness = observation['price'].log()
        time_expiry = observation['time_expiry']
        volatility = observation['volatility']
        prev_positions = observation['prev_positions']

        no_cost_delta = european_option_delta(log_moneyness, time_expiry, volatility)

        x = th.transpose(th.vstack([log_moneyness, time_expiry, volatility]), 0, 1)

        band_width = self.mlp_layer(x)
        lb_delta = no_cost_delta - F.leaky_relu(band_width[:, 0])
        ub_delta = no_cost_delta + F.leaky_relu(band_width[:, 1])

        hedge = clamp(prev_positions, lb_delta, ub_delta)

        return hedge

In [44]:
class FFNet(BaseFeaturesExtractor):
    def __init__(self,
                 observation_space: gym.Space,
                 in_features: int,
                 net_arch: List[int]=None,
                 activation_fn: Type[nn.Module] = nn.ReLU):
        super(FFNet, self).__init__()

        if net_arch is None:
            net_arch = [32, 32, 32, 32]

        self.mlp_layer = create_mlp(in_features + 1, 1, net_arch, activation_fn=activation_fn)

    def forward(self, observation: th.Tensor):
        log_moneyness = observation['price'].log()
        time_expiry = observation['time_expiry']
        volatility = observation['volatility']
        prev_positions = observation['prev_positions']

        no_cost_delta = european_option_delta(log_moneyness, time_expiry, volatility)

        x = th.transpose(th.vstack([log_moneyness, time_expiry, volatility, prev_positions]), 0, 1)
        x = self.mlp_layer(x).reshape(-1)
        x = th.tanh(x)

        hedge = no_cost_delta + x

        return hedge

## Env Test

In [40]:
from stable_baselines3.common.env_checker import check_env

env_config_1 = {
    'n_assets' : 5,
    'cost': 1e-3,
    'payoff': 'european',
    'payoff_kwargs': {
        'strike': 1.0
    },
    'maturity': 30,
    'dt': 1/365,
    'hedge_initialize': 'zero',
    'gen_name': 'gbm',
    'gen_kwargs': {
        'drift': 0.0,
        'volatility': 0.2,
        'initial_price': 1.0
    }
}

env = BSMarket(**env_config_1)
check_env(env)

AttributeError: 'BSMarket' object has no attribute 'prices'