# Environment

In [1]:
import numpy as np
import os
from pyboy import PyBoy
from gymnasium import Env, spaces
from stable_baselines3 import PPO
from stable_baselines3.ppo import MultiInputPolicy
from stable_baselines3.common.env_checker import check_env
from gymnasium.wrappers import TransformObservation
from gymnasium.spaces import Box, Dict

# List of actions
ACTIONS = ['a', 'b', 'left', 'right', 'up', 'down']

# Definir posiciones objetivo con orientación (x, y, map, orientation)
TARGET_POSITIONS = [
    (16, 8, 80, 1),  # Target Position 1
    (17, 9, 80, 2),  # Target Position 2
    (16, 10, 80, 0)   # Target Position 3
]
class GenericPyBoyEnv(Env):
    def __init__(self, pyboy, debug=False, render_mode=False, max_gameplay_time=1080000):
        super().__init__()
        self.pyboy = pyboy
        self.debug = debug
        self.render_mode = render_mode
        self.max_gameplay_time = max_gameplay_time
        self.current_gameplay_time = 0
        
        self.action_space = spaces.Discrete(len(ACTIONS))
        self.observation_space = Dict({
           "info": Box(0, 255, (4,), dtype=np.float32)  # Added orientation
        })
        
        self.visited_maps = set()
        self.visited_positions = set()
        self.last_pos = None
        
        if not self.debug:
            self.pyboy.set_emulation_speed(9 if not self.render_mode else 3)
        
        self.load_state()

    def load_state(self):
        with open("zero_state.state", "rb") as f:
            self.pyboy.load_state(f)

    def get_observation(self):
        pos_x = self.pyboy.memory[0xC0D4]
        pos_y = self.pyboy.memory[0xC0D5]
        map_id = self.pyboy.memory[0xC92D]
        orientation = self.pyboy.memory[0xC0D8]

        return {
            "info": np.array([pos_x, pos_y, map_id, orientation], dtype=np.float32)
        }

    def step(self, action):
        self.pyboy.button(ACTIONS[action])
        for _ in range(60):
            self.pyboy.tick()
            self.current_gameplay_time += 1
            
        timeout = self.current_gameplay_time >= self.max_gameplay_time
        
        pos_x = self.pyboy.memory[0xC0D4]
        pos_y = self.pyboy.memory[0xC0D5]
        map_value = self.pyboy.memory[0xC92D]
        orientation = self.pyboy.memory[0xC0D8]
        full_position = (pos_x, pos_y, map_value, orientation)
        # Check if the agent reached *any* of the target positions and pressed 'A'
        reached_target = full_position in TARGET_POSITIONS and action == 0
        #reached_target = self.pyboy.memory[0xC92D] == 107
        reward = -0.001

        #if full_position not in self.visited_positions:
         #   reward += 1
            #print("New position")

        #if full_position == self.last_pos and action in [2, 3, 4, 5]:
         #   reward -= 0.05
            #print("Invalid move")
        if map_value not in self.visited_maps:
            print("New Map")
            reward += 1
            
        if reached_target:
            print("You Win!")
            reward += 10  # Large reward for reaching the goal and pressing 'A'

        # If timeout happens, give a small penalty
        if timeout:
            #reward -= 10
            print("GAME OVER")

        self.visited_maps.add(map_value)
        self.visited_positions.add(full_position)
        self.last_pos = full_position

        # Done condition: timeout or reaching target position & pressing 'A'
        done = timeout or reached_target

        return self.get_observation(), reward, done, False, {}

    def reset(self, seed=None, **kwargs):
        self.load_state()
        self.current_gameplay_time = 0
        self.visited_positions.clear()
        self.visited_maps.clear()
        return self.get_observation(), {}

    def close(self):
        self.pyboy.stop()




# Training Agent 1 ENV

In [None]:
from pyboy import PyBoy
from stable_baselines3 import PPO
from stable_baselines3.ppo import MultiInputPolicy
from gymnasium.wrappers import TransformObservation
from gymnasium.spaces import Dict
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.monitor import Monitor
import matplotlib.pyplot as plt
import pandas as pd

