# Learning the Algorithm class to understand the PPO implementation in Ray RLlib

When looking at the PPO architecture displayed in Ray RLlib (image below), we can see that the Learner interacts with the RLModule to optimize the policies (neural networks). However, the EnvRunners and the `training_step` mechanism are not implemented in the two structures we learned about previously, RLModules and Learners. The EnvRunner uses frozen policies (policies whose parameters are not being updated) to interact with the environment, providing actions and receiving observations and rewards. These interactions compose the batches of experience that the Learners use to update the policy parameters. The PPO implementation therefore inherits from an Algorithm class, which is responsible for coordinating the EnvRunners and Learners to optimize the policy parameters through interaction with the environment.



In [None]:
from ray.rllib.algorithms.ppo.ppo import PPOConfig, PPO
from ray.rllib.algorithms import AlgorithmConfig, Algorithm

![PPO architecture](./imgs/ray_rllib_ppo.png)

The Algorithm class is the base class for the different RL algorithm implementations (you can check the Algorithm class code [here](https://github.com/ray-project/ray/blob/master/rllib/algorithms/algorithm.py)). Take a look at the `setup()` method, which is responsible for reading the `AlgorithmConfig` object containing the algorithm-specific configuration and for creating the EnvRunners and Learners that make up the learning process. You can also check the methods `training_step()` and `compute_actions()`.

The `training_step()` method implements the logic of a single iteration of the algorithm and is usually overridden by the RL algorithm that inherits from `Algorithm`. The code for the general `training_step()` of the `Algorithm` class is presented below. It is divided into three groups of steps. The first collects batches of experience from the environments using a frozen policy in the EnvRunners. The second uses the Learners to update/optimize the policies based on the collected experiences. The third synchronizes the new policy parameters obtained by the Learners with the policies held by the EnvRunners.

In [None]:
@OverrideToImplementCustomLogic
def training_step(self) -> None:
    """Default single iteration logic of an algorithm.

    - Collect on-policy samples (SampleBatches) in parallel using the
            Algorithm's EnvRunners (@ray.remote).
    - Concatenate collected SampleBatches into one train batch.
    - Note that we may have more than one policy in the multi-agent case:
            Call the different policies' `learn_on_batch` (simple optimizer) OR
            `load_batch_into_buffer` + `learn_on_loaded_batch` (multi-GPU
            optimizer) methods to calculate loss and update the model(s).
    - Return all collected metrics for the iteration.

    Returns:
            For the new API stack, returns None. Results are compiled and extracted
            automatically through a single `self.metrics.reduce()` call at the very end
            of an iteration (which might contain more than one call to
            `training_step()`). This way, we make sure that we account for all
            results generated by each individual `training_step()` call.
            For the old API stack, returns the results dict from executing the training
            step.
    """
    if not self.config.enable_env_runner_and_connector_v2:
        raise NotImplementedError(
            "The `Algorithm.training_step()` default implementation no longer "
            "supports the old API stack! If you would like to continue "
            "using these "
            "old APIs with this default `training_step`, simply subclass "
            "`Algorithm` and override its `training_step` method (copy/paste the "
            "code and delete this error message)."
        )

    # Collect a list of Episodes from EnvRunners until we reach the train batch
    # size.
    with self.metrics.log_time((TIMERS, ENV_RUNNER_SAMPLING_TIMER)):
        if self.config.count_steps_by == "agent_steps":
            episodes, env_runner_results = synchronous_parallel_sample(
                worker_set=self.env_runner_group,
                max_agent_steps=self.config.total_train_batch_size,
                sample_timeout_s=self.config.sample_timeout_s,
                _uses_new_env_runners=True,
                _return_metrics=True,
            )
        else:
            episodes, env_runner_results = synchronous_parallel_sample(
                worker_set=self.env_runner_group,
                max_env_steps=self.config.total_train_batch_size,
                sample_timeout_s=self.config.sample_timeout_s,
                _uses_new_env_runners=True,
                _return_metrics=True,
            )
    # Reduce EnvRunner metrics over the n EnvRunners.
    self.metrics.merge_and_log_n_dicts(env_runner_results, key=ENV_RUNNER_RESULTS)

    # Here, the Learners utilize the batches of experiences to update/optimize the policy parameters
    with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
        learner_results = self.learner_group.update_from_episodes(
            episodes=episodes,
            timesteps={
                NUM_ENV_STEPS_SAMPLED_LIFETIME: (
                    self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME)
                ),
            },
        )
        self.metrics.log_dict(learner_results, key=LEARNER_RESULTS)

    # Update weights - after learning on the local worker - on all
    # remote workers (only those RLModules that were actually trained).
    with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)):
        self.env_runner_group.sync_weights(
            from_worker_or_learner_group=self.learner_group,
            policies=list(set(learner_results.keys()) - {ALL_MODULES}),
            inference_only=True,
        )

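The three phases above (sample, update, sync) can be illustrated without Ray at all. The sketch below is a toy model, not RLlib code: `ToyEnvRunner`, `ToyLearner`, and `toy_training_step` are hypothetical names invented for this illustration, and the "optimization" is a placeholder that only nudges a single weight.

```python
import random


class ToyEnvRunner:
    """Hypothetical stand-in for an EnvRunner: holds a frozen copy of the weights."""

    def __init__(self, weights):
        self.weights = dict(weights)

    def sample(self, num_steps):
        # Pretend to roll out the frozen policy; each step is (observation, reward).
        return [(random.random(), random.random()) for _ in range(num_steps)]

    def set_weights(self, weights):
        self.weights = dict(weights)


class ToyLearner:
    """Hypothetical stand-in for a Learner: owns the trainable weights."""

    def __init__(self):
        self.weights = {"w": 0.0}

    def update(self, batch):
        # Placeholder "optimization": move the weight toward the mean reward.
        mean_reward = sum(reward for _, reward in batch) / len(batch)
        self.weights["w"] += 0.1 * (mean_reward - self.weights["w"])
        return {"mean_reward": mean_reward}


def toy_training_step(env_runners, learner, train_batch_size):
    # 1) Sample: every runner collects its share of the train batch.
    steps_per_runner = train_batch_size // len(env_runners)
    batch = []
    for runner in env_runners:
        batch.extend(runner.sample(steps_per_runner))
    # 2) Update: the learner optimizes its weights on the collected batch.
    results = learner.update(batch)
    # 3) Sync: push the new weights back to the frozen copies in the runners.
    for runner in env_runners:
        runner.set_weights(learner.weights)
    return results


learner = ToyLearner()
env_runners = [ToyEnvRunner(learner.weights) for _ in range(2)]
results = toy_training_step(env_runners, learner, train_batch_size=8)
```

After one call, every runner holds the same weights as the learner again, which is the role `sync_weights()` plays in the real `training_step()`.
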
RL algorithms often need to change the default `training_step()` logic of the `Algorithm` class, which is why they override it. For instance, let's take a look at the `PPO` class implementation, which inherits from the `Algorithm` class.

In [None]:
class PPO(Algorithm):
    @classmethod
    @override(Algorithm)
    def get_default_config(cls) -> AlgorithmConfig:
        return PPOConfig()

    @classmethod
    @override(Algorithm)
    def get_default_policy_class(
        cls, config: AlgorithmConfig
    ) -> Optional[Type[Policy]]:
        if config["framework"] == "torch":

            from ray.rllib.algorithms.ppo.ppo_torch_policy import PPOTorchPolicy

            return PPOTorchPolicy
        elif config["framework"] == "tf":
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy

            return PPOTF1Policy
        else:
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF2Policy

            return PPOTF2Policy

    @override(Algorithm)
    def training_step(self) -> None:
        # Old API stack (Policy, RolloutWorker, Connector).
        if not self.config.enable_env_runner_and_connector_v2:
            return self._training_step_old_api_stack()

        # Collect batches from sample workers until we have a full batch.
        with self.metrics.log_time((TIMERS, ENV_RUNNER_SAMPLING_TIMER)):
            # Sample in parallel from the workers.
            if self.config.count_steps_by == "agent_steps":
                episodes, env_runner_results = synchronous_parallel_sample(
                    worker_set=self.env_runner_group,
                    max_agent_steps=self.config.total_train_batch_size,
                    sample_timeout_s=self.config.sample_timeout_s,
                    _uses_new_env_runners=(
                        self.config.enable_env_runner_and_connector_v2
                    ),
                    _return_metrics=True,
                )
            else:
                episodes, env_runner_results = synchronous_parallel_sample(
                    worker_set=self.env_runner_group,
                    max_env_steps=self.config.total_train_batch_size,
                    sample_timeout_s=self.config.sample_timeout_s,
                    _uses_new_env_runners=(
                        self.config.enable_env_runner_and_connector_v2
                    ),
                    _return_metrics=True,
                )
            # Return early if all our workers failed.
            if not episodes:
                return

            # Reduce EnvRunner metrics over the n EnvRunners.
            self.metrics.merge_and_log_n_dicts(
                env_runner_results, key=ENV_RUNNER_RESULTS
            )

        # Perform a learner update step on the collected episodes.
        with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
            learner_results = self.learner_group.update_from_episodes(
                episodes=episodes,
                timesteps={
                    NUM_ENV_STEPS_SAMPLED_LIFETIME: (
                        self.metrics.peek(
                            (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME)
                        )
                    ),
                },
                # Difference from Algorithm.training_step(): the three
                # following parameters are not passed in the Algorithm class.
                num_epochs=self.config.num_epochs,
                minibatch_size=self.config.minibatch_size,
                shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
            )
            self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS)

        # Update weights - after learning on the local worker - on all remote
        # workers.
        with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)):
            modules_to_update = set(learner_results[0].keys()) - {ALL_MODULES}
            self.env_runner_group.sync_weights(
                # Sync weights from learner_group to all EnvRunners.
                from_worker_or_learner_group=self.learner_group,
                policies=modules_to_update,
                inference_only=True,
            )

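The three extra arguments passed by `PPO.training_step()` (`num_epochs`, `minibatch_size`, `shuffle_batch_per_epoch`) control how many times and in what chunks the collected train batch is reused. The sketch below shows the conventional meaning of these parameters in plain Python. It is an assumption about the general technique, not a copy of how RLlib's Learner iterates internally, and `iterate_minibatches` is a hypothetical helper.

```python
import random


def iterate_minibatches(
    batch, num_epochs, minibatch_size, shuffle_batch_per_epoch=True, seed=0
):
    """Yield (epoch, minibatch) pairs, passing over the full batch once per epoch."""
    rng = random.Random(seed)
    for epoch in range(num_epochs):
        indices = list(range(len(batch)))
        if shuffle_batch_per_epoch:
            # Reshuffle so each epoch sees the samples in a different order.
            rng.shuffle(indices)
        for start in range(0, len(indices), minibatch_size):
            chunk = indices[start:start + minibatch_size]
            yield epoch, [batch[i] for i in chunk]


# Integers stand in for collected environment steps.
train_batch = list(range(2048))
updates = list(iterate_minibatches(train_batch, num_epochs=10, minibatch_size=64))
num_updates = len(updates)  # 10 epochs * (2048 / 64) minibatches = 320
```

With a train batch of 2048 steps, 10 epochs, and minibatches of 64, the same experiences feed 320 gradient updates before new samples are collected.
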
Usually, we don't handle the `Algorithm` class or the RL algorithm class (such as the `PPO` class) directly. Instead, we use the `AlgorithmConfig` class or the `PPOConfig` class (which inherits from `AlgorithmConfig`) to configure our algorithm with the hyperparameters and settings of our choice. The general `AlgorithmConfig` class contains the configuration options common to most RL agents, such as the learning rate `lr`, as well as options related to Ray RLlib execution, such as `num_cpus_per_env_runner`. You can check the `AlgorithmConfig` details [here](https://github.com/ray-project/ray/blob/master/rllib/algorithms/algorithm_config.py).

The `PPOConfig` class inherits from the `AlgorithmConfig` class and implements the PPO-specific parameters needed to run the PPO algorithm. You can also check the `PPOConfig` implementation [here](https://github.com/ray-project/ray/blob/master/rllib/algorithms/ppo/ppo.py).

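The relationship between a general config class and an algorithm-specific one can be sketched with a toy version of the same chained-setter pattern. The classes `ToyAlgorithmConfig` and `ToyPPOConfig` below are hypothetical, and their default values are arbitrary placeholders rather than RLlib's defaults. The sketch only illustrates how a subclass can add its own parameters while reusing the base class setters.

```python
class ToyAlgorithmConfig:
    """Hypothetical base config: settings shared by most algorithms."""

    def __init__(self):
        self.lr = 0.001  # placeholder default
        self.gamma = 0.99  # placeholder default
        self.num_env_runners = 0  # placeholder default

    def env_runners(self, *, num_env_runners=None):
        if num_env_runners is not None:
            self.num_env_runners = num_env_runners
        return self  # returning self is what allows method chaining

    def training(self, *, lr=None, gamma=None):
        if lr is not None:
            self.lr = lr
        if gamma is not None:
            self.gamma = gamma
        return self


class ToyPPOConfig(ToyAlgorithmConfig):
    """Hypothetical PPO config: adds algorithm-specific settings."""

    def __init__(self):
        super().__init__()
        self.clip_param = 0.3  # placeholder default
        self.lambda_ = 1.0  # placeholder default

    def training(self, *, clip_param=None, lambda_=None, **kwargs):
        # Let the base class handle the general settings first.
        super().training(**kwargs)
        if clip_param is not None:
            self.clip_param = clip_param
        if lambda_ is not None:
            self.lambda_ = lambda_
        return self


config = (
    ToyPPOConfig()
    .env_runners(num_env_runners=1)
    .training(lr=0.0003, clip_param=0.2, lambda_=0.95)
)
```

General settings such as `lr` and PPO-specific ones such as `clip_param` go through the same `training()` call, and any setting that is not passed keeps its default.
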
To finalize, do you remember how we implemented the PPO agent in the previous lessons? The code below shows an example from [Lesson 1 notebook 3](../lesson_1/3-rl_agent_for_radio_resource_scheduler.ipynb). We used the `PPOConfig` class to configure and build our PPO agent.

In [None]:
import numpy as np  # required for `np.inf` in `vf_clip_param` below

config = (
    PPOConfig()
    .environment("comm_env")
    .env_runners(num_env_runners=1)
    .learners(num_learners=1)
    .training(
        lr=0.0003,
        train_batch_size=2048,
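        # Note: newer RLlib releases rename the next two arguments to
        # `minibatch_size` and `num_epochs`, the fields read by
        # `PPO.training_step()` above.
        sgd_minibatch_size=64,
        num_sgd_iter=10,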
        gamma=0.99,
        lambda_=0.95,
        model={
            "fcnet_hiddens": [64, 64],
            "fcnet_activation": "relu",
        },
        clip_param=0.2,
        entropy_coeff=0.01,
        vf_loss_coeff=0.5,
        grad_clip=0.5,
        vf_clip_param=np.inf,
        use_gae=True,
        kl_coeff=0,
        use_kl_loss=False,
        kl_target=0,
    )
)