# Approximate Q-learning (10 pts)

In this notebook you will teach a __PyTorch__ neural network to do Q-learning.

In [1]:
import gymnasium as gym
import thousand
from thousand.Player import Player
import numpy as np
import matplotlib.pyplot as plt
from tqdm import trange
from typing import Any
import time
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
class SmallestPlayer(Player):
    """Player that always plays the first entry of the legal-move list."""

    def make_a_move(self, observation, info):
        """Return the first element of ``info['correct_moves']``.

        ``observation`` is not used.
        """
        # Presumably the list is sorted ascending, making this the smallest
        # move -- confirm against the environment.
        legal_moves = info['correct_moves']
        return legal_moves[0]


class BiggestPlayer(Player):
    """Player that always plays the last entry of the legal-move list."""

    def make_a_move(self, observation, info):
        """Return the last element of ``info['correct_moves']``.

        ``observation`` is not used.
        """
        # Presumably the list is sorted ascending, making this the biggest
        # move -- confirm against the environment.
        legal_moves = info['correct_moves']
        return legal_moves[-1]


class RandomPlayer(Player):
    """Player that draws its move at random from the legal-move list."""

    def __init__(self, seed=None) -> None:
        """Create the player's own random generator.

        Args:
            seed: passed straight to ``np.random.default_rng``.
        """
        # NOTE(review): Player.__init__ is not invoked here -- confirm the
        # base class needs no initialisation of its own.
        self.rng: np.random.Generator = np.random.default_rng(seed)

    def make_a_move(self, observation, info):
        """Return one entry of ``info['correct_moves']`` chosen by ``self.rng``.

        ``observation`` is not used.
        """
        candidates = info['correct_moves']
        return self.rng.choice(candidates)

In [3]:
def get_action(env: "gym.Env", network, observation, epsilon=0):
    """Pick an action with an epsilon-greedy policy.

    With probability ``epsilon`` a random action index is drawn from
    ``range(env.action_space.n)``; otherwise the action with the highest
    predicted Q(s, a) is returned.

    Args:
        env: environment; only ``env.action_space.n`` is read.
        network: callable mapping a float32 observation tensor to a tensor of
            Q-values, one per action.
        observation: current observation, convertible by ``torch.tensor``.
        epsilon: exploration probability.

    Returns:
        The chosen action index as a plain ``int``.
    """
    # Decide about exploration first so the forward pass is skipped entirely
    # on random steps.
    if np.random.random() < epsilon:
        return int(np.random.choice(env.action_space.n))

    state = torch.tensor(observation, dtype=torch.float32)
    # Action selection never needs gradients; avoid building an autograd graph.
    with torch.no_grad():
        q_values = network(state)
    return int(torch.argmax(q_values).item())

In [4]:
def compute_td_loss(network, states, actions, rewards, next_states, is_done, gamma=0.99):
    """Return the mean squared one-step TD error for a batch of transitions.

    For every transition the target is::

        r + gamma * max_a' Q(s', a')      if the transition is not terminal
        r                                 if ``is_done`` is true

    and the loss is ``mean((Q(s, a) - target) ** 2)``. The target is treated
    as a constant: no gradient flows through the next-state Q-values.

    Args:
        network: callable mapping a ``[batch_size, state_size]`` float32
            tensor to ``[batch_size, n_actions]`` Q-values.
        states: batch of states, shape ``[batch_size, state_size]``.
        actions: batch of chosen action indices, shape ``[batch_size]``.
        rewards: batch of rewards, shape ``[batch_size]``.
        next_states: batch of successor states, same shape as ``states``.
        is_done: batch of terminal flags, shape ``[batch_size]``.
        gamma: discount factor.

    Returns:
        Scalar tensor holding the loss; call ``.backward()`` on it to train.
    """
    states = torch.tensor(states, dtype=torch.float32)
    actions = torch.tensor(actions, dtype=torch.long)
    rewards = torch.tensor(rewards, dtype=torch.float32)
    next_states = torch.tensor(next_states, dtype=torch.float32)
    # torch.where needs a boolean condition; a uint8 mask is rejected by
    # current PyTorch releases.
    is_done = torch.tensor(is_done, dtype=torch.bool)

    # Q(s, a) for the action actually taken in each transition.
    predicted_qvalues = network(states)
    predicted_qvalues_for_actions = predicted_qvalues[
        range(states.shape[0]), actions
    ]

    # The target is a constant with respect to the parameters, so compute it
    # without recording an autograd graph.
    with torch.no_grad():
        # V*(s') = max_a' Q(s', a')
        next_state_values = torch.amax(network(next_states), dim=1)
        bootstrapped = rewards + gamma * next_state_values
        # At a terminal state s' does not exist, so the target is just r.
        target_qvalues_for_actions = torch.where(is_done, rewards, bootstrapped)

    return torch.mean(
        (predicted_qvalues_for_actions - target_qvalues_for_actions) ** 2
    )

