
## Q-Learning Example: 
Find shortest number of steps to user defined goal node (simple shortest path problem) from selected start node.

<img src="http://mnemstudio.org/ai/path/images/map1a.gif">

In this image the start node is 2 and goal node is 5

In [1]:
import numpy as np

In [2]:
# Función para inicializar la matriz de recompensas
def init_reward_matrix(goal_state=5, initial_goal_reward=100):
    
    global R
    # Matriz de recompensas
    # cada fila es un estado, cada columna es un nodo de destino
    # -1 marca un movimiento "nulo" y no se puede realizar
    R = np.asmatrix([[-1, -1, -1, -1,  0, -1], # 0 can take action 4
                     [-1, -1, -1,  0, -1,  0], # 1 can take action 3, 5
                     [-1, -1, -1,  0, -1, -1], # 2 can take action 3
                     [-1,  0,  0, -1,  0, -1], # 3 can take action 1, 4
                     [ 0,  -1, -1, 0, -1,  0], # 4 can take action 0, 3, 5
                     [-1,  0, -1, -1,  0,  -1] # 5 can take action 1, 4
                    ])
    
    # añadir recompensa para movimientos de longitud 1 hacia la meta (por ejemplo, 100)
    for (row_idx, col_idx), reward in np.ndenumerate(R):
        if col_idx == goal_state and reward != -1:
            R[row_idx, col_idx] = initial_goal_reward
        
        # Hacer posible/deseable el movimiento de la meta a la meta
        if row_idx == goal_state and col_idx == goal_state:
            R[row_idx, col_idx] = initial_goal_reward
    
    print(f"Matriz de recompensas con estado objetivo {goal_state}: \n {R}\n")

In [3]:
# Función para obtener las acciones disponibles para un estado dado
def available_actions(state):
    current_state_row = R[state,]
    actions = np.where(current_state_row != -1)[1]
    return actions

# Función para actualizar los valores Q
def update(current_state, action, gamma):
    # obtener el índice de columna del valor Q máximo 
    max_index = np.where(Q[action,] == np.max(Q[action,]))[1]

    if max_index.shape[0] > 1:
        # si hay varios, elegir al azar
        max_index = int(np.random.choice(max_index, size=1))
    else:
        max_index = int(max_index)
        
    max_value = Q[action, max_index]

    """ Fórmula de Q-learning
    Omite la tasa de aprendizaje y el valor antiguo; efectivamente tiene una tasa de aprendizaje de 1 (ver fórmula a continuación)    
    """
    Q[current_state, action] = R[current_state, action] + gamma * max_value

<img src="https://wikimedia.org/api/rest_v1/media/math/render/svg/47fa1e5cf8cf75996a777c11c7b9445dc96d4637">

In [4]:
# Valores Q
Q = np.asmatrix(np.zeros([6, 6]))

Q_history =  []

# Hyperparameters
gamma = 0.8 # factor de descuento

goal_state = int(input("Ingresa el estado objetivo (0-5) para entrenar: "))
init_reward_matrix(goal_state)


# Entrenamiento: mejora iterativamente los valores Q para que indiquen acciones óptimas
# Keep choosing a random state and a random action and updating Q-table
def train(iterations=1000):
    
    for i in range(10000):
        # estado inicial aleatorio
        current_state = np.random.randint(int(Q.shape[0]))

        # siguiente acción aleatoria (estado al que moverse)
        available_act = available_actions(current_state)
        action = int(np.random.choice(available_act, 1))
        
        # actualizar los valores Q
        update(current_state, action, gamma)
        np.set_printoptions(precision=3)

        Q_history.append({"Q": np.copy(Q), 
                          "state": current_state, 
                          "action": action, 
                          "reward" : R[current_state, action]
                        })
        
# ------------------------------
# Entrenamiento
train(10000)

# Actualizar la matriz Q
#update(initial_state, action, gamma)

np.set_printoptions(precision=3)
print(f"Trained Q values:\n{Q}\n")

#print(f"Train Q values (normalised):\n{Q / np.max(Q) * 100}\n")

# ------------------------------
# Testing / Testeando
current_state = int(input("Enter starting state (0-5): "))

steps = [current_state]

if current_state not in range(6) or goal_state not in range(6):
    print("Error: Invalid states" )
    sys.exit(-1)

num_timesteps = 10

# Start episode
# alternatively: 
# while current_state != goal_state:
while num_timesteps > 0:
    
    # Explicación: np.where(condición) devuelve los índices (fila, columna) de la condición encontrada
    # necesitamos el valor Q máximo -> el valor de columna más grande en la fila de estados actual
    # esto se debe a que cada entrada en cada fila es la recompensa acumulativa por tomar la acción (columna)
    # desde el estado actual (fila)
    next_step_index = np.where(Q[current_state,] == np.max(Q[current_state,]))[1]

    if next_step_index.shape[0] > 1:
        next_step_index = int(np.random.choice(next_step_index, size=1))
    else:
        next_step_index = int(next_step_index)

    steps.append(next_step_index)
    current_state = next_step_index

    num_timesteps -= 1
    
# Imprimir secuencia seleccionada de pasos
print(f"\nSelected path: {steps}")



Reward matrix with goal_state 3: 
 [[ -1  -1  -1  -1   0  -1]
 [ -1  -1  -1 100  -1   0]
 [ -1  -1  -1 100  -1  -1]
 [ -1   0   0 100   0  -1]
 [  0  -1  -1 100  -1   0]
 [ -1   0  -1  -1   0  -1]]

Trained Q values:
[[  0.   0.   0.   0. 400.   0.]
 [  0.   0.   0. 500.   0. 320.]
 [  0.   0.   0. 500.   0.   0.]
 [  0. 400. 400. 500. 400.   0.]
 [320.   0.   0. 500.   0. 320.]
 [  0. 400.   0.   0. 400.   0.]]


Selected path: [1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]


In [5]:
# Visualización del historial de entrenamiento
from IPython.display import clear_output
from time import sleep

def print_history(dicts):
    '''Muestra cómo, durante el entrenamiento, se utilizan estados aleatorios con acciones aleatorias 
    para mejorar iterativamente los valores Q, que finalmente convergen'''
    
    for i, frame in enumerate(dicts):
        clear_output(wait=True)
        print(frame['Q'])
        print(f"Iteration: {i + 1}")
        print(f"Random State: {frame['state']}")
        print(f"Random Action: {frame['action']}")
        print(f"Reward: {frame['reward']}")
        sleep(.1)
              

print_history(Q_history)

[[  0.   0.   0.   0. 400.   0.]
 [  0.   0.   0. 500.   0. 320.]
 [  0.   0.   0. 500.   0.   0.]
 [  0. 400. 400. 500. 400.   0.]
 [320.   0.   0. 500.   0. 320.]
 [  0. 400.   0.   0. 400.   0.]]
Iteration: 10000
Random State: 5
Random Action: 1
Reward: 0


In [6]:
# Representación de la red
network = r""" 
    1
    | \
2 - 3  5
    | /
0 - 4
"""

In [7]:
print(network)

 
    1
    | \
2 - 3  5
    | /
0 - 4

