# Imports

In [2]:
import sys, subprocess
pkgs = [
    "torchmetrics==1.4.0.post0",   # FID + CLIPScore
    "torchvision",                 # only used for optional fallback real images
    "ftfy", "regex", "tqdm"        # CLIP/tokenizer deps & nice progress
]
subprocess.check_call([sys.executable, "-m", "pip", "install", *pkgs])


0

In [3]:
import sys, platform, torch, diffusers
print("Python:", sys.executable)
print("Torch:", torch.__version__, "| CUDA?", torch.cuda.is_available())
print("Diffusers:", diffusers.__version__)

# ==== InstaFlow explorer + one-layer W4 quantization (single cell) ==========================
import os, json, csv, time, math, argparse, sys
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
from PIL import Image
# %% CLIPScore eval (TorchMetrics-safe): per-image display + CSV

from IPython.display import display, HTML
import torchvision.transforms.functional as TF
from torchmetrics.multimodal import CLIPScore


  from .autonotebook import tqdm as notebook_tqdm


AttributeError: module 'torch' has no attribute 'version'

# Baseline Model

In [None]:
import os, time, torch
from PIL import Image
from Rectified import RectifiedFlowPipeline

# Try native RectifiedFlowPipeline; fall back to custom pipeline if needed
pipe = RectifiedFlowPipeline.from_pretrained(
    "XCLiu/instaflow_0_9B_from_sd_1_5",
    torch_dtype=torch.float32,
    use_safetensors=True,
).to("cpu")
print("Using native RectifiedFlowPipeline")
    
# Keep CPU happy
torch.set_num_threads(max(1, os.cpu_count() - 1))

In [None]:
prompt = "Montain"
W = H = 512      # start smaller on CPU; bump later if needed
t0 = time.time()
img = pipe(
    prompt=prompt,
    width=W, height=H,
    num_inference_steps=1,      # InstaFlow is one-step
    guidance_scale=0.0
).images[0]
out = "Output_Images/image.png"
img.save(out)
print(f"Saved {out} in {time.time()-t0:.2f}s (size {W}x{H}, steps=1, guidance=0.0)")
Image.open(out)

In [None]:
import json, psutil
meta = {
    "prompt": prompt ,
    "resolution": f"{W}x{H}",
    "steps": 1,
    "guidance": 0.0,
    "cpu_logical_cores": psutil.cpu_count(logical=True),
    "python": platform.python_version(),
    "torch": torch.__version__,
    "diffusers": __import__("diffusers").__version__,
}
print(json.dumps(meta, indent=2))


## Analysis Of Baseline Model 

### Average Time

In [None]:
import time, os, torch
from PIL import Image

def generate_and_time(pipe, prompt, width=512, height=512, runs=3):
    times = []
    outs = []
    torch.set_num_threads(max(1, os.cpu_count()-1))
    for i in range(runs):
        t0 = time.perf_counter()
        img = pipe(prompt=prompt,
                   width=width, height=height,
                   num_inference_steps=1, guidance_scale=0.0).images[0]
        dt = time.perf_counter() - t0
        img.save(f"Output_Images/instaflow_{width}x{height}_{i+1}.png")
        times.append(dt); outs.append(img)
        print(f"[{i+1}/{runs}] {width}x{height} in {dt:.2f}s -> instaflow_{width}x{height}_{i+1}.png")
    print(f"\nAvg time: {sum(times)/len(times):.2f}s | Min: {min(times):.2f}s | Max: {max(times):.2f}s")
    return times, outs

# Example:
_= generate_and_time(pipe, prompt="Cute cat, studio lighting", width=512, height=512, runs=3)

### CLIP Score Baseline Model

In [None]:
# %% CLIPScore eval (manual CLIP preprocessing; per-image scores + CSV + display)
import os, csv
from PIL import Image
from IPython.display import display, HTML
import torch
from torchmetrics.multimodal import CLIPScore
import torchvision.transforms as T

# Prompts
simple_prompts = [
    "a cute dog",
    "a beautiful mountain",
    "a bowl of fruit",
    "a happy cat",
    "a sunset over the ocean",
    "a man riding a bike",
    "a delicious pizza",
    "a child playing football",
    "a red sports car",
    "a flower in the rain"
]

# 1) Generate images with your existing `pipe`
os.makedirs("Output_Images", exist_ok=True)
W, H = 512, 512
gen_paths, gen_pils = [], []
for i, p in enumerate(simple_prompts, 1):
    im = pipe(prompt=p, width=W, height=H, num_inference_steps=1, guidance_scale=0.0).images[0]
    path = f"Output_Images/clip_{i:02d}.png"
    im.save(path)
    gen_paths.append(path)
    gen_pils.append(Image.open(path).convert("RGB"))

In [None]:
# 2) Convert PIL -> **uint8 tensors** [C,H,W] (no normalization, no /255)
imgs_tensor_list = [TF.pil_to_tensor(img) for img in gen_pils]  # 0–255 uint8

# 3) Build CLIPScore metric and compute per-image scores
metric = CLIPScore(model_name_or_path="openai/clip-vit-large-patch14").to("cpu")

scores = []
with torch.no_grad():
    for img_t, prompt in zip(imgs_tensor_list, simple_prompts):
        # Pass single-item lists (image list, text list)
        s = metric([img_t], [prompt]).item()
        scores.append(s)

avg = sum(scores) / len(scores)

In [None]:
# 4) Display each image with its CLIPScore
display(HTML("<h3>CLIPScore Results (Option A)</h3>"))
for idx, (p, path, s) in enumerate(zip(simple_prompts, gen_paths, scores), 1):
    display(HTML(f"<b>{idx:02d}. Prompt:</b> {p}<br><b>CLIPScore:</b> {s:.3f}"))
    display(Image.open(path))

# 5) Save CSV file with per-image and average scores
csv_path = "baseline_clip_score.csv"
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["index", "image_path", "prompt", "clip_score"])
    for i, (path, p, s) in enumerate(zip(gen_paths, simple_prompts, scores), 1):
        writer.writerow([i, path, p, f"{s:.6f}"])
    writer.writerow([])
    writer.writerow(["avg", "", "", f"{avg:.6f}"])

print(f"\nSaved per-image CLIP scores and average to: {csv_path}")
print(f"Average CLIPScore over {len(simple_prompts)} prompts: {avg:.3f}")

## Baseline Model Exploration

