In [1]:
from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2_5_VLProcessor, Qwen2_5_VLConfig
# Local checkpoint directory of Qwen2.5-VL-3B-Instruct.
model_path = "/models/Qwen/Qwen2.5-VL-3B-Instruct"

# Start every handle as None so the prints below also work for the objects
# that are intentionally left unloaded.
config = processor = model = None

# Only the HF config is loaded here; un-comment the next two lines to also
# load the processor and the model weights.
config = Qwen2_5_VLConfig.from_pretrained(model_path)
# processor = Qwen2_5_VLProcessor.from_pretrained(model_path)
# model = Qwen2_5_VLForConditionalGeneration.from_pretrained(model_path).cuda().eval()

print("[config]\n", config)
print("[processor]\n", processor)
print("[model]\n", model)

  from .autonotebook import tqdm as notebook_tqdm


[config]
 Qwen2_5_VLConfig {
  "architectures": [
    "Qwen2_5_VLForConditionalGeneration"
  ],
  "attention_dropout": 0.0,
  "bos_token_id": 151643,
  "eos_token_id": 151645,
  "hidden_act": "silu",
  "hidden_size": 2048,
  "image_token_id": 151655,
  "initializer_range": 0.02,
  "intermediate_size": 11008,
  "max_position_embeddings": 128000,
  "max_window_layers": 70,
  "model_type": "qwen2_5_vl",
  "num_attention_heads": 16,
  "num_hidden_layers": 36,
  "num_key_value_heads": 2,
  "rms_norm_eps": 1e-06,
  "rope_scaling": {
    "mrope_section": [
      16,
      24,
      24
    ],
    "rope_type": "default",
    "type": "default"
  },
  "rope_theta": 1000000.0,
  "sliding_window": 32768,
  "tie_word_embeddings": true,
  "torch_dtype": "bfloat16",
  "transformers_version": "4.49.0.dev0",
  "use_cache": true,
  "use_sliding_window": false,
  "video_token_id": 151656,
  "vision_config": {
    "hidden_size": 1280,
    "in_chans": 3,
    "model_type": "qwen2_5_vl",
    "out_hidden_size"

In [None]:
"learning cache engine functions and cache engine. No need to initialize"
from typing import *
import rich
import torch
from lmdeploy.pytorch.backends.selector import get_backend
from lmdeploy.pytorch.config import BackendConfig, CacheConfig, ModelConfig

def __adjust_block_size(model_config:ModelConfig, cache_config: CacheConfig):
    """Cap ``cache_config.block_size`` for models with a very large key head.

    Mirrors the block-size adjustment done in
    ``pytorch.engine.model_agent._update_cache_config``: when
    ``model_config.k_head_dim`` is at least 512 and the configured block size
    is larger than 32, the block size is lowered to 32 in place. In every
    other case ``cache_config`` is left untouched.

    Args:
        model_config: only ``k_head_dim`` is read.
        cache_config: ``block_size`` is read and possibly overwritten.
    """
    # TODO: support kernel with both large head dim and large block size.
    head_dim_threshold = 512
    block_size_cap = 32
    if model_config.k_head_dim >= head_dim_threshold:
        if cache_config.block_size > block_size_cap:
            cache_config.block_size = block_size_cap

def _get_key_value_block_shape_impl(
        model_config: ModelConfig,
        block_size: int,
        head_size: int,
        world_size: int = 1,
        quant_policy: Literal[0, 4, 8] = 0,
        local: bool = True
) -> tuple:
    """Return the backend-specific shape of a single key (or value) cache block.

    A block stores the history tokens produced by the model, so ``block_size``
    can be read as the maximum number of tokens kept in one block.

    Args:
        model_config: supplies ``dtype`` and ``num_key_value_heads``.
        block_size: forwarded unchanged to the backend.
        head_size: per-head dimension; halved when ``quant_policy == 4``.
        world_size: divisor applied to the number of kv heads when ``local``
            is true.
        quant_policy: only the value 4 changes the result (see above).
        local: when true, compute the shape of the per-rank shard.

    Returns:
        tuple: whatever ``get_backend().get_k_block_shape(...)`` returns.
        Per the original author's notes the layout is device specific, e.g.
        ``(block_size, num_heads, head_size)`` on cuda and
        ``(block_size, num_heads*head_size)`` on ascend.

    Raises:
        AssertionError: if the kv heads are not divisible by ``world_size``
            (``local`` only) or ``head_size`` is odd with ``quant_policy == 4``.
    """
    backend = get_backend()
    cache_dtype = model_config.dtype
    kv_heads = model_config.num_key_value_heads

    if local:
        # The kv heads must split evenly across ``world_size``. The term
        # usually means the number of processes over all nodes, but in
        # lmdeploy it is the `tp` you set: tensor parallelism spreads the
        # grouped kv heads over the devices of one node.
        assert kv_heads % world_size == 0, \
            f'num_heads: {kv_heads}, world_size: {world_size}'
        kv_heads //= world_size

    if quant_policy == 4:  # pack head_dim to uint8
        assert head_size % 2 == 0, \
            f'head_size: {head_size}, quant_policy: {quant_policy}'
        head_size //= 2

    # The key-shape query is used for values as well
    # (get_k_block_shape == get_v_block_shape mostly).
    return backend.get_k_block_shape(block_size, kv_heads, head_size, cache_dtype)

def get_cache_block_size(block_size: int,
                         model_config: ModelConfig,
                         world_size: int = 1,
                         quant_policy: int = 0) -> int:
    """Get the memory (in bytes) one kv-cache block needs across all layers.

    Without quantization this is::

        num_layers * (key_block_elems + value_block_elems) * dtype_size

    where the per-block element counts come from
    ``_get_key_value_block_shape_impl``. With ``quant_policy`` 4 or 8 the
    payload is counted at one byte per element and two side tensors of shape
    ``(*block_shape[:-1], 2)`` in the model dtype are added per layer, which
    is what ``CacheEngine._allocate_cache`` allocates for quantized caches.

    Args:
        block_size (int): size of single cache block, used to store token's
            k,v vector.
        model_config (ModelConfig): The config of the model. ``k_head_dim`` /
            ``v_head_dim`` fall back to ``head_dim`` when they are ``None``.
        world_size (int): forwarded to the block-shape helper (kv heads are
            divided by it).
        quant_policy (int): 0 keeps the model dtype; 4 and 8 select a
            quantized cache.

    Returns:
        int: bytes required by one block of keys plus one block of values,
        summed over ``model_config.num_layers``.

    Raises:
        ValueError: if ``quant_policy`` is not one of 0, 4, 8.
    """
    def _nbytes(shape, dtype) -> int:
        # 'meta' tensors carry shape/dtype only, so no memory is allocated
        # just to measure the size.
        probe = torch.empty(shape, dtype=dtype, device='meta')
        return probe.numel() * probe.element_size()

    num_layers = model_config.num_layers
    key_head_size = model_config.k_head_dim
    value_head_size = model_config.v_head_dim
    if key_head_size is None:
        key_head_size = model_config.head_dim
    if value_head_size is None:
        value_head_size = model_config.head_dim
    # (block_size, num_heads, head_size) in cuda; or
    # (block_size, num_heads*head_size) in ascend
    key_shape = _get_key_value_block_shape_impl(
        model_config,
        block_size=block_size,
        head_size=key_head_size,
        world_size=world_size,
        local=True,
        quant_policy=quant_policy,
    )
    value_shape = _get_key_value_block_shape_impl(
        model_config,
        block_size=block_size,
        head_size=value_head_size,
        world_size=world_size,
        quant_policy=quant_policy,
        local=True,
    )

    dtype = model_config.dtype
    if quant_policy == 0:
        mem_key_block = _nbytes(key_shape, dtype)
        mem_value_block = _nbytes(value_shape, dtype)
    elif quant_policy in (4, 8):
        # Previously this branch was a bare `...`, which left both sizes
        # unbound and crashed below. Quantized payload is stored in a 1-byte
        # dtype; the extra (*shape[:-1], 2) tensors stay in the model dtype.
        mem_key_block = (_nbytes(key_shape, torch.uint8)
                         + _nbytes((*key_shape[:-1], 2), dtype))
        mem_value_block = (_nbytes(value_shape, torch.uint8)
                           + _nbytes((*value_shape[:-1], 2), dtype))
    else:
        raise ValueError(f'unsupported quant_policy {quant_policy}')

    total = num_layers * (mem_key_block + mem_value_block)
    return total

def __get_free_gpu_mem_size(model_config:ModelConfig,
                            cache_config:CacheConfig,
                            cache_block_size:int,
                            gpu_id:int)->float:
    """Estimate the GPU memory budget left for kv-cache blocks.

    The free memory reported by ``torch.cuda.mem_get_info(gpu_id)`` is reduced
    by an estimate of the peak prefill runtime memory, and the remainder is
    scaled by ``cache_config.cache_max_entry_count``.

    Side effects: calls ``torch.cuda.empty_cache()``, prints the estimate and
    may lower ``cache_config.max_prefill_token_num``.

    Args:
        model_config: only ``vocab_size`` is read.
        cache_config: ``cache_max_entry_count`` and ``max_prefill_token_num``
            are read; the latter may be overwritten.
        cache_block_size (int): bytes needed by one kv-cache block (all
            layers, keys and values).
        gpu_id (int): device index passed to ``torch.cuda.mem_get_info``.

    Returns:
        float: ``(free_bytes - runtime_bytes) * cache_max_entry_count``. This
        is an int only if ``cache_max_entry_count`` itself is an int.

    Raises:
        RuntimeError: if the prefill token budget had to be reduced to 0.
    """
    def __get_runtime_size(num_free_gpu_mem: int,
                           cache_block_size: int,
                           vocab_size: int):
        """Find a prefill token budget whose runtime estimate fits in memory.

        Args:
            num_free_gpu_mem(int): size of gpu free memory in bytes.
            cache_block_size(int): bytes of one kv-cache block.
            vocab_size(int): vocabulary size of the model.

        Returns:
            out(tuple[int]): `runtime_cache_size`: estimated runtime bytes of
            the last budget that was tried;
            `max_prefill_token_num`: the (possibly reduced) prefill budget,
            0 when no budget fits.
        """
        # Author's estimate of bytes per logit during prefill:
        # lm_head output(2) + to float(4) + estimated misc(1) = 7.
        # hidden_states of shape [token_num, hidden_dim] (float16 | bfloat16)
        # are projected by lm_head to [token_num, vocab_size] and cast to
        # float32, i.e. 16 bits + 32 bits = 6 bytes, plus 1 byte of slack.
        bytes_per_logit = 7
        # Require room for at least this many cache blocks before accepting
        # a prefill budget (fixed threshold taken from the original code).
        min_cache_blocks = 16

        cache_max_entry_count = cache_config.cache_max_entry_count
        # number of tokens accepted to be prefilled at once
        max_prefill_token_num = cache_config.max_prefill_token_num
        runtime_cache_size = 0
        while max_prefill_token_num > 0:
            # maximum runtime size for the current budget
            runtime_cache_size = int(max_prefill_token_num * vocab_size * bytes_per_logit)
            # (free gpu memory - runtime memory) * cache_max_entry_count
            num_available = (num_free_gpu_mem - runtime_cache_size) * cache_max_entry_count
            if int(num_available) // cache_block_size >= min_cache_blocks:
                break
            # Not enough room: halve the prefill budget and try again.
            max_prefill_token_num = max_prefill_token_num // 2
        return runtime_cache_size, max_prefill_token_num

    torch.cuda.empty_cache()
    gpu_mem_physical_free, _ = torch.cuda.mem_get_info(gpu_id)
    vocab_size = model_config.vocab_size

    runtime_cache_size, max_prefill_token_num = __get_runtime_size(gpu_mem_physical_free, cache_block_size,
                                                                    vocab_size)
    if cache_config.max_prefill_token_num != max_prefill_token_num:
        if max_prefill_token_num <= 0:
            raise RuntimeError('No enough gpu memory for runtime.')
        cache_config.max_prefill_token_num = max_prefill_token_num
        print(f'device<{gpu_id}> No enough memory. '
                        'update max_prefill_token_num='
                        f'{max_prefill_token_num}')
    gpu_mem_physical_free -= runtime_cache_size
    print('estimated max runtime memory:'
                    f' {runtime_cache_size>>20} mb')
    return gpu_mem_physical_free * cache_config.cache_max_entry_count

def _update_cache_config(model_config: ModelConfig,
                         cache_config: CacheConfig,
                         gpu_id:int = 0,
                         host_mem_size:int = 1*(1<<30),
                         world_size:int=1):
    """Fill in the memory-dependent fields of ``cache_config`` in place.

    Fields that may be adjusted:

    - ``block_size``: capped by ``__adjust_block_size``.
    - ``max_prefill_token_num``: possibly lowered by
      ``__get_free_gpu_mem_size``.
    - ``num_gpu_blocks``: number of gpu blocks used to store kv cache; only
      computed when it is 0.
    - ``num_cpu_blocks``: number of cpu blocks; only computed when it is 0.
    - ``window_size``: copied from ``model_config.sliding_window``.

    Args:
        model_config: model description used for sizing.
        cache_config: configuration object that is updated.
        gpu_id: device whose free memory is queried.
        host_mem_size: host memory budget in bytes (default 1 GiB).
        world_size: forwarded to ``get_cache_block_size``.

    Raises:
        RuntimeError: if the host or gpu budget is too small for one block.
    """
    __adjust_block_size(model_config, cache_config)

    bytes_per_block = get_cache_block_size(
        cache_config.block_size,
        model_config,
        world_size,
        cache_config.quant_policy,
    )
    gpu_budget = __get_free_gpu_mem_size(model_config, cache_config, bytes_per_block, gpu_id)
    host_budget = host_mem_size

    # A block count of 0 means "derive it from the memory budget".
    if cache_config.num_cpu_blocks == 0:
        cache_config.num_cpu_blocks = int(host_budget / bytes_per_block)
        if cache_config.num_cpu_blocks <= 0:
            raise RuntimeError('No enough host memory for kv cache.')

    if cache_config.num_gpu_blocks == 0:
        cache_config.num_gpu_blocks = int(gpu_budget / bytes_per_block)
        if cache_config.num_gpu_blocks <= 0:
            raise RuntimeError('No enough gpu memory for kv cache.')

    cache_config.window_size = model_config.sliding_window

    print('block num: {}'.format(cache_config.num_gpu_blocks))

class CacheEngine:
    """Host and Device memory maintainer.

    Allocates the key/value cache tensors on GPU and on the host when it is
    constructed, and creates a CUDA stream and event for cache operations.

    Args:
        cache_config (CacheConfig): config of the cache information.
        model_config (ModelConfig): config of the model.
        rank (int): distribution rank, 0 on non-distributed environment.
        world_size (int): distribution world size, 1 on non-distributed
            environment.
    """

    def __init__(
        self,
        cache_config: CacheConfig,
        model_config: ModelConfig,
        rank: int = 0,
        world_size: int = 1,
    ) -> None:
        if rank == 0:
            print(f'build CacheEngine with config:{cache_config}')
        self.rank = rank
        self.world_size = world_size

        self.cache_config = cache_config
        self.model_config = model_config

        self.block_size = cache_config.block_size
        self.num_layers = model_config.num_layers
        # Unquantized caches use the model dtype; quantized caches use a
        # 1-byte integer dtype chosen per device type.
        self.kv_cache_dtype = model_config.dtype
        if cache_config.quant_policy > 0:
            if self.cache_config.device_type in ['cuda']:
                self.kv_cache_dtype = torch.uint8
            elif self.cache_config.device_type in ['ascend', 'npu']:
                self.kv_cache_dtype = torch.int8
            else:
                raise ValueError(f'unsupported device_type {self.cache_config.device_type}')

        # Initialize the cache.
        self.local_gpu_cache = self.allocate_gpu_cache()
        self.local_cpu_cache = self.allocate_cpu_cache()

        # Initialize the stream for caching operations.
        self.cache_stream = torch.cuda.Stream()
        assert self.cache_stream != torch.cuda.current_stream()
        # Initialize the events for stream synchronization.
        self.events = torch.cuda.Event()

    @property
    def cpu_cache(self):
        """cpu cache."""
        return self.local_cpu_cache

    @property
    def gpu_cache(self):
        """gpu cache."""
        return self.local_gpu_cache

    @property
    def num_gpu_blocks(self):
        """num gpu blocks."""
        return self.cache_config.num_gpu_blocks

    @property
    def num_cpu_blocks(self):
        """num cpu blocks."""
        return self.cache_config.num_cpu_blocks

    def _block_shape(self, head_size: int) -> tuple:
        """Shape of one cache block for this engine's block size, world size
        and quant policy."""
        return _get_key_value_block_shape_impl(
            self.model_config,
            self.block_size,
            head_size=head_size,
            world_size=self.world_size,
            quant_policy=self.cache_config.quant_policy,
            local=True,
        )

    def _allocate_cache(self, num_blocks: int, device: torch.device):
        """Create the (uninitialized) cache tensors on ``device``.

        Args:
            num_blocks (int): number of blocks per layer.
            device: target device; the callers in this class pass the strings
                ``'cuda'`` and ``'cpu'``.

        Returns:
            out(tuple[torch.Tensor]): `(key_cache, value_cache)`, each of shape
            `(num_layers, num_blocks, *block_shape)`. Per the author's notes
            `block_shape` is `(block_size, num_heads, head_size)` on cuda and
            `(block_size, num_heads*head_size)` on ascend.
            When `quant_policy` is 4 or 8 two more tensors
            `(key_sz_cache, val_sz_cache)` of shape
            `(num_layers, num_blocks, *block_shape[:-1], 2)` in the model
            dtype are appended (presumably scale and zero point - TODO confirm).
        """
        model_config = self.model_config
        # Same head-size selection as get_cache_block_size, so the tensors
        # allocated here match the per-block byte count used for budgeting.
        key_head_size = model_config.k_head_dim
        if key_head_size is None:
            key_head_size = model_config.head_dim
        value_head_size = model_config.v_head_dim
        if value_head_size is None:
            value_head_size = model_config.head_dim

        # world_size and quant_policy are forwarded by _block_shape; before,
        # they silently fell back to their defaults (1 and 0) here.
        key_block_shape = self._block_shape(key_head_size)
        value_block_shape = self._block_shape(value_head_size)

        num_layers = self.num_layers
        kv_cache_dtype = self.kv_cache_dtype

        key_cache = torch.empty(
            size=(num_layers, num_blocks, *key_block_shape),
            dtype=kv_cache_dtype,
            device=device,
        )
        value_cache = torch.empty(
            size=(num_layers, num_blocks, *value_block_shape),
            dtype=kv_cache_dtype,
            device=device,
        )

        output = (key_cache, value_cache)

        if self.cache_config.quant_policy in (4, 8):
            dtype = self.model_config.dtype
            key_sz_cache = torch.empty(
                size=(num_layers, num_blocks, *key_block_shape[:-1], 2),
                dtype=dtype,
                device=device,
            )
            val_sz_cache = torch.empty(
                size=(num_layers, num_blocks, *value_block_shape[:-1], 2),
                dtype=dtype,
                device=device,
            )
            output = output + (key_sz_cache, val_sz_cache)

        return output

    def allocate_gpu_cache(self):
        """allocate caches on GPU."""
        caches = self._allocate_cache(self.num_gpu_blocks, 'cuda')
        self.full_gpu_cache = caches
        # zip(*caches) walks the leading (num_layers) axis of every tensor, so
        # local_gpu_cache is a list with one entry per layer; each entry is a
        # tuple (key_cache, value_cache, ...) whose tensors start at the
        # num_blocks axis, e.g. [num_blocks, block_size, num_heads, head_size].
        self.local_gpu_cache = list(zip(*caches))
        return self.local_gpu_cache

    def allocate_cpu_cache(self):
        """allocate caches on Host."""
        caches = self._allocate_cache(self.num_cpu_blocks, 'cpu')

        self.full_cpu_cache = caches
        # Same per-layer regrouping as for the GPU cache.
        self.local_cpu_cache = list(zip(*caches))
        return self.local_cpu_cache



In [None]:
"lmdpeloy.[CacheConfig, ModelConfig]"
from lmdeploy import PytorchEngineConfig
from lmdeploy.pytorch.config import BackendConfig, CacheConfig, ModelConfig
from lmdeploy.pytorch.configurations import AutoModelConfigBuilder

# Engine-level defaults; the cache settings below are copied from here.
engine_config = PytorchEngineConfig()

backend_config = BackendConfig(eager_mode=False, device_type='cuda')

model_config = AutoModelConfigBuilder.build(
    hf_config=config,
    model_path=model_path,
    tp=1,
)
# The cache helpers defined earlier compare `k_head_dim` numerically, so both
# head dims are set explicitly. 128 matches hidden_size / num_attention_heads
# (2048 / 16) of the config printed in the first cell.
model_config.k_head_dim = 128
model_config.v_head_dim = 128
# rich.print("[ModelConfig]\n",model_config)

# CacheConfig fields that carry the same name on PytorchEngineConfig.
_mirrored_fields = (
    "block_size",
    "num_cpu_blocks",
    "num_gpu_blocks",
    "cache_max_entry_count",
    "max_prefill_token_num",
    "enable_prefix_caching",
    "quant_policy",
    "device_type",
)
cache_config = CacheConfig(
    max_batches=engine_config.max_batch_size,
    **{field: getattr(engine_config, field) for field in _mirrored_fields},
)

### learning kvcache blocks ###

# _update_cache_config(model_config,cache_config)
# cache_engine = CacheEngine(cache_config,model_config)

### learning kvcache blocks ###

- python 中`asyncio.Event()`作用：  
用于同步协程各任务的对象。先设置`event = asyncio.Event()`  
初始`event.is_set()`为`False`。  
各含有`await event.wait()`的协程任务会先在这步暂停，等待`event.is_set()=True`，在通过`event.set()`设置为`True`后，  
各协程内的任务就会开始执行。当然你也可以设置多个`Event`用于管理多个协程任务队列。
- 例子：

```python
import asyncio
import functools

def set_event(event):
  """Plain (non-async) callback: set ``event`` so every waiter is released."""
  print('setting event in callback')
  event.set()

async def coro1(event):
  """Wait until ``event`` is set, then report."""
  print('coro1 waiting for event')
  await event.wait()
  print('coro1 triggered')

async def coro2(event):
  """Wait until ``event`` is set, then report."""
  print('coro2 waiting for event')
  await event.wait()
  print('coro2 triggered')

async def main(loop):
  """Run both waiters and set the shared event 0.1 s later via ``loop``."""
  # Create a shared event
  event = asyncio.Event()
  print('event start state: {}'.format(event.is_set()))

  loop.call_later(
    0.1, functools.partial(set_event, event)
  )

  # asyncio.wait() needs Task/Future objects: passing bare coroutines was
  # deprecated in Python 3.8 and is rejected since Python 3.11.
  waiters = [loop.create_task(coro1(event)), loop.create_task(coro2(event))]
  await asyncio.wait(waiters)
  print('event end state: {}'.format(event.is_set()))

# Create a dedicated loop: asyncio.get_event_loop() is deprecated when no
# event loop is running.
event_loop = asyncio.new_event_loop()
try:
  event_loop.run_until_complete(main(event_loop))
finally:
  event_loop.close()
```
- Output（`coro1` 与 `coro2` 的打印先后顺序可能因 Python 版本而不同）:
```shell
event start state: False
coro2 waiting for event
coro1 waiting for event
setting event in callback
coro2 triggered
coro1 triggered
event end state: True
```

### lmdeploy总体架构
- `lmdeploy.pytorch.engine.Engine`是大脑，`model_agent`，`scheduler`，`request_manager`会在`Engine`中初始化
    - 其中`Engine._async_loop`就是整个任务开端。使用3个`asyncio.Event()`相互交错地 await 在各协程任务之间，用于同步各协程任务防止出错
        - 协程任务1：`_async_loop_background`，用于模型推理
        - 协程任务2：`_async_loop_preprocess_message`，用于预处理输入的`input_ids`与VLM的图像输入，会受到协程任务1的`Event`的限制
        - 协程任务3：`_async_send_responses`，用于构造输出，也会受到协程任务1的`Event`的限制

- `lmdeploy.pytorch.paging.scheduler.Scheduler`用于调度。
    > 开始都是在`waiting`队列。若是`running`小于`max_batches`，就会从`waiting`中取出请求，匹配kvcache缓存表，并置入`running`队列
    - `schedule`是其中最主要的调度状态的函数

- 模型加载与`CacheEngine`会在`lmdeploy.pytorch.engine.model_agent.*ModelAgent`中初始化
    - 各`_build_model`即加载模型的函数
    - 各`async_forward`即异步推理函数

- `lmdeploy.pytorch.engine.request.RequestManager`用于管理请求的状态
    - 主要函数却在`Engine._bind_request_manager`绑定

In [6]:
from lmdeploy import Tokenizer
from lmdeploy.pytorch.engine import Engine

tokenizer = Tokenizer(model_path)
engine = Engine(model_path, tokenizer, engine_config, trust_remote_code=True)


2025-02-14 08:32:19,378 - lmdeploy - [37mINFO[0m - model_agent.py:228 - build model.
2025-02-14 08:32:19,733 - lmdeploy - [37mINFO[0m - model_agent.py:230 - loading weights.


Loading weights from safetensors: 100%|██████████| 2/2 [00:01<00:00,  1.64it/s]

2025-02-14 08:32:20,964 - lmdeploy - [37mINFO[0m - model_agent.py:232 - loading adapters.
2025-02-14 08:32:20,966 - lmdeploy - [37mINFO[0m - cache_engine.py:36 - build CacheEngine with config:CacheConfig(max_batches=128, block_size=64, num_cpu_blocks=455, num_gpu_blocks=3852, window_size=-1, cache_max_entry_count=0.8, max_prefill_token_num=4096, enable_prefix_caching=False, quant_policy=0, device_type='cuda')



