# Nano-vLLM

### 環境構築

In [None]:
from dataclasses import dataclass
import os
from dataclasses import dataclass
from transformers import AutoConfig
from dataclasses import dataclass
import torch

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
logger.addHandler(logging.FileHandler("debug.log"))

## Qwen3

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

class SiluAndMul(nn.Module):
    """
    SwiGLU（Swish-Gated Linear Unit）活性化関数の実装
    """

    def __init__(self):
        logger.info(f"SwiGLUを初期化")
        super().__init__()

    @torch.compile
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logger.info(f"SwiGLUを順伝播開始 {x.shape=}")

        # 入力を2つのチャンクに分割
        x, y = x.chunk(2, -1)
        logger.debug(f"チャンク化 {x.shape=}, {y.shape=}")

        # SwiGLU活性化関数の適用
        result = F.silu(x) * y

        logger.info(f"SwiGLUを順伝播完了 {result.shape=}")
        return result


In [None]:
import torch
from torch import nn
import triton
import triton.language as tl

from flash_attn import flash_attn_varlen_func, flash_attn_with_kvcache
from nanovllm.utils.context import get_context

@triton.jit
def store_kvcache_kernel(
    key_ptr,
    key_stride,
    value_ptr,
    value_stride,
    k_cache_ptr,
    v_cache_ptr,
    slot_mapping_ptr,
    D: tl.constexpr,
):
    """
    計算したキーとバリューをKVキャッシュに保存するTritonカーネル
    スロットマッピングを参照して適切な物理メモリにキーとバリューを保存する

    Args:
        key_ptr: キーのポインタ
        key_stride: キーのストライド
        value_ptr: バリューのポインタ
        value_stride: バリューのストライド
        k_cache_ptr: キーキャッシュの先頭ポインタ
        v_cache_ptr: バリューキャッシュの先頭ポインタ
        slot_mapping_ptr: スロットマッピングのポインタ
        D: キーとバリューの次元数
    """

    # 1) 担当するインデックスを取得

    # 担当するトークンのインデックスを取得
    idx = tl.program_id(0)

    # 2) 保存先スロットを特定

    # スロットマッピングからトークンに対応する物理スロットを取得
    slot = tl.load(slot_mapping_ptr + idx)

    # 割り当てがない場合は何もしない
    if slot == -1: return

    # 3) 入力データを読み込み

    # キーの読み込みオフセットを作成（D次元分）
    key_offsets = idx * key_stride + tl.arange(0, D)

    # バリューの読み込みオフセットを作成（D次元分）
    value_offsets = idx * value_stride + tl.arange(0, D)

    # 単一のキーを読み込み
    key = tl.load(key_ptr + key_offsets)

    # 単一のバリューを読み込み
    value = tl.load(value_ptr + value_offsets)

    # 4) KVキャッシュに書き込み

    # KVキャッシュの書き込みオフセットを作成（D次元分）
    cache_offsets = slot * D + tl.arange(0, D)

    # 単一のキーをKVキャッシュに書き込み
    tl.store(k_cache_ptr + cache_offsets, key)

    # 単一のバリューをKVキャッシュに書き込み
    tl.store(v_cache_ptr + cache_offsets, value)


def store_kvcache(key: torch.Tensor, value: torch.Tensor, k_cache: torch.Tensor, v_cache: torch.Tensor, slot_mapping: torch.Tensor):
    """
    計算したキーとバリューをKVキャッシュに保存する
    store_kvcache_kernelのラッパー

    Args:
        key (torch.Tensor): 計算したキーのテンソル
        value (torch.Tensor): 計算したバリューのテンソル
        k_cache (torch.Tensor): キーキャッシュのテンソル
        v_cache (torch.Tensor): バリューキャッシュのテンソル
        slot_mapping (torch.Tensor): スロットマッピングのテンソル
    """
    logger.info(f"KVキャッシュにキーとバリューを保存開始 {key.shape=}, {value.shape=}, {k_cache.shape=}, {v_cache.shape=}, {slot_mapping.shape=}")

    # 1) 入力を検証

    # トークン数N、ヘッド数num_heads、ヘッド次元head_dim
    N, num_heads, head_dim = key.shape

    # キーとバリューの次元数D
    D = num_heads * head_dim

    assert key.stride(-1) == 1 and value.stride(-1) == 1
    assert key.stride(1) == head_dim and value.stride(1) == head_dim
    assert k_cache.stride(1) == D and v_cache.stride(1) == D
    assert slot_mapping.numel() == N

    # 2) Tritonカーネルを起動
    store_kvcache_kernel[(N,)](
        key,
        key.stride(0),
        value,
        value.stride(0),
        k_cache, v_cache,
        slot_mapping,
        D)

    logger.info(f"KVキャッシュにキーとバリューを保存完了")


class Attention(nn.Module):
    """
    FlashAttentionのラッパー
    """

    def __init__(
        self,
        num_heads,
        head_dim,
        scale,
        num_kv_heads,
    ):
        logger.info(f"Attention層を初期化 {num_heads=}, {head_dim=}, {scale=}, {num_kv_heads=}")
        super().__init__()

        # ヘッド数
        self.num_heads = num_heads

        # ヘッドの次元数
        self.head_dim = head_dim

        # QKのスケーリング係数
        self.scale = scale

        # KVヘッド数（GQA用）
        self.num_kv_heads = num_kv_heads

        # KVキャッシュ領域への参照を初期化
        self.k_cache = self.v_cache = torch.tensor([])

        logger.info(f"Attention層を初期化完了")

    def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):
        """
        FlashAttentionの順伝播

        Args:
            q (torch.Tensor): 計算したばかりのクエリ
            k (torch.Tensor): 計算したばかりのキー
            v (torch.Tensor): 計算したばかりのバリュー
        Returns:
            torch.Tensor: アテンション
        """
        logger.info(f"Attention層の順伝播開始 {q.shape=}, {k.shape=}, {v.shape=}")

        # 1) コンテキストを取得

        context = get_context()

        logger.debug(f"コンテキスト情報取得 {context.is_prefill=}, {context.slot_mapping=}, {context.max_seqlen_q=}, {context.cu_seqlens_q=}, {context.max_seqlen_k=}, {context.cu_seqlens_k=}, {context.context_lens=}, {context.block_tables=}")

        # 2) KVキャッシュへの保存

        # KVキャッシュへの参照を取得
        k_cache, v_cache = self.k_cache, self.v_cache

        # KVキャッシュが存在する場合
        if k_cache.numel() and v_cache.numel():

            # KVキャッシュにキーとバリューを保存
            store_kvcache(k, v, k_cache, v_cache, context.slot_mapping)

        # 3) FlashAttentionの実行

        # Prefillの場合
        if context.is_prefill:

            # PrefixCachingの場合
            if context.block_tables is not None:

                # キャッシュからキーとバリューを取得
                k, v = k_cache, v_cache

            logger.debug(f"PrefillモードでFlashAttentionを実行 {context.max_seqlen_q=}, {context.cu_seqlens_q=}, {context.max_seqlen_k=}, {context.cu_seqlens_k=}")

            # 可変長に対応したFlashAttentionを実行
            o = flash_attn_varlen_func(
                q,
                k,
                v,
                max_seqlen_q=context.max_seqlen_q,
                cu_seqlens_q=context.cu_seqlens_q,
                max_seqlen_k=context.max_seqlen_k,
                cu_seqlens_k=context.cu_seqlens_k,
                softmax_scale=self.scale,
                causal=True,
                block_table=context.block_tables)
        
        # Decodeの場合
        else:

            logger.debug(f"DecodeモードでFlashAttentionを実行 {context.context_lens=}, {context.block_tables=}")

            # KVキャッシュを用いたFlashAttentionを実行
            o = flash_attn_with_kvcache(
                q.unsqueeze(1),
                k_cache,
                v_cache,
                cache_seqlens=context.context_lens,
                block_table=context.block_tables, 
                softmax_scale=self.scale,
                causal=True)

        logger.info(f"Attention層の順伝播完了 {o.shape=}")

        return o


In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist

class VocabParallelEmbedding(nn.Module):
    """
    入力の埋め込み層を並列化するクラス（Tensor Parallelism）
    語彙を複数のGPUに分割しメモリ使用量を削減し、並列計算を可能にする
    """

    def __init__(
        self,
        num_embeddings: int,
        embedding_dim: int,
    ):
        logger.info(f"入力の埋め込み層を初期化開始 {num_embeddings=}, {embedding_dim=}")

        super().__init__()

        # 1) 分散環境の情報を取得

        # 担当するGPU
        self.tp_rank = dist.get_rank()

        # 全GPU数
        self.tp_size = dist.get_world_size()

        logger.debug(f"分散環境情報取得 {self.tp_rank=}, {self.tp_size=}")

        # 2) 担当する語彙の範囲を計算

        assert num_embeddings % self.tp_size == 0

        # 全語彙数
        self.num_embeddings = num_embeddings

        # 担当する語彙数
        self.num_embeddings_per_partition = \
            self.num_embeddings // self.tp_size

        # 担当する語彙の開始インデックス
        self.vocab_start_idx = \
            self.num_embeddings_per_partition * self.tp_rank

        # 担当する語彙の終了インデックス
        self.vocab_end_idx = \
            self.vocab_start_idx + self.num_embeddings_per_partition

        logger.debug(f"担当する語彙の範囲計算 {self.vocab_start_idx=}, {self.vocab_end_idx=}")

        # 3) 担当する語彙の重み行列を初期化

        self.weight = nn.Parameter(
            torch.empty(self.num_embeddings_per_partition, embedding_dim))

        self.weight.weight_loader = self.weight_loader

        logger.debug(f"担当する語彙の重み行列初期化 {self.weight.shape=}")

        logger.info(f"入力の埋め込み層を初期化完了")


    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor):
        """
        モデルのチェックポイントから埋め込み層の重みをスライスしてロードする

        Args:
            param (nn.Parameter): 埋め込み層のパラメータ
            loaded_weight (torch.Tensor): ロードする重み
        """
        logger.info(f"埋め込み層の重みをロード開始 {param.shape=}, {loaded_weight.shape=}")

        # 重みデータへの参照を取得
        param_data = param.data

        # 担当する重みのサイズ
        shard_size = param_data.size(0)

        # 担当する重みの開始インデックスを計算
        start_idx = self.tp_rank * shard_size

        # 担当する重みをスライスしてコピー
        loaded_weight = loaded_weight.narrow(0, start_idx, shard_size)

        logger.info(f"埋め込み層の重みをロード完了 {loaded_weight.shape=}")

        
    def forward(self, x: torch.Tensor):
        """
        入力シーケンスを埋め込みベクトルに変換する

        Args:
            x (torch.Tensor): 入力シーケンスのインデックス
        Returns:
            torch.Tensor: 埋め込みベクトル
        """
        logger.info(f"埋め込み層の順伝播開始 {x.shape=}")

        # 1) 担当する語彙の範囲に基づいて入力をマスク

        # GPUが複数ある場合
        if self.tp_size > 1:

            # 担当する語彙のマスクを作成
            mask = (x >= self.vocab_start_idx) & (x < self.vocab_end_idx)

            # ローカルIDに変換し、担当する語彙以外は0に置き換え
            x = mask * (x - self.vocab_start_idx)

        # 2) 埋め込みを計算

        y = F.embedding(x, self.weight)

        # 3) 結果を集約

        # GPUが複数ある場合
        if self.tp_size > 1:

            # 担当する語彙以外の埋め込みを0に置き換え
            y = mask.unsqueeze(1) * y

            # 全GPUの埋め込みを集約
            dist.all_reduce(y)

        logger.info(f"埋め込み層の順伝播完了 {y.shape=}")
        return y


class ParallelLMHead(VocabParallelEmbedding):
    """
    隠れ層の状態を語彙数分のロジットに変換する言語モデルヘッド
    VocabParallelEmbeddingを継承し、Tensor Parallelismを利用
    """

    def __init__(
        self,
        num_embeddings: int,
        embedding_dim: int,
        bias: bool = False,
    ):
        logger.info(f"出力の言語モデルヘッドを初期化開始 {num_embeddings=}, {embedding_dim=}, {bias=}")

        assert not bias

        super().__init__(num_embeddings, embedding_dim)

        logger.info(f"出力の言語モデルヘッドを初期化完了")


    def forward(self, x: torch.Tensor):
        """
        隠れ層の状態を語彙数分のロジットに変換する

        Args:
            x (torch.Tensor): 隠れ層の状態
        """

        logger.info(f"出力の言語モデルヘッドの順伝播開始 {x.shape=}")

        # コンテキストを取得
        context = get_context()

        # Prefillの場合
        if context.is_prefill:
            # 最後のトークンのインデックスを取得
            last_indices = context.cu_seqlens_q[1:] - 1

            # 入力から最後のトークンを抽出
            x = x[last_indices].contiguous()

            logger.debug(f"最後のトークンを抽出（Prefill） {x.shape=}")


        # 線形射影を適用
        logits = F.linear(x, self.weight)

        # 複数GPUの場合
        if self.tp_size > 1:

            # メインプロセスの場合、出力を初期化
            all_logits = [
                torch.empty_like(logits) for _ in range(self.tp_size)
            ] if self.tp_rank == 0 else None

            # 全GPUのロジットを集約
            dist.gather(logits, all_logits, 0)

            # メインプロセスでロジットを連結
            logits = torch.cat(all_logits, -1) if self.tp_rank == 0 else None

        logger.info(f"出力の言語モデルヘッドの順伝播完了 {logits.shape if logits is not None else None=}")
        return logits


