# OpenVLA × LIBERO robosuite demo (notebook)

This notebook mirrors the `openvla_libero_runner.py` script in a cell-based workflow. It loads a LIBERO robosuite task, loads an OpenVLA checkpoint, and runs the policy in the simulator: an on-screen viewer window shows the rollout while off-screen camera frames are rendered as input for the policy. Make sure your environment has the dependencies listed in `VLAs/readme.md` installed before running the cells.


In [None]:

# Install-time environment notes
# - robosuite with mujoco (with GUI support) and libero must be installed
# - the `openvla` package and chosen checkpoint should be available
# - this notebook expects GPU access by default but can be switched to CPU


In [None]:

from __future__ import annotations

import importlib.util
from dataclasses import dataclass
from typing import Dict

import numpy as np
import torch
import robosuite as suite
from robosuite.wrappers import GymWrapper


## Configuration
Adjust these values to select the LIBERO benchmark, task index, and OpenVLA checkpoint. `camera_name` controls both the on-screen viewer perspective and the off-screen images fed to the policy. `device` defaults to `"cuda"` when a GPU is detected and falls back to `"cpu"` otherwise; assign it explicitly if you want to override that choice.


In [None]:

# Which LIBERO suite to load, and which of its tasks to run.
benchmark_name = "libero_spatial"  # e.g., libero_spatial, libero_object
benchmark_task_index = 0

# OpenVLA weights: a model identifier or a local checkpoint directory.
# NOTE(review): the published Hugging Face id appears to be
# `openvla/openvla-7b` -- confirm that this value resolves for your setup.
model_id = "openvla-openai/7b"

# Camera used for the viewer and for the frames handed to the policy.
camera_name = "agentview"

# Step limit handed to the environment, and the seed applied to it.
horizon = 200
seed = 1

# Prefer the GPU whenever PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


In [None]:

@dataclass
class LiberoTask:
    """Container for a single LIBERO task description.

    Instances are created by ``LiberoBenchmark`` from the task entries of the
    benchmark object it looks up, and are consumed by ``make_env`` and
    ``run_episode``.
    """

    # Task name, copied from the benchmark entry's ``name`` attribute.
    name: str
    # Natural-language instruction passed to the policy on every step.
    language_instruction: str
    # Keyword arguments that ``make_env`` unpacks into ``suite.make``.
    env_kwargs: Dict


class LiberoBenchmark:
    """Read-only, list-like view over the tasks of one LIBERO benchmark.

    The benchmark is looked up by name at construction time and every task
    entry it exposes is converted into a :class:`LiberoTask`. The ``libero``
    import is deferred to ``__init__`` so that this class can be defined even
    when the package is not installed.
    """

    def __init__(self, benchmark_name: str):
        """Resolve ``benchmark_name`` and store its tasks in ``self.tasks``.

        Args:
            benchmark_name: Name handed to ``get_benchmark``.

        Raises:
            ImportError: If the ``libero`` package cannot be located.
            ValueError: If the lookup returns ``None`` for ``benchmark_name``.
        """
        libero_spec = importlib.util.find_spec("libero")
        if libero_spec is None:
            raise ImportError(
                "The libero package is required for benchmark lookup. Install it with `pip install libero` "
                "and ensure all robosuite extras are available."
            )

        # NOTE(review): upstream LIBERO appears to ship this module as
        # `libero.libero.benchmark` -- confirm the path for the installed version.
        from libero.benchmark import get_benchmark  # type: ignore

        resolved = get_benchmark(benchmark_name)
        if resolved is None:
            raise ValueError(f"Unknown LIBERO benchmark '{benchmark_name}'")

        # Copy only the three attributes the notebook needs from each entry.
        converted = []
        for entry in resolved.tasks:
            converted.append(
                LiberoTask(
                    name=entry.name,
                    language_instruction=entry.language,
                    env_kwargs=entry.kwargs,
                )
            )
        self.tasks = converted

    def __len__(self) -> int:
        """Return how many tasks were loaded from the benchmark."""
        task_count = len(self.tasks)
        return task_count

    def __getitem__(self, index: int) -> LiberoTask:
        """Return the task stored at ``index`` in ``self.tasks``."""
        selected = self.tasks[index]
        return selected


