In [None]:
import torch
import numpy as np
from diffusers.image_processor import VaeImageProcessor
from diffusers.utils import load_image
from diffusers.pipelines.pipeline_utils import numpy_to_pil

input_image = load_image(
 "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/kandinsky3/t2i.png"
)
midpoint_image = torch.randn(1, 4, 64, 64)
image_processor = VaeImageProcessor(
    vae_scale_factor=2**3,
    vae_latent_channels=4,
    resample="bicubic",
    reducing_gap=1
)

def preprocess(image, image_processor=None, branch="default"):
    """Convert an image into a model-ready tensor via one of two code paths.

    Args:
        image: Input image. On the ``"main"`` path it must expose
            ``convert("RGB")`` and be convertible with ``np.array``.
        image_processor: Object with a ``preprocess(image)`` method; required
            for every branch other than ``"main"``.
        branch: ``"main"`` selects the hand-written NumPy path; any other
            value delegates to ``image_processor``.

    Returns:
        For ``"main"``: a float32 tensor of shape (1, C, H, W) with pixel
        values mapped from [0, 255] to [-1, 1]. Otherwise whatever
        ``image_processor.preprocess`` returns.
    """
    if branch != "main":
        return image_processor.preprocess(image)

    rgb_pixels = np.array(image.convert("RGB"))
    # Dividing by 127.5 and subtracting 1 maps 0 -> -1 and 255 -> 1.
    scaled = rgb_pixels.astype(np.float32) / 127.5 - 1
    # (H, W, C) -> (C, H, W), then add a leading batch axis.
    channels_first = np.transpose(scaled, [2, 0, 1])
    return torch.from_numpy(channels_first).unsqueeze(0)

def postprocess(image, image_processor=None, branch="default"):
    """Turn a model output tensor back into a single image via one of two paths.

    Args:
        image: Batched tensor; the ``"main"`` path permutes it as
            (N, C, H, W) -> (N, H, W, C).
        image_processor: Object with a ``postprocess(image)`` method returning
            an indexable result; required for every branch other than ``"main"``.
        branch: ``"main"`` selects the hand-written path; any other value
            delegates to ``image_processor``.

    Returns:
        The first element of the post-processed batch.
    """
    if branch != "main":
        return image_processor.postprocess(image)[0]

    # Undo the [-1, 1] normalisation and clip to the valid [0, 1] range.
    unit_range = (image * 0.5 + 0.5).clamp(0, 1)
    channels_last = unit_range.cpu().permute(0, 2, 3, 1).float().numpy()
    return numpy_to_pil(channels_last)[0]

if torch.equal(preprocess(input_image, image_processor), preprocess(input_image, branch="main")):
    print("Preprocessed images are exactly the same.")

if list(postprocess(midpoint_image, image_processor).getdata()) == list(postprocess(midpoint_image, branch="main").getdata()):
    print("Postprocessed images are exactly the same.")


In [None]:
from PIL import Image, ImageChops
image = Image.open('/home/ishan/Downloads/test4.jpg')
image1 = Image.open('/home/ishan/Downloads/test5.jpg')

if list(image.getdata()) == list(image1.getdata()):
    print("Images are exactly the same.")

ImageChops.difference(image, image1).save('/home/ishan/Downloads/diff.jpg')

In [None]:
from diffusers.models.embeddings import PatchEmbed

In [None]:
patch_embed = PatchEmbed()

In [None]:
import torch
t = torch.randn(1, 3, 224, 224)

print(patch_embed(t).shape)

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from transformers import CLIPImageProcessor
from PIL import Image

# Initialize the processor
image_processor = CLIPImageProcessor(
    do_resize=True,
    resample=Image.BICUBIC,
    do_center_crop=True,
    do_normalize=True,
    image_mean=[0.5, 0.5, 0.5],
    image_std=[0.5, 0.5, 0.5]
)

def process_image(control_signal, hw):
    """Run an image through the module-level ``image_processor``.

    Args:
        control_signal: Image exposing ``convert("RGB")``.
        hw: Indexable pair ``(height, width)``; both entries are cast to ``int``.

    Returns:
        The ``"pixel_values"`` entry of the processor output, requested as
        PyTorch tensors (``return_tensors="pt"``).
    """
    target_height = int(hw[0])
    target_width = int(hw[1])
    rgb_image = control_signal.convert("RGB")

    # The same target size is used for both the resize and the centre crop.
    batch = image_processor(
        rgb_image,
        size={"height": target_height, "width": target_width},
        crop_size={"height": target_height, "width": target_width},
        return_tensors="pt",
    )
    return batch["pixel_values"]

def show_processed_image(processed_image):
    """Undo the 0.5/0.5 normalisation on the first image of a batch and plot it.

    Args:
        processed_image: Batched tensor; element 0 is treated as (3, H, W).
    """
    # Per-channel statistics shaped (3, 1, 1) so they broadcast over H and W.
    channel_mean = torch.tensor([0.5, 0.5, 0.5]).view(3, 1, 1)
    channel_std = torch.tensor([0.5, 0.5, 0.5]).view(3, 1, 1)

    # Invert x_norm = (x - mean) / std, then clip to the displayable range.
    restored = (processed_image[0] * channel_std + channel_mean).clamp(0, 1)

    # matplotlib expects channels last: (C, H, W) -> (H, W, C).
    pixels_hwc = restored.permute(1, 2, 0).cpu().numpy()

    plt.imshow(pixels_hwc)
    plt.axis("off")
    plt.show()

# Example usage
image = Image.open("/home/ishan/Downloads/ishan.jpg")  # Replace with actual image path
print(image.size)
hw = (224, 224)  # Desired height and width

processed_image = process_image(image, hw)
print(processed_image.shape)
show_processed_image(processed_image)


In [None]:
!wget https://github.com/richzhang/hed/releases/download/0.2.0/hed_pretrained_bsds.caffemodel

In [None]:
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
import torch
from PIL import Image
import requests
from io import BytesIO

# Load the ControlNet model trained with HED edge detection
controlnet = ControlNetModel.from_pretrained("lllyasviel/control_v11p_sd15_softedge", torch_dtype=torch.float16)

# Load the base model
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=controlnet,
    torch_dtype=torch.float16
).to("cuda")

