In [1]:
!pip install diffusers transformers accelerate scipy ftfy

Collecting ftfy
  Downloading ftfy-6.3.0-py3-none-any.whl.metadata (7.1 kB)
Downloading ftfy-6.3.0-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.8/44.8 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ftfy
Successfully installed ftfy-6.3.0


In [31]:
import torch
from diffusers import StableDiffusionImg2ImgPipeline
from PIL import Image
import numpy as np
import imageio
import time
import torch.nn.utils.prune as prune
import torch.nn as nn
from copy import deepcopy

In [20]:
# Checkpoint identifier handed to from_pretrained().
model_id = "runwayml/stable-diffusion-v1-5"

# Build the img2img pipeline with float16 weights, then move it to the GPU.
pipeline = StableDiffusionImg2ImgPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
)
pipeline = pipeline.to("cuda")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]



In [32]:
class diffusion_pruner():
    """Sensitivity-aware L1 magnitude pruning of a model's Conv2d/Linear layers.

    The wrapped ``model`` is never modified: ``adaptive_pruning`` works on a
    deep copy, stores it in ``self.pruned_model`` and returns it.

    Args:
        model: the ``nn.Module`` to prune (e.g. a pipeline's UNet).
        sparsity: base fraction of weights to zero in each layer. The
            per-layer amount is ``sparsity * (1 - sensitivity)`` clamped to
            the range [0, 1].
    """

    def __init__(self, model, sparsity=0.5):
        self.model = model
        self.sparsity = sparsity
        # Filled in by adaptive_pruning(); None until it has been run.
        self.pruned_model = None

    def get_module_input_size(self, module):
        """Return the shape of a dummy input batch suitable for ``module``.

        Conv2d layers get ``(1, in_channels, 64, 64)`` (64x64 is an arbitrary
        default spatial size); Linear layers get ``(1, in_features)``.

        Raises:
            ValueError: if ``module`` is neither ``nn.Conv2d`` nor ``nn.Linear``.
        """
        if isinstance(module, nn.Conv2d):
            return (1, module.in_channels, 64, 64)
        if isinstance(module, nn.Linear):
            return (1, module.in_features)
        raise ValueError(f"Unsupported module type: {type(module)}")

    def analyse_sensitivity(self, module, num_samples=1000):
        """Estimate a layer's sensitivity from the gradient magnitude of its weight.

        A single random input is pushed through ``module``, the mean absolute
        output is back-propagated, and the mean absolute weight gradient is
        returned.

        Args:
            module: layer to probe. Anything other than Conv2d/Linear yields 0.0.
            num_samples: accepted for backward compatibility; currently unused
                (only one random sample is evaluated).

        Returns:
            float: mean absolute weight gradient, or 0.0 when the layer is not
            supported, its weight does not require grad, or the probe fails
            (a warning is printed in the last two cases).
        """
        if not isinstance(module, (nn.Conv2d, nn.Linear)):
            return 0.0

        weight = module.weight
        if not weight.requires_grad:
            # register_hook() raises on tensors that do not require grad, so
            # fall back to the same neutral value used for failed probes.
            print("Warning: Error during sensitivity analysis: weight does not require grad")
            return 0.0

        was_training = module.training
        module.eval()

        gradients = []

        def gradient_hook(grad):
            gradients.append(grad.detach().abs().mean().item())

        handles = []
        try:
            handles.append(weight.register_hook(gradient_hook))
            with torch.enable_grad():
                input_shape = self.get_module_input_size(module)
                # Sample in float32, then match the layer's own device and dtype.
                # A fixed float32 CUDA tensor makes half-precision or CPU layers
                # raise, which silently turned every sensitivity into 0.0.
                noise = torch.randn(*input_shape).to(device=weight.device, dtype=weight.dtype)
                loss = module(noise).abs().mean()
                loss.backward()
        except Exception as e:
            print(f"Warning: Error during sensitivity analysis: {e}")
            return 0.0
        finally:
            for handle in handles:
                handle.remove()
            # Restore the caller's train/eval mode and drop the probe gradients.
            module.train(was_training)
            module.zero_grad()

        return sum(gradients) / len(gradients) if gradients else 0.0

    def _layer_sparsity(self, sensitivity):
        """Map a sensitivity score to a pruning amount clamped to [0, 1]."""
        if sensitivity != sensitivity:
            # NaN (e.g. half-precision overflow): treat like a failed probe.
            sensitivity = 0.0
        amount = self.sparsity * (1.0 - sensitivity)
        return min(1.0, max(0.0, amount))

    def adaptive_pruning(self):
        """Prune a deep copy of the model, layer by layer, and return it.

        Every Conv2d/Linear layer whose qualified name does not contain
        ``'attn'`` or ``'output'`` is pruned with unstructured L1 magnitude
        pruning; the mask is then folded into the weight so no
        re-parametrisation is left behind. The name of each pruned layer is
        printed.

        Returns:
            The pruned copy (also stored as ``self.pruned_model``).
        """
        self.pruned_model = deepcopy(self.model)
        for name, module in self.pruned_model.named_modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue

            # Plain substring match: names such as '...attentions.0.proj_in'
            # contain neither 'attn' nor 'output' and are therefore pruned.
            if 'attn' in name or 'output' in name:
                continue

            sensitivity = self.analyse_sensitivity(module)
            layer_sparsity = self._layer_sparsity(sensitivity)

            print(name)
            # Unstructured pruning: zero the individual weights with the
            # smallest absolute value.
            prune.l1_unstructured(module, name='weight', amount=layer_sparsity)

            # Make pruning permanent (removes weight_orig / weight_mask).
            prune.remove(module, 'weight')

        return self.pruned_model


