## Torchrl

In [1]:
import torch

print(torch.__version__)

2.4.1+cu121


In [None]:
!pip install pyntcloud

In [89]:
# first time installs
!pip uninstall numpy -y
!pip install numpy==1.22.4
!pip install pandas
!pip install tqdm
!pip install tensordict-nightly
!pip install torchrl-nightly
!pip install torchvision

Found existing installation: numpy 1.22.4
Uninstalling numpy-1.22.4:
  Successfully uninstalled numpy-1.22.4
Defaulting to user installation because normal site-packages is not writeable
Collecting numpy==1.22.4
  Using cached numpy-1.22.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.8 MB)
Installing collected packages: numpy
Successfully installed numpy-1.22.4
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Collecting torchvision
  Downloading torchvision-0.20.0-cp310-cp310-manylinux1_x86_64.whl (7.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m00:01[0m0:

In [3]:
!ls /usr/local/cuda/lib64 | grep libcudnn

In [1]:
from collections import defaultdict
from typing import Optional


import cv2 as cv
import numpy as np
import pandas as pd
import torch
import tqdm
from tensordict import TensorDict, TensorDictBase
from tensordict.nn import TensorDictModule
from torch import nn
import torchvision
from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
from torchrl.envs import (
    EnvBase,
    Transform,
    Compose,
    ToTensorImage,
    Resize,
    TransformedEnv,
    UnsqueezeTransform,
)


from torchrl.envs.transforms.transforms import _apply_to_composite
from torchrl.envs.utils import check_env_specs, step_mdp




### Environment
#### `_step()`

- Read the input keys (such as "action") and execute the simulation based on these;
- Retrieve observations, done state and reward;
- Write the set of observation values along with the reward and done state at the corresponding entries in a new TensorDict.
- Merge the output TensorDict (as "next" key) in the input TensorDict.

In [2]:
import os
import glob

data_path = '/ROS2_my_bot/my_bot/src/my_bot_controller/resource/24_10_19_sensorDump/'
image_path = 'egoCam/*.png'

image_files = glob.glob(data_path + image_path)
trajectory_data = pd.read_csv(data_path + 'trajectory.csv')


In [3]:
def _step(tensordict):
    
    # take a step
    step = tensordict["stepInt"].item()

    image = cv.imread(image_files[step])
    image_tensor = torch.tensor(image, dtype=torch.float32)

    step += 1

    step_tensor = torch.tensor(step, dtype=torch.int32)

    # laser_readings = ""
    # laser_tensor = torch.tensor(laser_readings, dtype=torch.float32)

    progress = torch.tensor(0, dtype=torch.float32)

    reward = progress.view(*tensordict.shape, 1)
    done = torch.zeros_like(reward, dtype=torch.bool)


    # write observations
    # merge the output with input tensordict
    out = TensorDict(
        {
            "stepInt": step_tensor,
            "image": image_tensor,
            # "laser": laser_tensor,
            "params": tensordict["params"],
            "reward": reward,
            "done": done,
        },
        tensordict.shape,
    )

    return out

#### _reset()

In [None]:

# retrieve observations (observations, reward, done)


In [4]:
def _reset(self, tensordict):
    if tensordict is None or tensordict.is_empty():
        # if no ``tensordict`` is passed, we generate a single set of hyperparameters
        # Otherwise, we assume that the input ``tensordict`` contains all the relevant
        # parameters to get started.
        tensordict = self.gen_params(batch_size=self.batch_size)


    # retrieve observations (observations, reward, done)
    step = 0

    step_tensor = torch.tensor(step, dtype=torch.int32)
    # take a step
    image = cv.imread(image_files[step])
    image_tensor = torch.tensor(image, dtype=torch.float32)

    out = TensorDict(
        {   
            "stepInt": step_tensor,
            "image": image_tensor,
            "params": tensordict["params"],
        },
        batch_size=tensordict.shape,
    )
    return out

#### Environment metadata: `env.*_spec`

The specs define the input and output domain of the environment. They can also be used to instantiate lazily defined neural networks and test scripts. There are four specs that we must code in our environment:

- `EnvBase.observation_spec`: This will be a `CompositeSpec` instance where each key is an observation (a CompositeSpec can be viewed as a dictionary of specs).
- `EnvBase.action_spec`: It can be any type of spec, it corresponds to the "action" entry in the input tensordict;
- `EnvBase.reward_spec`: provides information about the reward space;
- `EnvBase.done_spec`: provides information about the space of the done flag.

TorchRL specs are organized in two general containers:
- input_spec which contains the specs of the information that the step function reads (divided between action_spec containing the action and state_spec containing all the rest),
- output_spec which encodes the specs that the step outputs (observation_spec, reward_spec and done_spec).

In general, you should not interact directly with output_spec and input_spec but only with their content: observation_spec, reward_spec, done_spec, action_spec and state_spec. TorchRL offers multiple TensorSpec subclasses to encode the environment’s input and output characteristics.

##### Specs shape
The environment specs leading dimensions must match the environment batch-size. This is done to enforce that every component of an environment (including its transforms) have an accurate representation of the expected input and output shapes. This is something that should be accurately coded in stateful settings. For non batch-locked environments, such as the one in our example (see below), this is irrelevant as the environment batch size will most likely be empty.

In [5]:
def _make_spec(self, td_params):
    # Under the hood, this will populate self.output_spec["observation"]
    self.observation_spec = CompositeSpec(
        stepInt=BoundedTensorSpec(
            low=td_params["params", "step_start"],
            high=td_params["params", "step_end"],
            shape=(),
            dtype=torch.int32,
        ),
        image=UnboundedContinuousTensorSpec(
            shape=(td_params["params", "imageHeight"], td_params["params", "imageWidth"], 3),
            dtype=torch.float32,
        ),
        # we need to add the ``params`` to the observation specs, as we want
        # to pass it at each step during a rollout
        params=make_composite_from_td(td_params["params"]),
        shape=(),
    )

    # action-spec will be automatically wrapped in input_spec when
    # `self.action_spec = spec` will be called supported
    self.action_spec = CompositeSpec(
        action=CompositeSpec(
            linear_velocity=BoundedTensorSpec(
                low=-td_params["params", "max_linear_velocity"],
                high=td_params["params", "max_linear_velocity"],
                shape=(),
                dtype=torch.float32,
            ),
            angular_velocity=BoundedTensorSpec(
                low=-td_params["params", "max_angular_velocity"],
                high=td_params["params", "max_angular_velocity"],
                shape=(),
                dtype=torch.float32,
            ),
            shape=(),
        ),
        shape=(),
    )
    self.reward_spec = UnboundedContinuousTensorSpec(shape=(*td_params.shape, 1))


def make_composite_from_td(td):
    # custom function to convert a ``tensordict`` in a similar spec structure
    # of unbounded values.
    composite = CompositeSpec(
        {
            key: make_composite_from_td(tensor)
            if isinstance(tensor, TensorDictBase)
            else UnboundedContinuousTensorSpec(
                dtype=tensor.dtype, device=tensor.device, shape=tensor.shape
            )
            for key, tensor in td.items()
        },
        shape=td.shape,
    )
    return composite

#### Seeding

In [6]:
def _set_seed(self, seed: Optional[int]):
    rng = torch.manual_seed(seed)
    self.rng = rng

#### Generate Parameters

In [25]:
def gen_params(batch_size=None) -> TensorDictBase:
    """Returns a ``tensordict`` containing the physical parameters such as gravitational force and torque or speed limits."""
    if batch_size is None:
        batch_size = []
    td = TensorDict(
        {
            "params": TensorDict(
                {
                    "step_start": 0,
                    "step_end": 20,
                    "max_linear_velocity": 1.0,
                    "max_angular_velocity": 1.0,
                    "imageHeight": 480,
                    "imageWidth": 640,
                },
                [],
            )
        },
        [],
    )
    if batch_size:
        td = td.expand(batch_size).contiguous()
    return td

#### Environment class

In [38]:
class my_botEnv(EnvBase):
    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }
    batch_locked = True

    def __init__(self, td_params=None, seed=None, device="cpu"):
        if td_params is None:
            td_params = self.gen_params()

        super().__init__(device=device, batch_size=[])
        self._make_spec(td_params)
        if seed is None:
            seed = torch.empty((), dtype=torch.int64).random_().item()
        self.set_seed(seed)

    # Helpers: _make_step and gen_params
    gen_params = staticmethod(gen_params)
    _make_spec = _make_spec

    # Mandatory methods: _step, _reset and _set_seed
    _reset = _reset
    _step = staticmethod(_step)
    _set_seed = _set_seed

#### Check environment implementation

In [39]:
env = my_botEnv()
check_env_specs(env)

2024-10-20 21:48:00,865 [torchrl][INFO] check_env_specs succeeded!


In [34]:
print("observation_spec:", env.observation_spec)
print("state_spec:", env.state_spec)
print("reward_spec:", env.reward_spec)

observation_spec: Composite(
    stepInt: BoundedDiscrete(
        shape=torch.Size([]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int32, contiguous=True),
            high=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int32, contiguous=True)),
        device=cpu,
        dtype=torch.int32,
        domain=discrete),
    image: UnboundedContinuous(
        shape=torch.Size([480, 640, 3]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([480, 640, 3]), device=cpu, dtype=torch.float32, contiguous=True),
            high=Tensor(shape=torch.Size([480, 640, 3]), device=cpu, dtype=torch.float32, contiguous=True)),
        device=cpu,
        dtype=torch.float32,
        domain=continuous),
    params: Composite(
        step_start: UnboundedDiscrete(
            shape=torch.Size([]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, contiguous=True

In [41]:
td = env.reset()
print("reset tensordict", td)

reset tensordict TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        image: Tensor(shape=torch.Size([480, 640, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        params: TensorDict(
            fields={
                imageHeight: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False),
                imageWidth: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False),
                max_angular_velocity: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                max_linear_velocity: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                step_end: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False),
                step_start: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([]),
            d

In [42]:
td = env.rand_step(td)
print("random step tensordict", td)

random step tensordict TensorDict(
    fields={
        action: TensorDict(
            fields={
                angular_velocity: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                linear_velocity: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        image: Tensor(shape=torch.Size([480, 640, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                image: Tensor(shape=torch.Size([480, 640, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                params: TensorDict(
                    fields={
                        imageHeight: Tensor(shape=torch.Si

### Transforming environment

In [43]:
base_env = my_botEnv()
transform = Compose(ToTensorImage(in_keys=["image"]), Resize(64, 64, in_keys=["image"]))
env = TransformedEnv(base_env, transform)

In [44]:
check_env_specs(env)

2024-10-20 21:48:22,755 [torchrl][INFO] check_env_specs succeeded!


### Rollout

In [45]:
def simple_rollout(steps=15):
    # preallocate:
    data = TensorDict({}, [steps])
    # reset
    _data = env.reset()
    for i in range(steps):
        _data["action"] = env.action_spec.rand()
        _data = env.step(_data)
        data[i] = _data
        _data = step_mdp(_data, keep_other=True)
    return data


print("data from rollout:", simple_rollout(15))

data from rollout: TensorDict(
    fields={
        action: TensorDict(
            fields={
                action: TensorDict(
                    fields={
                        angular_velocity: Tensor(shape=torch.Size([15]), device=cpu, dtype=torch.float32, is_shared=False),
                        linear_velocity: Tensor(shape=torch.Size([15]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([15]),
                    device=cpu,
                    is_shared=False)},
            batch_size=torch.Size([15]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([15, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        image: Tensor(shape=torch.Size([15, 3, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([15, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                image: T

### Batching

In [46]:
batch_size = 1 # number of environments to be executed in batch
td = env.reset(env.gen_params(batch_size=[batch_size]))
print(f"reset (batch size of {batch_size})", td)
td = env.rand_step(td)
print(f"rand step (batch size of {batch_size})", td)

RuntimeError: Expected a tensordict with shape==env.batch_size, got torch.Size([1]) and torch.Size([])

### Training a Simple Policy

In [23]:
torch.manual_seed(0)
env.set_seed(0)

net = nn.Sequential(
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(64),
    nn.Tanh(),
    nn.LazyLinear(1),
)
policy = TensorDictModule(
    net,
    in_keys=["observation"],
    out_keys=["action"],
)

optim = torch.optim.Adam(policy.parameters(), lr=2e-3)

#### Training loop
We will successively:

- generate a trajectory
- sum the rewards
- backpropagate through the graph defined by these operations
- clip the gradient norm and make an optimization step
- repeat

At the end of the training loop, we should have a final reward close to 0 which demonstrates that the pendulum is upward and still as desired.



In [47]:
batch_size = 1
pbar = tqdm.tqdm(range(20_000 // batch_size))
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, 20_000)
logs = defaultdict(list)

for _ in pbar:
    init_td = env.reset(env.gen_params(batch_size=[batch_size]))
    rollout = env.rollout(100, policy, tensordict=init_td, auto_reset=False)
    traj_return = rollout["next", "reward"].mean()
    (-traj_return).backward()
    gn = torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
    optim.step()
    optim.zero_grad()
    pbar.set_description(
        f"reward: {traj_return: 4.4f}, "
        f"last reward: {rollout[..., -1]['next', 'reward'].mean(): 4.4f}, gradient norm: {gn: 4.4}"
    )
    logs["return"].append(traj_return.item())
    logs["last_reward"].append(rollout[..., -1]["next", "reward"].mean().item())
    scheduler.step()


def plot():
    import matplotlib
    from matplotlib import pyplot as plt

    is_ipython = "inline" in matplotlib.get_backend()
    if is_ipython:
        from IPython import display

    with plt.ion():
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 1)
        plt.plot(logs["return"])
        plt.title("returns")
        plt.xlabel("iteration")
        plt.subplot(1, 2, 2)
        plt.plot(logs["last_reward"])
        plt.title("last reward")
        plt.xlabel("iteration")
        if is_ipython:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        plt.show()


plot()

  0%|          | 0/20000 [00:00<?, ?it/s]


RuntimeError: Expected a tensordict with shape==env.batch_size, got torch.Size([1]) and torch.Size([])