# PyTorch RL Tutorial

## Intro to TorchRL

In [2]:
!pip install tensordict-nightly
!pip install torchrl-nightly

  pid, fd = os.forkpty()


Collecting tensordict-nightly
  Downloading tensordict_nightly-2024.9.30-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
INFO: pip is looking at multiple versions of tensordict-nightly to determine which version is compatible with other requirements. This could take a while.
  Downloading tensordict_nightly-2024.9.29-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.28-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.27-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.26-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.25-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.24-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
  Downloading tensordict_nightly-2024.9.23-cp312-cp312-manylinux1_x86_64.whl.metadata (8.9 kB)
INFO: pip is still looking at multiple versions of tens

In [8]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


### Data

In [11]:
import torch
from tensordict import TensorDict

#### TensorDict

In [14]:
batch_size = 5

torch.zeros(batch_size, 3)

tensor([[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])

In [15]:
batch_size = 5
tensordict = TensorDict(
    source={
        "key 1": torch.zeros(batch_size, 3),
        "key 2": torch.zeros(batch_size, 5, 6, dtype=torch.bool),
    },
    batch_size=[batch_size]
)

print(tensordict)

TensorDict(
    fields={
        key 1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        key 2: Tensor(shape=torch.Size([5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([5]),
    device=None,
    is_shared=False)


Indexing TensorDict

In [20]:
print(tensordict[2],'\n')
print(tensordict["key 1"] is tensordict.get("key 1"))

TensorDict(
    fields={
        key 1: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        key 2: Tensor(shape=torch.Size([5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False) 

True


Stacking multiple TensorDicts

In [None]:
tensordict1 = TensorDict(
    source={
        "key 1": torch.
    }
)

### Environments, TED and transform
The standard RL training loop: train a model/policy to accomplish a task in an environment (e.g., a simulator).

:mod:`~torchrl.envs` environment wrapper.

In [1]:
from torchrl.envs import GymEnv

env = GymEnv("Pendulum-v1")

#### `env.reset()`

In [2]:
reset = env.reset()
print(reset)

TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False)


In [3]:
reset["observation"]

tensor([ 0.6809,  0.7323, -0.0661])

Adding a random action in the td.

In [4]:
reset_with_action = env.rand_action(reset)
print(reset_with_action)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False)


In [5]:
reset_with_action["action"]

tensor([0.6137])

#### `env.step()`
Passing the td with initial env variable values and a random action to env.step().

This returns TED - TorchRL Episode Data format - ubiquitous way of representing data in the library.

In [6]:
stepped_data = env.step(reset_with_action)
print(stepped_data)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminat

`step_mdp`: to bring the "next" entry at root to perform the next step.

In [7]:
from torchrl.envs import step_mdp

data = step_mdp(stepped_data)
print(data)

TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False)


#### `rollout()`
Combination of three steps:
- computing an action
- taking a step
- moving in the MDP

Without a policy, it'll execute random actions.

In [8]:
rollout = env.rollout(max_steps=10)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, 

In [14]:
# spatial indexing
print(rollout[3])
print()

# indexing by key
print(rollout["action"])
print()

# indexing by key and step
print(rollout['action'][3])

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminat

#### `TransformedEnv`
Complete list of transforms [here](https://pytorch.org/rl/stable/reference/envs.html#id2)



In [22]:
from torchrl.envs import StepCounter, TransformedEnv

transformed_env = TransformedEnv(env, StepCounter(max_steps=10))
rollout = transformed_env.rollout(max_steps=100)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                step_count: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.int64, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
       

In [31]:
import torch

print('action: ', rollout["action"],'\n')
print('done: ', rollout['done'],'\n')
print('observation: ', rollout['observation'],'\n')
print('observation: ', torch.stack([rollout["observation"], rollout["next","observation"]],1),'\n')
# print('step: ', rollout['step_count'],'\n')
print('step: ', torch.stack([rollout["step_count"], rollout["next","step_count"]],1),'\n')
print('terminated: ', rollout['terminated'],'\n')
# print('truncated: ', rollout['truncated'],'\n')
print('truncated: ', torch.stack([rollout["truncated"], rollout["next","truncated"]],1),'\n')


action:  tensor([[ 1.2521],
        [ 0.3543],
        [-1.4463],
        [ 1.6247],
        [-0.0473],
        [-1.5524],
        [-0.2393],
        [ 0.3432],
        [-0.4568],
        [ 0.1557]]) 

done:  tensor([[False],
        [False],
        [False],
        [False],
        [False],
        [False],
        [False],
        [False],
        [False],
        [False]]) 

observation:  tensor([[-0.8678,  0.4969, -0.0552],
        [-0.8801,  0.4748,  0.5053],
        [-0.9009,  0.4341,  0.9146],
        [-0.9219,  0.3875,  1.0232],
        [-0.9492,  0.3146,  1.5575],
        [-0.9735,  0.2287,  1.7863],
        [-0.9896,  0.1440,  1.7250],
        [-0.9985,  0.0546,  1.7970],
        [-0.9992, -0.0399,  1.8894],
        [-0.9916, -0.1291,  1.7910]]) 

observation:  tensor([[[-0.8678,  0.4969, -0.0552],
         [-0.8801,  0.4748,  0.5053]],

        [[-0.8801,  0.4748,  0.5053],
         [-0.9009,  0.4341,  0.9146]],

        [[-0.9009,  0.4341,  0.9146],
         [-0.9219,  0.3

### `TensorDictModules`: Policy construction
Similar to how environments interact with instances of TensorDict, modules used to represent policies and vlaue functions also do the same. 
Code idea:
- encapsulate a `Module` within a class
- that know which entries need to be read and passed to the module
- and then records the results with the assigned entries

In [33]:
import torch
from tensordict.nn import TensorDictModule
from torchrl.envs import GymEnv

env = GymEnv("Pendulum-v1")
# LazyLinear automatically fetches observation space
module = torch.nn.LazyLinear(out_features=env.action_spec.shape[-1])
policy = TensorDictModule(
    module,
    in_keys=["observation"],
    out_keys=["action"],
)

In [34]:
rollout = env.rollout(max_steps=10, policy=policy)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, 

#### Specialized wrapper
`Actor` provides default values for the `in_keys` and `out_keys`, making integration with common environments straightforward.

[List of specialized modules](https://pytorch.org/rl/stable/reference/modules.html#tdmodules)

In [35]:
from torchrl.modules import Actor

module = torch.nn.LazyLinear(out_features=env.action_spec.shape[-1])

# specialized actor policy
policy = Actor(module)

## non-specialized policy
# policy = TensorDictModule(
#     module,
#     in_keys=["observation"],
#     out_keys=["action"],
# )

rollout = env.rollout(max_steps=10, policy=policy)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, 

#### Networks
Regular modules that can be used without recurring to tensordict features. Two most common are `MLP` and `ConvNet`.

In [50]:
from torchrl.modules import MLP

## TensorDictModule
# module = torch.nn.LazyLinear(out_features=env.action_spec.shape[-1])

# MLP module
module = MLP(out_features=env.action_spec.shape[-1],
             num_cells=[32, 64],
             activation_class=torch.nn.Tanh,)

policy = Actor(module)
rollout = env.rollout(max_steps=10, policy=policy)
print(rollout)


TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, 

#### Probabilistic Policies
TorchRL facilitates stocastic policy by grouping operations like:
- building distribution from parameters
- sampling from that distribution
- retrieving log-probability

In the next example, we'll build an actor that relies on a regular normal distribution using three components:
- an `MLP` backbone reading observation of size [3], and outputting a single tensor of size [2].
- a `NormalParamExtractor` module that will split the output into two chunks - a mean and a std dev of size [1]
- a `ProbabilisticActor` that will read those parameters as `in_keys`, create a distribution with them, and populate the tensordict with samples and log-probabilities.

In [51]:
from tensordict.nn.distributions import NormalParamExtractor
from torch.distributions import Normal
from torchrl.modules import ProbabilisticActor

# out is 2 instead of 1
backbone = MLP(in_features=3, out_features=2)
# split the out of size [2] into a mean and std dev of size [1]
extractor = NormalParamExtractor()
module = torch.nn.Sequential(backbone, extractor)

# instead of observation to action, we have observation > mean/std > action
td_module = TensorDictModule(module,
                             in_keys=["observation"],
                             out_keys=["loc", "scale"])

policy = ProbabilisticActor(td_module,
                            in_keys=["loc","scale"],
                            out_keys=["action"],
                            distribution_class=Normal,
                            return_log_prob=True,)

rollout = env.rollout(max_steps=10, policy=policy)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        loc: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_share



The sampling of action can chose expected value instead of using random samples with `set_exploration_type()` function.

In [54]:
from torchrl.envs.utils import ExplorationType, set_exploration_type

with set_exploration_type(ExplorationType.MEAN):
    # takes the mean as action
    rolloutMean = env.rollout(max_steps=10, policy=policy)

with set_exploration_type(ExplorationType.RANDOM):
    # takes the mean as action
    rolloutDist = env.rollout(max_steps=10, policy=policy)


#### Exploration
Deterministic policies don't explore inherently. Torchrl has modules for exploration.

Epsilon-greedy exploration module parameters:
- epsilon: (1 is every action random, 0 is no exploration)
- anneal: decrease epsilon

In [55]:
from tensordict.nn import TensorDictSequential
from torchrl.modules import EGreedyModule

# # MLP module
# backbone = MLP(in_features=3, out_features=2,
#              num_cells=[32, 64])
# policy = Actor(module)

policy = Actor(MLP(3, 1, num_cells=[32, 64]))

exploration_module = EGreedyModule(
    spec=   env.action_spec, annealing_num_steps=1000, eps_init=0.5
)

exploration_policy = TensorDictSequential(policy, exploration_module)

with set_exploration_type(ExplorationType.MEAN):
    # turns off exploration
    rollout1 = env.rollout(max_steps=10, policy=exploration_policy)

with set_exploration_type(ExplorationType.RANDOM):
    # turns on exploration
    rollout2 = env.rollout(max_steps=10, policy=exploration_policy)

In [57]:
print(torch.stack([rollout1["action"], rollout2["action"]], 1), '\n')
print(torch.stack([rollout1["next", "reward"], rollout2["next", "reward"]], 1))


tensor([[[ 0.4319],
         [ 1.6355]],

        [[ 0.4239],
         [ 0.1764]],

        [[ 0.3959],
         [-1.3561]],

        [[ 0.3594],
         [ 1.5796]],

        [[ 0.3245],
         [ 0.0813]],

        [[ 0.2960],
         [ 0.1487]],

        [[ 0.2750],
         [ 0.1530]],

        [[ 0.2605],
         [ 0.1623]],

        [[ 0.2513],
         [ 0.1778]],

        [[ 0.2460],
         [ 0.2005]]], grad_fn=<StackBackward0>) 

tensor([[[ -3.6267],
         [ -5.7703]],

        [[ -3.7987],
         [ -6.3049]],

        [[ -4.1803],
         [ -6.9850]],

        [[ -4.7753],
         [ -7.6511]],

        [[ -5.5833],
         [ -8.6991]],

        [[ -6.5935],
         [ -9.7239]],

        [[ -7.7793],
         [-10.7903]],

        [[ -9.0957],
         [-10.0133]],

        [[-10.4822],
         [ -9.0440]],

        [[-11.4969],
         [ -8.1109]]])


#### Q-Value actors
Policy can be a composite module. **Q-Value actors** require an estimate of action value, and will greedily pick up the action with the highest value. DQN is for continuous state-space where a neural network encode the `Q(s,a)` value map.

In [58]:
env = GymEnv("CartPole-v1")
print(env.action_spec)

OneHotDiscreteTensorSpec(
    shape=torch.Size([2]),
    space=DiscreteBox(n=2),
    device=cpu,
    dtype=torch.int64,
    domain=discrete)


We build a value network that produces one value per action when it reads a state from the environment. 

In [62]:
from torchrl.modules import QValueModule

num_actions = 2
value_net = TensorDictModule(
    MLP(out_features=num_actions, num_cells=[32, 32]),
    in_keys=["observation"],
    out_keys=["action_value"],
)

policy = TensorDictSequential(value_net, # writes "action_value"
                              QValueModule(spec=env.action_spec), # reads "action_value" by default
                              )

rollout = env.rollout(max_steps=3, policy=policy)
print(rollout)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        action_value: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
        chosen_action_value: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch

Here the output has "action_values" and "chosen_action_values". Since it relies on `argmax` (only exploitation), we will use `EGreedyModule` for exploration.

In [63]:
policy_explore = TensorDictSequential(policy,
                                      EGreedyModule(env.action_spec))

with set_exploration_type(ExplorationType.RANDOM):
    rollout_explore = env.rollout(max_steps=3, policy=policy_explore)

print(torch.stack([rollout["chosen_action_value"], rollout_explore["chosen_action_value"]], 1), '\n')


tensor([[[-0.1172],
         [-0.1195]],

        [[-0.1217],
         [-0.1240]],

        [[-0.1268],
         [-0.1290]]], grad_fn=<StackBackward0>) 



### Losses and optimization

Typical training loop:
```
>>> for i in range(n_collections):
...     data = get_next_batch(env, policy)
...     for j in range(n_optim):
...         loss = loss_fn(data)
...         loss.backward()
...         optim.step()
```
#### RL objective functions
Off-policy DDPG: requires a deterministic map from the observation space to the action space, and a value network that predicts the value of a state-action pair. The DDPG loss attempts to find the policy parameters that output actions that maximize the value for a given state.

In [73]:
from torchrl.envs import GymEnv

env = GymEnv("Pendulum-v1")

from torchrl.modules import Actor, MLP, ValueOperator
from torchrl.objectives import DDPGLoss

n_obs = env.observation_spec["observation"].shape[-1]
n_act = env.action_spec.shape[-1]

actor = Actor(
    MLP(in_features=n_obs,
         out_features=n_act,
           num_cells=[32, 32]))

value_net = ValueOperator(
    MLP(in_features=n_obs + n_act,
         out_features=1,
           num_cells=[32, 32]),
    in_keys=["observation", "action"],
)

ddpg_loss = DDPGLoss(actor_network=actor, value_network=value_net)

In [77]:
rollout = env.rollout(max_steps=100, policy=actor)
loss_vals = ddpg_loss(rollout)
print(loss_vals, '\n')

total_loss = 0
for key, val in loss_vals.items():
    if key.startswith("loss_"):
        total_loss += val
    
print(total_loss.item())

TensorDict(
    fields={
        loss_actor: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
        loss_value: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
        pred_value: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
        pred_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
        target_value: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
        target_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
        td_error: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False) 

53.0244026184082




#### Training a LossModule


In [78]:
from torch.optim import Adam

optim=Adam(ddpg_loss.parameters())
total_loss.backward()

# # with following items
# optim.step()
# optim.zero_grad()

#### Target parameters
Target parameters represent a delayed or smoothed version of the parameters over time. The value are updated by user's requirement.

In [None]:
from torchrl.objectives import SoftUpdate
updater = SoftUpdate(ddpg_loss, eps=0.99)

# updating
updater.step()

### Data collection and storage
#### Data collectors
The primary data collector is the `SyncDataCollector` -
- executing policy within the environment
- resetting the environment when necessary
- providing batches of a predefined size

Collectors don't reset between consecutive batches, thus two batches can share a trajectory.

Collector arguments
- batch size (`frames_per_batch`)
- length of iterator
- the policy
- the environment

In [79]:
import torch

torch.manual_seed(0)

from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy

env = GymEnv("CartPole-v1")
env.set_seed(0)

policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1) #-1 indicates never ending collector


In [80]:
for data in collector:
    print(data)
    break

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([200]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor

In [81]:
print(data["collector", "traj_ids"])

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
        5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
        6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
        7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
        9, 9, 9, 9, 9, 9, 9, 9])


#### Replay buffers
To temporarily store data after collection.
```
>>> for data in collector:
...     storage.store(data)
...     for i in range(n_optim):
...         sample = storage.sample()
...         loss_val = loss_fn(sample)
...         loss_val.backward()
...         optim.step() # etc
```
`ReplayBuffer`: TorchRL data storage parent class. They are composable:
- edit storage type
- sampling technique
- writing heuristic
- transforms

General requirement: type of storage.
Recommendation: a `TensorStorage` subclass, like `LazyMemmapStorage`. It can be populated with `add()` (single element) or `extend()` (multiple element) methods.

In [82]:
from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer

buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))

indices = buffer.extend(data)

In [83]:
# buffer has the same number of elements as from the collector
assert len(buffer) == collector.frames_per_batch

In [84]:
sample = buffer.sample(batch_size=30)
print(sample)

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([30]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=to

### TorchRL's logging
Logging is crucial for reporting results for the outside world, and for performance check. TorchRL has several loggers that interface with custom backends such as wandb, tensorboard, CSV logger. They require at least an experiment name, and directory.

In [85]:
from torchrl.record import CSVLogger
logger = CSVLogger(exp_name='my_exp')

Once the logger is instantiated, logging method `log_scalar()` can be called in several places across the training example to log values such as reward, loss value, time elapsed, etc.

In [86]:
logger.log_scalar("my_scalar", 0.4)

#### Recording videos
In `GymEnv`, `from_pixels=True` makes env `step` function write a `pixels` entry containing images of observation, and `pixels_only=False` will indicate you want the observations to be returned.

In [None]:
!pip install gymnasium[classic-control]

In [None]:
from torchrl.envs import GymEnv

env = GymEnv("CartPole-v1", from_pixels=True, pixels_only=False)

print(env.rollout(max_steps=3))

from torchrl.envs import TransformedEnv

A recorder and logger can be used to save video from the images.

In [None]:
from torchrl.record import VideoRecorder

recorder = VideoRecorder(logger, tag="my_video")
record_env = TransformedEnv(env, recorder)


In [None]:
rollout = record_env.rollout(max_steps=3)
# Uncomment this line to save the video on disk:
# recorder.dump()

### Full Training Loop

#### Building the environment

In [1]:
import torch

torch.manual_seed(0)

import time

from torchrl.envs import GymEnv, StepCounter, TransformedEnv

env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
env.set_seed(0)



  return torch._C._cuda_getDeviceCount() > 0


795726461

#### Designing a policy

In [2]:
from tensordict.nn import TensorDictModule as Mod, TensorDictSequential as Seq
from torchrl.modules import EGreedyModule, MLP, QValueModule

value_mlp = MLP(out_features=env.action_spec.shape[-1], num_cells=[64, 64])
value_net = Mod(value_mlp, in_keys=["observation"], out_keys=["action_value"])

policy = Seq(value_net, QValueModule(spec=env.action_spec))

exploration_module = EGreedyModule(
    env.action_spec, annealing_num_steps=100_000, eps_init=0.5)

policy_explore = Seq(policy, exploration_module)

#### Data collection and replay buffer

In [3]:
from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyTensorStorage, ReplayBuffer

init_rand_steps = 5000
frames_per_batch = 100
optim_steps = 10
collector = SyncDataCollector(
    env,
    policy,
    frames_per_batch=frames_per_batch,
    total_frames=-1,
    init_random_frames=init_rand_steps,
)
rb = ReplayBuffer(storage=LazyTensorStorage(100_000))



#### Loss module and optimizer

In [4]:
from torch.optim import Adam
from torchrl.objectives import DQNLoss, SoftUpdate

loss = DQNLoss(value_network=policy, action_space=env.action_spec, delay_value=True)
optim = Adam(loss.parameters(), lr=0.02)
updater = SoftUpdate(loss, eps=0.99)

#### Logger

In [5]:
from torchrl._utils import logger as torchrl_logger
from torchrl.record import CSVLogger, VideoRecorder

path = "./training_loop"
logger = CSVLogger(exp_name="dqn", log_dir=path, video_format="mp4")
video_recorder = VideoRecorder(logger, tag="video")
record_env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True, pixels_only=False), video_recorder
)

#### Training loop

In [6]:
total_count = 0
total_episodes = 0
t0 = time.time()
for i, data in enumerate(collector):
    # Write data in replay buffer
    rb.extend(data)
    max_length = rb[:]["next", "step_count"].max()
    if len(rb) > init_rand_steps:
        # Optim loop (we do several optim steps
        # per batch collected for efficiency)
        for _ in range(optim_steps):
            sample = rb.sample(128)
            loss_vals = loss(sample)
            loss_vals["loss"].backward()
            optim.step()
            optim.zero_grad()
            # Update exploration factor
            exploration_module.step(data.numel())
            # Update target params
            updater.step()
            if i % 10:
                torchrl_logger.info(f"Max num steps: {max_length}, rb length {len(rb)}")
            total_count += data.numel()
            total_episodes += data["next", "done"].sum()
    if max_length > 200:
        break

t1 = time.time()

torchrl_logger.info(
    f"solved after {total_count} steps, {total_episodes} episodes and in {t1-t0}s."
)

2024-10-14 10:59:52,531 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,544 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,556 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,571 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,584 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,597 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,611 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,622 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,635 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,649 [torchrl][INFO] Max num steps: 100, rb length 5200
2024-10-14 10:59:52,758 [torchrl][INFO] Max num steps: 100, rb length 5300
2024-10-14 10:59:52,770 [torchrl][INFO] Max num steps: 100, rb length 5300
2024-10-14 10:59:52,785 [torchrl][INFO] Max num steps: 100, rb length 5300
2024-10-14 10:59:52,799 [

#### Rendering

In [7]:
record_env.rollout(max_steps=1000, policy=policy)
video_recorder.dump()

### Pendulum: writing own environment

In [1]:

from collections import defaultdict
from typing import Optional

import numpy as np
import torch
import tqdm
from tensordict import TensorDict, TensorDictBase
from tensordict.nn import TensorDictModule
from torch import nn

from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
from torchrl.envs import (
    CatTensors,
    EnvBase,
    Transform,
    TransformedEnv,
    UnsqueezeTransform,
)
from torchrl.envs.transforms.transforms import _apply_to_composite
from torchrl.envs.utils import check_env_specs, step_mdp
DEFAULT_X = np.pi
DEFAULT_Y = 1.0

4 things to take care of when designing a new environment class:
* `EnvBase._reset()`, resetting the simulator at a (potentially random) initial state;
* `EnvBase._step()` which codes for the state transition dynamic;
* `EnvBase._set_seed()` which implements seeding mechanism;
* the environment specs

For the pendulum task, we need
- a motion equation (following action)
- a reward equation

#### Coding the effect of an action: `_step()`
The _step() method should do the following:

- Read the input keys (such as "action") and execute the simulation based on these;
- Retrieve observations, done state and reward;
- Write the set of observation values along with the reward and done state at the corresponding entries in a new TensorDict.
- Merge the output TensorDict (as "next" key) in the input TensorDict.

In [12]:
def _step(tensordict):
    th, thdot = tensordict["th"], tensordict["thdot"]  # th := theta

    g_force = tensordict["params", "g"]
    mass = tensordict["params", "m"]
    length = tensordict["params", "l"]
    dt = tensordict["params", "dt"]
    u = tensordict["action"].squeeze(-1)
    u = u.clamp(-tensordict["params", "max_torque"], tensordict["params", "max_torque"])
    costs = angle_normalize(th) ** 2 + 0.1 * thdot**2 + 0.001 * (u**2)

    new_thdot = (
        thdot
        + (3 * g_force / (2 * length) * th.sin() + 3.0 / (mass * length**2) * u) * dt
    )
    new_thdot = new_thdot.clamp(
        -tensordict["params", "max_speed"], tensordict["params", "max_speed"]
    )
    new_th = th + new_thdot * dt
    reward = -costs.view(*tensordict.shape, 1)
    done = torch.zeros_like(reward, dtype=torch.bool)
    out = TensorDict(
        {
            "th": new_th,
            "thdot": new_thdot,
            "params": tensordict["params"],
            "reward": reward,
            "done": done,
        },
        tensordict.shape,
    )
    return out


def angle_normalize(x):
    return ((x + torch.pi) % (2 * torch.pi)) - torch.pi

#### Resetting the simulator: `_reset()`

- _reset() writes observation entries and a done (default=False) state.
- expects (not mandatory) a tensordict as input (beneficial in multi-agent settings)    # since the environment is stateless, we expect the previous output as input.
    # For this, ``EnvBase`` expects some state_spec to be available
    self.state_spec = self.observation_spec.clone()

In [13]:
def _reset(self, tensordict):
    if tensordict is None or tensordict.is_empty():
        # if no ``tensordict`` is passed, we generate a single set of hyperparameters
        # Otherwise, we assume that the input ``tensordict`` contains all the relevant
        # parameters to get started.
        tensordict = self.gen_params(batch_size=self.batch_size)

    high_th = torch.tensor(DEFAULT_X, device=self.device)
    high_thdot = torch.tensor(DEFAULT_Y, device=self.device)
    low_th = -high_th
    low_thdot = -high_thdot

    # for non batch-locked environments, the input ``tensordict`` shape dictates the number
    # of simulators run simultaneously. In other contexts, the initial
    # random state's shape will depend upon the environment batch-size instead.
    th = (
        torch.rand(tensordict.shape, generator=self.rng, device=self.device)
        * (high_th - low_th)
        + low_th
    )
    thdot = (
        torch.rand(tensordict.shape, generator=self.rng, device=self.device)
        * (high_thdot - low_thdot)
        + low_thdot
    )
    out = TensorDict(
        {
            "th": th,
            "thdot": thdot,
            "params": tensordict["params"],
        },
        batch_size=tensordict.shape,
    )
    return out

#### Environment metadata: `env.*_spec`

The specs define the input and output domain of the environment. They can also be used to instantiate lazily defined neural networks and test scripts. There are four specs that we must code in our environment:

- `EnvBase.observation_spec`: This will be a `CompositeSpec` instance where each key is an observation (a CompositeSpec can be viewed as a dictionary of specs).
- `EnvBase.action_spec`: It can be any type of spec, it corresponds to the "action" entry in the input tensordict;
- `EnvBase.reward_spec`: provides information about the reward space;
- `EnvBase.done_spec`: provides information about the space of the done flag.

TorchRL specs are organized in two general containers:
- input_spec which contains the specs of the information that the step function reads (divided between action_spec containing the action and state_spec containing all the rest),
- output_spec which encodes the specs that the step outputs (observation_spec, reward_spec and done_spec).

In general, you should not interact directly with output_spec and input_spec but only with their content: observation_spec, reward_spec, done_spec, action_spec and state_spec. TorchRL offers multiple TensorSpec subclasses to encode the environment’s input and output characteristics.

##### Specs shape
The environment specs leading dimensions must match the environment batch-size. This is done to enforce that every component of an environment (including its transforms) have an accurate representation of the expected input and output shapes. This is something that should be accurately coded in stateful settings. For non batch-locked environments, such as the one in our example (see below), this is irrelevant as the environment batch size will most likely be empty.

In [14]:
def _make_spec(self, td_params):
    # Under the hood, this will populate self.output_spec["observation"]
    self.observation_spec = CompositeSpec(
        th=BoundedTensorSpec(
            low=-torch.pi,
            high=torch.pi,
            shape=(),
            dtype=torch.float32,
        ),
        thdot=BoundedTensorSpec(
            low=-td_params["params", "max_speed"],
            high=td_params["params", "max_speed"],
            shape=(),
            dtype=torch.float32,
        ),
        # we need to add the ``params`` to the observation specs, as we want
        # to pass it at each step during a rollout
        params=make_composite_from_td(td_params["params"]),
        shape=(),
    )
    # since the environment is stateless, we expect the previous output as input.
    # For this, ``EnvBase`` expects some state_spec to be available
    self.state_spec = self.observation_spec.clone()
    # action-spec will be automatically wrapped in input_spec when
    # `self.action_spec = spec` will be called supported
    self.action_spec = BoundedTensorSpec(
        low=-td_params["params", "max_torque"],
        high=td_params["params", "max_torque"],
        shape=(1,),
        dtype=torch.float32,
    )
    self.reward_spec = UnboundedContinuousTensorSpec(shape=(*td_params.shape, 1))


def make_composite_from_td(td):
    # custom function to convert a ``tensordict`` in a similar spec structure
    # of unbounded values.
    composite = CompositeSpec(
        {
            key: make_composite_from_td(tensor)
            if isinstance(tensor, TensorDictBase)
            else UnboundedContinuousTensorSpec(
                dtype=tensor.dtype, device=tensor.device, shape=tensor.shape
            )
            for key, tensor in td.items()
        },
        shape=td.shape,
    )
    return composite

#### Reproducible experiments: seeding

In [15]:
def _set_seed(self, seed: Optional[int]):
    rng = torch.manual_seed(seed)
    self.rng = rng

#### The `EnvBase` class

In [16]:
def gen_params(g=10.0, batch_size=None) -> TensorDictBase:
    """Returns a ``tensordict`` containing the physical parameters such as gravitational force and torque or speed limits."""
    if batch_size is None:
        batch_size = []
    td = TensorDict(
        {
            "params": TensorDict(
                {
                    "max_speed": 8,
                    "max_torque": 2.0,
                    "dt": 0.05,
                    "g": g,
                    "m": 1.0,
                    "l": 1.0,
                },
                [],
            )
        },
        [],
    )
    if batch_size:
        td = td.expand(batch_size).contiguous()
    return td

We define the environment as non-batch_locked by turning the homonymous attribute to False. This means that we will not enforce the input tensordict to have a batch-size that matches the one of the environment.

In [17]:
class PendulumEnv(EnvBase):
    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }
    batch_locked = False

    def __init__(self, td_params=None, seed=None, device="cpu"):
        if td_params is None:
            td_params = self.gen_params()

        super().__init__(device=device, batch_size=[])
        self._make_spec(td_params)
        if seed is None:
            seed = torch.empty((), dtype=torch.int64).random_().item()
        self.set_seed(seed)

    # Helpers: _make_step and gen_params
    gen_params = staticmethod(gen_params)
    _make_spec = _make_spec

    # Mandatory methods: _step, _reset and _set_seed
    _reset = _reset
    _step = staticmethod(_step)
    _set_seed = _set_seed

#### Testing environment
TorchRL provides a simple function `check_env_specs()` to check that a (transformed) environment has an input/output structure that matches the one dictated by its specs. 

In [18]:
env = PendulumEnv()
check_env_specs(env)

2024-10-20 18:07:52,320 [torchrl][INFO] check_env_specs succeeded!


We can have a look at our specs to have a visual representation of the environment signature:

In [19]:
print("observation_spec:", env.observation_spec)
print("state_spec:", env.state_spec)
print("reward_spec:", env.reward_spec)

observation_spec: CompositeSpec(
    th: BoundedTensorSpec(
        shape=torch.Size([]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, contiguous=True),
            high=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, contiguous=True)),
        device=cpu,
        dtype=torch.float32,
        domain=continuous),
    thdot: BoundedTensorSpec(
        shape=torch.Size([]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, contiguous=True),
            high=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, contiguous=True)),
        device=cpu,
        dtype=torch.float32,
        domain=continuous),
    params: CompositeSpec(
        max_speed: UnboundedContinuousTensorSpec(
            shape=torch.Size([]),
            space=None,
            device=cpu,
            dtype=torch.int64,
            domain=discrete),
        max_torque: UnboundedC

We can execute a couple of commands too to check that the output structure matches what is expected.

In [20]:
td = env.reset()
print("reset tensordict", td)

reset tensordict TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        params: TensorDict(
            fields={
                dt: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                g: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                l: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                m: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                max_speed: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False),
                max_torque: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        th: Te

We can run the env.rand_step() to generate an action randomly from the action_spec domain. A tensordict containing the hyperparameters and the current state must be passed since our environment is stateless. In stateful contexts, env.rand_step() works perfectly too.

In [21]:
td = env.rand_step(td)
print("random step tensordict", td)

random step tensordict TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                params: TensorDict(
                    fields={
                        dt: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                        g: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                        l: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                        m: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                        max_speed: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False),
         

#### Transforming an environment

Writing environment transforms for stateless simulators is slightly more complicated than for stateful ones: transforming an output entry that needs to be read at the following iteration requires to apply the inverse transform before calling meth.step() at the next step. This is an ideal scenario to showcase all the features of TorchRL’s transforms!

For instance, in the following transformed environment we unsqueeze the entries ["th", "thdot"] to be able to stack them along the last dimension. We also pass them as in_keys_inv to squeeze them back to their original shape once they are passed as input in the next iteration.

In [22]:
env = TransformedEnv(
    env,
    # ``Unsqueeze`` the observations that we will concatenate
    UnsqueezeTransform(
        unsqueeze_dim=-1,
        in_keys=["th", "thdot"],
        in_keys_inv=["th", "thdot"],
    ),
)



#### Writing custom transforms
TorchRL’s transforms may not cover all the operations one wants to execute after an environment has been executed. Writing a transform does not require much effort. As for the environment design, there are two steps in writing a transform:

- Getting the dynamics right (forward and inverse);
- Adapting the environment specs.

A transform can be used in two settings: on its own, it can be used as a `Module`. It can also be used appended to a `TransformedEnv`. The structure of the class allows to customize the behavior in the different contexts.

A `Transform` skeleton can be summarized as follows:

In [23]:
class Transform(nn.Module):
    def forward(self, tensordict):
        ...
    def _apply_transform(self, tensordict):
        ...
    def _step(self, tensordict):
        ...
    def _call(self, tensordict):
        ...
    def inv(self, tensordict):
        ...
    def _inv_apply_transform(self, tensordict):
        ...

There are three entry points (`forward()`, `_step()` and `inv()`) which all receive `tensordict.TensorDict` instances. The first two will eventually go through the keys indicated by in_keys and call `_apply_transform()` to each of these. The results will be written in the entries pointed by `Transform.out_keys` if provided (if not the `in_keys` will be updated with the transformed values). If inverse transforms need to be executed, a similar data flow will be executed but with the `Transform.inv()` and `Transform._inv_apply_transform()` methods and across the `in_keys_inv` and `out_keys_inv` list of keys. The following figure summarized this flow for environments and replay buffers.

In some cases, a transform will not work on a subset of keys in a unitary manner, but will execute some operation on the parent environment or work with the entire input tensordict. In those cases, the `_call()` and `forward()` methods should be re-written, and the `_apply_transform()` method can be skipped.

Let us code new transforms that will compute the sine and cosine values of the position angle, as these values are more useful to us to learn a policy than the raw angle value:

In [None]:
class SinTransform(Transform):
    def _apply_transform(self, obs: torch.Tensor) -> None:
        return obs.sin()

    # The transform must also modify the data at reset time
    def _reset(
        self, tensordict: TensorDictBase, tensordict_reset: TensorDictBase
    ) -> TensorDictBase:
        return self._call(tensordict_reset)

    # _apply_to_composite will execute the observation spec transform across all
    # in_keys/out_keys pairs and write the result in the observation_spec which
    # is of type ``Composite``
    @_apply_to_composite
    def transform_observation_spec(self, observation_spec):
        return BoundedTensorSpec(
            low=-1,
            high=1,
            shape=observation_spec.shape,
            dtype=observation_spec.dtype,
            device=observation_spec.device,
        )


class CosTransform(Transform):
    def _apply_transform(self, obs: torch.Tensor) -> None:
        return obs.cos()

    # The transform must also modify the data at reset time
    def _reset(
        self, tensordict: TensorDictBase, tensordict_reset: TensorDictBase
    ) -> TensorDictBase:
        return self._call(tensordict_reset)

    # _apply_to_composite will execute the observation spec transform across all
    # in_keys/out_keys pairs and write the result in the observation_spec which
    # is of type ``Composite``
    @_apply_to_composite
    def transform_observation_spec(self, observation_spec):
        return BoundedTensorSpec(
            low=-1,
            high=1,
            shape=observation_spec.shape,
            dtype=observation_spec.dtype,
            device=observation_spec.device,
        )


t_sin = SinTransform(in_keys=["th"], out_keys=["sin"])
t_cos = CosTransform(in_keys=["th"], out_keys=["cos"])
env.append_transform(t_sin)
env.append_transform(t_cos)

Concatenates the observations onto an “observation” entry. `del_keys=False` ensures that we keep these values for the next iteration.

In [None]:
cat_transform = CatTensors(
    in_keys=["sin", "cos", "thdot"], dim=-1, out_key="observation", del_keys=False
)
env.append_transform(cat_transform)

Once more, let us check that our environment specs match what is received:

In [None]:
check_env_specs(env)

#### Executing a rollout
Executing a rollout is a succession of simple steps:
- reset the environment
- while some condition is not met:
    - compute an action given a policy
    - execute a step given this action
    - collect the data
    - make a MDP step
- gather the data and return

These operations have been conveniently wrapped in the `rollout()` method, from which we provide a simplified version here below.

In [9]:
def simple_rollout(steps=100):
    # preallocate:
    data = TensorDict({}, [steps])
    # reset
    _data = env.reset()
    for i in range(steps):
        _data["action"] = env.action_spec.rand()
        _data = env.step(_data)
        data[i] = _data
        _data = step_mdp(_data, keep_other=True)
    return data


print("data from rollout:", simple_rollout(100))

data from rollout: TensorDict(
    fields={
        action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                params: TensorDict(
                    fields={
                        dt: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
                        g: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
                        l: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
                        m: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.float32, is_shared=False),
                        max_speed: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.int64, i

#### Batching computations

The last unexplored end of our tutorial is the ability that we have to batch computations in TorchRL. Because our environment does not make any assumptions regarding the input data shape, we can seamlessly execute it over batches of data. Even better: for non-batch-locked environments such as our Pendulum, we can change the batch size on the fly without recreating the environment. To do this, we just generate parameters with the desired shape.

In [11]:
batch_size = 1  # number of environments to be executed in batch
td = env.reset(env.gen_params(batch_size=[batch_size]))
print("reset (batch size of 10)", td)
td = env.rand_step(td)
print("rand step (batch size of 10)", td)

reset (batch size of 10) TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        params: TensorDict(
            fields={
                dt: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                g: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                l: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                m: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                max_speed: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.int64, is_shared=False),
                max_torque: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([1]),
            device=None,
            is_shared=False),
        terminated: Tensor(shape=torch.Size([1, 1]), device=cpu, dtype=torch.bool, is_shared=F

Executing a rollout with a batch of data requires us to reset the environment out of the rollout function, since we need to define the batch_size dynamically and this is not supported by `rollout()`:

In [None]:
rollout = env.rollout(
    3,
    auto_reset=False,  # we're executing the reset out of the ``rollout`` call
    tensordict=env.reset(env.gen_params(batch_size=[batch_size])),
)
print("rollout of len 3 (batch size of 10):", rollout)

#### Training a simple policy
In this example, we will train a simple policy using the reward as a differentiable objective, such as a negative loss. We will take advantage of the fact that our dynamic system is fully differentiable to backpropagate through the trajectory return and adjust the weights of our policy to maximize this value directly. Of course, in many settings many of the assumptions we make do not hold, such as differentiable system and full access to the underlying mechanics.%0A%0AStill, this is a very simple example that showcases how a training loop can be coded with a custom environment in TorchRL.%0A%0ALet us first write the policy network:

In [None]:
torch.manual_seed(0)
env.set_seed(0)

net = nn.Sequential(
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(1),
)
policy = TensorDictModule(
    net,
    in_keys=["observation"],
    out_keys=["action"],
)

optim = torch.optim.Adam(policy.parameters(), lr=2e-3)

#### Training loop
We will successively:

- generate a trajectory
- sum the rewards
- backpropagate through the graph defined by these operations
- clip the gradient norm and make an optimization step
- repeat

At the end of the training loop, we should have a final reward close to 0 which demonstrates that the pendulum is upward and still as desired.



In [None]:
batch_size = 32
pbar = tqdm.tqdm(range(20_000 // batch_size))
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, 20_000)
logs = defaultdict(list)

for _ in pbar:
    init_td = env.reset(env.gen_params(batch_size=[batch_size]))
    rollout = env.rollout(100, policy, tensordict=init_td, auto_reset=False)
    traj_return = rollout["next", "reward"].mean()
    (-traj_return).backward()
    gn = torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
    optim.step()
    optim.zero_grad()
    pbar.set_description(
        f"reward: {traj_return: 4.4f}, "
        f"last reward: {rollout[..., -1]['next', 'reward'].mean(): 4.4f}, gradient norm: {gn: 4.4}"
    )
    logs["return"].append(traj_return.item())
    logs["last_reward"].append(rollout[..., -1]["next", "reward"].mean().item())
    scheduler.step()


def plot():
    import matplotlib
    from matplotlib import pyplot as plt

    is_ipython = "inline" in matplotlib.get_backend()
    if is_ipython:
        from IPython import display

    with plt.ion():
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 1)
        plt.plot(logs["return"])
        plt.title("returns")
        plt.xlabel("iteration")
        plt.subplot(1, 2, 2)
        plt.plot(logs["last_reward"])
        plt.title("last reward")
        plt.xlabel("iteration")
        if is_ipython:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        plt.show()


plot()

In this tutorial, we have learned how to code a stateless environment from scratch. We touched the subjects of:

The four essential components that need to be taken care of when coding an environment (step, reset, seeding and building specs). We saw how these methods and classes interact with the TensorDict class;

How to test that an environment is properly coded using check_env_specs();

How to append transforms in the context of stateless environments and how to write custom transformations;

How to train a policy on a fully differentiable simulator.

# Practice

## my_bot

### Troubleshoot

In [6]:
from collections import defaultdict
from typing import Optional

import numpy as np
import torch
import tqdm
from tensordict import TensorDict, TensorDictBase
from tensordict.nn import TensorDictModule
from torch import nn

from torchrl.data import OneHotDiscreteTensorSpec, DiscreteTensorSpec, BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
from torchrl.envs import (
    CatTensors,
    EnvBase,
    Transform,
    TransformedEnv,
    UnsqueezeTransform,
)
from torchrl.envs.transforms.transforms import _apply_to_composite
from torchrl.envs.utils import check_env_specs, step_mdp


In [10]:
class MiniEnv(EnvBase):
    def __init__(self):
        super().__init__()

        # e.g. a conveyor belt with some values there
        observation_spec = DiscreteTensorSpec(n=3, shape=torch.Size([6]))
        self.observation_spec = CompositeSpec(
            # observation=observation_spec, shape=observation_spec.shape
            # observation=observation_spec, shape=[]
            observation=observation_spec, shape=()
        )
        self.action_spec = OneHotDiscreteTensorSpec(
            2,  # shift left or right
            shape=torch.Size([2]),
        )
        self.reward_spec = UnboundedContinuousTensorSpec(shape=torch.Size([1]))

    def _reset(self, tensordict, *kwargs):
        pass

    def _step(self, tensordict):
        pass

    def _set_seed(self, seed):
        pass

env = MiniEnv()
env.fake_tensordict()

TensorDict(
    fields={
        action: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.bool, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([6]), device=cpu, dtype=torch.int64, is_shared=False),
                reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([6]), device=cpu, dtype=torch.int64, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([])