In [None]:
#@title üß∞ Pre-convert setup (installs + folders)
import os, sys, platform, torch

print("Python:", sys.version)
print("Platform:", platform.platform())
print("Torch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
    print("Compute capability:", torch.cuda.get_device_capability(0))
    print("CUDA runtime:", torch.version.cuda)

!pip -q install -U safetensors tqdm requests==2.32.4 huggingface_hub hf_transfer
!pip -q install -U comfy-kitchen

# optional: faster Hugging Face transfers
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
print("HF_HUB_ENABLE_HF_TRANSFER=1")

MODEL_DIR = "/content/models"
os.makedirs(MODEL_DIR, exist_ok=True)
print("MODEL_DIR:", MODEL_DIR)

In [None]:
#@title üì• Download BF16 model from Hugging Face
from huggingface_hub import hf_hub_download

REPO_ID = "Comfy-Org/Qwen-Image-Layered_ComfyUI" #
FILENAME = "split_files/diffusion_models/qwen_image_layered_bf16.safetensors" #

input_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME)
print("Downloaded to:", input_path)
# After hf_hub_download(...)
downloaded_path = input_path  # so key-check + convert cells that expect downloaded_path still work
print("downloaded_path set to:", downloaded_path)


In [None]:
#@title üì• Download from Civitai (modelId + modelVersionId) slower
import os, re, requests
from tqdm.auto import tqdm

# ===== YOU SET THESE =====
CIVITAI_MODEL_ID = ""          # optional (info lookup only)
CIVITAI_VERSION_ID = ""        # REQUIRED
# optional; needed for models requiring login.
# Read from the CIVITAI_API_TOKEN environment variable by default so the secret
# does not have to be pasted into (and saved/shared with) the notebook; assign a
# string here only for throwaway sessions.
CIVITAI_API_TOKEN = os.environ.get("CIVITAI_API_TOKEN", "")
FILENAME_OVERRIDE = ""         # optional
# =========================

def _filename_from_cd(cd: str):
    if not cd:
        return None
    m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^\";]+)"?', cd, flags=re.IGNORECASE)
    return m.group(1) if m else None

def civitai_download(model_version_id: str, out_dir: str, token: str = "", filename_override: str = "") -> str:
    """Stream a Civitai model-version file to disk and return its local path.

    Args:
        model_version_id: Civitai modelVersionId inserted into the download URL.
        out_dir: Directory the file is written into (created if missing).
        token: Optional API token, sent as the ``token`` query parameter.
        filename_override: Optional output file name; when empty the name comes
            from the Content-Disposition header, falling back to
            ``civitai_<model_version_id>.safetensors``.

    Returns:
        Path of the downloaded file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        IOError: If fewer/more bytes than the advertised Content-Length arrive.
    """
    url = f"https://civitai.com/api/download/models/{model_version_id}"
    # token via query string is explicitly supported; passing it through
    # `params` lets requests URL-encode it instead of splicing it in raw.
    params = {"token": token} if token else None

    with requests.get(url, params=params, stream=True, allow_redirects=True, timeout=(10, 120)) as r:
        r.raise_for_status()
        total = int(r.headers.get("Content-Length", "0") or "0")
        cd = r.headers.get("Content-Disposition", "")

        # The header is server-controlled: keep only its final path component so
        # a value like "../../x" cannot escape out_dir.
        header_name = _filename_from_cd(cd) or ""
        header_name = os.path.basename(header_name.replace("\\", "/")).strip()
        if header_name in (".", ".."):
            header_name = ""

        fname = filename_override.strip() or header_name or f"civitai_{model_version_id}.safetensors"
        out_path = os.path.join(out_dir, fname)
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

        # Download into a temporary sibling and rename at the end so an
        # interrupted transfer never leaves a truncated file at out_path.
        part_path = out_path + ".part"
        written = 0
        try:
            with open(part_path, "wb") as f:
                with tqdm(total=total if total > 0 else None, unit="B", unit_scale=True, desc=f"Downloading {fname}") as pbar:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
                            written += len(chunk)
                            pbar.update(len(chunk))

            # Content-Length counts encoded bytes, so only compare when the body
            # was not transparently decompressed.
            if total > 0 and not r.headers.get("Content-Encoding") and written != total:
                raise IOError(f"Incomplete download: got {written} of {total} bytes")

            os.replace(part_path, out_path)
        except BaseException:
            if os.path.exists(part_path):
                os.remove(part_path)
            raise

    return out_path

