In [None]:
import gym
from gym import spaces
import numpy as np
import itertools
import time
import torch
import pylab as plt
# %matplotlib notebook
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import copy
import memory as mem   
from feedforward import Feedforward
# import MountainCarInformativeReward
from custompendulumenv import CustomPendulumEnvDiscrete
import pickle

## Helper Functions

In [None]:
def running_mean(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)


# func for debugging
def print_model_params(model):
    for name, param in model.named_parameters():
        if param.requires_grad:
            print (name, param.data)

# DQN

Complete the implemenation of DQN with a main Q-network and a target Q-network

In [None]:
""" Q Network, input: observations, output: q-values for all actions """
class QFunction(Feedforward):
    def __init__(self, observation_dim, action_dim, device,
                 hidden_sizes=[100,100], learning_rate = 0.0002):
        super().__init__(input_size=observation_dim, 
                         hidden_sizes=hidden_sizes, 
                         output_size=action_dim,
                         device=device)
        if device.type == 'cuda':
            self.cuda()
        self.optimizer = torch.optim.Adam(self.parameters(), 
                                        lr=learning_rate, 
                                        eps=0.000001) 
        self.loss = torch.nn.SmoothL1Loss()
        
        self.print = True
        
    def fit(self, observations, actions, targets):
        # TODO: complete this
        targets = torch.from_numpy(targets).to(self.device).float()

        pred = self.Q_value(observations, actions)
        loss = self.loss(pred, targets)
        loss.backward()
        for param in self.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        return loss.item()
    
    def Q_value(self, observations, actions):
        pred = self.forward(
            torch.from_numpy(observations).to(self.device).float()
        )
        return torch.gather(pred, 1, torch.from_numpy(actions).to(self.device).long())
    
    def maxQ(self, observations):
        return np.max(self.predict(observations), axis=-1)
    
    def greedyAction(self, observations):
        # this computes the greedy action
        return np.argmax(self.predict(observations), axis=-1)

In [None]:
class DQNAgent(object):
    """
    Agent implementing Q-learning with NN function approximation.    
    """
    def __init__(self, observation_space, action_space, **userconfig):
        
        if not isinstance(observation_space, spaces.box.Box):
            raise UnsupportedSpace('Observation space {} incompatible ' \
                                   'with {}. (Require: Box)'.format(observation_space, self))
        if not isinstance(action_space, spaces.discrete.Discrete):
            raise UnsupportedSpace('Action space {} incompatible with {}.' \
                                   ' (Reqire Discrete.)'.format(action_space, self))
        
        self._observation_space = observation_space
        self._action_space = action_space
        self._action_n = action_space.n
        self._config = {
            "eps": 0.05,            # Epsilon in epsilon greedy policies                        
            "discount": 0.95,
            "buffer_size": int(1e5),
            "batch_size": 128,
            "learning_rate": 0.0002, 
            "hidden_sizes": [100, 100],
            "update_target": False
        }
        self._config.update(userconfig)        
        self._eps = self._config['eps']
        self.buffer = mem.Memory(max_size=self._config["buffer_size"])
        
        self._print = True # custom variable for debugging
        
        # complete here
        self.train_iter = 100
        self.Q = QFunction(self._observation_space.shape[0], self._action_n, 
                           hidden_sizes=self._config['hidden_sizes'], 
                           learning_rate=self._config['learning_rate'],
                           device=self._config['device'])
        
        self.target_Q = copy.deepcopy(self.Q)
        
    def _update_target_net(self):
        self.target_Q.load_state_dict(self.Q.state_dict())
    
    def act(self, observation, eps=None):
        if eps is None:
            eps = self._eps
        # epsilon greedy
        if np.random.random() > eps:
            action = self.Q.greedyAction(observation)
        else: 
            action = self._action_space.sample()        
        return action
    
    def store_transition(self, transition):
        self.buffer.add_transition(transition)
            
    def train(self, iter_fit=32):
        losses = []
        if self._config['update_target']:
            self._update_target_net()
        
        for i in range(iter_fit):
            data = self.buffer.sample(batch=self._config['batch_size'])
            s = np.stack(data[:, 0]) # s_t
            a = np.stack(data[:, 1])[:, None] # a_t
            rew = np.stack(data[:, 2])[:, None] # r
            s_next = np.stack(data[:, 3]) # s_t+1
            not_done = (~np.stack(data[:, 4])[:, None]).astype(np.int) # not_done flag
            
            value_s = self.Q.Q_value(s, a)
            if self._config['update_target']:
                value_s_next = self.target_Q.maxQ(s_next)[:, None]
            else:
                value_s_next = self.Q.maxQ(s_next)[:, None]
            # target
            targets = rew + self._config['discount'] * np.multiply(not_done, value_s_next)

            # optimize
            fit_loss = self.Q.fit(s, a, targets)
            losses.append(fit_loss)    
        
        return losses

