<h2>1- Tabla de transiciones</h2>

In [1]:
import csv
import os
from collections import deque
import numpy as np
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import matplotlib.pyplot as plt

In [7]:
directions = [-3, 1, 3, -1]

def is_valid_move(index, direction):
    if direction == -3 and index < 3:  # Up
        return False
    if direction == 3 and index > 5:  # Down
        return False
    if direction == -1 and index % 3 == 0:  # Left
        return False
    if direction == 1 and index % 3 == 2:  # Right
        return False
    return True

def swap(state, i, j):
    state = list(state)
    state[i], state[j] = state[j], state[i]
    return ''.join(state)

def generate_transitions_8_puzzle(initial_state):
    visited = set()
    queue = deque([initial_state])
    transitions = {}

    while queue:
        current_state = queue.popleft()
        if current_state in visited:
            continue
        visited.add(current_state)

        empty_index = current_state.index('9')
        state_transitions = []

        for direction in directions:
            if is_valid_move(empty_index, direction):
                new_state = swap(current_state, empty_index, empty_index + direction)
                state_transitions.append(new_state)
                if new_state not in visited:
                    queue.append(new_state)
            else:
                state_transitions.append(None)  # Invalid move

        transitions[current_state] = state_transitions

    return transitions

initial_state = "123456789"
T = generate_transitions_8_puzzle(initial_state)


<h2>2- Tabla de recompensas</h2>

Tomamos el valor máximo posible de la distancia (en este caso, el valor máximo para un estado es 12, que es la distancia máxima posible en el 8-puzzle) y restamos la distancia de Manhattan de este valor.

Esto asegurará que los estados más cercanos a la solución tengan una mayor recompensa.

<h4>Método 1</h4>

In [2]:
def sequence_reward(state):
  goal_state = "123456789"  # Estado objetivo

  correct_sequence = ""  # Secuencia correcta acumulada

  # Construir la secuencia correcta desde el inicio del estado
  for i, char in enumerate(state):
    if char == goal_state[i]:
      correct_sequence += char
    else:
      break  # Detenerse en la primera pieza incorrecta

  # La recompensa será igual al número de piezas correctas en secuencia
  rewart=1000
  if len(correct_sequence)<9:
    rewart=(len(correct_sequence)-1)*2 # Recompensa base negativa
  return rewart


<h4>Método 2:Penalización por redundancia + distancia de Manhattan</h4>

In [8]:
def sequence_reward(state, previous_state=None):
    goal_state = "123456789"
    reward = 0

    for i, char in enumerate(state):
        if char == goal_state[i]:
            reward += 10

    for i, char in enumerate(state):
        if char != "9":  # Ignorar la casilla vacía
            current_position = divmod(state.index(char), 3)
            goal_position = divmod(goal_state.index(char), 3)
            manhattan_distance = abs(current_position[0] - goal_position[0]) + abs(current_position[1] - goal_position[1])
            reward -= manhattan_distance  # Penalización proporcional a la distancia

    if previous_state is not None and state == previous_state:
        reward -= 50  # Penalización fuerte para evitar loops

    if state == goal_state:
        reward += 100

    return reward


In [9]:
R = {}
# Asignar recompensas basadas en la secuencia correcta
for state in T.keys():
  R[state] = sequence_reward(state)

In [5]:
R["123456789"]

190

<h2>3- INICIALIZACIÓN de la tabla Q a cero</h2>

Tabla de calidad que recoge las recompensas.

In [None]:
def initialize_Q_table(transitions):
    Q_table = {}
    
    for state in transitions.keys():
        Q_table[state] = {action: 0 for action in range(4)}
    return Q_table #, df_Q

In [None]:
Q=initialize_Q_table(T)

<h3>Cargar la tabla Q</h3>

