In [None]:
from actors import vLLMActor

In [None]:
from __future__ import annotations
import abc, torch
from typing import Dict, List, Sequence

class BaseRLLoss(abc.ABC):
    """Every loss must return (scalar_loss, metrics:dict[str,float])."""

    @abc.abstractmethod
    def forward(
        self,
        policy,                    # nn.Module (on device, requires_grad)
        reference,                 # nn.Module (eval, no grad) or None
        input_ids: torch.LongTensor,
        attention_mask: torch.LongTensor,
        advantages: torch.Tensor,  # shape (B,) or (B,L-1)
        **kw,
    ) -> tuple[torch.Tensor, Dict[str, float]]: ...


In [None]:
import torch

class GRPOLoss(BaseRLLoss):
    def __init__(self, eps: float, beta: float, temperature: float):
        self.eps, self.beta, self.temp = eps, beta, temperature

    def forward(self, policy, reference, input_ids, attention_mask, advantages, **kw):
        logits = policy(input_ids, attention_mask=attention_mask).logits / self.temp
        new_lp = torch.log_softmax(logits, -1)[:, :-1]
        tgt = input_ids[:, 1:].unsqueeze(-1)
        new_lp = new_lp.gather(-1, tgt).squeeze(-1)

        with torch.no_grad():
            old = new_lp.detach()
            ref = None
            if reference is not None:
                ref_logits = reference(input_ids, attention_mask=attention_mask).logits / self.temp
                ref = torch.log_softmax(ref_logits, -1)[:, :-1]
                ref = ref.gather(-1, tgt).squeeze(-1)

        ratio = torch.exp(new_lp - old)
        clip  = torch.clamp(ratio, 1 - self.eps, 1 + self.eps)
        per   = -torch.min(ratio * advantages, clip * advantages)
        if self.beta and ref is not None:
            kl = torch.exp(ref - new_lp) - (ref - new_lp) - 1
            per = per + self.beta * kl
            mean_kl = kl.mean().item()
        else:
            mean_kl = 0.0

        mask = attention_mask[:, 1:]
        loss = (per * mask).sum() / mask.sum()
        return loss, {"kl": mean_kl}


In [None]:
from liger_kernel.chunked_loss import LigerFusedLinearGRPOLoss

class LigerLoss(BaseRLLoss):
    def __init__(self, beta, loss_type, use_ref_model, temperature):
        self.core = LigerFusedLinearGRPOLoss(beta=beta, use_ref_model=use_ref_model,
                                             loss_type=loss_type, temperature=temperature)
        self.use_ref_model = use_ref_model

    def forward(self, policy, reference, input_ids, attention_mask, advantages, **kw):
        hidden = policy.model(input_ids, attention_mask=attention_mask).last_hidden_state[:, :-1, :]
        ref_logps = None
        if reference is None and self.use_ref_model:
            raise ValueError("Reference model is required for LigerLoss when use_ref_model is True.")
        
        if reference is not None:
            ref_logits = reference(input_ids, attention_mask=attention_mask).logits / self.core.temperature
            ref_logps = torch.log_softmax(ref_logits, -1)[:, :-1]

        tgt_ids = input_ids[:, 1:]
        mask = attention_mask[:, 1:]
        loss, (kl,) = self.core(
            _input        = hidden,
            lin_weight    = policy.lm_head.weight,
            bias          = policy.lm_head.bias,
            selected_token_ids = tgt_ids,
            attention_mask     = mask,
            advantages         = advantages,
            ref_per_token_logps=ref_logps
        )
        return loss, {"kl": kl}


In [None]:
actor = vLLMActor(
  name="vllm_actor",
  model_path="Qwen/Qwen2.5-7B-Instruct",
  gpu_groups=[[0]],
  use_v1_engine=True,
  engine_kwargs={
    "max_model_len": 2048,
  }
)

In [None]:
from vllm import SamplingParams

In [None]:
actor.generate(prompts=["Hello, world!"], sampling_params=SamplingParams(temperature=0.7, top_p=0.9))

In [None]:
class TrainableActorConfig:
  # get model
  