## Test in Env

In [None]:
# env_name = 'CustomPendulumDiscrete-v0'
env_name = 'CartPole-v0'

env = gym.make(env_name)
ac_space = env.action_space
o_space = env.observation_space
print(ac_space)
print(o_space)
print(list(zip(env.observation_space.low, env.observation_space.high)))

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
update_target = True
iter_fit = 40

In [None]:
q_agent = DQNAgent(o_space, ac_space, discount=0.95, eps=0.2, 
                   hidden_sizes=[256], update_target=update_target, 
                   learning_rate=0.0001,
                   device=device)

In [None]:
epsilon = 0.95
epsilon_decay = 0.9996
min_epsilon = 0.01

temp_epsilon = epsilon

l = []

for i in range(20000):
    temp_epsilon = max(epsilon_decay*temp_epsilon, min_epsilon)
    l.append(temp_epsilon)
    
    
plt.plot(l)
plt.ylim(0, 1)
plt.show()

Train the agent!

In [None]:
stats = []
losses = []
max_episodes = 700
max_steps = 800 
fps = 50
show = False

for i in range(max_episodes):
    total_reward = 0
    ob = env.reset()
    for t in range(max_steps):
        done = False    
        epsilon = max(epsilon_decay * epsilon, min_epsilon)
        a = q_agent.act(ob, eps=epsilon)
        (ob_new, reward, done, _info) = env.step(a)
        total_reward += reward
        q_agent.store_transition((ob, a, reward, ob_new, done))            
              
        if show:
            time.sleep(1.0/fps)
            env.render(mode='human')        
        if done: 
            break 
            
        ob = ob_new  
            
    losses.extend(q_agent.train(iter_fit=iter_fit))
    stats.append([i,total_reward,t+1])    
    
    if ((i-1)%20==0):
        print("{}:\tDone after {} steps.\tReward:\t{}".format(i, t+1, total_reward))

if show:
    env.close()

Plot the training reward over time. Use the running_mean(array, window_size) to plot a smooth version 

In [None]:
stats_np = np.asarray(stats)

mean = running_mean(stats_np[:, 1], 50)
plt.plot(mean)
plt.savefig(f'{env_name}-{update_target}-{iter_fit}-reward.pdf')
plt.show()

In [None]:
losses_np = np.asarray(losses)

mean = running_mean(losses_np, 50)
plt.plot(mean)
plt.savefig(f'{env_name}-{update_target}-{iter_fit}-loss.pdf')
plt.show()

In [None]:
q_agent.buffer.size

## Evaluate (without exploration)

Please look at the behavior for a small number of episodes

In [None]:
test_stats = []
episodes = 1000 # increased the number so that we get statistically significant results
fps = 50
show = False
for i in range(episodes):
    total_reward = 0
    ob = env.reset()
    for t in range(max_steps):
        done = False        
        a = q_agent.act(ob, eps=0.0)
        (ob_new, reward, done, _info) = env.step(a)
        total_reward+= reward
        ob=ob_new        
        if show:
            time.sleep(1.0/fps)
            env.render(mode='human')        
        if done: break    
    print("{}:\tDone after {} steps.\tReward:\t{}".format(i, t+1, total_reward))
    test_stats.append([i,total_reward,t+1])        