In [13]:
def cargar_tabla_q_y_transformar(archivo_csv):
    q_table = {}

    with open(archivo_csv, mode='r') as archivo:
        lector = csv.reader(archivo)
        next(lector)  # Saltar la cabecera del CSV (si tiene)

        for fila in lector:
            if len(fila) != 3:
                print(f"Fila incorrecta: {fila}")  # Verificación de la estructura de cada fila
                continue  # Ignorar filas con formato incorrecto

            estado, accion, valor = fila
    
            try:
                accion = int(accion)  # La acción debe ser un número entero
                valor = float(valor)  # El valor Q debe ser un número flotante
            except ValueError:
                print(f"Error en los datos: {fila}")  # Mostrar mensaje si los datos son incorrectos
                continue  # Ignorar filas con datos incorrectos

            if estado not in q_table:
                q_table[estado] = {action: 0 for action in range(4)}  # Suponemos que hay 4 acciones

            q_table[estado][accion] = valor

    return q_table

ruta_archivo_drive = "/content/drive/My Drive/Colab Notebooks/Programación IA 7R0/Tarea 5. Aprendizaje por refuerzo Q-Learning/tablas Q/tabla_q.csv"
ruta_local = "tabla_q2.csv"
ruta_absoluta = os.path.abspath("tabla_q2.csv")
Q = cargar_tabla_q_y_transformar(ruta_absoluta)

print("Tabla Q cargada:", ruta_absoluta)


Tabla Q transformada: c:\Users\DevRoger\Documents\Q-Learning 8-Puzzles\tabla_q2.csv


<h2>4- Q-LEARNING</h2>

Ejemplo de Estado aleatorio seleccionado: 621849375 Transiciones válidas desde este estado: ['629841375', None, '621845379', '621894375']. T['621849375'][2]='621845379' Q['123456789'] vale inicialmente {0: 0, 1: 0, 2: 0, 3: 0}

<h3>épsilon greedy</h3>

Va probando movimientos aleatorios descubriendo nuevas recompensas y modificando la tabla Q, y con el conocimiento adquirido toma la mejor desición posible (la acción con el mayor valor Q del momento).

<h4>última modificación</h4>

Ahora se puede entranar tantas veces como quieras pulsando el play.
Modifica la variable num_entrenamientos para poner la cantidad de entrenamientos que quiere realizar.

In [None]:
# HIPERPARÁMETROS
v = 0.8  # Tasa de aprendizaje
y = 0.9  # Factor de descuento
epsilon = 0.8  # Probabilidad de exploración inicial
epsilon_min = 0.01  # Epsilon mínimo
epsilon_decay = 0.995  # Factor de decaimiento de epsilon

# SELECCIÓN DEL ESTADO INICIAL
def entrenar_qlearning(num_episodios, mostrar_progreso_cada=100000):
    global Q, epsilon  # Asegurar acceso a las variables globales

    estados = list(T.keys())
    s = np.random.choice(estados)  # Seleccionar un estado inicial aleatorio
    entrenar = 0

    while entrenar < num_episodios:
        # Filtrar acciones válidas
        acciones_validas = [a for a in Q[s] if T[s][a] is not None]

        if not acciones_validas:
            print(f"No hay acciones válidas desde el estado {s}")
            break  # Salir del bucle si no hay acciones válidas

        # Seleccionar acción usando epsilon-greedy
        if np.random.rand() < epsilon:
            # Exploración: elegir una acción válida al azar
            a = np.random.choice(acciones_validas)
        else:
            # Explotación: elegir la mejor acción válida
            a = max(acciones_validas, key=lambda a: Q[s][a])

        # Obtener el siguiente estado
        siguiente = T[s][a]

        # Actualizar la tabla Q usando la ecuación de Bellman
        Q[s][a] = (1 - v) * Q[s][a] + v * (R[s] + y * max(Q[siguiente].values()))

        # Mostrar progreso cada N episodios
        #if entrenar % mostrar_progreso_cada == 0:
        #    print(f"Episodio {entrenar}, Epsilon: {epsilon:.4f}")
        #    print(f"Estado actual: {s}, Q[{s}]: {Q[s]}")

        # Actualizar el estado actual
        if siguiente != "123456789":
            s = siguiente
        else:
            # Si llegamos al estado objetivo, reiniciar a un estado aleatorio
            s = np.random.choice(estados)

        # Reducir epsilon para priorizar explotación a medida que se aprende
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # Incrementar el contador de episodios
        entrenar += 1

    print(f"Entrenamiento completado: {num_episodios} episodios")

