<a href="https://colab.research.google.com/github/HienTChau/SecretGardenRL/blob/main/SecretGarden.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Building the Secret Garden
Solving a Cookie Run Kingdom minigame with Reinforcement Learning

## Preliminaries

### Software Installation

In [9]:
%%capture

# remove legacy gym (installed in Colab by default)
!pip uninstall -y gym

# environments
! pip install "gymnasium[box2d, atari, mujoco]"

! sudo apt install swig # needed for box2d

# RL algorithms
! pip install "stable-baselines3[extra]"

# visualization
! apt-get update && apt-get install ffmpeg freeglut3-dev xvfb

### Imports

In [10]:
import stable_baselines3
import gymnasium as gym
import numpy as np
from typing import Optional

import os
import re
import base64

from time import sleep
from pathlib import Path

# for pretty display in notebook
from IPython import display as ipythondisplay
from IPython.display import clear_output

from stable_baselines3.common import results_plotter
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback, CallbackList
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results

# suppresses deprecation warnings
# (only DeprecationWarning is filtered; every other warning category still shows)
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# display library versions
# NOTE: `gym` is the alias given to gymnasium by `import gymnasium as gym`,
# so the second line reports the gymnasium version even though it is labelled "gym".
print(f'stable_baselines3 version: {stable_baselines3.__version__}')
print(f'gym version: {gym.__version__}')

stable_baselines3 version: 2.7.0
gym version: 1.2.2


### Helper Code

In [11]:
def create_dirs(dirs):
  """Create every path in `dirs` that does not exist yet.

  Args:
      dirs: iterable of directory paths

  Paths that already exist are left untouched; each newly created
  directory is reported on stdout.
  """
  for path in dirs:
    if os.path.exists(path):
      # nothing to do for paths that are already present
      continue
    os.makedirs(path)
    print(f"Created directory: {path}")

def create_env_instance(env_name, log_dir=None, wrap=True, **env_kwargs):
  """Build a Gymnasium environment, optionally wrapped in a Monitor.

  Args:
      env_name: environment id passed to `gym.make`
      log_dir: value forwarded to `Monitor` as its `filename` argument
      wrap: when True, the environment is wrapped in `Monitor`
      **env_kwargs: extra keyword arguments forwarded to `gym.make`

  Returns:
      The created environment (a `Monitor` instance when `wrap` is True).
  """
  raw_env = gym.make(env_name, **env_kwargs)

  if not wrap:
    return raw_env

  # the Monitor wrapper provides additional functionality on top of the raw env
  return Monitor(raw_env, filename=log_dir)

def show_env_info(env):
  """Print the id, spaces and spec limits of an environment.

  The spec is looked up with `gym.spec` using the id stored on the
  unwrapped environment; the spaces are read from `env` itself.

  Args:
      env: environment whose unwrapped form carries a `spec` with an `id`
  """
  env_id = env.unwrapped.spec.id
  registered = gym.spec(env_id)

  print(f'Environment Name: {registered.id}')
  print(f'Action Space: {env.action_space}')
  print(f'Observation Space: {env.observation_space}')
  print(f'Max Episode Steps: {registered.max_episode_steps}')
  print(f'Nondeterministic: {registered.nondeterministic}')

def get_session_name(env, algorithm):
    """
    Derive a filesystem-safe session name of the form "<env>_<algorithm>".

    `env` may be an environment id string or an environment object. For an
    object, the first sub-environment is used when an `envs` list is present,
    it is unwrapped when it exposes `unwrapped`, and its `spec.id` is used
    (falling back to the class name when there is no usable id).

    The algorithm part is the class name of `algorithm`.

    Example output: "CartPole-v1_PPO"
    """
    algorithm_label = type(algorithm).__name__   # e.g., "PPO", "TD3", "SAC"

    if isinstance(env, str):
        env_label = env
    else:
        # vectorized envs (DummyVecEnv, SubprocVecEnv, etc.) hold sub-envs in `.envs`
        has_sub_envs = hasattr(env, "envs") and len(env.envs) > 0
        inner = env.envs[0] if has_sub_envs else env

        # strip wrappers (Monitor, TimeLimit, etc.) when possible
        inner = getattr(inner, "unwrapped", inner)

        env_id = getattr(getattr(inner, "spec", None), "id", None)
        env_label = env_id if env_id else type(inner).__name__

    # any run of characters outside [A-Za-z0-9_.-] collapses to a single "_"
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", f"{env_label}_{algorithm_label}")

