# DATASET

In [57]:
import numpy as np
import tensorflow_datasets as tfds
import tqdm
from PIL import Image
from IPython import display

import mujoco,sys
import numpy as np
import matplotlib.pyplot as plt

sys.path.append('../package/')
sys.path.append('../package/helper/')
sys.path.append('../package/mujoco_usage/')

from mujoco_parser import *
from transformation import *
from slider import *
from utility import *
from utils import *

%config InlineBackend.figure_format = 'retina'
%matplotlib inline
print ("MuJoCo:[%s]"%(mujoco.__version__))

MuJoCo:[3.1.6]


In [58]:
dataset = DATASETS[34] # berkeley_rpt_converted_externally_to_rlds
display_key = 'hand_image'

ds, iterator = get_dataset(dataset, display_key=display_key)

FeaturesDict({
    'episode_metadata': FeaturesDict({
        'file_path': Text(shape=(), dtype=string),
    }),
    'steps': Dataset({
        'action': Tensor(shape=(8,), dtype=float32),
        'discount': Scalar(shape=(), dtype=float32),
        'is_first': bool,
        'is_last': bool,
        'is_terminal': bool,
        'language_embedding': Tensor(shape=(512,), dtype=float32),
        'language_instruction': Text(shape=(), dtype=string),
        'observation': FeaturesDict({
            'gripper': Scalar(shape=(), dtype=bool),
            'hand_image': Image(shape=(480, 640, 3), dtype=uint8),
            'joint_pos': Tensor(shape=(7,), dtype=float32),
        }),
        'reward': Scalar(shape=(), dtype=float32),
    }),
})


In [59]:
episode = next(iterator)
images = get_image_from_episode(episode, display_key=display_key)

import os
from PIL import Image
import numpy as np
os.environ["IMAGEIO_FFMPEG_EXE"] = "/opt/homebrew/bin/ffmpeg"

from moviepy import *

frames = [np.array(img.convert('RGB')) for img in images]
clip = ImageSequenceClip(frames, fps=3)
clip.write_videofile("output.mp4", codec='libx264', audio=False)

MoviePy - Building video output.mp4.
MoviePy - Writing video output.mp4



                                                                         

MoviePy - Done !
MoviePy - video ready output.mp4


In [60]:
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

def arr2image(array):
    """
    Displays a NumPy array (h, w, c) as an image.
    """
    # Check the array shape and type
    if len(array.shape) != 3 or array.shape[2] not in [3, 4]:
        raise ValueError("Input array must have shape (h, w, c) with c=3 (RGB) or c=4 (RGBA).")
    
    # Display the image using matplotlib
    plt.imshow(array)
    plt.axis('off')  # Turn off axis
    plt.show()

# EPISODE

In [62]:
iter_steps = iter(episode['steps'])
joint_states = [base['observation']['joint_pos'] for base in iter_steps]

iter_steps = iter(episode['steps'])
actions = [base['action'] for base in iter_steps]

2025-01-29 21:10:20.492180: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


# ENV

In [63]:
xml_path = '../mujoco_menagerie/franka_fr3/scene.xml'
env = MuJoCoParserClass(name='Tabletop',rel_xml_path=xml_path,verbose=False)

# CONTROL

In [65]:
sliders = MultiSliderClass( # Slider for EE control
    n_slider      = 7,
    title         = 'Sliders for [%s] Control'%(env.name),
    window_width  = 450,
    window_height = 300,
    x_offset      = 0,
    y_offset      = 100,
    slider_width  = 300,
    label_texts   = ['X','Y','Z','Roll-deg','Pitch-deg','Yaw-deg','Gripper'],
    slider_mins   = [-1,-1,0,-180,-180,-180,0],
    slider_maxs   = [+1,+1,1.2,+180,+180,+180,2],
    slider_vals   = [0,0,0,0,0,0,0],
    resolutions   = [0.02,0.02,0.02,3.6,3.6,3.6,0.04], # range/50
    verbose       = False,
)

joint_names = env.joint_names[:-2] # Except fingers

