# Create a custom Environment

In this tutorial, we will go over `Environment` objects, how they work and how to build a custom subclass to implement your own task design.

Let's start by importing what we need.


In [1]:
import os
import sys
import numpy as np
import torch as th
from IPython import get_ipython

implementation = "motorchnet"


colab_env = 'google.colab' in str(get_ipython()) if hasattr(__builtins__,'__IPYTHON__') else False
colab_initialized = True if os.path.exists("MotorNet") else False

if colab_env and not colab_initialized:
  !git clone https://github.com/OlivierCodol/MotorNet
  sys.path.append('MotorNet')
  print("Running cell using COLAB initialization...")
elif colab_env and colab_initialized:
  print("Already initialized using COLAB initialization.")
else:
  paths = [p for p in sys.path if os.path.exists(p)]
  local_initialized = True if [p for p in paths if implementation in os.listdir(p)] else False
  if local_initialized:
    %load_ext autoreload
    %autoreload 2
    print("Already initialized using LOCAL initialization.")
  else:
    path = [p for p in paths if p.__contains__("examples")]
    if len(path) != 1:
      raise ValueError("Path to MotorNet could not be determined with certainty.")
    else:
       path = path[0]
    sys.path.append(os.path.dirname(path[:path.rfind('examples')]))
    %load_ext autoreload
    %autoreload 2
    print("Running cell using LOCAL initialization...")

if implementation == "motorchnet":
  import motorchnet as mn
else:
  import motornet as mn


print('All packages imported.')
print('pytorch version: ' + th.__version__)
print('numpy version: ' + np.__version__)
print('motornet version: ' + mn.__version__)

Running cell using LOCAL initialization...
All packages imported.
pytorch version: 2.0.1
numpy version: 1.23.0
motornet version: 0.2.0



# I. Useful methods in `Environment` objects

Several methods are useful to assess what your task object currently contains.
- The `print_attributes` method will print all attributes held by the `Environment` instance as well as their current value.
- The `get_attributes` method will fetch those attributes, and return two lists: one with the name of each attribute, and one with the associated value of each attribute.

First, let's import a built-in `Environment` object and create an instance.

In [2]:

# Instantiate one of the effectors provided by the package.
effector = mn.effector.ReluPointMass24()
# Build the base `Environment` around that effector.
env = mn.environment.Environment(effector=effector, name='env')


In [3]:

# Print every attribute held by the environment instance along with its current value.
env.print_attributes()

