# Extension - Gym Environment Interface: minimal example

In [None]:
import robotic as ry
import gymnasium as gym
import numpy as np
print('ry version:', ry.__version__, ry.compiled())

In [None]:
C = ry.Config()
C.addFile(ry.raiPath('scenarios/pandaSingle.g'))
C.view(False)

C.addFrame('box') \
    .setShape(ry.ST.ssBox, size=[.1,.1,.1,.005]) .setColor([1,.5,0]) \
    .setPosition([.06,.35,.7]) \
    .setMass(.1) \
    .setAttribute('friction', 0.01) # friction definition # initial noise with falling box?

C.addFrame('plate', 'l_gripper') \
    .setShape(ry.ST.ssBox, size=[.01,.2,.36,.0025]) .setColor([.5,1,0]) \
    .setRelativePosition([0,0,-.16])

C.addFrame('target') \
    .setShape(ry.ST.marker, size=[.1]) .setColor([0,1,0]) \
    .setPosition([.4,.3,.7]) \

#the following would make it a LOT simpler
C.setJointState([.0], ['l_panda_joint2'])
C.setJointState([.7], ['l_panda_joint7'])

C.setJointState([.007], ['l_panda_finger_joint1']) #only cosmetics

q0 = C.getJointState()
X0 = C.getFrameState()

C.view()

### Task 1:

The following code consists of a minimal Implementation of an environment in which the robot can operate. Currently the policy receives information about the internal state but no further information about the environment state. This is called open-loop Reinforcement Learning.

What do you need to change to make it a closed-loop Reinforcement Learning setting? Apply the changes to the current environment.

