# IA318 - Reinforcement Learning

# Dynamic programming

This notebook presents techniques of dynamic programming (**policy iteration, value iteration**) for Tic-Tac-Toe and Nim.

Do **not** attempt to apply these techniques to Connect Four: its number of states is far too large ($> 10^{12}$) to enumerate.

In [1]:
import numpy as np

In [2]:
from scipy import sparse

In [3]:
from model import TicTacToe, Nim, Agent

In [4]:
from display import display_board

In [5]:
# States are encoded as bytes so that they can be stored in sets and used as
# dictionary keys (numpy arrays are not hashable).
def encode(state):
    """Return a hashable bytes encoding of a game state.

    Parameters
    ----------
    state : tuple
        Pair ``(player, board)`` where ``board`` is a numpy array.

    Returns
    -------
    bytes
        One flag byte (``b'\\x01'`` if ``player == 1``, ``b'\\x00'`` otherwise)
        followed by the raw bytes of ``board``.
    """
    player, board = state
    flag = b'\x01' if player == 1 else b'\x00'
    # ``ndarray.tostring`` is deprecated (and removed in NumPy 2.0);
    # ``tobytes`` produces exactly the same bytes.
    return flag + board.tobytes()

In [6]:
def decode(state_byte, shape):
    """Rebuild a ``(player, board)`` state from its bytes encoding.

    Parameters
    ----------
    state_byte : bytes
        First byte is the player flag (non-zero means player 1, zero means
        player -1); the remaining bytes are the raw board data.
    shape : tuple
        Shape given to the decoded board.

    Returns
    -------
    tuple
        ``(player, board)`` with ``player`` in ``{1, -1}``.
    """
    flag, payload = state_byte[0], state_byte[1:]
    player = 1 if flag else -1
    # The payload is read as ``dtype=int``, so the encoded board must have
    # been stored with that same dtype.
    board = np.frombuffer(payload, dtype=int).reshape(shape)
    return player, board

In [7]:
class Evaluation:
    """Evaluation of a pair of policies (player and adversary) for a game.

    The constructor enumerates the states found from the initial state of the
    game, builds the state-to-state transition matrix induced by the two
    policies, collects the reward of every state and runs ``n_iter`` value
    updates.

    Attributes set by the constructor: ``game``, ``state``, ``shape``,
    ``policy_player``, ``policy_adversary``, ``states``, ``state_id``,
    ``transition``, ``rewards`` and ``values``.
    """

    def __init__(self, Game, first_player=True, policy_player=None, policy_adversary=None, n_iter=10):
        # Order matters: every step relies on attributes set by the previous ones.
        self.set_game(Game, first_player)
        self.get_policies(policy_player, policy_adversary)
        self.get_states()
        # Map each encoded state to its index in ``self.states``.
        self.state_id = {}
        for index, state in enumerate(self.states):
            self.state_id[encode(state)] = index
        self.get_transition_matrix()
        self.get_rewards()
        self.get_values(n_iter)

    def set_game(self, Game, first_player):
        """Instantiate the game and record its initial state and board shape."""
        self.game = Game(first_player)
        self.state = self.game.state
        # A state is a (player, board) pair; the shape is needed to decode boards.
        self.shape = self.state[1].shape

    def get_policies(self, policy_player, policy_adversary):
        """Store the policies of both sides, each obtained through an ``Agent``."""
        self.policy_player = Agent(self.game, policy_player).policy
        self.policy_adversary = Agent(self.game, policy_adversary).policy

    def get_transitions(self, state):
        """Return ``(probs, states)``: the successor states of ``state`` and
        their probabilities under the policy of the player to move.

        Both lists are empty when ``state`` is terminal.
        """
        probs, states = [], []
        if self.game.is_terminal(state):
            return probs, states
        player, _ = state
        policy = self.policy_player if player == 1 else self.policy_adversary
        for weight, action in zip(*policy(state)):
            next_probs, next_states = self.game.get_transition(state, action)
            # Weight the outcome probabilities by the probability of the action.
            probs.extend(weight * np.array(next_probs))
            states.extend(next_states)
        return probs, states

    def get_states(self):
        """Enumerate the states found by following transitions from the
        initial state and store them in ``self.states``.
        """
        start = encode(self.state)
        seen = {start}
        frontier = [start]
        while frontier:
            current = decode(frontier.pop(), self.shape)
            _, successors = self.get_transitions(current)
            if not successors:
                continue
            encoded = {encode(successor) for successor in successors}
            # Only states never seen before still need to be explored.
            frontier.extend(encoded - seen)
            seen |= encoded
        self.states = [decode(code, self.shape) for code in seen]

    def get_transition_matrix(self):
        """Build the sparse ``n x n`` transition matrix over ``self.states``."""
        size = len(self.states)
        rows, cols, weights = [], [], []
        for i, state in enumerate(self.states):
            probs, successors = self.get_transitions(state)
            targets = [self.state_id[encode(successor)] for successor in successors]
            rows.extend([i] * len(targets))
            cols.extend(targets)
            weights.extend(probs)
        self.transition = sparse.csr_matrix((weights, (rows, cols)), shape=(size, size))

    def get_rewards(self):
        """Store the reward of every state, in the order of ``self.states``."""
        rewards = []
        for state in self.states:
            rewards.append(self.game.get_reward(state))
        self.rewards = rewards

    def get_values(self, n_iter):
        """Run ``n_iter`` undiscounted updates ``values <- P (rewards + values)``
        starting from zero and store the result in ``self.values``.
        """
        rewards = np.array(self.rewards)
        values = np.zeros(len(self.states))
        for _ in range(n_iter):
            values = self.transition.dot(rewards + values)
        self.values = values