# Fail early with a readable message when the required setting is still blank.
assert CIVITAI_VERSION_ID.strip(), "Set CIVITAI_VERSION_ID (modelVersionId)."

# `downloaded_path` is the name the key-check and convert cells read.
downloaded_path = civitai_download(
    model_version_id=CIVITAI_VERSION_ID.strip(),
    out_dir=MODEL_DIR,
    token=CIVITAI_API_TOKEN.strip(),
    filename_override=FILENAME_OVERRIDE.strip(),
)
print("‚úÖ Downloaded:", downloaded_path)

In [None]:
#@title üßæ Key check: list safetensors keys (prints + saves full list) + prefix/pattern summary Just for adding model support
import os, re, json
import safetensors

# --------- SETTINGS ----------
# How many keys to echo into the notebook output, and where the full list goes.
PRINT_FIRST_N_KEYS = 160
SAVE_KEYS_TXT = "/content/model_keys.txt"

# Substrings used to quickly spot “family” structure + likely blacklist candidates.
PATTERNS = [
    # Qwen-family / likely sensitive heads
    "img_in",
    "txt_in",
    "time_text_embed",
    "norm_out",
    "proj_out",
    "img_mod",
    "txt_mod",
    "txt_mlp",
    # Common diffusion/transformer naming
    "final_layer",
    "adaLN_modulation",
    "cap_embedder",
    "x_embedder",
    "noise_refiner",
    "context_refiner",
    "t_embedder",
    "attn",
    "qkv",
    "to_out",
    "mlp",
    "proj",
    "norm",
]

# Probing reads a few tensors (dtype/shape) — keep small to avoid slowdowns on huge models.
PROBE_TENSORS = True
PROBE_MAX = 12

# --------- VALIDATE ----------
# All three checks must hold before the file is opened below.
assert "downloaded_path" in globals(), "downloaded_path is not set (run the download cell first)."
assert downloaded_path.endswith(".safetensors"), f"Not a .safetensors file: {downloaded_path}"
assert os.path.exists(downloaded_path), f"File not found: {downloaded_path}"

# Size in mebibytes (bytes / 1024**2).
file_mb = os.path.getsize(downloaded_path) / (1024 * 1024)
print("‚úÖ File:", downloaded_path)
print(f"‚úÖ Size: {file_mb:,.2f} MB")

# --------- READ KEYS + METADATA ----------
# Only keys and header metadata are read here; no tensor is fetched in this step.
with safetensors.safe_open(downloaded_path, framework="pt") as f:
    meta = f.metadata() or {}
    keys = list(f.keys())

print("\n‚úÖ Key count:", len(keys))
print(f"‚úÖ First {min(PRINT_FIRST_N_KEYS, len(keys))} keys:")
for k in keys[:PRINT_FIRST_N_KEYS]:
    print(k)

# Save all keys to text, one per line.
with open(SAVE_KEYS_TXT, "w", encoding="utf-8") as w:
    w.writelines(k + "\n" for k in keys)
print("\nüíæ Saved full key list to:", SAVE_KEYS_TXT)

# --------- METADATA QUICK VIEW ----------
print("\n‚úÖ Metadata keys (up to 60):", list(meta.keys())[:60])
# If quant metadata exists, print only its top-level key names (avoid dumping huge JSON).
if "_quantization_metadata" in meta:
    try:
        qm = json.loads(meta["_quantization_metadata"])
        print("‚úÖ _quantization_metadata keys:", list(qm.keys()))
    except Exception as e:
        # Best effort: a malformed blob is reported (truncated) but does not stop the cell.
        print("‚ö†Ô∏è Could not parse _quantization_metadata JSON:", str(e)[:200])

# --------- PREFIX / STRUCTURE SUMMARY ----------
# True when at least one key contains the 'model.diffusion_model.' substring
# (anywhere in the key, not only at its start).
has_cu_prefix = any(k.find("model.diffusion_model.") != -1 for k in keys)
print("\n‚úÖ Has 'model.diffusion_model.' prefix?", has_cu_prefix)