# Prepare input image
# NOTE(review): this is a Google Drive "view" page URL; it presumably serves an HTML
# page rather than raw image bytes, in which case Image.open would fail — confirm and
# switch to a direct-download link if so.
image_url = "https://drive.google.com/file/d/1jqnWqB2z47UKSrAUgLhkpEDSodtQqrFs/view?usp=sharing"
image = Image.open(BytesIO(requests.get(image_url).content)).convert("RGB")

# Generate the HED soft edges
# NOTE(review): `image` is the only positional argument and no text prompt is passed.
# The pipeline's first positional parameter is presumably the prompt — confirm against
# the StableDiffusionControlNetPipeline.__call__ signature. Also note that nothing in
# this cell runs an edge detector on `image` before it is handed to the pipeline.
output = pipe(image, num_inference_steps=50).images[0]

# Save the result
output.save("soft_edge_output.png")


In [None]:
from diffusers.utils import load_image
control_image = load_image(
    "https://huggingface.co/ishan24/Sana_600M_1024px_ControlNet_diffusers/resolve/main/hed_result.png"
)

In [None]:
control_image

In [None]:
import numpy as np

img = np.array(control_image, dtype=np.float32)
img = img.flatten()

np.concatenate((img[:16], img[-16:]))

In [None]:
from diffusers.image_processor import VaeImageProcessor
from PIL import Image
import numpy as np
import torch



def prepare_image(pil_image, w=512, h=512):
    """Resize a PIL image and convert it to a normalised (1, C, H, W) tensor.

    Args:
        pil_image: Image exposing ``resize`` and ``convert``.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        float32 tensor with values mapped from [0, 255] to [-1, 1].
    """
    resized = pil_image.resize((w, h), resample=Image.BICUBIC, reducing_gap=1)
    pixels = np.array(resized.convert("RGB")).astype(np.float32)
    # 0 -> -1 and 255 -> 1.
    normalized = pixels / 127.5 - 1
    # (H, W, C) -> (C, H, W), then add the batch axis.
    channels_first = np.transpose(normalized, [2, 0, 1])
    return torch.from_numpy(channels_first).unsqueeze(0)

def prepare_image1(image, w=512, h=512):
    """Preprocess ``image`` with the module-level ``image_processor``.

    The processor is called as ``preprocess(image, h, w)``, i.e. height first,
    then width.

    NOTE(review): this cell imports ``VaeImageProcessor`` but never builds one;
    the nearest earlier assignment to ``image_processor`` in notebook order is
    a ``CLIPImageProcessor``. Confirm the intended processor is in scope
    before running this cell.
    """
    return image_processor.preprocess(image, h, w)

In [None]:
img1 = prepare_image(control_image)
img2 = prepare_image1(control_image)

In [None]:
mask = img1 == img2
mask = mask.flatten()
mask.sum() == len(mask)

In [None]:
from typing import Dict

def _assign_components_to_devices(
    module_sizes: Dict[str, float], device_memory: Dict[str, float], device_mapping_strategy: str = "balanced"
):
    device_ids = list(device_memory.keys())
    device_cycle = device_ids + device_ids[::-1]
    device_memory = device_memory.copy()

    device_id_component_mapping = {}
    current_device_index = 0
    for component in module_sizes:
        print(current_device_index % len(device_cycle))
        device_id = device_cycle[current_device_index % len(device_cycle)]
        component_memory = module_sizes[component]
        curr_device_memory = device_memory[device_id]
        print("Component Memory", component_memory, flush=True)
        print("Current Device Memory", curr_device_memory, flush=True)
        # If the GPU doesn't fit the current component offload to the CPU.
        if component_memory > curr_device_memory:
            device_id_component_mapping["cpu"] = [component]
        else:
            if device_id not in device_id_component_mapping:
                device_id_component_mapping[device_id] = [component]
            else:
                device_id_component_mapping[device_id].append(component)

            # Update the device memory.
            device_memory[device_id] -= component_memory
            current_device_index += 1
    print("Device ID Component Mapping", device_id_component_mapping, flush=True)
    return device_id_component_mapping

In [None]:
_assign_components_to_devices({'text_encoder': 522869032, 'transformer': 13834957424, 'vae': 6222450053}, {0: 15540027392, 1: 15500181504})

In [None]:
# !pip install git+https://github.com/huggingface/transformers.git
!pip install transformers==4.49.0

In [None]:
import transformers
transformers.__version__

In [None]:
from transformers import AutoTokenizer, CLIPTextModelWithProjection
import torch

model = CLIPTextModelWithProjection.from_pretrained("stabilityai/stable-diffusion-3.5-medium", subfolder="text_encoder", torch_dtype=torch.bfloat16)

In [None]:
model.text_projection.weight.dtype

In [None]:
model.text_model.final_layer_norm.weight.dtype

In [None]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("stabilityai/stable-diffusion-3.5-medium", subfolder="tokenizer", torch_dtype="bfloat16")
text_inputs = tokenizer(
    "Hello world!",
    padding="max_length",
    max_length=16,
    truncation=True,
    return_tensors="pt",
)

print(text_inputs.input_ids.dtype, text_inputs.input_ids.device)  # torch.int64

In [None]:
outputs = model(**text_inputs, output_hidden_states=True)
outputs.text_embeds.shape

In [None]:
import torch

t = torch.randn(4)


In [None]:
import torch
torch.tensor([1,2], dtype=torch.int8)

In [None]:
import torch
def unpack_weights(uint8tensor, bits):
    """Split each packed byte into ``bits``-wide values, lowest bits first.

    Example with ``bits=2``: the byte ``0b10110001`` (177) unpacks to
    ``[1, 0, 3, 2]`` because its 2-bit fields, read from the least
    significant end, are ``01``, ``00``, ``11``, ``10``.

    Args:
        uint8tensor: 1-D uint8 tensor of packed bytes.
        bits: Width in bits of each packed value.

    Returns:
        uint8 tensor of ``uint8tensor.shape[0] * 8 // bits`` unpacked values.
        The mask and the pre-mask tensor are printed as a side effect.
    """
    values_per_byte = 8 // bits
    total_values = uint8tensor.shape[0] * 8 // bits
    low_bits_mask = (1 << bits) - 1

    unpacked_tensor = torch.zeros((total_values), dtype=torch.uint8)

    cursor = 0
    for byte_index in range(uint8tensor.shape[0]):
        packed_byte = uint8tensor[byte_index]
        # Shift the wanted field down to the low bits; the higher bits that
        # come along with it are cleared by the mask after the loop.
        for shift in range(0, bits * values_per_byte, bits):
            unpacked_tensor[cursor] |= packed_byte >> shift
            cursor += 1

    print(low_bits_mask)
    print(unpacked_tensor)
    unpacked_tensor &= low_bits_mask
    return unpacked_tensor