In [None]:
import torch
from torch import nn

class RMSNorm(nn.Module):
    """
    Root Mean Square Layer Normalizationの実装
    """

    def __init__(
        self,
        hidden_size: int,
        eps: float = 1e-6,
    ) -> None:
        logger.info(f"RMSNormを初期化 {hidden_size=}, {eps=}")
        super().__init__()
        self.eps = eps
        # 1で初期化
        self.weight = nn.Parameter(torch.ones(hidden_size))
        logger.info(f"RMSNormの初期化完了")

    @torch.compile
    def rms_forward(
        self,
        x: torch.Tensor,
    ) -> torch.Tensor:
        """
        RMSNormの正規化

        Args:
            x (torch.Tensor): 入力テンソル
        Returns:
            torch.Tensor: 正規化された出力テンソル
        """
        logger.info(f"正規化開始 {x.shape=}")

        orig_dtype = x.dtype

        # アップキャスト
        x = x.float()

        # 分散を計算
        var = x.pow(2).mean(dim=-1, keepdim=True)

        # 分散の逆平方根を乗じて正規化
        x.mul_(torch.rsqrt(var + self.eps))

        # 元のデータ型に戻し、ゲインを乗じる
        x = x.to(orig_dtype).mul_(self.weight)

        logger.info(f"正規化完了 {x.shape=}")
        return x

    @torch.compile
    def add_rms_forward(
        self,
        x: torch.Tensor,
        residual: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        残差接続を伴うRMSNormの正規化
        """
        logger.info(f"残差接続を適用")

        orig_dtype = x.dtype

        # アップキャストして残差接続を適用
        x = x.float().add_(residual.float())

        # ダウンキャスト
        residual = x.to(orig_dtype)

        # 分散を計算
        var = x.pow(2).mean(dim=-1, keepdim=True)

        # 分散の逆平方根を乗じて正規化
        x.mul_(torch.rsqrt(var + self.eps))

        # 元のデータ型に戻し、ゲインを乗じる
        x = x.to(orig_dtype).mul_(self.weight)

        logger.info(f"残差接続を適用完了")
        return x, residual

    def forward(
        self,
        x: torch.Tensor,
        residual: torch.Tensor | None = None,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        """
        RMSNormの順伝播
        """
        logger.info(f"RMSNormの順伝播開始 {x.shape=}, {residual.shape if residual is not None else None=}")

        # 残差接続がない場合
        if residual is None:
            result = self.rms_forward(x)

        # 残差接続がある場合
        else:
            result = self.add_rms_forward(x, residual)

        return result

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist

def divide(numerator, denominator):
    assert numerator % denominator == 0
    return numerator // denominator


class LinearBase(nn.Module):
    """
    テンソル並列化（TP, Tensor Parallelism）に対応した線形層の基底クラス
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = False,
        tp_dim: int | None = None,
    ):
        """
        Args:
            input_size (int): 入力の次元数
            output_size (int): 出力の次元数
            bias (bool): バイアス項の有無
            tp_dim (int | None): 並列化する次元
        """
        logger.info(f"線形層の基底クラスを初期化開始 {input_size=}, {output_size=}, {bias=}, {tp_dim=}")

        super().__init__()

        # 1) テンソル並列化の設定

        self.tp_dim = tp_dim
        self.tp_rank = dist.get_rank()
        self.tp_size = dist.get_world_size()

        # 2) 重みパラメータの初期化

        self.weight = nn.Parameter(torch.empty(output_size, input_size))

        # 3) 重みの読み込み関数を設定

        self.weight.weight_loader = self.weight_loader

        # バイアス項がある場合
        if bias:

            # バイアスパラメータの初期化
            self.bias = nn.Parameter(torch.empty(output_size))

            # バイアスの読み込み関数を設定
            self.bias.weight_loader = self.weight_loader
        
        # バイアス項がない場合
        else:

            # バイアスパラメータをNoneに設定
            self.register_parameter("bias", None)

        logger.info(f"線形層の基底クラスを初期化完了")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        raise NotImplementedError


class ReplicatedLinear(LinearBase):
    """
    並列化しない線形層で、全てのGPUで同じ重みを持つ
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = False,
    ):
        logger.info(f"並列化なしの線形層を初期化開始 {input_size=}, {output_size=}, {bias=}")

        super().__init__(input_size, output_size, bias)

        logger.info(f"並列化なしの線形層を初期化完了")

    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor):
        """
        モデルのチェックポイントから重みをロードする
        すべてのGPUで同じ重みを使用する

        Args:
            param (nn.Parameter): ロード先のパラメータ
            loaded_weight (torch.Tensor): チェックポイントからロードした重みテンソル
        """
        logger.info(f"並列化なしの線形層の重みをロード開始 {param.shape=}, {loaded_weight.shape=}")

        param.data.copy_(loaded_weight)

        logger.info(f"並列化なしの線形層の重みをロード完了")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        並列化なしの線形層の順伝播

        Args:
            x (torch.Tensor): 入力テンソル
        Returns:
            torch.Tensor: 出力テンソル
        """
        logger.info(f"並列化なしの線形層の順伝播開始 {x.shape=}")

        result = F.linear(x, self.weight, self.bias)

        logger.info(f"並列化なしの線形層の順伝播完了 {result.shape=}")
        return result


class ColumnParallelLinear(LinearBase):
    """
    重みの列方向（出力次元）を分割してテンソル並列化する線形層
    出力はGPUごとに異なり、全体の一部しか持たない
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = False,
    ):
        logger.info(f"出力次元を並列化する線形層を初期化開始 {input_size=}, {output_size=}, {bias=}")

        tp_size = dist.get_world_size()

        # output_sizeをGPU数で分割
        super().__init__(input_size, divide(output_size, tp_size), bias, 0)

        logger.info(f"出力次元を並列化する線形層を初期化完了 {self.tp_size=}")

    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor):
        """
        モデルのチェックポイントから出力次元を並列化する線形層の重みをロードする

        Args:
            param (nn.Parameter): ロード先のパラメータ
            loaded_weight (torch.Tensor): チェックポイントからロードした重みテンソル
        """
        logger.info(f"出力次元を並列化する線形層の重みを読み込み開始 {param.shape=}, {loaded_weight.shape=}")

        param_data = param.data

        # 分割方向（dim=0）のサイズを取得
        shard_size = param_data.size(self.tp_dim)

        # 自分の担当する分割の開始インデックスを計算
        start_idx = self.tp_rank * shard_size

        # チェックポイントからロードした重みをスライスしてコピー
        loaded_weight = loaded_weight.narrow(
            self.tp_dim, start_idx, shard_size)

        param_data.copy_(loaded_weight)

        logger.info(f"出力次元を並列化する線形層の重みを読み込み完了")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logger.info(f"出力次元を並列化する線形層の順伝播開始 {x.shape=}")

        result = F.linear(x, self.weight, self.bias)

        logger.info(f"出力次元を並列化する線形層の順伝播完了 {result.shape=}")
        return result


class MergedColumnParallelLinear(ColumnParallelLinear):
    """
    複数の線形層を一つにまとめて出力次元をテンソル並列化する線形層
    """

    def __init__(
        self,
        input_size: int,
        output_sizes: list[int], # 各線形層の出力次元のリスト
        bias: bool = False,
    ):
        logger.info(f"MergedColumnParallelLinearを初期化開始 {input_size=}, {output_sizes=}, {bias=}")

        self.output_sizes = output_sizes
        super().__init__(input_size, sum(output_sizes), bias)

        logger.info(f"MergedColumnParallelLinearを初期化完了")

    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor, loaded_shard_id: int):
        """
        モデルのチェックポイントからMergedColumnParallelLinearの重みをロードする
        Args:
            param (nn.Parameter): ロード先のパラメータ
            loaded_weight (torch.Tensor): チェックポイントからロードした重みテンソル
            loaded_shard_id (int): ロードする線形層のID
        """
        logger.info(f"MergedColumnParallelLinearの重みを読み込み開始 {param.shape=}, {loaded_weight.shape=}, {loaded_shard_id=}")

        param_data = param.data

        # 書き込み先のオフセットを計算
        shard_offset = sum(self.output_sizes[:loaded_shard_id]) // self.tp_size

        # 書き込むサイズを計算
        shard_size = self.output_sizes[loaded_shard_id] // self.tp_size

        # パラメータ内の書き込み位置を特定
        param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size)

        # ロードした重みを分割
        loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank]

        # 重みをコピー
        param_data.copy_(loaded_weight)

        logger.info(f"MergedColumnParallelLinearの重みを読み込み完了")


class QKVParallelLinear(ColumnParallelLinear):
    """
    QKVを一つにまとめて出力次元をテンソル並列化する線形層
    """

    def __init__(
        self,
        hidden_size: int,
        head_size: int,
        total_num_heads: int,
        total_num_kv_heads: int | None = None,
        bias: bool = False,
    ):
        logger.info(f"QKVParallelLinearを初期化開始 {hidden_size=}, {head_size=}, {total_num_heads=}, {total_num_kv_heads=}, {bias=}")

        tp_size = dist.get_world_size()

        total_num_kv_heads = total_num_kv_heads or total_num_heads

        self.head_size = head_size

        # 各GPUが担当するヘッド数を計算
        self.num_heads = divide(total_num_heads, tp_size)

        # KVヘッド数を計算
        self.num_kv_heads = divide(total_num_kv_heads, tp_size)

        # 全体の出力サイズ = (Qヘッド数 + Kヘッド数 + Vヘッド数) * ヘッド次元
        output_size = (total_num_heads + 2 * total_num_kv_heads) * self.head_size

        super().__init__(hidden_size, output_size, bias)

        logger.info(f"QKVParallelLinearを初期化完了 {self.tp_size=}, {self.num_heads=}, {self.num_kv_heads=}")

    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor, loaded_shard_id: str):
        logger.info(f"QKVParallelLinearの重みを読み込み開始 {param.shape=}, {loaded_weight.shape=}, {loaded_shard_id=}")

        param_data = param.data

        assert loaded_shard_id in ["q", "k", "v"]

        # Qの場合
        if loaded_shard_id == "q":
            shard_size = self.num_heads * self.head_size
            shard_offset = 0

        # Kの場合
        elif loaded_shard_id == "k":
            shard_size = self.num_kv_heads * self.head_size
            shard_offset = self.num_heads * self.head_size

        # Vの場合
        else:
            shard_size = self.num_kv_heads * self.head_size
            shard_offset = self.num_heads * self.head_size + self.num_kv_heads * self.head_size

        # パラメータ内の書き込み位置を特定
        param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size)

        # 重みを分割して担当分を取得
        loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank]

        # 重みをコピー
        param_data.copy_(loaded_weight)

        logger.info(f"QKVParallelLinearの重みを読み込み完了")


class RowParallelLinear(LinearBase):
    """
    重みの行方向（入力次元）を分割してテンソル並列化する線形層
    すべてのGPUで同じ出力を持つ
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = False,
    ):
        logger.info(f"RowParallelLinearを初期化開始 {input_size=}, {output_size=}, {bias=}")

        tp_size = dist.get_world_size()

        # input_sizeをGPU数で分割
        # 重み行列(out, in)の1次元目（in側）を分割
        super().__init__(divide(input_size, tp_size), output_size, bias, 1)

        logger.info(f"RowParallelLinearを初期化完了 {self.tp_size=}")

    def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor):
        logger.info(f"RowParallelLinearの重みを読み込み開始 {param.shape=}, {loaded_weight.shape=}")

        param_data = param.data

        # 分割方向（dim=1）のサイズを取得
        shard_size = param_data.size(self.tp_dim)

        start_idx = self.tp_rank * shard_size

        # dim=1（列）をスライスして取り出す
        loaded_weight = loaded_weight.narrow(
            self.tp_dim, start_idx, shard_size)

        param_data.copy_(loaded_weight)

        logger.info(f"RowParallelLinearの重みを読み込み完了")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logger.info(f"RowParallelLinearの順伝播開始 入力形状: {x.shape}")

        # ローカルでの行列積
        # メインプロセスのみバイアスを使用
        y = F.linear(x, self.weight, self.bias if self.tp_rank == 0 else None)

        # All-Reduceで集約
        if self.tp_size > 1:
            dist.all_reduce(y)

        logger.info(f"RowParallelLinearの順伝播完了 出力形状: {y.shape}")
        return y