def top_prefix_counts(keys_list, depth=1, topk=30):
    """Count keys by their first ``depth`` dot-separated components.

    Returns up to ``topk`` ``(prefix, count)`` pairs ordered by descending count,
    ties broken alphabetically by prefix. A key with fewer than ``depth``
    components is counted under its full name.
    """
    tally = {}
    for name in keys_list:
        # Slicing past the end simply yields every component, i.e. the whole key.
        head = ".".join(name.split(".")[:depth])
        if head in tally:
            tally[head] += 1
        else:
            tally[head] = 1

    ranked = sorted(tally.items(), key=lambda pair: (-pair[1], pair[0]))
    return ranked[:topk]

print("\nüì¶ Top prefixes (depth=1):")
for p, c in top_prefix_counts(keys, depth=1, topk=30):
    print(f"  {p:32s}  {c}")

print("\nüì¶ Top prefixes (depth=2):")
for p, c in top_prefix_counts(keys, depth=2, topk=30):
    print(f"  {p:32s}  {c}")

# --------- PATTERN HITS (for blacklist/profile building) ----------
def find_hits(substring):
    """Return, in file order, every entry of the global ``keys`` list that contains ``substring``."""
    matched = []
    for name in keys:
        if substring in name:
            matched.append(name)
    return matched

print("\nüîé Pattern hits summary:")
hits_summary = {}
for pat in PATTERNS:
    hits = find_hits(pat)
    hits_summary[pat] = len(hits)
    if hits:
        print(f"\n‚Äî '{pat}' : {len(hits)} hit(s). Showing up to 25:")
        for h in hits[:25]:
            print("  ", h)

# Save pattern hits to file for easy copy/paste
HITS_TXT = "/content/key_hits.txt"
with open(HITS_TXT, "w", encoding="utf-8") as w:
    for pat in PATTERNS:
        w.write(f"[{pat}] ({hits_summary[pat]} hits)\n")
        for h in find_hits(pat):
            w.write(h + "\n")
        w.write("\n")
print("\nüíæ Saved pattern hits to:", HITS_TXT)

# --------- OPTIONAL: PROBE A FEW TENSORS (dtype/shape) ----------
if PROBE_TENSORS:
    # Take the first matching key of each pattern, stopping once PROBE_MAX keys
    # have been collected (the limit is checked after every pattern).
    probe_keys = []
    for pat in PATTERNS:
        first_hit = next((name for name in keys if pat in name), None)
        if first_hit is not None:
            probe_keys.append(first_hit)
        if len(probe_keys) >= PROBE_MAX:
            break

    if not probe_keys:
        print("\nüß™ No probe keys found (no PATTERNS matched).")
    else:
        print("\nüß™ Probing a few tensors (dtype/shape):")
        with safetensors.safe_open(downloaded_path, framework="pt") as f:
            for k in probe_keys:
                try:
                    t = f.get_tensor(k)
                    print(f"  {k}\n    dtype={t.dtype}  shape={tuple(t.shape)}  ndim={t.ndim}")
                except Exception as e:
                    # A failing probe is reported and the remaining keys are still tried.
                    print(f"  {k}\n    ‚ö†Ô∏è probe failed: {str(e)[:200]}")

print("\n‚úÖ Done.")

In [None]:
#@title Write (convert_nvfp4.py)
# =========================
# Cell: Write converter module to disk (convert_nvfp4.py)
# =========================
%%writefile convert_nvfp4.py
from __future__ import annotations

import argparse
import json
import math
import os
import re
from dataclasses import dataclass
from typing import Dict, Tuple, List, Optional

import torch
from safetensors.torch import load_file, save_file


# ============================================================
# Key routing (Qwen-layered + generic)
# ============================================================

