# Reinforcement Learning

# 5. Gradient Methods

This notebook presents gradient methods, useful for learning in some environment with a large state space.

We use a neural network with a single hidden layer.

In [1]:
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from model import TicTacToe, Nim, ConnectFour
from agent import Agent, OnlineEvaluation
from dynamic import ValueIteration

In [3]:
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim

In [4]:
class MCLearning(OnlineEvaluation):
    """Online evaluation by Monte-Carlo."""
        
    def update_values(self, state=None, horizon=100):
        """Update the values from one episode."""
        stop, states, rewards = self.get_episode(state=state, horizon=horizon)
        # remove last state
        states.pop()
        gain = 0
        # backward update
        for state, reward in zip(reversed(states), reversed(rewards)):
            self.add_state(state)
            code = self.model.encode(state)
            self.count[code] += 1
            # to be modified
            # begin
            gain = reward + self.gamma * gain
            # end 
            diff = gain - self.value[code]
            count = self.count[code]
            self.value[code] += diff / count

## Value gradient

We start with value-based methods. The neural network is a regressor that approximates the value function. Note that the model is supposed to be known, so that we don't need the action-value function.

In [5]:
class Regressor(nn.Module):
    """Neural network for value function approximation. Return the value of each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Regressor, self).__init__()
        self.model = model
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, 1))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [6]:
game = TicTacToe()

In [7]:
regressor = Regressor(model=game, hidden_size=100)

In [8]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [9]:
value = regressor.forward(code).detach()

In [10]:
# value of the state
print(value)

tensor([-0.1113])


## To do

* Complete the method get_best_actions of the class ValueGradient. 
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.

In [13]:
class ValueGradient(OnlineEvaluation):
    """Agent learning by value gradient. The model is supposed to be known.
    
    Parameters
    ----------
    model : object of class Environment
        Model.
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100):
        super(ValueGradient, self).__init__(model, player=player)  
        if not hasattr(model, "get_next_state"):
            raise ValueError("The model must be known, with a 'get_next_state' method.")
        self.nn = Regressor(model, hidden_size)
        self.gamma = gamma
        
    def get_best_actions(self, state):
        """Get the best actions in some state according to the value function.""" 
        actions = self.get_actions(state)
        if len(actions) > 1:
            # to be modified
            # if the state is terminal, take the actual value
            values = np.zeros(len(actions))
            for i, action in enumerate(actions):
                next_state = self.model.get_next_state(state, action)
                if self.model.is_terminal(next_state):
                    values[i] = self.model.get_reward(next_state)
                else:
                    code = self.model.one_hot_encode(next_state)
                    code = torch.tensor(code).float()
                    values[i] = self.nn.forward(code).detach().numpy()[0]
            
            if self.player == 1:
                best_value = max(values)
            else:
                best_value = min(values)
            actions = [action for action, value in zip(actions, values) if value==best_value]
        return actions        
    
    def update_policy(self):
        self.policy = self.get_policy()
    
    def get_samples(self, horizon, epsilon):
        """Get samples from one episode under the epsilon-greedy policy."""
        self.policy = self.randomize_policy(epsilon=epsilon)
        self.model.reset()
        state = self.model.state
        states = []
        rewards = []
        for t in range(horizon):
            action = self.get_action(state)
            reward, stop = self.model.step(action)
            states.append(state)
            rewards.append(reward)
            if stop:
                break
            state = self.model.state
        gains = []
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
            gains.append(gain)
        return reversed(states), gains
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01, epsilon=0.1):
        """Train the neural network with samples drawn from the epsilon-greedy policy."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        for t in range(n_episodes):
            states, gains = self.get_samples(horizon, epsilon)
            codes = [self.model.one_hot_encode(state) for state in states]
            codes = np.array(codes)
            codes = torch.tensor(codes).float()
            values = self.nn.forward(codes)
            gains = torch.tensor(gains).float().reshape(-1, 1)
            mse = nn.MSELoss()
            loss = mse(values, gains)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

TicTacToe, Random (Without training)

In [None]:
game = TicTacToe()
agent = ValueGradient(game)
gains = agent.get_gains(n_runs=1000)
np.unique(gains, return_counts=True), np.mean(gains)

TicTacToe, Random

In [None]:
game = TicTacToe()
agent = ValueGradient(game)
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=1000)
np.unique(gains, return_counts=True), np.mean(gains)

TicTacToe, Perfect

In [None]:
algo_optimal = ValueIteration(game)
_, adversary_policy = algo_optimal.get_perfect_players()
game = TicTacToe(adversary_policy=adversary_policy)
agent = ValueGradient(game)
agent.train(n_episodes=6000)
agent.update_policy()
gains = agent.get_gains(n_runs=1000)
np.unique(gains, return_counts=True), np.mean(gains)

ConnectFour, Random (Without training)

In [None]:
game = ConnectFour()
agent = ValueGradient(game)
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

ConnectFour, Random

In [None]:
game = ConnectFour()
agent = ValueGradient(game)
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

ConnectFour, One-step

In [None]:
game = ConnectFour(adversary_policy='one_step')
agent = ValueGradient(game)
agent.train(n_episodes=100)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

Comparison between Value Gradient & MC

Because the training is slow, you can change the value for the two commented lines for quick review

In [None]:
algo_optimal = ValueIteration(game)
_, adversary_policy = algo_optimal.get_perfect_players()
game = TicTacToe(adversary_policy=adversary_policy)
agent_vg = ValueGradient(game)
algo = MCLearning(game, policy='random')
policy = algo.get_policy()
agent_mc = Agent(game, policy)
iters = [0]
gains_mc = [np.mean(agent_mc.get_gains(n_runs=100))]
gains_vg = [np.mean(agent_vg.get_gains(n_runs=100))]
n_iter = 30 # change this value
n_games_per_iter = 100 # change this value
for iter in tqdm(range(n_iter)):
    iters.append((iter+1)*n_games_per_iter)
    for t in range(n_games_per_iter):
        algo.update_values(state='random')
    policy = algo.get_policy()
    agent_mc = Agent(game, policy)
    gains_mc.append(np.mean(agent_mc.get_gains(n_runs=100)))
    agent_vg.train(n_episodes=n_games_per_iter)
    agent_vg.update_policy()
    gains_vg.append(np.mean(agent_vg.get_gains(n_runs=100)))

plt.plot(iters, gains_mc, '-o', label='Monte-Carlo')
plt.plot(iters, gains_vg, '-o', label='Value Gradient')
plt.xlabel('Games')
plt.ylabel('Gains')
plt.ylim(-1.1, 0.1)
plt.legend()
plt.show()

The value gradient method works, but it converges slower than MC. It's probably because value gradient method relies on an accurate estimation of the gradient direction, and when facing perfect player, the sparsity and instability of the gradient signal can affect the optimization efficiency of the value function.

## Policy gradient

We now consider a policy-based method. The neural network is a classifier that approximates the optimal policy. It returns the probability of each action.

In [5]:
class Classifier(nn.Module):
    """Neural network for policy gradient. Return the distribution of actions in each state."""
    def __init__(self, model, hidden_size):
        if not hasattr(model, 'one_hot_encode'):
            raise ValueError("The environment must have a one-hot encoding of states.")   
        super(Classifier, self).__init__()
        self.model = model
        actions = model.get_all_actions()
        if self.model.is_game():
            # remove action when passing
            actions.pop()
        self.actions = actions
        state = model.init_state()
        code = model.one_hot_encode(state)
        input_size = len(code)
        output_size = len(actions)
        self.nn = nn.Sequential(
            nn.Linear(input_size, hidden_size), 
            nn.GELU(), 
            nn.Linear(hidden_size, output_size), 
            nn.Softmax(dim=0))

    def forward(self, code):
        """Forward pass."""
        return self.nn(code)

In [48]:
game = TicTacToe()

In [49]:
classifier = Classifier(model=game, hidden_size=100)

In [50]:
state = game.state
# one-hot encoding
code = game.one_hot_encode(state)
# tensor
code = torch.tensor(code).float()

In [51]:
probs = classifier.forward(code).detach()

In [None]:
print(probs)

In [None]:
len(probs)

In [None]:
torch.sum(probs)

## To do

* Complete the method 'train' of the class PolicyGradient. Observe that a penalty is assigned for illegal actions.
* Test the agent on TicTacToe, against (1) a random adversary and (2) a perfect adversary.
* Test the agent on ConnectFour, against (1) a random adversary and (2) an adversary with the one-step policy.
* Compare your results to another learning strategy (e.g., Monte-Carlo learning) and interpret the results.
* (bonus) Try to improve policy gradient on TicTacToe with a perfect adversary.

In [6]:
class PolicyGradient(Agent):
    """Agent learning by policy gradient.
    
    Parameters
    ----------
    model : object of class Environment
        Model.a
    player : int
        Player for games (1 or -1, default = default player of the game).
    gamma : float
        Discount rate (in [0, 1], default = 1).
    hidden_size : int
        Size of the hidden layer (default = 100).
    penalty : float
        Penalty for illegal actions (default = -5).
    min_log : float
        Minimal value to compute the log (default = 1e-10)
    """
    def __init__(self, model, player=None, gamma=1, hidden_size=100, penalty=-1, min_log=1e-10):
        super(PolicyGradient, self).__init__(model, player=player)  
        self.nn = Classifier(model, hidden_size)
        self.action_id = {action: i for i, action in enumerate(self.nn.actions)}
        self.gamma = gamma
        self.penalty = penalty
        self.min_log = min_log
        
    def get_policy(self):
        """Get the current policy."""
        def policy(state):
            actions = self.model.get_actions(state)
            if len(actions) > 1:
                win_actions = []
                # check win actions
                if self.model.is_game():
                    next_states = [self.model.get_next_state(state, action) for action in actions]
                    win_actions = [self.model.get_reward(next_state) == self.player for next_state in next_states]
                if any(win_actions):
                    probs = np.array(win_actions).astype(float)
                else:
                    # get prob of each action
                    code = self.model.one_hot_encode(state)
                    code = torch.tensor(code).float()
                    probs = self.nn.forward(code)
                    probs = probs.detach().numpy()
                    # restrict to available actions
                    indices = [self.action_id[action] for action in actions]
                    probs = probs[indices]                    
                # renormalize
                if np.sum(probs) > 0:
                    probs = probs / np.sum(probs)
                else:
                    probs = np.ones(len(actions)) / len(actions)
            else:
                probs = [1]
            return probs, actions
        return policy
    
    def update_policy(self):
        """Update the policy."""
        self.policy = self.get_policy()
    
    def get_samples(self, horizon):
        """Get samples from one episode."""
        self.update_policy()
        rewards = []
        log_probs = []
        log_probs_illegal = []
        self.model.reset()
        state = self.model.state
        for t in range(horizon):
            action = self.get_action(state)
            if action is not None:
                i = self.action_id[action]
                code = self.model.one_hot_encode(state)
                code = torch.tensor(code).float()
                probs = self.nn.forward(code)
                prob = torch.clip(probs[i], self.min_log, 1 - self.min_log)
                log_prob = torch.log(prob).reshape(1)

                actions = self.model.get_actions(state)
                if action in actions:
                    reward, stop = self.model.step(action)
                    state = self.model.state
                    rewards.append(reward)
                    log_probs.append(log_prob)
                else:
                    log_probs_illegal.append(log_prob)
            else:
                reward, stop = self.model.step(action)
                rewards.append(reward)
                state = self.model.state
            if stop:
                break
        gain = 0
        for reward in reversed(rewards):
            gain = reward + self.gamma * gain
        return gain, log_probs, log_probs_illegal
        
    def train(self, horizon=100, n_episodes=1000, learning_rate=0.01):
        """Train the neural network."""
        optimizer = optim.Adam(self.nn.parameters(), lr=learning_rate)
        accumulated_gain = 0
        for t in range(n_episodes):
            gain, log_probs, log_probs_illegal = self.get_samples(horizon)
            accumulated_gain += gain
            loss = 0
            if len(log_probs):
                # to be modified
                for log_prob in log_probs:
                    loss += -log_prob * (gain - accumulated_gain / (t + 1))
            if len(log_probs_illegal):
                for log_prob in log_probs_illegal:
                    loss += self.penalty * log_prob
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()


TicTacToe, Random

In [None]:
game = TicTacToe()
agent = PolicyGradient(game)
agent.train(n_episodes=2000)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

TicTacToe, Perfect

In [None]:
algo_optimal = ValueIteration(game)
_, adversary_policy = algo_optimal.get_perfect_players()
game = TicTacToe(adversary_policy=adversary_policy)
agent = PolicyGradient(game)
agent.train(n_episodes=6000)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

ConnectFour, Random

In [None]:
game = ConnectFour()
agent = PolicyGradient(game)
agent.train(n_episodes=1000)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

ConnectFour, One-step

In [None]:
game = ConnectFour(adversary_policy='one_step')
agent = PolicyGradient(game)
agent.train(n_episodes=2000)
agent.update_policy()
gains = agent.get_gains(n_runs=100)
np.unique(gains, return_counts=True), np.mean(gains)

Comparison between Policy Gradient & MC

Because the training is slow, you can change the value for the two commented lines for quick review

In [None]:
algo_optimal = ValueIteration(game)
_, adversary_policy = algo_optimal.get_perfect_players()
game = TicTacToe(adversary_policy=adversary_policy)
agent_pg = PolicyGradient(game)
algo = MCLearning(game, policy='random')
policy = algo.get_policy()
agent_mc = Agent(game, policy)
iters = [0]
gains_mc = [np.mean(agent_mc.get_gains(n_runs=100))]
gains_pg = [np.mean(agent_pg.get_gains(n_runs=100))]
n_iter = 30 # change this value
n_games_per_iter = 100 # change this value
for iter in tqdm(range(n_iter)):
    iters.append((iter+1)*n_games_per_iter)
    for t in range(n_games_per_iter):
        algo.update_values(state='random')
    policy = algo.get_policy()
    agent_mc = Agent(game, policy)
    gains_mc.append(np.mean(agent_mc.get_gains(n_runs=100)))
    agent_pg.train(n_episodes=n_games_per_iter)
    agent_pg.update_policy()
    gains_pg.append(np.mean(agent_pg.get_gains(n_runs=100)))

plt.plot(iters, gains_mc, '-o', label='Monte-Carlo')
plt.plot(iters, gains_pg, '-o', label='Policy Gradient')
plt.xlabel('Games')
plt.ylabel('Gains')
plt.ylim(-1.1, 0.1)
plt.legend()
plt.show()

After optimization, it's better than before and better than MC. In fact, in the previous policy gradient method, there is no explicit use of the value function, possessing an over-reliance on feedback from strategy execution when facing perfect opponents, therefore leading to inefficient learning. But after optimizing with centering gains, it works better because instead of stopping learning because of the low absolute value of the overall reward, it focuses more on the relative differences in rewards and stay motivated to improve the performance.