In [1]:
pip install panda_gym av

Collecting panda_gym
  Downloading panda_gym-3.0.7-py3-none-any.whl.metadata (4.3 kB)
Collecting av
  Downloading av-14.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting pybullet (from panda_gym)
  Downloading pybullet-3.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Downloading panda_gym-3.0.7-py3-none-any.whl (23 kB)
Downloading av-14.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.0/33.0 MB[0m [31m46.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading pybullet-3.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (103.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m103.2/103.2 MB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pybullet, av, panda_gym
Successfully installed av-14.0.1 panda_gym-3.0.7 pybullet-3.2.6
Note: y

In [2]:
import numpy as np
import os
from collections import deque

import gymnasium as gym
import panda_gym

import av  

In [3]:
# Camera placement, forwarded to gym.make as keyword arguments.
camera_params = {
    "render_target_position": [-0.3, 0.0, 0.0],
    "render_distance": 1.0,
    "render_yaw": 0.0,
    "render_pitch": -50,
    "render_roll": 0,
}

# render_mode="rgb_array": frames from env.render() are later treated as image
# arrays (see save_video) -- presumably RGB ndarrays; confirm against panda_gym.
env = gym.make("PandaPickAndPlace-v3", render_mode="rgb_array", **camera_params)

pybullet build time: Nov 28 2023 23:45:17


In [4]:
def save_video(numpy_images, output_filename="output_video.mp4", fps=10):
    """Encode a sequence of frames into an H.264 MP4 file.

    Args:
        numpy_images: Non-empty sequence of frames. Each frame must expose a
            3-element ``.shape`` (height, width, channels) and is converted
            with ``av.VideoFrame.from_ndarray(..., format='rgb24')``.
            Presumably uint8 RGB arrays -- TODO confirm against the renderer.
        output_filename: Path of the MP4 file to write.
        fps: Frame rate passed to the video stream.

    Raises:
        ValueError: If ``numpy_images`` is empty (nothing to encode).
    """
    # Validate before opening the container so an empty input does not leave
    # a half-initialized file behind.
    if len(numpy_images) == 0:
        raise ValueError("save_video requires at least one frame")

    height, width, _ = numpy_images[0].shape

    # The context manager guarantees the container is closed even if
    # encoding or muxing raises part-way through.
    with av.open(output_filename, format='mp4', mode='w') as container:
        stream = container.add_stream('h264', rate=fps)
        stream.height = height
        stream.width = width
        stream.pix_fmt = 'yuv420p'

        for img in numpy_images:
            frame = av.VideoFrame.from_ndarray(img, format='rgb24')
            for packet in stream.encode(frame):
                container.mux(packet)

        # Passing None flushes any frames still buffered in the encoder.
        for packet in stream.encode(None):
            container.mux(packet)

def get_movement_action(current_endeffector_position, target_position, gripper_state, velocity=0.1):
    """Build an action that moves toward ``target_position`` with a capped step.

    The displacement is the full offset to the target when that offset is
    shorter than ``velocity``; otherwise it is the unit direction scaled to
    length ``velocity``. ``gripper_state`` is appended as the last component.

    Returns:
        1-D array: the displacement components followed by ``gripper_state``.
    """
    offset = target_position - current_endeffector_position
    distance = np.linalg.norm(offset)

    # Close enough: take the remaining offset as-is; otherwise clamp the step length.
    step = offset if distance < velocity else velocity * (offset / distance)

    gripper_component = np.array([gripper_state])
    return np.concatenate((step, gripper_component))
    
def collect_one_trajectory(env, velocity=0.1, max_steps_per_phase=500):
    """Run one scripted pick-and-place episode, recording frames and actions.

    The environment is reset, then a fixed sequence of waypoints is followed.
    For each waypoint the function repeatedly renders a frame, computes an
    action with ``get_movement_action`` and steps the environment until the
    distance to the waypoint has stopped changing.

    Args:
        env: Environment exposing ``reset()``, ``render()`` and ``step(action)``.
            The observation must be a dict with ``'observation'``,
            ``'achieved_goal'`` and ``'desired_goal'`` entries; the first three
            values of ``'observation'`` are used as the end-effector position.
        velocity: Maximum displacement magnitude per action.
        max_steps_per_phase: Upper bound on environment steps spent on a single
            waypoint. Guards against an endless loop when the end-effector
            never settles. ``None`` disables the bound.

    Returns:
        ``(images, actions)``: two lists of equal length; ``images[i]`` is the
        frame rendered just before ``actions[i]`` was applied.
    """
    images = []
    actions = []

    observation, info = env.reset()
    initial_observation = observation['observation']
    # achieved_goal is used as the cube position and desired_goal as the drop
    # location -- presumably panda_gym's convention; verify against the env docs.
    achieved_goal = observation['achieved_goal']
    desired_goal = observation['desired_goal']

    hover_offset = np.array([0, 0, 0.1])
    target_position_above_cube = achieved_goal + hover_offset
    target_position_grasp = achieved_goal
    target_position_lift = achieved_goal + hover_offset
    target_position_above_place = desired_goal + hover_offset
    target_position_place = desired_goal

    sequence = [
        {'position': target_position_above_cube, 'gripper': 0.5},     # Above the cube
        {'position': target_position_grasp, 'gripper': 0.0},          # Near the cube
        {'position': target_position_grasp, 'gripper': -0.5},         # End-effector closure
        {'position': target_position_lift, 'gripper': -0.5},          # Lift with cube
        {'position': target_position_above_place, 'gripper': -0.5},   # Above desired position
        {'position': target_position_place, 'gripper': -0.5},         # Desired position
    ]

    current_endeffector_position = initial_observation[:3]
    for step in sequence:
        target_position = step['position']
        gripper_state = step['gripper']

        # Sliding window of the most recent distances to the waypoint; the
        # phase ends once the whole window is flat.
        values = deque(maxlen=10)
        values.append(np.linalg.norm(current_endeffector_position - target_position))

        steps_in_phase = 0
        while max_steps_per_phase is None or steps_in_phase < max_steps_per_phase:
            # Render before stepping so each frame is paired with the action
            # chosen from that state.
            images.append(env.render())

            action = get_movement_action(
                current_endeffector_position,
                target_position,
                velocity=velocity,
                gripper_state=gripper_state,
            )
            actions.append(action)

            # terminated/truncated are not inspected: the scripted sequence
            # keeps running until every waypoint has been processed.
            observation, reward, terminated, truncated, info = env.step(action)
            steps_in_phase += 1

            current_endeffector_position = observation['observation'][:3]
            values.append(np.linalg.norm(current_endeffector_position - target_position))

            # Settled: window is full and no sample deviates from the window
            # mean by 1 mm-scale tolerance (0.001) or more.
            if len(values) == values.maxlen:
                if np.max(np.abs(np.array(values) - np.mean(values))) < 0.001:
                    break

    assert len(images) == len(actions), f"Problem: {len(images)} != {len(actions)}"

    return images, actions

def save_batch(batch_images, batch_actions, filename):
    """Write one batch to a compressed ``.npz`` archive.

    Both inputs are converted with ``np.array`` and stored under the keys
    ``images`` and ``actions`` respectively.
    """
    payload = {
        "images": np.array(batch_images),
        "actions": np.array(batch_actions),
    }
    np.savez_compressed(filename, **payload)

def collect_batches(env, num_batches=125, batch_size=32, trajectory_dir='./trajectories/'):
    """Collect scripted trajectories and save them to disk in batches.

    First records a single trajectory and writes it as
    ``test_trajectory.mp4`` inside ``trajectory_dir`` for visual inspection.
    Then, for each batch, gathers ``batch_size`` trajectories, concatenates
    their frames and actions, and stores them as ``batch_<index>.npz``
    (index starting at 0).

    Args:
        env: Environment passed through to ``collect_one_trajectory``.
        num_batches: Number of ``.npz`` files to produce.
        batch_size: Number of trajectories attempted per batch.
        trajectory_dir: Output directory; created if it does not exist.
    """
    os.makedirs(trajectory_dir, exist_ok=True)

    # Sanity-check rollout, saved as a video so the scripted policy can be eyeballed.
    test_images, _ = collect_one_trajectory(env)

    video_filename = os.path.join(trajectory_dir, 'test_trajectory.mp4')
    save_video(test_images, output_filename=video_filename, fps=10)
    print(f"test trajectory: {video_filename}")

    for batch_num in range(num_batches):
        batch_images = []
        batch_actions = []

        for _ in range(batch_size):
            images, actions = collect_one_trajectory(env)
            # Skip degenerate rollouts with fewer than two frames.
            if len(images) < 2:
                continue
            batch_images.extend(images)
            batch_actions.extend(actions)

        batch_filename = os.path.join(trajectory_dir, f'batch_{batch_num}.npz')
        save_batch(batch_images, batch_actions, batch_filename)

        # Progress message is 1-based while file names are 0-based.
        print(f"Saved batch {batch_num + 1}.")

In [None]:
# Kick off the full data collection; output goes to the Kaggle working directory.
collect_batches(
    env,
    num_batches=125,
    batch_size=32,
    trajectory_dir='/kaggle/working/trajectories/',
)

test trajectory: /kaggle/working/trajectories/test_trajectory.mp4