# Full-key regexes for the attention and MLP projection weights of Qwen-style
# checkpoints. Two layer-path layouts are covered:
#   model.layers.N.self_attn.* / model.layers.N.mlp.*
#   transformer.h.N.attn.*     / transformer.h.N.mlp.*
# with either q_proj/k_proj/v_proj/o_proj or wq/wk/wv/wo projection names.
QWEN_ALLOWLIST_RE: List[re.Pattern] = [
    re.compile(pattern)
    for pattern in (
        r"^(model\.layers\.\d+\.self_attn\.(q_proj|k_proj|v_proj|o_proj)\.weight)$",
        r"^(model\.layers\.\d+\.mlp\.(gate_proj|up_proj|down_proj)\.weight)$",
        r"^(transformer\.h\.\d+\.attn\.(q_proj|k_proj|v_proj|o_proj)\.weight)$",
        r"^(transformer\.h\.\d+\.mlp\.(gate_proj|up_proj|down_proj)\.weight)$",
        # Alternative projection names (wq/wk/wv/wo):
        r"^(model\.layers\.\d+\.self_attn\.(wq|wk|wv|wo)\.weight)$",
        r"^(transformer\.h\.\d+\.attn\.(wq|wk|wv|wo)\.weight)$",
    )
]

# Lower-case substrings marking tensors that should stay out of 4-bit packing:
# biases, normalisation layers, rotary-embedding tables and cached sin/cos.
QWEN_BLACKLIST_SUBSTR = [
    ".bias",
    ".norm",
    "layernorm",
    "ln_",
    ".ln",
    "rotary",
    "rope",
    "inv_freq",
    "cos_cached",
    "sin_cached",
]

# Exact key names of the large embedding / output-head matrices; the converter
# routes these through the same path as blacklisted 2D weights.
QWEN_BIG_MATS_SPECIAL = [
    "model.embed_tokens.weight",
    "transformer.wte.weight",
    "lm_head.weight",
    "model.lm_head.weight",
]


def _looks_like_qwen(keys: List[str]) -> bool:
    # Heuristic: if it contains model.layers.N.self_attn.* or transformer.h.N.attn.*
    for k in keys:
        if "model.layers." in k and ".self_attn." in k:
            return True
        if "transformer.h." in k and ".attn." in k:
            return True
    return False


def _matches_any_allowlist(k: str, allowlist: List[re.Pattern]) -> bool:
    return any(p.match(k) is not None for p in allowlist)


def _is_blacklisted(k: str, blacklist_substr: List[str]) -> bool:
    lk = k.lower()
    return any(s in lk for s in blacklist_substr)


def _is_2d_weight(k: str, t: torch.Tensor) -> bool:
    return k.endswith(".weight") and t.ndim == 2 and t.dtype.is_floating_point


# ============================================================
# Quant packing (practical, loader-friendly)
# - NVFP4 (approx): signed int4 per-row + per-row scale, packed 2 nibbles per byte
# - NVFP8/FP8 (approx): signed int8 per-row + per-row scale
#
# Notes:
# - This produces compact safetensors + explicit metadata.
# - If you already have a custom runtime that reads _quantization_metadata,
#   you can map these qweight/scale blobs into your kernels.
# ============================================================

def _to_2s_comp_4bit(q: torch.Tensor) -> torch.Tensor:
    # q int8 in [-8, 7] -> uint8 nibble two's complement
    q = q.to(torch.int16)
    q = torch.where(q < 0, q + 16, q)
    return (q & 0xF).to(torch.uint8)

def _pack_int4_nibbles(q_nibble_u8: torch.Tensor) -> torch.Tensor:
    # q_nibble_u8 shape [R, C] each in [0..15] -> packed [R, ceil(C/2)]
    R, C = q_nibble_u8.shape
    C2 = (C + 1) // 2
    out = torch.empty((R, C2), dtype=torch.uint8, device=q_nibble_u8.device)
    lo = q_nibble_u8[:, 0::2]
    hi = q_nibble_u8[:, 1::2]
    out[:, :lo.shape[1]] = lo
    out[:, :hi.shape[1]] |= (hi << 4)
    # if odd C, the last high nibble stays 0
    return out