# Don't Know Why.....
q0 = np.zeros_like(joint_names, dtype=float)


# q0 = np.zeros(len(joint_names), dtype=float)
p0 = env.get_p_body(body_name='base_link')+np.array([0.5,0.0,0.45])
R0 = rpy_deg2r([0,0,0])

env.init_viewer(
    title       = 'Tabletop',
    transparent = False,
    azimuth     = 133,
    distance    = 3.5,
    elevation   = -42.4,
    lookat      = (-0.06,0.07,0.31),
)
env.reset() # reset
env.forward(q=q0,joint_names=joint_names) # initial position

# Loop
q_ik_init = q0.copy()
while env.is_viewer_alive():
    
    # Update
    sliders.update() # update slider
    xyzrpyg = sliders.get_slider_values()
    qpos,ik_err_stack,ik_info = solve_ik(
        env                = env,
        joint_names_for_ik = joint_names,
        body_name_trgt     = 'tcp_link',
        q_init             = q_ik_init,
        p_trgt             = xyzrpyg[:3]+p0,
        R_trgt             = rpy_deg2r(xyzrpyg[3:6])@R0,
        max_ik_tick        = 500,
        ik_stepsize        = 1.0,
        ik_eps             = 1e-2,
        ik_th              = np.radians(5.0),
        render             = False,
        verbose_warning    = False,
    )
    ik_err = np.abs(ik_err_stack).max() # IK error
    if ik_err < 1e-2: q_ik_init = qpos.copy()
    else: q_ik_init = q0.copy()
    
    env.step( # dynamic update
        ctrl        = np.append(np.append(qpos,xyzrpyg[6]),xyzrpyg[6]),
        joint_names = joint_names+['joint_finger_right','joint_finger_left'])
    
    # Render 
    if env.loop_every(tick_every=10):
        env.plot_T(
            T=env.get_T_body(body_name='base_link'),
            axis_len=0.5,print_xyz=False)
        env.plot_text(
            p=env.get_p_body(body_name='base_link')+np.array([0,0,0.5]),
            label = 'time:[%.2f]sec ik_err:[%.3f]'%(env.get_sim_time(),ik_err))
        env.plot_body_T(body_name='tcp_link',axis_len=0.1,axis_width=0.005)
        env.plot_contact_info(
            r_arrow=0.005,h_arrow=0.1,rgba_contact=(1,0,0,0.5),plot_sphere=False)
        plot_ik_info(env=env,ik_info=ik_info)
        
        env.render()

# Close
env.close_viewer()
sliders.close()
print ("Done.")

Done.


# Trajectory

### 1) genration

In [47]:
joint_names = env.joint_names

q0 = np.array(joint_states[0])

q_traj = []

q_ik_init = q0.copy()
for ik_target in states:

    qpos,ik_err_stack,ik_info = solve_ik(
        env                = env,
        joint_names_for_ik = joint_names,
        body_name_trgt     = 'tcp_link',
        q_init             = q_ik_init,
        p_trgt             = ik_target[:3],
        R_trgt             = quat2r(ik_target[3:]),
        max_ik_tick        = 500,
        ik_stepsize        = 1.0,
        ik_eps             = 1e-2,
        ik_th              = np.radians(5.0),
        render             = False,
        verbose_warning    = False,
    )

    ik_err = np.abs(ik_err_stack).max() # IK error
    if ik_err < 1e-2: q_ik_init = qpos.copy()
    else: q_ik_init = q0.copy()
    
    # print(ik_err, ik_target[:3], rot_target)
    q_traj.append(qpos)

### 2) Comparing

In [13]:
# G -> B -> R

idx = 0
assert idx < len(frames)

sliders = MultiSliderClass( # Slider for EE control
    n_slider      = 7,
    title         = 'Sliders for [%s] Control'%(env.name),
    window_width  = 450,
    window_height = 300,
    x_offset      = 0,
    y_offset      = 100,
    slider_width  = 300,
    label_texts   = ['X','Y','Z','Roll-deg','Pitch-deg','Yaw-deg','Gripper'],
    slider_mins   = [-1,-1,0,-180,-180,-180,0],
    slider_maxs   = [+1,+1,1.2,+180,+180,+180,2],
    slider_vals   = [0,0,0,0,0,0,0],
    resolutions   = [0.02,0.02,0.02,3.6,3.6,3.6,0.04], # range/50
    verbose       = False,
)