In [None]:
torch.randint()

In [None]:
unpacked_tensor = torch.tensor([177, 255], 
                               dtype=torch.uint8)

In [None]:
# Answer should be: torch.tensor([1, 0, 3, 2, 3, 3, 3, 3])
unpack_weights(unpacked_tensor, 2)

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
from accelerate import init_empty_weights

with init_empty_weights():
    checkpoint = "HuggingFaceTB/SmolLM2-135M"
    config = AutoConfig.from_pretrained(checkpoint)
    model = AutoModelForCausalLM.from_config(config)

In [None]:
# pip install transformers
from transformers import AutoModelForCausalLM, AutoTokenizer
checkpoint = "HuggingFaceTB/SmolLM2-135M"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# for multiple GPUs install accelerate and do `model = AutoModelForCausalLM.from_pretrained(checkpoint, device_map="auto")`
model = AutoModelForCausalLM.from_pretrained(checkpoint)
inputs = tokenizer.encode("Gravity is", return_tensors="pt")
outputs = model.generate(inputs)
print(tokenizer.decode(outputs[0]))

In [None]:
import modelopt.torch.quantization as mtq
import modelopt.torch.opt.conversion as mtc
from copy import deepcopy

mq = deepcopy(model)
config = mtq.FP8_DEFAULT_CFG
mtq.quantize(mq,config)

In [None]:
type(mq.model.layers[0].self_attn.q_proj.weight)

In [None]:
model.get_memory_footprint()/1e6, mq.get_memory_footprint()/1e6

In [None]:
import torch

hidden = torch.randn((2,32,32,32))
encoder = torch.randn((2,10,300,2304))
timestep = torch.Tensor([0,0])

with torch.no_grad():
    output = model(hidden, encoder, timestep)

In [None]:
import torch

timestep = torch.Tensor([0,1])

scm_timestep = torch.sin(timestep) / (torch.cos(timestep) + torch.sin(timestep))

scm_timestep_expanded = scm_timestep.view(-1, 1, 1, 1)

print(scm_timestep_expanded.dtype)

In [None]:
from diffusers.quantizers.quantization_config import NVIDIAModelOptConfig
# The stray positional `block` after the keyword argument was a SyntaxError
# (and `block` is not defined anywhere in this cell), so it has been dropped.
quant_config = NVIDIAModelOptConfig(quant_type="FP8_WO")

quant_config.get_config_from_quant_type()

In [None]:
# !pip install "git+https://github.com/ishan-modi/diffusers.git@add-trtquant-backend"
# !pip install "git+https://github.com/ishan-modi/TensorRT-Model-Optimizer.git@fixes-hfargs-order#egg=nvidia-modelopt[torch, onnx]"

import torch
from diffusers import SanaTransformer2DModel
from diffusers.quantizers.quantization_config import NVIDIAModelOptConfig

checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"

# quant_config = {"quant_type": "INT4_WO", "quant_method": "modelopt", "modules_to_not_convert": ["conv"], "block_quantize": True}
quant_config = {"quant_type": "FP8_WO", "quant_method": "modelopt", "modules_to_not_convert": ["conv"]}
quant_config = NVIDIAModelOptConfig(quant_type="FP8_WO")
model = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config, torch_dtype=torch.bfloat16).to('cuda')

hidden = torch.randn((2,32,32,32), dtype=torch.bfloat16).to('cuda')
encoder = torch.randn((2,10,300,2304), dtype=torch.bfloat16).to('cuda')
timestep = torch.Tensor([0,0]).to('cuda')
with torch.no_grad():
    output = model(hidden, encoder, timestep)

In [None]:
torch.__version__

In [None]:
import torch
from torch import nn

class DummyModel(nn.Module):
    """Three stacked unpadded convolutions used as a small quantization target.

    Each convolution shrinks the spatial size by ``kernel_size - 1``, so a
    (N, 128, 10, 10) input produces a (N, 128, 2, 2) output.
    """

    def __init__(self):
        super().__init__()
        self.l1 = nn.Conv2d(128, 128, 3)
        self.l2 = nn.Conv2d(128, 256, 5)
        self.l3 = nn.Conv2d(256, 128, 3)

        self._init_weights()  # Call weight initialization

    def _init_weights(self):
        """Apply Xavier-uniform initialization to every conv/linear weight."""
        for m in self.modules():
            # The model only contains Conv2d layers; matching nn.Linear alone
            # meant this loop never initialized anything.
            if isinstance(m, (nn.Linear, nn.Conv2d)):
                torch.nn.init.xavier_uniform_(m.weight)  # Xavier initialization

    def forward(self, x):
        """Run ``x`` through l1, l2 and l3 in sequence."""
        return self.l3(self.l2(self.l1(x)))

model = DummyModel()
model

In [None]:
from modelopt.torch.quantization import quantize, compress
from modelopt.torch.quantization.config import FP8_DEFAULT_CFG

FP8_DEFAULT_CFG['quant_cfg']['*weight_quantizer'].update({'fake_quant': False})
FP8_DEFAULT_CFG['quant_cfg']['*input_quantizer'].update({'enable': False})
model = quantize(model, FP8_DEFAULT_CFG)
compress(model)

In [None]:
import torch
from modelopt.torch.quantization.utils import export_torch_mode

x = torch.randn((1,128, 10, 10))
with torch.no_grad():
    with export_torch_mode():
        out = model(x)

out.shape

In [None]:
import torch
from diffusers import SanaTransformer2DModel

checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"

# quant_config = {"quant_type": "INT4_WO", "quant_method": "modelopt", "modules_to_not_convert": ["conv"], "block_quantize": True}
quant_config = {"quant_type": "FP8_WO", "quant_method": "modelopt", "modules_to_not_convert": ["conv"]}
model = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config, torch_dtype=torch.bfloat16)