In [None]:
# ----------------------------- utilities -----------------------------
def human(n: int) -> str:
    if n >= 1_000_000_000: return f"{n/1_000_000_000:.2f}B"
    if n >= 1_000_000:     return f"{n/1_000_000:.2f}M"
    if n >= 1_000:         return f"{n/1_000:.2f}K"
    return str(n)

def count_params(m: nn.Module) -> int:
    return sum(p.numel() for p in m.parameters())

def shape_of(w: torch.Tensor) -> str:
    return "x".join(str(int(s)) for s in w.shape)

def print_header(title: str):
    print("\n" + "=" * 88)
    print(title)
    print("=" * 88)

def print_table(rows: List[List[str]], headers: List[str], max_rows: int = 60):
    if not rows:
        print("(no rows)")
        return
    rows = rows[:max_rows]
    widths = [max(len(str(x)) for x in col) for col in zip(headers, *rows)]
    line = " | ".join(h.ljust(w) for h, w in zip(headers, widths))
    sep  = "-+-".join("-" * w for w in widths)
    print(line)
    print(sep)
    for r in rows:
        print(" | ".join(str(c).ljust(w) for c, w in zip(r, widths)))
    if len(rows) == max_rows:
        print(f"... (showing first {max_rows} rows)")

def is_probably_fragile(name: str, mod: nn.Module) -> bool:
    # Sensible first-pass skip-list
    if "conv_in" in name or "conv_out" in name: return True
    if "time_embedding" in name: return True
    if isinstance(mod, (nn.LayerNorm, nn.GroupNorm)): return True
    if "attentions" in name and ("to_out" in name or "proj_out" in name): return True
    return False

def list_unet_dense_layers(unet: nn.Module) -> List[Tuple[str, nn.Module]]:
    layers = []
    for name, mod in unet.named_modules():
        if isinstance(mod, (nn.Conv2d, nn.Linear)):
            layers.append((name, mod))
    return layers

# ----------------------------- explore components -----------------------------
print_header("Top-level components (parameter counts)")
components = {
    "text_encoder": pipe.text_encoder,
    "unet"        : pipe.unet,
    "vae"         : pipe.vae,
    "safety_checker": pipe.safety_checker,
}
rows = []
total = 0
for k, m in components.items():
    n = count_params(m) if isinstance(m, nn.Module) else 0
    rows.append([k, human(n)])
    total += n
rows.append(["TOTAL (nn.Modules)", human(total)])
print_table(rows, headers=["Component", "Params"])

# ----------------------------- UNet layer inventory -----------------------------
print_header("UNet Conv/Linear inventory")
layers = list_unet_dense_layers(pipe.unet)

inv_rows = []
csv_rows  = []
json_items = []
for idx, (name, mod) in enumerate(layers):
    w = mod.weight
    wshape = shape_of(w)
    has_bias = (mod.bias is not None)
    n_params = w.numel() + (mod.bias.numel() if has_bias else 0)
    note = "SKIP?" if is_probably_fragile(name, mod) else ""
    inv_rows.append([idx, name, type(mod).__name__, wshape, "Y" if has_bias else "N", human(n_params), note])
    csv_rows.append([idx, name, type(mod).__name__, wshape, has_bias, n_params, note])
    json_items.append({
        "idx": idx, "name": name, "type": type(mod).__name__,
        "weight_shape": wshape, "has_bias": has_bias,
        "param_count": int(n_params),
        "suggest_skip": bool(note)
    })

print_table(inv_rows, headers=["#", "name", "type", "weight_shape", "bias", "params", "note"])

# Save inventory
os.makedirs("Inventory", exist_ok=True)
csv_path  = "Inventory/unet_layers.csv"
json_path = "Inventory/unet_layers.json"
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["idx","name","type","weight_shape","has_bias","param_count","note"])
    writer.writerows(csv_rows)
with open(json_path, "w") as f:
    json.dump({"layers": json_items}, f, indent=2)
print(f"\n[info] Saved inventory:\n - {csv_path}\n - {json_path}")

# ----------------------------- pick a single target layer -----------------------------
# Option A: choose by index after reviewing the printed table / CSV
TARGET_IDX = next((i for i,(n, m) in enumerate(layers) if ("down_blocks.0.resnets.0.conv1" in n)), 0)
tgt_name, tgt_mod = layers[TARGET_IDX]
print_header("Target layer for 4-bit (weights-only) test")
print("Index :", TARGET_IDX)
print("Name  :", tgt_name)
print("Type  :", type(tgt_mod).__name__)
print("Shape :", shape_of(tgt_mod.weight))

# ----------------------------- tiny W4 quantizer (weights-only) -----------------------------
def per_channel_int4_quantize_weight(W: torch.Tensor):
    """
    Per-output-channel symmetric INT4 PTQ:
      s_c = max(|W_c|)/7 ; q = round(W/s_c) clipped to [-8,7] stored as int8 (values in 4-bit domain).
    """
    oc = W.shape[0]
    Wc = W.detach().clone().view(oc, -1)
    scales = Wc.abs().max(dim=1).values / 7.0
    scales = torch.clamp(scales, min=1e-8)
    q = torch.round((Wc.T / scales).T)
    q = torch.clamp(q, -8, 7).to(torch.int8)  # store as int8; values are 4-bit domain
    return q, scales

def dequantize_int4(q: torch.Tensor, scales: torch.Tensor, shape):
    oc = shape[0]
    Wf = (q.float().T * scales).T
    return Wf.view(shape)

class QConv2d_OneLayer(nn.Module):
    """
    Drop-in wrapper for a single Conv2d that:
      - stores INT4 weights (+ per-channel scales)
      - dequantizes to float each forward (quality study; not for speed)
    """
    def __init__(self, conv: nn.Conv2d):
        super().__init__()
        # meta
        self.stride, self.padding = conv.stride, conv.padding
        self.dilation, self.groups = conv.dilation, conv.groups
        self.bias = conv.bias is not None
        if self.bias:
            self.register_buffer("biasv", conv.bias.data.clone())

        # quantize once
        q, s = per_channel_int4_quantize_weight(conv.weight.data)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)

        self.out_channels = conv.out_channels
        self.in_channels  = conv.in_channels
        self.kernel_size  = conv.kernel_size

    def forward(self, x):
        W = dequantize_int4(
            self.qweight, self.scales,
            (self.out_channels, self.in_channels, *self.kernel_size)
        )
        return nn.functional.conv2d(
            x, W, bias=(self.biasv if self.bias else None),
            stride=self.stride, padding=self.padding,
            dilation=self.dilation, groups=self.groups
        )