class Environment:
  def __init__(self):
    self.main_actor = vLLMActor(
      name="main_actor",
      model_path="Qwen/Qwen2.5-7B-Instruct",
      gpu_groups=[[0]],
      use_v1_engine=True,
      engine_kwargs={
        "max_model_len": 2048,
      }
    )

    self.helper_actor = vLLMActor(
      name="helper_actor",
      model_path="Qwen/Qwen2.5-7B-Instruct",
      gpu_groups=[[0]],
      use_v1_engine=True,
      engine_kwargs={
        "max_model_len": 2048,
      }
    )

    self.register_trainable_actor(self.main_actor)
    self.register_trainable_actor(self.helper_actor)
    
  
  def __call__(self, entries: list[dict]) -> list[dict]:
    # We extract the prompts for each entry
    prompts = [entry["prompt"] for entry in entries]
    # Generate responses using the main actor
    main_responses = self.main_actor.generate(prompts=prompts, sampling_params=SamplingParams(temperature=0.7, top_p=0.9))
    # Generate responses using the helper actor
    helper_responses = self.helper_actor.generate(prompts=prompts, sampling_params=SamplingParams(temperature=0.7, top_p=0.9))

    # Generate rewards based on the responses
    rewards = [self.calculate_reward(main_response, helper_response) for main_response, helper_response in zip(main_responses, helper_responses)]
    # We return rewards.
    return {'main_actor': [{"reward": reward} for reward in rewards], 
            'helper_actor': [{"reward": reward} for reward in rewards]}

In [2]:
import gc
import torch
import random
import numpy as np
import ctypes
from multiprocessing import shared_memory
from concurrent.futures import ThreadPoolExecutor


def get_shareable_version(meta):
    return {
        key: {k: v for k, v in meta[key].items() if k != "_shm_obj"} for key in meta
    }


def create_shared_state_dict(state_dict, max_workers=200):

    def make_shm(key, tensor):
        if tensor.device.type != "cpu":
            tensor = tensor.cpu()
        tensor = tensor.contiguous()

        shape = tuple(tensor.shape)
        dtype = tensor.dtype
        nbytes = tensor.numel() * tensor.element_size()

        shm = shared_memory.SharedMemory(create=True, size=nbytes)
        buf = shm.buf

        if dtype == torch.bfloat16:
            # one raw-bytes copy from the tensors data_ptr
            ptr = tensor.data_ptr()
            raw = ctypes.string_at(ptr, nbytes)
            buf[:nbytes] = raw
        else:
            # fallback via numpy
            np_arr = tensor.numpy()
            shm_arr = np.ndarray(shape, dtype=np_arr.dtype, buffer=buf)
            shm_arr[:] = np_arr

        del tensor
        if random.random() < 0.02:
            torch.cuda.empty_cache()
            gc.collect()

        return key, {
            "shm_name": shm.name,
            "shape": shape,
            "dtype": str(dtype),
            "nbytes": nbytes,
            "_shm_obj": shm,
        }

    shared_meta = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(make_shm, k, v) for k, v in state_dict.items()]
        for f in futures:
            k, m = f.result()
            shared_meta[k] = m
    return shared_meta


def load_shared_state_dict(meta):

    def load_shared_tensor(key, info):
        shm = shared_memory.SharedMemory(name=info["shm_name"])
        shape = tuple(info["shape"])
        dtype = info["dtype"]
        nbytes = info["nbytes"]
        try:
            if "bfloat16" in dtype or dtype.endswith("bf16"):
                # copy raw bytes out via a Python bytes object
                buf = shm.buf
                raw = buf[:nbytes].tobytes()
                tensor = torch.empty(shape, dtype=torch.bfloat16)
                ctypes.memmove(tensor.data_ptr(), raw, nbytes)
            else:
                buf = shm.buf
                np_arr = np.ndarray(shape, dtype=np.dtype(dtype), buffer=buf)
                tensor = torch.tensor(np_arr)
                del np_arr
            if random.random() < 0.1:
                gc.collect()
            return key, tensor
        finally:
            shm.close()
            shm.unlink()

    state_dict = {}
    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [
            executor.submit(load_shared_tensor, k, info) for k, info in meta.items()
        ]
        for f in futures:
            k, t = f.result()
            state_dict[k] = t
    return state_dict

In [1]:
import os
os.environ['VLLM_ALLOW_INSECURE_SERIALIZATION'] = '1'
from vllm import LLM

  from .autonotebook import tqdm as notebook_tqdm


INFO 06-16 14:29:27 [__init__.py:244] Automatically detected platform cuda.


2025-06-16 14:29:28,961	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [None]:
llm = LLM(model="Qwen/Qwen2.5-0.5B-Instruct", max_model_len=1, gpu_memory_utilization=0.5)