Evaluate mean and standard deviation of performance 

(for the Pendulum: an average return around -30 or better should be achieved)

(for the CartPendulum it is possible to get 200)

In [None]:
np.mean([rew for _, rew, _ in test_stats])

In [None]:
with open(f'{env_name}-{update_target}-{iter_fit}.pkl', 'wb') as output:
    pickle.dump(q_agent, output, pickle.HIGHEST_PROTOCOL)

Render 1 episode of the trained model

In [None]:
max_steps = 200
fps=50
total_reward = 0
ob = env.reset()
for t in range(max_steps):
    done = False        
    a = new_agent.act(ob, eps=0.0)
    (ob_new, reward, done, _info) = env.step(a)
    total_reward += reward
    ob = ob_new        
    time.sleep(1.0/fps)
    env.render(mode='human')        
    if done: break    
print("Done after {} steps. Reward: {}".format(t+1, total_reward))    
#env.close()

# Visualize

Visualization of the value function.

Adapt the value_function plotting from last time to plot the maxQ value

In [None]:
def plot_Q_function(value_function, o_space, plot_dim1, plot_dim2,
                    label_dim1, label_dim2):
    plt.rcParams.update({'font.size': 12})
    xxs =np.linspace(max(o_space.low[plot_dim1], -10), min(o_space.high[plot_dim1], 10))
    yys = np.linspace(max(o_space.low[plot_dim2], -10), min(o_space.high[plot_dim2], 10))
    XX,YY=np.meshgrid(xxs,yys)
    dots=np.asarray([XX.ravel(),YY.ravel()]).T
    # values = np.asarray(test_func(dots)).reshape(XX.shape)
    
    model_input = np.zeros((dots.shape[0], o_space.shape[0]))
    model_input[:, plot_dim1] = dots[:, 0]
    model_input[:, plot_dim2] = dots[:, 1]
    
    values = value_function.maxQ(model_input).reshape(XX.shape)

    fig = plt.figure(figsize=[10,8])
    ax = fig.gca(projection='3d')
    surf = ax.plot_surface(XX, YY, values, cmap=cm.coolwarm,
                           linewidth=0, antialiased=False)
    
    plt.savefig(f'{env_name}-{update_target}-{iter_fit}-value-{plot_dim1}-{plot_dim2}.pdf')
    ax.set_xlabel(label_dim1)
    ax.set_ylabel(label_dim2)
    ax.set_zlabel('value')
    # plt.colorbar(cmap=cm.coolwarm)
    return fig

In [None]:
with open(f'{env_name}-{update_target}-{iter_fit}-perfect.pkl', 'rb') as inp:
    new_agent = pickle.load(inp)
    
# Move agent to cpu for inference
new_agent.Q.device = torch.device('cpu')
new_agent.Q.cpu()

## Cartpole Env

Observation space:
 
0       Cart Position             -4.8                    4.8

1       Cart Velocity             -Inf                    Inf

2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)

3       Pole Angular Velocity     -Inf                    Inf

Try to adapt the plotting function that it also works in higher input spaces.

In [None]:
figQ = plot_Q_function(new_agent.Q, o_space=o_space, plot_dim1=0, plot_dim2=2, 
                       label_dim1="Cart Pos", label_dim2="Pole Angle")

In [None]:
figQ = plot_Q_function(new_agent.Q, o_space=o_space, plot_dim1=0, plot_dim2=1, 
                       label_dim1="Cart Pos", label_dim2="Cart Vel")

In [None]:
figQ = plot_Q_function(new_agent.Q, o_space=o_space, plot_dim1=2, plot_dim2=3, 
                       label_dim1="Pol Angle", label_dim2="Pole Vel")