In [2]:
import gym
import numpy as np
from collections import namedtuple


In [3]:
Node = namedtuple('Node', ['t', 'm'])  # Store the node in binomial tree  [at time t move up m times]

In [1]:
class BinomialTree:
    def __init__(self, s0 = 100, K = 100, r = 0.02, sigma = 0.1, maturity = 1, n = 10, D = 1):
        self.s0 = s0
        self.K = K  # ATM strike price
        self.r = r  # riskless interest rate
        self.sigma = sigma  # stocks vol
        self.T = maturity  # option's maturity
        self.n = n  # total steps of Tree
        self.delta_t = self.T / self.n


        ## Calculate key elements in Tree generalization
        self.u = np.exp(self.sigma * np.sqrt(self.delta_t))  # Move up steps
        self.d = 1 / self.u
        self.R = np.exp(self.r * self.delta_t)  # Discount ratio of each step
        self.p = (self.R - self.d) / (self.u - self.d)  # Prob of moving up

        ## Self state
        self.node = None  # current state: Node(t, m)
        self.hedging_pos = 0  # Started hedging position
        self.D = D  # Holding position of derivatives

        ## Env state
        self.grid = None  # Store the tree's Node:  {time t: [Node(t, m1), Node(t, m2) ...]}
        self.observation_space = None  # Dict: {Node(t, m): (stock_price, option_price, {'exercise': bool})}
        self.action_space = np.array([0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])
        self.state_dim = 1
        self.action_dim = len(self.action_space)


    def get_price(self, node):  # Return the stock price at Node(i,j)
        return self.s0 * np.power(self.u, node.m)

    def _generate(self):
        grid = {}
        for t in range(self.n + 1):
            grid[t] = []
            for i in range(-t, t+1, 2):
                grid[t].append(Node(t, i))  # Node at time t
        self.grid = grid

        # return grid

    def _back_propagation(self):
        assert self.grid, 'Grid must be generated first!'
        obs_space = {}  # Restore data at each node {Node : (stock price, option value)

        for t in range(self.n, 0 - 1, -1):

            if t == self.n:  # Value at maturity
                for node in self.grid[t]:
                    stock_p = self.get_price(node)
                    opt_v = max(self.K - stock_p, 0)  # Put option value at maturity
                    obs_space[node] = (stock_p, opt_v, {'exercise': True})  # Restore current price & value
            else:  # Value before maturity
                for node in self.grid[t]:
                    m = node.m
                    stock_p = self.get_price(node)
                    opt_exe_v = max(self.K - stock_p, 0)
                    opt_continue_v = (self.p * (obs_space[Node(t + 1, m + 1)][1]) + (1 - self.p) * (obs_space[Node(t + 1, m - 1)][1])) / self.R

                    if opt_exe_v > opt_continue_v:
                        opt_v = opt_exe_v
                        obs_space[node] = (stock_p, opt_v, {'exercise': True})
                    else:
                        opt_v = opt_continue_v
                        obs_space[node] = (stock_p, opt_v, {'exercise': False})
        self.observation_space = obs_space
        # return obs_space

    def fit(self):  # Initialized the environment
        self._generate()
        self._back_propagation()
        print('Environment initialization completed!')


    def get_holding_value(self, stock_p, opt_v, hedge_pos):
        return hedge_pos * stock_p - opt_v * self.D

    def reset(self):
        assert self.observation_space, 'Env must be fited first!'
        self.node = Node(0, 0)  # Initial state
        self.hedging_pos = 0
        stock_p, opt_v, _ = self.observation_space[self.node]

        # holding_value = self.get_holding_value(stock_p, opt_v, self.hedging_pos)
        # state = (stock_p, holding_value)
        state = (stock_p, opt_v)
        return np.array(state, dtype=np.float32)

    def reward(self, stock_p, opt_v, hedging_pos_new, w = 0.001):
        """
        Considering the transaction cost in each step and the final payoff
        :param stock_p: current stock price
        :param opt_v: current option value
        :param hedging_pos_new:  current action
        :param w: trading cost coefficient
        :return:
        """
        trading_cost = - stock_p * (hedging_pos_new - self.hedging_pos) * w  # Negative trading reward
        hedging_diff = - np.square(self.get_holding_value(stock_p, opt_v, hedging_pos_new))  # Negative hedging diff
        return hedging_diff +  trading_cost

    def step(self, action):
        """
        :param action: hedging amount (continuous num)
        :return: stock_price, reward, done, {"exercise": bool}
        """
        assert self.node, 'Must reset the environment first'
        assert action in self.action_space, "Input action is not in action space"
        t, m = self.node

        if t == self.n:
            stock_p, opt_v, exercise = self.observation_space[self.node]
            # holding_val = self.get_holding_value(stock_p, opt_v, action)
            # state = (stock_p, holding_val)
            state = (stock_p, opt_v)

            done = True
            reward = self.reward(stock_p, opt_v, action)

            return state, reward, done, {}

        else:
            if np.random.binomial(1, self.p):  # Binomial transition prob
                node = Node(t + 1, m + 1)
            else:
                node = Node(t + 1, m - 1)

            stock_p, opt_v, exercise = self.observation_space[node]

            # holding_val = self.get_holding_value(stock_p, opt_v, action)
            # state = (stock_p, holding_val)
            state = (stock_p, opt_v)

            done = exercise['exercise']
            reward = self.reward(stock_p, opt_v, action)

            ## Update self state
            self.hedging_pos = action
            self.node = node

            return np.array(state, dtype=np.float32), reward, done, {}