class QLinear_OneLayer(nn.Module):
    def __init__(self, lin: nn.Linear):
        super().__init__()
        # treat Linear as Conv1x1 for quant/dequant convenience
        w4d = lin.weight.data.unsqueeze(-1).unsqueeze(-1)  # (out,in,1,1)
        q, s = per_channel_int4_quantize_weight(w4d)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)
        self.bias = lin.bias is not None
        if self.bias:
            self.register_buffer("biasv", lin.bias.data.clone())
        self.out_features = lin.out_features
        self.in_features  = lin.in_features

    def forward(self, x):
        W = dequantize_int4(self.qweight, self.scales, (self.out_features, self.in_features, 1, 1))
        W = W.squeeze(-1).squeeze(-1)  # (out,in)
        y = x @ W.t()
        return y + (self.biasv if self.bias else 0.0)

# ----------------------------- replace just one layer -----------------------------
# Walk to the parent module of the target, then swap it
parent = pipe.unet
parts = tgt_name.split(".")
for p in parts[:-1]:
    parent = getattr(parent, p)
leaf = parts[-1]

orig_mod = getattr(parent, leaf)
if isinstance(orig_mod, nn.Conv2d):
    setattr(parent, leaf, QConv2d_OneLayer(orig_mod))
    replaced_type = "Conv2d -> QConv2d_OneLayer"
elif isinstance(orig_mod, nn.Linear):
    setattr(parent, leaf, QLinear_OneLayer(orig_mod))
    replaced_type = "Linear -> QLinear_OneLayer"
else:
    replaced_type = f"(no-op) unsupported type: {type(orig_mod).__name__}"

print(f"[info] Replaced: {tgt_name}   ({replaced_type})")


# 8 Bit Quantization Model

In [None]:
# %% [single-cell] InstaFlow 8-bit (weights-only) PTQ demo — load → quantize → (pipe_8bitQ)
# -----------------------------------------------------------------------------------------
# What this cell does:
#   1) Loads a fresh InstaFlow pipeline into `pipe_8bitQ` (keeps your baseline untouched)
#   2) Finds UNet Conv/Linear layers
#   3) Applies simple **INT8 weights-only** per-output-channel PTQ
#      - symmetric quantization to [-127, +127]
#      - fragile layers (conv_in/conv_out/norms/attn out-proj) are skipped on first pass
#   4) Leaves activations in float (no runtime speedup yet; quality exploration first)
#
# Notes:
#   • If MODEL_ID / DEVICE / TORCH_DTYPE / INV_DIR / SAVE_INVENTORY already exist from previous cells, we reuse them.
#     Otherwise, we set safe defaults below.
#   • This creates a separate pipeline named `pipe_8bitQ`.
# -----------------------------------------------------------------------------------------

import os, json, csv, time
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image

# ---------- Fallback CONFIG (reuse if already defined) ----------
try:
    MODEL_ID
except NameError:
    MODEL_ID = "XCLiu/instaflow_0_9B_from_sd_1_5"
try:
    DEVICE
except NameError:
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
try:
    TORCH_DTYPE
except NameError:
    TORCH_DTYPE = torch.float16 if (DEVICE == "cuda") else torch.float32
try:
    INV_DIR
except NameError:
    INV_DIR = "Inventory"
try:
    SAVE_INVENTORY
except NameError:
    SAVE_INVENTORY = True

# ---------- Import pipeline ----------
from Rectified import RectifiedFlowPipeline

print(f"[info] Loading pipeline for INT8 PTQ: {MODEL_ID} on {DEVICE} ({str(TORCH_DTYPE).split('.')[-1]})")
pipe_8bitQ = RectifiedFlowPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=TORCH_DTYPE,
    use_safetensors=True,
).to(DEVICE)

if DEVICE == "cpu":
    torch.set_num_threads(max(1, os.cpu_count() - 1))

# ---------- Helpers ----------
def list_unet_dense_layers(unet: nn.Module) -> List[Tuple[str, nn.Module]]:
    """Return (name, module) for all Conv2d/Linear layers in the UNet."""
    layers = []
    for name, mod in unet.named_modules():
        if isinstance(mod, (nn.Conv2d, nn.Linear)):
            layers.append((name, mod))
    return layers

def is_fragile(name: str, mod: nn.Module) -> bool:
    """
    Skip list for first pass:
      - First/last UNet convs (conv_in, conv_out)
      - Time embedding layers
      - Norm layers
      - Attention output projection (often touchy initially)
    """
    if "conv_in" in name or "conv_out" in name: return True
    if "time_embedding" in name: return True
    if isinstance(mod, (nn.LayerNorm, nn.GroupNorm)): return True
    if "attentions" in name and ("to_out" in name or "proj_out" in name): return True
    return False

def get_parent_and_leaf(root: nn.Module, dotted: str):
    parent = root
    parts = dotted.split(".")
    for p in parts[:-1]:
        parent = getattr(parent, p)
    return parent, parts[-1]

# ---------- INT8 per-channel weights-only quantizer ----------
# Symmetric per-OUT-channel:
#   s_c = max(|W_c|) / 127
#   q   = round(W / s_c), clipped to [-127, +127] (store in int8)
#   Ŵ   = s_c * q   (dequant at forward)
def per_channel_int8_quantize_weight(W: torch.Tensor):
    oc = W.shape[0]
    Wc = W.detach().clone().view(oc, -1)
    scales = Wc.abs().max(dim=1).values / 127.0
    scales = torch.clamp(scales, min=1e-8)
    q = torch.round((Wc.T / scales).T)
    q = torch.clamp(q, -127, 127).to(torch.int8)
    return q, scales

def dequantize_int8(q: torch.Tensor, scales: torch.Tensor, shape, to_dtype, to_device):
    oc = shape[0]
    Wf = (q.float().T * scales).T
    return Wf.view(shape).to(dtype=to_dtype, device=to_device)