In [33]:
# Wrap the pipeline's UNet in a pruner; no sparsity is passed, so the class
# default (0.5) is used as the base sparsity.
pruner = diffusion_pruner(pipeline.unet)

In [34]:
# Run the pruning pass, then read the result from the pruner itself: the pruned
# copy is always stored on `pruner.pruned_model`, so `new_unet` is never None.
pruner.adaptive_pruning()
new_unet = pruner.pruned_model

conv_in
time_embedding.linear_1
time_embedding.linear_2
down_blocks.0.attentions.0.proj_in
down_blocks.0.attentions.0.transformer_blocks.0.ff.net.0.proj
down_blocks.0.attentions.0.transformer_blocks.0.ff.net.2
down_blocks.0.attentions.0.proj_out
down_blocks.0.attentions.1.proj_in
down_blocks.0.attentions.1.transformer_blocks.0.ff.net.0.proj
down_blocks.0.attentions.1.transformer_blocks.0.ff.net.2
down_blocks.0.attentions.1.proj_out
down_blocks.0.resnets.0.conv1
down_blocks.0.resnets.0.time_emb_proj
down_blocks.0.resnets.0.conv2
down_blocks.0.resnets.1.conv1
down_blocks.0.resnets.1.time_emb_proj
down_blocks.0.resnets.1.conv2
down_blocks.0.downsamplers.0.conv
down_blocks.1.attentions.0.proj_in
down_blocks.1.attentions.0.transformer_blocks.0.ff.net.0.proj
down_blocks.1.attentions.0.transformer_blocks.0.ff.net.2
down_blocks.1.attentions.0.proj_out
down_blocks.1.attentions.1.proj_in
down_blocks.1.attentions.1.transformer_blocks.0.ff.net.0.proj
down_blocks.1.attentions.1.transformer_blocks.0

In [37]:
# Sanity check: display the type of the pruned copy held by the pruner.
type(pruner.pruned_model)

In [38]:
# Swap the pipeline's UNet for the pruned copy. Fail loudly if the pruning cell
# has not been run in this kernel instead of silently installing None.
if pruner.pruned_model is None:
    raise RuntimeError("pruner.pruned_model is None; run pruner.adaptive_pruning() first")
pipeline.unet = pruner.pruned_model

In [39]:
def optimize_attention(pipe):
    """Call ``pipe.enable_vae_slicing()`` and hand the same pipeline back.

    Attention slicing is intentionally left off here; VAE slicing is the only
    option this helper switches on.
    """
    enable_slicing = pipe.enable_vae_slicing
    enable_slicing()
    return pipe