In [None]:
from modelopt.torch.export import export_hf_checkpoint

with torch.inference_mode():
    export_hf_checkpoint(
        model,  # The quantized model.
        ".",  # The directory where the exported files will be stored.
    )

In [None]:
hidden = torch.randn((2,32,32,32), dtype=torch.bfloat16)
encoder = torch.randn((2,10,300,2304), dtype=torch.bfloat16)
timestep = torch.Tensor([0,0])
with torch.no_grad():
    output = model(hidden, encoder, timestep)

In [None]:
import torch
from diffusers import SanaTransformer2DModel
from diffusers.quantizers.quantization_config import NVIDIAModelOptConfig

checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"

quant_config = {"quant_type": "FP8", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1}

quant_config = NVIDIAModelOptConfig(**quant_config)
print(quant_config.get_config_from_quant_type())
quant_model = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config, torch_dtype=torch.bfloat16)

In [None]:
quant_model.get_memory_footprint() / 1e6

In [None]:
import torch
from diffusers import SanaTransformer2DModel
from diffusers.quantizers.quantization_config import NVIDIAModelOptConfig

checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"

quant_config_fp8 = {"quant_type": "FP8", "quant_method": "modelopt"}
quant_config_int4 = {"quant_type": "INT4", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1, "modules_to_not_convert": ["conv"]}
quant_config_nf4 = {"quant_type": "NF4", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1, "modules_to_not_convert": ["conv"]}
quant_config_nvfp4 = {"quant_type": "NVFP4", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1, "modules_to_not_convert": ["conv"]}

quant_config_fp8 = NVIDIAModelOptConfig(**quant_config_fp8)
quant_config_int4 = NVIDIAModelOptConfig(**quant_config_int4)
quant_config_nf4 = NVIDIAModelOptConfig(**quant_config_nf4)
quant_config_nvfp4 = NVIDIAModelOptConfig(**quant_config_nvfp4)

print(quant_config_fp8.get_config_from_quant_type())
quant_model_fp8 = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config_fp8, torch_dtype=torch.bfloat16)
# print(quant_config_int4.get_config_from_quant_type())
# quant_model_int4 = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config_int4, torch_dtype=torch.bfloat16).to('cuda')
# print(quant_config_nf4.get_config_from_quant_type())
# quant_model_nf4 = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config_nf4, torch_dtype=torch.bfloat16).to('cuda')
# print(quant_config_nvfp4.get_config_from_quant_type())
# quant_model_nvfp4 = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", quantization_config=quant_config_nvfp4, torch_dtype=torch.bfloat16).to('cuda')
# model = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer", torch_dtype=torch.bfloat16).to('cuda')

print("FP8 Model Memory Footprint: ", quant_model_fp8.get_memory_footprint() / 1e6)
# print("INT4 Model Memory Footprint: ", quant_model_int4.get_memory_footprint() / 1e6)
# print("NF4 Model Memory Footprint: ", quant_model_nf4.get_memory_footprint() / 1e6)
# print("NVFP4 Model Memory Footprint: ", quant_model_nvfp4.get_memory_footprint() / 1e6)
# print("FP8 Model Memory Footprint: ", quant_model_fp8.get_memory_footprint() / 1e6)

In [None]:
from torch import nn
print(isinstance(quant_model_fp8.patch_embed.proj.weight, nn.Parameter))

quant_model_fp8.patch_embed.proj

In [None]:
quant_model_fp8.push_to_hub("ishan24/test_modelopt_quant")

In [None]:
from modelopt.torch.opt import enable_huggingface_checkpointing

enable_huggingface_checkpointing()

In [None]:
quant_model_fp8.save_pretrained("test_modelopt_quant")

In [None]:
from diffusers import SanaTransformer2DModel
import modelopt.torch.opt as mto
mto.enable_huggingface_checkpointing()

qmodel = SanaTransformer2DModel.from_pretrained("/home/ishan/vscode-workspace/diffusers/test_modelopt_quant")
qmodel.get_memory_footprint() / 1e6

In [None]:
from transformers import AutoModelForCausalLM
from diffusers import SanaTransformer2DModel
import modelopt.torch.quantization as mtq

checkpoint = "HuggingFaceTB/SmolLM2-135M"
# checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"

model = AutoModelForCausalLM.from_pretrained(checkpoint)
# model = SanaTransformer2DModel.from_pretrained(checkpoint, subfolder="transformer")

config = mtq.FP8_DEFAULT_CFG
config['quant_cfg']['*weight_quantizer'].update({'fake_quant': False})
mq = mtq.quantize(model, config)
mtq.compress(mq)

mq.save_pretrained("test_modelopt_quant")

In [None]:
from transformers import AutoModelForCausalLM
from diffusers import SanaTransformer2DModel

import modelopt.torch.quantization as mtq

checkpoint = "/home/ishan/vscode-workspace/diffusers/test_modelopt_quant"
model = AutoModelForCausalLM.from_pretrained(checkpoint)
# model = SanaTransformer2DModel.from_pretrained(checkpoint)

In [None]:
import torch
from typing import Union
from modelopt.torch.quantization.utils import reduce_block_padding, reduce_amax, reduce_block_amax, convert_quantization_axis_to_reduce_axis

