<a href="https://colab.research.google.com/github/Jorayala/AI_Machine_Learning_2024/blob/main/policy_car.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Solución de MDPs por medio de iteración de políticas

Para este laboratorio utilizaremos el método de iteración de políticas para solucionar el problema del motor del vehículo definido en el laboratorio de MDPs.

Para ello utilizaremos el ambiente definido de los motores y construiremos el método de iteración de políticas para resolverlo.

## Ambiente

Para recordar la definición del ambiente se realiza en la clase `MDP`. Esta clase recibe como parámetro una codificación de los estados, la dimensión del MDP dada por la cantidad de estados en el ambiente (dados como una tupla) y el estado inicial.

La codificación del ambiente se define como una tabla de tamaño `estados x acciones` que para cada uno de los estados (filas) contiene una lista de tuplas con la información de la probabilidad asociada a la acción de la columna, el estado de llegada de ejecutar la acción y la recompensa asociada a la ejecución de la acción. La codificación de la tabla se muestra a continuación, donde los estados son `cold=0`, `warm=1` y `overheated=2`.

![Encoding_table](table.png)

Las dimensiones del tablero se almacenan en los astributos `nrows` y `ncols` de la clase `MDP`.
Los valores codificados en el tablero se almacenarán en dos atributos de la clase, un atributo `transitions` que mantiene la probabilidad de transición de un estado a otro dada una acción como una lista de tuplas `(probability, state)` y un atributo `rewards` que mantiene la información las recompensas obtenida en el paso entre estados definido como una lista `(reward, state)`.

Adicionalmente la clase tiene los atributos `initial_state` y `state` que corresponden al estado inicial, dado por parámetro, y el estado actual del agente en el ambiente, inicializado en el estado inicial.

Note también que el estado `overheated` no tienen ninguna acción asociada a él y por lo tanto lo consideramos como un estado final.

In [1]:
import random

class MDP:
    def __init__(self, table, dimensions, initial_state):
        self.states = ['cold', 'warm', 'overheated']
        #rows = estados, cols = acciones
        self.nrows, self.ncols = dimensions
        self.grid = table.copy()
        self.initial_state = initial_state
        self.state = initial_state
        self.actions = ['slow', 'fast']
        self.rewards = []
        self.transitions = []

        for i in range(self.nrows - 1):
            row_rewards = []
            row_transitions = []
            for j in range(self.ncols):
                transitions = self.grid[i][j]
                col_rewards = []
                col_transitions = []

                for k in range(len(transitions)):
                    prob, state, reward = transitions[k]
                    col_rewards.append((reward, state))
                    col_transitions.append((prob, state))

                row_rewards.append(col_rewards)
                row_transitions.append(col_transitions)

            self.rewards.append(row_rewards)
            self.transitions.append(row_transitions)


    def get_current_state(self):
        return self.state

    def get_possible_actions(self, state):

        if state == 2:
            return []

        return self.actions

    def get_possible_states(self, state, action):


        probabilities_list = []
        rewards_list = []
        states_list = []

        if state == 2:
            return probabilities_list, rewards_list, states_list

        action_i = self.actions.index(action)

        transitions_for_action  = self.transitions[state][action_i]
        rewards_for_action = self.rewards[state][action_i]

        for i in range(len(transitions_for_action)):

            prob, state = transitions_for_action[i]
            reward, _ = rewards_for_action[i]
            probabilities_list.append(prob)
            states_list.append(state)
            rewards_list.append(reward)

        return probabilities_list, rewards_list, states_list

    def do_action(self, action):
        if self.is_terminal():
            raise SyntaxError('Cannot reach actions from state overheated')

        state_i = self.state
        action_i = self.actions.index(action)
        tr_st = self.transitions[state_i][action_i]
        rw_st = self.rewards[state_i][action_i]
        sel_event = -1

        if len(tr_st) > 1:
            rn = random.uniform(0, 1)
            acc_prob = 0
            for i in range(len(tr_st)):
                p, s = tr_st[i]
                acc_prob += p

                if rn <= p:
                    sel_event = i

        r, s = rw_st[sel_event]

        self.state = s

        return r, self.state

    def reset(self):
        self.state = self.initial_state

    def is_terminal(self):

        if self.state == 2:
            return True

        if len(self.get_possible_actions(self.state)) == 0:
            return True

        else:
            return False

## Creación del agente

#### 1. definición de la clase de iteración de políticas

Implemente la clase `PolicyIteration` que define cinco atributos:

