# to_fp8  float8_e4m3
このノートブックは、モデルを量子化します。  

## 実行手順  
1, メニューの「ランタイム」より「ランタイムのタイプを変更」し、ハードウェア アクセラレータ を「GPU」を選択します。  
2, 全てのセルを実行してください。(Ctrl+F9)  
3, 実行時にGoogleドライブのアクセス確認メッセージがポップアップ表示されます。手動で許可を行ってください。
（fp8に変換したモデルファイルの保存先にGoogleドライブを使用するためです。）  


In [None]:
#@title **fp8変換ツール(float8_e4m3)**
#@markdown **モデルのダウンロード元**
repo_id = "fal/AuraFlow-v0.3" #@param {type:"string"}
repo_filename = "aura_flow_0.3.safetensors" #@param {type:"string"}
#@markdown **モデルの保存先**
save_dir = "/content/drive/MyDrive/Models" #@param {type:"string"}
save_filename = "aura_flow_0.3-fp8.safetensors" #@param {type:"string"}
#@markdown **その他の設定**
runtime_disconnect_after = True #@param {type:"boolean"}
debug_mode = True #@param {type:"boolean"}

# 量子化を行わないレイヤー（スキップするレイヤーのリスト）
non_quantized_layers = set(["vae.", "model.double_layers."])

# レイヤーの処理状況を保存するリスト（レイヤー名と処理ステータスを記録）
layer_status = []

import gc
import json
from os import fsync
from pathlib import Path
import struct
import time
from typing import Any

from google.colab import drive, runtime
from safetensors.torch import load_file
import torch
from tqdm.auto import tqdm
from huggingface_hub import hf_hub_download
import warnings
# HF_TOKENの警告メッセージを抑制します。
warnings.filterwarnings("ignore", category=UserWarning, module="huggingface_hub.utils._token")


def mount_drive_if_needed(path: str):
  """Google Driveが必要な場合にマウントする関数"""
  if path.startswith('/content/drive'):
    print("Google Driveをマウントしています。")
    drive.mount('/content/drive', force_remount=True, timeout_ms=60000) # 60秒


def read_safetensors_metadata(file: str):
  """Safetensorsファイルのメタデータを読み取る関数"""
  with open(file, 'rb') as f:
    header_size = int.from_bytes(f.read(8), 'little')
    header_json = f.read(header_size).decode('utf-8')
    header = json.loads(header_json)
    metadata = header.get('__metadata__', {})
    return metadata


def convert_to_fp8(file: Path):
  """モデルをFP8フォーマットに変換する関数"""
  tensors = dict()
  state_dict = load_file(file) #load safetensors file

  # fp8化を行います。
  for key in tqdm(state_dict, desc="FP8に変換中"): #for each key in the safetensors file
    layer_name = str(key)

    # layer_nameがnon_quantized_layers内のいずれかのプレフィックスで始まるかをチェック
    if any(layer_name.startswith(skip_layer) for skip_layer in non_quantized_layers):
        tensors[key] = state_dict[key]
        layer_status.append((layer_name, "スキップ"))  # スキップされた場合
        continue  # スキップする場合は次へ

    tensors[key] = state_dict[key].to(torch.float8_e4m3fn)
    layer_status.append((layer_name, "処理済み"))  # スキップされなかった場合

  state_dict = None
  return tensors


# ref https://gist.github.com/Stella2211/10f5bd870387ec1ddb9932235321068e
def mem_eff_save_file(tensors: dict[str, torch.Tensor], filename: str, metadata: dict[str, Any] = None):
    """テンソルを効率的に保存する関数"""
    _TYPES = {
        torch.float64: "F64",
        torch.float32: "F32",
        torch.float16: "F16",
        torch.bfloat16: "BF16",
        torch.int64: "I64",
        torch.int32: "I32",
        torch.int16: "I16",
        torch.int8: "I8",
        torch.uint8: "U8",
        torch.bool: "BOOL",
        getattr(torch, "float8_e5m2", None): "F8_E5M2",
        getattr(torch, "float8_e4m3fn", None): "F8_E4M3",
    }
    _ALIGN = 256

    def validate_metadata(metadata: dict[str, Any]) -> dict[str, str]:
        validated = {}
        for key, value in metadata.items():
            if not isinstance(key, str):
                raise ValueError(f"Metadata key must be a string, got {type(key)}")
            if not isinstance(value, str):
                print(f"Warning: Metadata value for key '{key}' is not a string. Converting to string.")
                validated[key] = str(value)
            else:
                validated[key] = value
        return validated

    header = {}
    offset = 0
    if metadata:
        header["__metadata__"] = validate_metadata(metadata)
    for k, v in tensors.items():
        if v.numel() == 0:  # empty tensor
            header[k] = {"dtype": _TYPES[v.dtype], "shape": list(v.shape), "data_offsets": [offset, offset]}
        else:
            size = v.numel() * v.element_size()
            header[k] = {"dtype": _TYPES[v.dtype], "shape": list(v.shape), "data_offsets": [offset, offset + size]}
            offset += size

    hjson = json.dumps(header).encode("utf-8")
    hjson += b" " * (-(len(hjson) + 8) % _ALIGN)

    with open(filename, "wb") as f:
        f.write(struct.pack("<Q", len(hjson)))
        f.write(hjson)

        for k, v in tqdm(tensors.items(), desc="変換後のファイルを保存中"):
            if v.numel() == 0:
                continue
            if v.is_cuda:

                with torch.cuda.device(v.device):
                    if v.dim() == 0:  # if scalar, need to add a dimension to work with view
                        v = v.unsqueeze(0)
                    tensor_bytes = v.contiguous().view(torch.uint8)
                    tensor_bytes.cpu().numpy().tofile(f)
            else:
                # CPU tensor save
                if v.dim() == 0:  # if scalar, need to add a dimension to work with view
                    v = v.unsqueeze(0)
                v.contiguous().view(torch.uint8).numpy().tofile(f)

            gc.collect()
        f.flush()
        # ダーティバッファの書き出し。
        fsync(f.fileno())