joint_names = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6', 'joint7']

env.reset(step=True)

q0 = q_traj[idx]

p0 = env.get_p_body(body_name='tcp_link')
R0 = env.get_R_body(body_name='tcp_link')

env.init_viewer()

# Loop
q_ik_init = q0.copy()
while env.is_viewer_alive():
    
    # Update
    sliders.update() # update slider
    xyzrpyg = sliders.get_slider_values()
    qpos,ik_err_stack,ik_info = solve_ik(
        env                = env,
        joint_names_for_ik = joint_names,
        body_name_trgt     = 'tcp_link',
        q_init             = q_ik_init,
        p_trgt             = xyzrpyg[:3]+p0,
        R_trgt             = rpy_deg2r(xyzrpyg[3:6])@R0,
        max_ik_tick        = 500,
        ik_stepsize        = 1.0,
        ik_eps             = 1e-2,
        ik_th              = np.radians(5.0),
        render             = False,
        verbose_warning    = False,
    )
    
    ik_err = np.abs(ik_err_stack).max() # IK error
    if ik_err < 1e-2: q_ik_init = qpos.copy()
    else: q_ik_init = q0.copy()
    
    env.step( # dynamic update
        ctrl        = qpos,
        joint_names = joint_names
    )
    
    # Render 
    if env.loop_every(tick_every=10):
        env.plot_T(
            T=env.get_T_body(body_name='link0'),
            axis_len=0.5,print_xyz=False)
        env.plot_text(
            p=env.get_p_body(body_name='link0')+np.array([0,0,0.5]),
            label = 'time:[%.2f]sec ik_err:[%.3f]'%(env.get_sim_time(),ik_err))
        env.plot_body_T(body_name='tcp_link',axis_len=0.1,axis_width=0.005)
        env.plot_contact_info(
            r_arrow=0.005,h_arrow=0.1,rgba_contact=(1,0,0,0.5),plot_sphere=False)
        plot_ik_info(env=env,ik_info=ik_info)
        
        env.render()

# Close
env.close_viewer()
sliders.close()
print("Done.")  

Done.


# Trajectory Reappearance

In [40]:
joint_names = env.joint_names
q0 = np.array(joint_states[0])

env.init_viewer()

env.reset() # reset
env.forward(q=q0,joint_names=joint_names) # initial position
    
# Loop
q_ik_init = q0.copy()
qpos = q_traj[0]
idx = 0

# append two gripper states to qpos
# gripper = gripper_states[0]
# qpos = np.append(qpos, [gripper, gripper])

while env.is_viewer_alive():
    env.forward(q=qpos,joint_names=joint_names)

    # Render 
    if env.loop_every(HZ=1):
        if idx < len(q_traj)-1:
            idx += 1
        else: idx = 0

        qpos = q_traj[idx]
        # gripper = gripper_states[idx]
        # qpos = np.append(qpos, [gripper, gripper])

        env.plot_T(
            T=env.get_T_body(body_name='base'),
            axis_len=0.5,print_xyz=False)
        env.plot_text(
            p=env.get_p_body(body_name='base')+np.array([0,0,0.5]),
            label = 'tick:[%d]time:[%.2f]'%(idx, env.get_sim_time()))
        env.plot_body_T(body_name='tcp_link',axis_len=0.1,axis_width=0.005)
        
        env.render()

# Close
env.close_viewer()
print ("Done.")

Done.


In [64]:
joint_names = env.joint_names
q0 = np.array(joint_states[0])

env.init_viewer()

env.reset() # reset
env.forward(q=q0,joint_names=joint_names) # initial position
    
# Loop
q_ik_init = q0.copy()
qpos = joint_states[0]
idx = 0