class SaveOnBestTrainingRewardCallback(BaseCallback):
    """Callback that saves the model whenever the mean training reward improves.

    Every `check_freq` calls, the results found in `log_dir` are loaded and the
    mean reward over the last 100 episodes is compared with the best value
    seen so far; on improvement the model is saved to
    `os.path.join(save_dir, save_filename)`.

    Args:
        check_freq: number of callback calls between two checks
        log_dir: directory the training results are loaded from
        save_dir: directory the best model is saved into
        save_filename: file name (within `save_dir`) used for the saved model
        verbose: when > 0, progress is printed and the cell output is cleared
    """

    def __init__(self, check_freq: int, log_dir: str, save_dir: str, save_filename: str, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_dir = save_dir
        self.save_path = os.path.join(save_dir, save_filename)
        # start below any possible reward so the first measurement is always a new best
        self.best_mean_reward = -np.inf

    def _on_step(self) -> bool:
        """Check the training reward on every `check_freq`-th call.

        Returns:
            bool: always True, so this callback never stops training
        """
        if self.n_calls % self.check_freq != 0:
            return True

        # retrieve training reward
        monitor_results = results_plotter.load_results(self.log_dir)
        timesteps, episode_rewards = results_plotter.ts2xy(monitor_results, "timesteps")
        if len(timesteps) == 0:
            # nothing has been logged yet
            return True

        # mean training reward over the last 100 episodes
        mean_reward = np.mean(episode_rewards[-100:])
        chatty = self.verbose > 0

        if chatty:
            clear_output(wait=True)

            print(f"Num timesteps: {self.num_timesteps}")
            print(
                f"Previous best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}"
            )

        if mean_reward > self.best_mean_reward:
            # new best: remember it and persist the current model
            self.best_mean_reward = mean_reward
            if chatty:
                print(f"Saving new best model to {self.save_path}.zip")
            self.model.save(self.save_path)

        return True

def record_video(env_id, model, video_path, prefix="rl-video", video_length=500):
    """Roll out `model` in a fresh environment while recording a video.

    Args:
        env_id: environment id passed to `gym.make` (created with
            `render_mode="rgb_array"`)
        model: object exposing `predict(obs)` that returns `(action, state)`
        video_path: folder handed to `VecVideoRecorder` as `video_folder`
        prefix: name prefix handed to `VecVideoRecorder`
        video_length: number of steps to record and to roll out

    The recording is triggered at step 0 only. The environment is always
    closed, even if prediction or stepping raises, so the recorder and the
    underlying environment are not leaked.
    """
    eval_env = DummyVecEnv([lambda: gym.make(env_id, render_mode="rgb_array")])
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder=video_path,
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix=prefix
    )

    try:
        obs = eval_env.reset()
        for _ in range(video_length):
            action, _ = model.predict(obs)
            obs, _, _, _ = eval_env.step(action)
    finally:
        # close in all cases; presumably this is also what finalizes the
        # video file in the recorder -- confirm against the SB3 docs
        eval_env.close()

def show_videos(video_path, prefix="rl-video"):
    """Display every `<prefix>*.mp4` file found in `video_path` inline.

    Each file is base64-encoded and embedded in a `<video>` tag; the tags are
    joined with `<br>` and rendered as a single HTML output.

    Args:
        video_path: directory that is searched (non-recursively) for videos
        prefix: file-name prefix the videos must start with
    """
    video_template = """<video alt="{}" autoplay
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>"""

    fragments = [
        video_template.format(clip, base64.b64encode(clip.read_bytes()).decode("ascii"))
        for clip in Path(video_path).glob(f"{prefix}*.mp4")
    ]

    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(fragments)))

### Google Drive Configuration

In [12]:
# mounts your Google Drive as a directory in your Colab virtual machine
# (this import only resolves inside Google Colab)
from google.colab import drive
drive.mount("/content/drive")
# creates the top-level directory where all Google Drive content will be saved
# `drive_basedir` is reused by the Global Configuration cell to build the
# models / videos / figures paths
drive_basedir='/content/drive/MyDrive/comp377/colab'
os.makedirs(drive_basedir, exist_ok=True)
# recursively list what is currently stored under the base directory
! ls -R $drive_basedir

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/comp377/colab:
figures  models  videos

/content/drive/MyDrive/comp377/colab/figures:

/content/drive/MyDrive/comp377/colab/models:
CartPole-v1_PPO.zip

/content/drive/MyDrive/comp377/colab/videos:


### Global Configuration

In [13]:
# scratch locations (under /tmp) for files created during training
train_dir = '/tmp/gym/model/train/'
tensorboard_dir = '/tmp/gym/tensorboard'

# output locations for generated content, all placed under `drive_basedir`
model_dir, video_dir, figure_dir = (
  os.path.join(drive_basedir, leaf) for leaf in ('models', 'videos', 'figures')
)

# make sure every directory exists (a no-op for those already present)
for _required_dir in (train_dir, tensorboard_dir, model_dir, video_dir, figure_dir):
  os.makedirs(_required_dir, exist_ok=True)