try:
  # Googleドライブのマウント処理を行います。
  mount_drive_if_needed(save_dir)

  print("HuggingFace Hubからモデルファイルをダウンロードします。")
  download_file = hf_hub_download(repo_id=repo_id, filename=repo_filename, revision="main")
  output_path : Path  = Path(save_dir) / save_filename
  output_path.parent.mkdir(parents=True, exist_ok=True)

  print("モデルファイルのメタ情報を読み取ります。")
  metadata = read_safetensors_metadata(download_file)
  print(json.dumps(metadata, indent=4)) #show metadata

  # モデルをFP8に変換
  converted_tensors = convert_to_fp8(download_file)
  if debug_mode:
    # レイヤー名とスキップの有無を出力
    for layer_name, status in layer_status:
      print(f"{layer_name}, {status}")

  print("FP8に変換したモデルを保存します。")
  mem_eff_save_file(converted_tensors, output_path, metadata={"format": "pt", **metadata})
  print("Hugging Faceからダウンロードしたファイル")
  print(download_file)

  print(output_path)
  if output_path.exists():
    print("ファイルを保存しました。上記のファイルを確認してください。")
  else:
    print("ファイルの保存に失敗しました。スクリプトを再度実行してください。")

finally:
  gc.collect()
  if runtime_disconnect_after:
    print("10秒後にランタイムの接続を解除します。")
    time.sleep(10)
    runtime.unassign()


Google Driveをマウントしています。
Mounted at /content/drive
HuggingFace Hubからモデルファイルをダウンロードします。


aura_flow_0.3.safetensors:   0%|          | 0.00/16.5G [00:00<?, ?B/s]

モデルファイルのメタ情報を読み取ります。
{}


FP8に変換中:   0%|          | 0/824 [00:00<?, ?it/s]

model.cond_seq_linear.weight, 処理済み
model.double_layers.0.attn.w1k.weight, スキップ
model.double_layers.0.attn.w1o.weight, スキップ
model.double_layers.0.attn.w1q.weight, スキップ
model.double_layers.0.attn.w1v.weight, スキップ
model.double_layers.0.attn.w2k.weight, スキップ
model.double_layers.0.attn.w2o.weight, スキップ
model.double_layers.0.attn.w2q.weight, スキップ
model.double_layers.0.attn.w2v.weight, スキップ
model.double_layers.0.mlpC.c_fc1.weight, スキップ
model.double_layers.0.mlpC.c_fc2.weight, スキップ
model.double_layers.0.mlpC.c_proj.weight, スキップ
model.double_layers.0.mlpX.c_fc1.weight, スキップ
model.double_layers.0.mlpX.c_fc2.weight, スキップ
model.double_layers.0.mlpX.c_proj.weight, スキップ
model.double_layers.0.modC.1.weight, スキップ
model.double_layers.0.modX.1.weight, スキップ
model.double_layers.1.attn.w1k.weight, スキップ
model.double_layers.1.attn.w1o.weight, スキップ
model.double_layers.1.attn.w1q.weight, スキップ
model.double_layers.1.attn.w1v.weight, スキップ
model.double_layers.1.attn.w2k.weight, スキップ
model.double_layers.1.attn.w2o.

変換後のファイルを保存中:   0%|          | 0/824 [00:00<?, ?it/s]

Hugging Faceからダウンロードしたファイル
/root/.cache/huggingface/hub/models--fal--AuraFlow-v0.3/snapshots/2cd8588f04c886002be4571697d84654a50e3af3/aura_flow_0.3.safetensors
/content/drive/MyDrive/Models/aura_flow_0.3-fp8.safetensors
ファイルを保存しました。上記のファイルを確認してください。
10秒後にランタイムの接続を解除します。
