In [8]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm

def get_dataset():
    return datasets.FashionMNIST(
        root="/tmp/data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, inputs):
        inputs = self.flatten(inputs)
        logits = self.linear_relu_stack(inputs)
        return logits

def train_func():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model = NeuralNetwork()

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in tqdm(range(num_epochs)):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

train_func()

 33%|███▎      | 1/3 [00:08<00:17,  8.50s/it]

epoch: 0, loss: 0.9511812329292297


 67%|██████▋   | 2/3 [00:16<00:08,  8.33s/it]

epoch: 1, loss: 0.774529218673706


100%|██████████| 3/3 [00:24<00:00,  8.22s/it]

epoch: 2, loss: 0.7073671817779541





In [11]:
import ray.train.torch
from tqdm import tqdm

def train_func_distributed():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    model = NeuralNetwork()
    model = ray.train.torch.prepare_model(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in tqdm(range(num_epochs)):
        if ray.train.get_context().get_world_size() > 1:
            dataloader.sampler.set_epoch(epoch)

        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# For GPU Training, set `use_gpu` to True.
use_gpu = True

trainer = TorchTrainer(
    train_func_distributed,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu)
)

results = trainer.fit()

2025-06-08 01:25:57,031	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


== Status ==
Current time: 2025-06-08 01:25:57 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:26:02 (running for 00:00:05.16)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:26:07 (running for 00:00:10.21)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/Tor



== Status ==
Current time: 2025-06-08 01:26:57 (running for 00:01:00.72)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:27:02 (running for 00:01:05.76)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:27:07 (running for 00:01:10.81)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/Tor



== Status ==
Current time: 2025-06-08 01:27:58 (running for 00:02:01.29)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:28:03 (running for 00:02:06.33)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-06-08 01:28:08 (running for 00:02:11.37)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/Tor

2025-06-08 01:28:31,301	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/brandon-ho/ray_results/TorchTrainer_2025-06-08_01-25-57' in 0.0025s.
2025-06-08 01:28:31,304	INFO tune.py:1041 -- Total run time: 154.27 seconds (154.26 seconds for the tuning loop).
Resume training with: <FrameworkTrainer>.restore(path="/home/brandon-ho/ray_results/TorchTrainer_2025-06-08_01-25-57", ...)
- TorchTrainer_0e280_00000: FileNotFoundError('Could not fetch metrics for TorchTrainer_0e280_00000: both result.json and progress.csv were not found at /home/brandon-ho/ray_results/TorchTrainer_2025-06-08_01-25-57/TorchTrainer_0e280_00000_0_2025-06-08_01-25-57')


== Status ==
Current time: 2025-06-08 01:28:31 (running for 00:02:34.26)
Using FIFO scheduling algorithm.
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2025-06-08_01-17-28_597852_193779/artifacts/2025-06-08_01-25-57/TorchTrainer_2025-06-08_01-25-57/driver_artifacts
Number of trials: 1/1 (1 PENDING)




In [13]:
from ray import tune


def objective(config):  # ①
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {  # ②
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)  # ③

results = tuner.fit()
print(results.get_best_result(metric="score", mode="max").config)

0,1
Current time:,2025-06-08 01:29:12
Running for:,00:00:01.45
Memory:,10.2/31.2 GiB

Trial name,status,loc,a,b,iter,total time (s),score
objective_81db8_00000,TERMINATED,192.168.0.18:198861,0.001,2,1,0.00025177,2.0
objective_81db8_00001,TERMINATED,192.168.0.18:198862,0.01,1,1,0.000261307,1.0001
objective_81db8_00002,TERMINATED,192.168.0.18:198864,0.1,1,1,0.000218391,1.01
objective_81db8_00003,TERMINATED,192.168.0.18:198863,1.0,2,1,0.000219345,3.0


2025-06-08 01:29:12,604	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/brandon-ho/ray_results/objective_2025-06-08_01-29-11' in 0.0045s.
2025-06-08 01:29:12,607	INFO tune.py:1041 -- Total run time: 1.46 seconds (1.45 seconds for the tuning loop).


{'a': 1.0, 'b': 2}


In [None]:
import gymnasium as gym
import numpy as np
import torch
from typing import Dict, Tuple, Any, Optional

from ray.rllib.algorithms.ppo import PPOConfig


# Define your problem using python and Farama-Foundation's gymnasium API:
class SimpleCorridor(gym.Env):
    """Corridor environment where an agent must learn to move right to reach the exit.

    ---------------------
    | S | 1 | 2 | 3 | G |   S=start; G=goal; corridor_length=5
    ---------------------

    Actions:
        0: Move left
        1: Move right

    Observations:
        A single float representing the agent's current position (index)
        starting at 0.0 and ending at corridor_length

    Rewards:
        -0.1 for each step
        +1.0 when reaching the goal

    Episode termination:
        When the agent reaches the goal (position >= corridor_length)
    """

    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0.0
        self.action_space = gym.spaces.Discrete(2)  # 0=left, 1=right
        self.observation_space = gym.spaces.Box(0.0, self.end_pos, (1,), np.float32)

    def reset(
        self, *, seed: Optional[int] = None, options: Optional[Dict] = None
    ) -> Tuple[np.ndarray, Dict]:
        """Reset the environment for a new episode.

        Args:
            seed: Random seed for reproducibility
            options: Additional options (not used in this environment)

        Returns:
            Initial observation of the new episode and an info dict.
        """
        super().reset(seed=seed)  # Initialize RNG if seed is provided
        self.cur_pos = 0.0
        # Return initial observation.
        return np.array([self.cur_pos], np.float32), {}

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Take a single step in the environment based on the provided action.

        Args:
            action: 0 for left, 1 for right

        Returns:
            A tuple of (observation, reward, terminated, truncated, info):
                observation: Agent's new position
                reward: Reward from taking the action (-0.1 or +1.0)
                terminated: Whether episode is done (reached goal)
                truncated: Whether episode was truncated (always False here)
                info: Additional information (empty dict)
        """
        # Walk left if action is 0 and we're not at the leftmost position
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Walk right if action is 1
        elif action == 1:
            self.cur_pos += 1
        # Set `terminated` flag when end of corridor (goal) reached.
        terminated = self.cur_pos >= self.end_pos
        truncated = False
        # +1 when goal reached, otherwise -0.1.
        reward = 1.0 if terminated else -0.1
        return np.array([self.cur_pos], np.float32), reward, terminated, truncated, {}


# Create an RLlib Algorithm instance from a PPOConfig object.
print("Setting up the PPO configuration...")
config = (
    PPOConfig().environment(
        # Env class to use (our custom gymnasium environment).
        SimpleCorridor,
        # Config dict passed to our custom env's constructor.
        # Use corridor with 20 fields (including start and goal).
        env_config={"corridor_length": 20},
    )
    # Parallelize environment rollouts for faster training.
    .env_runners(num_env_runners=3)
    # Use a smaller network for this simple task
    .training(model={"fcnet_hiddens": [64, 64]})
)

# Construct the actual PPO algorithm object from the config.
algo = config.build_algo()
rl_module = algo.get_module()

# Train for n iterations and report results (mean episode rewards).
# Optimal reward calculation:
# - Need at least 19 steps to reach the goal (from position 0 to 19)
# - Each step (except last) gets -0.1 reward: 18 * (-0.1) = -1.8
# - Final step gets +1.0 reward
# - Total optimal reward: -1.8 + 1.0 = -0.8
print("\nStarting training loop...")
for i in range(5):
    results = algo.train()

    # Log the metrics from training results
    print(f"Iteration {i+1}")
    print(f"  Training metrics: {results['env_runners']}")

# Save the trained algorithm (optional)
checkpoint_dir = algo.save()
print(f"\nSaved model checkpoint to: {checkpoint_dir}")

print("\nRunning inference with the trained policy...")
# Create a test environment with a shorter corridor to verify the agent's behavior
env = SimpleCorridor({"corridor_length": 10})
# Get the initial observation (should be: [0.0] for the starting position).
obs, info = env.reset()
terminated = truncated = False
total_reward = 0.0
step_count = 0

# Play one episode and track the agent's trajectory
print("\nAgent trajectory:")
positions = [float(obs[0])]  # Track positions for visualization

while not terminated and not truncated:
    # Compute an action given the current observation
    action_logits = rl_module.forward_inference(
        {"obs": torch.from_numpy(obs).unsqueeze(0)}
    )["action_dist_inputs"].numpy()[
        0
    ]  # [0]: Batch dimension=1

    # Get the action with highest probability
    action = np.argmax(action_logits)

    # Log the agent's decision
    action_name = "LEFT" if action == 0 else "RIGHT"
    print(f"  Step {step_count}: Position {obs[0]:.1f}, Action: {action_name}")

    # Apply the computed action in the environment
    obs, reward, terminated, truncated, info = env.step(action)
    positions.append(float(obs[0]))

    # Sum up rewards
    total_reward += reward
    step_count += 1

# Report final results
print(f"\nEpisode complete:")
print(f"  Steps taken: {step_count}")
print(f"  Total reward: {total_reward:.2f}")
print(f"  Final position: {obs[0]:.1f}")

# Verify the agent has learned the optimal policy
if total_reward > -0.5 and obs[0] >= 9.0:
    print("  Success! The agent has learned the optimal policy (always move right).")

  from .autonotebook import tqdm as notebook_tqdm
2025-06-08 01:32:10,758	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2025-06-08 01:32:11,202	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Setting up the PPO configuration...


`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
2025-06-08 01:32:12,773	INFO worker.py:1888 -- Started a local Ray instance.



Starting training loop...
Iteration 1
  Training metrics: {'env_to_module_connector': {'timers': {'connectors': {'add_observations_from_episodes_to_batch': np.float64(9.987235734148889e-06), 'numpy_to_tensor': np.float64(4.2680838781411974e-05), 'add_time_dim_to_batch_and_zero_pad': np.float64(9.446649436100315e-06), 'add_states_from_episodes_to_batch': np.float64(5.240405638219443e-06), 'batch_individual_items': np.float64(2.345708717526926e-05)}}, 'connector_pipeline_timer': np.float64(0.00014628796608229556)}, 'num_env_steps_sampled': 4000, 'module_to_env_connector': {'timers': {'connectors': {'un_batch_to_individual_items': np.float64(2.0546734033118183e-05), 'tensor_to_numpy': np.float64(6.456608921658124e-05), 'normalize_and_clip_actions': np.float64(3.4387110817761923e-05), 'get_actions': np.float64(0.0002275230064821435), 'listify_data_for_vector_env': np.float64(4.075788345284659e-05), 'remove_single_ts_time_rank_from_batch': np.float64(2.3294462288748326e-06)}}, 'connector_p

[33m(raylet)[0m [2025-06-08 01:55:02,987 E 199795 199820] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-06-08_01-32-11_890739_199634 is over 95% full, available space: 4.77544 GB; capacity: 95.5616 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-06-08 01:55:12,998 E 199795 199820] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-06-08_01-32-11_890739_199634 is over 95% full, available space: 4.41908 GB; capacity: 95.5616 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-06-08 01:55:23,009 E 199795 199820] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-06-08_01-32-11_890739_199634 is over 95% full, available space: 4.08345 GB; capacity: 95.5616 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-06-08 01:55:33,019 E 199795 199820] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-06-08_01-32-11_890739_199634 is over 95% full, available space: 3.68655 GB; c

In [3]:
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.connectors.env_to_module import FlattenObservations

# Configure the algorithm.
config = (
    PPOConfig()
    .environment("Taxi-v3")
    .env_runners(
        num_env_runners=2,
        # Observations are discrete (ints) -> We need to flatten (one-hot) them.
        env_to_module_connector=lambda env: FlattenObservations(),
    )
    .evaluation(evaluation_num_env_runners=1)
)

from pprint import pprint

# Build the algorithm.
algo = config.build_algo()

# Train it for 5 iterations ...
for _ in range(5):
    pprint(algo.train())

# ... and evaluate it.
pprint(algo.evaluate())

# Release the algo's resources (remote actors, like EnvRunners and Learners).
algo.stop()

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))


{'config': {'_disable_action_flattening': False,
            '_disable_execution_plan_api': -1,
            '_disable_initialize_loss_from_dummy_batch': False,
            '_disable_preprocessor_api': False,
            '_dont_auto_sync_env_runner_states': False,
            '_enable_rl_module_api': -1,
            '_env_to_module_connector': <function <lambda> at 0x7302b0391d00>,
            '_fake_gpus': False,
            '_is_atari': None,
            '_learner_class': None,
            '_learner_connector': None,
            '_model_config': {},
            '_module_to_env_connector': None,
            '_per_module_overrides': {},
            '_prior_exploration_config': {'type': 'StochasticSampling'},
            '_rl_module_spec': None,
            '_tf_policy_handles_more_than_one_loss': False,
            '_torch_grad_scaler_class': None,
            '_torch_lr_scheduler_classes': None,
            '_train_batch_size_per_learner': None,
            '_use_msgpack_checkpoints': 

In [1]:
import functools
import random
from copy import copy

import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete

from pettingzoo import ParallelEnv

In [5]:
MultiDiscrete([7 * 7 - 1] * 3).sample()

array([37, 42, 44])