# set up fake display; otherwise video rendering will fail
# (Xvfb is started in the background on display :1, then DISPLAY points at it)
os.system("Xvfb :1 -screen 0 1024x768x24 &")
os.environ['DISPLAY'] = ':1'

## Environment

The project requires a customized environment in Gymnasium.

Each game gives the player a 4x4 tile board. Players can place a building/block on a random empty space on the board. Lv. 1, 2, and 3 blocks are placed with probabilities of 60%, 25%, and 15% respectively, and the next block to be placed is always visible.

Blocks can be merged with an adjacent block of the same level to create a higher-level block at the new location. Each new or merged block grants points equal to $2^{\text{level} - 1}$ (**2 to the power of "the block's level minus 1"**). Blocks can be leveled up to Lv. 12 maximum.

Creating a block of a given level for the first time in a game unlocks a reward (Lv. 4 blocks and above only; see the table below).
Items can be used at any time as long as the game hasn't ended.

- Rewind: Undoes the last merge, or removes the last placed building. Can only be used 3 times between actions.
- Hammer: Destroys and frees up a block.
- Blueprint: Places the next building on a chosen tile.

| Level | Rewards (modified) |
|-------|---------|
| 1 | — |
| 2 | — |
| 3 | — |
| 4 | Hammer x5 |
| 5 | Rewind x1 |
| 6 | Blueprint x1 |
| 7 | - |
| 8 | Hammer x5 |
| 9 | Rewind x1 |
| 10 | Blueprint x1 |
| 11 | - |
| 12 | - |

**Key Design Questions**

<ul>
🎯 Skill: Select the optimal action

👀 Information: The current state of the board, next tile and available items

🎮 Actions: Skip, Merge (Hammer, Rewind, Blueprint will be added later)

🏆 Success: Achieve maximum score or create the highest level block in minimum steps

⏰ End: When there is no available move (or optional time limit)
</ul>


