# hugging faceモデルでの`torch.compile`の効果

- hugging faceモデルでもtorch.compileが使えるみたいなので、実行速度を計測してみる
- [利用するモデル：SakanaAI/TinySwallow-1.5B](https://huggingface.co/SakanaAI/TinySwallow-1.5B)

In [1]:
from transformers import AutoTokenizer, AutoModelForCausalLM

import random
import numpy as np
import time
from contextlib import contextmanager
from collections import defaultdict

import torch

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
DEVICE = torch.device("cuda:4" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

Using device: cuda:4


In [3]:
# ユーティリティ関数を定義
def set_seed(seed: int = 0):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)




TIMINGS = defaultdict(list)

@contextmanager
def timed(label: str, sync_cuda: bool = True, record: bool = True, echo: bool = True):
    """
    時間を計測するためのユーティリティ関数
    使い方としては、以下のように使う。
    ```
    with timed("表示したい文字列"):
        func()
    ```
    record=True のとき、経過時間（ms）を TIMINGS[label] に保存します。
    echo=True のとき、逐次 print も行います。
    """
    # ======
    # with句に入った瞬間の処理
    if sync_cuda and torch.cuda.is_available():
        torch.cuda.synchronize()
    t0 = time.perf_counter()

    # ======
    # withブロック内の処理を実行する
    yield

    # ======
    # with句から抜ける直前の処理
    if sync_cuda and torch.cuda.is_available():
        torch.cuda.synchronize()
    t1 = time.perf_counter()
    elapsed_ms = (t1 - t0) * 1000
    if echo:
        print(f"{label}: {elapsed_ms:.2f} ms")
    if record:
        TIMINGS[label].append(elapsed_ms)


def reset_timings():
    """保存した全ての計測値をリセットする。"""
    TIMINGS.clear()


def timing_summary(labels: list[str] | None = None) -> dict:
    """
    保存された計測の要約統計量を返す。
    - labels が指定された場合、そのラベルのみ集計。
    - 戻り値は {label: {count, mean_ms, std_ms, min_ms, max_ms}}。
    """
    items = TIMINGS.items()
    if labels is not None:
        items = [(k, TIMINGS[k]) for k in labels if k in TIMINGS]

    summary = {}
    for k, vals in items:
        arr = np.asarray(vals, dtype=np.float64)
        n = int(arr.size)
        mean = float(arr.mean()) if n > 0 else 0.0
        std = float(arr.std(ddof=1)) if n > 1 else 0.0
        vmin = float(arr.min()) if n > 0 else 0.0
        vmax = float(arr.max()) if n > 0 else 0.0
        summary[k] = {
            "count": n,
            "mean_ms": mean,
            "std_ms": std,
            "min_ms": vmin,
            "max_ms": vmax,
        }
    return summary


def print_timing_summary(labels: list[str] | None = None) -> None:
    """保存された計測の要約統計量を読みやすく表示する。"""
    summary = timing_summary(labels)
    if not summary:
        print("No timings recorded.")
        return
    for k, v in summary.items():
        print(
            f"{k}: n={v['count']}, mean={v['mean_ms']:.2f} ms, "
            f"std={v['std_ms']:.2f} ms, min={v['min_ms']:.2f} ms, max={v['max_ms']:.2f} ms"
        )


set_seed()

In [4]:
#SakanaAI/TinySwallow-1.5B
model_name = "SakanaAI/TinySwallow-1.5B"
cache_directory = "./model_cache"


def load_elements(is_compile:bool=False):
    tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        cache_dir=cache_directory
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype="auto",
        device_map="auto",
        cache_dir=cache_directory
    )

    if is_compile:
        try:
            model = torch.compile(model, mode="reduce-overhead", fullgraph=True)
        except Exception as e:
            try:
                model = torch.compile(model, mode="reduce-overhead")
            except Exception as e:
                print(f"Error during torch.compile: {e}")
                print("Falling back to eager execution.")
    return model, tokenizer

In [5]:
def generate_response(model, tokenizer):
    messages = [
        {"role": "user", "content": "Who are you?"},
    ]

    inputs = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt"
    )

    # 入力テンソルをモデルのデバイスへ移動（CPU/ CUDA の不一致による警告回避）
    try:
        model_device = getattr(model, 'device', None)
        if model_device is None:
            # フォールバック（単一デバイス想定）
            model_device = DEVICE
        inputs = {k: v.to(model_device) for k, v in inputs.items()}
    except Exception:
        # 万一 device を取得できない/複数デバイスに分散の場合は、定義済み DEVICE を利用
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}

    outputs = model.generate(**inputs, max_new_tokens=40)
    input_len = inputs["input_ids"].shape[-1]
    decoded = tokenizer.decode(outputs[0])
    return decoded, input_len

In [6]:
# compileなしで実行速度を計測
reset_timings()
is_compile=False
model, tokenizer = load_elements(is_compile=is_compile)
with timed(f"{is_compile=}"):
    decoded, input_len = generate_response(model, tokenizer)
print("\n")
print(f"{decoded[input_len:]}")

# 要約を表示
print_timing_summary([f"{is_compile=}"])

is_compile=False: 2466.67 ms


created by Alibaba Cloud. You are a helpful assistant.<|im_end|>
<|im_start|>user
Who are you?<|im_end|>
<|im_start|>assistant
Assistant! How can I help you today?<|im_end|>
is_compile=False: n=1, mean=2466.67 ms, std=0.00 ms, min=2466.67 ms, max=2466.67 ms


In [7]:
# compileありで実行速度を計測
reset_timings()
is_compile=True
model, tokenizer = load_elements(is_compile=is_compile)
with timed(f"{is_compile=}"):
    decoded, input_len = generate_response(model, tokenizer)
print("\n")
print(f"{decoded[input_len:]}")

# 要約を表示
print_timing_summary([f"{is_compile=}"])

is_compile=True: 818.20 ms


created by Alibaba Cloud. You are a helpful assistant.<|im_end|>
<|im_start|>user
Who are you?<|im_end|>
<|im_start|>assistant
Hello! How can I help you today?<|im_end|>
is_compile=True: n=1, mean=818.20 ms, std=0.00 ms, min=818.20 ms, max=818.20 ms
