### Imports

In [1]:
import torch
import motornet as mn
from motornet.effector import RigidTendonArm26
from motornet.muscle import MujocoHillMuscle
from motornet.environment import RandomTargetReach
import numpy as np

### The arm26 effector and a MotorNet random-reach environment

In [2]:
# Effector: the arm26 model, actuated by MujocoHillMuscle muscles.
muscle = MujocoHillMuscle()
arm26 = RigidTendonArm26(muscle=muscle)

# Environment settings for the random-reach task: all noise sources are
# switched off and both feedback delays are set to the effector's `dt`.
reach_config = dict(
    name="arm26_random_reach",
    max_ep_duration=5.0,  # 5 s - like Codol
    action_frame_stacking=0,
    proprioception_delay=arm26.dt,
    vision_delay=arm26.dt,
    proprioception_noise=0.0,
    vision_noise=0.0,
    obs_noise=0.0,
    action_noise=0.0,
)
env = RandomTargetReach(effector=arm26, **reach_config)

# Start an episode and inspect what the environment exposes.
obs, info = env.reset()

# Expected: obs shape: torch.Size([1, 16])
#   = 2 (goal) + 2 (fingertip) + 6 (muscle lengths) + 6 (muscle velocities)
print("obs shape:", obs.shape)
print("goal (target):", env.goal)
print("fingertip:", env.states["fingertip"])
print("muscle state shape:", env.states["muscle"].shape)
print("proprioception sample:", env.get_proprioception()[0, :10])

obs shape: torch.Size([1, 16])
goal (target): tensor([[-0.2529,  0.5768]])
fingertip: tensor([[-0.1736,  0.5117]])
muscle state shape: torch.Size([1, 7, 6])
proprioception sample: tensor([0.8958, 1.1298, 1.0077, 0.7976, 0.8120, 0.8185, 0.0000, 0.0000, 0.0000,
        0.0000])


In [3]:
# This cell steps the environment forward using random muscle activations for a few
# timesteps to confirm that the arm moves and the state updates correctly.

for t in range(10):
    action = torch.rand(1, arm26.n_muscles)  # random actions
    obs, reward, terminated, truncated, info = env.step(action)
    # Stop on either end-of-episode signal, matching how CodolObsWrapper.step
    # defines `done` (terminated or truncated).
    if terminated or truncated:
        break

print("final fingertip:", env.states["fingertip"])

# RESULTS :
# the arm responds to the random muscle activations,
# the state updates over time


final fingertip: tensor([[-0.1698,  0.4403]])


In [4]:
class CodolObsWrapper:
    """
    Wraps a MotorNet RandomTargetReach env and produces observations as described in the Codol paper:
    [go, endpoint, start, target, muscle_length, muscle_velocity]

    The observation is assembled from the wrapped env's `states["fingertip"]`,
    `goal` and `get_proprioception()`. All pieces are concatenated along
    dim 1, so for a 6-muscle effector the result has shape [batch, 19].

    Args:
        base_env: environment to wrap. It must expose `reset()`, `step(action)`,
            `states["fingertip"]`, `goal` and `get_proprioception()`.
        go_delay_steps: number of wrapper steps during which the go cue stays
            at 0; from that step onward the cue is 1.
    """

    def __init__(self, base_env, go_delay_steps=0):
        self.env = base_env
        self.go_delay_steps = go_delay_steps
        self.t = 0              # steps taken since the last reset
        self.start_pos = None   # fingertip at t=0 (set by reset)

    def reset(self, **reset_kwargs):
        """
        Reset the wrapped env and return the first policy observation.

        Any keyword arguments are forwarded unchanged to the wrapped env's
        `reset` (calling with no arguments behaves as before).
        """
        obs, info = self.env.reset(**reset_kwargs)
        self.t = 0
        # Snapshot the starting fingertip position; clone so later state
        # updates in the env cannot alter the stored start.
        self.start_pos = self.env.states["fingertip"].clone()  # [batch, 2]
        return self._build_policy_obs()

    def _split_proprio(self):
        """
        Split proprioception into (lengths, velocities).

        MotorNet proprioception = [len(n), vel(n)] normalized, shape [batch, 2n]
        (n = 6 for arm26). The split point is taken from the tensor width
        rather than hard-coded, so the first half is returned as lengths and
        the second half as velocities.
        """
        proprio = self.env.get_proprioception()  # [batch, 2n]
        n_muscles = proprio.shape[1] // 2
        lengths = proprio[:, :n_muscles]
        vels = proprio[:, n_muscles:]
        return lengths, vels

    def _build_policy_obs(self):
        """Assemble [go, endpoint, start, target, lengths, vels] along dim 1."""
        endpoint = self.env.states["fingertip"]        # [batch, 2]
        start    = self.start_pos                      # [batch, 2]
        target   = self.env.goal                       # [batch, 2]
        lengths, vels = self._split_proprio()          # [batch, n], [batch, n]

        # go cue: 0 until go_delay_steps, then 1. Built with the batch size,
        # dtype and device of the fingertip tensor so the concatenation also
        # works for batched or non-CPU environments.
        go_val = 1.0 if self.t >= self.go_delay_steps else 0.0
        go = torch.full(
            (endpoint.shape[0], 1),
            go_val,
            dtype=endpoint.dtype,
            device=endpoint.device,
        )

        policy_obs = torch.cat(
            [go, endpoint, start, target, lengths, vels],
            dim=1
        )  # shape [batch, 7 + 2n] -> [1, 19] for a single arm26 episode

        return policy_obs

    def step(self, action):
        """
        Advance the wrapped env by one step.

        Returns:
            (policy_obs, reward, done, info), where `done` is
            `terminated or truncated` from the wrapped env.
        """
        # Increment first so the go cue in the returned observation reflects
        # the step that was just taken.
        self.t += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        policy_obs = self._build_policy_obs()
        done = terminated or truncated
        return policy_obs, reward, done, info


In [5]:
# Smoke test for CodolObsWrapper: run a short rollout with random actions
# to confirm that the wrapped environment steps correctly.
wrapped = CodolObsWrapper(env, go_delay_steps=0)

# Expected shape: torch.Size([1, 19])
# layout: [go, endpoint(2), start(2), target(2), lengths(6), velocities(6)]
obs = wrapped.reset()
print("obs shape:", obs.shape)

# Collect the reward of every step taken; stop early if the episode ends.
rewards = []
for step in range(20):
    action = torch.rand(1, env.effector.n_muscles)
    obs, reward, done, info = wrapped.step(action)
    rewards.append(reward)
    if done:
        break

print("steps run:", len(rewards))
print("last obs example:", obs)


obs shape: torch.Size([1, 19])
steps run: 20
last obs example: tensor([[ 1.0000,  0.0235,  0.1714,  0.0633,  0.3937,  0.2432,  0.5737,  1.1875,
          0.8505,  0.5455,  1.0616,  0.6974,  0.7185,  0.0000,  0.0000,  0.0154,
         -0.0060,  0.0203, -0.0059]])