- `mdp` que corresponde al MDP a resolver
- `discount` que corresponde al factor de decuento a utilizar.
- `iterations` que corresponde a el número de iteraciones a realizar
- `values` que corresponde a un mapa con los valores calculados para los estados del MDP. Los estados se definen como una tupla de los valores posibles del estado. En el caso de Gridworld, la tupla es la posición de cada casilla. Inicialmente definiremos el mapa como un mapa vacío, el cual poblaremos con los estados descubiertos durante cada iteración.
- `policy` que corresponde a un mapa con los nombres de las acciones a ejecutar para cada uno de los estados del ambiente. La política se debe inicializar con una acción aleatoria para cada estado. Si el estado no tiene acciones posibles, su política se debe inicializar en `None`.

Al momento de crear la clase `PolicyIteration`, esta debe recibir por parámetro, la definición del MDP `MDP`, el valor de descuento `discount` (inicializado por defecto en 0.9 si no se pasa ningún valor) y la cantidad de iteraciones a ejecutar `iterations` (con un valor de 30 por defecto, si no se pasa ningún valor).

In [2]:
# Definición de las librerías a utilizar

class PolicyIteration():
    def __init__(self, mdp: MDP, discount: float = 0.9, iterations: int = 64):
        self.mdp = mdp
        self.discount = discount
        self.iterations = iterations
        self.last_values = {}
        self.values = {}  # {(i,j): 0 for i in range(self.mdp.nrows) for j in range(self.mdp.ncols)}
        self.policy = {}
        self.last_policy = {}
        for i in range(mdp.nrows):

            state = i
            actions = self.mdp.get_possible_actions(state)
            if not actions:
                self.policy[state] = None
            else:
                self.policy[state] = random.choice(actions)

    def set_policy(self, policy: list[str]):
        # Función de actualización de la política usada para las pruebas
        self.policy = policy

    def get_policy(self, state: int) -> str:
        # your code here
        return self.policy[state]

    def get_value(self, state: int) -> float:
        # your code here
        if self.values.get(state) == None:
            self.values[state] = 0
        return self.values[state]

    def get_action(self, state: int) -> str:
        # your code here
        return self.policy[state]

    def compute_new_action(self, state: int) -> str:
        # your code here
        best_action = self.policy[state]
        best_value = self.get_value(state)
        max_value, max_action = self.compute_new_value(state)
        if max_value > best_value:
            best_value = max_value
            best_action = max_action

        return best_action

    def value_evaluation(self, state: int, action: str) -> float:
        # your code here
        if state != 2:
            probs, rewards, states = self.mdp.get_possible_states(state, action)
            val = 0
            for i in range(len(probs)):
                val += probs[i] * (rewards[i] + self.discount * self.get_value(states[i]))
            return val
        else:
            return 0

    def compute_new_value(self, state: int) -> tuple[float, str]:
        # your code here
        if state != 2:
            max_val = float('-inf')
            action = ""
            for a in self.mdp.get_possible_actions(state):
                probs, rewards, states = self.mdp.get_possible_states(state, a)
                val = 0
                for i in range(len(probs)):
                    val += probs[i] * (rewards[i] + self.discount * self.get_value(states[i]))
                if val >= max_val:
                    max_val = val
                    action = a
            return max_val, action
        else:
            return 0, self.policy[state]

    def policy_evaluation(self, new_policy: dict[int, str]) -> bool:
        # your code here
        print(new_policy)

        improvement = False
        for state in range(len(self.mdp.states)):
            print(self.policy[state])
            if self.policy[state] != new_policy[state]:
                improvement = True
                return improvement
        return improvement

    def policy_iteration(self) -> tuple[bool, dict[int, str]]:
        # your code here
        new_values = {}
        for state in range(len(self.mdp.states)):
            new_values[state] = self.get_value(state)
            new_values[state] = self.value_evaluation(state, self.policy[state])
        self.values = new_values
        # Mejora
        new_policy = {}
        for state in range(len(self.mdp.states)):
            new_policy[state] = self.policy[state]
            new_policy[state] = self.compute_new_action(state)
        if self.policy_evaluation(new_policy):
            # print("Convergencia en la iteracion:",it)
            self.policy = new_policy
            return False, self.policy
        return True, self.policy

    def run_policy_iteration(self) -> tuple[dict[float], dict[str]]:
        done = False
        i = 1
        policy = {}
        while not done and i <= self.iterations:
            done, policy = self.policy_iteration()
            i += 1
        if done:
            print(f"La política converge en {i} iteraciones")
            print(f"Política : {policy}")
        return self.values, self.policy