In [15]:
g = BinomialTree()

In [16]:
g.fit()

Environment initialization completed!


In [17]:
g.reset()

array([100.       ,   3.1800015], dtype=float32)

In [18]:
g.reset()
print(g.node)
print(g.hedging_pos)
for i in range(10):
    print(g.step(0.1))
    print(g.node)
    print(g.hedging_pos)
    print('------------------')

Node(t=0, m=0)
0
(array([103.21281  ,   1.8562732], dtype=float32), -71.6666770122375, False, {})
Node(t=1, m=1)
0.1
------------------
(array([106.52884   ,   0.91756433], dtype=float32), -94.77644721051321, False, {})
Node(t=2, m=2)
0.1
------------------
(array([103.21281  ,   1.5499569], dtype=float32), -76.9361234785206, False, {})
Node(t=3, m=1)
0.1
------------------
(array([100.       ,   2.5517154], dtype=float32), -55.476945068875246, False, {})
Node(t=4, m=0)
0.1
------------------
(array([96.8872   ,  4.0684943], dtype=float32), -31.586935764174466, False, {})
Node(t=5, m=-1)
0.1
------------------
(array([93.87129 ,  6.228697], dtype=float32), -9.975697752194499, False, {})
Node(t=6, m=-2)
0.1
------------------
(array([96.8872  ,  3.686346], dtype=float32), -36.02849231495374, False, {})
Node(t=7, m=-1)
0.1
------------------
(array([93.87129 ,  6.128706], dtype=float32), -10.617324069100846, True, {})
Node(t=8, m=-2)
0.1
------------------
(array([90.949265,  9.050732], 

# Vanilla Policy Gradient (REINFORCE)
using pytorch to build simple reinforce algorithm

In [4]:
import torch
from torch import nn
from torch import optim
from torch import autograd
import torch.nn.functional as F
from torch.distributions import Categorical

import matplotlib.pyplot as plt

In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [20]:
class PolicyNetwork(nn.Module):

    def __init__(self, state_dim, action_dim, hidden_size, learning_rate = 3e-4):
        super(PolicyNetwork, self).__init__()

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.fc1 = nn.Linear(state_dim, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr = learning_rate)

    def forward(self, x):
        # x.shape = (batch, state_dim)  batch:一次trajectory中的steps
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim = 1)
        return x

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        state = state.unsquzze(0).to(device)

        probs = self.forward(state).cpu()
        # highest_prob = np.random.choice(self.action_dim,  p=np.squeeze(prob.detach().numpy()))
        m = Categorical(probs)
        action = m.sample()
        # log_prob = torch.log(prob.squeeze(0)[highest_prob])

        return action.item() / 10, m.log_prob(action)

In [21]:
class AgentVPG:
    GAMMA = 0.9
    max_episodes = 2000

    def __init__(self, env, policy_net):
        self.env = env
        self.policy_net = policy_net


    @staticmethod
    def discounted_future_reward(rewards: list):
        discounted_r = [rewards[-1]]

        for r in rewards[-2::-1]:
            rr = AgentVPG.GAMMA * discounted_r[-1]
            Gt = r + rr
            discounted_r.append(Gt)
        discounted_r = discounted_r[::-1]
        return discounted_r

    def update_policy(self, rewards, log_probs):
        discounted_rewards = self.discounted_future_reward(rewards)
        discounted_rewards = torch.tensor(discounted_rewards)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-9)

        policy_grads = []
        for log_prob, Gt in zip(log_probs, discounted_rewards):
            policy_grads.append(-log_prob*Gt)

        self.policy_net.optimizer.zero_grad()
        policy_grad = torch.stack(policy_grads).sum()
        policy_grad.backward()
        self.policy_net.optimizer.step()


    def fit(self):
        # num_steps = []
        # avg_num_steps = []
        all_rewards = []
        mean_rewards = []

        for episode in range(AgentVPG.max_episodes):
            state = self.env.reset()
            log_probs = []
            rewards = []
            for step in range(15):  # total steps is 10

                action, log_prob = self.policy_net.choose_action(state)
                next_state, reward, done, _ = self.env.step(action)

                log_probs.append(log_prob)
                rewards.append(reward)

                if done:
                    # 完成一次 episode/rollout，得到一次完整的 trajectory
                    self.update_policy(rewards, log_probs)
                    # num_steps.append(step)
                    # avg_num_steps.append(np.mean(num_steps[-3:]))

                    all_rewards.append(sum(rewards))
                    mean_rewards.append(np.mean(rewards))
                    if episode % 100 == 0:
                        print(f'episode: {episode}, total reward: {sum(rewards)}, mean_reward: {np.mean(rewards)}, length: {step}')
                    break

                state = next_state

        plt.plot(all_rewards)
        plt.plot(mean_rewards)
        plt.legend(['all_rewards', 'mean_rewards'])
        plt.xlabel('episode')
        plt.show()