In [9]:
# Evaluate Nim with the default arguments (first_player=True, no explicit policies, n_iter=10).
evaluation = Evaluation(Nim)

  """
  import sys


In [10]:
# Number of states enumerated from the initial state.
len(evaluation.states)

752

In [11]:
# One value per enumerated state.
len(evaluation.values)

752

In [12]:
# Evaluate Tic-Tac-Toe with the default arguments (first_player=True, no explicit policies, n_iter=10).
evaluation = Evaluation(TicTacToe)

  """
  import sys


In [13]:
# Number of states enumerated from the initial state.
len(evaluation.states)

5478

In [14]:
# One value per enumerated state.
len(evaluation.values)

5478

## To do

Find the optimal way to play each game using policy iteration or value iteration!

In [21]:
def improve_policy(evaluation, env=None):
    """Return the greedy policy with respect to the values of ``evaluation``.

    The function works on a 2-D grid of states: ``evaluation.shape`` must
    unpack as ``(n, m)`` and the state ``(i, j)`` is looked up at flat index
    ``i * m + j`` in ``evaluation.values`` and ``evaluation.rewards``.

    NOTE(review): this grid indexing does not match the ``Evaluation`` class
    of this notebook, whose states are ``(player, board)`` pairs indexed
    through ``state_id``; the function still has to be adapted before it can
    be applied to Tic-Tac-Toe or Nim.

    Parameters
    ----------
    evaluation : object
        Must provide ``shape``, ``values`` and ``rewards``. Its ``gamma``
        attribute is used as the discount factor when present; otherwise the
        factor is 1 (no discount).
    env : object, optional
        Environment providing ``is_valid(state)``, ``get_actions(state)`` and
        ``get_transition(state, action)``. When omitted, the module-level
        name ``maze`` is used, as in the original code.

    Returns
    -------
    callable
        ``policy(state)`` returning ``[[1], [action]]``: the best action of
        ``state`` with probability 1. Invalid states get the action ``(0, 0)``.
    """
    if env is None:
        # Backward compatibility: the original code read the global ``maze``.
        env = maze
    n, m = evaluation.shape
    gamma = getattr(evaluation, "gamma", 1)
    # Loop-invariant: reward plus discounted value of every state.
    state_values = np.asarray(evaluation.rewards) + gamma * np.asarray(evaluation.values)
    best_actions = []
    for i in range(n * m):
        state = np.array([i // m, i % m])
        if not env.is_valid(state):
            best_actions.append((0, 0))
            continue
        actions = env.get_actions(state)
        gains = []
        for action in actions:
            probs, states = env.get_transition(state, action)
            indices = [state_[0] * m + state_[1] for state_ in states]
            # Expected reward-plus-value over the outcomes of this action.
            gains.append(np.sum(np.array(probs) * state_values[indices]))
        best_actions.append(actions[np.argmax(gains)])

    # policy(state) -> probs, actions
    def policy(state):
        return [[1], [best_actions[state[0] * m + state[1]]]]

    return policy