while env.is_viewer_alive():
    env.forward(q=qpos,joint_names=joint_names)
    # Render 

    if env.loop_every(HZ=1):
        if idx < len(joint_states)-1:
            idx += 1
        else: idx = 0

        qpos = joint_states[idx]  # Use first 7 joints from joint_states

        env.plot_T(
            T=env.get_T_body(body_name='base'),
            axis_len=0.5,print_xyz=False)
        env.plot_text(
            p=env.get_p_body(body_name='base')+np.array([0,0,0.5]),
            label = 'tick:[%d]time:[%.2f]'%(idx, env.get_sim_time()))
        env.plot_body_T(body_name='tcp_link',axis_len=0.1,axis_width=0.005)
        
        env.render()

# Close
env.close_viewer()
print ("Done.")

# [ 0.46062913 -0.0419206   0.58133918] (calculated by joint_states) -> [0.46408984 -0.04164245  0.38942292] (EEF)
# [ 0.46334388 -0.04195479  0.57160527] (calculated by joint_states) -> [0.4666901  -0.04140665  0.37968513] (EEF)

Done.


# Reverse Engineering

In [14]:
# current link gripper -> tcp_link rpy 0,0,0
# target link gripper -> target rpy

target = base_tool_poses[idx-1][3:]
target = quat2r(target)

base = env.get_R_body(body_name='link_gripper')
current = env.get_R_body(body_name='tcp_link')

R = current.T@base
rpy = r2rpy(R)
# print(rpy)

R = base.T@target
rpy = r2rpy(R)
# print(rpy)
print(rpy2quat(rpy))


'''
[-5.27306031e-17  9.32701387e-17  1.57079633e+00]
[ 1.48438446e-03  1.39833212e-01 -1.50792732e+00]
[ 0.72716463  0.04836099  0.05041956 -0.68289874]
[ 0.70710678  0.          0.         -0.70710678]


[9.99735615e-01 2.21498574e-02 9.10323765e-04 6.10366593e-03]
'''

NameError: name 'base_tool_poses' is not defined

In [53]:
ranges = env.joint_ranges

invalid_counts = [0] * len(ranges)

for state in joint_states:
    joint_values = state.numpy()
    
    for j, (value, joint_range) in enumerate(zip(joint_values, ranges)):
        min_val, max_val = joint_range
        if not (min_val <= value <= max_val):
            invalid_counts[j] += 1

print("Number of invalid occurrences for each joint:")
for j, count in enumerate(invalid_counts):
    print(f"Joint {j}: {count} invalid states")

Number of invalid occurrences for each joint:
Joint 0: 0 invalid states
Joint 1: 0 invalid states
Joint 2: 0 invalid states
Joint 3: 55 invalid states
Joint 4: 0 invalid states
Joint 5: 0 invalid states
Joint 6: 0 invalid states


In [54]:
num_joints = len(env.joint_ranges)  # Excluding last 2 joints
min_vals = [float('inf')] * num_joints
max_vals = [float('-inf')] * num_joints

for state in joint_states:
    joint_values = state.numpy()
    for j in range(num_joints):
        min_vals[j] = min(min_vals[j], joint_values[j])
        max_vals[j] = max(max_vals[j], joint_values[j])

for j in range(num_joints):
    print(f"Joint {j} range: [{min_vals[j]:.4f}, {max_vals[j]:.4f}]")

Joint 0 range: [-0.0203, 0.1721]
Joint 1 range: [0.0135, 0.3977]
Joint 2 range: [-0.0379, 0.1186]
Joint 3 range: [1.1062, 1.5377]
Joint 4 range: [0.0313, 0.0399]
Joint 5 range: [0.7821, 1.5530]
Joint 6 range: [-0.0225, -0.0063]


In [55]:
env.joint_ranges

array([[-2.7437,  2.7437],
       [-1.7837,  1.7837],
       [-2.9007,  2.9007],
       [-3.0421, -0.1518],
       [-2.8065,  2.8065],
       [ 0.5445,  4.5169],
       [-3.0159,  3.0159]])