In [1]:
from collections import deque
import gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
def policy(state, exploration_rate):
    """Select an action epsilon-greedily with respect to the global Q-network.

    Args:
        state: A single observation (NumPy array or array-like) that ``Q``
            accepts once converted to a float32 tensor.
        exploration_rate: Probability of returning a random action drawn from
            ``env.action_space`` instead of the greedy one.

    Returns:
        Either the sampled random action or the index of the largest
        Q-value predicted for ``state``.
    """
    if np.random.uniform(0, 1) < exploration_rate:
        return env.action_space.sample()
    # Action selection never needs gradients: run the forward pass under
    # no_grad so no autograd graph is built on every environment step.
    with torch.no_grad():
        q_values = Q(torch.as_tensor(state, dtype=torch.float32)).numpy()
    return np.argmax(q_values)

In [3]:
def vfa_update(states, actions, rewards, dones, next_states):
    """Run one value-function-approximation (Q-learning) gradient step.

    Regresses ``Q(s, a)`` toward the one-step TD target
    ``r + (1 - done) * DISCOUNT_FACTOR * max_a' Q(s', a')`` with an MSE loss,
    using the module-level ``Q`` network and ``optimizer``.

    Args:
        states: Sequence of B observations.
        actions: Sequence of B integer action indices.
        rewards: Sequence of B scalar rewards.
        dones: Sequence of B terminal flags; a truthy flag disables
            bootstrapping for that transition.
        next_states: Sequence of B successor observations.

    Returns:
        The loss of this batch as a Python float.
    """
    states = torch.from_numpy(np.array(states)).float()
    # gather() requires int64 indices; request the dtype explicitly because
    # NumPy's default integer type is platform dependent.
    actions = torch.from_numpy(np.array(actions, dtype=np.int64)).unsqueeze(-1)
    rewards = torch.from_numpy(np.array(rewards)).float()
    dones = torch.from_numpy(np.array(dones)).float()
    next_states = torch.from_numpy(np.array(next_states)).float()

    optimizer.zero_grad()

    # Squeeze only the gathered action axis so that a batch of one keeps
    # shape (1,) and matches the target instead of collapsing to a scalar.
    q_values = torch.gather(Q(states), dim=-1, index=actions).squeeze(-1)

    # The bootstrap target is treated as a constant: no gradient flows
    # through the next-state value estimate.
    with torch.no_grad():
        next_q_max = Q(next_states).max(dim=-1)[0]
    target_q_values = rewards + (1 - dones) * DISCOUNT_FACTOR * next_q_max

    loss = F.mse_loss(q_values, target_q_values)
    loss.backward()
    optimizer.step()
    return loss.item()

In [7]:
def q_learning(num_episodes, exploration_rate=0.1, use_per=False):
    """Train the global Q-network with epsilon-greedy Q-learning and replay.

    Relies on the module-level ``env``, ``policy``, ``vfa_update``,
    ``MAX_EPISODE_LENGTH`` and ``BATCH_SIZE``.

    Args:
        num_episodes: Number of episodes to run.
        exploration_rate: Epsilon passed to ``policy`` on every step.
        use_per: If True, sample replay transitions with linearly increasing
            weight toward newer entries (recency weighting). Despite the flag
            name, the weights are not derived from TD errors.

    Returns:
        List with the undiscounted return of every episode, in order.
    """
    rewards = []
    replay_buffer = deque(maxlen=int(1e5))
    # Report about every 1% of the run. Integer arithmetic keeps the modulo
    # test and the averaging window consistent for any num_episodes
    # (including values below 100 or not divisible by 100).
    log_interval = max(1, num_episodes // 100)

    for episode in range(num_episodes):
        episode_reward = 0
        state = env.reset()

        for _ in range(MAX_EPISODE_LENGTH):
            action = policy(state, exploration_rate)
            next_state, reward, done, info = env.step(action)

            # A time-limit cut-off ends the episode but is not a terminal
            # state of the MDP; storing it as terminal would wrongly zero the
            # bootstrap term. gym's TimeLimit wrapper flags this in `info`.
            truncated = isinstance(info, dict) and bool(
                info.get('TimeLimit.truncated', False))
            terminal = bool(done) and not truncated
            replay_buffer.append((state, action, reward, terminal, next_state))

            state = next_state
            episode_reward += reward

            buffer_size = len(replay_buffer)
            if buffer_size >= BATCH_SIZE:
                if use_per and buffer_size > 1:
                    # Recency-weighted sampling: oldest weight 0, newest 100.
                    weights = np.linspace(0, 100, buffer_size)
                    batch = random.choices(
                        replay_buffer, k=BATCH_SIZE, weights=weights)
                else:
                    # Uniform sampling. Also used for a single-entry buffer,
                    # whose only recency weight would be 0 and be rejected
                    # by random.choices.
                    batch = random.choices(replay_buffer, k=BATCH_SIZE)
                vfa_update(*zip(*batch))

            if done:
                break

        rewards.append(episode_reward)

        if episode % log_interval == 0:
            print("Mean Reward: ", np.mean(rewards[-log_interval:]))
    return rewards

In [8]:
# --- Hyperparameters ---------------------------------------------------------
DISCOUNT_FACTOR = 0.99     # gamma used in the TD target
MAX_EPISODE_LENGTH = 1000  # cap on environment steps per episode
BATCH_SIZE = 64            # transitions sampled per gradient step

# --- Environment -------------------------------------------------------------
env = gym.make('LunarLander-v2')

# --- Q-network: observation -> one Q-value per discrete action ---------------
Q = nn.Sequential(
    nn.Linear(np.prod(env.observation_space.shape), 64),
    nn.ReLU(),
    nn.Linear(64, 64),
    nn.ReLU(),
    nn.Linear(64, env.action_space.n),
)

optimizer = optim.Adam(Q.parameters(), lr=5e-4)

In [9]:
# Keep the per-episode returns so they can be inspected or plotted afterwards,
# and so the notebook does not echo the full list as the cell output.
episode_rewards = q_learning(1000)

Mean Reward:  -515.4510551292258
Mean Reward:  -282.57291779523786
Mean Reward:  -119.60106094478347
Mean Reward:  -140.1173111498822
Mean Reward:  -40.988395046556946
Mean Reward:  2.123535716905363
Mean Reward:  -10.17904552999978
Mean Reward:  -159.72790976154744
Mean Reward:  -71.73557629748355
Mean Reward:  -59.5558794142258
Mean Reward:  -62.324745317563824
Mean Reward:  -97.40087034218512


KeyboardInterrupt: 