# Introduction

Imitation learning is a type of machine learning where an agent learns to imitate the actions of an expert. In order to train a robust agent, we need a large amount of data. However, manual data collection through human demonstrations is time-consuming and expensive.

Isaac Lab Mimic is a feature included in Isaac Lab that allows users to generate new demonstrations by synthesizing trajectories from a small number of human demonstrations. This blueprint shows how to use Isaac Lab Mimic to generate new motion trajectories for a Franka robotic arm and then visually augment them using NVIDIA Cosmos to create datasets for imitation learning. The workflow is broken down into two main steps:

1. Generate new demonstrations by synthesizing trajectories using a small number of human demonstrations with Isaac Lab Mimic.
2. Apply diverse visual transformations using NVIDIA Cosmos to the new demonstrations to create a large and diverse dataset.

This notebook will guide you through each step of the workflow.

**NOTE: This notebook must be run on the same machine as the Isaac Sim simulator and a display must be connected to the machine.**


# Understanding the Blueprint

## Motion Trajectory Synthesis
Isaac Lab Mimic is a feature set bundled with Isaac Lab (an open source robotic learning framework designed to help train robot policies). The core idea of Mimic is to allow users to synthetically generate a large number of new robot motion trajectories using only a handful of human demonstrations, thus greatly reducing the amount of time and effort required to collect a dataset for imitation learning. 

Human datasets are annotated with subtask information, which Isaac Lab Mimic uses to construct trajectories for new scene configurations by spatially transforming the original demonstrations.
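
To make the spatial transformation concrete, here is a minimal NumPy sketch of the idea. It is not Isaac Lab Mimic's actual implementation. It assumes a subtask segment is stored as a sequence of end-effector poses (4x4 homogeneous matrices in the world frame) and that the segment is tied to one reference object; `make_pose` and `transform_segment` are hypothetical helper names. The poses are first expressed relative to the object in the source demonstration and then re-expressed relative to the object's pose in the new scene.

```python
import numpy as np


def make_pose(position, yaw=0.0):
    """Build a 4x4 homogeneous pose from a position and a rotation about z."""
    c, s = np.cos(yaw), np.sin(yaw)
    pose = np.eye(4)
    pose[:3, :3] = [[c, -s, 0.0], [s, c, 0.0], [0.0, 0.0, 1.0]]
    pose[:3, 3] = position
    return pose


def transform_segment(eef_poses_src, obj_pose_src, obj_pose_new):
    """Re-target end-effector poses from the source object pose to a new one.

    eef_poses_src has shape (N, 4, 4); both object poses have shape (4, 4).
    """
    # End-effector poses relative to the object in the source demonstration
    eef_in_obj = np.linalg.inv(obj_pose_src) @ eef_poses_src
    # The same relative poses, placed at the object's new pose
    return obj_pose_new @ eef_in_obj


# Illustrative values only: a cube and two approach poses directly above it
cube_src = make_pose([0.5, 0.0, 0.02])
cube_new = make_pose([0.6, 0.1, 0.02], yaw=np.pi / 2)
segment_src = np.stack([make_pose([0.5, 0.0, 0.3]), make_pose([0.5, 0.0, 0.1])])

segment_new = transform_segment(segment_src, cube_src, cube_new)
print(segment_new[:, :3, 3])  # positions of the re-targeted poses
```

In this example the cube moves 0.1 m along x and y and rotates by 90 degrees, and the approach poses follow it while keeping their height above the cube.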

## Visual Augmentation
Once generated, the new motion trajectories can be visually augmented using NVIDIA Cosmos to create a diverse dataset that is suitable for training an imitation learning policy. 

By using a multi-stage data generation scheme, we can automatically create a robust dataset for training complex imitation learning policies without large amounts of manual human data. This greatly increases the amount of data available for training and lowers the time required to collect a dataset.


# Generate a New Motion Trajectory


## Setup Initial Configuration for Isaac Lab

This cell sets up the basic configuration for data generation:

1. **How to Modify**:
   - Adjust `num_envs` based on your GPU capability
   - Set `generation_num_trials` to how many successful trials to execute. Note that some trials may be unsuccessful and so the total number of trials performed may be larger.

2. **Tips**:
   - Start with 1 trial for testing, then increase for training. Increasing the number of trials increases the time it takes to generate the dataset.
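
Because unsuccessful trials do not count toward `generation_num_trials`, the total number of attempts depends on how often generation succeeds. As a rough planning aid, the expected number of attempts is the number of required successes divided by the success rate. The helper name `expected_total_trials` and the success rates below are assumptions for illustration; the real rate depends on the task and on the randomization settings.

```python
import math


def expected_total_trials(num_successful_trials, success_rate):
    """Estimate how many attempts are needed on average to reach the target."""
    if not 0.0 < success_rate <= 1.0:
        raise ValueError("success_rate must be in (0, 1]")
    return math.ceil(num_successful_trials / success_rate)


# Assumed success rates, for illustration only
estimate_half = expected_total_trials(10, 0.5)
estimate_high = expected_total_trials(10, 0.8)
print(estimate_half, estimate_high)
```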

In [None]:
from notebook_widgets import create_num_trials_input

num_envs = 1
num_trials = create_num_trials_input()

## Spin up the Simulation

Run this cell to start the simulation environment. This sets up the necessary components for data generation.

**NOTE**: While the simulation is running, an **"Isaac Sim is not responding"** pop-up may appear. This is expected. Click **Wait** and let the process complete.
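
The cell that follows builds its launcher arguments by parsing an empty argument list (so `argparse` does not read Jupyter's own command-line arguments) and then merging a configuration dictionary into the result. The sketch below shows that pattern in isolation using only the standard library. The `--device` and `--headless` flags are stand-ins chosen for illustration; in the notebook the real flags are added by `AppLauncher.add_app_launcher_args`.

```python
from argparse import ArgumentParser, Namespace

parser = ArgumentParser()
# Stand-in flags for illustration; the notebook gets its flags from AppLauncher
parser.add_argument("--device", default="cuda:0")
parser.add_argument("--headless", action="store_true")

# An explicit empty list keeps argparse from reading sys.argv
defaults = parser.parse_args([])

# Merge notebook-specific settings over the parsed defaults
overrides = {"num_envs": 1, "generation_num_trials": 5}
merged = {**vars(defaults), **overrides}
args = Namespace(**merged)

print(args.device, args.num_envs, args.generation_num_trials)
```

Keys in the override dictionary that match a parsed flag replace its default, and keys that match no flag are simply added as new attributes.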

In [None]:
import os
import nest_asyncio

nest_asyncio.apply()

from argparse import ArgumentParser, Namespace
from isaaclab.app import AppLauncher

parser = ArgumentParser()
AppLauncher.add_app_launcher_args(parser)
# Parse an empty argument list so that parsing works correctly inside a Jupyter notebook
args_cli = parser.parse_args([])
args_cli.enable_cameras = True
args_cli.kit_args = "--enable omni.videoencoding"

config = {
    "task": "Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-Mimic-v0",
    "num_envs": num_envs,
    "generation_num_trials": num_trials.value,
    "input_file": "datasets/annotated_dataset.hdf5",
    "output_file": "datasets/generated_dataset.hdf5",
    "pause_subtask": False,
    "enable": "omni.kit.renderer.capture",
}

# Update the default configuration
args_dict = vars(args_cli)
args_dict.update(config)
args_cli = Namespace(**args_dict)

# Now launch the simulator with the final configuration
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app

import asyncio
import gymnasium as gym
import numpy as np
import random
import torch

import isaaclab_mimic.envs  # noqa: F401
from isaaclab_mimic.datagen.generation import (
    env_loop,
    setup_env_config,
    setup_async_generation,
)
from isaaclab_mimic.datagen.utils import (
    get_env_name_from_dataset,
    setup_output_paths,
    interactive_update_randomizable_params,
    reset_env,
)
from isaaclab.managers import ObservationTermCfg as ObsTerm
from notebook_utils import ISAACLAB_OUTPUT_DIR

import isaaclab_tasks  # noqa: F401

num_envs = args_cli.num_envs

# Setup output paths and get env name
output_dir, output_file_name = setup_output_paths(args_cli.output_file)
env_name = args_cli.task or get_env_name_from_dataset(args_cli.input_file)

# Configure the environment for Isaac Lab Mimic data generation
env_cfg, success_term = setup_env_config(
    env_name=env_name,
    output_dir=output_dir,
    output_file_name=output_file_name,
    num_envs=num_envs,
    device=args_cli.device,
    generation_num_trials=args_cli.generation_num_trials,
)
# Set observation output directory
for obs in vars(env_cfg.observations.rgb_camera).values():
    if not isinstance(obs, ObsTerm):
        continue
    obs.params["image_path"] = os.path.join(
        ISAACLAB_OUTPUT_DIR, obs.params["image_path"]
    )
env_cfg.observations


# create environment
env = gym.make(env_name, cfg=env_cfg).unwrapped

# set seed for generation
random.seed(env.cfg.datagen_config.seed)
np.random.seed(env.cfg.datagen_config.seed)
torch.manual_seed(env.cfg.datagen_config.seed)

# reset before starting
reset_env(env, 100)

## Interactive Parameter Updates

To get diversity in the generated motion trajectories, the scene configuration is randomized for each trial. This section provides interactive sliders and controls to adjust various environment parameters in real-time:

1. **What You'll See**:
   - Sliders for numerical values
   - Range inputs for min/max settings
   - Current value displays
   - Parameter names and allowed ranges

2. **How to Use**:
   - Move the sliders to adjust values
   - Watch the environment update in real-time

3. **Available Parameters**:
   - **Franka Joint State Randomization**:
     - **mean (0.0 - 0.5)**: Controls the average joint angle offset (in radians)
     - **std (0.0 - 0.1)**: Controls the spread of randomization around the mean

   - **Cube Position Randomization**:
     - **pose_range.x (0.3 - 0.9)**: Controls cube placement along the x-axis (in meters)
     - **pose_range.y (-0.3 - 0.3)**: Controls cube placement along the y-axis (in meters)
     - **min_separation (0.0 - 0.5)**: Minimum allowed distance between cubes (in meters)
     
     **Note:** If the system cannot place cubes with the specified minimum separation after several attempts (due to space constraints), it will accept the last generated positions even if they don't meet the separation requirement. This prevents the system from getting stuck in an impossible configuration.


4. **Tips**:
   - Start with small adjustments to understand their effects

Note: These adjustments will affect how new demonstrations are generated, so take time to experiment with different settings to achieve desired behavior.
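
The fallback described in the `min_separation` note can be pictured as rejection sampling with a retry limit. The sketch below is an illustration of that behavior, not the code Isaac Lab runs. The function name `sample_cube_positions`, the `max_attempts` value, and the choice to retry per cube are all assumptions.

```python
import math
import random


def sample_cube_positions(num_cubes, x_range, y_range, min_separation,
                          max_attempts=100, rng=None):
    """Sample (x, y) positions, retrying until cubes are far enough apart.

    If no valid position is found within max_attempts, the last candidate
    is accepted anyway so that sampling always terminates.
    """
    rng = rng or random.Random()
    positions = []
    for _ in range(num_cubes):
        for _attempt in range(max_attempts):
            candidate = (rng.uniform(*x_range), rng.uniform(*y_range))
            if all(math.dist(candidate, p) >= min_separation for p in positions):
                break
        # Reached on success, or after exhausting attempts (fallback case)
        positions.append(candidate)
    return positions


rng = random.Random(0)
feasible = sample_cube_positions(3, (0.3, 0.9), (-0.3, 0.3), 0.1, rng=rng)
# A separation larger than the workspace cannot be satisfied; sampling still returns
infeasible = sample_cube_positions(3, (0.3, 0.9), (-0.3, 0.3), 10.0, rng=rng)
print(feasible)
print(infeasible)
```

With a feasible separation the retry loop almost always finds valid positions. With an impossible one, the positions are still returned but violate the constraint, which matches the behavior described above.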

In [None]:
randomizable_params = {
    "randomize_franka_joint_state": {
        "mean": (0.0, 0.5, 0.01),
        "std": (0.0, 0.1, 0.01),
    },
    "randomize_cube_positions": {
        "pose_range": {
            "x": (0.3, 0.9, 0.01),
            "y": (-0.3, 0.3, 0.01),
        },
        "min_separation": (0.0, 0.5, 0.01),
    },
}

# Pair each reset event term's config with its registered name
reset_term_cfgs = env.unwrapped.event_manager._mode_term_cfgs["reset"]
reset_term_names = env.unwrapped.event_manager.active_terms["reset"]
for name, event_term in zip(reset_term_names, reset_term_cfgs):
    display(f"Updating parameters for event: {event_term.func.__name__}")
    interactive_update_randomizable_params(
        event_term, name, randomizable_params[name], env=env
    )

## Data Generation

Run this cell to start generating demonstrations using the parameters you've configured. The process will:
- Generate the specified number of demonstrations
- Save successful demonstrations to your output file
- Show progress as demonstrations are generated
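
The generation cell shows progress by temporarily replacing `sys.stdout` with a small file-like object and restoring it afterward. The sketch below illustrates the same idea with the standard library's `contextlib.redirect_stdout`, which handles the restore step automatically. This is a swapped-in technique rather than the exact code used in the cell, and `LineCollector` is a hypothetical class that stores lines instead of displaying them.

```python
import contextlib


class LineCollector:
    """File-like object that stores non-empty writes instead of printing them."""

    def __init__(self):
        self.lines = []

    def write(self, text):
        # print() sends the message and the trailing newline as separate writes
        if text.strip():
            self.lines.append(text.rstrip())
        return len(text)

    def flush(self):
        pass


collector = LineCollector()
with contextlib.redirect_stdout(collector):
    print("Generated demo 1")
    print()  # whitespace-only write, dropped by the collector

# stdout is restored here, even if the block above had raised
print(collector.lines)
```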

In [None]:
import sys
from IPython.display import display, Pretty


# Create a new output capture
class OutputCapture:
    def __init__(self):
        self._buffer = ""

    def write(self, text):
        if text.strip():  # Only process non-empty strings
            display(Pretty(text.rstrip()))

    def flush(self):
        if self._buffer:
            display(Pretty(self._buffer))
            self._buffer = ""


# Redirect stdout before calling setup_async_generation so its output is captured
old_stdout = sys.stdout
sys.stdout = OutputCapture()

try:
    # Setup and run async data generation
    async_components = setup_async_generation(
        env=env,
        num_envs=args_cli.num_envs,
        input_file=args_cli.input_file,
        success_term=success_term,
        pause_subtask=args_cli.pause_subtask,
    )

    future = asyncio.ensure_future(asyncio.gather(*async_components["tasks"]))
    env_loop(
        env,
        async_components["action_queue"],
        async_components["info_pool"],
        async_components["event_loop"],
    )
except asyncio.CancelledError:
    display(Pretty("Tasks were cancelled."))
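except AttributeError as e:
    if "'FrankaCubeStackIKRelMimicEnv' object has no attribute 'scene'" in str(e):
        display(
            Pretty(
                "Environment was closed during execution. This is expected behavior."
            )
        )
    else:
        # Report any other AttributeError instead of silently swallowing it
        display(Pretty(f"Error occurred: {str(e)}"))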
except Exception as e:
    display(Pretty(f"Error occurred: {str(e)}"))
finally:
    # Restore original stdout first
    sys.stdout = old_stdout

    # Cancel the future and ignore any AttributeErrors from pending tasks
    if "future" in locals():
        future.cancel()
        try:
            async_components["event_loop"].run_until_complete(future)
        except (asyncio.CancelledError, AttributeError) as e:
            if isinstance(
                e, AttributeError
            ) and "'FrankaCubeStackIKRelMimicEnv' object has no attribute 'scene'" in str(
                e
            ):
                display(
                    Pretty(
                        "Environment was closed during execution. This is expected behavior!"
                    )
                )
            elif isinstance(e, asyncio.CancelledError):
                display(Pretty("Tasks were properly cancelled during cleanup."))
            else:
                display(Pretty(f"Unexpected cleanup error: {str(e)}"))

# Cosmos

Now that a new motion trajectory has been generated, we will apply visual transformations to the data using Cosmos to create a realistic demo that is suitable for training an imitation learning policy.

## Video Preprocessing
In this first step, we will process the generated motion trajectory into a video that can be used as an input for the Cosmos model.
The normals of the scene are used to apply shading to the semantic segmentation, which produces an input that works very well with the Cosmos model.
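
When a trial is longer than `VIDEO_LENGTH`, the encoding cell further down keeps only the final `VIDEO_LENGTH` frames of that trial, and it skips trials that are too short. The frame arithmetic can be checked in isolation with the sketch below; `last_window_start` is a hypothetical helper name, and the frame ranges are treated as inclusive, as in the cell.

```python
def last_window_start(start_frame, end_frame, video_length):
    """Return the first frame of the final video_length frames of a trial.

    Frame indices are inclusive. Returns None if the trial is too short.
    """
    trial_length = end_frame - start_frame + 1
    if trial_length < video_length:
        return None
    return end_frame - video_length + 1


long_trial = last_window_start(0, 199, 120)    # 200 frames: keep frames 80..199
exact_trial = last_window_start(5, 124, 120)   # exactly 120 frames: keep them all
short_trial = last_window_start(10, 100, 120)  # 91 frames: skipped
print(long_trial, exact_trial, short_trial)
```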

In [None]:
from notebook_widgets import create_camera_input
from notebook_utils import ISAACLAB_OUTPUT_DIR

VIDEO_LENGTH = 120  # Suggested length is between 120 and 200
camera_selection = create_camera_input(ISAACLAB_OUTPUT_DIR)

In [None]:
import os
from IPython.display import Video
from notebook_utils import encode_video, ISAACLAB_OUTPUT_DIR, get_env_trial_frames

env_trial_frames = get_env_trial_frames(ISAACLAB_OUTPUT_DIR, camera_selection.value, 10)
camera = camera_selection.value
for env_num, trial_nums in env_trial_frames.items():
    for trial_num, (start_frame, end_frame) in trial_nums.items():
        trial_length = end_frame - start_frame + 1
        if trial_length < VIDEO_LENGTH:
            print(f"\nSkipping Trial {trial_num}: Too short ({trial_length} frames)")
            continue

        video_start = max(start_frame, end_frame - VIDEO_LENGTH + 1)

        # Generate video filename with trial number
        video_filepath = os.path.join(
            ISAACLAB_OUTPUT_DIR,
            f"shaded_segmentation_{camera}_trial_{trial_num}_tile_{env_num}.mp4",
        )

        try:
            encode_video(
                ISAACLAB_OUTPUT_DIR,
                video_start,
                VIDEO_LENGTH,
                camera,
                video_filepath,
                env_num,
                trial_num,
            )
            display(video_filepath)
            display(Video(video_filepath, width=1000))
        except ValueError as e:
            print(f"Error processing trial {trial_num}: {str(e)}")

## Deploying Cosmos
Deploy Cosmos on your provider of choice or on your own local resources: [Cosmos Transfer1](https://huggingface.co/nvidia/Cosmos-Transfer1-7B).
Click the `Code` link on the Cosmos Transfer page and follow the installation steps outlined in the README. Detailed setup instructions are under `examples/inference_multi_control_manual_input.md`.

> ### Adding a Web API to Cosmos Transfer1
> To simplify testing, copy the file `notebook/app.py` into the Cosmos root directory and run it with `python app.py`. This exposes endpoints that the notebook uses to communicate with the Cosmos model. By default, the script serves them on port `5000`.

In [None]:
import ipywidgets as widgets

url_widget = widgets.Text(
    value="",
    placeholder="cosmos/url:port",
    description="Cosmos URL:",
    style={"description_width": "initial"},
    layout={"width": "1000px"},
)
display(url_widget)

### Using the Cosmos Model

The Cosmos model has several available parameters which alter the output in various ways:
- `prompt`: Text prompt for the video generation.
- `seed`: Seed for the random number generator. `int [0 - 2147483648]`
- `control_weight`: Controls how strongly the control input affects the output. Higher values increase adherence to the control input but leave the model less freedom in generation. `float [0 - 1.0]`
- `sigma_max`: The maximum sigma. Lower values result in less change from the original input, while larger values allow more change but may diverge further from the input scene. `float [0 - 80.0]`
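
Before sending a request, it can help to check parameter values against the ranges listed above. The sketch below is a small, optional check and not part of the Cosmos API. `find_out_of_range` is a hypothetical helper, and treating both ends of each range as inclusive is an assumption.

```python
# Ranges copied from the parameter list above; inclusive bounds are an assumption
PARAM_RANGES = {
    "seed": (0, 2147483648),
    "control_weight": (0.0, 1.0),
    "sigma_max": (0.0, 80.0),
}


def find_out_of_range(params):
    """Return a message for every known parameter outside its documented range."""
    problems = []
    for name, (low, high) in PARAM_RANGES.items():
        if name in params and not low <= params[name] <= high:
            problems.append(f"{name}={params[name]} is outside [{low}, {high}]")
    return problems


valid = find_out_of_range({"seed": 42, "control_weight": 0.7, "sigma_max": 70.0})
invalid = find_out_of_range({"seed": 42, "control_weight": 1.5, "sigma_max": 70.0})
print(valid)
print(invalid)
```

Parameters that are not listed in `PARAM_RANGES`, such as `prompt`, are ignored by this check.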

In [None]:
from notebook_widgets import create_variable_dropdowns, create_cosmos_params
from notebook_utils import ISAACLAB_OUTPUT_DIR, COSMOS_OUTPUT_DIR

prompt_manager = create_variable_dropdowns("stacking_prompt.toml")
cosmos_params = create_cosmos_params(ISAACLAB_OUTPUT_DIR)

## Generate with Cosmos
---
> **NOTE:** Generation generally takes around 5 to 10 minutes on a single H100 GPU depending on the video length.

---

> **Tips:**
> - To increase prompt adherence, try increasing the `Sigma Max` value
> - To reduce divergence from the input scene, try increasing the `Control Weight` and/or increasing `Canny Strength`

In [None]:
import os
from cosmos_request import process_video
from notebook_utils import ISAACLAB_OUTPUT_DIR, COSMOS_OUTPUT_DIR
from notebook_widgets import create_download_link
from IPython.display import Video, clear_output

params = {k: w.value for k, w in cosmos_params.items()}
video_filepath = os.path.join(ISAACLAB_OUTPUT_DIR, params.pop("input_video"))
output_path = os.path.join(COSMOS_OUTPUT_DIR, f"cosmos_{params['seed']}.mp4")
params["prompt"] = prompt_manager.prompt

if not url_widget.value:
    raise ValueError("Enter URL to proceed.")

response = process_video(
    url=url_widget.value,
    video_path=video_filepath,
    output_path=output_path,
    **params,
)
if response is None:
    display("An error occurred processing the request")
elif response.status_code == 200:
    clear_output(wait=True)
    display(Video(output_path))
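    display(
        create_download_link(output_path, link_text=f"Download Video: {output_path}")
    )
else:
    # Surface non-200 responses instead of failing silently
    display(f"Request failed with status code {response.status_code}")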