In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from trl import DPOConfig, DPOTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer
from src.data_process import TrajectoryDataset
import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
import sys
sys.path.append("../..")
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union
import draccus
import numpy as np
import tqdm
from experiments.robot.robot_utils import get_model
from libero.libero import benchmark
import wandb
from experiments.robot.libero.libero_utils import (
    get_libero_dummy_action,
    get_libero_env,
    get_libero_image,
    quat2axisangle,
    save_rollout_video_CoA,
)
from experiments.robot.openvla_utils import get_processor, get_input
from experiments.robot.robot_utils import (
    DATE_TIME,
    get_action,
    get_CoA,
    get_image_resize_size,
    get_model,
    invert_gripper_action,
    normalize_gripper_action,
    set_seed_everywhere,
)

from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union
import torch
from torch.utils.data import Dataset, IterableDataset
from transformers import PreTrainedTokenizerBase, BaseImageProcessor, FeatureExtractionMixin, ProcessorMixin
from trl import DPOConfig, DPOTrainer
from typing import Union

  from .autonotebook import tqdm as notebook_tqdm
2025-08-05 20:24:48.298157: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-08-05 20:24:48.298197: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-08-05 20:24:48.299869: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-08-05 20:24:48.309431: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler fl





In [3]:
@dataclass
class GenerateConfig:
    # fmt: off

    #################################################################################################################
    # Model-specific parameters
    #################################################################################################################
    model_family: str = "openvla"                    # Model family
    pretrained_checkpoint: Union[str, Path] = "/mnt/sda/home/zijianwang/openvla/FT_res/openvla-7b-finetuned-libero-10+libero_10_no_noops+b4+lr-0.0005+lora-r48+dropout-0.0--image_aug--2025-07-18_19-26-25"     # Pretrained checkpoint path
    winner_trajectory_path: str = "/mnt/sda/home/zijianwang/openvla/vla-scripts/DPO/winner_trajectory"

    #################################################################################################################
    load_in_8bit: bool = False                       # (For OpenVLA only) Load with 8-bit quantization
    load_in_4bit: bool = False                       # (For OpenVLA only) Load with 4-bit quantization

    center_crop: bool = True                         # Center crop? (if trained w/ random crop image aug)

    #################################################################################################################
    # LIBERO environment-specific parameters
    #################################################################################################################
    task_suite_name: str = "libero_10"          # Task suite. Options: libero_spatial, libero_object, libero_goal, libero_10, libero_90
    num_steps_wait: int = 10                         # Number of steps to wait for objects to stabilize in sim
    num_trials_per_task: int = 50                    # Number of rollouts per task

    #################################################################################################################
    # Utils
    #################################################################################################################
    run_id_note: Optional[str] = None                # Extra note to add in run ID for logging
    local_log_dir: str = "./experiments/logs"        # Local directory for eval logs

    use_wandb: bool = False                          # Whether to also log results in Weights & Biases
    wandb_project: str = "YOUR_WANDB_PROJECT"        # Name of W&B project to log to (use default!)
    wandb_entity: str = "YOUR_WANDB_ENTITY"          # Name of entity to log under

    seed: int = 7                                    # Random Seed (for reproducibility)

    device: str = "cuda:0"

    # fmt: on

In [4]:
def setup_model_and_config(cfg: GenerateConfig):
    """Setup and validate configuration, then load the model."""
    assert cfg.pretrained_checkpoint is not None, "cfg.pretrained_checkpoint must not be None!"
    if "image_aug" in cfg.pretrained_checkpoint:
        assert cfg.center_crop, "Expecting `center_crop==True` because model was trained with image augmentations!"
    assert not (cfg.load_in_8bit and cfg.load_in_4bit), "Cannot use both 8-bit and 4-bit quantization!"

    # Set random seed
    set_seed_everywhere(cfg.seed)

    cfg.unnorm_key = cfg.task_suite_name

    # Load model
    model = get_model(cfg)
    
    return model