def quantize_nvfp4_per_row(w: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Approx NVFP4:
      per-row scale = max(abs(w_row)) / 7
      q = round(w/scale) clipped to [-8..7] (we reserve -8 too)
      store q as packed int4 + scale
    """
    if w.ndim != 2:
        raise ValueError("nvfp4 expects 2D weight")

    w = w.detach()
    # work in fp32 for stability, then store scale in fp16
    wf = w.float()
    max_abs = wf.abs().amax(dim=1).clamp_min(1e-12)  # [R]
    scale = (max_abs / 7.0).to(torch.float16)        # [R]
    q = torch.round(wf / scale.unsqueeze(1)).clamp(-8, 7).to(torch.int8)  # [R, C]

    q_nib = _to_2s_comp_4bit(q)                       # [R, C] uint8 nibbles
    q_packed = _pack_int4_nibbles(q_nib).contiguous() # [R, ceil(C/2)] uint8
    return q_packed.cpu(), scale.cpu()

def quantize_int8_per_row(w: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Per-row int8 with scale:
      scale = max(abs)/127
      q = round(w/scale) clipped [-127..127]

    Args:
        w: 2D floating-point weight of shape [R, C].

    Returns:
        (q, scale) on CPU: ``q`` is int8 [R, C], ``scale`` is float16 [R].
        Dequantise with ``q * scale[row]``.

    Raises:
        ValueError: If ``w`` is not 2D.
    """
    if w.ndim != 2:
        raise ValueError("int8-per-row expects 2D weight")

    # Smallest positive and largest finite float16 values. Clamping the scale
    # into this range before the float16 cast keeps it non-zero and finite;
    # without it, rows whose max/127 is below float16 resolution (including
    # all-zero rows) are stored with scale 0 and divide by zero below.
    fp16_min_positive = 2.0 ** -24
    fp16_max = 65504.0

    wf = w.detach().float()
    max_abs = wf.abs().amax(dim=1)
    scale = (max_abs / 127.0).clamp(min=fp16_min_positive, max=fp16_max).to(torch.float16)

    # Divide by the stored (fp16-rounded) scale so dequantisation matches.
    q = torch.round(wf / scale.float().unsqueeze(1)).clamp(-127, 127).to(torch.int8)
    return q.contiguous().cpu(), scale.cpu()


# ============================================================
# Conversion policy
# ============================================================

@dataclass
class ConvertConfig:
    """Options controlling how convert_safetensors routes each tensor.

    The values accepted for blacklisted_2d_mode, fallback_chain and
    fallback_fp8_format are checked by _validate_config.
    """

    # Key-routing profile. "auto" picks "qwen" or "generic" from the key names.
    model_type: str = "auto"  # auto | qwen | generic
    # What to do when a key is blacklist-matched but still a 2D .weight matrix:
    # - "bf16": keep as BF16
    # - "fp8": store as fp8-like (int8+scale) with fp8_format metadata
    blacklisted_2d_mode: str = "fp8"  # bf16 | fp8

    # Fallback chain for *non-blacklisted* 2D weights:
    # Attempt formats in this order until one succeeds:
    # nvfp4 -> nvfp8 -> fp8 -> bf16 (your requested behavior)
    fallback_chain: Tuple[str, ...] = ("nvfp4", "nvfp8", "fp8", "bf16")

    # Choose ONE fp8 format label (no auto-fallback between them).
    # The label is recorded in the per-layer metadata of nvfp8/fp8 layers.
    fallback_fp8_format: str = "e4m3fn"  # e4m3fn | e5m2

    # If True, quantize any 2D .weight not allowlisted (unless blacklisted rules apply).
    # NOTE: this flag is only consulted in qwen mode together with qwen_allowlist_only.
    quantize_unmatched_2d: bool = True

    # Allowlist only applies strongly to qwen mode; in generic mode we quantize all 2D weights unless blacklisted.
    # NOTE: unmatched weights are kept as BF16 only when this is True AND
    # quantize_unmatched_2d is False; with the defaults it changes nothing.
    qwen_allowlist_only: bool = False  # if True, only allowlisted qwen blocks get nvfp4/nvfp8/fp8 attempts


def _validate_config(cfg: ConvertConfig) -> None:
    if cfg.blacklisted_2d_mode not in ("bf16", "fp8"):
        raise ValueError("blacklisted_2d_mode must be bf16 or fp8")
    for f in cfg.fallback_chain:
        if f not in ("nvfp4", "nvfp8", "fp8", "bf16"):
            raise ValueError("fallback_chain entries must be nvfp4|nvfp8|fp8|bf16")
    if cfg.fallback_fp8_format not in ("e4m3fn", "e5m2"):
        raise ValueError("fallback_fp8_format must be e4m3fn or e5m2")


def _attempt_quant(
    fmt: str,
    key: str,
    w: torch.Tensor,
    fp8_format: str,
) -> Tuple[str, Dict[str, torch.Tensor], Dict]:
    """
    Returns:
      chosen_fmt,
      out_tensors (new tensors to write),
      layer_meta (json-serializable metadata for this layer)
    """
    R, C = w.shape
    if fmt == "nvfp4":
        # pragmatic constraints you can tighten if your runtime requires block alignment
        if C < 8:
            raise RuntimeError("nvfp4: too few cols")
        qweight, scale = quantize_nvfp4_per_row(w)
        return "nvfp4", {
            f"{key}.__qweight": qweight,
            f"{key}.__scale": scale,
        }, {
            "format": "nvfp4",
            "orig_shape": [R, C],
            "qweight": f"{key}.__qweight",
            "scale": f"{key}.__scale",
            "pack": "int4_2scomp_per_row",
        }

    if fmt == "nvfp8":
        # treat nvfp8 as fp8-like int8+scale but label as nvfp8 for your pipeline
        qweight, scale = quantize_int8_per_row(w)
        return "nvfp8", {
            f"{key}.__qweight": qweight,
            f"{key}.__scale": scale,
        }, {
            "format": "nvfp8",
            "orig_shape": [R, C],
            "qweight": f"{key}.__qweight",
            "scale": f"{key}.__scale",
            "pack": "int8_per_row",
            "fp8_format": fp8_format,
        }

    if fmt == "fp8":
        qweight, scale = quantize_int8_per_row(w)
        return "fp8", {
            f"{key}.__qweight": qweight,
            f"{key}.__scale": scale,
        }, {
            "format": "fp8",
            "orig_shape": [R, C],
            "qweight": f"{key}.__qweight",
            "scale": f"{key}.__scale",
            "pack": "int8_per_row",
            "fp8_format": fp8_format,
        }

    if fmt == "bf16":
        return "bf16", {key: w.to(torch.bfloat16).cpu().contiguous()}, {"format": "bf16", "orig_shape": [R, C]}

    raise ValueError(f"Unknown fmt: {fmt}")


def convert_safetensors(
    in_path: str,
    out_path: str,
    cfg: ConvertConfig,
) -> Dict:
    """Convert every tensor of a .safetensors file and write the result.

    Routing per tensor:
      * non-floating or 0-dim tensors are copied unchanged;
      * floating 2D ``*.weight`` tensors are quantised: blacklisted / special
        keys (qwen mode only) follow ``cfg.blacklisted_2d_mode``, everything
        else tries ``cfg.fallback_chain`` in order and falls back to BF16;
      * remaining floating tensors are copied, with float32 cast to BF16.

    Per-layer format details are stored as JSON under the
    ``_quantization_metadata`` metadata key of the output file. Metadata of the
    input file is not carried over.

    Args:
        in_path: Source .safetensors file.
        out_path: Destination file; parent directories are created.
        cfg: Conversion options (validated first).

    Returns:
        A stats dict with the resolved ``model_type``, per-format ``counts`` and
        the number of keys read and written.

    Raises:
        ValueError: If ``cfg`` is invalid or ``out_path`` resolves to ``in_path``.
    """
    _validate_config(cfg)

    # Refuse to write over the checkpoint being read: the source would be
    # destroyed, and the loaded tensors may still be backed by that file
    # (assumption about load_file's lazy loading - TODO confirm).
    if os.path.realpath(in_path) == os.path.realpath(out_path):
        raise ValueError("out_path must differ from in_path (refusing to overwrite the input checkpoint)")

    tensors = load_file(in_path)  # CPU tensors
    keys = list(tensors.keys())

    # Auto model detection
    model_type = cfg.model_type
    if model_type == "auto":
        model_type = "qwen" if _looks_like_qwen(keys) else "generic"

    out: Dict[str, torch.Tensor] = {}
    layers_meta: Dict[str, Dict] = {}

    # Original top-level metadata is not available through load_file,
    # so we only write our own metadata.
    # NOTE: the "skipped" counter is reported but never incremented below.
    stats = {
        "model_type": model_type,
        "counts": {"nvfp4": 0, "nvfp8": 0, "fp8": 0, "bf16": 0, "kept": 0, "skipped": 0},
        "total_keys_in": len(keys),
        "total_keys_out": 0,
    }

    for k, t in tensors.items():
        # Non-floating tensors and 0-dim scalars are kept as-is.
        if not (t.dtype.is_floating_point and t.ndim >= 1):
            out[k] = t.contiguous()
            stats["counts"]["kept"] += 1
            continue

        # Handle 2D weight matrices
        if _is_2d_weight(k, t):
            # Blacklist and special-matrix rules only exist for the qwen profile.
            is_black = _is_blacklisted(k, QWEN_BLACKLIST_SUBSTR if model_type == "qwen" else [])
            is_special_big = (k in QWEN_BIG_MATS_SPECIAL) if model_type == "qwen" else False

            # In qwen mode: allowlist detection for the "layered model" blocks
            allow = True
            if model_type == "qwen":
                allow = _matches_any_allowlist(k, QWEN_ALLOWLIST_RE)
                if cfg.qwen_allowlist_only and (not allow) and (not is_special_big):
                    # keep unmatched 2D weights (unless you turned on quantize_unmatched_2d)
                    if not cfg.quantize_unmatched_2d:
                        out[k] = t.to(torch.bfloat16).contiguous()
                        stats["counts"]["bf16"] += 1
                        continue

            # Blacklisted 2D weights get separate handling
            if is_black or is_special_big:
                if cfg.blacklisted_2d_mode == "bf16":
                    out[k] = t.to(torch.bfloat16).contiguous()
                    layers_meta[k] = {"format": "bf16", "reason": "blacklisted_or_special"}
                    stats["counts"]["bf16"] += 1
                    continue
                else:
                    # fp8 for blacklisted 2D weights
                    chosen, new_tensors, meta = _attempt_quant("fp8", k, t, cfg.fallback_fp8_format)
                    out.update(new_tensors)
                    layers_meta[k] = {**meta, "reason": "blacklisted_or_special"}
                    stats["counts"][chosen] += 1
                    continue

            # Non-blacklisted 2D: try fallback chain (nvfp4 -> nvfp8 -> fp8 -> bf16)
            last_err: Optional[str] = None
            done = False
            for fmt in cfg.fallback_chain:
                try:
                    chosen, new_tensors, meta = _attempt_quant(fmt, k, t, cfg.fallback_fp8_format)
                    out.update(new_tensors)
                    layers_meta[k] = meta
                    stats["counts"][chosen] += 1
                    done = True
                    break
                except Exception as e:
                    # Deliberate best effort: remember why this format failed and
                    # move on to the next entry of the chain.
                    last_err = f"{type(e).__name__}: {e}"
                    continue

            if not done:
                # final safety: keep bf16 (also reached when the chain is empty)
                out[k] = t.to(torch.bfloat16).contiguous()
                layers_meta[k] = {"format": "bf16", "fallback_from_error": last_err}
                stats["counts"]["bf16"] += 1

            continue

        # Not a 2D weight: keep, but avoid fp32 bloat
        # (you can change this to keep original dtype if you prefer)
        if t.dtype == torch.float32:
            out[k] = t.to(torch.bfloat16).contiguous()
            stats["counts"]["bf16"] += 1
        else:
            out[k] = t.contiguous()
            stats["counts"]["kept"] += 1

    quant_meta = {
        "format_version": "1.1",
        "model_type": model_type,
        "fallback_chain": list(cfg.fallback_chain),
        "blacklisted_2d_mode": cfg.blacklisted_2d_mode,
        "fp8_format": cfg.fallback_fp8_format,
        "layers": layers_meta,
    }

    # The whole description is serialised into a single JSON string value.
    metadata = {
        "_quantization_metadata": json.dumps(quant_meta, ensure_ascii=False),
    }

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    save_file(out, out_path, metadata=metadata)

    stats["total_keys_out"] = len(out.keys())
    return stats


# ============================================================
# CLI
# ============================================================

def _parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Convert safetensors weights to nvfp4/nvfp8/fp8/bf16 with metadata.")
    p.add_argument("--in", dest="in_path", required=True, help="Input .safetensors")
    p.add_argument("--out", dest="out_path", required=True, help="Output .safetensors")

    p.add_argument("--model", dest="model_type", default="auto", choices=["auto", "qwen", "generic"])
    p.add_argument("--blacklisted-2d-mode", default="fp8", choices=["bf16", "fp8"])
    p.add_argument("--fallback-chain", default="nvfp4,nvfp8,fp8,bf16", help="Comma list: nvfp4,nvfp8,fp8,bf16")
    p.add_argument("--fp8-format", default="e4m3fn", choices=["e4m3fn", "e5m2"])
    p.add_argument("--quantize-unmatched-2d", action="store_true", help="Quantize unmatched 2D weights too (default True in code)")
    p.add_argument("--no-quantize-unmatched-2d", action="store_true", help="Do not quantize unmatched 2D weights")
    p.add_argument("--qwen-allowlist-only", action="store_true", help="In qwen mode, only quantize allowlisted blocks (+ specials handled separately)")
    return p.parse_args()


def main() -> None:
    """CLI entry point: build a ConvertConfig from the arguments, convert, print the stats as JSON."""
    args = _parse_args()

    # "a, b,,c" -> ("a", "b", "c"): blank entries are dropped.
    formats = tuple(part.strip() for part in args.fallback_chain.split(",") if part.strip())

    cfg = ConvertConfig(
        model_type=args.model_type,
        blacklisted_2d_mode=args.blacklisted_2d_mode,
        fallback_chain=formats,
        fallback_fp8_format=args.fp8_format,
        # Only the negative switch is consulted; quantising stays on unless it is given.
        quantize_unmatched_2d=not args.no_quantize_unmatched_2d,
        qwen_allowlist_only=args.qwen_allowlist_only,
    )

    stats = convert_safetensors(args.in_path, args.out_path, cfg)
    print(json.dumps(stats, indent=2))


if __name__ == "__main__":
    main()

In [None]:
#@title Convert settings
# =========================
# Cell: Convert (custom output filename)
# =========================
import os
from convert_nvfp4 import convert_safetensors, ConvertConfig

# ---- input: the file fetched by one of the download cells ----
IN_PATH = downloaded_path

# ---- output location: <OUT_DIR>/<CUSTOM_BASENAME>.safetensors ----
OUT_DIR = r"/content/converted"
CUSTOM_BASENAME = "qwen_layered_nvfp4_2dBF16"  # <-- change
OUT_PATH = os.path.join(OUT_DIR, CUSTOM_BASENAME + ".safetensors")

# ---- conversion options ----
cfg = ConvertConfig(
    # auto | qwen | generic
    model_type="qwen",
    # bf16 | fp8
    blacklisted_2d_mode="bf16",
    # tried left to right: nvfp4 -> nvfp8 -> fp8 -> bf16
    fallback_chain=("nvfp4", "nvfp8", "fp8", "bf16"),
    # e4m3fn | e5m2 (NO auto fallback between them)
    fallback_fp8_format="e4m3fn",
    # tries the chain for leftover 2D weights too
    quantize_unmatched_2d=True,
    # set True if you ONLY want the allowlisted Qwen layer blocks
    qwen_allowlist_only=False,
)

stats = convert_safetensors(IN_PATH, OUT_PATH, cfg)
print("Wrote:", OUT_PATH)
print(stats)

In [None]:
#@title üîê Hugging Face login (token)
from getpass import getpass
HF_TOKEN = getpass("Paste your Hugging Face token (write access):").strip()
assert HF_TOKEN, "Token is required"
print("Token received (not printing it).")


In [None]:
#@title üèóÔ∏è Create repo 'quanttesting' and upload NVFP4 model
import os
from huggingface_hub import HfApi
output_path = OUT_PATH
api = HfApi(token=HF_TOKEN)
who = api.whoami()
username = who["name"]

repo_name = "YOUR repo HERE" # change
repo_id = f"{username}/{repo_name}"

api.create_repo(repo_id=repo_id, repo_type="model", exist_ok=True)
print("Repo:", repo_id)

# Upload the NVFP4 safetensors (uses LFS automatically for large files)
api.upload_file(
    path_or_fileobj=output_path,
    path_in_repo=os.path.basename(output_path),
    repo_id=repo_id,
    repo_type="model",
    commit_message="Upload NVFP4-converted",
)
print("‚úÖ Upload complete.")
