In [1]:
# Installs the necessary Python and system libraries
try:
    from easypip import easyimport, easyinstall, is_notebook
except ModuleNotFoundError as e:
    get_ipython().run_line_magic("pip", "install 'easypip>=1.2.0'")
    from easypip import easyimport, easyinstall, is_notebook

easyinstall("swig")
easyinstall("bbrl>=0.2.2")
easyinstall("gymnasium")
easyinstall("mazemdp")
easyinstall("bbrl_gymnasium>=0.2.0")
easyinstall("tensorboard")
easyinstall("box2d-kengz")

[easypip] Installing bbrl_gymnasium>=0.2.0


In [2]:
import os
from typing import Tuple, List

import numpy as np
if is_notebook():
    get_ipython().run_line_magic("matplotlib", "inline")
import matplotlib.pyplot as plt

easyimport("gymnasium")
easyimport("bbrl_gymnasium")
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv

[easypip] Installing bbrl_gymnasium


Matplotlib backend: module://matplotlib_inline.backend_inline


In [3]:
import gymnasium as gym
import bbrl_gymnasium  # presumably imported so that 'MazeMDP-v0' is registered with gymnasium — TODO confirm

# Create the maze environment that the BBRL random-search cell further down
# receives as `bbrl_env`. The maze settings (width, height, ratio) are passed
# as one dict under the keyword `kwargs`.
bbrl_env = gym.make('MazeMDP-v0', kwargs={"width": 5, "height": 5, "ratio": 0.2})
# Reset once before drawing; the value returned by reset() is discarded here.
bbrl_env.reset()