In [40]:
from torch.nn.utils import prune
def prune_model(model, amount=0.5, make_permanent=False):
    """Apply unstructured L1 magnitude pruning to every ``torch.nn.Linear`` in ``model``.

    Args:
        model: module whose Linear sub-modules are pruned in place.
        amount: pruning amount forwarded to ``prune.l1_unstructured`` for each
            layer (a float is the fraction of weights to zero).
        make_permanent: when True, fold the mask into ``weight`` with
            ``prune.remove`` so no ``weight_orig`` / ``weight_mask`` pair is
            left on the layer. The default (False) keeps the
            re-parametrisation attached, as before.

    Returns:
        The same ``model`` object, for chaining.
    """
    # Collect first so the module tree is not walked while it is being modified.
    linear_layers = [m for _, m in model.named_modules() if isinstance(m, torch.nn.Linear)]
    for layer in linear_layers:
        prune.l1_unstructured(layer, name="weight", amount=amount)
        if make_permanent:
            prune.remove(layer, "weight")
    return model

In [41]:
def generate_single_frame(pipe, prompt, image, strength, generator=None,
                          guidance_scale=6, num_inference_steps=20, use_autocast=None):
    """Run one img2img call on ``pipe`` and return the first generated image.

    Args:
        pipe: callable pipeline; invoked with keyword arguments and expected to
            return an object exposing ``.images``.
        prompt: text prompt for this frame.
        image: input image for the img2img step.
        strength: forwarded to the pipeline unchanged.
        generator: optional ``torch.Generator`` forwarded to the pipeline.
        guidance_scale: forwarded to the pipeline (default 6, as before).
        num_inference_steps: forwarded to the pipeline (default 20, as before).
        use_autocast: True runs the call under ``torch.autocast("cuda")``,
            False under ``torch.no_grad()``. When None, the notebook-level
            ``autoc`` flag is used if it is defined; otherwise availability is
            detected here, so this function no longer raises ``NameError``
            when the cell defining ``autoc`` has not been executed.

    Returns:
        The first element of the pipeline result's ``images``.
    """
    if use_autocast is None:
        use_autocast = globals().get("autoc")
    if use_autocast is None:
        use_autocast = torch.amp.autocast_mode.is_autocast_available("cuda")

    context = torch.autocast("cuda") if use_autocast else torch.no_grad()
    with context:
        result = pipe(
            prompt=prompt,
            image=image,
            strength=strength,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            generator=generator,
        ).images[0]
    return result

In [42]:
def optimize_memory(pipe):
    """Set global torch backend flags when CUDA is available; return ``pipe`` as is.

    With CUDA present this allows TF32 for CUDA matmul and cuDNN, turns the
    cuDNN benchmark mode on and cuDNN deterministic mode off. Without CUDA
    nothing is changed. ``pipe`` itself is never touched.
    """
    if not torch.cuda.is_available():
        return pipe

    # TF32 switches (relevant on Ampere GPUs).
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True

    # Let cuDNN benchmark kernels and drop the determinism requirement.
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False
    return pipe

In [43]:
def enable_model_cpu_offload(pipe):
    """Invoke ``pipe.enable_model_cpu_offload()`` and return the same pipeline."""
    offload = pipe.enable_model_cpu_offload
    offload()
    return pipe

In [47]:
# Whether torch reports autocast as available for the "cuda" device type.
autoc = torch.amp.autocast_mode.is_autocast_available("cuda")

def load_image(image_path):
    """Open the image at ``image_path`` and return it converted to RGB.

    The file is opened in a ``with`` block so the underlying file handle is
    closed as soon as the pixel data has been converted, rather than whenever
    the temporary image object happens to be collected.
    """
    with Image.open(image_path) as source:
        return source.convert("RGB")