def quantize(
        input: torch.Tensor,
        scales: torch.Tensor = None,
        axis: Union[tuple, int, None] = None,
        block_sizes: dict = None,
    ) -> tuple:
        """Converting a tensor to a quantized format based on FP8 quantization. Only E4M3 is supported.

        Args:
            input (torch.Tensor): The input tensor to be quantized.
            scales (torch.Tensor): The scales for quantization.
            axis: The dimensions to reduce for quantization. None or int or tuple of ints.
            block_sizes (dict): A dictionary specifying the block size for each dimension.

        Note: One can only provide axis or block_sizes for FP8 quantization.

        Returns:
            tuple: FP8QTensor, scales
        """
        original_input = input

        # If block_sizes is provided, pad the input so that each dimension is divisible by the block size.
        if block_sizes:
            input = reduce_block_padding(input, block_sizes)

        print(input.shape)
        # Compute scales if not provided
        if scales is None:
            if block_sizes:
                amax = reduce_block_amax(input, block_sizes)
            else:
                reduce_axis = convert_quantization_axis_to_reduce_axis(input, axis)
                amax = reduce_amax(input, axis=reduce_axis)
            scales = amax / 448.0  # Consider parameterizing the divisor if needed

        # Determine the expected scales shape from the (possibly padded) input
        expected_shape = list(input.shape)
        expanded_scales = scales
        print(scales.shape, expanded_scales.shape)
        if block_sizes:
            for dim, block_size in block_sizes.items():
                # Convert negative indices to positive ones.
                dim = dim if dim >= 0 else len(input.shape) + dim
                # After padding, this should always hold.
                assert input.shape[dim] % block_size == 0, (
                    f"Tensor dimension {dim}, {input.shape[dim]} is not divisible by {block_size} even after padding."
                )
                # The scales tensor is expected to have size equal to input.shape[dim] // block_size.
                expected_shape[dim] = input.shape[dim] // block_size

            # Verify that the provided scales shape matches the expected shape.
            assert scales.shape == tuple(expected_shape), (
                f"Mismatch in expected scale shape: {scales.shape} vs {tuple(expected_shape)}"
            )
            
            # Expand scales along each block dimension for broadcasting.
            for dim, block_size in block_sizes.items():
                expanded_scales = expanded_scales.repeat_interleave(block_size, dim=dim)
                print(scales.shape, expanded_scales.shape)

        # Perform quantization using FP8 (E4M3) format.
        quantized_data = (input / expanded_scales).to(torch.float8_e4m3fn)

        # Crop quantized_data back to the original shape (if padding was added).
        slices = tuple(slice(0, dim) for dim in original_input.shape)
        quantized_data_cropped = quantized_data[slices]


import tracemalloc
tracemalloc.start()
t = torch.randn(4, 9000, 9000)
block_sizes = {-1:128}
quantize(t, block_sizes=block_sizes)
current, peak = tracemalloc.get_traced_memory()
print(f"Current: {current / 1024:.2f} KB; Peak: {peak / 1024:.2f} KB")
tracemalloc.stop()

In [None]:
import torch
from typing import Union
from modelopt.torch.quantization.utils import reduce_block_padding, reduce_amax, reduce_block_amax, convert_quantization_axis_to_reduce_axis

def int_quantize(input: torch.Tensor, block_size: int) -> torch.Tensor:
    """Quantize a tensor to INT4 (AWQ-style) and pack two values per byte.

    The input is flattened, padded to a multiple of ``block_size``, scaled
    per block so its absolute maximum maps to 7, rounded, shifted into the
    unsigned range 0..15 and finally packed pairwise into uint8.

    Args:
        input (torch.Tensor): The input tensor to be quantized.
        block_size (int): The size of each block for quantization.

    Returns:
        torch.Tensor: uint8 tensor holding the packed values; element ``i``
        stores value ``2*i`` in its high nibble and value ``2*i + 1`` in its
        low nibble.

    NOTE(review): a block whose absolute maximum is zero would give an
    infinite scale; confirm how ``reduce_amax`` / callers guard against that.
    """

    scale_quant_maxbound = 2 ** (4 - 1) - 1

    # pad the input if needed
    input = reduce_block_padding(input.view(-1), block_sizes={-1: block_size})

    print(input.shape)
    # get scales for each block
    block_input = input.view(-1, block_size)
    print(block_input.shape)
    scales = scale_quant_maxbound / reduce_amax(block_input, -1)
    print(scales.shape)
    # expand scalers to match shape of input
    scales = scales.view(block_input.shape[0], -1)  # shape: (block_input.shape[0], 1)

    scaled_blocks = block_input * scales
    flattened = scaled_blocks.flatten()
    # uint4: 0 - 15
    flattened = flattened.round().clamp(
        -(scale_quant_maxbound + 1), scale_quant_maxbound
    ) + (scale_quant_maxbound + 1)
    flattened = flattened.to(torch.uint8)

    # pack the int4 weights into a uint8 tensor
    # packing format: w0, w1, w2, w3, w4, w5, ...
    #               | byte  | byte  | byte  |
    # (The previous torch.empty pre-allocation was dead code: it was
    # immediately overwritten by this expression.)
    packed_output_uint8 = (flattened[::2] << 4) | flattened[1::2]

    # Previously the packed tensor was computed and then dropped, so the
    # function returned None despite its `-> torch.Tensor` annotation.
    return packed_output_uint8


import tracemalloc
tracemalloc.start()
t = torch.randn(4, 9000, 9000)
block_sizes = {-1:128}
int_quantize(t, block_sizes[-1])
current, peak = tracemalloc.get_traced_memory()
print(f"Current: {current / 1024:.2f} KB; Peak: {peak / 1024:.2f} KB")
tracemalloc.stop()

In [None]:
import torch
from torch import nn

class DummyModel(nn.Module):
    """Three stacked linear layers (1024 -> 1024 -> 4096 -> 1024) with no activations."""

    def __init__(self):
        super().__init__()

        # Attribute names (l1/l2/l3) are the parameter-name prefixes.
        self.l1 = nn.Linear(in_features=1024, out_features=1024)
        self.l2 = nn.Linear(in_features=1024, out_features=4096)
        self.l3 = nn.Linear(in_features=4096, out_features=1024)

    def forward(self, x):
        """Apply l1, l2 and l3 in order and return the result."""
        hidden = self.l1(x)
        hidden = self.l2(hidden)
        return self.l3(hidden)

model = DummyModel()

# get memory footprint
total = 0
for name, param in model.named_parameters():
    total += param.numel() * param.element_size()
total/1e6

In [None]:
import modelopt.torch.quantization as mtq

config = mtq.FP8_DEFAULT_CFG
config['quant_cfg']['*weight_quantizer']['fake_quant'] = False
print(config)
model = mtq.quantize(model, config)
mtq.compress(model)

In [None]:
total = 0
for name, param in model.named_parameters():
    total += param.numel() * param.element_size()
total/1e6

In [None]:
import transformers

transformers.__version__

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
from einops import rearrange, repeat
from torch.nn.attention import SDPBackend, sdpa_kernel