In [None]:
from functools import lru_cache
import torch
from torch import nn

def apply_rotary_emb(
    x: torch.Tensor,
    cos: torch.Tensor,
    sin: torch.Tensor,
) -> torch.Tensor:
    """
    ベクトルを回転させる

    Args:
        x (torch.Tensor): 回転させるテンソル
        cos (torch.Tensor): コサイン成分
        sin (torch.Tensor): サイン成分
    Returns:
        torch.Tensor: 回転後のテンソル
    """
    logger.info(f"RoPEを適用開始 {x.shape=}, {cos.shape=}, {sin.shape=}")

    # 次元分割
    x1, x2 = torch.chunk(x.float(), 2, dim=-1)
    logger.debug(f"チャンク化 {x1.shape=}, {x2.shape=}")

    # 回転行列を適用
    y1 = x1 * cos - x2 * sin
    y2 = x2 * cos + x1 * sin
    logger.debug(f"回転行列適用 {y1.shape=}, {y2.shape=}")

    # 結合
    result = torch.cat((y1, y2), dim=-1).to(x.dtype)

    logger.info(f"RoPEを適用完了 {result.shape=}")
    return result


class RotaryEmbedding(nn.Module):
    """
    Rotary Positional Embedding (RoPE)の実装
    """

    def __init__(
        self,
        head_size: int,
        rotary_dim: int,
        max_position_embeddings: int,
        base: float,
    ) -> None:
        logger.info(f"RoPEを初期化 {head_size=}, {rotary_dim=}, {max_position_embeddings=}, {base=}")

        super().__init__()

        self.head_size = head_size

        assert rotary_dim == head_size

        # 逆周波数を計算
        # theta_i = 1 / (base ** (2i / d))
        inv_freq = 1.0 / (base**(torch.arange(0, rotary_dim, 2, dtype=torch.float) / rotary_dim))

        # 位置インデックスを生成
        t = torch.arange(max_position_embeddings, dtype=torch.float)

        # 位置と逆周波数の外積を計算
        freqs = torch.einsum("i,j -> ij", t, inv_freq)
        logger.debug(f"周波数計算 {freqs.shape=}")

        # コサインとサインを計算
        cos = freqs.cos()
        sin = freqs.sin()

        # コサインとサインを結合し、バッチ次元を追加
        cache = torch.cat((cos, sin), dim=-1).unsqueeze_(1)
        logger.debug(f"コサイン・サインキャッシュ計算 {cache.shape=}")

        # バッファとして登録（state_dictに含めない）
        self.register_buffer("cos_sin_cache", cache, persistent=False)

        logger.info(f"RoPEの初期化完了")

    @torch.compile
    def forward(
        self,
        positions: torch.Tensor,
        query: torch.Tensor,
        key: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        順伝播を実行

        Args:
            positions (torch.Tensor): 位置インデックスのテンソル
            query (torch.Tensor): クエリのテンソル
            key (torch.Tensor): キーのテンソル
        """

        logger.info(f"RoPEの順伝播開始 {positions.shape=}, {query.shape=}, {key.shape=}")

        # 位置に対応するコサイン・サインを取得
        cos_sin = self.cos_sin_cache[positions]

        # コサインとサインに分割
        cos, sin = cos_sin.chunk(2, dim=-1)

        # クエリとキーにRoPEを適用
        query = apply_rotary_emb(query, cos, sin)
        key = apply_rotary_emb(key, cos, sin)

        logger.info(f"RoPEの順伝播完了 {query.shape=}, {key.shape=}")
        return query, key


@lru_cache(1) # キャッシュを有効にする
def get_rope(
    head_size: int,
    rotary_dim: int,
    max_position: int,
    base: float,
    rope_scaling: dict | None = None,
):
    """
    RoPEインスタンスを取得するユーティリティ関数
    """

    assert rope_scaling is None

    # RoPEインスタンスを生成
    rotary_emb = RotaryEmbedding(head_size, rotary_dim, max_position, base)

    return rotary_emb

In [None]:
import torch
from torch import nn

class Sampler(nn.Module):
    """
    Gumbel-Max Trickを用いたサンプリング
    """

    def __init__(self):
        logger.info(f"Samplerを初期化")
        super().__init__()
        logger.info(f"Samplerの初期化完了")

    @torch.compile
    def forward(self, logits: torch.Tensor, temperatures: torch.Tensor):
        logger.info(f"サンプリング開始 {logits.shape=}, {temperatures.shape=}")

        # ロジットを温度でスケーリング
        logits = logits.float().div_(temperatures.unsqueeze(dim=1))

        # 確率分布に変換
        probs = torch.softmax(logits, dim=-1)

        # Gumbel-Max Trickでサンプリング
        # 指数分布 Exp(1) に従うノイズを生成し、逆数を乗じて最大値を取る
        # 数学的には argmax(log(probs) + Gumbel_noise) と等価
        sample_tokens = probs.div_(
            torch.empty_like(probs).exponential_(1).clamp_min_(1e-10)
            ).argmax(dim=-1)

        logger.info(f"サンプリング完了 {sample_tokens.shape=}")
        return sample_tokens


In [None]:
import torch
from torch import nn
import torch.distributed as dist
from transformers import Qwen3Config

class Qwen3Attention(nn.Module):
    """
    Qwen3のSelf-Attentionを実装したクラス
    QK-NormとRoPEをサポート
    """

    def __init__(
        self,
        hidden_size: int,
        num_heads: int,
        num_kv_heads: int,
        max_position: int = 4096 * 32,
        head_dim: int | None = None,
        rms_norm_eps: float = 1e-06,
        qkv_bias: bool = False,
        rope_theta: float = 10000,
        rope_scaling: tuple | None = None,
    ) -> None:

        logger.info(f"Qwen3Attentionを初期化開始 hidden_size: {hidden_size}, num_heads: {num_heads}, num_kv_heads: {num_kv_heads}, max_position: {max_position}, head_dim: {head_dim}, rms_norm_eps: {rms_norm_eps}, qkv_bias: {qkv_bias}, rope_theta: {rope_theta}, rope_scaling: {rope_scaling}")

        super().__init__()

        tp_size = dist.get_world_size()

        self.total_num_heads = num_heads
        assert self.total_num_heads % tp_size == 0
        self.num_heads = self.total_num_heads // tp_size
        self.total_num_kv_heads = num_kv_heads
        assert self.total_num_kv_heads % tp_size == 0
        self.num_kv_heads = self.total_num_kv_heads // tp_size
        self.head_dim = head_dim or hidden_size // self.total_num_heads
        self.q_size = self.num_heads * self.head_dim
        self.kv_size = self.num_kv_heads * self.head_dim
        self.scaling = self.head_dim ** -0.5
        self.qkv_bias = qkv_bias

        # 入力をクエリ、キー、バリューに変換する線形層
        # QKVは結合され、GPU間で列並列に分割されている
        self.qkv_proj = QKVParallelLinear(
            hidden_size,
            self.head_dim,
            self.total_num_heads,
            self.total_num_kv_heads,
            bias=qkv_bias,
        )

        # アテンションの出力を元の隠れ層サイズに戻す線形層
        # 行並列で、All-Reduce通信を使ってGPU間の出力を集約する
        self.o_proj = RowParallelLinear(
            self.total_num_heads * self.head_dim,
            hidden_size,
            bias=False,
        )

        # RoPEの初期化
        self.rotary_emb = get_rope(
            self.head_dim,
            rotary_dim=self.head_dim,
            max_position=max_position,
            base=rope_theta,
            rope_scaling=rope_scaling,
        )

        # アテンション機構の初期化
        self.attn = Attention(
            self.num_heads,
            self.head_dim,
            self.scaling,
            self.num_kv_heads,
        )

        # QK-Normの初期化
        # クエリとキーの内積の前に正規化を行う
        if not self.qkv_bias:
            self.q_norm = RMSNorm(self.head_dim, eps=rms_norm_eps)
            self.k_norm = RMSNorm(self.head_dim, eps=rms_norm_eps)

        logger.info(f"Qwen3Attentionの初期化完了")

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        logger.info(f"Qwen3Attentionの順伝播 {positions.shape=} {hidden_states.shape=}")

        # 入力からQKVに変換
        qkv = self.qkv_proj(hidden_states)

        # QKVを分割
        q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)

        # QKVの形状を調整
        q = q.view(-1, self.num_heads, self.head_dim)
        k = k.view(-1, self.num_kv_heads, self.head_dim)
        v = v.view(-1, self.num_kv_heads, self.head_dim)

        # バイアス項がない場合、QK-Normを適用 
        if not self.qkv_bias:
            q = self.q_norm(q)
            k = self.k_norm(k)

        # RoPEを適用
        q, k = self.rotary_emb(positions, q, k)

        # アテンションを計算
        o = self.attn(q, k, v)

        # 出力を元の形状に戻し、線形変換を適用
        output = self.o_proj(o.flatten(1, -1))

        logger.info(f"Qwen3Attentionの順伝播完了 {output.shape=}")

        return output


class Qwen3MLP(nn.Module):
    """
    SwiGLUを採用し、分散並列処理（Tensor Parallelism）に対応したQwen3のMLP層
    """

    def __init__(
        self,
        hidden_size: int,
        intermediate_size: int,
        hidden_act: str,
    ) -> None:
        logger.info(f"Qwen3MLPを初期化開始 {hidden_size=} {intermediate_size=} {hidden_act=}")

        super().__init__()

        # Gate層とUp層を単一の行列に結合した線形層
        # 列並列化されているため、GPU間で出力が分割される
        self.gate_up_proj = MergedColumnParallelLinear(
            hidden_size,
            [intermediate_size] * 2, # [Gate, Up]
            bias=False,
        )

        # Down層の線形層
        # 行並列化されており、GPU間で出力が集約される
        self.down_proj = RowParallelLinear(
            intermediate_size,
            hidden_size,
            bias=False,
        )
        
        # SiLU活性化関数
        assert hidden_act == "silu"
        self.act_fn = SiluAndMul()

        logger.info(f"Qwen3MLPの初期化完了")

    def forward(self, x):
        logger.info(f"Qwen3MLPの順伝播 {x.shape=}")

        # GateとUpの線形変換を適用
        gate_up = self.gate_up_proj(x)

        # SiLU活性化関数を適用
        x = self.act_fn(gate_up)

        # Downの線形変換を適用
        x = self.down_proj(x)

        logger.info(f"Qwen3MLPの順伝播完了 {x.shape=}")
        return x


class Qwen3DecoderLayer(nn.Module):
    """
    Transformerのデコーダレイヤーを実装したクラス
    """

    def __init__(
        self,
        config: Qwen3Config,
    ) -> None:
        logger.info(f"Qwen3DecoderLayerを初期化開始 {config.hidden_size=} {config.num_attention_heads=} {config.num_key_value_heads=} {config.intermediate_size=} {config.rms_norm_eps=} {config.hidden_act=}")

        super().__init__()

        # Self-Attentionの初期化
        self.self_attn = Qwen3Attention(
            hidden_size=config.hidden_size,
            num_heads=config.num_attention_heads,
            num_kv_heads=config.num_key_value_heads,
            max_position=config.max_position_embeddings,
            rms_norm_eps=config.rms_norm_eps,
            qkv_bias=getattr(config, 'attention_bias', True),
            head_dim=getattr(config, 'head_dim', None),
            rope_theta=getattr(config, "rope_theta", 1000000),
            rope_scaling=getattr(config, "rope_scaling", None),
        )

        # MLPの初期化
        self.mlp = Qwen3MLP(
            hidden_size=config.hidden_size,
            intermediate_size=config.intermediate_size,
            hidden_act=config.hidden_act,
        )

        # アテンションの前のレイヤー正規化
        self.input_layernorm = RMSNorm(
            config.hidden_size, eps=config.rms_norm_eps)

        # アテンションの後のレイヤー正規化
        self.post_attention_layernorm = RMSNorm(
            config.hidden_size, eps=config.rms_norm_eps)

        logger.info(f"Qwen3DecoderLayerの初期化完了")

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        residual: torch.Tensor | None,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        logger.info(f"Qwen3DecoderLayerの順伝播 {positions.shape=} {hidden_states.shape=} {residual is None=}")

        # 1) アテンション前のレイヤー正規化と残差接続

        # 前の層の残差接続がない場合
        if residual is None:

            # レイヤー正規化を適用し、残差を設定
            hidden_states, residual = self.input_layernorm(hidden_states), hidden_states

        # 前の層の残差接続がある場合
        else:

            # レイヤー正規化を適用し、残差を更新
            hidden_states, residual = self.input_layernorm(
                hidden_states, residual)

        # 2) セルフアテンションの適用

        hidden_states = self.self_attn(positions, hidden_states)

        # 3) アテンション後のレイヤー正規化と残差接続

        hidden_states, residual = \
            self.post_attention_layernorm(hidden_states, residual)

        # 4) MLPの適用

        hidden_states = self.mlp(hidden_states)

        logger.info(f"Qwen3DecoderLayerの順伝播完了 {hidden_states.shape=}")

        return hidden_states, residual


class Qwen3Model(nn.Module):
    """
    埋め込み層、複数のデコーダレイヤー、最終正規化層から構成されるQwen3モデル
    """

    def __init__(
        self,
        config: Qwen3Config,
    ) -> None:
        logger.info(f"Qwen3Modelを初期化開始 {config.vocab_size=} {config.hidden_size=} {config.num_hidden_layers=}")

        super().__init__()

        # 埋め込み層の初期化
        self.embed_tokens = VocabParallelEmbedding(
            config.vocab_size, config.hidden_size)

        # Transformerブロックの積み重ね
        self.layers = nn.ModuleList(
            [Qwen3DecoderLayer(config) for _ \
            in range(config.num_hidden_layers)])

        # 最終正規化層の初期化
        self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)

        logger.info(f"Qwen3Modelの初期化完了")


    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
    ) -> torch.Tensor:
        logger.info(f"Qwen3Modelの順伝播開始 {input_ids.shape=} {positions.shape=}")

        # 入力の埋め込み層を適用
        hidden_states = self.embed_tokens(input_ids)

        residual = None

        # 各デコーダレイヤーを順に適用
        for layer in self.layers:
            hidden_states, residual = layer(
                positions, hidden_states, residual)

        # 最終正規化層を適用
        hidden_states, _ = self.norm(hidden_states, residual)

        logger.info(f"Qwen3Modelの順伝播完了 {hidden_states.shape=}")

        return hidden_states


class Qwen3ForCausalLM(nn.Module):
    """
    Qwen3モデルに言語モデリングヘッドを追加したクラス
    """

    # 事前学習済みの重みを正しく読み込むためのマッピング
    packed_modules_mapping = {
        "q_proj": ("qkv_proj", "q"),
        "k_proj": ("qkv_proj", "k"),
        "v_proj": ("qkv_proj", "v"),
        "gate_proj": ("gate_up_proj", 0),
        "up_proj": ("gate_up_proj", 1),
    }

    def __init__(
        self,
        config: Qwen3Config
    ) -> None:
        logger.info(f"Qwen3ForCausalLMを初期化開始 {config.vocab_size=} {config.hidden_size=} {config.tie_word_embeddings=}")

        super().__init__()

        # Qwen3モデルの初期化
        self.model = Qwen3Model(config)

        # 言語モデリングヘッドの初期化
        self.lm_head = ParallelLMHead(config.vocab_size, config.hidden_size)

        # 埋め込み層と出力層の重みを共有する場合（Weight Tying）
        if config.tie_word_embeddings:
            # 出力の重みを入力の埋め込み層の重みで初期化
            self.lm_head.weight.data = self.model.embed_tokens.weight.data

        logger.info(f"Qwen3ForCausalLMの初期化完了")

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
    ) -> torch.Tensor:
        logger.info(f"Qwen3ForCausalLMの順伝播開始 {input_ids.shape=} {positions.shape=}")

        output = self.model(input_ids, positions)

        logger.info(f"Qwen3ForCausalLMの順伝播完了 {output.shape=}")

        return output

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        logger.info(f"Qwen3ForCausalLMのロジット計算開始 {hidden_states.shape=}")

         # 言語モデリングヘッドを適用してロジットを計算
        logits = self.lm_head(hidden_states)

        logger.info(f"Qwen3ForCausalLMのロジット計算完了 {logits.shape=}")
        return logits

## Engine

In [None]:
import os
from glob import glob
import torch
from torch import nn
from safetensors import safe_open

def default_weight_loader(param: nn.Parameter, loaded_weight: torch.Tensor):
    logger.info(f"重みを読み込み {param.shape=} {loaded_weight.shape=}")
    param.data.copy_(loaded_weight)


def load_model(model: nn.Module, path: str):
    """
    HuggingFaceのsafetensorsファイルを適切に読み込む
    名前の変換と結合されたレイヤーへの振り分けを行う

    Args:
        model (nn.Module): 重みを読み込むモデル
        path (str): safetensorsファイルが保存されているディレクトリのパス
    """

    logger.info(f"モデルの重みを読み込み開始 {path=}")

    # マッピングの定義を取得
    # 例: q_proj -> (qkv_proj, q)
    packed_modules_mapping = getattr(model, "packed_modules_mapping", {})

    # safetensorsファイルでループ
    for file in glob(os.path.join(path, "*.safetensors")):
        logger.debug(f"重みファイルを処理中 {file=}")

        # safetensorsファイルを開く
        with safe_open(file, "pt", "cpu") as f:

            # ファイル内の各重みでループ
            for weight_name in f.keys():
                logger.debug(f"重みを読み込み中 {weight_name=}")

                # マッピングのキーでループ
                for k in packed_modules_mapping:
                    # マッピングキーが重み名に含まれている場合
                    # 例: q_projが含まれている場合
                    if k in weight_name:
                        logger.debug(f"マッピングによる重みを読み込み {weight_name=} {k=}")

                        # マッピングから情報を抽出
                        v, shard_id = packed_modules_mapping[k]

                        # パラメータ名を変更
                        # 例: ...q_proj... -> ...qkv_proj...
                        param_name = weight_name.replace(k, v)

                        # パラメータを取得
                        param = model.get_parameter(param_name)

                        # カスタムローダーを呼び出し
                        weight_loader = getattr(param, "weight_loader")

                        # 重みを読み込み
                        # シャードIDを渡すことでオフセットを指定できるようにする
                        weight_loader(
                            param,
                            f.get_tensor(weight_name),
                            shard_id)

                        break

                # 通常の重み読み込み
                else:
                    logger.debug(f"通常の重み読み込み {weight_name=}")

                    # パラメータを取得
                    param = model.get_parameter(weight_name)

                    # カスタムローダーがあれば使用しなければデフォルトを使用
                    weight_loader = getattr(
                        param, "weight_loader", default_weight_loader)

                    # 重みを読み込み
                    weight_loader(param, f.get_tensor(weight_name))

    logger.info("モデルの重みを読み込み完了")

In [None]:
@dataclass
class Config:
    """
    エンジンの起動設定パラメータの管理クラス
    """

    # モデルのディレクトリパス
    model: str

    # 1バッチ内で同時に処理できる最大トークン数
    max_num_batched_tokens: int = 16384

    # 1バッチ内で同時に処理できる最大シーケンス数
    max_num_seqs: int = 512

    # モデルが扱える最大コンテキスト長
    max_model_len: int = 4096

    # GPUメモリ使用率の上限
    gpu_memory_utilization: float = 0.9

    # 使用するGPUの数（Tensor Parallelismのサイズ）
    tensor_parallel_size: int = 1

    # CUDA Graphによる高速化を有効化
    enforce_eager: bool = False

    # HuggingFaceのモデル設定
    hf_config: AutoConfig | None = None

    # 特殊トークンID
    eos: int = -1

    # PagedAttentionの1ブロックあたりのトークン数
    kvcache_block_size: int = 256

    # KV Cacheの物理ブロック数
    num_kvcache_blocks: int = -1

    def __post_init__(self):
        logger.info(f"Config初期化の後処理 {self=}")
        assert os.path.isdir(self.model)
        assert self.kvcache_block_size % 256 == 0
        assert 1 <= self.tensor_parallel_size <= 8
        self.hf_config = AutoConfig.from_pretrained(self.model)

        # モデルの最大コンテキスト長をHuggingFaceの設定に合わせる
        self.max_model_len = min(self.max_model_len, self.hf_config.max_position_embeddings)

        assert self.max_num_batched_tokens >= self.max_model_len

In [None]:
@dataclass
class SamplingParams:
    """
    テキスト生成の設定
    """

    # 温度
    temperature: float = 1.0

    # 生成する最大トークン数
    max_tokens: int = 64

    # EOSトークンを無視するか
    ignore_eos: bool = False

    def __post_init__(self):
        logger.info(f"SamplingParams初期化の後処理 {self=}")
        assert self.temperature > 1e-10, "greedy sampling is not permitted"

In [None]:

@dataclass
class Context:
    """
    推論実行時のメタデータを格納するコンテキストクラス
    """

    # PrefillかDecodeかを示すフラグ
    is_prefill: bool = False

    # クエリーの累積シーケンス長
    cu_seqlens_q: torch.Tensor | None = None

    # キーの累積シーケンス長
    cu_seqlens_k: torch.Tensor | None = None

    # クエリーの最大シーケンス長
    max_seqlen_q: int = 0

    # キーの最大シーケンス長
    max_seqlen_k: int = 0

    # トークンごとの物理メモリスロットID
    # スロットは、トークン1個のキーとバリューを保存するメモリユニット
    slot_mapping: torch.Tensor | None = None

    # 各シーケンスの現在のコンテキスト長
    context_lens: torch.Tensor | None = None

    # 論理ブロックIDと物理ブロックIDのマッピングテーブル
    # ブロックは複数のスロットをまとめたメモリユニット（256スロットなど）
    block_tables: torch.Tensor | None = None

_CONTEXT = Context()
logger.info(f"グローバルコンテキスト初期化 {_CONTEXT=}")

def get_context():
    logger.info(f"現在のコンテキスト取得 {_CONTEXT=}")
    return _CONTEXT

def set_context(is_prefill, cu_seqlens_q=None, cu_seqlens_k=None, max_seqlen_q=0, max_seqlen_k=0, slot_mapping=None, context_lens=None, block_tables=None):
    logger.info(f"コンテキストを設定 {is_prefill=} {cu_seqlens_q=} {cu_seqlens_k=} {max_seqlen_q=} {max_seqlen_k=} {slot_mapping=} {context_lens=} {block_tables=}")
    global _CONTEXT
    _CONTEXT = Context(is_prefill, cu_seqlens_q, cu_seqlens_k, max_seqlen_q, max_seqlen_k, slot_mapping, context_lens, block_tables)

def reset_context():
    logger.info("コンテキストをリセット")
    global _CONTEXT
    _CONTEXT = Context()

In [None]:
from copy import copy
from enum import Enum, auto
from itertools import count

class SequenceStatus(Enum):
    # リクエスト実行の待機状態 = GPUメモリ未割り当て
    WAITING = auto()

    # PrefillまたはDecodeの実行状態 = GPU上にKVキャッシュが存在
    RUNNING = auto()

    # リクエスト完了状態 = メモリ解放待ち・メモリ解放済み
    FINISHED = auto()


class Sequence:
    """
    トークンのシーケンスや論理ブロックと物理ブロックのマッピングを保持するクラス
    """

    # ブロック内のトークン数
    block_size = 256

    # シーケンスIDのカウンタ
    counter = count()

    def __init__(self, token_ids: list[int], sampling_params = SamplingParams()):
        logger.info(f"シーケンスを初期化 {len(token_ids)=}, {sampling_params=}")

        self.seq_id = next(Sequence.counter)
        self.status = SequenceStatus.WAITING
        self.token_ids = copy(token_ids)
        self.last_token = token_ids[-1]
        self.num_tokens = len(self.token_ids)
        self.num_prompt_tokens = len(token_ids)

        # KVキャッシュに保存されているトークン数
        self.num_cached_tokens = 0

        # このシーケンスが使用している物理ブロックIDのリスト
        self.block_table = []

        self.temperature = sampling_params.temperature
        self.max_tokens = sampling_params.max_tokens
        self.ignore_eos = sampling_params.ignore_eos
        logger.info(f"シーケンスを初期化完了 {self.seq_id=}")

    def __len__(self):
        return self.num_tokens

    def __getitem__(self, key):
        return self.token_ids[key]

    @property
    def is_finished(self):
        return self.status == SequenceStatus.FINISHED

    @property
    def num_completion_tokens(self):
        return self.num_tokens - self.num_prompt_tokens

    @property
    def prompt_token_ids(self):
        return self.token_ids[:self.num_prompt_tokens]

    @property
    def completion_token_ids(self):
        return self.token_ids[self.num_prompt_tokens:]

    @property
    def num_cached_blocks(self):
        return self.num_cached_tokens // self.block_size

    @property
    def num_blocks(self):
        return (self.num_tokens + self.block_size - 1) // self.block_size

    @property
    def last_block_num_tokens(self):
        return self.num_tokens - (self.num_blocks - 1) * self.block_size

    def block(self, i):
        """
        i番目の論理ブロックに含まれるトークンIDのリストを返す

        Args:
            i (int): 論理ブロックのインデックス
        Returns:
            list[int]: 論理ブロックに含まれるトークンIDのリスト
        """
        logger.info(f"論理ブロックを取得 {i=}")

        assert 0 <= i < self.num_blocks

        # トークンIDのスライスを取得
        result = self.token_ids[i*self.block_size: (i+1)*self.block_size]

        logger.info(f"論理ブロックを取得完了 {i=} {result=}")
        return result

    def append_token(self, token_id: int):
        """
        新しく生成されたトークンをシーケンスに追加し、内部状態を更新する
        Scheduler.postprocessから呼び出される

        Args:
            token_id (int): 追加するトークンID
        """

        logger.info(f"トークンを追加 {token_id=}")

        # トークンIDを追加
        self.token_ids.append(token_id)

        # 最新のトークンを更新（次のステップの入力）
        self.last_token = token_id

        # トークン総数を更新
        self.num_tokens += 1

        logger.info(f"トークンを追加完了 {token_id=} {self.num_tokens=}")

    def __getstate__(self):
        """
        シーケンスの状態をシリアライズするためのメソッド
        Pickle化する際に呼び出される

        Prefill中は全てのトークンIDを送る
        Decode中は最後のトークンIDのみ送る
        """
        return (
            self.num_tokens,
            self.num_prompt_tokens,
            self.num_cached_tokens,
            self.block_table,
            self.token_ids if self.num_completion_tokens == 0 \
                else self.last_token
            )

    def __setstate__(self, state):
        """
        シーケンスの状態をデシリアライズするためのメソッド
        Pickleから復元する際に呼び出される
        """
        self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table = state[:-1]

        # Prefill中
        if self.num_completion_tokens == 0:
            # 全てのトークンIDを復元
            self.token_ids = state[-1]

        # Decode中
        else:
            # last_tokenだけ復元
            self.last_token = state[-1]


In [None]:
from collections import deque
import xxhash
import numpy as np

class Block:
    """
    GPU上の巨大なKVキャッシュ領域の一部の状態を追跡するクラス
    """

    def __init__(self, block_id):
        """
        Args:
            block_id (int): 物理的なブロックID
        """
        logger.info(f"ブロックを初期化開始 {block_id=}")

        # 物理的なブロックID
        # kv_cacheテンソルの何番目のスロットに対応するかを示す
        # 0, 1, 2, ...
        self.block_id = block_id

        # 参照カウント
        # このブロックを参照しているシーケンスの数
        # 0: 未使用で再利用可能
        # 1: 1つのシーケンスが独占して使用
        # 2以上: 複数のシーケンスが共有して使用（Prefix CachingやBeam Search時）
        self.ref_count = 0

        # このブロックに格納されているトークン列のハッシュ値
        # 新しいシーケンスが来たとき、同じトークン列を持つブロックが存在するかを検索するため
        self.hash = -1

        # このブロックに含まれる実際のトークンIDのリスト
        self.token_ids = []

        logger.info(f"ブロックを初期化完了 {self.block_id=}")

    def update(self, hash: int, token_ids: list[int]):
        """
        ハッシュ値とトークンの内容を記録し、あとで検索可能にする
        ブロックがデータで満たされた（あるいはPrefix Cacheとして登録される）時に
        呼び出される
        """
        logger.info(f"ブロックを更新 {self.block_id} {hash=} {token_ids=}")
        self.hash = hash
        self.token_ids = token_ids

    def reset(self):
        """
        ブロックを新しく割り当てる時に呼び出される
        """
        logger.info(f"ブロックをリセット {self.block_id}")

        # ブロックを使用中である状態に変更
        self.ref_count = 1

        self.hash = -1
        self.token_ids = []


class BlockManager:
    """
    PagedAttentionで物理メモリブロックを管理するクラス
    Blockインスタンスのリストを保持して、以下のフローを制御:

    1. 新しいシーケンスが来る
    2. そのトークン列のハッシュを計算する
    3. 既存のBlockの中から同じハッシュを持つものを探す（Prefix Caching）
    3-1. 見つかった場合、そのBlockのref_countを増やして共有する
    3-2. 見つからなかった場合、ref_countが0のBlockを探し、reset()して割り当てる
    """

    def __init__(self, num_blocks: int, block_size: int):
        logger.info(f"ブロックマネージャーを初期化開始 {num_blocks=} {block_size=}")

        # ブロックに格納するトークン数
        # 256
        self.block_size = block_size

        # 全ての物理ブロックを初期化
        # 911
        self.blocks: list[Block] = [Block(i) for i in range(num_blocks)]

        # {ハッシュ値: ブロックID} の辞書を初期化
        # Prefix Cachingの検索用
        self.hash_to_block_id: dict[int, int] = dict()

        # 空きブロックのキューを初期化
        # 911
        self.free_block_ids: deque[int] = deque(range(num_blocks))

        # 使用中のブロックIDのセットを初期化
        self.used_block_ids: set[int] = set()

        logger.info(f"ブロックマネージャーを初期化完了")

    @classmethod
    def compute_hash(cls, token_ids: list[int], prefix: int = -1):
        """
        高速なハッシュ関数xxhashを使い、トークン列から一意なIDを生成する
        キャッシュを検索する際に使用
        """
        logger.info(f"一意のハッシュを計算 {token_ids=} {prefix=}")


        h = xxhash.xxh64()

        if prefix != -1:
            h.update(prefix.to_bytes(8, "little"))

        h.update(np.array(token_ids).tobytes())

        # 例: 7975407488731654516
        res = h.intdigest()

        logger.info(f"一意のハッシュを計算完了 {res=}")
        return res

    def _allocate_block(self, block_id: int) -> Block:
        """
        空きブロックを1つ割り当てて返す
        """
        logger.info(f"空きブロックを割り当て開始 {block_id=}")


        block = self.blocks[block_id]

        assert block.ref_count == 0

        # ref_countを1に設定して使用中にする
        block.reset()

        self.free_block_ids.remove(block_id)

        self.used_block_ids.add(block_id)

        res = self.blocks[block_id]

        logger.info(f"空きブロックを割り当て完了 {block_id=}")
        return res

    def _deallocate_block(self, block_id: int) -> Block:
        """
        ブロックを1つ解放する
        """
        logger.info(f"ブロックを解放開始 {block_id=}")

        # 参照カウントが0であることを確認
        assert self.blocks[block_id].ref_count == 0

        self.used_block_ids.remove(block_id)

        self.free_block_ids.append(block_id)

        logger.info(f"ブロックを解放完了 {block_id=}")

    def can_allocate(self, seq: Sequence) -> bool:
        result =  len(self.free_block_ids) >= seq.num_blocks
        logger.info(f"ブロックを割り当て可能か確認 {seq.seq_id} {seq.num_blocks=} {result=}")
        return result

    def allocate(self, seq: Sequence):
        """
        新しいシーケンスが来たとき、過去のキャッシュを利用可否を確認しブロックを割り当てる
        """

        logger.info(f"シーケンスにブロックを割り当て開始 {seq.seq_id}")

        assert not seq.block_table
        h = -1
        cache_miss = False

        # シーケンスが必要とするブロック数分ループ
        for i in range(seq.num_blocks):
            token_ids = seq.block(i)

            # シーケンス長がブロックサイズと同じ場合、ハッシュを計算
            h = self.compute_hash(token_ids, h) if len(token_ids) == self.block_size else -1

            # 既存のキャッシュを検索
            block_id = self.hash_to_block_id.get(h, -1)

            # 存在しない場合
            if block_id == -1 or self.blocks[block_id].token_ids != token_ids:
                # キャッシュミス
                cache_miss = True

            # キャッシュミスの場合
            if cache_miss:
                # 空きのブロックIDを取得
                block_id = self.free_block_ids[0]

                # 新しいブロックを割り当てる
                block = self._allocate_block(block_id)

            # キャッシュヒットの場合
            else:
                # キャッシュ済みのトークン数を更新
                seq.num_cached_tokens += self.block_size

                # 既に使用中の場合
                if block_id in self.used_block_ids:
                    # ブロックを共有（Prefix Caching）
                    block = self.blocks[block_id]

                    # 参照カウントを増やす
                    block.ref_count += 1

                # 未使用の場合（理論上ここには来ないはず）
                else:
                    # ブロックを割り当てる
                    block = self._allocate_block(block_id)

            # ハッシュ値が有効な場合
            if h != -1:
                # ブロックの内容を更新
                block.update(h, token_ids)

                # ハッシュテーブルに登録
                self.hash_to_block_id[h] = block_id

            # シーケンスのブロックテーブルに追加
            seq.block_table.append(block_id)

        logger.info(f"シーケンスにブロックを割り当て完了 {seq.seq_id} {seq.block_table=}")


    def deallocate(self, seq: Sequence):
        """
        割り当てたブロックを解放する

        Args:
            seq (Sequence): 使い終わったシーケンス
        """
        logger.info(f"シーケンスのブロックを解放開始 {seq.seq_id}")

        # シーケンスが使用しているブロックIDのリストを逆順で処理
        for block_id in reversed(seq.block_table):

            # ブロックを取得
            block = self.blocks[block_id]

            # 参照カウントを減らす
            block.ref_count -= 1

            # 参照カウントが0の場合
            if block.ref_count == 0:

                # 物理的に解放
                self._deallocate_block(block_id)

        # シーケンスの状態をリセット
        seq.num_cached_tokens = 0
        seq.block_table.clear()

        logger.info(f"シーケンスのブロックを解放完了 {seq.seq_id}")

    def can_append(self, seq: Sequence) -> bool:
        """
        新しいブロックが必要にある場合、空きを確認する
        """
        result = len(self.free_block_ids) >= (len(seq) % self.block_size == 1)
        logger.info(f"シーケンスにブロックを追加可能か確認 {seq.seq_id} {len(seq)=} {result=}")
        return result

    def may_append(self, seq: Sequence):
        """
        必要に応じてシーケンスに物理ブロックを追加する
        1. 直前のブロックが満杯の場合、新しいブロックを割り当てる
        2. 直前のブロックがちょうど満杯の場合、ハッシュを計算して登録する
        3. 直前のブロックが満杯でない場合、何もしない
        """
        # シーケンスのブロックテーブルを取得
        block_table = seq.block_table

        # 直前のブロックを取得
        last_block = self.blocks[block_table[-1]]

        # 直前のブロックが満杯の場合
        if len(seq) % self.block_size == 1:

            # 新しいブロックを割り当てる
            assert last_block.hash != -1
            block_id = self.free_block_ids[0]
            self._allocate_block(block_id)
            block_table.append(block_id)
            logger.info(f"直前のブロックが満杯のため、新しいブロックを追加 {seq.seq_id=} {block_id=}")

        # 直前のブロックがちょうど満杯の場合
        elif len(seq) % self.block_size == 0:

            # ブロックを更新する
            assert last_block.hash == -1
            token_ids = seq.block(seq.num_blocks-1)

            # 直前のブロックのハッシュをプレフィックスとして取得
            prefix = self.blocks[block_table[-2]].hash \
                if len(block_table) > 1 else -1

            # 現在のブロックのハッシュを計算
            h = self.compute_hash(token_ids, prefix)

            # ブロックにハッシュを設定
            last_block.update(h, token_ids)

            # ハッシュテーブルに登録
            logger.info(f"直前のブロックがちょうど満杯になったため、ハッシュを計算し、hash_to_block_idに登録 {seq.seq_id=} {h=}")
            self.hash_to_block_id[h] = last_block.block_id

        # 直前のブロックが満杯でない場合
        else:
            assert last_block.hash == -1
            logger.info(f"直前のブロックは満杯ではないため、スキップ {seq.seq_id=}")


In [None]:
from collections import deque

class Scheduler:
    """
    推論リクエストのスケジュールとKVキャッシュの管理を行うクラス
    """

    def __init__(self, config: Config):
        logger.info(f"スケジューラを初期化 {config.max_num_seqs=}, {config.max_num_batched_tokens=} {config.eos=}, {config.num_kvcache_blocks=}, {config.kvcache_block_size=}")

        # 1) 制約条件の読み込み

        # 同時に処理できるシーケンスの最大数
        self.max_num_seqs = config.max_num_seqs

        # 1バッチに含められる最大トークン数
        self.max_num_batched_tokens = config.max_num_batched_tokens

        # 終了トークンID
        self.eos = config.eos

        # 2) ブロックマネージャの初期化

        # BlockManagerインスタンスを作成
        self.block_manager = BlockManager(
            config.num_kvcache_blocks, # 確保された物理ブロックの総数
            config.kvcache_block_size) # 1ブロックあたりのトークン数

        # 3) キューの初期化

        # GPUメモリが割り当てられていないリクエスト
        self.waiting: deque[Sequence] = deque()

        # GPUメモリが割り当てられて実行中のリクエスト
        self.running: deque[Sequence] = deque()

        logger.info(f"スケジューラを初期化完了")

    def is_finished(self):
        """
        スケジューラが管理しているすべてのリクエストが完了しているかどうかを返す
        """
        result = not self.waiting and not self.running
        logger.info(f"スケジューラの完了状態を確認 {result=}")
        return result

    def add(self, seq: Sequence):
        """
        スケジューラに新しいリクエストを追加する

        Args:
            seq (Sequence): 追加するシーケンス
        """
        self.waiting.append(seq)
        logger.info(f"スケジューラに新しいリクエストを追加 {seq.seq_id=}")

    def schedule(self) -> tuple[list[Sequence], bool]:
        """
        次にどのリクエストをGPUで実行するかを決める
        Prefillのリクエストを優先

        Returns:
            tuple[list[Sequence], bool]:
                スケジュールされたシーケンスのリストと、prefillかdecodeかのフラグ
        """
        logger.info(f"スケジューリングを開始")

        # 1) Prefillリクエストのスケジューリング

        scheduled_seqs = []
        num_seqs = 0
        num_batched_tokens = 0

        # waitingキューをループ
        while self.waiting and num_seqs < self.max_num_seqs:

            # 先頭のシーケンスを取得
            seq = self.waiting[0]

            if num_batched_tokens + len(seq) > self.max_num_batched_tokens \
                or not self.block_manager.can_allocate(seq):
                break

            num_seqs += 1

            self.block_manager.allocate(seq)

            num_batched_tokens += len(seq) - seq.num_cached_tokens
            seq.status = SequenceStatus.RUNNING
            self.waiting.popleft()
            self.running.append(seq)
            scheduled_seqs.append(seq)

        # Prefillリクエストがあった場合
        if scheduled_seqs:

            # ここで返す
            logger.info(f"スケジューリング完了（Prefill） {len(scheduled_seqs)=}")
            return scheduled_seqs, True

        # 2) Decodeリクエストのスケジューリング

        # runningキューをループ
        while self.running and num_seqs < self.max_num_seqs:

            # 先頭のシーケンスを取得
            seq = self.running.popleft()

            # 次のトークンをメモリ不足で追加できない場合
            while not self.block_manager.can_append(seq):

                # 現在実行中のシーケンスがある場合
                if self.running:
                    # 末尾の低優先度のシーケンスをwaitingキューに戻す
                    self.preempt(self.running.pop())
                else:
                    # 自分自身をwaitingキューに戻す
                    self.preempt(seq)
                    break

            # 次のトークンを追加可能な場合、スケジュールに加える
            else:
                num_seqs += 1
                self.block_manager.may_append(seq)
                scheduled_seqs.append(seq)

        assert scheduled_seqs
        
        # スケジュールされたシーケンスをrunningキューの先頭に戻す
        self.running.extendleft(reversed(scheduled_seqs))

        logger.info(f"スケジューリング完了（Decode） {len(scheduled_seqs)=}")
        return scheduled_seqs, False

    def preempt(self, seq: Sequence):
        """
        実行中のリクエストを中断しwaitingキューに戻し、メモリを解放する

        Args:
            seq (Sequence): 中断するシーケンス
        """
        logger.info(f"実行中リクエストを中断開始 {seq.seq_id=}")

        # シーケンスの状態を実行中から待機中に変更
        seq.status = SequenceStatus.WAITING

        # ブロックマネージャからメモリを解放
        self.block_manager.deallocate(seq)

        # waitingキューの先頭にシーケンスを戻す
        self.waiting.appendleft(seq)

        logger.info(f"実行中リクエストを中断完了 {seq.seq_id=}")

    def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> list[bool]:
        """
        生成したトークンを各シーケンスに追加し、終了判定を行う

        Args:
            seqs (list[Sequence]): トークンを追加するシーケンスのリスト
            token_ids (list[int]): 各シーケンスに追加するトークンIDのリスト
        Returns:
            list[bool]: 各シーケンスが終了したかどうかのリスト
        """
        logger.info(f"生成トークンの後処理を開始 {len(seqs)=} {len(token_ids)=}")

        # シーケンスと生成したトークンIDをループ
        for seq, token_id in zip(seqs, token_ids):

            # シーケンスにトークンを追加
            seq.append_token(token_id)

            # 終了判定
            if (not seq.ignore_eos and token_id == self.eos) or \
                seq.num_completion_tokens == seq.max_tokens:

                # シーケンスの状態を終了に変更
                seq.status = SequenceStatus.FINISHED

                # ブロックマネージャからメモリを解放
                self.block_manager.deallocate(seq)

                # runningキューからシーケンスを削除
                self.running.remove(seq)

        logger.info(f"生成トークンの後処理完了 {len(seqs)=}")



In [None]:
import pickle
import torch
import torch.distributed as dist
from multiprocessing.synchronize import Event
from multiprocessing.shared_memory import SharedMemory

class ModelRunner:
    """
    デバイス上でモデルの推論を実行するための管理クラス
    """

    def __init__(self, config: Config, rank: int, event: Event | list[Event]):
        """
        Args:
            config (Config): モデル設定
            rank (int): ランクID
            event (Event | list[Event]):
                ランク0以外のプロセスでは同期用イベント、
                ランク0ではランク1以降のイベントリスト
        """

        logger.info(f"Initializing ModelRunner on rank {rank}")

        # 1) 設定の保存

        self.config = config

        hf_config = config.hf_config

        self.block_size = config.kvcache_block_size
        self.enforce_eager = config.enforce_eager
        self.world_size = config.tensor_parallel_size

        # 自身のランクID
        self.rank = rank

        self.event = event

        # 2) 分散プロセスの初期化

        dist.init_process_group(
            "nccl", # 分散処理バックエンドをNCCLに設定
            "tcp://localhost:2333", # 2333ポートを使用して他のプロセスと通信
            world_size=self.world_size,
            rank=rank
        )

        # 3) デバイス設定

        # このデバイスが使用するデバイスを設定
        torch.cuda.set_device(rank)

        default_dtype = torch.get_default_dtype()

        torch.set_default_dtype(hf_config.torch_dtype)

        torch.set_default_device("cuda")

        # 4) モデルを読み込み

        # モデルをランダムに初期化
        self.model = Qwen3ForCausalLM(hf_config)

        # モデルにパラメータを読み込み
        load_model(self.model, config.model)

        # 5) サンプラーの初期化

        self.sampler = Sampler()

        # 6) 最適化

        # 遅延初期化のコンポーネントやメモリ割り当てを実行
        self.warmup_model()

        # 可能な限り巨大なKVキャッシュ領域を確保
        self.allocate_kv_cache()

        if not self.enforce_eager:

            # 計算グラフをキャプチャ
            self.capture_cudagraph()

        torch.set_default_device("cpu")

        torch.set_default_dtype(default_dtype)

        # 7) 分散処理の分岐

        if self.world_size > 1:

            # メインプロセスの場合
            if rank == 0:

                # 共有メモリを作成
                # メインプロセスからワーカーへ指令を送るための領域
                self.shm = SharedMemory(
                    name="nanovllm", create=True, size=2**20
                )
                dist.barrier()

            # ワーカープロセスの場合
            else:
                dist.barrier()

                # 共有メモリを開く
                self.shm = SharedMemory(name="nanovllm")

                # 無限ループに入り、メインプロセスからの指令を待機
                self.loop()

        logger.info(f"Initialized ModelRunner on rank {rank}")

    def exit(self):
        """
        ワーカー停止時にリソースを開放する
        """
        logger.info(f"Exiting ModelRunner on rank {self.rank}")

        # マルチGPU環境の場合
        if self.world_size > 1:

            # 共有メモリへのアクセスを閉じる
            self.shm.close()

            # 全てのCPUプロセスを待機
            dist.barrier()

            # メインプロセスの場合
            if self.rank == 0:

                # 共有メモリを削除
                self.shm.unlink()

        # CUDA Graphを使用している場合
        if not self.enforce_eager:
            # CUDA Graphのインスタンスとメモリプールを削除
            del self.graphs, self.graph_pool

        # プロセス内のCPUとGPUの同期
        torch.cuda.synchronize()

        # 分散処理用の通信グループを破棄
        dist.destroy_process_group()

        logger.info(f"Exited ModelRunner on rank {self.rank}")

    def loop(self):
        """
        ワーカープロセスの無限ループ
        """
        logger.info(f"Starting loop on rank {self.rank}")

        while True:
            # 指令を待機
            method_name, args = self.read_shm()

            # 指令を実行
            self.call(method_name, *args)

            # 終了指令の場合
            if method_name == "exit":
                # ループを抜ける
                break

    def read_shm(self):
        """
        ワーカープロセスがメインプロセスからの指令を待機して取得する

        Returns:
            method_name (str): 実行するメソッド名
            args (list): メソッドの引数リスト
        """

        # メインプロセス以外であることを確認
        assert self.world_size > 1 and self.rank > 0

        # 指令が来るまで待機
        self.event.wait()

        # 共有メモリから先頭4バイトをリトルエンディアンで読み込む
        # nは読み取るべきデータの長さ
        n = int.from_bytes(self.shm.buf[0:4], "little")

        # 共有メモリからnバイト分のデータを読み込み、pickleでデシリアライズ
        # [メソッド名, 引数1, 引数2, ...] の形式で取得
        method_name, *args = pickle.loads(self.shm.buf[4:n+4])

        # 次の指令待機のためにイベントをクリア
        self.event.clear()

        return method_name, args

    def write_shm(self, method_name, *args):
        """
        メインプロセスが待機中のワーカープロセスへ指令を送る

        Args:
            method_name (str): 実行するメソッド名
            args (list): メソッドの引数リスト
        """

        # メインプロセスであることを確認
        assert self.world_size > 1 and self.rank == 0

        # 指令データをpickleでバイト列にシリアライズ
        data = pickle.dumps([method_name, *args])

        # バイト数を取得
        n = len(data)

        # 共有メモリへデータをリトルエンディアンで書き込む
        self.shm.buf[0:4] = n.to_bytes(4, "little")

        # データ本体を書き込む
        self.shm.buf[4:n+4] = data

        # 全てのワーカープロセスに対応するイベントオブジェクトのリストでループ
        for event in self.event:

            # イベントフラグを有効化
            event.set()

    def call(self, method_name, *args):
        """
        メインプロセスとワーカープロセスの両方に同じメソッドを一斉に実行する

        Args:
            method_name (str): 実行するメソッド名
            args (list): メソッドの引数リスト 
        """

        # マルチGPU環境でかつメインプロセスの場合
        if self.world_size > 1 and self.rank == 0:

            # 全てのワーカープロセスへ指令を送る
            self.write_shm(method_name, *args)

        # 自身のインスタンスから指定されたメソッドを取得
        method = getattr(self, method_name, None)

        # メソッドを実行して結果を返す
        return method(*args)

    def warmup_model(self):
        """
        推論を開始する前にダミーデータを使ってモデルを実行し、
        最大負荷時のメモリ消費量を計測・確定する
        これによりKVキャッシュの物理メモリブロックの割り当てを最適化し
        Out of Memoryを防止する
        """
        logger.info(f"Warming up model on rank {self.rank}")

        # 1) メモリ統計のリセット

        # CUDAのメモリキャッシュを解放
        torch.cuda.empty_cache()

        # メモリ使用量の計測カウンターをゼロにリセット
        torch.cuda.reset_peak_memory_stats()

        # 2) ダミーデータの作成

        # 最大バッチサイズを取得
        max_num_batched_tokens = self.config.max_num_batched_tokens

        # 最大シーケンス長を取得
        max_model_len = self.config.max_model_len

        # バッチ内のシーケンス数を計算
        # OOMを防ぐため両方の制約を満たす最大値を使用する
        num_seqs = min(
            max_num_batched_tokens // max_model_len,
            self.config.max_num_seqs
        )

        # 最大負荷のダミーシーケンスを作成
        seqs = [Sequence([0] * max_model_len) for _ in range(num_seqs)]

        # 3) ダミー実行

        # is_prefill=Trueでモデルを実行
        # この実行によりKVキャッシュの最大メモリ使用量が確定する
        self.run(seqs, True)

        # 4) 後処理

        # ウォームアップで使用したメモリを解放
        torch.cuda.empty_cache()

        logger.info(f"Warmed up model on rank {self.rank}")

    def allocate_kv_cache(self):
        """
        GPUの空き容量を計算し、最大のKVキャッシュ領域を確保する
        OOMを防ぎ、効率的なメモリ管理（PagedAttention）を可能にする
        """
        logger.info(f"Allocating KV cache on rank {self.rank}")

        # 1) 現在のGPUメモリ状況を取得

        config = self.config
        hf_config = config.hf_config

        # OSから見たGPUの空き容量と合計容量を取得
        free, total = torch.cuda.mem_get_info()

        # OSから見た現在のメモリ使用量を計算
        used = total - free

        # warmup_modelで計測した推論中の最大メモリ使用量を取得
        peak = torch.cuda.memory_stats()["allocated_bytes.all.peak"]

        # PyTorchが明示的に確保したメモリ使用量を取得
        current = torch.cuda.memory_stats()["allocated_bytes.all.current"]

        # 2) 1ブロックあたりのバイト数を計算

        # KVキャッシュのヘッド数を計算
        num_kv_heads = hf_config.num_key_value_heads // self.world_size

        # ヘッドの次元数を取得
        head_dim = getattr(
            hf_config,
            "head_dim",
            hf_config.hidden_size // hf_config.num_attention_heads
        )

        # 1つの物理ブロック（全層分）のメモリサイズを計算
        # キーとバリューの2つ * レイヤー数 * ブロックサイズ * KVヘッド数 * ヘッド次元数 * データ型のバイト数
        block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * num_kv_heads * head_dim * hf_config.torch_dtype.itemsize

        # 3) 確保可能なKVキャッシュブロック数を計算

        # KVキャッシュに使用可能なメモリ容量を計算
        # GPU全体の90% - PyTorch以外の使用量 - 推論中のピーク使用量
        config.num_kvcache_blocks = int(total * config.gpu_memory_utilization - used - peak + current) // block_bytes

        assert config.num_kvcache_blocks > 0

        # 4) 物理メモリの一括確保

        # 巨大なKVキャッシュテンソルを作成 
        self.kv_cache = torch.empty(
            2,
            hf_config.num_hidden_layers,
            config.num_kvcache_blocks,
            self.block_size,
            num_kv_heads,
            head_dim
        )

        # 5) 各層に参照渡し

        layer_id = 0
        for module in self.model.modules():
            if hasattr(module, "k_cache") and hasattr(module, "v_cache"):
                module.k_cache = self.kv_cache[0, layer_id]
                module.v_cache = self.kv_cache[1, layer_id]
                layer_id += 1

        logger.info(f"Allocated KV cache with {config.num_kvcache_blocks} blocks on rank {self.rank}")

    def prepare_block_tables(self, seqs: list[Sequence]):
        """
        各シーケンスのブロックテーブルを2次元のテンソルに変換しGPUに転送する

        Args:
            seqs (list[Sequence]): シーケンスのリスト
        Returns:
            block_tables (torch.Tensor): ブロックテーブルの2次元テンソル
        """
        logger.info(f"Preparing block tables for {len(seqs)} sequences on rank {self.rank}")

        # 1) バッチ内で最長のブロックテーブルの長さを取得

        max_len = max(len(seq.block_table) for seq in seqs)

        # 2) 短いシーケンス末尾を-1でパディングして長さを揃える

        block_tables = [seq.block_table + [-1] * (max_len - len(seq.block_table)) for seq in seqs]

        # 3) 2次元テンソルに変換してGPUに転送

        block_tables = torch.tensor(
            block_tables,
            dtype=torch.int32,
            pin_memory=True, # CPU側のメモリを固定しGPU転送を高速化
        ).cuda(non_blocking=True) # データ転送を非同期化

        logger.info(f"Prepared block tables with shape {block_tables.shape} on rank {self.rank}")

        return block_tables

    def prepare_prefill(self, seqs: list[Sequence]):
        """
        Prefill（プロンプト処理）で可変長の入力シーケンスのリストを
        平坦化し、Attention計算に必要なメタデータを準備する
        FlashAttention（VarLen版）の入力は1次元テンソルであるため

        Args:
            seqs (list[Sequence]): シーケンスのリスト
        Returns:
            input_ids (torch.Tensor): 平坦化された入力トークンIDのテンソル
            positions (torch.Tensor): 平坦化された位置IDのテンソル
        """

        # 1) 各シーケンスのメタデータを準備

        input_ids = []
        positions = []
        cu_seqlens_q = [0]
        cu_seqlens_k = [0]
        max_seqlen_q = 0
        max_seqlen_k = 0
        slot_mapping = []
        block_tables = None

        # 2) 入力データの処理

        for seq in seqs:

            # 2-1) 入力データの平坦化

            seqlen = len(seq)

            # キャッシュ済みトークンをスキップしたシーケンスを入力に追加
            input_ids.extend(seq[seq.num_cached_tokens:])

            # ポジショナルエンコーディング用の位置インデックスも追加
            positions.extend(list(range(seq.num_cached_tokens, seqlen)))

            # 2-2) FlashAttention用の累積長を計算

            # クエリのシーケンス長を計算
            seqlen_q = seqlen - seq.num_cached_tokens

            # キーのシーケンス長を計算
            seqlen_k = seqlen

            # クエリの累積シーケンス長を更新
            # 例: [0, 5, 12] -> シーケンス1の長さ5、シーケンス2の長さ7
            cu_seqlens_q.append(cu_seqlens_q[-1] + seqlen_q)

            # キーの累積シーケンス長を更新
            cu_seqlens_k.append(cu_seqlens_k[-1] + seqlen_k)

            # クエリの最大シーケンス長を更新
            max_seqlen_q = max(seqlen_q, max_seqlen_q)

            # キーの最大シーケンス長を更新
            max_seqlen_k = max(seqlen_k, max_seqlen_k)

            if not seq.block_table:    # warmup
                continue

            # 2-3) PagedAttention用のスロットマップを作成

            # キャッシュ済みブロックをスキップしてループ
            for i in range(seq.num_cached_blocks, seq.num_blocks):

                # i番目のブロックが格納されている物理メモリの開始インデックスを計算
                start = seq.block_table[i] * self.block_size

                # 最後のブロックではない場合
                if i != seq.num_blocks - 1:
                    # ブロックサイズ分のインデックスを追加
                    end = start + self.block_size

                # 最後のブロックの場合
                else:
                    # 最後のブロックの実際のトークン数分のインデックスを追加
                    end = start + seq.last_block_num_tokens 

                # startからendまでのインデックスをスロットマップに追加
                slot_mapping.extend(list(range(start, end)))

        # 3) Prefix Cacheの判定

        # キーの累積シーケンス長がクエリより大きい場合
        if cu_seqlens_k[-1] > cu_seqlens_q[-1]:

            # Prefix Cacheを使用する
            # Prefix Cacheは、計算しないが参照する過去のトークン
            block_tables = self.prepare_block_tables(seqs)

        # 4) ブロックテーブルを準備

        input_ids = torch.tensor(
            input_ids,
            dtype=torch.int64,
            pin_memory=True
        ).cuda(non_blocking=True)

        positions = torch.tensor(
            positions,
            dtype=torch.int64,
            pin_memory=True
        ).cuda(non_blocking=True)

        cu_seqlens_q = torch.tensor(
            cu_seqlens_q,
            dtype=torch.int32,
            pin_memory=True
        ).cuda(non_blocking=True)

        cu_seqlens_k = torch.tensor(
            cu_seqlens_k,
            dtype=torch.int32,
            pin_memory=True
        ).cuda(non_blocking=True)

        slot_mapping = torch.tensor(
            slot_mapping,
            dtype=torch.int32,
            pin_memory=True
        ).cuda(non_blocking=True)

        # 5) コンテキストへの登録

        # メタデータをグローバルなコンテキストに登録する
        set_context(
            True,
            cu_seqlens_q,
            cu_seqlens_k,
            max_seqlen_q,
            max_seqlen_k,
            slot_mapping,
            None,
            block_tables)

        return input_ids, positions

    def prepare_decode(self, seqs: list[Sequence]):
        """
        トークン生成フェーズ（Decode）において次の1トークンを生成するための
        入力データとPagedAttentionの計算に必要なメタデータを準備する

        Args:
            seqs (list[Sequence]): シーケンスのリスト
        Returns:
            input_ids (torch.Tensor): 入力トークンIDのテンソル
            positions (torch.Tensor): 位置IDのテンソル
        """
        logger.info(f"トークン生成フェーズのメタデータを準備開始 {len(seqs)=}")

        # 1) メタデータを初期化

        input_ids = []
        positions = []
        slot_mapping = []
        context_lens = []

        # 2) 各シーケンスからメタデータを抽出

        # シーケンスのリストでループ
        for seq in seqs:

            # シーケンスの最後のトークンIDを入力に追加
            # Decodeでは直前に生成された1トークンのみを入力とするため
            input_ids.append(seq.last_token)

            # 最後のトークンの位置インデックスを追加
            positions.append(len(seq) - 1)

            # シーケンス全体の長さ
            # Attention計算時に必要
            context_lens.append(len(seq))

            # スロットマッピングの計算
            # 今回の入力に対するKVキャッシュを格納する物理アドレス（スロットID)
            slot_mapping.append(
                seq.block_table[-1] * self.block_size + \
                seq.last_block_num_tokens  - 1)

        # 3) テンソルに変換してGPUに転送

        input_ids = torch.tensor(
            input_ids,
            dtype=torch.int64,
            pin_memory=True).cuda(non_blocking=True)

        positions = torch.tensor(
            positions,
            dtype=torch.int64,
            pin_memory=True).cuda(non_blocking=True)

        slot_mapping = torch.tensor(
            slot_mapping,
            dtype=torch.int32,
            pin_memory=True).cuda(non_blocking=True)

        context_lens = torch.tensor(
            context_lens,
            dtype=torch.int32,
            pin_memory=True).cuda(non_blocking=True)

        # 4) ブロックテーブルの準備

        # 各シーケンスのブロックIDのリストを2次元テンソルに変換
        block_tables = self.prepare_block_tables(seqs)

        # 5) コンテキストへの登録

        # PagedAttention用のカーネルに必要なメタデータをコンテキストに登録
        set_context(
            False, # デコードフェーズ
            slot_mapping=slot_mapping,
            context_lens=context_lens,
            block_tables=block_tables)

        logger.info(f"トークン生成フェーズのメタデータを準備完了 {len(input_ids)=} {len(positions)=}")

        return input_ids, positions

    def prepare_sample(self, seqs: list[Sequence]):
        """
        サンプリングに必要なパラメータを抽出してGPUテンソルにまとめる
        推論サイクルの最後のサンプリング処理で使用する

        Args:
            seqs (list[Sequence]): シーケンスのリスト
        Returns:
            temperatures (torch.Tensor): 各シーケンスの温度パラメータのテンソル
        """
        logger.info(f"サンプリングパラメータを準備開始 {len(seqs)=}")

        temperatures = []

        for seq in seqs:
            temperatures.append(seq.temperature)

        temperatures = torch.tensor(
            temperatures,
            dtype=torch.float32,
            pin_memory=True).cuda(non_blocking=True)

        logger.info(f"サンプリングパラメータを準備完了 {temperatures=}")

        return temperatures

    @torch.inference_mode()
    def run_model(self, input_ids: torch.Tensor, positions: torch.Tensor, is_prefill: bool):
        """
        準備した入力データでモデルを順伝搬し、次のトークン予測のロジットを計算する

        プロンプト処理（Prefill)とトークン生成（Decode)の両方に対応

        Args:
            input_ids (torch.Tensor): 入力トークンIDのテンソル
            positions (torch.Tensor): 位置IDのテンソル
            is_prefill (bool): PrefillフェーズかDecodeフェーズかのフラグ
        Returns:
            logits (torch.Tensor): 次のトークン予測のロジットテンソル
        """
        logger.info(f"モデルの順伝搬を実行開始 {input_ids.shape=} {positions.shape=} {is_prefill=}")

        # A) 通常実行の場合（PrefillまたはCuda Graphが無効化）
        if is_prefill or self.enforce_eager or input_ids.size(0) > 512:

            # モデルの順伝搬を実行し、ロジットを計算
            result = self.model.compute_logits(
                self.model(input_ids, positions)
            )

            logger.info(f"モデルの順伝搬（prefill）を実行完了 {result.shape=}")
            return result

        # B) 高速実行（Decode + CUDA Graphs）の場合
        else:

            # B-1) 初期化

            # バッチサイズを取得
            bs = input_ids.size(0)

            # コンテキストを取得 
            context = get_context()

            # B-2) グラフの選択

            # バッチサイズをカバーできる最小の録画済みグラフを選択
            graph = self.graphs[next(x for x in self.graph_bs if x >= bs)]

            # B-3) データのセット

            # グラフ専用のメモリ領域の参照を取得
            graph_vars = self.graph_vars

            # グラフ専用のメモリ領域にデータをコピー
            graph_vars["input_ids"][:bs] = input_ids
            graph_vars["positions"][:bs] = positions
            graph_vars["slot_mapping"].fill_(-1)
            graph_vars["slot_mapping"][:bs] = context.slot_mapping
            graph_vars["context_lens"].zero_()
            graph_vars["context_lens"][:bs] = context.context_lens
            graph_vars["block_tables"][:bs, :context.block_tables.size(1)] = context.block_tables

            # B-4) グラフの再生

            # CPU介入なしに一気にGPUカーネルを実行
            graph.replay()

            # B-5) 結果の取得

            # 結果を取り出してロジットを計算
            result = self.model.compute_logits(graph_vars["outputs"][:bs])

            logger.info(f"モデルの順伝播（decode）を実行完了 {result.shape=}")
            return result

    def run(self, seqs: list[Sequence], is_prefill: bool) -> list[int]:
        """
        ModelRunnerのメインメソッド
        入力データを準備し、モデルを実行し、次のトークンを決定する

        Args:
            seqs (list[Sequence]): シーケンスのリスト
            is_prefill (bool): PrefillフェーズかDecodeフェーズかのフラグ
        Returns:
            token_ids (list[int]): 生成されたトークンIDのリスト
        """
        logger.info(f"推論の1サイクルを実行開始 {len(seqs)=} {is_prefill=}")

        # 1) 入力データの準備

        input_ids, positions = self.prepare_prefill(seqs) if is_prefill \
            else self.prepare_decode(seqs)

        # 2) サンプリングパラメータの準備

        temperatures = self.prepare_sample(seqs) if self.rank == 0 \
            else None

        # 3) モデルの順伝搬を実行し、ロジットを計算

        logits = self.run_model(input_ids, positions, is_prefill)

        # 4) サンプリングを実行し、次のトークンを決定

        token_ids = self.sampler(logits, temperatures).tolist() \
            if self.rank == 0 else None

        # 5) コンテキストのリセット

        reset_context()

        logger.info(f"推論の1サイクルを実行完了 {(len(token_ids) if token_ids else None)=}")

        return token_ids

    @torch.inference_mode()
    def capture_cudagraph(self):
        """
        様々なバッチサイズの計算グラフを事前にキャプチャする
        Decodeフェーズの計算を高速化するため
        """

        logger.info(f"CUDA Graphをキャプチャ開始 {self.rank=}")

        # 1) キャプチャ用の固定メモリを初期化

        config = self.config

        hf_config = config.hf_config

        # 最大バッチサイズ
        max_bs = min(self.config.max_num_seqs, 512)

        # 最大ブロック数
        max_num_blocks = (config.max_model_len + self.block_size - 1) \
            // self.block_size

        # 入力
        input_ids = torch.zeros(max_bs, dtype=torch.int64)

        # 位置
        positions = torch.zeros(max_bs, dtype=torch.int64)

        # スロットマッピング
        slot_mapping = torch.zeros(max_bs, dtype=torch.int32)

        # コンテキスト長
        context_lens = torch.zeros(max_bs, dtype=torch.int32)

        # ブロックテーブル
        block_tables = torch.zeros(max_bs, max_num_blocks, dtype=torch.int32)

        # 出力
        outputs = torch.zeros(max_bs, hf_config.hidden_size)

        # 2) キャプチャするバッチサイズのパターンを決める

        # バッチサイズのパターン
        # 1, 2, 4, 8, 16, 32, ..., max_bs
        self.graph_bs = [1, 2, 4, 8] + list(range(16, max_bs + 1, 16))

        self.graphs = {}

        self.graph_pool = None

        logger.debug(f"{self.rank=} キャプチャパターン: {self.graph_bs}")

        # 3) パターンごとに計算グラフをキャプチャする

        # バッチサイズのパターンで逆順にループ
        for bs in reversed(self.graph_bs):

            # 3-1) ウォームアップ

            # CUDA Graphのインスタンスを作成
            graph = torch.cuda.CUDAGraph()

            # PagedAttention用のメタデータをコンテキストに登録
            set_context(
                False,
                slot_mapping=slot_mapping[:bs],
                context_lens=context_lens[:bs],
                block_tables=block_tables[:bs])

            # ウォームアップ実行
            # カーネルのコンパイル・メモリアロケーターの初期化
            outputs[:bs] = self.model(input_ids[:bs], positions[:bs])

            # 3-2) キャプチャ開始

            # メモリプールを共有し、グラフごとに作業領域を使い回す
            with torch.cuda.graph(graph, self.graph_pool):

                # モデルの順伝搬をキャプチャ
                outputs[:bs] = self.model(input_ids[:bs], positions[:bs])

            # メモリプールがない場合
            if self.graph_pool is None:
                # 最初にキャプチャしたグラフからメモリループを取得
                # メモリループは使用したメモリ領域の管理情報
                self.graph_pool = graph.pool()

            # 3-3) グラフを保存

            self.graphs[bs] = graph

            # GPU上でキャプチャが終わるまで待機
            torch.cuda.synchronize()

            # コンテキストをリセット
            reset_context()

        # 4) キャプチャに使用した固定メモリへの参照を保存

        # キャプチャに使用したテンソルへの参照を辞書にまとめる
        # 実行時はここにデータをコピーしてからグラフを再生する
        self.graph_vars = dict(
            input_ids=input_ids,
            positions=positions,
            slot_mapping=slot_mapping,
            context_lens=context_lens,
            block_tables=block_tables,
            outputs=outputs,
        )

        logger.info(f"CUDA Graphをキャプチャ完了 {self.rank=}")


In [None]:
import atexit
from dataclasses import fields
from time import perf_counter
from tqdm.auto import tqdm
from transformers import AutoTokenizer
import torch.multiprocessing as mp

class LLMEngine:

    def __init__(self, model, **kwargs):
        """
        Args:
            model (str): モデル名またはパス
        """
        logger.info(f"エンジンを初期化開始 {model=}")


        # 1) 設定の初期化

        config_fields = {field.name for field in fields(Config)}
        config_kwargs = {k: v for k, v in kwargs.items() if k in config_fields}
        config = Config(model, **config_kwargs)

        # 2) ワーカープロセスの起動

        self.ps = []
        self.events = []

        # プロセス生成方式をspawnに設定
        ctx = mp.get_context("spawn")

        # ランク1移行のサブプロセスを生成
        for i in range(1, config.tensor_parallel_size):

            event = ctx.Event()

            # サブプロセスでModelRunnerを実行
            process = ctx.Process(target=ModelRunner, args=(config, i, event))
            process.start()

            self.ps.append(process)
            self.events.append(event)

        # 3) メインプロセスの初期化

        # ランク0（メインプロセス）のModelRunnerを作成
        self.model_runner = ModelRunner(config, 0, self.events)

        # 4) トークナイザーの初期化

        # トークナイザーを読み込み
        self.tokenizer = AutoTokenizer.from_pretrained(
            config.model, use_fast=True
        )

        # 終了判定に使用するEOSトークンIDを取得
        config.eos = self.tokenizer.eos_token_id

        # 5) スケジューラーの初期化

        # リクエストの順番待ちや、KVキャッシュのメモリブロック管理を行うスケジューラ
        self.scheduler = Scheduler(config)

        # 6) 終了処理の登録

        # サブプロセスが確実に終了するようにatexitで登録
        atexit.register(self.exit)

        logger.info(f"エンジンの初期化完了")

    def exit(self):
        """
        エンジン停止時にワーカープロセスや共有メモリを解放する
        """
        logger.info(f"エンジンの終了処理開始")

        # メインプロセスのModelRunnerのexitメソッドを実行
        self.model_runner.call("exit")

        # オブジェクトを削除
        del self.model_runner

        # サブプロセスでループ
        for p in self.ps:
            # プロセス終了を待機
            p.join()

        logger.info(f"エンジンの終了処理完了")

    def add_request(self, prompt: str | list[int], sampling_params: SamplingParams):
        """
        新しいプロンプト（生成リクエスト）を受け取り、キューに登録する

        Args:
            prompt (str | list[int]): プロンプト文字列またはトークンIDのリスト
            sampling_params (SamplingParams): サンプリングパラメータ
        """
        logger.info(f"リクエストをキューに登録開始 {prompt=} {sampling_params=}")

        # 1) Sequenceオブジェクトを作成

        # プロンプトが文字列の場合
        if isinstance(prompt, str):
            # トークンIDのリストに変換
            prompt = self.tokenizer.encode(prompt)

        # Sequenceオブジェクトを初期化
        seq = Sequence(prompt, sampling_params)

        # 2) キューに登録

        # スケジューラに登録
        self.scheduler.add(seq)

        logger.info(f"リクエストをキューに登録完了 {seq.seq_id=}")

    def step(self):
        """
        推論エンジンの1サイクルを実行し、完了したシーケンスの出力を返す
        """

        logger.info(f"推論エンジンの1サイクルを実行開始")

        # 1) スケジューリング

        # 処理するシーケンスをスケジューラから取得
        # is_prefillにより、prefill（プロンプト処理）かdecode（生成処理）かを判定
        seqs, is_prefill = self.scheduler.schedule()

        # 2) モデル実行

        # GPUワーカーにシーケンスを渡してrunメソッドを実行し、トークンを生成
        token_ids = self.model_runner.call("run", seqs, is_prefill)

        # 3) 後処理

        # 新しいトークンをシーケンスに追加し、スケジューラで終了判定する
        self.scheduler.postprocess(seqs, token_ids)

        # 4) 出力の収集

        # 完了したシーケンスの結果を抽出
        outputs = [(seq.seq_id, seq.completion_token_ids) \
            for seq in seqs if seq.is_finished]

        # スループット計算のためのトークン数を計算
        # prefill時: 処理したプロンプトの全トークン数
        # decode時: 生成したトークン数（負の値にして区別）
        num_tokens = sum(len(seq) for seq in seqs) \
            if is_prefill else -len(seqs)

        logger.info(f"推論エンジンの1サイクルを実行完了 {outputs=} {num_tokens=}")

        return outputs, num_tokens

    def is_finished(self):
        """
        推論エンジンに処理するべきタスクが残っているかを確認する
        """
        res = self.scheduler.is_finished()
        logger.info(f"スケジューラに処理すべきタスクが残っているか確認 {res=}")
        return res

    def generate(
        self,
        prompts: list[str] | list[list[int]],
        sampling_params: SamplingParams | list[SamplingParams],
        use_tqdm: bool = True,
    ) -> list[str]:
        """
        プロンプトに対してテキスト生成を行い、生成結果を返す

        Args:
            prompts (list[str] | list[list[int]]):
                プロンプト文字列またはトークンIDのリストのリスト
            sampling_params (SamplingParams | list[SamplingParams]):
                サンプリングパラメータまたはそのリスト
            use_tqdm (bool): 進捗表示にtqdmを使用するかどうか

        Returns:
            list[str]: 生成結果の文字列のリスト
        """
        logger.info(f"テキスト生成を開始 {len(prompts)=} {sampling_params=}")

        # 1) 初期化

        # 進捗表示する場合
        if use_tqdm:

            # プログレスバーを初期化
            pbar = tqdm(
                total=len(prompts), desc="Generating", dynamic_ncols=True
            )

        # SamplingParamsがリストの場合
        if not isinstance(sampling_params, list):

            # 全てのプロンプトに対して同じSamplingParamsを使用するように拡張
            sampling_params = [sampling_params] * len(prompts)

        # 2) リクエストの登録

        # 全てのプロンプトをキューに登録
        for prompt, sp in zip(prompts, sampling_params):
            self.add_request(prompt, sp)

        # 3) 生成ループ

        outputs = {}

        # prefillとdecodeのスループットを初期化
        # スループットは毎秒のトークン処理数
        prefill_throughput = decode_throughput = 0.

        while not self.is_finished():

            # 3-1) トークン生成ステップの実行

            t = perf_counter()

            # 1サイクル実行し、finishedになったシーケンスの出力を取得
            output, num_tokens = self.step()

            # 3-2) スループットの計算

            # 進捗を表示する場合
            if use_tqdm:

                # num_tokensが正の場合（prefillの場合）
                if num_tokens > 0:

                    # prefillスループットを計算
                    prefill_throughput = num_tokens / (perf_counter() - t)

                # num_tokensが負の場合（decodeの場合）
                else:

                    # decodeスループットを計算
                    decode_throughput = -num_tokens / (perf_counter() - t)

                # 進捗バーにスループットを表示
                pbar.set_postfix({
                    "Prefill": f"{int(prefill_throughput)}tok/s",
                    "Decode": f"{int(decode_throughput)}tok/s",
                })

            # 3-3) 完了した結果の収集

            # finishedになったシーケンスでループ
            for seq_id, token_ids in output:

                # 出力を保存
                outputs[seq_id] = token_ids

                # 進捗表示する場合
                if use_tqdm:

                    # プログレスバーを更新
                    pbar.update(1)

        # 4) 結果のデコード

        # シーケンスIDの順にソートし、入力順序に整列
        outputs = [outputs[seq_id] for seq_id in sorted(outputs.keys())]

        # トークンIDのリストを文字列にデコード
        outputs = [{
            "text": self.tokenizer.decode(token_ids),
            "token_ids": token_ids
        } for token_ids in outputs]

        # 進捗表示する場合
        if use_tqdm:

            # プログレスバーを閉じる
            pbar.close()

        logger.info(f"テキスト生成を完了 {len(outputs)=}")
        return outputs

In [None]:
class LLM(LLMEngine):
    pass

## 推論

In [None]:
import os

if os.path.exists("debug.log"):
    os.remove("debug.log")

In [None]:
import os

path =  "/root/huggingface/Qwen3-0.6B"

if not os.path.exists(path):
    !huggingface-cli download --resume-download Qwen/Qwen3-0.6B \
    --local-dir ~/huggingface/Qwen3-0.6B/ \
    --local-dir-use-symlinks False

In [None]:
if "llm" not in globals():
    llm = LLM(path, enforce_eager=True, tensor_parallel_size=1)

In [None]:
sampling_params = SamplingParams(temperature=0.6, max_tokens=1)
prompts = ["Hello"]
outputs = llm.generate(prompts, sampling_params)
outputs[0]["text"]