# Load PyBoy and initialize environment
pyboy = PyBoy(r"C:\Users\USUARIO\Desktop\RLMEDA\MedarotKabuto.gb")
train_env = GenericPyBoyEnv(pyboy, debug=False, render_mode=True)

# Transform observation to remove unnecessary screen info
train_env = TransformObservation(
    train_env,
    lambda obs: {"info": obs["info"]},  # Solo usar la información de posición
    observation_space=Dict({"info": Box(0, 255, (4,), dtype=np.float32)})  # Ajuste de límites
)

# Validate environment
#check_env(train_env, warn=True)

# Train PPO model with optimized hyperparameters comment if u want to see training
train_env = Monitor(train_env, filename="./ppo_medarot_logs")

model = PPO(
    MultiInputPolicy,
    train_env,
    verbose=1,
    learning_rate=1e-4,  # Slower learning for better convergence
    n_steps=4096,  # More steps before updates
    gamma=0.995,  # Encourage long-term planning
    ent_coef=0.1,  # More exploration during training
    batch_size=512,  # Larger batch for stability
    clip_range=0.2,  # Less restrictive clipping
    tensorboard_log="./ppo_tensorboard/"
)
# Train for more timesteps for better learning
model.learn(total_timesteps=100000)  
model.save("ppo_medarot")


New Map
New Map
Using cpu device
Wrapping the env in a DummyVecEnv.
Logging to ./ppo_tensorboard/PPO_27
New Map


# Evaluate for 5 episodes

In [12]:
# Evaluación del modelo
model = PPO.load("ppo_medarot")
pyboy = PyBoy(r"C:\Users\USUARIO\Desktop\RLMEDA\MedarotKabuto.gb")
eval_env = GenericPyBoyEnv(pyboy, debug=False, render_mode=False)
eval_env = TransformObservation(
    eval_env,
    lambda obs: {"info": obs["info"]},
    observation_space=eval_env.observation_space
)


# Evaluar el modelo entrenado
num_episodes = 5
total_rewards = []

for episode in range(num_episodes):
    obs, _ = eval_env.reset()
    total_reward = 0

    for _ in range(3600):  # 10 minutos, 1 acción por segundo
        # Asegurar que la observación es un diccionario (como en `observation_space`)
        if not isinstance(obs, dict):
            raise ValueError(f"La observación esperada es un diccionario, pero se recibió: {type(obs)}")

        # Obtener la acción del modelo
        action, _ = model.predict(obs)  # No concatenar, usar directamente el diccionario

        # Tomar un paso en el entorno
        obs, reward, done, truncated, _ = eval_env.step(action)
        total_reward += reward

        if done:
            break

    total_rewards.append(total_reward)
    print(f"Episodio {episode + 1}: Recompensa total: {total_reward}")

# Imprimir la recompensa promedio
print(f"Recompensa promedio en {num_episodes} episodios: {np.mean(total_rewards)}")


You Win!
Episodio 1: Recompensa total: 1
You Win!
Episodio 2: Recompensa total: 1
You Win!
Episodio 3: Recompensa total: 1
You Win!
Episodio 4: Recompensa total: 1
You Win!
Episodio 5: Recompensa total: 1
Recompensa promedio en 5 episodios: 1.0


# Train Agent in 4 Env

In [2]:
from pyboy import PyBoy
from stable_baselines3 import PPO
from stable_baselines3.ppo import MultiInputPolicy
from gymnasium.wrappers import TransformObservation
from gymnasium.spaces import Box, Dict
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv
import numpy as np
import matplotlib.pyplot as plt

# Define a function to create environments for SubprocVecEnv
def make_env():
    pyboy = PyBoy(r"C:\Users\USUARIO\Desktop\RLMEDA\MedarotKabuto.gb")
    train_env = GenericPyBoyEnv(pyboy, debug=False, render_mode=False)

    # Transform observation to remove unnecessary screen info
    train_env = TransformObservation(
        train_env,
        lambda obs: {"info": obs["info"]},  # Only use position info
        observation_space=Dict({
            "info": Box(low=0, high=255, shape=(4,), dtype=np.float32)  # Adjusted shape and dtype
        })
    )