In [3]:
# your code here
table = [[[(1, 0, 1)], [(0.5, 0, 2), (0.5, 1, 2)]],
         [[(0.5, 0, 1), (0.5, 1, 1)], [(1, 2, -10)]],
         [None, None]]

env = MDP(table, (3, 2), 0)



In [4]:
#Pruebas estructura del agente


a = PolicyIteration(env, 0.8, 10)

try:
    a.mdp
    assert type(a.mdp) is MDP, "El tipo del mdp debe ser MDP (el tipo de la clase)"
except:
    print("El atributo mdp no está definido")
try:
    a.discount
    assert type(a.discount) is float, "El tipo del factor de descuento debe ser float"
    assert 0 < a.discount and a.discount <=1, "El factor de descuento debe ser un valor entre 0 (excluido) y 1 (incluido)"
except:
    print("El atributo discount no está definido")
try:
    a.iterations
    assert type(a.iterations) is int, "El tipo de la cantidad de iteraciones debe ser entero"
except:
    print("El atributo iterations no está definido")

try:
    a.values
    assert type(a.values) is dict or type(a.values) is dict[int,float], "El tipo del mapa de valeres debe ser dict (el tipo de los mapas en python). El mapa puede estar isntanciado en su llave y valor si fue inicializado previamente"
except:
    print("El atributo values no está definido")


#### 2. Política del agente

Implemente la función `get_policy` que recibe un estado por parámetro y retorna el nombre de la acción (política) actual para dicho estado.

In [5]:
#Pruebas política

a = PolicyIteration(env, 0.8, 10)

try:
    a.get_policy
except:
    print("La función get_policy no está definida")

assert type(a.get_policy(1)) is str, "La función get_policy debe retornan un string con el nombre de la acción a ejecutar en el estado dado, o None si del estado no se puede ejecutar ninguna acción"


#### 3. Valores de estados

Defina la función `get_value` que recibe un estado (el entero representando el estado) como parámetro y retorna el valor correspondiente para dicho estado. Si el estado no ha sido visitado, la función debe retornar 0, el valor inicial para el estado.

In [6]:
#Pruebas de get_value

a = PolicyIteration(env, 0.8, 10)


try:
    a.get_value
except:
    print("La función get_value no está definida")

assert type(a.get_value(0)) is int or type(a.get_value(0)) is float, f"La función debe retornar un valor entero o un flotante"


#### 3. Evaluación de valores

Defina la función `value_evaluation` que tiene como proposito calcular el valor de un estado (sin modificar el atributo de valores), dada una acción.
Esta función recibe por parámetro un estado y una acción (el string del nombre) a ejecutar en el estado (entero en este caso) y retorna el valor calculado para dicho estado.

Notas:
- El valor por defecto de todos los estados debe ser 0.
- Para poder realizar este cálculo requerimos la nueva función del ambiente `get_possible_states`. La nueva función recibe el nombre de la acción a ejecutar para el estado actual del ambiente y retorna una lista con las probabilidades de transiciones entre los estados, una lista de recompensas y una lista de los estados de llegada del estado actual.

In [7]:
#Pruebas de value_evaluation

a = PolicyIteration(env, 0.8, 10)

try:
    a.value_evaluation
except:
    print("La función value_evaluation no está definida")

assert type(a.value_evaluation(0, 'fast')) is int or type(a.value_evaluation(0,'fast')) is float, f"La función value_evaluation debe retornar un valor de tipo entero o flotante, no {type(a.value_evaluation(0, 'fast'))}"

#### 4. Cálculo de valor

Defina la función `compute_new_value` que de forma similar a `value_evaluation` calcula el valor para el estado. La diferencia con la función anterior, es que `compute_new_value` calcula el valor máximo de todas las acciones posibles, dado un estado y no solo el valor de una única acción.
Esta función recibe un estado como parámetro y calcula el nuevo valor para el estado siguiendo la fórmula de iteración de políticas. Esta función calcula el nuevo valor para el estado actual del agente (`s`) a partir de las acciones, las recompensas y los valores de los posibles estados de llegada (`s'`) de acuerdo a la función de transición (o ruido), obteniendo el valor máximo de todas las posibles acciones en el estado de entrada.

Adicional al valor máximo, esta función debe retornar el nombre de la acción que nos lleva a ese máximo valor.

In [8]:
#Pruebas de compute_new_value

a = PolicyIteration(env, 0.8, 50)

try:
    a.compute_new_value