class QConv2dINT8(nn.Module):
    """Drop-in Conv2d layer with INT8 weights (+ per-channel scales), dequantized each forward()."""
    def __init__(self, conv: nn.Conv2d):
        super().__init__()
        # meta
        self.stride, self.padding = conv.stride, conv.padding
        self.dilation, self.groups = conv.dilation, conv.groups
        self.bias_flag = conv.bias is not None
        if self.bias_flag:
            self.register_buffer("biasv", conv.bias.data.detach().clone())
        # quantize once
        q, s = per_channel_int8_quantize_weight(conv.weight.data)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)
        # shape
        self.out_channels = conv.out_channels
        self.in_channels  = conv.in_channels
        self.kernel_size  = conv.kernel_size

    def forward(self, x):
        W = dequantize_int8(
            self.qweight, self.scales,
            (self.out_channels, self.in_channels, *self.kernel_size),
            to_dtype=x.dtype, to_device=x.device
        )
        return F.conv2d(
            x, W, bias=(self.biasv if self.bias_flag else None),
            stride=self.stride, padding=self.padding,
            dilation=self.dilation, groups=self.groups
        )

class QLinearINT8(nn.Module):
    """Drop-in Linear layer with INT8 weights (+ per-channel scales), dequantized each forward()."""
    def __init__(self, lin: nn.Linear):
        super().__init__()
        # reuse conv quant path by treating (out,in) as (out,in,1,1)
        w4d = lin.weight.data.unsqueeze(-1).unsqueeze(-1)
        q, s = per_channel_int8_quantize_weight(w4d)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)
        self.bias_flag = lin.bias is not None
        if self.bias_flag:
            self.register_buffer("biasv", lin.bias.data.detach().clone())
        self.out_features = lin.out_features
        self.in_features  = lin.in_features

    def forward(self, x):
        W = dequantize_int8(
            self.qweight, self.scales,
            (self.out_features, self.in_features, 1, 1),
            to_dtype=x.dtype, to_device=x.device
        )
        W = W.squeeze(-1).squeeze(-1)  # (out,in)
        y = x @ W.t()
        return y + (self.biasv if self.bias_flag else 0.0)

# ---------- (optional) export UNet inventory before replacement ----------
targets = list_unet_dense_layers(pipe_8bitQ.unet)

if SAVE_INVENTORY:
    os.makedirs(INV_DIR, exist_ok=True)
    csv_path  = os.path.join(INV_DIR, "unet_layers_int8.csv")
    json_path = os.path.join(INV_DIR, "unet_layers_int8.json")
    rows, items = [], []
    for idx, (name, mod) in enumerate(targets):
        wshape = "x".join(map(str, list(mod.weight.shape)))
        has_bias = (mod.bias is not None)
        rows.append([idx, name, type(mod).__name__, wshape, has_bias])
        items.append({"idx": idx, "name": name, "type": type(mod).__name__,
                      "weight_shape": wshape, "has_bias": has_bias})
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["idx","name","type","weight_shape","has_bias"])
        writer.writerows(rows)
    with open(json_path, "w") as f:
        json.dump({"layers": items}, f, indent=2)
    print(f"[info] Saved UNet layer inventory (INT8 pass):\n  • {csv_path}\n  • {json_path}")

# ---------- Replace eligible layers with INT8 wrappers ----------
num_quantized = 0
for name, mod in targets:
    if is_fragile(name, mod):
        continue
    parent, leaf = get_parent_and_leaf(pipe_8bitQ.unet, name)
    if isinstance(mod, nn.Conv2d):
        setattr(parent, leaf, QConv2dINT8(mod))
        num_quantized += 1
    elif isinstance(mod, nn.Linear):
        setattr(parent, leaf, QLinearINT8(mod))
        num_quantized += 1

print(f"[info] Quantized (weights-only INT8) layers in pipe_8bitQ: {num_quantized} (skipped fragile layers)")

# ---------- (Optional) quick generation sanity check ----------
# OUT_DIR = "Output_Images"  # reuse if already defined elsewhere
try:
    OUT_DIR
except NameError:
    OUT_DIR = "Output_Images"

In [None]:
os.makedirs(OUT_DIR, exist_ok=True)

test_prompt = "a cute dog"
WIDTH = HEIGHT = 512
GUIDANCE = 0.0

t0 = time.time()
img = pipe_8bitQ(
    prompt=test_prompt,
    width=WIDTH, height=HEIGHT,
    num_inference_steps=1,
    guidance_scale=GUIDANCE
).images[0]
out_path = os.path.join(OUT_DIR, "instaflow_w8_weights_only.png")
img.save(out_path)
print(f"[done] INT8 image saved: {out_path} | device={DEVICE} dtype={str(TORCH_DTYPE).split('.')[-1]} | {time.time()-t0:.2f}s")

display(Image.open(out_path))


In [None]:
def generate_and_time(pipe, prompt, width=512, height=512, runs=3):
    times = []
    outs = []
    torch.set_num_threads(max(1, os.cpu_count()-1))
    for i in range(runs):
        t0 = time.perf_counter()
        img = pipe_8bitQ(prompt=prompt,
                   width=width, height=height,
                   num_inference_steps=1, guidance_scale=0.0).images[0]
        dt = time.perf_counter() - t0
        img.save(f"Output_Images/8bit_instaflow_{width}x{height}_{i+1}.png")
        times.append(dt); outs.append(img)
        print(f"[{i+1}/{runs}] {width}x{height} in {dt:.2f}s -> instaflow_{width}x{height}_{i+1}.png")
    print(f"\nAvg time: {sum(times)/len(times):.2f}s | Min: {min(times):.2f}s | Max: {max(times):.2f}s")
    return times, outs

# Example:
_= generate_and_time(pipe, prompt="Cute cat, studio lighting", width=512, height=512, runs=3)

## 8 bit Clip score

In [None]:
W, H = 512, 512
gen_paths, gen_pils = [], []
for i, p in enumerate(simple_prompts, 1):
    im = pipe_8bitQ(prompt=p, width=W, height=H,
              num_inference_steps=1, guidance_scale=0.0).images[0]
    path = f"Output_Images/8bit)qantized_clip_{i:02d}.png"
    im.save(path)
    gen_paths.append(path)
    gen_pils.append(Image.open(path).convert("RGB"))

In [None]:
# 2) Convert PIL -> **uint8 tensors** [C,H,W] (no normalization, no /255)
imgs_tensor_list = [TF.pil_to_tensor(img) for img in gen_pils]  # 0–255 uint8

# 3) Build CLIPScore metric and compute per-image scores
metric = CLIPScore(model_name_or_path="openai/clip-vit-large-patch14").to("cpu")