# Validate environment
    #check_env(train_env, warn=True)

# Monitor to log training information
    return Monitor(train_env, filename="./ppo_medarot_logs")

# Number of parallel environments
num_envs = 8  # Use 4 parallel environments
vec_env = SubprocVecEnv([make_env for _ in range(num_envs)])

# Configuring PPO with the vectorized environment
model = PPO(
    MultiInputPolicy,
    vec_env,
    verbose=1,
    learning_rate=1e-4,
    n_steps=4096,  # Ensure divisible by num_envs for batch updates
    batch_size=512,
    gamma=0.999,
    gae_lambda=0.95,
    ent_coef=0.1,
    clip_range=0.2,
    tensorboard_log="./ppo_tensorboard/"
)

# Train the model for 50,000 timesteps
model.learn(total_timesteps=900000)
model.save("ppo_medarot_2")


Using cpu device
Logging to ./ppo_tensorboard/PPO_29
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 2.01e+03 |
|    ep_rew_mean     | 12       |
| time/              |          |
|    fps             | 101      |
|    iterations      | 1        |
|    time_elapsed    | 323      |
|    total_timesteps | 32768    |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 3.81e+03     |
|    ep_rew_mean          | 10.2         |
| time/                   |              |
|    fps                  | 105          |
|    iterations           | 2            |
|    time_elapsed         | 619          |
|    total_timesteps      | 65536        |
| train/                  |              |
|    approx_kl            | 0.0030895197 |
|    clip_fraction        | 0.000287     |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.79        |
|    

# Plot Best Policy

In [3]:
import matplotlib.pyplot as plt
import pandas as pd
# Evaluate the policy on a fresh environment and visualize the trajectory
def test_policy(model):
    # Initialize a test environment
    test_env = make_env()
    obs, info = test_env.reset()

    done = False
    test_positions = []

    # Test the learned policy
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, truncated, info = test_env.step(action)  # Unpack 5 values
        print(obs)
        test_positions.append((obs["info"][0], obs["info"][1]))  # Record positions
        
    # Plot the trajectory
    test_positions = np.array(test_positions)
    #print(test_positions)
    final_position = len(test_positions) - 1
    plt.plot(test_positions[:, 0], test_positions[:, 1], marker="o", linestyle="-")
    plt.plot(test_positions[0, 0], test_positions[0, 1], "g^")  # Starting point
    plt.plot(test_positions[final_position, 0], test_positions[final_position, 1], "rs")  # Final point
    plt.xlabel("Posición X")
    plt.ylabel("Posición Y")
    plt.title("Trayectoria del agente en la prueba")
    plt.show()

# Load the trained PPO model
model = PPO.load("ppo_medarot")  # Load your trained model

# Test the policy on a single environment and visualize the trajectory
test_policy(model) 