# Realizar múltiples entrenamientos
num_entrenamientos = 100  # Número de entrenamientos consecutivos
episodios_por_entrenamiento = 1000000  # Episodios por cada entrenamiento

for i in range(num_entrenamientos):
    print(f"\nInicio del entrenamiento {i + 1}/",num_entrenamientos)
    entrenar_qlearning(num_episodios=episodios_por_entrenamiento, mostrar_progreso_cada=100000)
    print(f"Fin del entrenamiento {i + 1}/",num_entrenamientos)


In [14]:
def entrenar_qlearning(num_episodios, mostrar_progreso_cada=100000):
    global Q, epsilon  # Acceso a variables globales
    
    v_inicial = 0.8
    v_minimo = 0.1
    y = 0.9  # Factor de descuento
    epsilon_inicial = 0.95
    epsilon_minimo = 0.01
    decay_rate_v = 0.001
    decay_rate_epsilon = 0.0005
    
    estados = list(T.keys())
    s = np.random.choice(estados)  # Estado inicial aleatorio
    entrenar = 0

    while entrenar < num_episodios:
        v = max(v_minimo, v_inicial * np.exp(-decay_rate_v * entrenar))
        epsilon = max(epsilon_minimo, epsilon_inicial * np.exp(-decay_rate_epsilon * entrenar))
        
        acciones_validas = [a for a in Q[s] if T[s][a] is not None]

        if not acciones_validas:
            print(f"No hay acciones válidas desde el estado {s}")
            break

        if np.random.rand() < epsilon:
            a = np.random.choice(acciones_validas)
        else:
            a = max(acciones_validas, key=lambda a: Q[s][a])

        siguiente = T[s][a]

        Q[s][a] = (1 - v) * Q[s][a] + v * (R[s] + y * max(Q[siguiente].values()))

        if siguiente != "123456789":
            s = siguiente
        else:
            s = np.random.choice(estados)

        entrenar += 1

    print(f"Entrenamiento completado: {num_episodios} episodios")

num_entrenamientos = 2000  # Número de entrenamientos consecutivos
episodios_por_entrenamiento = 1000000  # Episodios por cada entrenamiento

for i in range(num_entrenamientos):
    print(f"\nInicio del entrenamiento {i + 1}/",num_entrenamientos)
    entrenar_qlearning(num_episodios=episodios_por_entrenamiento, mostrar_progreso_cada=100000)
    print(f"Fin del entrenamiento {i + 1}/",num_entrenamientos)



Inicio del entrenamiento 1/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 1/ 2000

Inicio del entrenamiento 2/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 2/ 2000

Inicio del entrenamiento 3/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 3/ 2000

Inicio del entrenamiento 4/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 4/ 2000

Inicio del entrenamiento 5/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 5/ 2000

Inicio del entrenamiento 6/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 6/ 2000

Inicio del entrenamiento 7/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 7/ 2000

Inicio del entrenamiento 8/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 8/ 2000

Inicio del entrenamiento 9/ 2000
Entrenamiento completado: 1000000 episodios
Fin del entrenamiento 9/ 2000

Inicio del entrenamiento 10

<h3>Guardar Tabla Q</h3>


In [10]:
def guardar_tabla_q(q_table, archivo_csv):
    directorio = os.path.dirname(archivo_csv)
    if not os.path.exists(directorio):
        os.makedirs(directorio)

    with open(archivo_csv, mode='w', newline='') as archivo:
        escritor = csv.writer(archivo)
        escritor.writerow(["Estado", "Accion", "ValorQ"])
        for estado, acciones in q_table.items():
            for accion, valor in acciones.items():
                escritor.writerow([estado, accion, valor])

In [11]:
ruta_absoluta = os.path.abspath("tabla_q2.csv")
print(f"El archivo se guardará en: {ruta_absoluta}")
guardar_tabla_q(Q, ruta_absoluta)

El archivo se guardará en: c:\Users\DevRoger\Documents\Q-Learning 8-Puzzles\tabla_q2.csv