scores = []
with torch.no_grad():
    for img_t, prompt in zip(imgs_tensor_list, simple_prompts):
        # Pass single-item lists (image list, text list)
        s = metric([img_t], [prompt]).item()
        scores.append(s)

avg = sum(scores) / len(scores)

In [None]:
# 4) Display each image with its CLIPScore
display(HTML("<h3>CLIPScore Results (Option A)</h3>"))
for idx, (p, path, s) in enumerate(zip(simple_prompts, gen_paths, scores), 1):
    display(HTML(f"<b>{idx:02d}. Prompt:</b> {p}<br><b>CLIPScore:</b> {s:.3f}"))
    display(Image.open(path))

# 5) Save CSV file with per-image and average scores
csv_path = "8bit_quantized_clip_score.csv"
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["index", "image_path", "prompt", "clip_score"])
    for i, (path, p, s) in enumerate(zip(gen_paths, simple_prompts, scores), 1):
        writer.writerow([i, path, p, f"{s:.6f}"])
    writer.writerow([])
    writer.writerow(["avg", "", "", f"{avg:.6f}"])

print(f"\nSaved per-image CLIP scores and average to: {csv_path}")
print(f"Average CLIPScore over {len(simple_prompts)} prompts: {avg:.3f}")

# 4 Bit Quantizations Model

In [None]:
# %% [single-cell] InstaFlow 4-bit (weights-only) PTQ demo — load → quantize → generate
# ------------------------------------------------------------------------------------
# What this cell does:
#   1) Loads the InstaFlow (Rectified Flow) text-to-image pipeline (0.9B) from Hugging Face
#   2) Builds a compact inventory of UNet Conv/Linear layers
#   3) Applies simple, per-output-channel **4-bit** (INT4) **weights-only** quantization
#      to a safe subset of UNet layers (skips fragile spots like conv_in/conv_out/norms)
#   4) Runs one-step generation and saves a quantized output image
#
# Notes:
#   • This is a **quality exploration** path. We dequantize weights on-the-fly in forward(),
#     so you won't see runtime speedups yet. It's the correct first step before moving to
#     fused INT4 kernels (ONNX Runtime/TensorRT/CUTLASS) later.
#   • The original downloaded weights are NOT modified on disk. We create a **separate**
#     quantized pipeline instance in memory.
#   • Works on CPU; GPU recommended for speed (set DEVICE = "cuda" if available).
# ------------------------------------------------------------------------------------

import os, time, json, csv
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image

# ---------- CONFIG ----------
MODEL_ID      = "XCLiu/instaflow_0_9B_from_sd_1_5"  # public HF model id
DEVICE        = "cuda" if torch.cuda.is_available() else "cpu"  # "cpu" or "cuda"
TORCH_DTYPE   = torch.float16 if (DEVICE == "cuda") else torch.float32
PROMPT        = "two cute cats playing with yarn, soft lighting, photorealistic"
WIDTH = HEIGHT = 512
GUIDANCE      = 0.0   # InstaFlow is one-step; CFG>1.0 does extra work. 0.0 keeps it simple.
OUT_DIR       = "Output_Images"
INV_DIR       = "Inventory"
SAVE_INVENTORY= True  # saves a CSV/JSON inventory of quantized layers

# ---------- 0) Load baseline pipeline ----------
# Expect to run inside the InstaFlow repo's `code/` folder so this import works:
from Rectified import RectifiedFlowPipeline

print(f"[info] Loading pipeline {MODEL_ID} on {DEVICE} as {str(TORCH_DTYPE).split('.')[-1]} ...")
pipe_base = RectifiedFlowPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=TORCH_DTYPE,
    use_safetensors=True,
).to(DEVICE)

if DEVICE == "cpu":
    torch.set_num_threads(max(1, os.cpu_count() - 1))

# ---------- 1) UNet inventory helpers ----------
def list_unet_dense_layers(unet: nn.Module) -> List[Tuple[str, nn.Module]]:
    """Return (name, module) for all Conv2d/Linear layers in the UNet."""
    layers = []
    for name, mod in unet.named_modules():
        if isinstance(mod, (nn.Conv2d, nn.Linear)):
            layers.append((name, mod))
    return layers

def is_fragile(name: str, mod: nn.Module) -> bool:
    """
    First quantization pass skip-list:
      - First/last UNet convs (conv_in, conv_out)
      - Time embedding layers
      - Norms (handled elsewhere; keep in float)
      - Attention output projection (often touchy on first pass)
    """
    if "conv_in" in name or "conv_out" in name: return True
    if "time_embedding" in name: return True
    if isinstance(mod, (nn.LayerNorm, nn.GroupNorm)): return True
    if "attentions" in name and ("to_out" in name or "proj_out" in name): return True
    return False

# ---------- 2) Simple 4-bit quantizer (weights-only, per-out-channel) ----------
# We quantize each OUT channel independently: s_c = max(|W_c|)/7; q = round(W/s_c) in [-8, 7].
# We store q as int8 but values live in 4-bit domain (two's complement nibble).
# In forward(), we dequantize: W_hat = s_c * q  (computed to match input dtype/device).
def per_channel_int4_quantize_weight(W: torch.Tensor):
    """
    W shape:
      • Conv2d: (out_c, in_c, kH, kW)
      • Linear: (out_c, in_c)
    Returns:
      q (int8 tensor holding 4-bit domain values), scales (float per out-channel)
    """
    oc = W.shape[0]
    Wc = W.detach().clone().view(oc, -1)               # flatten per OUT channel
    scales = Wc.abs().max(dim=1).values / 7.0          # s_c = max|W_c| / 7
    scales = torch.clamp(scales, min=1e-8)             # avoid div-by-zero
    q = torch.round((Wc.T / scales).T)
    q = torch.clamp(q, -8, 7).to(torch.int8)           # 4-bit domain values stored in int8
    return q, scales

def dequantize_int4(q: torch.Tensor, scales: torch.Tensor, shape, to_dtype, to_device):
    oc = shape[0]
    Wf = (q.float().T * scales).T                      # W_hat = s * q
    return Wf.view(shape).to(dtype=to_dtype, device=to_device)