class AttentionSeva(nn.Module):
    def __init__(
        self,
        query_dim: int,
        context_dim: int | None = None,
        heads: int = 8,
        dim_head: int = 64,
        dropout: float = 0.0,
    ):
        super().__init__()
        self.heads = heads
        self.dim_head = dim_head
        inner_dim = dim_head * heads
        context_dim = context_dim or query_dim

        self.to_q = nn.Linear(query_dim, inner_dim, bias=False)
        self.to_k = nn.Linear(context_dim, inner_dim, bias=False)
        self.to_v = nn.Linear(context_dim, inner_dim, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, query_dim), nn.Dropout(dropout)
        )

    def forward(
        self, x: torch.Tensor, context: torch.Tensor | None = None
    ) -> torch.Tensor:
        q = self.to_q(x)
        context = context if context is not None else x
        k = self.to_k(context)
        v = self.to_v(context)
        q, k, v = map(
            lambda t: rearrange(t, "b l (h d) -> b h l d", h=self.heads),
            (q, k, v),
        )
        with sdpa_kernel(SDPBackend.FLASH_ATTENTION):
            out = F.scaled_dot_product_attention(q, k, v)
        out = rearrange(out, "b h l d -> b l (h d)")
        out = self.to_out(out)
        return out

In [None]:
from diffusers.models.attention import Attention

model1 = AttentionSeva(1024, 1024, 8, 128)
model2 = Attention(1024, 1024, 8, dim_head=128, )

In [None]:
model1, model2

In [None]:
import torch
from einops import rearrange

