In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
import matplotlib.pyplot as plt
import numpy as np

In [None]:

# Report the installed PyTorch version and whether a CUDA device is usable.
# The second value comes from torch.cuda.is_available(), so it is labelled as
# CUDA availability rather than "Torch" availability.
print(f"Torch Version {torch.__version__}")
print(f"CUDA Available: {torch.cuda.is_available()}")

In [None]:
class PolicyNet(nn.Module):
    """Small fully connected network whose output is a probability.

    The input must have 2 features in its last dimension; the output has a
    single feature squashed into (0, 1) by a sigmoid. In this notebook the
    output is used as the probability of taking the "up" action.

    Args:
        T: Scale applied to every input feature before the first layer
            (``x / T - 0.5``).
    """

    def __init__(self, T: int):
        super().__init__()
        self.T = T
        self.hidden1 = nn.Linear(2, 64)
        self.hidden2 = nn.Linear(64, 32)
        # Despite its name this layer feeds a sigmoid, not a softmax. The
        # attribute name is kept because whole-module pickles and state dicts
        # reference it.
        self.softmax_layer = nn.Linear(32, 1)

    def forward(self, x):
        """Return the sigmoid output for a batch of inputs ``x``.

        Args:
            x: Tensor whose last dimension has size 2.

        Returns:
            Tensor with the same leading dimensions as ``x`` and a last
            dimension of size 1, with values in (0, 1).
        """
        # Rescale by T and shift by 0.5: a feature in [0, T] lands in
        # [-0.5, 0.5] and a feature in [-T, T] lands in [-1.5, 0.5].
        x = x / self.T - 0.5
        x = F.relu(self.hidden1(x))
        x = F.relu(self.hidden2(x))
        # torch.sigmoid is the supported spelling; F.sigmoid is a legacy alias
        # that emits a deprecation warning on many PyTorch releases.
        return torch.sigmoid(self.softmax_layer(x))

In [None]:
class ValueNet(nn.Module):
    """Small fully connected network producing one unbounded scalar per input.

    The input must have 2 features in its last dimension. In this notebook
    the output is used as the estimated value of a state.

    Args:
        T: Scale applied to every input feature before the first layer
            (``x / T - 0.5``).
    """

    def __init__(self, T: int):
        super().__init__()
        self.T = T
        # Layers are created in this order so that parameter initialisation
        # consumes the random number generator in a fixed sequence.
        self.hidden1 = nn.Linear(2, 64)
        self.hidden2 = nn.Linear(64, 32)
        self.value_layer = nn.Linear(32, 1)

    def forward(self, x):
        """Return the linear (un-squashed) output for a batch of inputs ``x``.

        Args:
            x: Tensor whose last dimension has size 2.

        Returns:
            Tensor with the same leading dimensions as ``x`` and a last
            dimension of size 1.
        """
        # Rescale by T and shift by 0.5: a feature in [0, T] lands in
        # [-0.5, 0.5].
        scaled = x / self.T - 0.5
        first = F.relu(self.hidden1(scaled))
        second = F.relu(self.hidden2(first))
        return self.value_layer(second)

In [None]:
# --- Environment -----------------------------------------------------------
T = 100  # The length of the excursion (number of time steps per episode)
# Multiplied by the "stepped below zero" indicator at every step of training.
negative_reward = -50
# Multiplied by |position| after the final step of an episode.
lose_reward_gradient = -500

# --- Optimisation ----------------------------------------------------------
batch_size = 64  # number of walks simulated in parallel per epoch
policy_learning_rate = 1e-4
value_learning_rate = 1e-4

In [None]:
import os

model_dir = "results/models/"
# Create "results/" and "results/models/" in one call. exist_ok=True makes the
# cell safe to re-run and avoids the check-then-create race of
# os.path.exists() followed by os.mkdir().
os.makedirs(model_dir, exist_ok=True)

# Record the hyper-parameters of this run next to the saved models.
with open(os.path.join(model_dir, 'params.txt'), 'w') as writer:
    writer.write(f"T: {T}\n")
    writer.write(f"negative_reward: {negative_reward}\n")
    writer.write(f"lose_reward_gradient: {lose_reward_gradient}\n")
    writer.write(f"policy_learning_rate: {policy_learning_rate}\n")
    writer.write(f"value_learning_rate: {value_learning_rate}\n")
    writer.write(f"batch_size: {batch_size}\n")