In [None]:
class RaiGym(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    tau = .05
    time = 0.

    def __init__(self, C, time_limit, reward_fct, render_mode=None):
        self.C : ry.Config = C
        self.time_limit = time_limit
        self.reward_fct = reward_fct
        self.render_mode = render_mode
        # self.limits = self.C.getJointLimits()
        self.limits = [-10., 10.]
        self.q0 = self.C.getJointState()
        self.X0 = self.C.getFrameState()

        # here they need to configure open loop or closed loop RL, with and without box position
        #self.observation_space = gym.spaces.box.Box(self.limits[0], self.limits[1], shape=(self.q0.size,), dtype=np.float32)
        self.observation_space = gym.spaces.box.Box(self.limits[0], self.limits[1], shape=(self.q0.size + 3,), dtype=np.float32)
        
        #self.action_space = gym.spaces.box.Box(low=-1., high=1., shape=(self.q0.size,), dtype=np.float32)
        self.action_space = gym.spaces.Box(low=np.array([-1, -1, -1]), high=np.array([1, 1, 1]), dtype=np.float32)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        self.sim = ry.Simulation(self.C, ry.SimulationEngine.physx, verbose=4)

    def __del__(self):
        del self.sim
        del self.C
    
    # different step functions implemntation, once with direct actions and one with a komo solver endeffector
        
    # def step(self, action):
    #     self.sim.step(action, self.tau, ry.ControlMode.velocity)
    #     self.time += self.tau
        
    #     observation_joint = self.C.getJointState()
    #     observation_box = self.C.getFrame('box').getPosition()
    #     observation = np.concatenate((observation_joint, observation_box), axis=0)
    #     reward = self.reward_fct(C)
    #     truncated = (self.time >= self.time_limit) # terminated and truncated difference is super important
    #     info = {"no": "additional info"}

    #     return observation, reward, False, truncated, info
    
    def step(self, action):
        dx, dy, dtheta = action

        ee = self.C.getFrame("plate")
        pos = ee.getPosition()
        quat = ee.getQuaternion()

        target_pos = pos + np.array([dx, dy, 0.0])
        R_current = ry.Quaternion().set(quat)
        R_delta = ry.Quaternion().setExp([0., 0., dtheta])
        target_quat = (R_delta * R_current).asArr()

        komo = ry.KOMO()
        komo.setConfig(self.C, True)
        komo.setTiming(1, 1, 1., 2)
        komo.addObjective([], ry.FS.position, ['plate'], ry.OT.eq, [1e1], target_pos)
        komo.addObjective([], ry.FS.quaternion, ['plate'], ry.OT.eq, [1e1], target_quat)
        ret = ry.NLP_Solver(komo.nlp(), verbose=4).solve()

        q_target = komo.getPath()[0]
        self.sim.step(q_target - self.C.getJointState(), self.tau, ry.ControlMode.velocity)
        self.time += self.tau

        # self.C.view(False, 'simulating')
          
        observation_joint = self.C.getJointState() #more information
        observation_box = self.C.getFrame('box').getPosition()
        observation = np.concatenate((observation_joint, observation_box), axis=0)
        reward = self.reward_fct(C)
        terminated = False
        truncated = (self.time >= self.time_limit) # terminated and truncated difference is super important
        info = {"no": "additional info"}

        # mistake in former repository
        return observation, reward, terminated, truncated, info

        
    def reset(self, seed=None):
        super().reset(seed=seed)

        self.time = 0.
        self.sim.setState(X0, q0)
        self.sim.resetSplineRef()

        observation_joint = self.C.getJointState()
        observation_box = self.C.getFrame('box').getPosition()
        observation = np.concatenate((observation_joint, observation_box), axis=0)
        info = {"no": "additional info"}

        if self.render_mode == "human":
            self.C.view(False)

        return observation, info
        
    def render(self):
        self.C.view(False, f'RaiGym time {self.time} / {self.time_limit}')
        if self.render_mode == "rgb_array":
            return self.C.view_getRgb()


### Task 3:

Great, the environment is currently set up. You defined the observation space, the action space and already defined the step function. One essential ingredient is missing: The Reward Function. You should pose the question: How do I make sure that the goal of pushing the 'box' with the 'plate' to the 'target' position?

Tipp: C.eval(ry.FS.{members}, [{frames}]) will help you defining the reward. Familiarize yourself with the different members of the class FS.


In [None]:
def reward_function(C):
    touch, _ = C.eval(ry.FS.negDistance, ["plate", "box"])
    dist, _ = C.eval(ry.FS.positionDiff, ["box", "target"])
    r = touch[0] - np.linalg.norm(dist)
    return r

In [None]:
env = RaiGym(C, 10., reward_function)

### Test Functionality of the components you implemented and the physx simulator

In [None]:
env.reset()

v = np.zeros(3)
v[0] = 0.2 # 3 joints, linear velocity - + mit rechte daumen regel
# v = np.zeros(env.q0.size)
# v[0] = -1. # 7 joints, radial velocity - + mit rechte daumen regel
print(v)

t = 0
while True:
    t += 1
    observation, reward, terminated, truncated, info = env.step(v)
    done = terminated or truncated
    if done:
        break
    # print("reward: ", reward)
    # if not (t%5):
    #     env.render()

### Now the actual policy learning happens

We import SAC and PPO from the stable_baselines3 library, define checkpoints and a tensorboard logger. Learning the policy takes some time on a normal CPU, consider a total_timesteps of 200_000 or even more. Observe the training dynamics in the tensorbaord logger.

In [None]:
from stable_baselines3 import SAC, PPO
from stable_baselines3.common.callbacks import CheckpointCallback
import torch
print(torch.__version__)

In [None]:
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path='./checkpoints_endeffectorActions/', name_prefix='rl_model')
model = SAC("MlpPolicy", env, gamma=0.99, learning_rate=1e-4, verbose=1, tensorboard_log="./tensorboard/")

In [None]:
model.learn(total_timesteps=500_000, callback=checkpoint_callback)

### Rollout of the trained policy

Once the policy is trained, you can load it from a certain checkpoint

### Task 4:

Now you want to actually play the policy, therefore you need to implement a simple loop. Make sure that you observe the reward.

In [None]:
model = SAC.load('./checkpoints_endeffectorActions/rl_model_199498_steps.zip', env=env)

In [None]:
obs, info = env.reset()
for t in range(200):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action) # super relevant Achim the legend
    if not (t%10):
        env.render()
        print("reward: ", reward)

### (Bonus) Task 5:

You did a great job! The last task considers again the step function of the environment. Beforehand the step function considered directly the joint space positions actions which were directly fed into the physx simulator. 

But what about reducing the action space to for example the x, y positions and theta orientation of the endeffector ('plate')? Define a new step function in the environment. 

Tipp: You will need to solve a komo problem in the step function to move the endeffector. Consider adjustments in the actionspace and in the functionality test function.

In [None]:
# del model
# del g
# del C