# 1. Import libraries and Super Mario Bros Env

In [None]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, RIGHT_ONLY 
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecEnv, SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
from gym_utils import SMBRamWrapper
import os
import torch
import time
import sys

In [None]:
# Device: use the first CUDA GPU when torch can see one, otherwise the CPU.
if not torch.cuda.is_available():
    device = 'cpu'
    print("CPU")
else:
    device = 'cuda'
    device_name = torch.cuda.get_device_name(0)
    # total_memory is reported in bytes; convert to GiB for display.
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"GPU: {device_name}")
    print(f"Memory GPU available: {gpu_memory:.1f} GB")

print(f"Selected device: {device}")

In [None]:
# World number: first index substituted into the 'SuperMarioBros-{}-{}-v0' environment id
world = 2
# Level number: second index substituted into that environment id
level = 2

In [None]:
# Create the emulator environment for the selected world/level and wrap it
# so that actions are chosen from the SIMPLE_MOVEMENT action list.
env_id = f'SuperMarioBros-{world}-{level}-v0'
env = JoypadSpace(gym_super_mario_bros.make(env_id), SIMPLE_MOVEMENT)

# 2. Process Environment

In [None]:
# Crop window handed to SMBRamWrapper as [x0, x1, y0, y1]
# (presumably bounds on the RAM tile grid -- confirm in gym_utils.SMBRamWrapper)
x0, x1 = 0, 16
y0, y1 = 0, 13
# Frame stacking / frame skipping settings forwarded to the wrapper
n_stack = n_skip = 4

env_wrap = SMBRamWrapper(env, [x0, x1, y0, y1], n_stack=n_stack, n_skip=n_skip)

In [None]:
# Smoke-test env_wrap: take 150 randomly sampled actions.
# `done` starts as True so the first iteration resets the environment;
# afterwards a reset happens only on the step following a finished episode.
done = True
for i in range(150):
    if done:
        state = env_wrap.reset()
    # Old gym step API: returns a 4-tuple (observation, reward, done, info)
    state, reward, done, info = env_wrap.step(env_wrap.action_space.sample())

In [None]:
# Show the shape of the last observation returned by env_wrap (cell output)
state.shape

In [None]:
# Draw the n_stack channels of the last observation side by side.
# Column i shows channel n_stack-i-1, i.e. the channels in reverse order.
fig, ax = plt.subplots(nrows=1, ncols=n_stack, figsize=(14, 10))
for i in range(n_stack):
    channel = n_stack - i - 1
    ax[i].imshow(state[:, :, channel], vmin=-1, vmax=2)
plt.show()

In [None]:
# Apply the remaining wrappers:
#   Monitor     -> records episode statistics (used for the tensorboard log)
#   DummyVecEnv -> exposes the single env through SB3's vectorized-env API
monitored_env = Monitor(env_wrap)
env_wrap = DummyVecEnv([lambda: monitored_env])

# 3. Setup RL Model

In [None]:
# Callbacks
# Save intermediate models

