# MDPs

A continuación se presentan los entornos MDPs (Markov Decision Process) del ajedrez como del juego de gato. El juego de gato está compuesto por dos clases, una del jugador aleatorio y otra del "tablero" de gato.

## Gato

La primera celda contiene el código de la clase de tablero y de jugador aleatorio. Se puede ver un ejemplo de juego entre dos jugadores aleatorios en la segunda celda.

In [1]:
import random
import numpy as np

class tictactoe:

    def __init__(self):
        self.tablero = np.zeros(9, dtype = int)        

    def reset(self):
        self.tablero = np.zeros(9, dtype = int)

    def valid_space(self, pos):
        if self.tablero[pos] == 0:
            return True
        return False

    def place_move(self, pos, value):
        self.tablero[pos] = value

    def horizontal_win(self):
        for i in range(3):
            if self.tablero[0+(3*i)] == self.tablero[1+(3*i)] == self.tablero[2+(3*i)] and self.tablero[0+(3*i)] != 0:
                return True
        return False

    def vertical_win(self):
        for i in range(3):
            if self.tablero[i] == self.tablero[i+3] == self.tablero[i+6] and self.tablero[i] != 0:
                return True
        return False

    def diagonal_win(self):
        diag1 = (self.tablero[0] == self.tablero[4] == self.tablero[8] and self.tablero[0] != 0)
        diag2 = (self.tablero[2] == self.tablero[4] == self.tablero[6] and self.tablero[2] != 0)
        if diag1 or diag2:
            return True
        return False

    def check_win(self):
        if self.horizontal_win() or self.vertical_win() or self.diagonal_win():
            return True
        return False

    def is_space(self):
        for _ in self.tablero:
            if _ == 0:
                return True
        return False

    def step(self, player):
        player.play_tile(self)
        if self.check_win():
            return 1
        else:
            return 0

class random_player:
    
    def __init__(self, value):
        self.value = value
        self.reward = 0

    def play_tile(self, ttt):
        """
        Realiza un tiro aleatorio, siempre y cuando la posición sea válida.
        """
        if ttt.is_space():
            while True:
                random = np.random.randint(9)
                if ttt.valid_space(random):
                    ttt.place_move(random, self.value)
                    break


def play_loop(x, o, tic, iteraciones):
    """
    Loop del juego. Se detiene cuando alguno de los dos jugadores gana, o cuando ya no hay espacio disponible.

    x, o = player() ; instancias de la clase de jugadores
    tic = tictactoe() ; instancia de la clase de tableros.
    """
    x_score = 0
    o_score = 0
    for i in range(iteraciones):
        tic.reset()
        print(f'Juego {i+1}')
        while(True):
            rewardx = tic.step(x)
            if tic.check_win():
                print('Gana X')
                x.reward = rewardx
                o.reward = -1
                break
            rewardy = tic.step(o)
            if tic.check_win():
                print('Gana O')
                o.reward = rewardy
                x.reward = -1
                break
            if not tic.is_space():
                print('Gato')
                x.reward = o.reward = 0
                break
        x_score += x.reward # Dependiendo de lo que necesitemos se puede modificar este valor para tener una recompensa total.
        o_score += o.reward
        print(tic.tablero[:3])
        print(tic.tablero[3:6])
        print(tic.tablero[6:])
        print('------')
    print(f'Puntuaciones finales: X = {x_score}. O = {o_score}')

In [2]:
a = tictactoe()
xp = random_player(1)
op = random_player(2)
play_loop(xp, op, a, 10)

Juego 1
Gana O
[2 2 2]
[2 1 1]
[1 1 0]
------
Juego 2
Gana O
[1 1 0]
[2 2 2]
[0 1 0]
------
Juego 3
Gana X
[2 2 1]
[1 2 2]
[1 1 1]
------
Juego 4
Gato
[1 1 2]
[2 1 1]
[1 2 2]
------
Juego 5
Gana O
[0 2 0]
[1 2 0]
[1 2 1]
------
Juego 6
Gana X
[2 2 1]
[1 1 0]
[1 0 2]
------
Juego 7
Gana O
[1 0 2]
[1 2 2]
[2 1 1]
------
Juego 8
Gana X
[2 1 2]
[1 1 1]
[1 2 2]
------
Juego 9
Gato
[1 2 1]
[1 1 2]
[2 1 2]
------
Juego 10
Gana X
[1 1 1]
[2 0 2]
[2 0 1]
------
Puntuaciones finales: X = 0. O = 0