class QConv2dINT4(nn.Module):
    """Drop-in Conv2d that holds INT4 weights (+ per-channel scales) and dequants per forward()."""
    def __init__(self, conv: nn.Conv2d):
        super().__init__()
        # save Conv2d meta
        self.stride, self.padding = conv.stride, conv.padding
        self.dilation, self.groups = conv.dilation, conv.groups
        self.bias_flag = conv.bias is not None
        if self.bias_flag:
            self.register_buffer("biasv", conv.bias.data.detach().clone())
        # quantize weights once
        q, s = per_channel_int4_quantize_weight(conv.weight.data)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)
        # shapes
        self.out_channels = conv.out_channels
        self.in_channels  = conv.in_channels
        self.kernel_size  = conv.kernel_size

    def forward(self, x):
        W = dequantize_int4(
            self.qweight, self.scales,
            (self.out_channels, self.in_channels, *self.kernel_size),
            to_dtype=x.dtype, to_device=x.device
        )
        return F.conv2d(
            x, W, bias=(self.biasv if self.bias_flag else None),
            stride=self.stride, padding=self.padding,
            dilation=self.dilation, groups=self.groups
        )

class QLinearINT4(nn.Module):
    """Drop-in Linear with INT4 weights (+ per-channel scales), dequant per forward()."""
    def __init__(self, lin: nn.Linear):
        super().__init__()
        # treat (out,in) as (out,in,1,1) for shared quant code
        w4d = lin.weight.data.unsqueeze(-1).unsqueeze(-1)
        q, s = per_channel_int4_quantize_weight(w4d)
        self.register_buffer("qweight", q)
        self.register_buffer("scales", s)
        self.bias_flag = lin.bias is not None
        if self.bias_flag:
            self.register_buffer("biasv", lin.bias.data.detach().clone())
        self.out_features = lin.out_features
        self.in_features  = lin.in_features

    def forward(self, x):
        W = dequantize_int4(self.qweight, self.scales,
                            (self.out_features, self.in_features, 1, 1),
                            to_dtype=x.dtype, to_device=x.device)
        W = W.squeeze(-1).squeeze(-1)  # (out,in)
        y = x @ W.t()
        return y + (self.biasv if self.bias_flag else 0.0)

# ---------- 3) Build a separate quantized pipeline (keeps baseline intact) ----------
print("[info] Creating a separate quantized pipeline instance ...")
pipe_q = RectifiedFlowPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=TORCH_DTYPE,
    use_safetensors=True,
).to(DEVICE)

# identify target layers in UNet
targets = list_unet_dense_layers(pipe_q.unet)

# optional: export inventory before replacement
if SAVE_INVENTORY:
    os.makedirs(INV_DIR, exist_ok=True)
    csv_path  = os.path.join(INV_DIR, "unet_layers.csv")
    json_path = os.path.join(INV_DIR, "unet_layers.json")
    rows, items = [], []
    for idx, (name, mod) in enumerate(targets):
        wshape = "x".join(map(str, list(mod.weight.shape)))
        has_bias = (mod.bias is not None)
        rows.append([idx, name, type(mod).__name__, wshape, has_bias])
        items.append({"idx": idx, "name": name, "type": type(mod).__name__,
                      "weight_shape": wshape, "has_bias": has_bias})
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f); writer.writerow(["idx","name","type","weight_shape","has_bias"]); writer.writerows(rows)
    with open(json_path, "w") as f:
        json.dump({"layers": items}, f, indent=2)
    print(f"[info] Saved UNet layer inventory:\n  • {csv_path}\n  • {json_path}")

# replacement helper: navigate to parent of a dotted module path
def get_parent_and_leaf(root: nn.Module, dotted: str):
    parent = root
    parts = dotted.split(".")
    for p in parts[:-1]:
        parent = getattr(parent, p)
    return parent, parts[-1]

# replace eligible layers with INT4 wrappers
num_quantized = 0
for name, mod in targets:
    if is_fragile(name, mod):
        continue
    parent, leaf = get_parent_and_leaf(pipe_q.unet, name)
    if isinstance(mod, nn.Conv2d):
        setattr(parent, leaf, QConv2dINT4(mod))
        num_quantized += 1
    elif isinstance(mod, nn.Linear):
        setattr(parent, leaf, QLinearINT4(mod))
        num_quantized += 1

print(f"[info] Quantized (weights-only INT4) layers: {num_quantized} (skipped fragile layers)")

In [None]:
# ---------- 4) Generate one image with the quantized pipeline ----------
os.makedirs(OUT_DIR, exist_ok=True)

PROMPTQ = "a cute cat"
t0 = time.time()
img = pipe_q(
    prompt=PROMPTQ,
    width=WIDTH, height=HEIGHT,
    num_inference_steps=1,   # InstaFlow is one-step
    guidance_scale=GUIDANCE
).images[0]
out_path = os.path.join(OUT_DIR, "instaflow_w4_weights_only.png")
img.save(out_path)
elapsed = time.time() - t0

print(f"[done] Saved: {out_path}  | device={DEVICE} dtype={str(TORCH_DTYPE).split('.')[-1]} | {elapsed:.2f}s")
display(Image.open(out_path))


#This si the best 

## CLIP score for quantized model

In [None]:
W, H = 512, 512
gen_paths, gen_pils = [], []
for i, p in enumerate(simple_prompts, 1):
    im = pipe_q(prompt=p, width=W, height=H,
              num_inference_steps=1, guidance_scale=0.0).images[0]
    path = f"Output_Images/qantized_clip_{i:02d}.png"
    im.save(path)
    gen_paths.append(path)
    gen_pils.append(Image.open(path).convert("RGB"))

# 2) Convert PIL -> **uint8 tensors** [C,H,W] (no normalization, no /255)
imgs_tensor_list = [TF.pil_to_tensor(img) for img in gen_pils]  # 0–255 uint8

# 3) Build CLIPScore metric and compute per-image scores
metric = CLIPScore(model_name_or_path="openai/clip-vit-large-patch14").to("cpu")

scores = []
with torch.no_grad():
    for img_t, prompt in zip(imgs_tensor_list, simple_prompts):
        # Pass single-item lists (image list, text list)
        s = metric([img_t], [prompt]).item()
        scores.append(s)

avg = sum(scores) / len(scores)

# 4) Display each image with its CLIPScore
display(HTML("<h3>CLIPScore Results (Option A)</h3>"))
for idx, (p, path, s) in enumerate(zip(simple_prompts, gen_paths, scores), 1):
    display(HTML(f"<b>{idx:02d}. Prompt:</b> {p}<br><b>CLIPScore:</b> {s:.3f}"))
    display(Image.open(path))