New Map
New Map
New Map
{'info': array([  4.,   5., 124.,   2.], dtype=float32)}
{'info': array([  3.,   5., 124.,   2.], dtype=float32)}
{'info': array([  3.,   5., 124.,   2.], dtype=float32)}
{'info': array([  2.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   6., 124.,   1.], dtype=float32)}
{'info': array([  1.,   6., 124.,   1.], dtype=float32)}
{'info': array([  1.,   6., 124.,   1.], dtype=float32)}
{'info': array([  1.,   6., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   0.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   5., 124.,   2.], dtype=float32)}
{'info': array([  1.,   4., 124.,   0.], dtype=float32)}
{'info': array([  1.,   3., 124.,   0.], dtype=float32)}
{'info'

KeyboardInterrupt: 

In [7]:
import matplotlib.pyplot as plt
import numpy as np

# Evaluate the policy on a fresh environment and visualize the trajectory
def test_policy(model):
    # Initialize a test environment
    test_env = make_env()
    obs, info = test_env.reset()

    done = False
    map_positions = {}  # Dictionary to store positions per map

    # Test the learned policy
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, truncated, info = test_env.step(action)  # Unpack 5 values
        
        map_id = int(obs["info"][2])  # Third value in "info" array
        position = (obs["info"][0], obs["info"][1])  # X, Y coordinates

        # Store position in the corresponding map
        if map_id not in map_positions:
            map_positions[map_id] = []
        map_positions[map_id].append(position)

    # Create a separate plot for each map
    for map_id, positions in map_positions.items():
        positions = np.array(positions)

        plt.figure(figsize=(6, 6))
        plt.plot(positions[:, 0], positions[:, 1], marker="o", linestyle="-", color="b", label=f"Map {map_id}")
        plt.plot(positions[0, 0], positions[0, 1], "g^", markersize=10, label="Start")  # Starting point
        plt.plot(positions[-1, 0], positions[-1, 1], "rs", markersize=10, label="End")  # Final point

        plt.xlabel("Posición X")
        plt.ylabel("Posición Y")
        plt.title(f"Trayectoria en el Mapa {map_id}")
        plt.legend()
        plt.grid()
        plt.show()

# Load the trained PPO model
model = PPO.load("ppo_medarot")  # Load your trained model

# Test the policy on a single environment and visualize the trajectory
test_policy(model)


New Map
New Map
New Map
New Map


KeyboardInterrupt: 

# Continue Training a Model Not working

In [4]:
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
from gymnasium.wrappers import TransformObservation
from gymnasium.spaces import Box, Dict
import numpy as np

# Reload the previously saved PPO model
model = PPO.load("ppo_medarot")

# Define a function to recreate the environment (if it was customized)
def make_env():
    pyboy = PyBoy(r"C:\Users\USUARIO\Desktop\RLMEDA\MedarotKabuto.gb")
    train_env = GenericPyBoyEnv(pyboy, debug=False, render_mode=False)

    # Transform observation to remove unnecessary screen info
    train_env = TransformObservation(
        train_env,
        lambda obs: {"info": obs["info"]},  # Only use position info
        observation_space=Dict({
            "info": Box(low=0, high=255, shape=(4,), dtype=np.float32)  # Adjusted shape and dtype
        })
    )
    return train_env

# Number of parallel environments
num_envs = 4  # Use 4 parallel environments
vec_env = SubprocVecEnv([make_env for _ in range(num_envs)])

# Continue training the model
model.set_env(vec_env)  # Ensure the model is using the correct environment

# Continue training from the previous state for a set number of timesteps
model.learn(total_timesteps=50000)  # Continue training for 50,000 timesteps (or any other number)
model.save("ppo_medarot")  # Save the model again after additional training


Logging to ./ppo_tensorboard/PPO_21
------------------------------
| time/              |       |
|    fps             | 75    |
|    iterations      | 1     |
|    time_elapsed    | 217   |
|    total_timesteps | 16384 |
------------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 75          |
|    iterations           | 2           |
|    time_elapsed         | 434         |
|    total_timesteps      | 32768       |
| train/                  |             |
|    approx_kl            | 0.015452752 |
|    clip_fraction        | 0.0508      |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.59       |
|    explained_variance   | 0.751       |
|    learning_rate        | 0.0001      |
|    loss                 | -0.159      |
|    n_updates            | 50          |
|    policy_gradient_loss | -0.00999    |
|    value_loss           | 0.0319      |
---------------------------------

# Plot Logs

In [None]:
# Load training logs
log_df = pd.read_csv("./ppo_medarot_logs.monitor.csv", skiprows=1)

# Plot episode rewards
plt.figure(figsize=(10, 5))
plt.plot(log_df["r"], label="Episode Reward", alpha=0.7)
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Training Reward Over Episodes")
plt.legend()
plt.show()

# Plot episode lengths
plt.figure(figsize=(10, 5))
plt.plot(log_df["l"], label="Episode Length", color="red", alpha=0.7)
plt.xlabel("Episode")
plt.ylabel("Length (Steps)")
plt.title("Training Episode Length Over Time")
plt.legend()
plt.show()