def setup_logging_and_environment(cfg: GenerateConfig, model):
    """Setup logging and LIBERO environment."""
    # [OpenVLA] Check that the model contains the action un-normalization key
    if cfg.model_family == "openvla":
        # In some cases, the key must be manually modified (e.g. after training on a modified version of the dataset
        # with the suffix "_no_noops" in the dataset name)
        if cfg.unnorm_key not in model.norm_stats and f"{cfg.unnorm_key}_no_noops" in model.norm_stats:
            cfg.unnorm_key = f"{cfg.unnorm_key}_no_noops"
        assert cfg.unnorm_key in model.norm_stats, f"Action un-norm key {cfg.unnorm_key} not found in VLA `norm_stats`!"

    # [OpenVLA] Get Hugging Face processor
    processor = None
    if cfg.model_family == "openvla":
        processor = get_processor(cfg)

    # Initialize local logging
    run_id = f"EVAL-{cfg.task_suite_name}-{cfg.model_family}-{DATE_TIME}"
    if cfg.run_id_note is not None:
        run_id += f"--{cfg.run_id_note}"
    os.makedirs(cfg.local_log_dir, exist_ok=True)
    local_log_filepath = os.path.join(cfg.local_log_dir, run_id + ".txt")
    log_file = open(local_log_filepath, "w")
    print(f"Logging to local log file: {local_log_filepath}")

    # Initialize Weights & Biases logging as well
    if cfg.use_wandb:
        wandb.init(
            entity=cfg.wandb_entity,
            project=cfg.wandb_project,
            name=run_id,
        )

    # Initialize LIBERO task suite
    benchmark_dict = benchmark.get_benchmark_dict()
    task_suite = benchmark_dict[cfg.task_suite_name]()
    num_tasks_in_suite = task_suite.n_tasks
    print(f"Task suite: {cfg.task_suite_name}")
    log_file.write(f"Task suite: {cfg.task_suite_name}\n")

    # Get expected image dimensions
    resize_size = get_image_resize_size(cfg)

    return processor, log_file, task_suite, num_tasks_in_suite, resize_size, cfg

In [5]:
"""Main function to run the OpenVLA LIBERO inference demo."""
print("[*] Starting OpenVLA LIBERO Inference Demo")

# Initialize configuration
cfg = GenerateConfig()

# Setup model and configuration
print("[*] Loading model and setting up configuration...")
model = setup_model_and_config(cfg)

# Setup logging and environment
print("[*] Setting up logging and environment...")
processor, log_file, task_suite, num_tasks_in_suite, resize_size,cfg = setup_logging_and_environment(cfg, model)

[*] Starting OpenVLA LIBERO Inference Demo
[*] Loading model and setting up configuration...
[*] Instantiating Pretrained VLA model
[*] Loading in BF16 with Flash-Attention Enabled


Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00, 10.74it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 32.00 MiB. GPU 0 has a total capacity of 23.64 GiB of which 5.25 MiB is free. Process 574445 has 17.66 GiB memory in use. Including non-PyTorch memory, this process has 5.97 GiB memory in use. Of the allocated memory 5.55 GiB is allocated by PyTorch, and 40.20 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [7]:
ref_config = GenerateConfig(device = "cuda:2" )
ref_model = get_model(ref_config)

[*] Instantiating Pretrained VLA model
[*] Loading in BF16 with Flash-Attention Enabled


Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00, 10.68it/s]


Loaded model: <class 'transformers_modules.openvla-7b-finetuned-libero-10+libero_10_no_noops+b4+lr-0.0005+lora-r48+dropout-0.0--image_aug--2025-07-18_19-26-25.modeling_prismatic.OpenVLAForActionPrediction'>


In [None]:
# Create dataset instance
dataset = TrajectoryDataset(cfg, cfg.winner_trajectory_path, cfg.task_suite_name, processor, device = cfg.device, model = model)

# dataset只返回"prompt_input_ids": [1, 2, 3], "chosen_input_ids": [4, 5], "rejected_input_ids": [6], 
# attention_mask在DataCollatorForPreference生成, labels在dpotrainer.concatenated_forward中生成. 

Found 212 success trajectories


In [9]:
for i in range(len(dataset)):
    print(dataset[i].keys())
    break

