# Robot Recording and Replay

This notebook demonstrates how to record teleoperated episodes with a robot arm and then replay them.

In [4]:
import time
import torch
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
from lerobot.teleoperators.so_leader import SO101LeaderConfig, SO101Leader
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
from lerobot.scripts.lerobot_record import record_loop
from lerobot.processor import make_default_processors
from lerobot.utils.robot_utils import precise_sleep

In [5]:
NUM_EPISODES = 5
FPS = 30
EPISODE_TIME_SEC = 60
RESET_TIME_SEC = 10
TASK_DESCRIPTION = "My task description"

# Create robot configuration
robot_config = SO101FollowerConfig(
    id="my_so101_follower_1",
    cameras={
        "front_1": OpenCVCameraConfig(index_or_path="/dev/follower_cam_1", width=640, height=480, fps=FPS, fourcc="MJPG"),
        "top_1": OpenCVCameraConfig(index_or_path="/dev/top_cam_1", width=640, height=480, fps=FPS, fourcc="MJPG"),
        "top_2": OpenCVCameraConfig(index_or_path="/dev/top_cam_2", width=640, height=480, fps=FPS, fourcc="MJPG"),
    },
    port="/dev/follower_arm_1",
)

teleop_config = SO101LeaderConfig(
    id="my_so101_leader_1",
    port="/dev/leader_arm_1",
)

In [8]:
# Initialize the robot and teleoperator
robot = SO101Follower(robot_config)
teleop = SO101Leader(teleop_config)

# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, "action")
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
dataset_features = {**action_features, **obs_features}

# Create the dataset
DATASET_REPO_ID = f"lerobot/record_replay_test_{int(time.time())}"
log_say(f"Creating dataset at: {DATASET_REPO_ID}")

dataset = LeRobotDataset.create(
    repo_id=DATASET_REPO_ID,
    fps=FPS,
    features=dataset_features,
    robot_type=robot.name,
    use_videos=True,
    image_writer_threads=4,
)

# Initialize the keyboard listener and rerun visualization
try:
    listener, events = init_keyboard_listener()
except Exception as e:
    print(f"Warning: Could not initialize keyboard listener: {e}")
    events = {}

init_rerun(session_name="recording")

# Connect the robot and teleoperator
robot.connect()
teleop.connect()

# Create the required processors
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()

RuntimeError: gRPC connection to rerun+http://127.0.0.1:9876/proxy gracefully disconnected

In [None]:
episode_idx = 0
while episode_idx < NUM_EPISODES:
    log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
    
    record_loop(
        robot=robot,
        events=events,
        fps=FPS,
        teleop_action_processor=teleop_action_processor,
        robot_action_processor=robot_action_processor,
        robot_observation_processor=robot_observation_processor,
        teleop=teleop,
        dataset=dataset,
        control_time_s=EPISODE_TIME_SEC,
        single_task=TASK_DESCRIPTION,
        display_data=True,
    )
    
    dataset.save_episode(episode_index=episode_idx)
    episode_idx += 1
    
    if episode_idx < NUM_EPISODES:
        log_say(f"Resetting for {RESET_TIME_SEC} seconds...")
        time.sleep(RESET_TIME_SEC)

# Finalize the dataset to ensure everything is saved
dataset.finalize()

In [None]:
# Replay logic
log_say("Starting Replay...")

# Reload the dataset if needed, or use the existing object if it has the data loaded
# The dataset.hf_dataset should be populated after finalize/save if using lazy loading correctly
# For simplicity, we assume we can read back from it immediately via the existing object

for episode_idx in range(NUM_EPISODES):
    # Retrieve actions for the episode
    # Filter for the specific episode index
    episode_data = dataset.hf_dataset.filter(lambda x: x["episode_index"] == episode_idx)
    actions = episode_data["action"] # This will be a list of action vectors
    
    log_say(f"Replaying episode {episode_idx}")
    num_frames = len(actions)
    
    for idx in range(num_frames):
        t0 = time.perf_counter()

        action_tensor = actions[idx]
        # Construct the action dictionary using feature names
        action_dict = {
            name: float(action_tensor[i]) for i, name in enumerate(dataset.features["action"]["names"])
        }
        
        robot.send_action(action_dict)

        precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))

log_say("Replay finished.")
robot.disconnect()
teleop.disconnect()