### Playing the game

In [5]:
def generate_session(env, options: dict[str, Any], network, optimizer, epsilon=0, train=False):
    """Play one episode with the Q-network agent, optionally training online.

    Args:
        env: Gymnasium-style environment (``reset(options=...)`` and a
            five-value ``step``).
        options: passed unchanged to ``env.reset``.
        network: Q-network used by ``get_action`` and ``compute_td_loss``.
        optimizer: optimizer over ``network``'s parameters; only used when
            ``train`` is true.
        epsilon: exploration probability forwarded to ``get_action``.
        train: when true, take one gradient step on the TD loss of every
            transition as it is observed.

    Returns:
        The sum of rewards collected over the episode.
    """
    total_reward = 0
    observation, info = env.reset(options=options)
    episode_over = False

    while not episode_over:
        action = get_action(env, network, observation, epsilon=epsilon)
        next_observation, reward, terminated, truncated, info = env.step(action)
        # The episode loop stops on either signal ...
        episode_over = terminated or truncated

        if train:
            optimizer.zero_grad()
            loss = compute_td_loss(
                network,
                # Stack into one ndarray: building a tensor from a list of
                # ndarrays is a slow path in PyTorch.
                np.asarray([observation]),
                [action],
                [reward],
                np.asarray([next_observation]),
                # ... but only true termination means "no next state".
                # A truncated episode must still bootstrap from Q(s', .).
                [bool(terminated)],
            )
            loss.backward()
            optimizer.step()

        total_reward += reward
        observation = next_observation

    return total_reward

In [12]:
def learn_game(env: gym.Env, options: dict[str, Any], network, optimizer, n_sessions: int,
               epsilon: float, n_games: int, epsilon_decay: float = 0.99):
    """Train the network for ``n_games`` rounds of ``n_sessions`` episodes each.

    After every round the exploration rate is multiplied by ``epsilon_decay``
    and the progress-bar description is refreshed with the round's mean
    reward and the (already decayed) epsilon.

    Args:
        env: environment passed to ``generate_session``.
        options: reset options passed to ``generate_session``.
        network: Q-network being trained.
        optimizer: optimizer over ``network``'s parameters.
        n_sessions: number of training episodes per round.
        epsilon: initial exploration probability.
        n_games: number of rounds.
        epsilon_decay: multiplicative decay applied to ``epsilon`` after each
            round (0.99 reproduces the previous fixed schedule).

    Returns:
        List with the mean episode reward of every round, in order.
    """
    progress = trange(n_games, desc='mean: 0000; epsl: 0000')
    means = []
    for _ in progress:
        rewards = [
            generate_session(env, options, network, optimizer, epsilon, train=True)
            for _ in range(n_sessions)
        ]
        mean_reward = np.mean(rewards)
        epsilon *= epsilon_decay
        progress.set_description(
            f'mean: {mean_reward:4.1f}; epsl: {epsilon:4.3f}')
        means.append(mean_reward)

    return means

In [7]:
def make_chart(mean, name):
    """Plot ``mean`` as one line labelled 'mean' and save the figure to ``name``.

    The figure is titled 'mean rewards', has a grid and a legend, and is left
    open after saving.
    """
    figure, axes = plt.subplots()
    axes.plot(mean, label='mean')
    axes.set_title('mean rewards')
    axes.grid()
    axes.legend()
    figure.savefig(name)

In [8]:
# Environment registered under the id "Thousand-v1".
env = gym.make("Thousand-v1")
# Q-network: a fully connected net whose input width is the first dimension
# of the observation space and whose output has one value per action.
network = nn.Sequential(nn.Linear(env.observation_space.shape[0], 101), 
                        nn.ReLU(),
                        nn.Linear(101, 101),
                        nn.ReLU(),
                        nn.Linear(101, 50),
                        nn.ReLU(),
                        # No activation after the last layer: raw Q-values.
                        nn.Linear(50, env.action_space.n))
# Adam over all network parameters.
opt = torch.optim.Adam(network.parameters(), lr=1e-4)

In [9]:
# Running history of per-round mean rewards; training cells append to it so
# that re-running them extends the same curve.
all_mean = []

In [None]:
# Train against two SmallestPlayer opponents: 500 rounds of 100 episodes,
# starting from epsilon = 0.8, then append the curve to the running history.
mean = learn_game(
    env,
    options={'players': [SmallestPlayer(), SmallestPlayer()]},
    network=network,
    optimizer=opt,
    n_sessions=100,
    epsilon=0.8,
    n_games=500,
)
all_mean.extend(mean)

In [None]:
# Save the learning curve under a timestamped name. The explicit ".png"
# suffix matters: savefig infers the output format from the text after the
# last dot, so a bare float timestamp ("1700000000.123456") would be read as
# an unsupported format and raise ValueError.
make_chart(all_mean, f'{int(time.time())}.png')