[info] using task orders [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
dict_keys(['prompt_input_ids', 'chosen_input_ids', 'rejected_input_ids'])


In [12]:
import torch
from torch.utils.data import Dataset, IterableDataset
from transformers import PreTrainedTokenizerBase, BaseImageProcessor, FeatureExtractionMixin, ProcessorMixin
from trl import DPOConfig, DPOTrainer
from typing import Union

class CustomDPOTrainer(DPOTrainer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_winner(self, batch: dict[str, Union[list, torch.LongTensor]]) -> dict:
        pass
    def get_loser(self, batch: dict[str, Union[list, torch.LongTensor]]) -> dict:
        pass
        
    def _prepare_dataset(
        self,
        dataset: Union[Dataset, IterableDataset],
        processing_class: Union[PreTrainedTokenizerBase, BaseImageProcessor, FeatureExtractionMixin, ProcessorMixin],
        args: DPOConfig,
        dataset_name: str,
    ) -> Union[Dataset, IterableDataset]:  # return dataset, which will be used in _get_dataloader
        return dataset

In [8]:
training_args = DPOConfig(output_dir="Qwen2-0.5B-DPO")
# print(training_args)
training_args.place_model_on_device = False 

AttributeError: can't set attribute 'place_model_on_device'

In [15]:
trainer = CustomDPOTrainer(model=model, ref_model=ref_model, args=training_args, processing_class=processor, train_dataset=dataset)

OutOfMemoryError: CUDA out of memory. Tried to allocate 32.00 MiB. GPU 0 has a total capacity of 23.64 GiB of which 5.25 MiB is free. Process 574445 has 17.66 GiB memory in use. Including non-PyTorch memory, this process has 5.97 GiB memory in use. Of the allocated memory 5.55 GiB is allocated by PyTorch, and 40.20 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

# ---------------

In [None]:
import imageio
import os
os.makedirs("./rollouts", exist_ok=True)
mp4_path = f"./rollouts/test.mp4"
video_writer = imageio.get_writer(mp4_path, fps=30)
for img in play_imgs:
    video_writer.append_data(img)
video_writer.close()

# Display the video in the notebook
from IPython.display import Video, display
display(Video(mp4_path, width=400, height=300))

In [None]:
import torch
import torch.nn as nn
from typing import Union
from trl import DPOTrainer

class CustomDPOTrainer(DPOTrainer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_winner(self, batch: dict[str, Union[list, torch.LongTensor]]) -> dict:
        pass
    def get_loser(self, batch: dict[str, Union[list, torch.LongTensor]]) -> dict:
        pass
        
    def _prepare_dataset(
        self,
        dataset: Union[Dataset, IterableDataset],
        processing_class: Union[PreTrainedTokenizerBase, BaseImageProcessor, FeatureExtractionMixin, ProcessorMixin],
        args: DPOConfig,
        dataset_name: str,
    ) -> Union[Dataset, IterableDataset]:  # return dataset, which will be used in _get_dataloader
        return dataset

    @staticmethod
    def concatenated_inputs(
        batch: dict[str, Union[list, torch.LongTensor]], padding_value: int
    ) -> dict[str, torch.LongTensor]:
        pass

    def concatenated_forward(
        self, model: nn.Module, batch: dict[str, Union[list, torch.LongTensor]], is_ref_model: bool = False
    ):
        """
        Runs the given model on the given batch of inputs, concatenating the chosen and rejected inputs together.

        We do this to avoid doing two forward passes, because it's faster for FSDP.

        Args:
            model:
                Model to run the forward pass on.
            batch:
                Batch of input data.
            is_ref_model:
                Whether this method is being called for the reference model. If `True`, length desensitization is not
                applied.
        """
        return super().concatenated_forward(model, batch, is_ref_model)


trainer = CustomDPOTrainer(
    model=model,
    ref_model=ref_model,
    args=args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

# trainer.is_vision_model = True

trainer._signature_columns = [
    "prompt_input_ids",
    "chosen_input_ids",
    "rejected_input_ids",
    "image_sizes",
    "ref_chosen_logps",
    "ref_rejected_logps",
]  ## should add "pixel_values" and "pixel_attention_mask"


In [None]:
from transformers import LlamaForCausalLM, LlamaTokenizer

In [None]:
# 原始数据 
#   ↓
# __init__ 
#   ├── 设置 processing_class (tokenizer/processor)
#   ├── 设置 padding_value  
#   ├── 设置 data_collator
#   └── 调用 _prepare_dataset
#         ↓
# _prepare_dataset
#   ├── maybe_extract_prompt (提取隐式prompt)
#   ├── maybe_apply_chat_template (应用聊天模板)  
#   └── tokenize_row/process_row (标记化)
#         ↓
# get_train_dataloader/get_eval_dataloader
#   └── compute_ref_log_probs (预计算参考对数概率，可选)
#         ↓
# DataCollatorForPreference.torch_call
#   └── 批处理：转张量、填充
#         ↓
# concatenated_inputs  
#   └── 连接 chosen/rejected 输入
#         ↓
# 模型前向传播


# 1. DPOTrainer.__init__()
#    ├── 设置 processing_class, padding_value, data_collator
#    ├── 调用 _prepare_dataset() 
#    │   ├── maybe_extract_prompt()
#    │   ├── maybe_apply_chat_template()
#    │   └── tokenize_row/process_row()
#    └── 调用 super().__init__() 传递处理好的数据集

# 2. 训练开始：trainer.train()
#    └── 调用 _inner_training_loop()

# 3. _inner_training_loop()
#    └── 调用 get_train_dataloader()  ← 这里！

# 4. get_train_dataloader()
#    └── 调用 _get_dataloader()  ← 这里！

# 5. _get_dataloader()
#    └── 调用 _remove_unused_columns()  ← 最终调用位置！

# 6. DataLoader 创建完成，开始训练循环

In [None]:
# dpo_trainer.train()
# │
# ├─ Trainer.train() [基类方法]
# │   │
# │   ├─ 初始化训练环境
# │   ├─ 加载检查点 (如果有)
# │   │
# │   └─ _inner_training_loop()
# │       │
# │       ├─ 创建训练数据加载器
# │       ├─ 初始化优化器和调度器
# │       │
# │       └─ 主训练循环:
# │           │
# │           for epoch in epochs:
# │               for update_step in total_updates:
# │                   for inputs in batch_samples:
# │                       │
# │                       └─ training_step(model, inputs)
# │                           │
# │                           ├─ model.train()
# │                           ├─ _prepare_inputs(inputs)
# │                           │
# │                           └─ compute_loss(model, inputs) [DPO重写]
# │                               │
# │                               └─ get_batch_loss_metrics(model, batch)
# │                                   │
# │                                   ├─ 判断是否使用Liger优化
# │                                   │
# │                                   ├─ [路径1: 使用Liger] _compute_loss_liger()
# │                                   │   │
# │                                   │   ├─ concatenated_inputs() - 合并chosen/rejected数据
# │                                   │   ├─ 模型前向传播获取hidden states
# │                                   │   ├─ 获取参考模型权重
# │                                   │   └─ LigerFusedLinearDPOLoss() - 计算DPO loss
# │                                   │
# │                                   └─ [路径2: 标准流程] 
# │                                       │
# │                                       ├─ concatenated_forward(model, batch)
# │                                       │   │
# │                                       │   ├─ concatenated_inputs() - 合并数据
# │                                       │   ├─ 模型前向传播
# │                                       │   ├─ 计算logits和attention mask
# │                                       │   ├─ selective_log_softmax() - 计算log概率
# │                                       │   └─ 返回chosen_logps, rejected_logps
# │                                       │
# │                                       ├─ compute_ref_log_probs(batch) 
# │                                       │   │
# │                                       │   ├─ 如果batch中有预计算的ref logps则直接使用
# │                                       │   └─ 否则用参考模型计算:
# │                                       │       ├─ null_ref_context() - PEFT适配器切换
# │                                       │       └─ concatenated_forward(ref_model, batch)
# │                                       │
# │                                       └─ dpo_loss(chosen_logps, rejected_logps, ref_chosen_logps, ref_rejected_logps)
# │                                           │
# │                                           ├─ 计算log ratios
# │                                           ├─ 应用不同的loss类型 (sigmoid, hinge, ipo等)
# │                                           ├─ 计算rewards
# │                                           └─ 返回losses, chosen_rewards, rejected_rewards
# │
# └─ 返回训练结果