In [32]:
class SecretGardenEnv(gym.Env):
  """Gymnasium environment for the Secret Garden block-merging minigame.

  The board is a ``size x size`` grid of block levels, where 0 means empty.
  On each step the agent either places the pending block on a random empty
  cell or merges one block into an adjacent block of the same level.

  Actions:
      0                        PLACE the next block on a random empty cell
      1 .. n_cells * n_cells   MERGE(A, B): the block at flat cell index A is
                               merged into the adjacent flat cell index B,
                               with ``action = 1 + A * n_cells + B``

  Rewards:
      +1 for a placement, ``2 ** (new_level - 1)`` for a merge, and 0 for an
      action that has no effect (full board, invalid merge).

  All randomness comes from ``self.np_random`` so that ``reset(seed=...)``
  makes episodes reproducible.
  """

  # Highest level a block can reach; blocks at this level can no longer merge.
  MAX_LEVEL = 12
  # Levels that can be drawn as the next block, with their probabilities.
  BLOCK_LEVELS = (1, 2, 3)
  BLOCK_PROBS = (0.6, 0.25, 0.15)

  def __init__(self, size: int = 4):
    """Create the environment.

    Args:
        size: side length of the square board
    """
    # The size of the square grid
    self.size = size
    self.n_cells = size * size

    self._board = None
    self._next_block = None
    self._score = 0

    # Observation space
    self.observation_space = gym.spaces.Dict({
      'board': gym.spaces.Box(low=0, high=self.MAX_LEVEL, shape=(size, size), dtype=np.uint8),
      "next_block": gym.spaces.Discrete(4) # will use 1, 2, 3
      # 'items': gym.spaces.Box(low=0, high=1, shape=(3,), dtype=np.uint8) #this will be added later
    })

    # Action space
    # 0 = PLACE
    # 1 .. n_cells * n_cells = MERGE(i, j)   (1–256 on the default 4x4 board)
    self.action_space = gym.spaces.Discrete(1 + self.n_cells * self.n_cells)

    # action id -> (source flat index, destination flat index)
    self._action_map = {
        1 + src * self.n_cells + dst: (src, dst)
        for src in range(self.n_cells)
        for dst in range(self.n_cells)
    }

  def generate_next_block(self):
    """Draw the level of the next block to be placed.

    Returns:
        int: 1, 2 or 3, drawn with probabilities 0.6, 0.25 and 0.15
    """
    # self.np_random (not the global np.random) so reset(seed=...) is honoured
    return int(self.np_random.choice(self.BLOCK_LEVELS, p=self.BLOCK_PROBS))

  def _get_obs(self):
    """Convert internal state to observation format.

    Returns:
        dict: Observation with a copy of the current board and the next block
    """
    # copy: the board is mutated in place by step(), and observations that
    # were already handed out must not change retroactively
    return {"board": self._board.copy(), "next_block": self._next_block}

  def _get_info(self):
    """Compute auxiliary information for debugging.

    Returns:
        dict: Info with the current score
    """
    return {
        "score": self._score,
    }

  def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
    """Start a new episode.

    Args:
        seed: Random seed for reproducible episodes
        options: Additional configuration (unused)

    Returns:
        tuple: (observation, info) for the initial state
    """
    # Seed the random number generator
    super().reset(seed=seed)

    # Initialize the board
    self._board = np.zeros((self.size, self.size), dtype=np.uint8)

    # Generate the next block
    self._next_block = self.generate_next_block()

    # Set score
    self._score = 0

    return self._get_obs(), self._get_info()

  def _get_empty_cells(self):
    """Returns a list of empty cells on the board.

    Returns:
        list: List of (row, col) tuples of empty cells, in row-major order
    """
    return [(int(r), int(c)) for r, c in np.argwhere(self._board == 0)]

  def _has_valid_merge(self):
    """Checks if there is a valid merge on the board.

    Returns:
        bool: True if two adjacent blocks share the same non-zero level below
        MAX_LEVEL, False otherwise
    """
    for i in range(self.size):
      for j in range(self.size):
        cur = self._board[i, j]
        # empty cells and max-level blocks can never take part in a merge
        # (kept consistent with _is_valid_merge)
        if cur == 0 or cur >= self.MAX_LEVEL:
          continue
        # Check right
        if j + 1 < self.size and self._board[i, j + 1] == cur:
          return True
        # Check down
        if i + 1 < self.size and self._board[i + 1, j] == cur:
          return True
    return False

  def _is_terminal(self):
    """Checks if the game is over.

    Returns:
        bool: True when the board has no empty cell and no valid merge
    """
    return not self._has_valid_merge() and not self._get_empty_cells()

  def _is_valid_merge(self, ax, ay, bx, by):
    """Checks if merging the block at (ax, ay) into (bx, by) is valid.

    Returns:
        bool: True when the cells are orthogonally adjacent and hold blocks
        of the same non-zero level below MAX_LEVEL
    """
    # A and B must be adjacent
    if abs(ax - bx) + abs(ay - by) != 1:
      return False

    lvlA = self._board[ax, ay]
    lvlB = self._board[bx, by]

    # Both must be nonzero and same level
    if lvlA == 0 or lvlB == 0:
      return False
    if lvlA != lvlB:
      return False

    # Cannot merge at max level
    if lvlA >= self.MAX_LEVEL:
      return False

    return True

  def step(self, action):
    """Execute one time step within the environment.

    Args:
        action: 0 to place the next block, or a merge action id (see class
            docstring)

    Returns:
        tuple: (observation, reward, terminated, truncated, info)

    Raises:
        KeyError: if ``action`` is positive but not a known merge action id
    """
    truncated = False
    reward = 0

    # accept numpy integer scalars as well as plain ints
    action = int(action)

    # 1. PLACE ACTION
    if action == 0:
      empty_cells = self._get_empty_cells()

      # on a full board PLACE has no effect and earns nothing
      if empty_cells:
        # Place the block in a random empty cell
        i, j = empty_cells[self.np_random.integers(len(empty_cells))]
        self._board[i, j] = self._next_block

        # Reward for placing
        reward += 1

        self._score += reward
        self._next_block = self.generate_next_block()

    # 2. MERGE ACTION
    elif action > 0:
      # Tile A is the source, tile B is the destination
      A, B = self._action_map[action]

      # Decode tiles into 2D pos
      ax, ay = divmod(A, self.size)
      bx, by = divmod(B, self.size)

      if self._is_valid_merge(ax, ay, bx, by):
        # work with a Python int: the board is uint8, and doing the reward
        # arithmetic in uint8 would wrap around from level 9 upwards
        new_level = int(self._board[bx, by]) + 1

        # Merge the blocks
        self._board[bx, by] = new_level
        self._board[ax, ay] = 0

        # Reward for merging
        reward += 2 ** (new_level - 1)

        self._score += reward

    terminated = self._is_terminal()

    return self._get_obs(), reward, terminated, truncated, self._get_info()

  def render(self, mode="human"):
    """Print the score, the next block and the board to stdout.

    Args:
        mode: unused; kept so existing callers keep working
    """
    print("Score:", self._score)
    print("Next block:", self._next_block)
    print(self._board)
    print("-" * 20)


In [33]:
# quick manual check: build the environment with its default 4x4 board,
# start an episode and show the empty board
env = SecretGardenEnv()
obs, info = env.reset()
env.render()

# action 0 places the pending block on a random empty cell
obs, reward, done, trunc, info = env.step(0)  # PLACE
env.render()


Score: 0
Next block: 1
[[0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]]
--------------------
Score: 1
Next block: 2
[[0 0 0 0]
 [0 0 0 0]
 [0 0 0 1]
 [0 0 0 0]]
--------------------
