In [None]:
import torch
import numpy as np
from torchvision import transforms
from einops import rearrange
from llava_pythia.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from llava_pythia.mm_utils import tokenizer_image_token, get_model_name_from_path
from llava_pythia.model.builder import load_pretrained_model
from llava_pythia.conversation import conv_templates
from llava_pythia.model.language_model.pythia.llava_pythia import LlavaPythiaConfig

def get_image(ts, camera_names, rand_crop_resize=False):
    curr_images = []
    for cam_name in camera_names:
        img = rearrange(ts.observation['images'][cam_name], 'h w c -> c h w')
        curr_images.append(img)
    curr_image = np.stack(curr_images, axis=0)
    curr_image = torch.from_numpy(curr_image / 255.0).float().cuda().unsqueeze(0)

    if rand_crop_resize:
        print('rand crop resize is used!')
        original_size = curr_image.shape[-2:]
        ratio = 0.95
        curr_image = curr_image[..., int(original_size[0]*(1-ratio)/2): int(original_size[0]*(1+ratio)/2),
                                int(original_size[1]*(1-ratio)/2): int(original_size[1]*(1+ratio)/2)]
        curr_image = curr_image.squeeze(0)
        resize_transform = transforms.Resize(original_size, antialias=True)
        curr_image = resize_transform(curr_image).unsqueeze(0)
    return curr_image

def convert_actions(pred_action):
    return pred_action  # 그대로 반환

class llava_pythia_act_policy:
    def __init__(self, policy_config, data_args=None):
        self.load_policy(policy_config)
        self.data_args = data_args

    def load_policy(self, policy_config):
        self.policy_config = policy_config
        model_base = policy_config["model_base"] if policy_config['enable_lora'] else None
        model_name = get_model_name_from_path(policy_config['model_path'])
        model_path = policy_config["model_path"]

        self.tokenizer, self.policy, self.image_processor, self.context_len = load_pretrained_model(
            model_path, model_base, model_name, False, False
        )
        self.config = LlavaPythiaConfig.from_pretrained(
            '/'.join(model_path.split('/')[:-1]),
            trust_remote_code=True
        )

    def process_batch_to_llava(self, curr_image, robo_state, raw_lang):
        self.conv = conv_templates[self.policy_config['conv_mode']].copy()

        if len(curr_image.shape) == 5:
            curr_image = curr_image.squeeze(0)

        image, image_r = torch.chunk(curr_image, 2, dim=0)

        image = self.expand2square(image, tuple(x for x in self.image_processor.image_mean))
        image_tensor = self.image_processor.preprocess(
            image, return_tensors='pt', do_normalize=True, do_rescale=False, do_center_crop=False
        )['pixel_values'].to(torch.float32).to(self.policy.device)

        image_r = self.expand2square(image_r, tuple(x for x in self.image_processor.image_mean))
        image_tensor_r = self.image_processor.preprocess(
            image_r, return_tensors='pt', do_normalize=True, do_rescale=False, do_center_crop=False
        )['pixel_values'].to(torch.float32).to(self.policy.device)

        inp = raw_lang
        if self.policy.config.mm_use_im_start_end:
            inp = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + inp
        else:
            inp = DEFAULT_IMAGE_TOKEN + '\n' + inp

        self.conv.append_message(self.conv.roles[0], inp)
        self.conv.append_message(self.conv.roles[1], None)
        prompt = self.conv.get_prompt() + " <|endoftext|>"

        input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX,
                                          return_tensors='pt').unsqueeze(0).cuda().long()
        attn_mask = input_ids.ne(self.tokenizer.pad_token_id).long()

        states = robo_state.to(self.policy.device, dtype=torch.float32)

        return dict(
            input_ids=input_ids,
            attention_mask=attn_mask,
            images=image_tensor.float(),
            images_r=image_tensor_r.float(),
            # states=states  # 필요 시 추가
        )

    def expand2square(self, pil_imgs, background_color):
        batch_size, channels, height, width = pil_imgs.shape
        max_dim = max(height, width)
        expanded_imgs = np.full((batch_size, max_dim, max_dim, channels),
                                background_color, dtype=np.float32)

        if height == width:
            expanded_imgs = pil_imgs.permute(0, 2, 3, 1).cpu().numpy()
        elif height > width:
            offset = (max_dim - width) // 2
            expanded_imgs[:, :height, offset:offset+width, :] = pil_imgs.permute(0,2,3,1).cpu().numpy()
        else:
            offset = (max_dim - height) // 2
            expanded_imgs[:, offset:offset+height, :width, :] = pil_imgs.permute(0,2,3,1).cpu().numpy()

        return torch.tensor(expanded_imgs).to(dtype=pil_imgs.dtype, device=pil_imgs.device)