x = torch.randn(16, 64, 64)
y = x.view(x.shape[0]//4, 4*x.shape[1],x.shape[2])

print(y.shape)

x = rearrange(x, "(b t) (h w) c -> b (t h w) c", t=4, h=8, w=8)
print(x.shape)

torch.allclose(x, y)

In [None]:
import os
import ast
from pathlib import Path

# Define the base folder to search
BASE_DIR = Path("./src/diffusers/pipelines")

class PipelinePropertyExtractor(ast.NodeVisitor):
    def __init__(self):
        self.pipeline_classes = {}

    def visit_ClassDef(self, node):
        inherits_from_pipeline = any(
            isinstance(base, ast.Name) and base.id == "DiffusionPipeline"
            for base in node.bases
        )
        if inherits_from_pipeline:
            properties = []
            for item in node.body:
                if isinstance(item, ast.FunctionDef):
                    for decorator in item.decorator_list:
                        if isinstance(decorator, ast.Name) and decorator.id == "property":
                            properties.append(item.name)
            self.pipeline_classes[node.name] = properties
        self.generic_visit(node)

def extract_pipeline_properties_from_file(file_path):
    """Parse one Python file and return its pipeline classes with their properties.

    Args:
        file_path: Path of the UTF-8 encoded source file to analyse.

    Returns:
        The ``pipeline_classes`` dict built by ``PipelinePropertyExtractor``
        for this file (class name -> list of property names).
    """
    with open(file_path, "r", encoding="utf-8") as source_file:
        source_text = source_file.read()

    syntax_tree = ast.parse(source_text, filename=str(file_path))
    visitor = PipelinePropertyExtractor()
    visitor.visit(syntax_tree)
    return visitor.pipeline_classes

all_pipeline_properties = {}

for py_file in BASE_DIR.rglob("*.py"):
    props = extract_pipeline_properties_from_file(py_file)
    if props:
        all_pipeline_properties[str(py_file)] = props

master = {
    "_guidance_scale":  1.0,
    "clip_skip":  None,
    "_interrupt":  False,
    "_cross_attention_kwargs":  None,
    "_num_timesteps":  0,
    "_attention_kwargs":  None,
    "_guidance_rescale":  0.0,
    "_denoising_start":  None,
    "_denoising_end":  None,
    "_joint_attention_kwargs":  None,
    "_pag_scale":  0.0,
    "_adaptive_pag_scale":  0.0,
    "_current_timestep":  None
}
def process_file(file_path, cls_name, block):
    """Rewrite a source file in place, moving attribute defaults into ``__init__``.

    The file is scanned line by line with a small state machine:

    * every line containing ``super().__init__()`` that is seen while inside
      an ``__init__`` of the target class is kept and followed by one line per
      entry of ``block``, indented like the ``super()`` call;
    * inside ``__call__`` of the target class, the first
      ``self._interrupt = False`` line and the first
      ``self._current_timestep = None`` line are dropped.

    The file is only written back (and a message printed) when at least one
    ``super().__init__()`` line was found; otherwise it is left untouched.

    Args:
        file_path: Path of the file to read and possibly overwrite.
        cls_name: Name of the class to modify.
        block: Iterable of statement strings to insert after ``super().__init__()``.

    NOTE(review): the class is located with ``startswith("class " + cls_name)``,
    so a class whose name merely starts with ``cls_name`` also matches.
    NOTE(review): the two ``__call__`` lines are removed regardless of whether
    ``block`` contains a replacement for them — confirm that is intended.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        lines = f.readlines()

    modified_lines = []
    # State flags for the line scanner below.
    inside_target_class = False
    inside_init = False
    indent = ""
    found_super = False
    inside_call = False

    for i, line in enumerate(lines):
        stripped = line.strip()

        # Entering the target class; remember its line index so the
        # "another class starts" check below does not fire on this same line.
        if stripped.startswith("class "+cls_name):
            inside_target_class = True
            cls_i = i

        if inside_target_class and stripped.startswith("def __init__"):
            inside_init = True

        if inside_init and "super().__init__()" in stripped:
            # Reuse the whitespace in front of `super()` for the inserted lines.
            indent = line[:line.index("super()")]
            modified_lines.append(line)
            for val in block:
                modified_lines.append(f"{indent}{val}\n")
            found_super = True
            continue

        # Any other method definition ends the __init__ body.
        if inside_init and stripped.startswith("def ") and "def __init__" not in stripped:
            inside_init = False

        # Entering __call__ of the target class: (re)start the removal counters.
        if not inside_init and inside_target_class and "def __call__" in stripped:
            inside_call = True
            interrupt_cnt = 0
            current_timestep_cnt = 0

        if inside_call:
            # `continue` skips the append at the bottom, i.e. deletes the line.
            if stripped.startswith('self._interrupt = False') and interrupt_cnt == 0:
                interrupt_cnt += 1
                continue
            if stripped.startswith('self._current_timestep = None') and current_timestep_cnt == 0:
                current_timestep_cnt += 1
                continue
            # Both lines have been removed: stop looking.
            if interrupt_cnt == 1 and current_timestep_cnt == 1:
                inside_call = False


        # A different `class` line ends the target class. Note that
        # `inside_call` is not reset here.
        if inside_target_class and stripped.startswith("class ") and i != cls_i:
            inside_target_class = False
            inside_init = False

        modified_lines.append(line)

    # Only touch the file on disk if an insertion actually happened.
    if found_super:
        with open(file_path, "w", encoding="utf-8") as f:
            f.writelines(modified_lines)
        print(f"✅ Modified: {file_path}")

# Print results
for file, classes in all_pipeline_properties.items():
    print(f"\n{file}:")
    for cls, props in classes.items():
        print(f"  Class `{cls}`:")
        block = []
        for prop in props:
            if '_'+prop in master:
                block.append("self._"+prop+"="+str(master.get("_"+prop)))
            print(f"    - @{prop}")
        process_file(file, cls, block)

In [None]:
import torch
from diffusers import SanaTransformer2DModel
from diffusers.quantizers.quantization_config import NVIDIAModelOptConfig

checkpoint = "Efficient-Large-Model/Sana_600M_1024px_diffusers"
model_cls = SanaTransformer2DModel
# Zero-argument factory returning a fresh (hidden, encoder, timestep) tuple.
# The parentheses around the tuple are required: without them the lambda body
# ends at the first comma and `input` becomes a tuple `(lambda, tensor, tensor)`
# instead of a callable. The factory takes no arguments because it is invoked
# as `input()`.
# NOTE: the name shadows the builtin `input`; it is kept because other code in
# this cell refers to it.
input = lambda: (
    torch.randn((2, 32, 32, 32), dtype=torch.bfloat16),
    torch.randn((2, 10, 300, 2304), dtype=torch.bfloat16),
    torch.Tensor([0, 0]),
)

# Keyword arguments for NVIDIAModelOptConfig, one dict per quantization variant.
quant_config_fp8 = {"quant_type": "FP8", "quant_method": "modelopt"}
quant_config_int4 = {"quant_type": "INT4", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1}
quant_config_nvfp4 = {"quant_type": "NVFP4", "quant_method": "modelopt", "block_quantize": 128, "channel_quantize": -1, "modules_to_not_convert": ["conv"]}

def test_quantization(config, checkpoint, model_cls):
    """Load `checkpoint` with a ModelOpt quantization config and report its size.

    Args:
        config: keyword arguments for `NVIDIAModelOptConfig`; must contain
            the key "quant_type", which is echoed in the report line.
        checkpoint: identifier passed to `model_cls.from_pretrained`.
        model_cls: class exposing `from_pretrained`.

    Returns:
        The loaded model after `.to('cuda')`.
    """
    quant_config = NVIDIAModelOptConfig(**config)
    print(quant_config.get_config_from_quant_type())

    loaded = model_cls.from_pretrained(
        checkpoint,
        subfolder="transformer",
        quantization_config=quant_config,
        torch_dtype=torch.bfloat16,
    )
    quant_model = loaded.to('cuda')

    # The footprint is divided by 1e6 before printing.
    print(
        f"Quant {config['quant_type']} Model Memory Footprint: ",
        quant_model.get_memory_footprint() / 1e6,
    )
    return quant_model

def test_quant_inference(model, input, iter=100):
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    inference_memory = 0
    for _ in range(iter):
        hidden.to('cuda'), encoder.to('cuda'), timestep.to('cuda') = input()
        with torch.no_grad():
            output = model(hidden, encoder, timestep)
        inference_memory += torch.cuda.max_memory_allocated()
    inference_memory /= iter
    print("Inference Memory: ", inference_memory / 1e6)


# Benchmark the FP8 configuration; the other variants are kept commented out
# below so they can be enabled one at a time.
test_quant_inference(
    model=test_quantization(config=quant_config_fp8, checkpoint=checkpoint, model_cls=model_cls),
    input=input,
)
# test_quant_inference(test_quantization(quant_config_int4, checkpoint, model_cls), input)
# test_quant_inference(test_quantization(quant_config_nvfp4, checkpoint, model_cls), input)
# test_quant_inference(model_cls.from_pretrained(checkpoint, subfolder="transformer", torch_dtype=torch.bfloat16).to('cuda'), input)

In [None]:
import torch
import math
from typing import Union, Tuple
import torchvision.transforms.functional as TF
import numpy as np

def get_resizing_factor(
    target_shape: Tuple[int, int],  # H, W
    current_shape: Tuple[int, int],  # H, W
    cover_target: bool = True,
) -> float:
    """Return the scale factor that maps `current_shape` onto `target_shape`.

    Args:
        target_shape: desired (H, W).
        current_shape: present (H, W).
        cover_target: if True, the scaled shape fully covers the target shape;
            if False, the target shape fully covers the scaled shape.

    Returns:
        Ratio between one side of the target (its min or max) and one side of
        the current shape (its min or max), chosen from the two aspect ratios.
    """
    r_bound = target_shape[1] / target_shape[0]
    aspect_r = current_shape[1] / current_shape[0]

    # Classify the current aspect ratio relative to the target's:
    #   0 -> at least as elongated as the target, in the same orientation
    #   1 -> on the opposite side of square from the target
    #   2 -> between square and the target ratio
    if r_bound >= 1.0:
        beyond_target = aspect_r >= r_bound
        opposite_orientation = aspect_r < 1.0
    else:
        beyond_target = aspect_r <= r_bound
        opposite_orientation = aspect_r > 1.0

    if beyond_target:
        case = 0
    elif opposite_orientation:
        case = 1
    else:
        case = 2

    # (side picked from the target, side picked from the current shape) per case.
    if cover_target:
        pickers = ((min, min), (max, min), (max, max))
    else:
        pickers = ((max, max), (min, max), (min, min))

    pick_target, pick_current = pickers[case]
    return pick_target(target_shape) / pick_current(current_shape)
def get_wh_with_fixed_shortest_side(w, h, size):
    """Scale (w, h) so that the shorter side equals `size`.

    The longer side is scaled by the same ratio and truncated with `int`.
    When `size` is None or not positive, the input (w, h) is returned as is.

    Returns:
        A `(new_w, new_h)` tuple.
    """
    if size is None or size <= 0:
        return w, h
    if w < h:
        # Width is the shorter side.
        return size, int(size * h / w)
    # Height is the shorter side (or the image is square).
    return int(size * w / h), size

def transform_img_and_K(
    image: torch.Tensor,
    size: Union[int, Tuple[int, int]],
    scale: float = 1.0,
    center: Tuple[float, float] = (0.5, 0.5),
    K: torch.Tensor | None = None,
    size_stride: int = 1,
    mode: str = "crop",
):
    """Resize `image`, then crop or pad it towards a target size, adjusting `K` to match.

    Args:
        image: tensor whose last two dimensions are read as (height, width).
            Presumably batched as (B, C, H, W) -- TODO confirm against callers.
        size: a tuple/list is unpacked as (W, H); any other value is passed to
            `get_wh_with_fixed_shortest_side`, which sets the shorter side to
            `size` and keeps the original size for None or non-positive values.
        scale: the resize dimensions are divided by this value before
            interpolation.
        center: (x, y) fractions of the resized image used as the crop/pad
            center.
        K: optional tensor indexed as `K[:, :2, 2]`; a modified clone is
            returned and the input is not written to.
        size_stride: the target W and H are rounded to the nearest multiple of
            this value.
        mode: one of "crop", "pad" or "stretch".

    Returns:
        `(image, K)`: the transformed image and the adjusted `K` (None when no
        `K` was given).
    """
    assert mode in [
        "crop",
        "pad",
        "stretch",
    ], f"mode should be one of ['crop', 'pad', 'stretch'], got {mode}"

    h, w = image.shape[-2:]
    if isinstance(size, (tuple, list)):
        # => if size is a tuple or list, it is the target area given as (W, H);
        # the image is rescaled relative to it and then cropped or padded below
        W, H = size
    else:
        # => if size is int, we rescale the image to fit the shortest side to size
        # => if size is None, no rescaling is applied
        W, H = get_wh_with_fixed_shortest_side(w, h, size)
    # Round both target sides to the nearest multiple of `size_stride`.
    W, H = (
        math.floor(W / size_stride + 0.5) * size_stride,
        math.floor(H / size_stride + 0.5) * size_stride,
    )

    if mode == "stretch":
        # Resize straight to the target size, ignoring the aspect ratio.
        rh, rw = H, W
    else:
        # Aspect-preserving factor: cover the target for "crop", stay inside it for "pad".
        rfs = get_resizing_factor(
            (H, W),
            (h, w),
            cover_target=mode != "pad",
        )
        (rh, rw) = [int(np.ceil(rfs * s)) for s in (h, w)]

    rh, rw = int(rh / scale), int(rw / scale)
    image = torch.nn.functional.interpolate(
        image, (rh, rw), mode="area", antialias=False
    )

    # Center in pixels of the resized image (center[0] is x, center[1] is y).
    cy_center = int(center[1] * image.shape[-2])
    cx_center = int(center[0] * image.shape[-1])
    if mode != "pad":
        # Top-left corner of the H x W crop window, clamped at 0 and at the far edges.
        ct = max(0, cy_center - H // 2)
        cl = max(0, cx_center - W // 2)
        ct = min(ct, image.shape[-2] - H)
        cl = min(cl, image.shape[-1] - W)
        image = TF.crop(image, top=ct, left=cl, height=H, width=W)
        pl, pt = 0, 0
    else:
        # Padding amounts (pt/pl first, then pb/pr for what remains up to H x W); never negative.
        pt = max(0, H // 2 - cy_center)
        pl = max(0, W // 2 - cx_center)
        pb = max(0, H - pt - image.shape[-2])
        pr = max(0, W - pl - image.shape[-1])
        image = TF.pad(
            image,
            [pl, pt, pr, pb],
        )
        cl, ct = 0, 0

    if K is not None:
        # Work on a copy so the caller's tensor is left untouched.
        K = K.clone()
        # K[:, :2, 2] += K.new_tensor([pl, pt])
        # Entries K[:, :2, -1] all within [0, 1] are taken to mean a normalized K.
        if torch.all(K[:, :2, -1] >= 0) and torch.all(K[:, :2, -1] <= 1):
            K[:, :2] *= K.new_tensor([rw, rh])[None, :, None]  # normalized K
        else:
            K[:, :2] *= K.new_tensor([rw / w, rh / h])[None, :, None]  # unnormalized K
        # Shift the entries at column 2 by the padding added minus the crop offset.
        K[:, :2, 2] += K.new_tensor([pl - cl, pt - ct])

    return image, K

In [None]:
from diffusers.image_processor import VaeImageProcessor, SevaImageProcessor
from PIL import Image
import torch
import imageio.v3 as iio

path = "/home/ishan/stable-virtual-camera/data/vgg-lab-4_0.png"

img = Image.open(path)
height = width = 576

# Two processors built with identical settings; only `image_processor` is used below.
image_processor = VaeImageProcessor(vae_scale_factor=64, do_normalize=False)
image_processor1 = SevaImageProcessor(vae_scale_factor=64, do_normalize=False)

# Candidate: the image processor's own resize in "fill" mode.
img1 = image_processor.preprocess(img, resize_mode="fill", height=height, width=width)

# Reference: read the same file as float32 divided by 255, add a leading batch
# axis, move the last axis to position 1 and run it through
# `transform_img_and_K` in "pad" mode.
img = torch.stack(
    [torch.as_tensor(iio.imread(path) / 255.0, dtype=torch.float32)],
    dim=0,
)
img2 = transform_img_and_K(
    img.permute(0, 3, 1, 2), (576, 576), K=None, size_stride=64, mode="pad"
)[0]

print(img1.shape, img2.shape)

print(torch.allclose(img1, img2))

In [None]:
from diffusers.pipelines.seva.geometry import get_default_intrinsics

# Intrinsics from `get_default_intrinsics` for aspect ratio 0.74 and focal length 1.0.
input_Ks = get_default_intrinsics(aspect_ratio=0.74, focal_length=1.0)


In [1]:
import torch

from diffusers.pipelines.seva.pipeline_seva import test

# `torch` is imported in this cell so it runs on a fresh kernel without relying
# on an import made by another cell.
t = torch.randn((1,3,1024,1024))
test(t)

  from .autonotebook import tqdm as notebook_tqdm


KeyboardInterrupt: 