class TrainAndLoggingCallback(BaseCallback):
    """Save intermediate copies of the model while it trains.

    Every ``check_freq`` calls of the callback, the current model is written
    to ``save_path`` as ``best_model_<n>``, where ``<n>`` is the number of
    callback calls so far plus ``starting_steps``.

    :param check_freq: Number of callback calls between two saves. Must be
        at least 1; float values such as ``1e5`` are accepted and truncated
        to an integer.
    :param save_path: Directory receiving the checkpoints (created if
        missing). ``None`` disables saving.
    :param starting_steps: Offset added to the step count in the checkpoint
        file name, e.g. when resuming a previous run.
    :param verbose: Verbosity level forwarded to ``BaseCallback``.
    :raises ValueError: If ``check_freq`` is smaller than 1.
    """

    def __init__(self, check_freq, save_path,
                 starting_steps=0, verbose=1):
        super().__init__(verbose)
        check_freq = int(check_freq)
        if check_freq < 1:
            # Fail early instead of hitting a ZeroDivisionError on the first step.
            raise ValueError('check_freq must be >= 1, got {}'.format(check_freq))
        self.check_freq = check_freq
        self.save_path = save_path
        self.starting_steps = starting_steps

    def _init_callback(self) -> None:
        """Create the checkpoint directory when saving is enabled."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        """Save a checkpoint on every ``check_freq``-th call; never stop training."""
        # save_path=None is tolerated by _init_callback, so it must not
        # crash here either (os.path.join(None, ...) raises TypeError).
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            step_label = self.n_calls + int(self.starting_steps)
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(step_label))
            self.model.save(model_path)

        return True
    
# Linear learning rate schedule
# https://stable-baselines3.readthedocs.io/en/master/guide/examples.html#learning-rate-schedule
from typing import Callable

def linear_schedule(initial_value: float) -> Callable[[float], float]:
    """
    Build a learning rate schedule that decays linearly.

    The returned callable takes ``progress_remaining`` (which goes from 1 at
    the beginning of training down to 0) and returns
    ``progress_remaining * initial_value``.

    :param initial_value: Learning rate at the start of training.
    :return: schedule mapping the remaining progress to the
      current learning rate
    """
    return lambda progress_remaining: progress_remaining * initial_value

In [None]:
### MODIFY THESE TWO DIRECTORIES BEFORE TRAINING A NEW MODEL ###
# Directory used for saved models (checkpoint callback save_path and final model)
MODEL_DIR = './models/NEW_MODEL_DIR'
# Directory passed to PPO as its tensorboard_log location
LOG_DIR = './logs/NEW_LOG_DIR'

In [None]:
# PPO agent with an MLP policy on the wrapped environment.
# The learning rate follows linear_schedule, starting at 3e-4.
model = PPO(
    policy='MlpPolicy',
    env=env_wrap,
    learning_rate=linear_schedule(3e-4),
    tensorboard_log=LOG_DIR,
    device=device,
    verbose=1,
)

In [None]:
# Checkpoint callback: saves the model into MODEL_DIR every 1e5 callback calls;
# starting_steps=0 means the step count in the file names is not offset.
callback = TrainAndLoggingCallback(check_freq=1e5, starting_steps=0, save_path=MODEL_DIR)

# Train the model

In [None]:
total_timesteps = 1000000


class ProgressBarCallback(BaseCallback):
    """Print a single-line text progress bar with an ETA during training.

    The bar is redrawn in place (carriage return, no newline) roughly every
    ``update_freq`` timesteps, and once more when training ends, followed by
    a summary line with the total elapsed time.

    :param total_timesteps: Number of timesteps that corresponds to 100%.
    :param verbose: Verbosity level forwarded to ``BaseCallback``.
    :param update_freq: Minimum number of timesteps between two redraws.
    """

    def __init__(self, total_timesteps, verbose=0, update_freq=1000):
        super().__init__(verbose)
        self.total_timesteps = int(total_timesteps)
        self.update_freq = update_freq  # redraw every N timesteps
        self.start_time = time.time()
        self._last_update = 0  # value of num_timesteps at the last redraw

    def _on_training_start(self) -> None:
        # Measure from the moment training actually starts, not from when
        # the callback object was constructed.
        self.start_time = time.time()
        self._last_update = self.num_timesteps

    def _render(self) -> None:
        """Redraw the bar for the current ``num_timesteps``."""
        # The algorithm may collect more steps than requested (whole
        # rollouts), so clamp to 100% to keep the ETA non-negative.
        if self.total_timesteps > 0:
            progress = min(self.num_timesteps / self.total_timesteps, 1.0)
        else:
            progress = 1.0
        elapsed_time = time.time() - self.start_time

        # Estimate the remaining time from the average speed so far
        if progress > 0:
            eta_seconds = (elapsed_time / progress) * (1 - progress)
            eta_hours = eta_seconds / 3600
            eta_str = f"{eta_hours:.2f}h" if eta_hours >= 1 else f"{eta_seconds/60:.1f}m"
        else:
            eta_str = "?"

        # Build the visual progress bar
        bar_length = 30
        filled_length = int(bar_length * progress)
        bar = '█' * filled_length + '▒' * (bar_length - filled_length)

        # Rewrite the current line (overwrites the previous bar)
        sys.stdout.write(f'\r[{bar}] {progress*100:.1f}% ({self.num_timesteps:,}/{self.total_timesteps:,}) ETA: {eta_str}')
        sys.stdout.flush()
        self._last_update = self.num_timesteps

    def _on_step(self) -> bool:
        # Only redraw every update_freq timesteps to limit overhead. Comparing
        # against the last redraw (instead of an exact modulo) still fires
        # when num_timesteps advances by more than one per call.
        if self.num_timesteps - self._last_update >= self.update_freq:
            self._render()

        return True

    def _on_training_end(self) -> None:
        # Final redraw so the bar reflects the last step count reached
        self._render()
        elapsed_time = time.time() - self.start_time
        # Newline after the bar + summary
        print(f'\n✓ Entrenamiento completado en {elapsed_time/3600:.2f} horas')

# ---- Usage ----
t_start = time.time()

# Keep the checkpoint callback created above (`callback`) and run the progress
# bar alongside it. Rebinding `callback` to the progress bar would drop the
# checkpoint callback, so no intermediate models would be saved.
# `learn` accepts a list of callbacks.
progress_callback = ProgressBarCallback(total_timesteps=total_timesteps, update_freq=1000)
model.learn(total_timesteps=total_timesteps, callback=[callback, progress_callback])

t_elapsed = time.time() - t_start
print(f"Tiempo total: {t_elapsed/3600:.2f} horas")

In [None]:
# Report the training wall-clock time in seconds, rounded to 2 decimals
print(f'Wall time: {round(t_elapsed, 2)} s')

# Save and load trained model

In [None]:
# Save model
# Writes the trained model under MODEL_DIR; replace 'SAVED_MODEL_NAME'
# with the file name you want for this run.
model_path = os.path.join(MODEL_DIR, 'SAVED_MODEL_NAME')
model.save(model_path)

In [None]:
# Drop the in-memory model so the next cell has to load it from disk
del model

In [None]:
# Load model
# The directories are set again here, so this cell does not depend on the
# earlier cell that defined them.
MODEL_DIR = './models/NEW_MODEL_DIR'
LOG_DIR = './logs/NEW_LOG_DIR'

# Must match the name used when the model was saved
model_path = os.path.join(MODEL_DIR, 'SAVED_MODEL_NAME')
# Attach the wrapped environment to the loaded model
model = PPO.load(model_path, env=env_wrap)

# 4. Test the Trained Model

In [None]:
from stable_baselines3.common.evaluation import evaluate_policy

In [None]:
# Run one deterministic evaluation episode without rendering; the call's
# result is shown as the cell output.
evaluate_policy(
    model,
    env_wrap,
    n_eval_episodes=1,
    deterministic=True,
    render=False,
    return_episode_rewards=False,
)

In [None]:
# Watch the trained agent play a few episodes and print each episode's score.
n_episodes = 10

try:
    for episode in range(1, n_episodes + 1):
        states = env_wrap.reset()
        done = False
        score = 0.0

        while not done:
            env_wrap.render()
            action, _state = model.predict(states, deterministic=True)
            # env_wrap is a vectorized env: step() returns one entry per
            # sub-environment, so read index 0 of the single wrapped env
            # instead of treating the arrays as scalars.
            states, rewards, dones, info = env_wrap.step(action)
            score += float(rewards[0])
            done = bool(dones[0])
            time.sleep(0.01)  # slow playback down so it can be followed
        print('Episode:{} Score:{}'.format(episode, score))
finally:
    # Close through the outermost wrapper (which closes the wrapped envs),
    # also when playback is interrupted.
    env_wrap.close()