In [None]:
        )

    def process_batch_to_llava(self, curr_image, robo_state, raw_lang):
        self.conv = conv_templates[self.policy_config['conv_mode']].copy()

        if len(curr_image.shape) == 5:
            curr_image = curr_image.squeeze(0)

        image, image_r = torch.chunk(curr_image, 2, dim=0)

        image = self.expand2square(image, tuple(x for x in self.image_processor.image_mean))
        image_tensor = self.image_processor.preprocess(
            image, return_tensors='pt', do_normalize=True, do_rescale=False, do_center_crop=False
        )['pixel_values'].to(torch.float32).to(self.policy.device)

        image_r = self.expand2square(image_r, tuple(x for x in self.image_processor.image_mean))
        image_tensor_r = self.image_processor.preprocess(
            image_r, return_tensors='pt', do_normalize=True, do_rescale=False, do_center_crop=False
        )['pixel_values'].to(torch.float32).to(self.policy.device)

        inp = raw_lang
        if self.policy.config.mm_use_im_start_end:
            inp = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + inp
        else:
            inp = DEFAULT_IMAGE_TOKEN + '\n' + inp

        self.conv.append_message(self.conv.roles[0], inp)
        self.conv.append_message(self.conv.roles[1], None)
        prompt = self.conv.get_prompt() + " <|endoftext|>"

        input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX,
                                          return_tensors='pt').unsqueeze(0).cuda().long()
        attn_mask = input_ids.ne(self.tokenizer.pad_token_id).long()

        states = robo_state.to(self.policy.device, dtype=torch.float32)

        return dict(
            input_ids=input_ids,
            attention_mask=attn_mask,
            images=image_tensor.float(),
            images_r=image_tensor_r.float(),
            # states=states  # 필요 시 추가
        )

    def expand2square(self, pil_imgs, background_color):
        batch_size, channels, height, width = pil_imgs.shape
        max_dim = max(height, width)
        expanded_imgs = np.full((batch_size, max_dim, max_dim, channels),
                                background_color, dtype=np.float32)

        if height == width:
            expanded_imgs = pil_imgs.permute(0, 2, 3, 1).cpu().numpy()
        elif height > width:
            offset = (max_dim - width) // 2
            expanded_imgs[:, :height, offset:offset+width, :] = pil_imgs.permute(0,2,3,1).cpu().numpy()
        else:
            offset = (max_dim - height) // 2
            expanded_imgs[:, offset:offset+height, :width, :] = pil_imgs.permute(0,2,3,1).cpu().numpy()

        return torch.tensor(expanded_imgs).to(dtype=pil_imgs.dtype, device=pil_imgs.device)


In [21]:
import robosuite as suite
from robosuite.controllers import load_composite_controller_config
import numpy as np