T_destination:  ~T_destination
action_frame_stacking:  0
action_noise:  [0.0, 0.0, 0.0, 0.0]
action_space:  Box(0.0, 1.0, (4,), float32)
call_super_init:  False
delay_range:  [0, 0]
device:  cpu
differentiable:  True
dt:  0.01
dump_patches:  False
elapsed:  0.0
goal:  tensor([[0., 0.]])
max_ep_duration:  1.0
metadata:  {'render_modes': []}
n_muscles:  4
np_random:  Generator(PCG64)
nq_init:  None
obs_buffer:  {'proprioception': [tensor([[2.2041, 3.5503, 2.2970, 3.6087, 0.0000, 0.0000, 0.0000, 0.0000]])], 'vision': [tensor([[0.9683, 0.0523]])], 'action': []}
obs_noise:  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
observation_space:  Box(-inf, inf, (12,), float32)
proprioception_delay:  1
proprioception_noise:  [0.0]
q_init:  None
render_mode:  None
reward_range:  (-inf, inf)
seed:  40823744835879118475264510630667754255
space_dim:  2
spec:  None
states:  {'joint': tensor([[0.9683, 0.0523, 0.0000, 0.0000]]), 'cartesian': tensor([[0.9683, 0.0523, 0.0000, 0.0000]]), 'muscl

In [4]:
# Fetch the attributes as two lists: the attribute names first, then the associated values.
attr_names, attr_values = env.get_attributes()
print(attr_names)
print(attr_values)

['T_destination', 'action_frame_stacking', 'action_noise', 'call_super_init', 'delay_range', 'device', 'differentiable', 'dt', 'dump_patches', 'elapsed', 'max_ep_duration', 'metadata', 'n_muscles', 'nq_init', 'obs_noise', 'proprioception_delay', 'proprioception_noise', 'q_init', 'render_mode', 'reward_range', 'seed', 'space_dim', 'spec', 'training', 'vision_delay', 'vision_noise']
[~T_destination, 0, [0.0, 0.0, 0.0, 0.0], False, [0, 0], device(type='cpu'), True, 0.01, False, 0.0, 1.0, {'render_modes': []}, 4, None, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 1, [0.0], None, None, (-inf, inf), 40823744835879118475264510630667754255, 2, None, True, 1, [0.0]]


# II. Initializing an `Environment` object

Once created, an `Environment` needs to be initialized. This can be done using the `Environment.reset()` method, which returns an initial observation tensor and an info dictionary.


In [5]:
# Initialize the environment for a new episode, simulating a batch of 7 at once.
obs, info = env.reset(batch_size=7)

print("obs shape:          ", obs.shape, end="\n\n\n")

# Entries of `info` that are dictionaries are printed as an indented sub-list; the others are printed directly.
for key, val in info.items():
  if isinstance(val, dict):
    print(key + ": ")
    for k, v in val.items():
      # pad the label to a fixed width so that the shapes line up in a column
      print("\t\t\t" + (k + " shape:").ljust(17), v.shape)
  else:
    print((key + " shape:").ljust(20), val.shape)


obs shape:           torch.Size([7, 12])


states: 
			joint shape:      torch.Size([7, 4])
			cartesian shape:  torch.Size([7, 4])
			muscle shape:     torch.Size([7, 4, 4])
			geometry shape:   torch.Size([7, 4, 4])
			fingertip shape:  torch.Size([7, 2])
action shape:        torch.Size([7, 4])
noisy action shape:  torch.Size([7, 4])
goal shape:          torch.Size([7, 2])


The breakdown of this content is already detailed at the end of the previous tutorial on states, but this is repeated here for convenience.

We can see that the `Effector` states are carried over to the `info` dictionary. In addition, the action, and its noisy version are also available, with shape `(batch_size, n_muscles)`. Additionally, the goal attribute, which is held at the `Environment` level, is also returned.

The observation vector dimensionality is `(batch_size, n_observations)`, with the second dimension being an arbitrary combination of `Effector` states and other information that the user deems relevant for the network to receive as input. These observations can potentially be noised and/or time-delayed. By default, this is the goal, the fingertip state, the normalized muscle length for each muscle and the normalized muscle velocity for each muscle. Since there are four muscles in this effector, and the goal and fingertip are 2D cartesian position vectors, this yields `n_observations = 2 + 2 + 4 + 4 = 12`, which matches the `obs` shape printed above. The content of the observation vector, and whether noise and/or time-delay is applied, is defined when building the `Environment` class. This is further detailed in the follow-up `3-environments.ipynb` tutorial.

**Importantly**, users familiar with reinforcement learning (RL) toolboxes will notice that this API design matches the standardized API used for RL packages and pipelines. This is not coincidental. The RL community has been relying on open-source packages and cross-compatible pipelines for years now, and abiding by a similar API design ensures that the insights and best practices learnt through this experience are carried over here. Another important feature is that it enables compatibility between this package and packages implementing state-of-the-art RL agents.


# III. Subclassing an `Environment` object

Now let's try to build our own task design. To do so, we will go over a simple subclassing process to create a custom environment. The task we will try to build is a reaching task from a random starting position in the full joint space to a random target in that joint space.

**NOTE:** The `motornet.environment.Environment` class shares a lot of similarity with the [`gymnasium.Env`](https://gymnasium.farama.org/api/env/#gymnasium-env) class from the popular [`gymnasium`](https://gymnasium.farama.org) package. Getting familiar with this package will provide much insight into the logic of this class for `motornet`. Conversely, people already familiar with `gymnasium` should quickly be able to grasp how the `motornet.Environment` class works.

**NOTE 2:** Actually, the `motornet.environment.Environment` class is a subclass of `gymnasium.Env`, so packages compatible with `gymnasium` should also be compatible with `motornet`.

## III. 1. Inheritance of the `Environment` subclass
The base class for environments is `mn.environment.Environment`, so we will make the custom environment inherit from that base class. We can then define a `__name__` for the class.

## III. 2. The `reset()` method

As we saw above, the reset method initializes the environment for a simulation episode. This method should be called before any new episode, *not just when creating the class*.

When overwriting the `reset()` method, one should ensure that the new method also calls the `environment.effector.reset()` method (or, to be specific, `self.effector.reset()`) so that the effector states are initialized before they are used by follow-up code.

One can then define a custom goal here, and assign it to the `self.goal` attribute so that it is available to every other method. The `self.elapsed` attribute should likely be set to 0. as well, as this attribute keeps track of how much simulation time has elapsed since the beginning of the simulation episode.

The `self.obs_buffer` is a dictionary that keeps in memory several previous states for each entry available. This is usually the proprioception, vision, and action log. Each key is associated with a list containing as many elements as the number of timesteps that this buffer goes back to, in line with the delay properties provided by the user when creating the object instance. These should be `proprioception_delay`, `vision_delay`, and `action_frame_stacking` for proprioception, vision, and action, respectively, and their default values should be 1, 1, and 0, respectively. The first item of the list is always the oldest, and the last is the most recent (the instantaneous state).

When initializing the observation buffer, we want to use the initial states and fill the buffer with it. To get the vision and proprioception feedback, we can use the `self.get_proprioception()` and `self.get_vision()` methods.


We can then get the observation from the `self.get_obs()` method (more on this later), and we set the initial action to be zeros. Finally, we pack all this into the `info` dictionary and return the `(obs, info)` tuple.

**Note on seeding:** One can pass a `seed` argument to the `reset()` method to seed the class instance for reproducibility. If the user implements this feature in their custom class, they should call the `self._set_generator(seed)` method to initialize a generator with the seed they are passing in as a keyword argument. This will also set a seeded generator to the effector attached to the `Environment` instance. The seed used for the effector will be randomly drawn from the (already seeded) `Environment` generator.


In [6]:

class MyCustomEnv(mn.environment.Environment):
  """A reach to a random target from a random starting position."""

  def __init__(self, *args, **kwargs):
    # nothing custom at construction time: every argument is forwarded to the parent class untouched
    super().__init__(*args, **kwargs)
    self.__name__ = "my_custom_env"

  def reset(self, batch_size: int = 1, joint_state=None, deterministic: bool = False, seed: int | None = None):
    """
    Initialize the environment and its effector for a new simulation episode.

    The goal (`i.e.`, the target) is obtained by drawing random uniform states from the effector, converting them
    from joint to cartesian space, and keeping the first half of the result along the last dimension.

    Args:
      batch_size: `Integer`, the number of simulations in the batch.
      joint_state: The initial joint state, passed as-is to the effector's `reset()` method.
      deterministic: `Boolean`, passed on to the :meth:`get_obs()` method.
      seed: `Integer` or `None`, passed to the :meth:`_set_generator()` method before anything else is done.

    Returns:
      - The initial observation, as returned by the :meth:`get_obs()` method.
      - A `dictionary` with the keys `"states"`, `"action"`, `"noisy action"`, and `"goal"`.
    """
    # seeding comes first so that every draw below comes from the seeded generator
    self._set_generator(seed)

    # the effector states must be initialized before anything reads them
    self.effector.reset(batch_size, joint_state)

    random_joint_states = self.effector.draw_random_uniform_states(batch_size)
    random_cartesian_states = self.joint2cartesian(random_joint_states)
    target = random_cartesian_states.chunk(2, dim=-1)[0]
    if self.differentiable:
      self.goal = target
    else:
      self.goal = self.detach(target)
    self.elapsed = 0.

    # no drive has been sent to the muscles yet, so the initial action is all zeros
    initial_action = th.zeros((batch_size, self.muscle.n_muscles)).to(self.device)

    # fill each buffer with copies of the initial state, keeping the buffer lengths as they are
    initial_proprioception = self.get_proprioception()
    self.obs_buffer["proprioception"] = [initial_proprioception] * len(self.obs_buffer["proprioception"])
    initial_vision = self.get_vision()
    self.obs_buffer["vision"] = [initial_vision] * len(self.obs_buffer["vision"])
    self.obs_buffer["action"] = [initial_action] * self.action_frame_stacking

    obs = self.get_obs(deterministic=deterministic)
    info = {
      "states": self.states,
      "action": initial_action,
      "noisy action": initial_action,  # no noise is applied at reset, so both entries are the same tensor
      "goal": self.goal,
      }
    return obs, info


## III. 3. The `step()` method


The `Environment.step()` method should be called at every step of the episode. It requires an action vector as positional argument, with dimensionality `(batch_size, n_muscles)`. Ideally, a `deterministic` boolean keyword argument should also be present, that defines if noise is applied to various elements of the simulation.

This method would usually see the `self.elapsed` attribute updated if the duration of the episode needs to be tracked.

The `self.apply_noise()` method will also be useful to those who want to apply noise on various arrays, such as the action input.
A note of caution: make sure you don't apply observation noise in the `Environment.step()` method and in the `Environment.get_obs()` method at the same time!

Once noise is applied to the action input, it may be passed on to the effector using `self.effector.step(action)`, which will run an integration step of the effector and update the effector's states accordingly. The `Environment.states` attribute will be automatically updated as well to the new states.

The goal can also be changed dynamically if the user desires. This can be useful for moving targets or many-targets reaches for instance, where conditional changes should occur. Here the goal is static so we are simply cloning the previous goal.

The next observation can be fetched using the `self.get_obs()` method.

Finally, the output should be a 5-element tuple containing the observation vector, the reward information (`None` here as this is only useful for RL), whether the episode is terminated this timestep, whether the episode was terminated early (truncated, always `False` here), and finally the `info` dictionary containing this step's information.


In [7]:


class MyCustomEnv(mn.environment.Environment):
  """A reach to a random target from a random starting position."""

  def __init__(self, *args, **kwargs):
    # pass everything as-is to the parent Environment class
    super().__init__(*args, **kwargs)
    self.__name__ = "my_custom_env"

  def step(self, action, deterministic: bool = False):
    """
    Perform one simulation step. This method is likely to be overwritten by any subclass to implement user-defined
    computations, such as reward value calculation for reinforcement learning, custom truncation or termination
    conditions, or time-varying goals.

    Args:
      action: The input drive to the actuators, with dimensionality `(batch_size, n_muscles)`.
      deterministic: `Boolean`, if `True`, no action noise is applied, and the flag is forwarded to the
        :meth:`get_obs()` method so that observation noise is controlled by the same switch.

    Returns:
      - The observation, as returned by the :meth:`get_obs()` method.
      - The reward, which is always `None` in this implementation.
      - A `boolean` indicating if the episode is terminated, `i.e.`, if the elapsed simulation time has reached
        the :attr:`max_ep_duration` attribute.
      - A `boolean` indicating if the episode was truncated early. This is always `False` in this implementation.
      - A `dictionary` containing this step's information, with the keys `"states"`, `"action"`,
        `"noisy action"`, and `"goal"`.
    """

    # keep track of how much simulation time has passed since the last reset
    self.elapsed += self.dt

    if deterministic is False:
      noisy_action = self.apply_noise(action, noise=self.action_noise)
    else:
      noisy_action = action

    # run one integration step of the effector with the (possibly noisy) action
    self.effector.step(noisy_action)
    # the goal is static in this task, so it is simply carried over to the next step
    self.goal = self.goal.clone()

    # forward `deterministic`, otherwise observation noise would be applied even for deterministic steps
    obs = self.get_obs(action=noisy_action, deterministic=deterministic)
    reward = None
    truncated = False
    terminated = bool(self.elapsed >= self.max_ep_duration)
    info = {
      "states": self.states,
      "action": action,
      "noisy action": noisy_action,
      "goal": self.goal,
      }
    return obs, reward, terminated, truncated, info


## III. 4. The `get_obs()`, `get_proprioception()`, and `get_vision()` methods

The other methods likely to be overwritten are `Environment.get_obs()`, `Environment.get_proprioception()`, and `Environment.get_vision()`.

Likely the most important element to add to the `Environment.get_obs()` method is a call to `self.update_obs_buffer()`. This call ensures the buffer is updated with the latest state from the effector and that the oldest state is discarded.

The proprioception and vision buffers are updated by fetching the latest effector states via the `Environment.get_proprioception()` and `Environment.get_vision()` methods. Therefore, the user can control what proprioception and vision encompass by overwriting these two methods. They should always return a `(batch_size, n_features)` tensor, with `n_features` being an arbitrary integer.

Once this is done, we can proceed to collecting states from the buffer if we desire. As you may notice, this mainly includes fetching vision and proprioception states, but we can include other elements to it if we wish. Typically, this includes the target position, which for this environment, is available in the `self.goal` attribute.


In [8]:

class MyCustomEnv(mn.environment.Environment):
  """A reach to a random target from a random starting position."""

  def __init__(self, *args, **kwargs):
    # pass everything as-is to the parent Environment class
    super().__init__(*args, **kwargs)
    self.__name__ = "my_custom_env"

  def get_proprioception(self):
    """
    Returns a `(batch_size, n_features)` `tensor` containing the instantaneous (non-delayed) proprioceptive
    feedback. Here, this is the muscle length of each muscle normalized by the muscle's `l0_ce` attribute,
    followed by the muscle velocity of each muscle normalized by the muscle's `vmax` attribute. Noise is added
    by the :meth:`apply_noise()` method, using the :attr:`proprioception_noise` attribute.
    """
    # rows 1 and 2 of the muscle state are read as length and velocity, respectively
    mlen = self.states["muscle"][:, 1:2, :] / self.muscle.l0_ce
    mvel = self.states["muscle"][:, 2:3, :] / self.muscle.vmax
    # concatenate along the muscle dimension, then drop the singleton dimension left by the slicing above
    prop = th.cat([mlen, mvel], dim=-1).squeeze(dim=1)
    return self.apply_noise(prop, self.proprioception_noise)

  def get_vision(self):
    """
    Returns a `(batch_size, n_features)` `tensor` containing the instantaneous (non-delayed) visual
    feedback. Here, this is the `"fingertip"` entry of the states, that is, the end-point of the effector.
    Noise is added by the :meth:`apply_noise()` method, using the :attr:`vision_noise` attribute.
    """
    vis = self.states["fingertip"]
    return self.apply_noise(vis, self.vision_noise)

  def get_obs(self, action=None, deterministic: bool = False):
    """
    Returns a `(batch_size, n_features)` `tensor` containing the (potentially time-delayed) observations.
    Here, this is the task goal, followed by the oldest vision entry of the observation buffer, and finally the
    oldest proprioception entry of the observation buffer. Past actions are not appended to the observation in
    this implementation. Unless `deterministic` is `True`, noise is added to the result by the
    :meth:`apply_noise()` method, using the :attr:`obs_noise` attribute.

    Args:
      action: The action for this step, passed on to the :meth:`update_obs_buffer()` method.
      deterministic: `Boolean`, whether to skip the observation noise.
    """
    # refresh the buffer first so that the entries read below account for the latest effector state
    self.update_obs_buffer(action=action)

    obs_as_list = [
      self.goal,
      self.obs_buffer["vision"][0],  # oldest element
      self.obs_buffer["proprioception"][0],   # oldest element
      ]

    obs = th.cat(obs_as_list, dim=-1)

    if deterministic is False:
      obs = self.apply_noise(obs, noise=self.obs_noise)

    return obs


## III. 5. Putting it all together

Bringing all the above together, this should look like the below.



In [9]:

class MyCustomEnv(mn.environment.Environment):
  """A reach to a random target from a random starting position."""

  def __init__(self, *args, **kwargs):
    # pass everything as-is to the parent Environment class
    super().__init__(*args, **kwargs)
    self.__name__ = "my_custom_env"

  def reset(self, batch_size: int = 1, joint_state=None, deterministic: bool = False, seed: int | None = None):
    """
    Initialize the environment and its effector for a new episode, draw a random goal, and fill the observation
    buffer with the initial state. Returns an `(obs, info)` tuple.
    """
    self._set_generator(seed)  # seed the environment and the effector

    # the effector states must be initialized before anything reads them
    self.effector.reset(batch_size, joint_state)

    # draw random states, convert them to cartesian space, and keep the first half of the last dimension
    goal = self.joint2cartesian(self.effector.draw_random_uniform_states(batch_size)).chunk(2, dim=-1)[0]
    self.goal = goal if self.differentiable else self.detach(goal)
    self.elapsed = 0.
    action = th.zeros((batch_size, self.muscle.n_muscles)).to(self.device)

    # fill each buffer with copies of the initial state, keeping the buffer lengths as they are
    self.obs_buffer["proprioception"] = [self.get_proprioception()] * len(self.obs_buffer["proprioception"])
    self.obs_buffer["vision"] = [self.get_vision()] * len(self.obs_buffer["vision"])
    self.obs_buffer["action"] = [action] * self.action_frame_stacking

    obs = self.get_obs(deterministic=deterministic)
    info = {
      "states": self.states,
      "action": action,
      "noisy action": action,  # no noise here so it is the same
      "goal": self.goal,
      }
    return obs, info

  def step(self, action, deterministic: bool = False):
    """
    Perform one simulation step with the given action. Returns an `(obs, reward, terminated, truncated, info)`
    tuple, where `reward` is always `None` and `truncated` is always `False`. If `deterministic` is `True`, no
    action noise is applied, and the flag is forwarded to the :meth:`get_obs()` method.
    """
    self.elapsed += self.dt

    if deterministic is False:
      noisy_action = self.apply_noise(action, noise=self.action_noise)
    else:
      noisy_action = action

    self.effector.step(noisy_action)
    # the goal is static in this task, so it is simply carried over to the next step
    self.goal = self.goal.clone()

    # forward `deterministic`, otherwise observation noise would be applied even for deterministic steps
    obs = self.get_obs(action=noisy_action, deterministic=deterministic)
    reward = None
    truncated = False
    terminated = bool(self.elapsed >= self.max_ep_duration)
    info = {
      "states": self.states,
      "action": action,
      "noisy action": noisy_action,
      "goal": self.goal,
      }
    return obs, reward, terminated, truncated, info

  def get_proprioception(self):
    """
    Returns the instantaneous proprioceptive feedback: muscle lengths normalized by `l0_ce`, followed by muscle
    velocities normalized by `vmax`, with noise applied using the :attr:`proprioception_noise` attribute.
    """
    # rows 1 and 2 of the muscle state are read as length and velocity, respectively
    mlen = self.states["muscle"][:, 1:2, :] / self.muscle.l0_ce
    mvel = self.states["muscle"][:, 2:3, :] / self.muscle.vmax
    prop = th.cat([mlen, mvel], dim=-1).squeeze(dim=1)
    return self.apply_noise(prop, self.proprioception_noise)

  def get_vision(self):
    """
    Returns the instantaneous visual feedback: the `"fingertip"` entry of the states, with noise applied using
    the :attr:`vision_noise` attribute.
    """
    vis = self.states["fingertip"]
    return self.apply_noise(vis, self.vision_noise)

  def get_obs(self, action=None, deterministic: bool = False):
    """
    Updates the observation buffer, then returns the goal, the oldest vision entry, and the oldest proprioception
    entry concatenated along the last dimension. Unless `deterministic` is `True`, noise is applied to the result
    using the :attr:`obs_noise` attribute.
    """
    self.update_obs_buffer(action=action)

    obs_as_list = [
      self.goal,
      self.obs_buffer["vision"][0],  # oldest element
      self.obs_buffer["proprioception"][0],   # oldest element
      ]
    obs = th.cat(obs_as_list, dim=-1)

    if deterministic is False:
      obs = self.apply_noise(obs, noise=self.obs_noise)
    return obs


# Build the custom environment around a fresh effector instance.
env = MyCustomEnv(effector=mn.effector.ReluPointMass24())
print("Task subclass built.\n")

# Initialize the episode, using the default batch size.
obs, info = env.reset()


print("obs shape:          ", obs.shape, end="\n\n\n")

# Entries of `info` that are dictionaries are printed as an indented sub-list; the others are printed directly.
for key, val in info.items():
  if isinstance(val, dict):
    print(key + ": ")
    for k, v in val.items():
      # pad the label to a fixed width so that the shapes line up in a column
      print("\t\t\t" + (k + " shape:").ljust(17), v.shape)
  else:
    print((key + " shape:").ljust(20), val.shape)



Task subclass built.

obs shape:           torch.Size([1, 12])


states: 
			joint shape:      torch.Size([1, 4])
			cartesian shape:  torch.Size([1, 4])
			muscle shape:     torch.Size([1, 4, 4])
			geometry shape:   torch.Size([1, 4, 4])
			fingertip shape:  torch.Size([1, 2])
action shape:        torch.Size([1, 4])
noisy action shape:  torch.Size([1, 4])
goal shape:          torch.Size([1, 2])


In [10]:
# Run a single simulation step with an all-zeros action for a batch of one.
obs, reward, terminated, truncated, info = env.step(action=th.zeros(1, env.n_muscles))

print("obs shape:          ", obs.shape)
print("reward:             ", reward)
print("terminated:         ", terminated)
print("truncated:          ", truncated, end="\n\n")

print("info:")
# Entries of `info` that are dictionaries are printed as an indented sub-list; the others are printed directly.
for key, val in info.items():
  if isinstance(val, dict):
    print("\t" + key + ": ")
    for k, v in val.items():
      # pad the label to a fixed width so that the shapes line up in a column
      print("\t\t" + (k + " shape:").ljust(17), v.shape)
  else:
    print("\t" + (key + " shape:").ljust(20), val.shape)



obs shape:           torch.Size([1, 12])
reward:              None
terminated:          False
truncated:           False

info:
	states: 
		joint shape:      torch.Size([1, 4])
		cartesian shape:  torch.Size([1, 4])
		muscle shape:     torch.Size([1, 4, 4])
		geometry shape:   torch.Size([1, 4, 4])
		fingertip shape:  torch.Size([1, 2])
	action shape:        torch.Size([1, 4])
	noisy action shape:  torch.Size([1, 4])
	goal shape:          torch.Size([1, 2])