def generate_frames(initial_image, prompts, pipe, num_frames=5, strength=0.80, seed=42):
    """Generate ``num_frames`` images by feeding each output back in as the next input.

    Args:
        initial_image: input image for the first frame.
        prompts: one prompt string, or an iterable of prompt strings. The
            prompts are spread evenly over the frames in order. A bare string
            is treated as a single prompt rather than being indexed character
            by character.
        pipe: img2img pipeline; CPU offload, VAE slicing and the torch backend
            flags are applied to it before generation.
        num_frames: number of frames to produce.
        strength: forwarded to ``generate_single_frame`` for every frame.
        seed: seed for the CUDA ``torch.Generator`` shared by all frames
            (default 42, as before).

    Returns:
        list: the generated frames in order.

    Raises:
        ValueError: if ``prompts`` is empty.

    Side effects:
        Prints the elapsed generation time in seconds.
    """
    prompts = [prompts] if isinstance(prompts, str) else list(prompts)
    if not prompts:
        raise ValueError("prompts must contain at least one prompt")

    pipe = enable_model_cpu_offload(pipe)
    pipe = optimize_attention(pipe)
    pipe = optimize_memory(pipe)

    generator = torch.Generator("cuda").manual_seed(seed)
    frames = []
    current_image = initial_image

    # perf_counter is monotonic, so the measurement cannot be skewed by clock changes.
    start_time = time.perf_counter()
    for i in range(num_frames):
        # Walk through the prompts proportionally to the progress through the frames.
        prompt_index = min(i * len(prompts) // num_frames, len(prompts) - 1)
        current_image = generate_single_frame(
            pipe, prompts[prompt_index], current_image, strength, generator=generator
        )
        # Each output is both a result frame and the input of the next iteration.
        frames.append(current_image)
    time_taken = time.perf_counter() - start_time
    print(time_taken)
    return frames

def create_gif(frames, output_path, fps=10):
    """Write ``frames`` to ``output_path`` as a looping animated GIF.

    Args:
        frames: non-empty iterable of image objects; the first one's ``save``
            method is used and the rest are appended to it.
        output_path: destination file. Previously this argument was ignored and
            the file was always written to "animation.gif".
        fps: frames per second. Previously ignored (a fixed 150 ms per frame
            was used); the per-frame duration is now ``1000 / fps``
            milliseconds, i.e. 100 ms at the default of 10.

    Raises:
        ValueError: if ``frames`` is empty or ``fps`` is not positive.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("frames must contain at least one image")
    if fps <= 0:
        raise ValueError("fps must be positive")

    frame_duration_ms = int(round(1000 / fps))
    first_frame, remaining_frames = frames[0], frames[1:]
    first_frame.save(
        output_path,
        format="GIF",  # do not depend on the extension of output_path
        save_all=True,
        append_images=remaining_frames,
        duration=frame_duration_ms,
        loop=0,  # 0 = loop forever
    )

def main(initial_image_path, prompt, output_path, num_frames=10, strength=0.75):
    """Load the start image, generate the frame sequence and write the GIF.

    ``prompt`` is passed straight through to ``generate_frames`` as its
    prompts argument, together with the notebook-level ``pipeline`` object.
    Nothing is returned.
    """
    source_image = load_image(initial_image_path)
    gif_frames = generate_frames(source_image, prompt, pipeline, num_frames, strength)
    create_gif(gif_frames, output_path)

if __name__ == "__main__":
    # Input image and destination of the animation.
    initial_image_path = "chester.jpg"
    output_path = "output.gif"

    # Prompts are consumed in order as the frames progress.
    prompt = [
        "A man singing with a crowd in the background",
        "The man turns to the crowd",
        "A stage shows up and reveals a concert happening.",
    ]

    num_frames = 5
    # Controls how much the model changes the image on each frame.
    strength = 0.80

    main(initial_image_path, prompt, output_path, num_frames, strength)
    print(f"GIF generated and saved to {output_path}")

  0%|          | 0/16 [00:00<?, ?it/s]

  0%|          | 0/16 [00:00<?, ?it/s]

  0%|          | 0/16 [00:00<?, ?it/s]

  0%|          | 0/16 [00:00<?, ?it/s]

  0%|          | 0/16 [00:00<?, ?it/s]

671.4524924755096
GIF generated and saved to output.gif