# Draw the maze; the string is presumably used as the figure title — confirm
# against the bbrl_gymnasium documentation.
bbrl_env.init_draw("The maze")

  logger.warn(


Output()

In [4]:
def objective_function(policy, env, horizon):
    """
    Runs one episode with `policy` in `env` and returns the sum of the rewards.

    The episode stops as soon as the environment reports `terminated` or
    `truncated`, or after `horizon` steps, whichever comes first.

    Args:
        policy (callable): Maps an observation to an action accepted by `env.step`.
        env (gym.Env): Environment following the gymnasium API: `reset()` returns
            `(observation, info)` and `step()` returns a 5-tuple.
        horizon (int): The maximum number of steps to take.

    Returns:
        float: The total accumulated reward (0 when `horizon` is not positive).
    """
    total_reward = 0
    # gymnasium's reset() returns (observation, info). Unpack it so the policy
    # sees the observation itself on the first step rather than the whole tuple.
    observation, _ = env.reset()
    for _ in range(horizon):
        action = policy(observation)
        observation, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        if terminated or truncated:
            break
    return total_reward


def basic_random_search(env_name, step_size, num_directions, noise_std, max_iterations, horizon):
    """
    Performs basic random search (BRS) over the weights of a linear policy.

    The policy is `action = clip(theta . observation, low, high)`, where `low`
    and `high` are the bounds of the environment's action space. `theta` has one
    weight per observation component, so the same scalar is used for every
    action dimension (this suits 1-D action spaces such as Pendulum-v1).

    At every iteration, N = `num_directions` directions δ_k are drawn from the
    standard normal distribution, both θ + νδ_k and θ − νδ_k are rolled out, and
    θ is updated once with
        θ ← θ + (α / N) · Σ_k [r(θ + νδ_k) − r(θ − νδ_k)] · δ_k
    with α = `step_size` and ν = `noise_std`.

    Args:
        env_name (str): Name of the environment passed to `gym.make`.
        step_size (float): Step size α for parameter updates.
        num_directions (int): Number of directions sampled per iteration.
        noise_std (float): Standard deviation ν of exploration noise.
        max_iterations (int): Maximum number of iterations.
        horizon (int): Maximum number of steps per rollout.

    Returns:
        numpy.ndarray: Parameters of shape `(observation_dim,)` after the last
        iteration.
    """
    env = gym.make(env_name)
    try:
        num_params = env.observation_space.shape[0]
        action_low = env.action_space.low
        action_high = env.action_space.high

        theta = np.zeros(num_params)

        def make_policy(params):
            # π_params(x) = clip(params · x): the action depends on the observation.
            # Clipping a scalar against the bound arrays yields an action with the
            # shape of the action space; cast to the dtype of the bounds.
            return lambda obs: np.asarray(
                np.clip(np.dot(params, obs), action_low, action_high),
                dtype=action_low.dtype,
            )

        for _ in range(max_iterations):
            # Sample N directions (δ1, ..., δN) of the same size as θ.
            directions = np.random.randn(num_directions, num_params)

            # Accumulate the step over all directions; θ stays fixed while the
            # rollouts of this iteration are collected, so every ± pair is
            # evaluated around the same θ_j.
            update = np.zeros_like(theta)
            for direction in directions:
                reward_plus = objective_function(
                    make_policy(theta + noise_std * direction), env, horizon
                )
                reward_minus = objective_function(
                    make_policy(theta - noise_std * direction), env, horizon
                )
                update += (step_size / num_directions) * (reward_plus - reward_minus) * direction

            theta += update

        return theta
    finally:
        # Release the environment even if a rollout raised.
        env.close()


# Example: run the random search on Pendulum.
# These are plain module-level names; a later cell reassigns several of them.
env_name = 'Pendulum-v1'   # Environment id passed to gym.make
step_size = 0.05           # Step size for parameter updates
num_directions = 20        # Number of directions sampled per iteration (two rollouts per direction)
noise_std = 0.2            # Standard deviation of exploration noise
max_iterations = 50        # Maximum number of iterations
horizon = 150              # Horizon (number of time steps per rollout)

# Run BRS (max_iterations * num_directions * 2 rollouts of at most `horizon` steps each)
optimal_params = basic_random_search(env_name, step_size, num_directions, noise_std, max_iterations, horizon)

# Print the parameters returned by the search. No random seed is set in this
# cell, so the printed values can differ from run to run.
print("Optimal Parameters:", optimal_params)


Optimal Parameters: [  3.5980421  -25.75099216  21.10023133]


In [5]:
##########################
# OUR ATTEMPT USING BBRL #
##########################

from mazemdp.toolbox import egreedy, egreedy_loc

def objective_function(mdp: MazeMDPEnv, policy, epsilon = 0.02):
    """
    Plays one episode in the maze and returns the sum of the rewards collected.

    The episode starts from `mdp.reset(uniform=True)` and runs until the
    environment reports `terminated` or `truncated`; there is no separate step
    limit in this function.

    Args:
        mdp (MazeMDPEnv): The maze environment to roll out in.
        policy: Passed unchanged as the first argument of `egreedy`. The caller in
            this cell supplies a NumPy array with one row per state and one
            column per action.
        epsilon (float, optional): Forwarded to `egreedy` as its exploration
            parameter. Defaults to 0.02.

    Returns:
        float: The total accumulated reward of the episode.
    """
    state, _ = mdp.reset(uniform = True)
    episode_return = 0
    done = False
    while not done:
        # egreedy (from mazemdp.toolbox) selects the action handed to mdp.step;
        # per the original author's note, step expects the index of the action.
        chosen_action = egreedy(policy, state, epsilon)
        state, reward, terminated, truncated, _ = mdp.step(chosen_action)
        episode_return += reward
        done = terminated or truncated
    return episode_return


def basic_random_search(mdp: MazeMDPEnv, step_size, num_directions, noise_std, max_iterations, return_full_table=False):
    """
    Performs basic random search (BRS) over a state-action preference table.

    The parameters θ form an array of shape `(mdp.nb_states, mdp.action_space.n)`.
    Each perturbed table θ ± νδ_k is handed to `objective_function(mdp, table)`,
    which rolls it out in the maze and returns the episode reward.

    At every iteration, N = `num_directions` directions δ_k with the same shape
    as θ are drawn from the standard normal distribution, and θ is updated once:
        θ ← θ + (α / N) · Σ_k [r(θ + νδ_k) − r(θ − νδ_k)] · δ_k
    with α = `step_size` and ν = `noise_std`.

    Args:
        mdp (MazeMDPEnv): The MazeMDPEnv environment.
        step_size (float): Step size α for parameter updates.
        num_directions (int): Number of directions sampled per iteration.
        noise_std (float): Standard deviation ν of exploration noise.
        max_iterations (int): Maximum number of iterations.
        return_full_table (bool, optional): When True, return the whole
            `(nb_states, n_actions)` table. Defaults to False, which keeps the
            historical return value: only the row of the last state.

    Returns:
        numpy.ndarray: `theta[-1]` (shape `(n_actions,)`) by default, or the full
        table when `return_full_table` is True.
    """
    theta = np.zeros((mdp.nb_states, mdp.action_space.n))

    for _ in range(max_iterations):
        # Sample N directions of the same shape as θ, so that every state gets
        # its own perturbation. A direction of shape (n_actions,) would be
        # broadcast to all rows and keep every state's row identical.
        directions = np.random.randn(num_directions, *theta.shape)

        # Accumulate the step over all directions; θ stays fixed while the
        # rollouts of this iteration are collected.
        update = np.zeros_like(theta)
        for direction in directions:
            policy_plus = theta + noise_std * direction   # θj + νδk
            policy_minus = theta - noise_std * direction  # θj − νδk

            reward_plus = objective_function(mdp, policy_plus)
            reward_minus = objective_function(mdp, policy_minus)

            update += (step_size / num_directions) * (reward_plus - reward_minus) * direction

        theta += update

    # NOTE(review): the default returns only the last state's row to preserve the
    # existing return shape; callers that need the policy for every state should
    # pass return_full_table=True.
    return theta if return_full_table else theta[-1]


# Example: run the random search on the maze environment.
# These assignments reuse the same module-level names as the Pendulum example cell.
step_size = 0.05           # Step size for parameter updates
num_directions = 20        # Number of directions sampled per iteration (two rollouts per direction)
noise_std = 0.2            # Standard deviation of exploration noise
max_iterations = 50        # Maximum number of iterations

# Run BRS. `basic_random_search` here is the MazeMDP version defined earlier in
# this cell (it replaces the gymnasium version of the same name), and `bbrl_env`
# is the environment created with gym.make('MazeMDP-v0', ...) in an earlier cell.
optimal_params = basic_random_search(bbrl_env, step_size, num_directions, noise_std, max_iterations)

# Print the parameters returned by the search. No random seed is set in this
# cell, so the printed values can differ from run to run.
print("Optimal Parameters:", optimal_params)


  logger.warn(


Optimal Parameters: [-0.15956021  0.14859785  0.22963451 -0.16514555]