def save_models(epoch: int, policy_net, value_net):
    """Save both networks under ``<model_dir>/epoch_<epoch>/``.

    Args:
        epoch: Epoch index used to name the checkpoint directory.
        policy_net: Policy network, written to ``policy.pt``.
        value_net: Value network, written to ``value.pt``.

    Note:
        The whole module objects are pickled (not just their state dicts), so
        loading requires the class definitions to be importable under the
        same names.
    """
    save_dir = os.path.join(model_dir, f"epoch_{epoch}")
    # exist_ok=True: re-running training must overwrite an existing checkpoint
    # instead of failing with FileExistsError on the first save.
    os.makedirs(save_dir, exist_ok=True)
    policy_path = os.path.join(save_dir, "policy.pt")
    value_path = os.path.join(save_dir, "value.pt")
    torch.save(policy_net, policy_path)
    torch.save(value_net, value_path)

In [None]:

# Networks. The policy is built before the value network so that parameter
# initialisation draws from the random number generator in a fixed order.
policy_net = PolicyNet(T=T)  # outputs the probability of the up action
value_net = ValueNet(T=T)  # outputs one number: the value of the state (x, t)

# (Placeholder: previously saved networks could be loaded here.)

# One Adam optimiser per network, each with its own learning rate.
policy_optim = optim.Adam(params=policy_net.parameters(), lr=policy_learning_rate)
value_optim = optim.Adam(params=value_net.parameters(), lr=value_learning_rate)

In [None]:
# Actor-critic training. Each epoch simulates `batch_size` walks of T steps,
# accumulates the value and policy gradients of every step, and then applies a
# single optimiser step per network.
epochs = 1000
total_mean_returns = []  # per epoch: sum over steps of the batch-mean reward
mean_successes = []      # per epoch: fraction of walks never below 0 that end at 0
mean_entropies = []      # per epoch: sum over steps of the batch-mean -log pi(a|s)
mean_hits = []           # per epoch: fraction of walks that end at position 0
for epoch in range(epochs):
    if epoch % 50 == 0:
        print(f"Epoch {epoch}")

    policy_optim.zero_grad()
    value_optim.zero_grad()
    total_reward_mean = 0
    total_mean_entropy = 0
    state = torch.zeros(batch_size, 2)  # columns: (position x, time t)
    # Per-walk flags are kept 1-D. Mixing a (batch, 1) flag with the (batch,)
    # masks below would silently broadcast to a (batch, batch) matrix.
    success = torch.ones(batch_size, dtype=torch.bool)
    has_hit = torch.zeros(batch_size, dtype=torch.bool)
    for t in range(T):
        prob_policy = policy_net(state)  # probability of moving up
        value_state = value_net(state)

        # Sample the action: 1.0 = move up, 0.0 = move down. The comparison
        # yields a tensor that carries no gradient.
        action = (torch.rand(batch_size, 1) < prob_policy).float()

        # Probability (and log-probability) of the action actually taken.
        prob_action = action * prob_policy + (1.0 - action) * (1.0 - prob_policy)
        log_prob_action = torch.log(prob_action)

        # Position changes by +/-1, time advances by 1.
        delta_state = torch.cat((action * 2.0 - 1.0, torch.ones(batch_size, 1)), 1)
        next_state = state + delta_state
        next_position = next_state[:, 0]

        below_zero_bool = next_position < 0
        success = torch.logical_and(torch.logical_not(below_zero_bool), success)
        below_zero = below_zero_bool.float().reshape((-1, 1))

        base_reward = below_zero * negative_reward
        if t < T - 1:
            # Non-terminal step: bootstrap from the value of the next state.
            with torch.no_grad():
                next_value = value_net(next_state)
        else:
            # Terminal step: add the penalty proportional to the distance from
            # zero. Nothing follows the last step, so its value is zero.
            base_reward = base_reward + torch.abs(next_position.reshape((-1, 1))) * lose_reward_gradient
            has_hit = next_position == 0.0
            success = torch.logical_and(has_hit, success)
            next_value = torch.zeros(batch_size, 1)

        # Entropy bonus: -log pi(a|s), treated as part of the reward.
        entropy_term = -log_prob_action.detach()
        total_mean_entropy += torch.mean(entropy_term)
        total_reward = (base_reward + entropy_term).detach()
        total_reward_mean += torch.mean(total_reward)

        # Temporal-difference error with discount 1.0.
        temp_diff = total_reward + next_value - value_state
        value_loss = torch.mean(temp_diff * temp_diff)
        value_loss.backward()
        # Policy gradient weighted by the (detached) TD error.
        policy_loss = -torch.mean(temp_diff.detach() * log_prob_action)
        policy_loss.backward()
        state = next_state

    policy_optim.step()
    value_optim.step()
    total_mean_returns.append(total_reward_mean.numpy())
    mean_successes.append(success.float().mean().numpy())
    mean_hits.append(has_hit.float().mean().numpy())
    mean_entropies.append(total_mean_entropy.numpy())
    if epoch % 10 == 0:
        save_models(epoch, policy_net, value_net)