class RobosuiteDeployEnv:
    def __init__(self, env_name="Lift", cameras=("sideview", "frontview"),
                 control_freq=20):
        """
        DROID 스타일 시점으로 카메라를 재배치한 robosuite 환경
        기존 카메라 이름만 사용하고, 위치/각도를 덮어써서 DROID 느낌으로 변환
        """
        controller_config = load_composite_controller_config(robot="Panda")

        # ✅ robosuite 환경 생성
        self.env = suite.make(
            env_name=env_name,
            robots="Panda",
            controller_configs=controller_config,
            has_renderer=False,
            has_offscreen_renderer=True,
            render_camera=None,
            use_object_obs=True,
            use_camera_obs=True,
            control_freq=control_freq,
            camera_names=list(cameras),   # 기존 등록된 카메라만 사용
            camera_heights=240,
            camera_widths=320,
        )

        # ✅ 카메라 위치/각도 재배치 (DROID 스타일)
        sim = self.env.sim

        # sideview → DROID 왼쪽 카메라처럼 설정
        # ✅ sideview → 왼쪽 위, 사선 내려보기
        cam_id_side = sim.model.camera_name2id("sideview")
        sim.model.cam_pos[cam_id_side] = [0.4, 0.8, 1.0]
        sim.model.cam_quat[cam_id_side] = [0.653, 0.271, -0.653, 0.271]

        # ✅ frontview → 오른쪽 위, 사선 내려보기
        cam_id_front = sim.model.camera_name2id("frontview")
        sim.model.cam_pos[cam_id_front] = [-0.4, -0.8, 1.0]
        sim.model.cam_quat[cam_id_front] = [0.653, -0.271, 0.653, 0.271]


        # ✅ 초기 reset
        self.obs = self.env.reset()
        print("DEBUG obs keys:", self.obs.keys())
        print("✅ 카메라 시점 재배치 완료! 현재 카메라:", self.env.camera_names)

    def get_observation(self):
        """
        ✅ 현재 step의 관측값을 (ts, robot_state) 형태로 반환
        """
        obs = self.obs

        # 1) 이미지 dictionary 생성
        ts = type("Timestep", (), {})()
        ts.observation = {'images': {}}
        for cam_name in self.env.camera_names:
            ts.observation['images'][cam_name] = obs[f'{cam_name}_image']

        # 2) 로봇 상태 (eef + gripper)
        eef_pos = obs['robot0_eef_pos']
        eef_quat = obs['robot0_eef_quat']
        gripper_qpos = obs['robot0_gripper_qpos']
        gripper_state = np.mean(gripper_qpos)

        robot_state = np.concatenate([eef_pos, eef_quat, [gripper_state]])

        return ts, robot_state

    def reset(self):
        self.obs = self.env.reset()
        return self.obs

    def step(self, action):
        self.obs, reward, done, info = self.env.step(action)
        return self.obs, reward, done, info

    def render_cameras(self, cameras=("droid_left", "droid_right"), width=320, height=240):
        frames = []
        # ⚠️ Mujoco 내부 상태 업데이트
        self.env.sim.forward()

        for cam in cameras:
            frame = self.env.sim.render(
                camera_name=cam,
                width=width,
                height=height,
                depth=False,        # ✅ depth buffer 강제 OFF
                mode="offscreen",   # ✅ GPU offscreen 렌더링
            )
            # ⚠️ robosuite 기본은 RGB이지만, 일부 드라이버에서 BGR 출력됨 → 변환
            if frame.shape[-1] == 3:
                frame = frame[..., ::-1]
            frames.append(frame)
        return frames



In [22]:
import cv2
import numpy as np