## FrozenLake

Al ser un entorno del gymnasium podemos acceder fácilmente a las distintas propiedades del entorno sin tener que programar tanto como en el caso del gato. Se presentan dos formas de calcular las funciones de valor y las funciones de estado, ambas siguen el mismo formato del algoritmo visto dentro del libro, no obstante una se presenta dentro de la clase de un agente, mientras que otra se presenta como función aislada. (Algo de la implementación interna del agente no funciona bien, nos da resultados distintos a lo de la implementación externa).

In [3]:
import gymnasium as gym
import numpy as np
from gymnasium.wrappers import RecordEpisodeStatistics
from collections import defaultdict

In [4]:
fl_env = gym.make("FrozenLake-v1", render_mode = "human", is_slippery = True)
fl_env = fl_env.unwrapped
obs, info = fl_env.reset()

fl_env.close()

In [5]:
# La dinámica del ambiente está dado por
fl_env.P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

In [6]:
class Elf:
    def __init__(self):
        self.env = fl_env
        self.state_val = np.zeros(self.env.observation_space.n)
        self.policy = np.zeros(self.env.observation_space.n)
        self.gamma = 0.95
        self.delta = 0.003

    def state_value(self):
        """
        Evaluación de la política
        """
        while True:
            update = np.copy(self.state_val)
            for s in range(self.env.observation_space.n):
                action = self.policy[s]
                self.state_val[s] = sum([t_prob * (r_prob + self.gamma * update[next_s])
                                          for t_prob, next_s, r_prob, _ in fl_env.P[s][action]])

            if (np.sum(np.fabs(update-self.state_val)) <= self.delta):
                break
        return self.state_val

    def new_pol(self):
        """
        Mejora de política
        """
        for s in range(self.env.observation_space.n):
            q = np.zeros(self.env.observation_space.n)
            for action in range(self.env.action_space.n):
                for next_sr in self.env.P[s][action]:
                    t_prob, next_s, r_prob, _ = next_sr
                    q[action] += (t_prob * (r_prob + self.gamma * self.state_val[next_s]))
                    
            self.policy[s] = np.argmax(q)
        return self.policy 

    def policy_iteration(self, eps):
        """
        Algoritmo completo
        """
        policy = self.policy
        for i in range(eps):
            f_value = self.state_value()
            self.policy = self.new_pol()

            if (np.all(policy == self.policy)):
                break
            policy = self.policy
        return self.policy, f_value

In [7]:
elfo = Elf()