# Summarise the run: overall mean return, a per-epoch CSV, and three plots.
mean_return = np.mean(total_mean_returns)

with open(os.path.join(model_dir, 'run_data.csv'), 'w') as writer:
    writer.write("epoch,mean_entropy,mean_return,mean_success,mean_hit_rate\n")
    per_epoch = zip(mean_entropies, total_mean_returns, mean_successes, mean_hits)
    for i, (entropy, ret, succ, hit) in enumerate(per_epoch):
        writer.write(f"{i},{entropy},{ret},{succ},{hit}\n")

print(f"Peak Mean Return {np.max(total_mean_returns)}")
print(f"Peak Mean Entropy {np.max(mean_entropies)}")

epoch_axis = range(epochs)

# Mean return, with a blue reference line at zero.
plt.figure()
plt.title('Mean Return')
plt.plot([0, epochs-1], [0, 0], '-b')
plt.plot(epoch_axis, total_mean_returns, '-r')
plt.show()

# The two rate curves share the same layout and a fixed [0, 1] y-range.
for title, series in (('Mean Success', mean_successes), ('Mean Hits', mean_hits)):
    plt.figure()
    plt.title(title)
    plt.plot(epoch_axis, series, '-r')
    plt.ylim([0, 1])
    plt.show()

In [None]:
# Sample walks from a saved policy and plot them as overlaid trajectories.
num_samples = 1024
# NOTE(review): the checkpoint is a whole pickled module; recent PyTorch
# releases default torch.load to weights-only loading, which rejects such
# files - confirm against the installed version.
loaded_policy = torch.load('results/models/epoch_990/policy.pt')
state = torch.zeros(num_samples, 2)  # columns: (position x, time t)

states = []
# Pure inference: no_grad avoids building an autograd graph for every step.
with torch.no_grad():
    for t in range(T):
        prob_policy = loaded_policy(state)  # probability of moving up
        # action 1.0 = move up, action 0.0 = move down
        action = (torch.rand(num_samples, 1) < prob_policy).float()
        delta_state = torch.cat((action * 2.0 - 1.0, torch.ones(num_samples, 1)), 1)
        states.append(state[:, 0].numpy())
        state = state + delta_state
states.append(state[:, 0].numpy())

states = np.array(states)  # shape (T + 1, num_samples): one column per walk
plt.figure()
plt.title("Sampled Paths")
plt.xlabel("Time, t")
plt.ylabel("Position, x")
# A 2-D y argument draws one line per column, replacing num_samples plot calls.
plt.plot(range(T + 1), states, '-k', alpha=0.005)
plt.show()


In [None]:
# Tabulate the saved policy on every reachable lattice point (x, t) and write
# it to policy.csv inside the checkpoint folder.
epoch = 990
folder = f'results/models/epoch_{epoch}'
# NOTE(review): the checkpoint is a whole pickled module; recent PyTorch
# releases default torch.load to weights-only loading, which rejects such
# files - confirm against the installed version.
loaded_policy = torch.load(os.path.join(folder, 'policy.pt'))
with open(os.path.join(folder, 'policy.csv'), 'w') as writer:
    writer.write("x,t,policy_up,policy_down\n")

    # Pure inference: no gradients are needed.
    with torch.no_grad():
        for t in range(T):
            # After t steps of +/-1 the position has the parity of t and lies
            # in [-t, t].
            for x in range(-t, t+1, 2):
                state = torch.Tensor([x, t]).reshape((-1, 2))
                # Tensor.item() replaces np.asscalar, which was removed from
                # NumPy; both return a plain Python float.
                policy_val = loaded_policy(state).item()
                writer.write(f"{x},{t},{policy_val},{1.0-policy_val}\n")
