# Ray Overview Tutorials: RLLib

In [8]:
import gymnasium as gym
import numpy as np
import torch
from typing import Dict, Tuple, Any, Optional

from ray.rllib.algorithms.ppo import PPOConfig


# Define your problem using python and Farama-Foundation's gymnasium API:
class SimpleCorridor(gym.Env):
    """Corridor environment where an agent must learn to move right to reach the exit.

    ---------------------
    | S | 1 | 2 | 3 | G |   S=start; G=goal; corridor_length=5
    ---------------------

    Actions:
        0: Move left
        1: Move right

    Observations:
        A single float representing the agent's current position (index)
        starting at 0.0 and ending at corridor_length

    Rewards:
        -0.1 for each step
        +1.0 when reaching the goal

    Episode termination:
        When the agent reaches the goal (position >= corridor_length)
    """

    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0.0
        self.action_space = gym.spaces.Discrete(2)  # 0=left, 1=right
        self.observation_space = gym.spaces.Box(0.0, self.end_pos, (1,), np.float32)

    def reset(
        self, *, seed: Optional[int] = None, options: Optional[Dict] = None
    ) -> Tuple[np.ndarray, Dict]:
        """Reset the environment for a new episode.

        Args:
            seed: Random seed for reproducibility
            options: Additional options (not used in this environment)

        Returns:
            Initial observation of the new episode and an info dict.
        """
        super().reset(seed=seed)  # Initialize RNG if seed is provided
        self.cur_pos = 0.0
        # Return initial observation.
        return np.array([self.cur_pos], np.float32), {}

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Take a single step in the environment based on the provided action.

        Args:
            action: 0 for left, 1 for right

        Returns:
            A tuple of (observation, reward, terminated, truncated, info):
                observation: Agent's new position
                reward: Reward from taking the action (-0.1 or +1.0)
                terminated: Whether episode is done (reached goal)
                truncated: Whether episode was truncated (always False here)
                info: Additional information (empty dict)
        """
        # Walk left if action is 0 and we're not at the leftmost position
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Walk right if action is 1
        elif action == 1:
            self.cur_pos += 1
        # Set `terminated` flag when end of corridor (goal) reached.
        terminated = self.cur_pos >= self.end_pos
        truncated = False
        # +1 when goal reached, otherwise -0.1.
        reward = 1.0 if terminated else -0.1
        return np.array([self.cur_pos], np.float32), reward, terminated, truncated, {}


# Create an RLlib Algorithm instance from a PPOConfig object.
print("Setting up the PPO configuration...")
config = (
    PPOConfig().environment(
        # Env class to use (our custom gymnasium environment).
        SimpleCorridor,
        # Config dict passed to our custom env's constructor.
        # Use corridor with 20 fields (including start and goal).
        env_config={"corridor_length": 20},
    )
    # Parallelize environment rollouts for faster training.
    .env_runners(num_env_runners=3)
    # Use a smaller network for this simple task
    .training(model={"fcnet_hiddens": [64, 64]})
)

# Construct the actual PPO algorithm object from the config.
algo = config.build_algo()
rl_module = algo.get_module()

# Train for n iterations and report results (mean episode rewards).
# Optimal reward calculation:
# - Need at least 19 steps to reach the goal (from position 0 to 19)
# - Each step (except last) gets -0.1 reward: 18 * (-0.1) = -1.8
# - Final step gets +1.0 reward
# - Total optimal reward: -1.8 + 1.0 = -0.8
print("\nStarting training loop...")
for i in range(5):
    results = algo.train()

    # Log the metrics from training results
    print(f"Iteration {i+1}")
    print(f"  Training metrics: {results['env_runners']}")

# Save the trained algorithm (optional)
checkpoint_dir = algo.save()
print(f"\nSaved model checkpoint to: {checkpoint_dir}")

print("\nRunning inference with the trained policy...")
# Create a test environment with a shorter corridor to verify the agent's behavior
env = SimpleCorridor({"corridor_length": 10})
# Get the initial observation (should be: [0.0] for the starting position).
obs, info = env.reset()
terminated = truncated = False
total_reward = 0.0
step_count = 0

# Play one episode and track the agent's trajectory
print("\nAgent trajectory:")
positions = [float(obs[0])]  # Track positions for visualization

while not terminated and not truncated and step_count < 1000:
    # Compute an action given the current observation
    action_logits = rl_module.forward_inference(
        {"obs": torch.from_numpy(obs).unsqueeze(0)}
    )["action_dist_inputs"].numpy()[
        0
    ]  # [0]: Batch dimension=1

    # Get the action with highest probability
    action = np.argmax(action_logits)

    # Log the agent's decision
    action_name = "LEFT" if action == 0 else "RIGHT"
    print(f"  Step {step_count}: Position {obs[0]:.1f}, Action: {action_name}")

    # Apply the computed action in the environment
    obs, reward, terminated, truncated, info = env.step(action)
    positions.append(float(obs[0]))

    # Sum up rewards
    total_reward += reward
    step_count += 1

# Report final results
print(f"\nEpisode complete:")
print(f"  Steps taken: {step_count}")
print(f"  Total reward: {total_reward:.2f}")
print(f"  Final position: {obs[0]:.1f}")

# Verify the agent has learned the optimal policy
if total_reward > -0.5 and obs[0] >= 9.0:
    print("  Success! The agent has learned the optimal policy (always move right).")
else:
    print("  Failure! The agent didn't reach the goal within 1000 timesteps.")



Setting up the PPO configuration...


KeyboardInterrupt: 

# RLLib Getting Started 

In [1]:
from ray.rllib.algorithms.ppo import PPOConfig

# Create a config instance for the PPO algorithm.
config = (
    PPOConfig()
    .environment("Pendulum-v1")
)

In [2]:
config.training(
    lr=0.0002,
    train_batch_size_per_learner=2000,
    num_epochs=10,
)

<ray.rllib.algorithms.ppo.ppo.PPOConfig at 0x312ca0f80>

In [5]:
from pprint import pprint
ppo = config.build_algo()
for _ in range(4):
    pprint(ppo.train())

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
2025-10-06 15:48:38,225	INFO worker.py:1951 -- Started a local Ray instance.
[2025-10-06 15:48:40,053 E 29591 995670] core_worker.cc:2246: Actor with class name: 'SingleAgentEnvRunner' and ID: '949fa719064dcafb83332ba501000000' has constructor arguments in the object store and max_restarts > 0. If the 

{'config': {'_disable_action_flattening': False,
            '_disable_execution_plan_api': -1,
            '_disable_initialize_loss_from_dummy_batch': False,
            '_disable_preprocessor_api': False,
            '_dont_auto_sync_env_runner_states': False,
            '_enable_rl_module_api': -1,
            '_env_to_module_connector': None,
            '_fake_gpus': False,
            '_is_atari': None,
            '_is_online': True,
            '_learner_class': None,
            '_learner_connector': None,
            '_model_config': {},
            '_module_to_env_connector': None,
            '_per_module_overrides': {},
            '_prior_exploration_config': {'type': 'StochasticSampling'},
            '_rl_module_spec': None,
            '_tf_policy_handles_more_than_one_loss': False,
            '_torch_grad_scaler_class': None,
            '_torch_lr_scheduler_classes': None,
            '_train_batch_size_per_learner': 2000,
            '_use_msgpack_checkpoints': F



[36m(autoscaler +10m23s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.






In [13]:
# Check policy for a trained and cached algorithm instance
rl_module = ppo.get_module("default_policy")  # Equivalent to `rl_module = ppo.get_module()`

In [6]:
config.evaluation(
    # Run one evaluation round every iteration.
    evaluation_interval=1,

    # Create 2 eval EnvRunners in the extra EnvRunnerGroup.
    evaluation_num_env_runners=2,

    # Run evaluation for exactly 10 episodes. Note that because you have
    # 2 EnvRunners, each one runs through 5 episodes.
    evaluation_duration_unit="episodes",
    evaluation_duration=10,
)

# Rebuild the PPO, but with the extra evaluation EnvRunnerGroup
ppo_with_evaluation = config.build_algo()

for _ in range(3):
    pprint(ppo_with_evaluation.train())

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))


{'config': {'_disable_action_flattening': False,
            '_disable_execution_plan_api': -1,
            '_disable_initialize_loss_from_dummy_batch': False,
            '_disable_preprocessor_api': False,
            '_dont_auto_sync_env_runner_states': False,
            '_enable_rl_module_api': -1,
            '_env_to_module_connector': None,
            '_fake_gpus': False,
            '_is_atari': None,
            '_is_online': True,
            '_learner_class': None,
            '_learner_connector': None,
            '_model_config': {},
            '_module_to_env_connector': None,
            '_per_module_overrides': {},
            '_prior_exploration_config': {'type': 'StochasticSampling'},
            '_rl_module_spec': None,
            '_tf_policy_handles_more_than_one_loss': False,
            '_torch_grad_scaler_class': None,
            '_torch_lr_scheduler_classes': None,
            '_train_batch_size_per_learner': 2000,
            '_use_msgpack_checkpoints': F

In [10]:
from ray import train, tune
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment("Pendulum-v1")
    # Specify a simple tune hyperparameter sweep.
    .training(
        lr=tune.grid_search([0.001, 0.0005, 0.0001]),
    )
)

# Create a Tuner instance to manage the trials.
tuner = tune.Tuner(
    config.algo_class,
    param_space=config,
    # Specify a stopping criterion. Note that the criterion has to match one of the
    # pretty printed result metrics from the results returned previously by
    # ``.train()``. Also note that -1100 is not a good episode return for
    # Pendulum-v1, we are using it here to shorten the experiment time.
    run_config=train.RunConfig(
        stop={"env_runners/episode_return_mean": -1100.0},
    ),
)
# Run the Tuner and capture the results.
results = tuner.fit()

0,1
Current time:,2025-10-06 16:21:26
Running for:,00:10:36.66
Memory:,13.0/18.0 GiB

Trial name,status,loc,lr
PPO_Pendulum-v1_42f45_00000,PENDING,,0.001
PPO_Pendulum-v1_42f45_00001,PENDING,,0.0005
PPO_Pendulum-v1_42f45_00002,PENDING,,0.0001


2025-10-06 16:21:26,593	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/paula/ray_results/PPO_2025-10-06_16-10-49' in 0.0023s.
2025-10-06 16:21:26,597	INFO tune.py:1041 -- Total run time: 636.67 seconds (636.66 seconds for the tuning loop).
Resume experiment with: Tuner.restore(path="/Users/paula/ray_results/PPO_2025-10-06_16-10-49", trainable=...)
- PPO_Pendulum-v1_42f45_00000: FileNotFoundError('Could not fetch metrics for PPO_Pendulum-v1_42f45_00000: both result.json and progress.csv were not found at /Users/paula/ray_results/PPO_2025-10-06_16-10-49/PPO_Pendulum-v1_42f45_00000_0_lr=0.0010_2025-10-06_16-10-49')
- PPO_Pendulum-v1_42f45_00001: FileNotFoundError('Could not fetch metrics for PPO_Pendulum-v1_42f45_00001: both result.json and progress.csv were not found at /Users/paula/ray_results/PPO_2025-10-06_16-10-49/PPO_Pendulum-v1_42f45_00001_1_lr=0.0005_2025-10-06_16-10-49')
- PPO_Pendulum-v1_42f45_00002: FileNotFoundError('Could not

In [None]:
# Get the best checkpoint corresponding to the best result
# from the preceding experiment.
best_checkpoint = best_result.checkpoint

In [None]:
# Use tune with ray rllib to run hyperparameter tuning experiments

from pathlib import Path
import gymnasium as gym
import numpy as np
import torch
from ray.rllib.core.rl_module import RLModule

# Create only the neural network (RLModule) from our algorithm checkpoint.
# See here (https://docs.ray.io/en/master/rllib/checkpoints.html)
# to learn more about checkpointing and the specific "path" used.
rl_module = RLModule.from_checkpoint(
    Path(best_checkpoint.path)
    / "learner_group"
    / "learner"
    / "rl_module"
    / "default_policy"
)

# Create the RL environment to test against (same as was used for training earlier).
env = gym.make("Pendulum-v1", render_mode="human")

episode_return = 0.0
done = False

# Reset the env to get the initial observation.
obs, info = env.reset()

while not done:
    # Uncomment this line to render the env.
    # env.render()

    # Compute the next action from a batch (B=1) of observations.
    obs_batch = torch.from_numpy(obs).unsqueeze(0)  # add batch B=1 dimension
    model_outputs = rl_module.forward_inference({"obs": obs_batch})

    # Extract the action distribution parameters from the output and dissolve batch dim.
    action_dist_params = model_outputs["action_dist_inputs"][0].numpy()

    # We have continuous actions -> take the mean (max likelihood).
    greedy_action = np.clip(
        action_dist_params[0:1],  # 0=mean, 1=log(stddev), [0:1]=use mean, but keep shape=(1,)
        a_min=env.action_space.low[0],
        a_max=env.action_space.high[0],
    )
    # For discrete actions, you should take the argmax over the logits:
    # greedy_action = np.argmax(action_dist_params)

    # Send the action to the environment for the next step.
    obs, reward, terminated, truncated, info = env.step(greedy_action)

    # Perform env-loop bookkeeping.
    episode_return += reward
    done = terminated or truncated

print(f"Reached episode return of {episode_return}.")

NameError: name 'best_checkpoint' is not defined