elf_pol, elf_value = elfo.policy_iteration(100)
print(elf_pol, elf_value)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.] [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


#### Iteración de Política

In [8]:
# Versión fuera de la clase del agente.
def state_value(pi, gamma):
    """
    Evaluación de la política
    """
    value_function = np.zeros(fl_env.observation_space.n)
    delta = 0.003

    while True:
        update = np.copy(value_function)
        for state in range(fl_env.observation_space.n):
            action = pi[state]
            value_function[state] = sum([t_prob * (r_prob + gamma * update[next_state])
                                        for t_prob, next_state, r_prob, _ in fl_env.P[state][action]])

        if (np.sum(np.fabs(update - value_function)) <= delta):
            #print("Convergencia")
            break
    return value_function


def nueva_pi(value_function, gamma):
    """
    Mejora de política
    """
    pi = np.zeros(fl_env.observation_space.n)

    for state in range(fl_env.observation_space.n):
        q = np.zeros(fl_env.action_space.n)
        for action in range(fl_env.action_space.n):
            for next_sr in fl_env.P[state][action]:
                t_prob, next_state, r_prob, _ = next_sr
                q[action] += (t_prob * (r_prob + gamma * value_function[next_state]))

        pi[state] = np.argmax(q)
    return pi


def policy_iteration(env, gamma):
    """
    Algoritmo completo
    """
    pi = np.zeros(env.observation_space.n)
    episodios = 100

    for i in range(episodios):
        valor_f = state_value(pi, gamma)
        new_pi = nueva_pi(valor_f, gamma)

        if (np.all(pi == new_pi)):
            break
        pi = new_pi
    return new_pi, valor_f

In [9]:
policy, value = policy_iteration(fl_env, 0.95)
print(f"Política: {policy}. \n La función de valor: {value}")

Política: [0. 3. 0. 3. 0. 0. 0. 0. 3. 1. 0. 0. 0. 2. 1. 0.]. 
 La función de valor: [0.17546028 0.15051032 0.15038052 0.12914343 0.20441015 0.
 0.17476461 0.         0.26676807 0.37216449 0.40192189 0.
 0.         0.50718474 0.72273331 0.        ]


#### Iteración de Valor

In [10]:
def value_iteration(env, gamma = 0.95):
    """
    Algoritmo completo
    """
    
    # Inicio
    value_table = np.zeros(env.observation_space.n)
    episodios = 100
    umbral = 0.00005
    
    for i in range(episodios):
        updated_value_table = np.copy(value_table) 
        
        # Cálculo de Q y reajuste máximo        
        for state in range(env.observation_space.n):
            Q_value = []
            for action in range(env.action_space.n):
                next_states_rewards = []
                for next_sr in env.P[state][action]: 
                    trans_prob, next_state, reward_prob, _ = next_sr 
                    next_states_rewards.append((trans_prob * (reward_prob + gamma * updated_value_table[next_state]))) 
                
                Q_value.append(np.sum(next_states_rewards))
                
            value_table[state] = max(Q_value) 
            
        if (np.sum(np.fabs(updated_value_table - value_table)) <= umbral):
             #print(f'¡Convergencia! Iteración: {i+1}')
             break
    
    return value_table


def extract_policy(value_table, gamma = 1.0):
    """
    Obtención de política
    """
    policy = np.zeros(fl_env.observation_space.n) 
    for state in range(fl_env.observation_space.n):
        
        # Inicio de Q en cero
        Q_table = np.zeros(fl_env.action_space.n)
        
        # Cálculo de Q
        for action in range(fl_env.action_space.n):
            for next_sr in fl_env.P[state][action]: 
                trans_prob, next_state, reward_prob, _ = next_sr 
                Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))

        policy[state] = np.argmax(Q_table)
    
    return policy

In [11]:
opt_val = value_iteration(fl_env, 0.95)
policy = extract_policy(opt_val, 0.95)
print(f"Política: {policy}. \n Value_f: {opt_val}")

Política: [0. 3. 0. 3. 0. 0. 0. 0. 3. 1. 0. 0. 0. 2. 1. 0.]. 
 Value_f: [0.18035745 0.15466    0.15340661 0.13247087 0.20886331 0.
 0.17639284 0.         0.27037339 0.37459489 0.40363285 0.
 0.         0.50893908 0.72365223 0.        ]


## Comentarios:

Los algoritmos con la implementación del gym son bastante sencillos para implementar ya que el gym mismo te regresa todos los valores necesarios, dígase estados siguientes, recompensa etc. No obstante no logré encontrar una forma de representar el gato de tal forma que me regresara los valores de la misma manera, una opción que exploré (un poco tarde) fue representar el gato como entorno del gymnaisum mismo:

In [12]:
import gymnasium as gym

space = gym.spaces.Box(low = 0, high = 2, shape = (3,3), dtype = int)
acciones = gym.spaces.Discrete(9)
aux = space.sample()
act = acciones.sample()
print(aux, act)

[[1 1 0]
 [1 1 0]
 [0 1 1]] 7


**¿Ese acercamiento es más factible que la representación desde cero?**