class OpenVLAPolicy:
    """Inference wrapper that turns an observation plus instruction into an action.

    The model and its processor are loaded once in ``__init__``; ``act`` then
    runs a single gradient-free forward pass per call and hands back the
    result as a NumPy array.
    """

    def __init__(self, model_id: str, device: str = "cuda"):
        """Load the model and processor identified by ``model_id``.

        Args:
            model_id: Identifier passed to both the model loader and the
                processor's ``from_pretrained``.
            device: Device the model is moved to and inputs are sent to.

        Raises:
            ImportError: If the ``openvla`` package cannot be located.
        """
        openvla_spec = importlib.util.find_spec("openvla")
        if openvla_spec is None:
            raise ImportError(
                "The openvla package is required for inference. Install it via `pip install openvla` or "
                "follow the official instructions to pull the model weights."
            )

        # Deferred imports: only needed once the package is known to exist.
        from openvla.modeling import load_pretrained_model  # type: ignore
        from openvla.processing import OpenVLAProcessor  # type: ignore

        self.device = device
        loaded = load_pretrained_model(model_id)
        self.model = loaded.to(device)  # type: ignore[arg-type]
        self.model.eval()  # inference only
        self.processor = OpenVLAProcessor.from_pretrained(model_id)  # type: ignore[call-arg]

    def act(self, image: np.ndarray, proprio: np.ndarray, language: str) -> np.ndarray:
        """Compute one action for the given frame, robot state and instruction.

        Args:
            image: Camera frame, forwarded to the processor unchanged.
            proprio: Robot state vector; converted to a plain list first.
            language: Instruction text for the task.

        Returns:
            The ``action`` field of the model output as a NumPy array, with
            its leading dimension squeezed out (presumably the batch axis --
            TODO confirm).
        """
        request = {
            "images": image,
            "proprio": proprio.tolist(),
            "text": language,
            "return_tensors": "pt",
        }
        batch = self.processor(**request)
        batch = batch.to(self.device)

        # No gradients are needed for a pure forward pass.
        with torch.no_grad():
            prediction = self.model(**batch)

        # The predicted action is used as-is; no further reduction is applied.
        action_tensor = prediction.action.squeeze(0)
        return action_tensor.detach().cpu().numpy()


def make_env(task: LiberoTask, camera_name: str, horizon: int, seed: int) -> GymWrapper:
    """Build, reset and seed the robosuite environment for ``task``.

    The environment is created with both the on-screen viewer and the
    off-screen renderer enabled, wrapped in ``GymWrapper``, reset once and
    then seeded. Pointing the viewer at ``camera_name`` is best-effort: a
    failure there is logged and does not prevent the environment from being
    returned.

    Args:
        task: Task whose ``env_kwargs`` are unpacked into ``suite.make``.
        camera_name: Camera used for the viewer and for camera observations.
        horizon: Step limit handed to ``suite.make``.
        seed: Value passed to the wrapped environment's ``seed`` method.

    Returns:
        The wrapped environment.
    """
    import logging  # function-scope import keeps this cell self-contained

    env = suite.make(
        **task.env_kwargs,
        has_renderer=True,  # pop up an interactive viewer
        has_offscreen_renderer=True,
        render_camera=camera_name,
        use_camera_obs=True,
        camera_names=[camera_name],
        horizon=horizon,
        control_freq=20,
        reward_shaping=True,
    )
    # NOTE(review): robosuite's GymWrapper documents keys such as
    # "<camera>_image" and "robot0_proprio-state" -- confirm that these three
    # names exist in the observation dict of the installed version.
    env = GymWrapper(env, keys=["image", "robot-state", "task-obs"])
    env.reset()
    env.env.seed(seed)

    # Align the viewer with the requested camera if the backend supports it.
    viewer = getattr(env.env, "viewer", None)
    if viewer is not None:
        try:
            # NOTE(review): some robosuite viewers appear to take a camera id
            # rather than a name here -- confirm against the active renderer.
            viewer.set_camera(camera_name)  # type: ignore[attr-defined]
        except Exception as exc:  # best-effort: report instead of hiding it
            logging.getLogger(__name__).warning(
                "Could not point the viewer at camera %r: %s", camera_name, exc
            )

    return env