# 5) Save CSV file with per-image and average scores
csv_path = "quantized_clip_score.csv"
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["index", "image_path", "prompt", "clip_score"])
    for i, (path, p, s) in enumerate(zip(gen_paths, simple_prompts, scores), 1):
        writer.writerow([i, path, p, f"{s:.6f}"])
    writer.writerow([])
    writer.writerow(["avg", "", "", f"{avg:.6f}"])

print(f"\nSaved per-image CLIP scores and average to: {csv_path}")
print(f"Average CLIPScore over {len(simple_prompts)} prompts: {avg:.3f}")



In [None]:
# 2) Convert PIL -> **uint8 tensors** [C,H,W] (no normalization, no /255)
imgs_tensor_list = [TF.pil_to_tensor(img) for img in gen_pils]  # 0–255 uint8

# 3) Build CLIPScore metric and compute per-image scores
metric = CLIPScore(model_name_or_path="openai/clip-vit-large-patch14").to("cpu")

scores = []
with torch.no_grad():
    for img_t, prompt in zip(imgs_tensor_list, simple_prompts):
        # Pass single-item lists (image list, text list)
        s = metric([img_t], [prompt]).item()
        scores.append(s)

avg = sum(scores) / len(scores)


In [None]:
# 4) Display each image with its CLIPScore
display(HTML("<h3>CLIPScore Results (Option A)</h3>"))
for idx, (p, path, s) in enumerate(zip(simple_prompts, gen_paths, scores), 1):
    display(HTML(f"<b>{idx:02d}. Prompt:</b> {p}<br><b>CLIPScore:</b> {s:.3f}"))
    display(Image.open(path))

# 5) Save CSV file with per-image and average scores
csv_path = "quantized_clip_score.csv"
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["index", "image_path", "prompt", "clip_score"])
    for i, (path, p, s) in enumerate(zip(gen_paths, simple_prompts, scores), 1):
        writer.writerow([i, path, p, f"{s:.6f}"])
    writer.writerow([])
    writer.writerow(["avg", "", "", f"{avg:.6f}"])

print(f"\nSaved per-image CLIP scores and average to: {csv_path}")
print(f"Average CLIPScore over {len(simple_prompts)} prompts: {avg:.3f}")

# ONNX Build And checking 

In [1]:
# %% [single-cell] Quick ONNX Runtime CPU sanity check + real-time diff vs PyTorch
import sys, time, statistics, math
from pathlib import Path

# ----------------------------- dependency probe -----------------------------
missing = []
try:
    import torch
except Exception:
    missing.append("torch")
try:
    import onnx
except Exception:
    missing.append("onnx")
try:
    import onnxruntime as ort
except Exception:
    missing.append("onnxruntime")

if missing:
    print("❌ Missing packages:", ", ".join(missing))
    print("Install them (CPU-only):")
    print("  pip install --upgrade torch --index-url https://download.pytorch.org/whl/cpu")
    print("  pip install onnx onnxruntime")
    raise SystemExit

print(f"✅ torch {torch.__version__} | onnx {onnx.__version__} | onnxruntime {ort.__version__}")

# ----------------------------- tiny test model ------------------------------
import torch
import torch.nn as nn
import torch.nn.functional as F

class TinyBlock(nn.Module):
    def __init__(self, C_in=3, C_mid=16, C_out=32, H=64, W=64):
        super().__init__()
        self.conv = nn.Conv2d(C_in, C_mid, kernel_size=3, padding=1)
        self.lin  = nn.Linear(C_mid*H*W, C_out)
        self.H, self.W = H, W
    def forward(self, x):
        # x: (N, C_in, H, W)
        y = self.conv(x)
        y = F.relu(y)
        y = y.reshape(y.shape[0], -1)  # flatten
        y = self.lin(y)
        return y

# fix shapes to keep things simple and comparable
B, C_in, H, W = 1, 3, 64, 64
model = TinyBlock(C_in=C_in, C_mid=16, C_out=32, H=H, W=W).cpu().eval()
dummy = torch.randn(B, C_in, H, W, dtype=torch.float32)

# ----------------------------- export to ONNX -------------------------------
onnx_path = Path("tinyblock.onnx").absolute()
OPSET = 17  # set to 21 if you plan to use newest quantizers later
torch.onnx.export(
    model, dummy, onnx_path.as_posix(),
    input_names=["input"],
    output_names=["output"],
    opset_version=OPSET,
    do_constant_folding=True,
    dynamic_axes=None,  # fixed-shape for this quick test
)
print(f"📦 Exported ONNX to: {onnx_path}")

# basic model load check
_ = onnx.load(onnx_path.as_posix())
onnx.checker.check_model(_)
print("✅ ONNX graph loads & passes checker")

# ----------------------------- ORT session (CPU) ----------------------------
sess = ort.InferenceSession(
    onnx_path.as_posix(),
    providers=["CPUExecutionProvider"]
)
print("🧠 ORT providers:", sess.get_providers())

# helper to run ORT
import numpy as np
def run_ort(x_torch: torch.Tensor):
    io_binding = None
    x_np = x_torch.detach().cpu().numpy()
    ort_inputs = {sess.get_inputs()[0].name: x_np}
    out = sess.run(None, ort_inputs)[0]
    return out

# ----------------------------- correctness check ----------------------------
with torch.no_grad():
    ref = model(dummy).cpu().numpy()
    ort_out = run_ort(dummy)

# numeric diffs
abs_diff = np.abs(ref - ort_out)
max_abs = float(abs_diff.max())
mean_abs = float(abs_diff.mean())
print(f"🔍 Numeric diff vs PyTorch: max_abs={max_abs:.3e}, mean_abs={mean_abs:.3e}")

# ----------------------------- timing (real-time diff) ----------------------
def bench_pytorch(model, x, warmup=10, iters=100):
    times = []
    with torch.no_grad():
        for _ in range(warmup):
            _ = model(x)
        for _ in range(iters):
            t0 = time.perf_counter()
            _ = model(x)
            times.append((time.perf_counter() - t0)*1000.0)
    return times

def bench_onnx(sess, x, warmup=10, iters=100):
    times = []
    _ = run_ort(x)  # quick touch
    for _ in range(warmup):
        _ = run_ort(x)
    for _ in range(iters):
        t0 = time.perf_counter()
        _ = run_ort(x)
        times.append((time.perf_counter() - t0)*1000.0)
    return times