def eval_bc(policy, deploy_env, policy_config, save_episode=True, num_rollouts=1,
            raw_lang=None, n_steps=50, fps=20,
            camera_names=("sideview", "frontview")):   # ✅ 기본 카메라를 DROID 스타일로 변경
    assert raw_lang is not None, "raw lang is None!!!!!!"

    all_frames = []

    for rollout_idx in range(num_rollouts):
        print(f"=== rollout {rollout_idx} start ===")
        deploy_env.reset()

        for t in range(n_steps):
            # ✅ 새로운 카메라 이름으로 get_observation
            ts, robot_state = deploy_env.get_observation()
            traj_rgb_np = get_image(ts, camera_names)   # ✅ 여기서도 바꿔줌

            robot_state_tensor = torch.from_numpy(robot_state).float().cuda()

            batch = policy.process_batch_to_llava(traj_rgb_np, robot_state_tensor, raw_lang)

            with torch.no_grad():
                all_actions = policy.policy(**batch, eval=True)
            print("=== DEBUG: all_actions shape:", all_actions.shape)
            print("=== DEBUG: first all_actions[0]:", all_actions[0])

            raw_action = all_actions[0][0].detach().cpu().numpy()
            action = convert_actions(raw_action)

            obs, reward, done, info = deploy_env.step(action)
            print(f"[t={t}] reward={reward:.4f}, done={done}")

            # ✅ 여기서도 새로운 카메라 시점 사용
            frames = deploy_env.render_cameras(
                cameras=camera_names,   # ✅ 바꾼 카메라 반영
                width=640, height=480
            )
            merged_frame = np.concatenate(frames, axis=1)
            all_frames.append(merged_frame)

            if done:
                break

    # ✅ mp4 저장 부분은 동일
    if save_episode and all_frames:
        h, w, _ = all_frames[0].shape
        out = cv2.VideoWriter("rollout.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
        for frame in all_frames:
            out.write(frame)
        out.release()
        print("✅ rollout.mp4 저장 완료!")


In [None]:
if __name__ == '__main__':
    policy_config = {
        "model_path": "/home/parkjeongsu/TinyVLA/OUTPUT_llava_pythia/checkpoint-2000",
        "model_base": "/home/parkjeongsu/TinyVLA/Llava-Pythia-400M",
        "enable_lora": False,
        "conv_mode": "pythia",
        "action_head": "droid_diffusion",
        "action_dim": 7,
        "chunk_size": 10
    }

    raw_lang = "pick up the cube and place it on the table"

    policy = llava_pythia_act_policy(policy_config)

    deploy_env = RobosuiteDeployEnv(
        env_name="Lift",
        cameras=("sideview", "frontview")
    )

    eval_bc(
        policy,
        deploy_env,
        policy_config,
        save_episode=True,
        num_rollouts=1,
        raw_lang=raw_lang,
        n_steps=30,
        fps=20
    )


The argument `trust_remote_code` is to be used with Auto classes. It has no effect here and is ignored.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


load llaVA-Pythia MLLM!!!
number of parameters: 7.283150e+07


The argument `trust_remote_code` is to be used with Auto classes. It has no effect here and is ignored.
[1m[32m[robosuite INFO] [0mLoading controller configuration from: /home/parkjeongsu/anaconda3/envs/tinysuite/lib/python3.10/site-packages/robosuite/controllers/config/robots/default_panda.json (composite_controller_factory.py:121)


{'device_map': 'cuda', 'torch_dtype': torch.float32}
DEBUG obs keys: odict_keys(['robot0_joint_pos', 'robot0_joint_pos_cos', 'robot0_joint_pos_sin', 'robot0_joint_vel', 'robot0_eef_pos', 'robot0_eef_quat', 'robot0_eef_quat_site', 'robot0_gripper_qpos', 'robot0_gripper_qvel', 'sideview_image', 'frontview_image', 'cube_pos', 'cube_quat', 'gripper_to_cube_pos', 'robot0_proprio-state', 'object-state'])
✅ 카메라 시점 재배치 완료! 현재 카메라: ['sideview', 'frontview']
=== rollout 0 start ===


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x512 and 519x512)

In [19]:
import h5py

with h5py.File("/home/parkjeongsu/TinyVLA/droid_with_lang/droid_1dot7t_lang_succ_t0001_s-0-0/episode_171777.hdf5", "r") as f:
    print(list(f['observations/images'].keys()))


['left', 'right']


In [16]:
import h5py
def inspect(h5_path):
    with h5py.File(h5_path, "r") as f:
        print("language_raw:", f["language_raw"][...])
        print("action:", f["action"].shape)  # (T,10)
        obs = f["observations"]
        print("qpos:", obs["qpos"].shape)
        print("qvel:", obs["qvel"].shape)
        imgs = obs["images"]
        for k in imgs.keys():
            print(f"images/{k}:", imgs[k].shape)
        print("joint_positions:", obs["joint_position"].shape)
inspect("/home/parkjeongsu/TinyVLA/droid_with_lang/droid_1dot7t_lang_succ_t0001_s-0-0/episode_5392544.hdf5")


language_raw: [b'Press a button on the keyboard']
action: (304, 10)
qpos: (304, 7)
qvel: (304, 7)
images/left: (304, 180, 320, 3)
images/right: (304, 180, 320, 3)


KeyError: "Unable to synchronously open object (object 'joint_position' doesn't exist)"

In [28]:
import robosuite as suite

env = suite.make(
    "Lift",
    robots="Panda",
    has_renderer=False,
    use_camera_obs=True,
    camera_names=["sideview", "frontview"],  # 기존 카메라 이름 사용
    camera_heights=480,
    camera_widths=640,
)

sim = env.sim

# sideview → DROID 왼쪽 느낌
cam_id_side = sim.model.camera_name2id("sideview")
sim.model.cam_pos[cam_id_side] = [1.2, 0.5, 1.4]
sim.model.cam_quat[cam_id_side] = [0.7, 0.0, -0.7, 0.0]

# frontview → DROID 오른쪽 느낌
cam_id_front = sim.model.camera_name2id("frontview")
sim.model.cam_pos[cam_id_front] = [-1.2, -0.5, 1.4]
sim.model.cam_quat[cam_id_front] = [0.7, 0.0, 0.7, 0.0]

obs = env.reset()
left_img = obs["sideview_image"]
right_img = obs["frontview_image"]
print(left_img.shape, right_img.shape)


[1m[32m[robosuite INFO] [0mLoading controller configuration from: /home/parkjeongsu/anaconda3/envs/tinysuite/lib/python3.10/site-packages/robosuite/controllers/config/robots/default_panda.json (composite_controller_factory.py:121)
[1m[32m[robosuite INFO] [0mLoading controller configuration from: /home/parkjeongsu/anaconda3/envs/tinysuite/lib/python3.10/site-packages/robosuite/controllers/config/robots/default_panda.json (composite_controller_factory.py:121)


(480, 640, 3) (480, 640, 3)