def run_episode(env: GymWrapper, policy: OpenVLAPolicy, task: LiberoTask, camera_name: str, render_gui: bool = True) -> float:
    """Roll out ``policy`` on ``env`` for one episode and return the summed reward.

    Each step optionally refreshes the on-screen viewer, renders a 240x320
    off-screen frame from ``camera_name``, asks the policy for an action and
    applies it. Both return conventions of ``reset``/``step`` are accepted:
    the classic ``obs`` / 4-tuple form and the Gymnasium ``(obs, info)`` /
    5-tuple form.

    Args:
        env: Environment to step; it is reset at the start of the episode.
        policy: Object whose ``act`` method produces the action.
        task: Task whose ``language_instruction`` is passed to the policy.
        camera_name: Camera used for the off-screen frame.
        render_gui: Whether to call ``env.render()`` on every step.

    Returns:
        The sum of the per-step rewards as a float.
    """
    import logging  # function-scope import keeps this cell self-contained

    log = logging.getLogger(__name__)

    # Gymnasium-style wrappers return (obs, info); older ones return obs only.
    reset_result = env.reset()
    if isinstance(reset_result, tuple) and len(reset_result) == 2 and isinstance(reset_result[1], dict):
        obs = reset_result[0]
    else:
        obs = reset_result

    cumulative_reward = 0.0
    done = False
    gui_enabled = render_gui

    while not done:
        # Update the on-screen viewer. Rendering stays best-effort, but a
        # failure is reported once and then disabled instead of being retried
        # silently on every step.
        if gui_enabled:
            try:
                env.render()
            except Exception as exc:
                log.warning("On-screen rendering failed (%s); continuing without the viewer.", exc)
                gui_enabled = False

        image = env.sim.render(
            camera_name=camera_name,
            height=240,
            width=320,
            depth=False,
        )
        # NOTE(review): this assumes `obs` supports lookup by "robot-state";
        # wrappers that flatten observations into an array will fail here --
        # confirm against the wrapper in use.
        proprio = obs["robot-state"]
        action = policy.act(image=image, proprio=proprio, language=task.language_instruction)

        step_result = env.step(action)
        if len(step_result) == 5:
            # Gymnasium API: termination and truncation are reported separately.
            obs, reward, terminated, truncated, _ = step_result
            done = bool(terminated) or bool(truncated)
        else:
            obs, reward, done, _ = step_result
        cumulative_reward += float(reward)

    return cumulative_reward


## Load benchmark + policy
This cell instantiates the LIBERO benchmark, builds the robosuite environment with an on-screen renderer, and loads the specified OpenVLA checkpoint.


In [None]:

# Resolve the benchmark first so that an invalid task index fails before the
# simulator or the model is loaded.
benchmark = LiberoBenchmark(benchmark_name)
if not 0 <= benchmark_task_index < len(benchmark):
    raise IndexError(
        f"Task index {benchmark_task_index} is out of range for benchmark '{benchmark_name}' with {len(benchmark)} tasks"
    )

# Build the environment for the selected task, then load the policy.
libero_task = benchmark[benchmark_task_index]
env = make_env(
    libero_task,
    camera_name=camera_name,
    horizon=horizon,
    seed=seed,
)
policy = OpenVLAPolicy(model_id, device=device)

print(f"Loaded task '{libero_task.name}' with instruction: {libero_task.language_instruction}")
print(f"Environment ready on device={device}; close the viewer window to stop the kernel if needed.")


## Run a single episode
The loop below continuously renders the robosuite GUI window while capturing off-screen frames for OpenVLA. Close the viewer or interrupt the kernel to stop execution early.


In [None]:

# Roll out one episode with the on-screen viewer enabled, then report the
# reward summed over all steps.
reward = run_episode(env, policy, libero_task, camera_name=camera_name, render_gui=True)
print(f"Finished task '{libero_task.name}' with reward {reward:.3f}")
