In [4]:
import gym
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm

In [8]:
# Create the LunarLander-v2 environment once; sample_env() and test() below
# read this module-level `env` directly rather than taking it as an argument.
env = gym.make('LunarLander-v2')

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [9]:
# Network input width: size of the first axis of the observation space's shape.
dim_in = env.observation_space.shape[0]
# Network output width: number of discrete actions (the model emits one value per action).
dim_out = env.action_space.n

In [10]:
def build_model(dim_in, H, dim_out):
    """Build a fully connected network with three ReLU hidden layers.

    Args:
        dim_in: Number of input features.
        H: Width shared by all three hidden layers.
        dim_out: Number of output units (no activation is applied to them).

    Returns:
        An ``nn.Sequential`` laid out as
        Linear -> ReLU -> Linear -> ReLU -> Linear -> ReLU -> Linear.
    """
    hidden_stack = []
    fan_in = dim_in
    # Three (Linear, ReLU) pairs; only the first one maps from the input width.
    for _ in range(3):
        hidden_stack += [nn.Linear(fan_in, H), nn.ReLU()]
        fan_in = H
    # Output head is created last so parameter initialisation order is unchanged.
    return nn.Sequential(*hidden_stack, nn.Linear(H, dim_out))

In [11]:
class replay_buffer(object):
    """Bounded store of (state, action, reward, next_state, done) transitions.

    Once ``size_limit`` transitions are held, each new transition overwrites
    the oldest one in place (ring buffer), so ``add`` is O(1) and ``size``
    never exceeds the number of transitions actually stored. Because slots are
    reused, the column lists are not kept in insertion order once the buffer
    has wrapped around; sampling is uniform so this does not affect batches.

    Attributes:
        states, actions, rewards, next_states, dones: Parallel lists; index
            ``i`` of each list belongs to the same transition.
        size: Number of transitions currently stored.
        size_limit: Maximum number of transitions kept.
        dtypes: Numpy dtype used for each column when a batch is built, in the
            order states, actions, rewards, next_states, dones.
    """

    def __init__(self, size_limit):
        """Create an empty buffer holding at most ``size_limit`` transitions.

        Raises:
            ValueError: If ``size_limit`` is smaller than 1.
        """
        if size_limit < 1:
            raise ValueError("size_limit must be at least 1")
        self.states = []
        self.actions = []
        self.rewards = []
        self.next_states = []
        self.dones = []
        self.size = 0
        self.size_limit = size_limit
        self.dtypes = [np.float32, np.int64, np.float32, np.float32, np.float32]
        # Slot that holds the oldest transition once the buffer is full.
        self._cursor = 0

    def _columns(self):
        """Return the five column lists in the same order as ``self.dtypes``."""
        return (self.states, self.actions, self.rewards, self.next_states, self.dones)

    def add(self, s, a, r, ns, done):
        """Store one transition, evicting the oldest one when the buffer is full."""
        row = (s, a, r, ns, done)
        if self.size < self.size_limit:
            for column, value in zip(self._columns(), row):
                column.append(value)
            self.size += 1
        else:
            # Full: overwrite the oldest slot instead of pop(0), which would
            # shift every remaining element on each insertion.
            for column, value in zip(self._columns(), row):
                column[self._cursor] = value
            self._cursor = (self._cursor + 1) % self.size

    def sample_batch(self, batch_size):
        """Draw up to ``batch_size`` distinct transitions uniformly at random.

        Args:
            batch_size: Requested number of transitions. If the buffer holds
                fewer, every stored transition is returned (in random order).

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of torch
            tensors whose dtypes follow ``self.dtypes``.
        """
        # Sampling without replacement: take the head of a random permutation.
        picks = np.random.permutation(self.size)[:batch_size]

        batch = []
        for column, dtype in zip(self._columns(), self.dtypes):
            # Gather only the chosen rows rather than converting the whole
            # buffer to an array on every call.
            rows = [column[i] for i in picks]
            batch.append(torch.from_numpy(np.asarray(rows, dtype=dtype)))

        return tuple(batch)


In [22]:
def sample_env(buffer, Qfunc, num_samples, e, environment=None):
    """Collect ``num_samples`` epsilon-greedy transitions into ``buffer``.

    Args:
        buffer: Object exposing ``add(s, a, r, ns, done)``.
        Qfunc: Callable mapping a float32 observation tensor to one value per
            action; the greedy action is the argmax of its output.
        num_samples: Number of environment steps to take.
        e: Exploration probability in [0, 1]. With probability ``e`` a random
            action is drawn from the action space, otherwise the greedy one.
        environment: Optional environment to step. Defaults to the
            module-level ``env`` when omitted.

    Returns:
        Sum of the rewards received over all ``num_samples`` steps (spanning
        episode boundaries).
    """
    sim = env if environment is None else environment
    obs = sim.reset()
    rewards = 0
    for _ in range(num_samples):
        # Uniform draw in [0, 1): randint(0, high=1) can only ever yield 0,
        # which made every action random for any positive epsilon.
        if np.random.random() < e:
            act = sim.action_space.sample()
        else:
            # Action selection needs no gradients.
            with torch.no_grad():
                q_values = Qfunc(torch.tensor(np.asarray(obs, dtype=np.float32)))
            act = torch.argmax(q_values).item()

        next_obs, reward, done, _ = sim.step(act)
        rewards += reward
        buffer.add(obs, act, reward, next_obs, done)
        # Start the next step from a fresh episode when this one ended;
        # otherwise continue from the observation just received.
        obs = sim.reset() if done else next_obs
    return rewards