except:
    print("La función compute_new_value no está definida")

assert type(a.compute_new_value(0)) is tuple, "La función compute_new_value debe retornar una tupla (de tipo entero o flotante), no {type(a.compute_new_value(0))}"

#### 5. Acción a ejecutar

Defina la función `get_action` que retorna la acción a tomar para un estado dado por parámetro, de acuerdo a los valores de los estados aledaños. Esta función debe observar el posible valor a obtener para cada una de sus acciones posibles y retornar aquella acción (su nombre) que lleva al mejor valor.
Tenga en cuenta que la evaluación de las acciones no debe modificar el valor actual del estado, o tener un efecto visible sobre el ambiente o el agente.

In [9]:
#Pruebas de get_action

a = PolicyIteration(env, 0.8, 10)


try:
    a.get_action
except:
    print("La función get_action no está definida")

assert type(a.get_action(0)) is str, "La función get_action debe retornar el nombre de una acción tipo string"


#### 6. Calculo de acciones

Implemente la función `compute_new_action` que retorna la nueva mejor acción a realizar luego de ejecutar el cálculo del nuevo valor para un estado. La función recibe un estado como parámetro y retorna la mejor acción a ejecutar para ese estado (actualizando la política).


In [10]:
#Pruebas de compute_new_action

a = PolicyIteration(env, 0.8, 10)

try:
    a.compute_new_action
except:
    print("La función compute_new_action no está definida")

assert type(a.compute_new_action(0)) is str, "La función compute_new_action debe retornar un str (o None si no hay acción posible)"


#### 7. Evaluación de políticas

Implemente la función `policy_evaluation` recibe la nueva política calculada como parámetro y evalua si existió algún cambio. En caso de tener un cambio con la política actual, la función retorna `True`, de lo contrario retorna `False`.

Tenga en cuenta que la evaluación de la política se debe hacer sobre los estados del ambiente, en este caso `cold`, `hot`, `overheated`.

In [11]:
#Pruebas de policy_evaluation

a = PolicyIteration(env, 0.8, 50)

try:
    a.policy_evaluation
except:
    print("La función policy_evaluation no está definida")

assert type(a.policy_evaluation(a.policy)) is bool, "La función policy_evaluation debe retornar un booleano"


{0: 'slow', 1: 'slow', 2: None}
slow
slow
None


#### 8. Iteración de políticas

Implemente la función `policy_iteration` encargada de ejecutar la iteración de políticas en tres pasos.
1. Primero se realiza el cálculo de los nuevos valores. Esto quiere decir que para cada uno de los estados del ambiente se calcula su nuevo valor utilizando la política actual (con la función `value_evaluation`).
2. Segundo, para los nuevos valores se revisa la mejora política posible, calculando la mejor acción posible para cada uno de los estados (usando función `compute_new_action`).
3. Tercero, se evalua la política calculada de las nuevas acciones en el paso anterior, actualizando la políticia si se encontraron mejores acciones. Si no hay una mejor acción, la función debe retornar la política calculada y un indicador (booleano) que hay convergencia para detener la iteración.

La función debe retornar `True` si la política converge y `False` si no lo hace, junto con la política calculada

In [12]:
#Pruebas de policy_iteration

a = PolicyIteration(env, 0.8, 10)

### BEGIN TESTS
try:
    a.policy_iteration
except:
    print("La función policy_iteration no está definida")

assert type(a.policy_iteration()) is tuple, "La iteración de política debe retornar una tupla con un booleano y un mapa con la política"


{0: 'fast', 1: 'slow', 2: None}
slow



#### Ejecución del agente

Finalmente definimos la función `run_policy_iteration` que no recibe ningún parámatetro y ejecuta el algoritmo de solución del MDP.

Esta función ejecuta el número de iteraciones dadas a la clase. Para cada una de las iteraciones se debe calcular el nuevo valor para cada uno de los estados del ambiente y la nueva política.

Tenga en cuenta que el cálculo de los nuevos valores se debe realizar con los valores anteriores y únicamente se deben actualizar los valores cuando se actualicen los valores para todos los estados.

In [13]:
# your code here
a = PolicyIteration(env, 0.8, 10)

a.run_policy_iteration()

{0: 'fast', 1: 'slow', 2: None}
slow
{0: 'fast', 1: 'slow', 2: None}
fast
slow
None
La política converge en 3 iteraciones
Política : {0: 'fast', 1: 'slow', 2: None}


({0: 2.8, 1: 1.8, 2: 0}, {0: 'fast', 1: 'slow', 2: None})