In [2]:
import numpy as np
import gym
from gym.spaces import Discrete, Box
import torch.nn as nn
import torch
import torch.optim as optimizer
import torch.nn.functional as F
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import matplotlib
import torchvision.transforms as T
from torch.distributions import Categorical, Normal
from scipy import signal
import scipy
from torch import optim

In [3]:
class Net(nn.Module):

    def __init__(self, h, w, outputs):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        # print(convh, convw)
        linear_input_size = convw * convh * 32
        # self.head = nn.Linear(linear_input_size, outputs)
        self.head = nn.Linear(linear_input_size, convw*convh)
        # self.mid = nn.Linear(convw*convh/3, convh/2)
        self.out = nn.Linear(convw*convh, outputs)
        # self.out = F.softmax

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        # print(x)
        # x = x.to(device)
        # print('U', x[0])
        # print('O', np.shape(x))
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # return self.head(x.view(x.size(0), -1))
        x = F.relu(self.head(x.view(x.size(0), -1)))
        return F.softmax(self.out(x), dim=-1)

In [4]:
env = gym.make("CartPole-v0")

In [5]:
from torchvision.transforms import InterpolationMode
resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=InterpolationMode.BICUBIC),
                    T.ToTensor()])
def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART
import time
def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    # time.sleep(2)
    screen = env.render(mode='rgb_array')
    screen  = screen.transpose((2, 0, 1))
    # screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on a cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).numpy()

In [85]:
env.reset()
_, _, h, w = get_screen().shape
# print(h, w, env.action_space)
model = Net(h, w, env.action_space.n)

In [44]:
def discount_rewards(rewards, gamma=0.99):
    # Cumulative discounted sum
    r = np.array([gamma**i * rewards[i] 
                  for i in range(len(rewards))])
    r = r[::-1].cumsum()[::-1]
    # Subtracting the baseline reward 
    # Intuitively this means if the network predicts what it
    # expects it should not do too much about it
    # Stabalizes and speeds up the training 
    return r - r.mean()

In [87]:
from torch.utils.tensorboard import SummaryWriter
tb = SummaryWriter()

In [84]:
total_rewards = []
batch_rewards = []
batch_actions = []
batch_states = []
batch_frames = []

batch_counter = 1
opt = optimizer.Adam(model.parameters(), 1e-2)
action_space = np.arange(env.action_space.n)

for ep in tqdm(range(1000, 1500)):
    # Reset
    s_0 = env.reset()
    frame = get_screen()
    states = []
    rewards = []
    actions = []
    frames = []
    complete = False
    while complete == False:
        # print(frame)
        action_probs = model(torch.FloatTensor(frame)).detach().numpy().squeeze()
        # action_probs = model(torch.FloatTensor(frame)).detach().numpy().squeeze()
        # print(action_probs)
        # action_probs = [0.5, 0.5]
        action = np.random.choice(action_space, p=action_probs)
        s1, r, complete, _ = env.step(action)
        frame_ = get_screen()
        states.append(s_0)
        rewards.append(r)
        actions.append(action)
        frames.append(frame.squeeze())
        s_0 = s1
        frame = frame_
        if complete:
            batch_rewards.extend(discount_rewards(rewards, 0.99))
            batch_states.extend(states)
            batch_actions.extend(actions)
            batch_frames.extend(frames)
            batch_counter += 1
            total_rewards.append(sum(rewards))
            tb.add_scalar('Rewards', total_rewards[-1], ep)

            if batch_counter == 10:
                # Prepare the batches for training
                # Add states, reward and actions to tensor
                opt.zero_grad()
                state_tensor = torch.FloatTensor(batch_states)
                reward_tensor = torch.FloatTensor(batch_rewards)
                action_tensor = torch.LongTensor(batch_actions)
                # print(batch_states)
                # print(batch_frames)
                frame_tensor = torch.FloatTensor(batch_frames)

                # Convert the probs by the model to log probabilities
                log_probs = torch.log(model(frame_tensor))
                # Mask the probs of the selected actions
                selected_log_probs = reward_tensor * reward_tensor * log_probs[np.arange(len(action_tensor)), action_tensor]
                # Loss is negative of expected policy function J = R * log_prob
                loss = -selected_log_probs.mean()

                # Do the update gradient descent(with negative reward hence is gradient ascent) 
                loss.backward()
                opt.step()
                tb.add_scalar('2', loss.item(), ep)
                batch_rewards = []
                batch_actions = []
                batch_states = []
                batch_frames = []
                batch_counter = 1

            print("\rEp: {} Average of last 10: {:.2f}".format(
                ep + 1, np.mean(total_rewards[-10:])), end="")

  0%|          | 0/500 [00:00<?, ?it/s]

Ep: 1500 Average of last 10: 28.10

In [41]:
opt.zero_grad()
opt.step()

In [56]:
torch.save(model, f'./RL_VPG_28k.pt')

In [57]:
model.eval()

Net(
  (conv1): Conv2d(3, 16, kernel_size=(5, 5), stride=(2, 2))
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(16, 32, kernel_size=(5, 5), stride=(2, 2))
  (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(32, 32, kernel_size=(5, 5), stride=(2, 2))
  (bn3): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (head): Linear(in_features=512, out_features=16, bias=True)
  (out): Linear(in_features=16, out_features=2, bias=True)
)

In [81]:
import imageio
from PIL import Image

def generate_gif(env, filename, T=200):
    frames = []
    s_t = env.reset()
    reward = 0
    for t in range(T):
        # a_t = agent.act(s_t)
        frame = get_screen()
        action_probs = model(torch.FloatTensor(frame)).detach().numpy().squeeze()
        # action_probs = model(torch.FloatTensor(frame)).detach().numpy().squeeze()
        # print(action_probs)
        # action_probs = [0.5, 0.5]
        action = np.random.choice(action_space, p=action_probs)
        s_t, r_t, d_t, _ = env.step(action)
        frame_ = env.render(mode='rgb_array')
        frames.append(frame_)
        reward += r_t
        if d_t:
            break
    print(reward)
    images_list = [Image.fromarray(frame) for frame in frames]
    imageio.mimsave(f'{filename}.gif', frames, duration=0.02)

In [82]:
generate_gif(env, './Test2')

8.0