In [14]:
#ruta_archivo = "C:/Usuarios/DevRoger/Documentos/tablasQ/tabla_q.csv"
#ruta_archivo = "/content/drive/My Drive/Colab Notebooks/Programación IA 7R0/Tarea 5. Aprendizaje por refuerzo Q-Learning/tablas Q/tabla_q2.csv"

<h3>¿Cuánto aprendió?</h3>

In [15]:
path = []

def calcular_porcentaje_aprendizaje(Q, valor_inicial=0):
    total_valores = 0
    valores_aprendidos = 0

    for estado, acciones in Q.items():
        for accion, valor in acciones.items():
            total_valores += 1
            if valor > valor_inicial:
                valores_aprendidos += 1

    porcentaje = (valores_aprendidos / total_valores) * 100
    return porcentaje



<h2>Solución</h2>

Después del entrenamiento se resuelve el 8-puzzle asignando un estado inicial aleatorio:

In [16]:
def resolver_8_puzzle(T, Q, estado_inicial, estado_objetivo="123456789"):
    if estado_inicial is None:
        estado_inicial = np.random.choice(list(T.keys()))
    print(f"Estado inicial: {estado_inicial}")

    estado_actual = estado_inicial
    pasos = 0

    while estado_actual != estado_objetivo:
        path.append(estado_actual)
        acciones = Q[estado_actual]
        acciones_validas = [a for a in Q[estado_actual] if T[estado_actual][a] is not None]
        accion_mejor = max(acciones_validas, key=acciones.get)

        siguiente_estado = T[estado_actual][accion_mejor]

        if siguiente_estado is None:
            print(f"Error: no hay una transición válida desde el estado {estado_actual} con la acción {accion_mejor}.")
            break

        estado_actual = siguiente_estado
        pasos += 1

        if pasos > 100:  # Prevenir loops infinitos
            print("El algoritmo no pudo resolver el puzzle en un número razonable de pasos.")
            break

    if estado_actual == estado_objetivo:
        print(f"¡Puzzle resuelto en {pasos} pasos! Estado final: {estado_actual}")
    else:
        print("No se pudo resolver el puzzle. Último estado alcanzado: ", estado_actual)

estado_inicial = None  # Deja como None para seleccionar un estado aleatorio

porcentaje_aprendizaje = calcular_porcentaje_aprendizaje(Q)
print(f"Porcentaje de aprendizaje: {porcentaje_aprendizaje:.2f}%")

resolver_8_puzzle(T, Q, estado_inicial)

Porcentaje de aprendizaje: 63.29%
Estado inicial: 198325746
El algoritmo no pudo resolver el puzzle en un número razonable de pasos.
No se pudo resolver el puzzle. Último estado alcanzado:  128356749


In [None]:
print("Path recorrido por el agente:")
for estado in path:
    print(estado)

<h3>Visualización gráfica de la solución</h3>

In [22]:
def animate_puzzle_solution(path):
    # Crear la figura
    fig, ax = plt.subplots()
    ax.axis("off")
    tiles = [[None for _ in range(3)] for _ in range(3)]

    # Inicialización del gráfico
    def init():
        for i in range(3):
            for j in range(3):
                tiles[i][j] = ax.text(
                    j, -i, "", ha="center", va="center", fontsize=20,
                    bbox=dict(boxstyle="round", facecolor="white", edgecolor="black")
                )
        ax.set_xlim(-0.5, 2.5)
        ax.set_ylim(-2.5, 0.5)
        return [tile for row in tiles for tile in row]

    # Actualización para cada frame
    def update(frame):
        state = path[frame]
        for i, value in enumerate(state):
            x, y = i % 3, i // 3
            tiles[y][x].set_text("" if value == '9' else value)
        return [tile for row in tiles for tile in row]

    # Crear la animación
    anim = FuncAnimation(fig, update, frames=len(path), init_func=init, blit=True, repeat=False)
    plt.close(fig)
    return HTML(anim.to_jshtml())

# Mostrar la animación
animate_html = animate_puzzle_solution(path)
display(animate_html)