print("\n⏱️  Benchmarking on CPU…")
pt_times  = bench_pytorch(model, dummy, warmup=10, iters=100)
ort_times = bench_onnx(sess, dummy,  warmup=10, iters=100)

def stats(xs):
    return {
        "avg_ms": statistics.mean(xs),
        "p50_ms": statistics.median(xs),
        "p95_ms": statistics.quantiles(xs, n=20)[18],  # approx 95th
        "min_ms": min(xs),
        "max_ms": max(xs),
        "n": len(xs),
    }

pt_s  = stats(pt_times)
ort_s = stats(ort_times)

print(f"PyTorch  CPU  -> avg {pt_s['avg_ms']:.3f} ms | p50 {pt_s['p50_ms']:.3f} | p95 {pt_s['p95_ms']:.3f} | min {pt_s['min_ms']:.3f}")
print(f"ONNX RT  CPU  -> avg {ort_s['avg_ms']:.3f} ms | p50 {ort_s['p50_ms']:.3f} | p95 {ort_s['p95_ms']:.3f} | min {ort_s['min_ms']:.3f}")

speedup = pt_s["avg_ms"] / ort_s["avg_ms"] if ort_s["avg_ms"] > 0 else float("inf")
print(f"\n⚖️  Reported numeric diff (max/mean abs): {max_abs:.3e} / {mean_abs:.3e}")
print(f"🚀 Relative speedup (PyTorch avg / ORT avg): {speedup:.2f}×")

print("\nDone. If ORT runs and numeric diffs are tiny, your ONNX toolchain is good to go.")


❌ Missing packages: torch, onnxruntime
Install them (CPU-only):
  pip install --upgrade torch --index-url https://download.pytorch.org/whl/cpu
  pip install onnx onnxruntime


SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [4]:
# %% Fix & verify torch / onnx / onnxruntime inside THIS Jupyter kernel

import sys, subprocess, pkgutil, platform

print("Python exe:", sys.executable)
print("Python ver:", sys.version.split()[0], "| OS:", platform.platform())

def pip_install(*args):
    cmd = [sys.executable, "-m", "pip", "install", *args]
    print(">", " ".join(cmd))
    return subprocess.check_call(cmd)

# 1) Show where pip would install and what pip is being used
subprocess.check_call([sys.executable, "-m", "pip", "-V"])
subprocess.check_call([sys.executable, "-m", "pip", "list"])

# 2) (Re)install CPU wheels into THIS kernel's environment
#    - Force reinstall to avoid path confusion
#    - Torch CPU: use the official CPU wheel index (works on Windows/Linux/macOS-Intel)
pip_install("--upgrade", "--force-reinstall", "pip", "setuptools", "wheel")
pip_install("--upgrade", "--force-reinstall", "onnx")
pip_install("--upgrade", "--force-reinstall", "onnxruntime")   # CPU package
try:
    pip_install("--upgrade", "--force-reinstall",
                "torch", "torchvision", "torchaudio",
                "--index-url", "https://download.pytorch.org/whl/cpu")
except subprocess.CalledProcessError as e:
    print("PyTorch CPU install via index-url failed; trying default PyPI (still CPU-only).")
    pip_install("--upgrade", "--force-reinstall", "torch", "torchvision", "torchaudio")

# 3) Sanity-imports in THIS kernel
errs = []
for mod in ("torch", "onnx", "onnxruntime"):
    try:
        __import__(mod)
        m = sys.modules[mod]
        print(f"✅ import {mod} ok — version:", getattr(m, "__version__", "n/a"))
    except Exception as e:
        errs.append((mod, repr(e)))
        print(f"❌ import {mod} failed:", e)

# 4) Extra checks & tips
if errs:
    print("\nTroubleshooting tips:")
    print("• If you see multiple Python installs, ensure your Jupyter kernel uses this Python exe shown above.")
    print("• In Jupyter, always prefer `%pip install ...` (or the pattern we used: `sys.executable -m pip ...`).")
    print("• If you previously installed `onnxruntime-gpu`, uninstall it: `pip uninstall -y onnxruntime-gpu` and keep `onnxruntime`.")
else:
    print("\nAll good. You can proceed with ONNX Runtime on CPU in this notebook.")


Python exe: c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe
Python ver: 3.10.0 | OS: Windows-10-10.0.26200-SP0
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip install --upgrade --force-reinstall pip setuptools wheel
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip install --upgrade --force-reinstall onnx
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip install --upgrade --force-reinstall onnxruntime
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip install --upgrade --force

In [9]:
# %% Windows ORT + NumPy ABI fix (run once in THIS kernel)
import sys, subprocess, platform

def pip(*args):
    print(">", sys.executable, "-m", "pip", *args)
    return subprocess.check_call([sys.executable, "-m", "pip", *args])

print("Python  :", sys.executable)
print("Version :", sys.version.split()[0], "| OS:", platform.platform())

# 0) Remove GPU/DML variants (if any)
for pkg in ("onnxruntime-gpu", "onnxruntime-directml"):
    try:
        pip("uninstall", "-y", pkg)
    except subprocess.CalledProcessError:
        pass

# 1) Eject onnxruntime-extensions FOR NOW (it auto-imports ORT and can re-trigger the mismatch)
try:
    pip("uninstall", "-y", "onnxruntime-extensions")
except subprocess.CalledProcessError:
    pass

# 2) Force NumPy to a pre-2.0 ABI and reinstall ORT CPU against it
pip("install", "--upgrade", "--force-reinstall", "numpy==1.26.4")
pip("install", "--upgrade", "--force-reinstall", "onnxruntime==1.17.3")

# 3) If ONNX itself complains later, we’ll re-pin it; preemptively align it now:
pip("install", "--upgrade", "--force-reinstall", "onnx==1.15.0")

print("\n✅ Packages aligned. IMPORTANT: Restart the Jupyter kernel now (Kernel → Restart).")
print("After restart, run the sanity cell I’ll give you next.")


Python  : c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe
Version : 3.10.0 | OS: Windows-10-10.0.26200-SP0
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip uninstall -y onnxruntime-gpu
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip uninstall -y onnxruntime-directml
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip uninstall -y onnxruntime-extensions
> c:\Users\Akshat Kumar\OneDrive - Indian Institute of Technology Bombay\Desktop\IITB Notes\7th_Sem\IE 643\PROJECT\.instaflow310\Scripts\python.exe -m pip install --upgrade --force-reinstall numpy==1.26.4
> c:\Users\Akshat Kumar\On