INFO 06-16 14:29:38 [config.py:823] This model supports multiple tasks: {'generate', 'reward', 'classify', 'score', 'embed'}. Defaulting to 'generate'.
INFO 06-16 14:29:38 [config.py:2195] Chunked prefill is enabled with max_num_batched_tokens=8192.
INFO 06-16 14:29:39 [core.py:455] Waiting for init message from front-end.
INFO 06-16 14:29:39 [core.py:70] Initializing a V1 LLM engine (v0.9.1) with config: model='Qwen/Qwen2.5-0.5B-Instruct', speculative_config=None, tokenizer='Qwen/Qwen2.5-0.5B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config={}, tokenizer_revision=None, trust_remote_code=False, dtype=torch.bfloat16, max_seq_len=1, download_dir=None, load_format=auto, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto,  device_config=cuda, decoding_config=DecodingConfig(backend='auto', disable_fallback=False, disable_any_whitespace=False, disable_ad

Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  8.63it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  8.57it/s]



INFO 06-16 14:29:41 [default_loader.py:272] Loading weights took 0.14 seconds
INFO 06-16 14:29:41 [gpu_model_runner.py:1624] Model loading took 0.9277 GiB and 0.965274 seconds
INFO 06-16 14:29:44 [backends.py:462] Using cache directory: /home/rd/.cache/vllm/torch_compile_cache/56eb07ea00/rank_0_0 for vLLM's torch.compile
INFO 06-16 14:29:44 [backends.py:472] Dynamo bytecode transform time: 2.43 s
INFO 06-16 14:29:46 [backends.py:135] Directly load the compiled graph(s) for shape None from the cache, took 1.659 s
INFO 06-16 14:29:46 [monitor.py:34] torch.compile takes 2.43 s in total
INFO 06-16 14:29:47 [gpu_worker.py:227] Available KV cache memory: 9.46 GiB
INFO 06-16 14:29:47 [kv_cache_utils.py:715] GPU KV cache size: 826,608 tokens
INFO 06-16 14:29:47 [kv_cache_utils.py:719] Maximum concurrency for 1 tokens per request: 51663.00x
INFO 06-16 14:30:03 [gpu_model_runner.py:2048] Graph capturing finished in 16 secs, took 0.39 GiB
INFO 06-16 14:30:03 [core.py:171] init engine (profile, cr

Loading shared state dict on worker: <vllm.worker.worker_base.WorkerWrapperBase object at 0x79cb53ac2f80>
{'model.embed_tokens.weight': tensor([[-0.0104,  0.0408,  0.0097,  ...,  0.0098,  0.0136, -0.0067],
        [-0.0146, -0.0014, -0.0177,  ..., -0.0024,  0.0024, -0.0081],
        [-0.0366, -0.0102,  0.0078,  ..., -0.0074, -0.0177, -0.0007],
        ...,
        [ 0.0060, -0.0053,  0.0033,  ..., -0.0082, -0.0082,  0.0187],
        [ 0.0060, -0.0053,  0.0033,  ..., -0.0082, -0.0082,  0.0187],
        [ 0.0060, -0.0053,  0.0033,  ..., -0.0082, -0.0082,  0.0187]],
       dtype=torch.bfloat16), 'model.layers.0.self_attn.q_proj.weight': tensor([[-0.0019, -0.0052,  0.0188,  ..., -0.0061, -0.0153,  0.0038],
        [ 0.0084,  0.0018,  0.0435,  ...,  0.0066, -0.0422, -0.0181],
        [-0.0168, -0.0248,  0.0422,  ...,  0.0089, -0.0008, -0.0094],
        ...,
        [-0.1040,  0.0791,  0.0132,  ..., -0.0161, -0.0221, -0.0588],
        [-0.0140,  0.0654,  0.0591,  ...,  0.0410, -0.0046,  0.00

In [4]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct", torch_dtype=torch.bfloat16, device_map="auto")

[2025-06-16 14:30:04,812] [INFO] [real_accelerator.py:254:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-06-16 14:30:05,232] [INFO] [logging.py:107:log_dist] [Rank -1] [TorchCheckpointEngine] Initialized with serialization = False


Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


In [5]:
model.state_dict().keys()  # Check the model's state dict keys

odict_keys(['model.embed_tokens.weight', 'model.layers.0.self_attn.q_proj.weight', 'model.layers.0.self_attn.q_proj.bias', 'model.layers.0.self_attn.k_proj.weight', 'model.layers.0.self_attn.k_proj.bias', 'model.layers.0.self_attn.v_proj.weight', 'model.layers.0.self_attn.v_proj.bias', 'model.layers.0.self_attn.o_proj.weight', 'model.layers.0.mlp.gate_proj.weight', 'model.layers.0.mlp.up_proj.weight', 'model.layers.0.mlp.down_proj.weight', 'model.layers.0.input_layernorm.weight', 'model.layers.0.post_attention_layernorm.weight', 'model.layers.1.self_attn.q_proj.weight', 'model.layers.1.self_attn.q_proj.bias', 'model.layers.1.self_attn.k_proj.weight', 'model.layers.1.self_attn.k_proj.bias', 'model.layers.1.self_attn.v_proj.weight', 'model.layers.1.self_attn.v_proj.bias', 'model.layers.1.self_attn.o_proj.weight', 'model.layers.1.mlp.gate_proj.weight', 'model.layers.1.mlp.up_proj.weight', 'model.layers.1.mlp.down_proj.weight', 'model.layers.1.input_layernorm.weight', 'model.layers.1.post_

In [6]:
import gc
gc.collect()
torch.cuda.empty_cache()

In [30]:
meta = create_shared_state_dict(model.state_dict())

In [31]:
meta = get_shareable_version(meta)

In [24]:
meta

{'model.embed_tokens.weight': {'shm_name': 'psm_941640f5',
  'shape': (151936, 896),
  'dtype': 'torch.bfloat16',
  'nbytes': 272269312},
 'model.layers.0.self_attn.q_proj.weight': {'shm_name': 'psm_0172b8c6',
  'shape': (896, 896),
  'dtype': 'torch.bfloat16',
  'nbytes': 1605632},
 'model.layers.0.self_attn.q_proj.bias': {'shm_name': 'psm_76a6458a',
  'shape': (896,),
  'dtype': 'torch.bfloat16',
  'nbytes': 1792},
 'model.layers.0.self_attn.k_proj.weight': {'shm_name': 'psm_68e3a7a8',
  'shape': (128, 896),
  'dtype': 'torch.bfloat16',
  'nbytes': 229376},
 'model.layers.0.self_attn.k_proj.bias': {'shm_name': 'psm_c1c23a83',
  'shape': (128,),
  'dtype': 'torch.bfloat16',
  'nbytes': 256},
 'model.layers.0.self_attn.v_proj.weight': {'shm_name': 'psm_c1ad9490',
  'shape': (128, 896),
  'dtype': 'torch.bfloat16',
  'nbytes': 229376},
 'model.layers.0.self_attn.v_proj.bias': {'shm_name': 'psm_bb7f8885',
  'shape': (128,),
  'dtype': 'torch.bfloat16',
  'nbytes': 256},
 'model.layers.0.

In [None]:
def _v1_remote_load(worker_self, meta) -> None:
    # state = load_shared_state_dict(meta_blob)
    # print(worker_self.model_runner.model.load_state_dict(state_dict))
    print("Loading shared state dict on worker:", worker_self)
    state_dict = load_shared_state_dict(meta)
    print(state_dict)
    worker_self.model_runner.model.load_weights(weights=state_dict.items())
with torch.no_grad():
    llm.collective_rpc(_v1_remote_load, args=(meta,))


In [None]:
dict(model.state_dict())['lm_head.weight']

In [1]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.optim import AdamW
from torch.optim.lr_scheduler import LinearLR, ConstantLR
from vllm import SamplingParams
import bitsandbytes as bnb
from deepspeed.ops.adam import DeepSpeedCPUAdam
import torch
model = AutoModelForCausalLM.from_pretrained(
                "Qwen/Qwen2.5-3B-Instruct", torch_dtype=torch.bfloat16, use_cache=False, trust_remote_code=True, device_map="auto"
            )


optim = bnb.optim.Adam32bit(model.parameters(), lr=2e-6)


# We make some dummy loss with respect to the model.
loss = sum([torch.sum(t) for t in model.parameters()])
loss.backward()
optim.step()

  from .autonotebook import tqdm as notebook_tqdm


INFO 06-17 17:05:48 [__init__.py:244] Automatically detected platform cuda.


2025-06-17 17:05:49,688	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


[2025-06-17 17:05:49,818] [INFO] [real_accelerator.py:254:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-06-17 17:05:50,665] [INFO] [logging.py:107:log_dist] [Rank -1] [TorchCheckpointEngine] Initialized with serialization = False


Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.
Loading checkpoint shards: 100%|██████████| 2/2 [00:00<00:00,  2.28it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 86.00 MiB. GPU 0 has a total capacity of 23.62 GiB of which 40.00 MiB is free. Including non-PyTorch memory, this process has 22.55 GiB memory in use. Of the allocated memory 22.10 GiB is allocated by PyTorch, and 8.35 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [13]:
optim.to_gpu()

In [14]:
optim

PagedAdam8bit (
Parameter Group 0
    alpha: 0.0
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 2e-06
    t_alpha: None
    t_beta3: None
    weight_decay: 0
)