In [28]:
binomial_env = BinomialTree()
binomial_env.fit()

Environment initialization completed!


In [29]:
VPG_policy = PolicyNetwork(binomial_env.state_dim, binomial_env.action_dim, 16)

In [30]:
agent = AgentVPG(binomial_env, VPG_policy)

In [31]:
agent.fit()

AttributeError: 'Tensor' object has no attribute 'unsquzze'

In [12]:
ss = binomial_env.reset()

In [13]:
ss

array([100.       ,   3.1800015], dtype=float32)

In [25]:
tt = torch.from_numpy(ss).float()

In [26]:
tt = tt.unsqueeze(0).to(device)

In [None]:

# torch.from_numpy(state).float().unsquzze(0).to(device)

In [27]:
tt

tensor([[100.0000,   3.1800]], device='cuda:0')

In [76]:
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size, learning_rate = 0.9):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

        self.optimizer = optim.Adam(self.parameters(), lr = learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)  # 这个dim是 0 or 1？

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item() / 10, m.log_prob(action)

In [55]:
env_id = "CartPole-v1"
# Create the env
# env = gym.make(env_id)

In [65]:
# env.reset()[0][1]

0.049145546

In [77]:
pp = Policy(1, 8, 16).to(device)

In [86]:
pp.act(np.array([1]))

(0.0, tensor([-1.9138], grad_fn=<SqueezeBackward1>))