In [20]:
def train(Qtarget, Qfunc, buffer, num_samples, num_qtarg_update, num_resample, num_q_update, batch_size=1000, gamma=torch.tensor(0.99, dtype=torch.float32)):
    """Train ``Qfunc`` with Double Q-learning against a periodically synced target.

    Args:
        Qtarget: Target network; overwritten with ``Qfunc``'s weights at the
            start of every outer iteration.
        Qfunc: Online network being optimised.
        buffer: Replay buffer exposing ``size`` and ``sample_batch(batch_size)``
            returning ``(obs, act, rew, next_obs, done)`` tensors.
        num_samples: Environment steps per ``sample_env`` call.
        num_qtarg_update: Number of target-network synchronisations (outer loop).
        num_resample: Inner iterations per target synchronisation.
        num_q_update: Gradient steps per inner iteration.
        batch_size: Transitions per gradient step.
        gamma: Discount factor.

    Returns:
        The loss of the last gradient step as a Python float, or NaN when the
        loop counts are such that no gradient step was taken.
    """
    loss_fn = nn.L1Loss()
    optim = torch.optim.Adam(Qfunc.parameters(), lr=1e-2)
    e = 0.9
    loss = None

    for i in range(num_qtarg_update):
        # update Q'(s, a) = Q(s, a)
        Qtarget.load_state_dict(Qfunc.state_dict())

        for _ in range(num_resample):
            # Exploration rate decays geometrically with the outer iteration.
            # NOTE(review): data is only collected while the buffer holds fewer
            # than batch_size transitions, so after the initial fill no new
            # experience is gathered - confirm whether that is intended.
            while buffer.size < batch_size:
                sample_env(buffer, Qfunc, num_samples, e ** i)

            # update Q(s, a) with k steps
            for _ in range(num_q_update):
                obs_k, act_k, rew_k, obs_n_k, done_k = buffer.sample_batch(batch_size)

                # Q(s, a) of the action that was actually taken
                qpred_k = Qfunc(obs_k).gather(1, act_k.long().unsqueeze(1)).squeeze(1)

                # Double Q-learning target, computed without tracking gradients:
                # y = r(s, a) + gamma * Q'(s', argmax_a' Q(s', a'))
                with torch.no_grad():
                    qfunc_act = torch.argmax(Qfunc(obs_n_k), 1, keepdim=True)
                    qtarget_pred = Qtarget(obs_n_k).gather(1, qfunc_act).squeeze(1)
                    # Terminal transitions (done == 1) do not bootstrap.
                    y_k = rew_k + (1.0 - done_k) * gamma * qtarget_pred

                # Use the L1 criterion: a signed sum of (prediction - target)
                # is unbounded below and is not a regression loss.
                loss = loss_fn(qpred_k, y_k)
                optim.zero_grad()

                # grad descent
                loss.backward()
                optim.step()
    return float('nan') if loss is None else loss.item()

In [15]:
def test(Qfunc):
    """Play one greedy episode in the module-level ``env`` and total its reward.

    Args:
        Qfunc: Callable mapping a float32 observation tensor to one value per
            action; the argmax of its output is played at every step.

    Returns:
        Sum of the rewards received until the environment reports ``done``.
    """
    total_reward = 0
    state = env.reset()
    while True:
        # Score every action for the current observation and pick the best.
        q_values = Qfunc(torch.tensor(np.asarray(state, dtype=np.float32)))
        greedy_action = torch.argmax(q_values).item()

        state, step_reward, finished, _ = env.step(greedy_action)
        total_reward += step_reward
        if finished:
            break
    return total_reward

In [16]:
# Width of each hidden layer passed to build_model().
H = 100
# Two separately initialised networks with the same architecture. train()
# optimises Qfunc and copies its weights into Qtarget via load_state_dict().
Qtarget = build_model(dim_in, H, dim_out)
Qfunc = build_model(dim_in, H, dim_out)

In [23]:
epoch = 5
buffer = replay_buffer(size_limit=1000000)
e_loss = []  # loss returned by train() for each epoch, in order
# NOTE(review): one train() call performs
# num_qtarg_update * num_resample * num_q_update = 200,000,000 gradient steps
# with these settings; the recorded run was interrupted during epoch 0.
for e in range(epoch):
    print('Epoch:', e)
    loss = train(Qtarget, Qfunc, buffer,
                  num_samples=5000,
                  num_qtarg_update=5000,
                  num_resample=10000,
                  num_q_update=4,
                  batch_size=2000)
    # Keep the per-epoch history; the list was previously created but never filled.
    e_loss.append(loss)
    print("Loss:", loss)
    print("Reward:", test(Qfunc))

Epoch: 0


KeyboardInterrupt: 

In [18]:
# Persist only the online network's parameters (its state_dict) to './weights'.
torch.save(Qfunc.state_dict(), './weights')

In [19]:
# Shell escape: run the companion script run.py (its contents are not part of this notebook).
!python run.py

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
