# Imports

In [1]:
import gymnasium as gym
import pygame
import random
import sys

# Entorno jugable: ¡pruébalo!

In [3]:
# Initialise pygame before the display is created.
pygame.init()

# Game window: a fixed 400x600 surface with a caption.
width = 400
height = 600
screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Entorno de Aprendizaje")

# RGB colours used for the background and for every drawn shape.
black = (0, 0, 0)
white = (255, 255, 255)

# Player: a square that starts horizontally centred, two player-heights
# above the bottom edge of the window.
player_size = 50
player_y = height - 2 * player_size
player_x = width // 2 - player_size // 2

# Obstacles: squares stored as (x, y) tuples that fall `obstacle_speed`
# pixels per frame.
obstacles = []
obstacle_size = 50
obstacle_speed = 5
obstacle_frequency = 25  # Higher value -> fewer obstacles are spawned

# Clock used to cap the frame rate of the game loop.
clock = pygame.time.Clock()

# Helper that draws a line of text onto the game window.
def show_message(message, size, color, y_offset):
    """Blit ``message`` onto ``screen``, centred horizontally.

    Args:
        message: Text to render.
        size: Font size passed to ``pygame.font.Font`` (default font).
        color: RGB tuple used for the text.
        y_offset: Vertical displacement in pixels from the window centre;
            negative values place the text above the centre.

    The text is only drawn onto the surface; the caller is responsible for
    calling ``pygame.display.flip()`` to make it visible.
    """
    rendered = pygame.font.Font(None, size).render(message, True, color)
    anchor = (width // 2, height // 2 + y_offset)
    screen.blit(rendered, rendered.get_rect(center=anchor))

# Main game loop.
#
# Each frame: read input, move the player, maybe spawn an obstacle, advance
# and draw the obstacles, draw the player and test for collisions.  On a
# collision the final frame (with the "Game Over" overlay) stays on screen
# until the user presses 'R' or closes the window.
running = True
waiting_for_restart = False
player_speed = 5  # Pixels moved per frame while an arrow key is held
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
    if not running:
        # Leave immediately so a quit request never falls into the
        # restart wait below.
        break

    # get_pressed() yields booleans, so each term is either 0 or player_speed.
    keys = pygame.key.get_pressed()
    player_x -= keys[pygame.K_LEFT] * player_speed
    player_x += keys[pygame.K_RIGHT] * player_speed

    # Keep the player fully inside the window.
    player_x = max(0, min(player_x, width - player_size))

    # Spawn a new obstacle at the top edge with probability
    # 1 / (obstacle_frequency + 1) per frame.
    if random.randint(0, obstacle_frequency) == 0:
        obstacle_x = random.randint(0, width - obstacle_size)
        obstacle_y = 0
        obstacles.append((obstacle_x, obstacle_y))

    # Advance and draw every obstacle; drop those that left the window.
    new_obstacles = []
    for obstacle_x, obstacle_y in obstacles:
        obstacle_y += obstacle_speed
        pygame.draw.rect(screen, white, (obstacle_x, obstacle_y, obstacle_size, obstacle_size))
        if obstacle_y < height:
            new_obstacles.append((obstacle_x, obstacle_y))
    obstacles = new_obstacles

    # Draw the player.
    pygame.draw.rect(screen, white, (player_x, player_y, player_size, player_size))

    # Collision test: one overlapping obstacle is enough to end the round.
    player_rect = pygame.Rect(player_x, player_y, player_size, player_size)
    collided = any(
        player_rect.colliderect(pygame.Rect(x, y, obstacle_size, obstacle_size))
        for x, y in obstacles
    )
    if collided:
        show_message("Game Over", 36, white, -20)
        show_message("Press 'R' to Restart", 24, white, 20)

    pygame.display.flip()
    clock.tick(30)

    if collided:
        # Wait for 'R' (restart) or a window close.  The surface is NOT
        # cleared here, so the "Game Over" frame stays visible while waiting.
        waiting_for_restart = True
        while waiting_for_restart:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    waiting_for_restart = False
                elif event.type == pygame.KEYDOWN and event.key == pygame.K_r:
                    waiting_for_restart = False

            pygame.display.flip()
            clock.tick(30)

        # Reset the game state once, after the wait is over.
        player_x = width // 2 - player_size // 2
        player_y = height - 2 * player_size
        obstacles = []

    # Clear the back buffer for the next frame.
    screen.fill(black)

pygame.quit()
sys.exit()

SystemExit: 

# Declaración de Entorno

In [2]:
pygame.init()

class SimpleGameEnv(gym.Env):
    """Falling-obstacle dodging game wrapped in a Gym-style interface.

    The player is a square near the bottom of a 400x600 window that can move
    left or right; square obstacles spawn at random at the top and fall at a
    constant speed.  An episode ends when the player overlaps an obstacle.

    Interface notes:
        * ``reset()`` returns only the observation and ``step()`` returns the
          4-tuple ``(observation, reward, done, info)``.
        * Observations are ``(player_x, player_y)`` tuples of ints, so they
          can be used directly as dictionary keys.
        * Every ``step()`` draws the frame on the pygame display and is
          throttled by ``clock.tick(30)``.

    NOTE(review): the observation does not include obstacle positions, and
    ``reward_for_avoiding_obstacle`` is defined but never granted.
    """

    def __init__(self):
        super().__init__()

        # Game window
        self.width, self.height = 400, 600
        self.screen = pygame.display.set_mode((self.width, self.height))
        pygame.display.set_caption("Entorno de Aprendizaje")

        # Colours
        self.white = (255, 255, 255)
        self.black = (0, 0, 0)

        # Player
        self.player_size = 50
        self.player_speed = 5  # Pixels moved per step
        self.player_x = self.width // 2 - self.player_size // 2
        self.player_y = self.height - 2 * self.player_size

        # Obstacles, stored as (x, y) tuples
        self.obstacle_size = 50
        self.obstacle_speed = 5
        self.obstacle_frequency = 25  # Higher value -> fewer obstacles
        self.obstacles = []

        # Clock used to cap the frame rate
        self.clock = pygame.time.Clock()

        # Observation: (player_x, player_y).  player_x is clamped to
        # [0, width - player_size]; player_y lies inside the window height.
        self.observation_space = gym.spaces.Tuple((
            gym.spaces.Discrete(self.width - self.player_size + 1),
            gym.spaces.Discrete(self.height),
        ))
        # Actions: 0 = move left, 1 = move right.
        self.action_space = gym.spaces.Discrete(2)

        # Rewards and penalties
        self.reward_for_movement = 0.1
        self.reward_for_avoiding_obstacle = 1.0  # Currently unused
        self.penalty_for_collision = -10

    def reset(self):
        """Restore the initial game state and return the first observation."""
        self.player_x = self.width // 2 - self.player_size // 2
        self.player_y = self.height - 2 * self.player_size
        self.obstacles = []
        return self._get_observation()

    def step(self, action):
        """Advance the game by one frame.

        Args:
            action: 0 to move left, 1 to move right.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is
            ``penalty_for_collision`` and ``done`` is True on a collision,
            otherwise ``reward_for_movement`` and False.  ``info`` is an
            empty dict.
        """
        self._handle_player_movement(action)
        self._generate_obstacles()
        self._move_and_draw_obstacles()
        self._draw_player()

        if self._check_collisions():
            reward = self.penalty_for_collision
            done = True
        else:
            reward = self.reward_for_movement
            done = False

        # Present the frame, then clear the surface for the next one.
        pygame.display.flip()
        self.screen.fill(self.black)
        self.clock.tick(30)

        return self._get_observation(), reward, done, {}

    def render(self):
        """No-op: the frame is already drawn and presented inside ``step``."""
        pass

    def close(self):
        """Shut down pygame.

        This deliberately does not call ``sys.exit()``: closing an
        environment must not terminate the interpreter, otherwise closing
        several environments in a loop stops after the first one.
        """
        pygame.quit()

    def _get_observation(self):
        """Return the current observation, the ``(player_x, player_y)`` tuple."""
        return (self.player_x, self.player_y)

    def _handle_player_movement(self, action):
        """Move the player horizontally and clamp it to the window."""
        # Maps action 0 -> -1 (left) and action 1 -> +1 (right).
        direction = 2 * action - 1
        self.player_x += direction * self.player_speed
        self.player_x = max(0, min(self.player_x, self.width - self.player_size))

    def _generate_obstacles(self):
        """Spawn an obstacle at the top edge with probability 1/(frequency + 1)."""
        if random.randint(0, self.obstacle_frequency) == 0:
            obstacle_x = random.randint(0, self.width - self.obstacle_size)
            self.obstacles.append((obstacle_x, 0))

    def _move_and_draw_obstacles(self):
        """Advance and draw every obstacle, dropping those below the window."""
        survivors = []
        for obstacle_x, obstacle_y in self.obstacles:
            obstacle_y += self.obstacle_speed
            pygame.draw.rect(
                self.screen,
                self.white,
                (obstacle_x, obstacle_y, self.obstacle_size, self.obstacle_size),
            )
            if obstacle_y < self.height:
                survivors.append((obstacle_x, obstacle_y))
        self.obstacles = survivors

    def _draw_player(self):
        """Draw the player square at its current position."""
        pygame.draw.rect(
            self.screen,
            self.white,
            (self.player_x, self.player_y, self.player_size, self.player_size),
        )

    def _check_collisions(self):
        """Return True if the player rectangle overlaps any obstacle."""
        player_rect = pygame.Rect(self.player_x, self.player_y, self.player_size, self.player_size)
        return any(
            player_rect.colliderect(
                pygame.Rect(x, y, self.obstacle_size, self.obstacle_size)
            )
            for x, y in self.obstacles
        )

## Agente

In [3]:
class Agente:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Q-values live in ``q_table``, a dict mapping each hashable observation
    to a ``{action: value}`` dict that is created lazily with zeros.

    Args:
        alpha: Learning rate applied to the temporal-difference error.
        gamma: Discount factor for the value of the next observation.
        epsilon: Probability of picking a random action for an observation
            that already has a Q-row.
        actions: Iterable of valid actions.  Defaults to ``(0, 1)``, the
            left/right actions of the two-action environment in this file;
            any other action would fall outside its declared action space.

    Raises:
        ValueError: If ``actions`` is empty.
    """

    def __init__(self, alpha, gamma, epsilon, actions=(0, 1)):
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.actions = tuple(actions)
        if not self.actions:
            raise ValueError("actions must contain at least one action")
        self.q_table = {}

    def _q_row(self, observation):
        """Return the Q-row for ``observation``, creating a zeroed one if new."""
        if observation not in self.q_table:
            self.q_table[observation] = dict.fromkeys(self.actions, 0)
        return self.q_table[observation]

    def select_action(self, observation):
        """Choose an action for ``observation`` using epsilon-greedy.

        Unseen observations always get a random action; seen ones get a
        random action with probability ``epsilon`` and the highest-valued
        action otherwise (ties resolve to the first action in the row).
        """
        if observation not in self.q_table or random.uniform(0, 1) < self.epsilon:
            return random.choice(self.actions)  # Exploration

        q_row = self.q_table[observation]
        return max(q_row, key=q_row.get)  # Exploitation

    def update(self, observation, action, reward, next_observation, done):
        """Apply one Q-learning update for the observed transition.

        Q(s, a) += alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a)),
        where the bootstrap term is 0 when ``done`` is True.
        """
        q_row = self._q_row(observation)
        next_row = self._q_row(next_observation)

        max_q_next = 0 if done else max(next_row.values())
        td_error = reward + self.gamma * max_q_next - q_row[action]
        q_row[action] += self.alpha * td_error

## Bucle de Entrenamiento

In [4]:
import threading

def train_thread(env, agent, num_episodes):
    """Train ``agent`` on ``env`` for ``num_episodes`` episodes.

    Each episode starts with ``env.reset()`` and repeats
    select action -> ``env.step`` -> ``agent.update`` until the environment
    reports the episode as done.  When an episode ends, one line with the
    current thread name, the 1-based episode number and the accumulated
    reward is printed.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(observation, reward, done, info)``.
        agent: Object providing ``select_action(observation)`` and
            ``update(observation, action, reward, next_observation, done)``.
        num_episodes: Number of episodes to run.
    """
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        finished = False

        while not finished:
            chosen = agent.select_action(state)
            next_state, reward, finished, _info = env.step(chosen)
            total_reward += reward

            agent.update(state, chosen, reward, next_state, finished)
            state = next_state

        print(f"Hilo {threading.current_thread().name}, Episodio {episode + 1}, Recompensa total: {total_reward}")


# ¡Ahora te toca a ti!

Puedes experimentar cambiando cualquiera de los parámetros (por ejemplo, el número de entrenamientos simultáneos o los hiperparámetros de entrenamiento: alpha, gamma y epsilon), probando con diferentes entornos o implementando nuevos algoritmos.

¿Crees que puedes hacerlo mejor? ¡Seguro que sí!

In [5]:
# Initialise pygame before the environments are created.
pygame.init()

# Training configuration: one independent agent/environment pair per thread.
num_threads = 4
num_episodes_per_thread = 1000

agents = [Agente(alpha=0.1, gamma=0.9, epsilon=0.1) for _ in range(num_threads)]
envs = [SimpleGameEnv() for _ in range(num_threads)]

# Launch one worker per pair; workers are named "Thread-1" ... "Thread-N".
threads = []
for index, (env, agent) in enumerate(zip(envs, agents), start=1):
    worker = threading.Thread(
        target=train_thread,
        args=(env, agent, num_episodes_per_thread),
        name=f"Thread-{index}",
    )
    worker.start()
    threads.append(worker)

# Block until every worker has finished all of its episodes.
for worker in threads:
    worker.join()

# Release the environments.
for env in envs:
    env.close()

Hilo Thread-3, Episodio 1, Recompensa total: 10.700000000000024
Hilo Thread-2, Episodio 1, Recompensa total: 12.200000000000045
Hilo Thread-1, Episodio 1, Recompensa total: 17.800000000000125
Hilo Thread-4, Episodio 1, Recompensa total: 20.30000000000016
Hilo Thread-1, Episodio 2, Recompensa total: 2.1999999999999726
Hilo Thread-2, Episodio 2, Recompensa total: 8.799999999999997
Hilo Thread-3, Episodio 2, Recompensa total: 16.900000000000112
Hilo Thread-4, Episodio 2, Recompensa total: 10.300000000000018
Hilo Thread-1, Episodio 3, Recompensa total: 4.1999999999999655
Hilo Thread-3, Episodio 3, Recompensa total: 0.19999999999997975
Hilo Thread-2, Episodio 3, Recompensa total: 7.299999999999976
Hilo Thread-3, Episodio 4, Recompensa total: -0.8000000000000167
Hilo Thread-2, Episodio 4, Recompensa total: 3.5999999999999677
Hilo Thread-4, Episodio 3, Recompensa total: 12.100000000000044
Hilo Thread-3, Episodio 5, Recompensa total: 0.0999999999999801
Hilo Thread-1, Episodio 4, Recompensa tot