diff --git a/docs/source/en/_toctree.yml b/docs/source/en/_toctree.yml index ac3e02a27f74..dccd2d2aeef3 100644 --- a/docs/source/en/_toctree.yml +++ b/docs/source/en/_toctree.yml @@ -265,6 +265,8 @@ title: LDM3D Text-to-(RGB, Depth) - local: api/pipelines/stable_diffusion/adapter title: Stable Diffusion T2I-adapter + - local: api/pipelines/stable_diffusion/gligen + title: GLIGEN (Grounded Language-to-Image Generation) title: Stable Diffusion - local: api/pipelines/stable_unclip title: Stable unCLIP diff --git a/docs/source/en/api/pipelines/stable_diffusion/gligen.md b/docs/source/en/api/pipelines/stable_diffusion/gligen.md new file mode 100644 index 000000000000..bf9199cd64c6 --- /dev/null +++ b/docs/source/en/api/pipelines/stable_diffusion/gligen.md @@ -0,0 +1,46 @@ + + +# GLIGEN (Grounded Language-to-Image Generation) + +The GLIGEN model was created by researchers and engineers from [University of Wisconsin-Madison, Columbia University, and Microsoft](https://github.com/gligen/GLIGEN). The [`StableDiffusionGLIGENPipeline`] can generate photorealistic images conditioned on grounding inputs. Along with text and bounding boxes, if input images are given, this pipeline can insert objects described by text at the region defined by bounding boxes. Otherwise, it'll generate an image described by the caption/prompt and insert objects described by text at the region defined by bounding boxes. It's trained on COCO2014D and COCO2014CD datasets, and the model uses a frozen CLIP ViT-L/14 text encoder to condition itself on grounding inputs. + +The abstract from the [paper](https://huggingface.co/papers/2301.07093) is: + +*Large-scale text-to-image diffusion models have made amazing advances. However, the status quo is to use text input alone, which can impede controllability. In this work, we propose GLIGEN, Grounded-Language-to-Image Generation, a novel approach that builds upon and extends the functionality of existing pre-trained text-to-image diffusion models by enabling them to also be conditioned on grounding inputs. To preserve the vast concept knowledge of the pre-trained model, we freeze all of its weights and inject the grounding information into new trainable layers via a gated mechanism. Our model achieves open-world grounded text2img generation with caption and bounding box condition inputs, and the grounding ability generalizes well to novel spatial configurations and concepts. GLIGEN’s zeroshot performance on COCO and LVIS outperforms existing supervised layout-to-image baselines by a large margin.* + + + +Make sure to check out the Stable Diffusion [Tips](https://huggingface.co/docs/diffusers/en/api/pipelines/stable_diffusion/overview#tips) section to learn how to explore the tradeoff between scheduler speed and quality and how to reuse pipeline components efficiently! + +If you want to use one of the official checkpoints for a task, explore the [gligen](https://huggingface.co/gligen) Hub organizations! + + + +This pipeline was contributed by [Nikhil Gajendrakumar](https://github.com/nikhil-masterful). + +## StableDiffusionGLIGENPipeline + +[[autodoc]] StableDiffusionGLIGENPipeline + - all + - __call__ + - enable_vae_slicing + - disable_vae_slicing + - enable_vae_tiling + - disable_vae_tiling + - enable_model_cpu_offload + - prepare_latents + - enable_fuser + +## StableDiffusionPipelineOutput + +[[autodoc]] pipelines.stable_diffusion.StableDiffusionPipelineOutput diff --git a/src/diffusers/__init__.py b/src/diffusers/__init__.py index 9080fed4b81b..309edde7bc09 100644 --- a/src/diffusers/__init__.py +++ b/src/diffusers/__init__.py @@ -171,6 +171,7 @@ StableDiffusionControlNetPipeline, StableDiffusionDepth2ImgPipeline, StableDiffusionDiffEditPipeline, + StableDiffusionGLIGENPipeline, StableDiffusionImageVariationPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipeline, diff --git a/src/diffusers/models/attention.py b/src/diffusers/models/attention.py index ad899212e5a5..b017db158eda 100644 --- a/src/diffusers/models/attention.py +++ b/src/diffusers/models/attention.py @@ -24,6 +24,38 @@ from .lora import LoRACompatibleLinear +@maybe_allow_in_graph +class GatedSelfAttentionDense(nn.Module): + def __init__(self, query_dim, context_dim, n_heads, d_head): + super().__init__() + + # we need a linear projection since we need cat visual feature and obj feature + self.linear = nn.Linear(context_dim, query_dim) + + self.attn = Attention(query_dim=query_dim, heads=n_heads, dim_head=d_head) + self.ff = FeedForward(query_dim, activation_fn="geglu") + + self.norm1 = nn.LayerNorm(query_dim) + self.norm2 = nn.LayerNorm(query_dim) + + self.register_parameter("alpha_attn", nn.Parameter(torch.tensor(0.0))) + self.register_parameter("alpha_dense", nn.Parameter(torch.tensor(0.0))) + + self.enabled = True + + def forward(self, x, objs): + if not self.enabled: + return x + + n_visual = x.shape[1] + objs = self.linear(objs) + + x = x + self.alpha_attn.tanh() * self.attn(self.norm1(torch.cat([x, objs], dim=1)))[:, :n_visual, :] + x = x + self.alpha_dense.tanh() * self.ff(self.norm2(x)) + + return x + + @maybe_allow_in_graph class BasicTransformerBlock(nn.Module): r""" @@ -62,6 +94,7 @@ def __init__( norm_elementwise_affine: bool = True, norm_type: str = "layer_norm", final_dropout: bool = False, + attention_type: str = "default", ): super().__init__() self.only_cross_attention = only_cross_attention @@ -120,6 +153,10 @@ def __init__( self.norm3 = nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine) self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn, final_dropout=final_dropout) + # 4. Fuser + if attention_type == "gated": + self.fuser = GatedSelfAttentionDense(dim, cross_attention_dim, num_attention_heads, attention_head_dim) + # let chunk size default to None self._chunk_size = None self._chunk_dim = 0 @@ -150,7 +187,9 @@ def forward( else: norm_hidden_states = self.norm1(hidden_states) - cross_attention_kwargs = cross_attention_kwargs if cross_attention_kwargs is not None else {} + # 0. Prepare GLIGEN inputs + cross_attention_kwargs = cross_attention_kwargs.copy() if cross_attention_kwargs is not None else {} + gligen_kwargs = cross_attention_kwargs.pop("gligen", None) attn_output = self.attn1( norm_hidden_states, @@ -162,6 +201,11 @@ def forward( attn_output = gate_msa.unsqueeze(1) * attn_output hidden_states = attn_output + hidden_states + # 1.5 GLIGEN Control + if gligen_kwargs is not None: + hidden_states = self.fuser(hidden_states, gligen_kwargs["objs"]) + # 1.5 ends + # 2. Cross-Attention if self.attn2 is not None: norm_hidden_states = ( diff --git a/src/diffusers/models/embeddings.py b/src/diffusers/models/embeddings.py index a5a0c5549ee9..8154cffa9883 100644 --- a/src/diffusers/models/embeddings.py +++ b/src/diffusers/models/embeddings.py @@ -544,3 +544,59 @@ def shape(x): a = a.reshape(bs, -1, 1).transpose(1, 2) return a[:, 0, :] # cls_token + + +class FourierEmbedder(nn.Module): + def __init__(self, num_freqs=64, temperature=100): + super().__init__() + + self.num_freqs = num_freqs + self.temperature = temperature + + freq_bands = temperature ** (torch.arange(num_freqs) / num_freqs) + freq_bands = freq_bands[None, None, None] + self.register_buffer("freq_bands", freq_bands, persistent=False) + + def __call__(self, x): + x = self.freq_bands * x.unsqueeze(-1) + return torch.stack((x.sin(), x.cos()), dim=-1).permute(0, 1, 3, 4, 2).reshape(*x.shape[:2], -1) + + +class PositionNet(nn.Module): + def __init__(self, positive_len, out_dim, fourier_freqs=8): + super().__init__() + self.positive_len = positive_len + self.out_dim = out_dim + + self.fourier_embedder = FourierEmbedder(num_freqs=fourier_freqs) + self.position_dim = fourier_freqs * 2 * 4 # 2: sin/cos, 4: xyxy + + if isinstance(out_dim, tuple): + out_dim = out_dim[0] + self.linears = nn.Sequential( + nn.Linear(self.positive_len + self.position_dim, 512), + nn.SiLU(), + nn.Linear(512, 512), + nn.SiLU(), + nn.Linear(512, out_dim), + ) + + self.null_positive_feature = torch.nn.Parameter(torch.zeros([self.positive_len])) + self.null_position_feature = torch.nn.Parameter(torch.zeros([self.position_dim])) + + def forward(self, boxes, masks, positive_embeddings): + masks = masks.unsqueeze(-1) + + # embedding position (it may includes padding as placeholder) + xyxy_embedding = self.fourier_embedder(boxes) # B*N*4 -> B*N*C + + # learnable null embedding + positive_null = self.null_positive_feature.view(1, 1, -1) + xyxy_null = self.null_position_feature.view(1, 1, -1) + + # replace padding with learnable null embedding + positive_embeddings = positive_embeddings * masks + (1 - masks) * positive_null + xyxy_embedding = xyxy_embedding * masks + (1 - masks) * xyxy_null + + objs = self.linears(torch.cat([positive_embeddings, xyxy_embedding], dim=-1)) + return objs diff --git a/src/diffusers/models/transformer_2d.py b/src/diffusers/models/transformer_2d.py index 344a9441ced1..aa516389be2e 100644 --- a/src/diffusers/models/transformer_2d.py +++ b/src/diffusers/models/transformer_2d.py @@ -91,6 +91,7 @@ def __init__( upcast_attention: bool = False, norm_type: str = "layer_norm", norm_elementwise_affine: bool = True, + attention_type: str = "default", ): super().__init__() self.use_linear_projection = use_linear_projection @@ -183,6 +184,7 @@ def __init__( upcast_attention=upcast_attention, norm_type=norm_type, norm_elementwise_affine=norm_elementwise_affine, + attention_type=attention_type, ) for d in range(num_layers) ] diff --git a/src/diffusers/models/unet_2d_blocks.py b/src/diffusers/models/unet_2d_blocks.py index e894628462ef..05b360def7c2 100644 --- a/src/diffusers/models/unet_2d_blocks.py +++ b/src/diffusers/models/unet_2d_blocks.py @@ -49,6 +49,7 @@ def get_down_block( only_cross_attention=False, upcast_attention=False, resnet_time_scale_shift="default", + attention_type="default", resnet_skip_time_act=False, resnet_out_scale_factor=1.0, cross_attention_norm=None, @@ -129,6 +130,7 @@ def get_down_block( only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, ) elif down_block_type == "SimpleCrossAttnDownBlock2D": if cross_attention_dim is None: @@ -244,6 +246,7 @@ def get_up_block( only_cross_attention=False, upcast_attention=False, resnet_time_scale_shift="default", + attention_type="default", resnet_skip_time_act=False, resnet_out_scale_factor=1.0, cross_attention_norm=None, @@ -307,6 +310,7 @@ def get_up_block( only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, ) elif up_block_type == "SimpleCrossAttnUpBlock2D": if cross_attention_dim is None: @@ -556,6 +560,7 @@ def __init__( dual_cross_attention=False, use_linear_projection=False, upcast_attention=False, + attention_type="default", ): super().__init__() @@ -592,6 +597,7 @@ def __init__( norm_num_groups=resnet_groups, use_linear_projection=use_linear_projection, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: @@ -934,6 +940,7 @@ def __init__( use_linear_projection=False, only_cross_attention=False, upcast_attention=False, + attention_type="default", ): super().__init__() resnets = [] @@ -970,6 +977,7 @@ def __init__( use_linear_projection=use_linear_projection, only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: @@ -2068,6 +2076,7 @@ def __init__( use_linear_projection=False, only_cross_attention=False, upcast_attention=False, + attention_type="default", ): super().__init__() resnets = [] @@ -2106,6 +2115,7 @@ def __init__( use_linear_projection=use_linear_projection, only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: diff --git a/src/diffusers/models/unet_2d_condition.py b/src/diffusers/models/unet_2d_condition.py index fea1b4cd7823..3203537110cc 100644 --- a/src/diffusers/models/unet_2d_condition.py +++ b/src/diffusers/models/unet_2d_condition.py @@ -28,6 +28,7 @@ ImageHintTimeEmbedding, ImageProjection, ImageTimeEmbedding, + PositionNet, TextImageProjection, TextImageTimeEmbedding, TextTimeEmbedding, @@ -198,6 +199,7 @@ def __init__( conv_in_kernel: int = 3, conv_out_kernel: int = 3, projection_class_embeddings_input_dim: Optional[int] = None, + attention_type: str = "default", class_embeddings_concat: bool = False, mid_block_only_cross_attention: Optional[bool] = None, cross_attention_norm: Optional[str] = None, @@ -446,6 +448,7 @@ def __init__( only_cross_attention=only_cross_attention[i], upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, resnet_skip_time_act=resnet_skip_time_act, resnet_out_scale_factor=resnet_out_scale_factor, cross_attention_norm=cross_attention_norm, @@ -469,6 +472,7 @@ def __init__( dual_cross_attention=dual_cross_attention, use_linear_projection=use_linear_projection, upcast_attention=upcast_attention, + attention_type=attention_type, ) elif mid_block_type == "UNetMidBlock2DSimpleCrossAttn": self.mid_block = UNetMidBlock2DSimpleCrossAttn( @@ -535,6 +539,7 @@ def __init__( only_cross_attention=only_cross_attention[i], upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, resnet_skip_time_act=resnet_skip_time_act, resnet_out_scale_factor=resnet_out_scale_factor, cross_attention_norm=cross_attention_norm, @@ -560,6 +565,14 @@ def __init__( block_out_channels[0], out_channels, kernel_size=conv_out_kernel, padding=conv_out_padding ) + if attention_type == "gated": + positive_len = 768 + if isinstance(cross_attention_dim, int): + positive_len = cross_attention_dim + elif isinstance(cross_attention_dim, tuple) or isinstance(cross_attention_dim, list): + positive_len = cross_attention_dim[0] + self.position_net = PositionNet(positive_len=positive_len, out_dim=cross_attention_dim) + @property def attn_processors(self) -> Dict[str, AttentionProcessor]: r""" @@ -895,6 +908,12 @@ def forward( # 2. pre-process sample = self.conv_in(sample) + # 2.5 GLIGEN position net + if cross_attention_kwargs is not None and cross_attention_kwargs.get("gligen", None) is not None: + cross_attention_kwargs = cross_attention_kwargs.copy() + gligen_args = cross_attention_kwargs.pop("gligen") + cross_attention_kwargs["gligen"] = {"objs": self.position_net(**gligen_args)} + # 3. down is_controlnet = mid_block_additional_residual is not None and down_block_additional_residuals is not None diff --git a/src/diffusers/pipelines/__init__.py b/src/diffusers/pipelines/__init__.py index 2e8cee9ce697..16153951126a 100644 --- a/src/diffusers/pipelines/__init__.py +++ b/src/diffusers/pipelines/__init__.py @@ -90,6 +90,7 @@ StableDiffusionAttendAndExcitePipeline, StableDiffusionDepth2ImgPipeline, StableDiffusionDiffEditPipeline, + StableDiffusionGLIGENPipeline, StableDiffusionImageVariationPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipeline, diff --git a/src/diffusers/pipelines/stable_diffusion/__init__.py b/src/diffusers/pipelines/stable_diffusion/__init__.py index 1fddb712e6a9..1cef019e06a9 100644 --- a/src/diffusers/pipelines/stable_diffusion/__init__.py +++ b/src/diffusers/pipelines/stable_diffusion/__init__.py @@ -45,6 +45,7 @@ class StableDiffusionPipelineOutput(BaseOutput): from .pipeline_cycle_diffusion import CycleDiffusionPipeline from .pipeline_stable_diffusion import StableDiffusionPipeline from .pipeline_stable_diffusion_attend_and_excite import StableDiffusionAttendAndExcitePipeline + from .pipeline_stable_diffusion_gligen import StableDiffusionGLIGENPipeline from .pipeline_stable_diffusion_img2img import StableDiffusionImg2ImgPipeline from .pipeline_stable_diffusion_inpaint import StableDiffusionInpaintPipeline from .pipeline_stable_diffusion_inpaint_legacy import StableDiffusionInpaintPipelineLegacy diff --git a/src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion_gligen.py b/src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion_gligen.py new file mode 100644 index 000000000000..46c6d14c3e65 --- /dev/null +++ b/src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion_gligen.py @@ -0,0 +1,832 @@ +# Copyright 2023 The GLIGEN Authors and HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +import warnings +from typing import Any, Callable, Dict, List, Optional, Union + +import PIL +import torch +from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer + +from ...image_processor import VaeImageProcessor +from ...loaders import LoraLoaderMixin, TextualInversionLoaderMixin +from ...models import AutoencoderKL, UNet2DConditionModel +from ...models.attention import GatedSelfAttentionDense +from ...schedulers import KarrasDiffusionSchedulers +from ...utils import ( + is_accelerate_available, + is_accelerate_version, + logging, + randn_tensor, + replace_example_docstring, +) +from ..pipeline_utils import DiffusionPipeline +from . import StableDiffusionPipelineOutput +from .safety_checker import StableDiffusionSafetyChecker + + +logger = logging.get_logger(__name__) # pylint: disable=invalid-name + +EXAMPLE_DOC_STRING = """ + Examples: + ```py + >>> import torch + >>> from diffusers import StableDiffusionGLIGENPipeline + >>> from diffusers.utils import load_image + + >>> # Insert objects described by text at the region defined by bounding boxes + >>> pipe = StableDiffusionGLIGENPipeline.from_pretrained( + ... "masterful/gligen-1-4-inpainting-text-box", variant="fp16", torch_dtype=torch.float16 + ... ) + >>> pipe = pipe.to("cuda") + + >>> input_image = load_image( + ... "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/gligen/livingroom_modern.png" + ... ) + >>> prompt = "a birthday cake" + >>> boxes = [[0.2676, 0.6088, 0.4773, 0.7183]] + >>> phrases = ["a birthday cake"] + + >>> images = pipe( + ... prompt=prompt, + ... gligen_phrases=phrases, + ... gligen_inpaint_image=input_image, + ... gligen_boxes=boxes, + ... gligen_scheduled_sampling_beta=1, + ... output_type="pil", + ... num_inference_steps=50, + ... ).images + + >>> images[0].save("./gligen-1-4-inpainting-text-box.jpg") + + >>> # Generate an image described by the prompt and + >>> # insert objects described by text at the region defined by bounding boxes + >>> pipe = StableDiffusionGLIGENPipeline.from_pretrained( + ... "masterful/gligen-1-4-generation-text-box", variant="fp16", torch_dtype=torch.float16 + ... ) + >>> pipe = pipe.to("cuda") + + >>> prompt = "a waterfall and a modern high speed train running through the tunnel in a beautiful forest with fall foliage" + >>> boxes = [[0.1387, 0.2051, 0.4277, 0.7090], [0.4980, 0.4355, 0.8516, 0.7266]] + >>> phrases = ["a waterfall", "a modern high speed train running through the tunnel"] + + >>> images = pipe( + ... prompt=prompt, + ... gligen_phrases=phrases, + ... gligen_boxes=boxes, + ... gligen_scheduled_sampling_beta=1, + ... output_type="pil", + ... num_inference_steps=50, + ... ).images + + >>> images[0].save("./gligen-1-4-generation-text-box.jpg") + ``` +""" + + +class StableDiffusionGLIGENPipeline(DiffusionPipeline): + r""" + Pipeline for text-to-image generation using Stable Diffusion with Grounded-Language-to-Image Generation (GLIGEN). + + This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the + library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.). + + Args: + vae ([`AutoencoderKL`]): + Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations. + text_encoder ([`~transformers.CLIPTextModel`]): + Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)). + tokenizer ([`~transformers.CLIPTokenizer`]): + A `CLIPTokenizer` to tokenize text. + unet ([`UNet2DConditionModel`]): + A `UNet2DConditionModel` to denoise the encoded image latents. + scheduler ([`SchedulerMixin`]): + A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of + [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`]. + safety_checker ([`StableDiffusionSafetyChecker`]): + Classification module that estimates whether generated images could be considered offensive or harmful. + Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details + about a model's potential harms. + feature_extractor ([`~transformers.CLIPImageProcessor`]): + A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`. + """ + _optional_components = ["safety_checker", "feature_extractor"] + + def __init__( + self, + vae: AutoencoderKL, + text_encoder: CLIPTextModel, + tokenizer: CLIPTokenizer, + unet: UNet2DConditionModel, + scheduler: KarrasDiffusionSchedulers, + safety_checker: StableDiffusionSafetyChecker, + feature_extractor: CLIPFeatureExtractor, + requires_safety_checker: bool = True, + ): + super().__init__() + + if safety_checker is None and requires_safety_checker: + logger.warning( + f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure" + " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered" + " results in services or applications open to the public. Both the diffusers team and Hugging Face" + " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling" + " it only for use-cases that involve analyzing network behavior or auditing its results. For more" + " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ." + ) + + if safety_checker is not None and feature_extractor is None: + raise ValueError( + "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety" + " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead." + ) + + self.register_modules( + vae=vae, + text_encoder=text_encoder, + tokenizer=tokenizer, + unet=unet, + scheduler=scheduler, + safety_checker=safety_checker, + feature_extractor=feature_extractor, + ) + self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1) + self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True) + self.register_to_config(requires_safety_checker=requires_safety_checker) + + def enable_vae_slicing(self): + r""" + Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to + compute decoding in several steps. This is useful to save some memory and allow larger batch sizes. + """ + self.vae.enable_slicing() + + def disable_vae_slicing(self): + r""" + Disable sliced VAE decoding. If `enable_vae_slicing` was previously enabled, this method will go back to + computing decoding in one step. + """ + self.vae.disable_slicing() + + def enable_vae_tiling(self): + r""" + Enable tiled VAE decoding. When this option is enabled, the VAE will split the input tensor into tiles to + compute decoding and encoding in several steps. This is useful for saving a large amount of memory and to allow + processing larger images. + """ + self.vae.enable_tiling() + + def disable_vae_tiling(self): + r""" + Disable tiled VAE decoding. If `enable_vae_tiling` was previously enabled, this method will go back to + computing decoding in one step. + """ + self.vae.disable_tiling() + + def enable_model_cpu_offload(self, gpu_id=0): + r""" + Offload all models to CPU to reduce memory usage with a low impact on performance. Moves one whole model at a + time to the GPU when its `forward` method is called, and the model remains in GPU until the next model runs. + Memory savings are lower than using `enable_sequential_cpu_offload`, but performance is much better due to the + iterative execution of the `unet`. + """ + if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"): + from accelerate import cpu_offload_with_hook + else: + raise ImportError("`enable_model_offload` requires `accelerate v0.17.0` or higher.") + + device = torch.device(f"cuda:{gpu_id}") + + if self.device.type != "cpu": + self.to("cpu", silence_dtype_warnings=True) + torch.cuda.empty_cache() # otherwise we don't see the memory savings (but they probably exist) + + hook = None + for cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]: + _, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook) + + if self.safety_checker is not None: + _, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook) + + # We'll offload the last model manually. + self.final_offload_hook = hook + + # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt + def _encode_prompt( + self, + prompt, + device, + num_images_per_prompt, + do_classifier_free_guidance, + negative_prompt=None, + prompt_embeds: Optional[torch.FloatTensor] = None, + negative_prompt_embeds: Optional[torch.FloatTensor] = None, + lora_scale: Optional[float] = None, + ): + r""" + Encodes the prompt into text encoder hidden states. + + Args: + prompt (`str` or `List[str]`, *optional*): + prompt to be encoded + device: (`torch.device`): + torch device + num_images_per_prompt (`int`): + number of images that should be generated per prompt + do_classifier_free_guidance (`bool`): + whether to use classifier free guidance or not + negative_prompt (`str` or `List[str]`, *optional*): + The prompt or prompts not to guide the image generation. If not defined, one has to pass + `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is + less than `1`). + prompt_embeds (`torch.FloatTensor`, *optional*): + Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not + provided, text embeddings will be generated from `prompt` input argument. + negative_prompt_embeds (`torch.FloatTensor`, *optional*): + Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt + weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input + argument. + lora_scale (`float`, *optional*): + A lora scale that will be applied to all LoRA layers of the text encoder if LoRA layers are loaded. + """ + # set lora scale so that monkey patched LoRA + # function of text encoder can correctly access it + if lora_scale is not None and isinstance(self, LoraLoaderMixin): + self._lora_scale = lora_scale + + if prompt is not None and isinstance(prompt, str): + batch_size = 1 + elif prompt is not None and isinstance(prompt, list): + batch_size = len(prompt) + else: + batch_size = prompt_embeds.shape[0] + + if prompt_embeds is None: + # textual inversion: procecss multi-vector tokens if necessary + if isinstance(self, TextualInversionLoaderMixin): + prompt = self.maybe_convert_prompt(prompt, self.tokenizer) + + text_inputs = self.tokenizer( + prompt, + padding="max_length", + max_length=self.tokenizer.model_max_length, + truncation=True, + return_tensors="pt", + ) + text_input_ids = text_inputs.input_ids + untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids + + if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal( + text_input_ids, untruncated_ids + ): + removed_text = self.tokenizer.batch_decode( + untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1] + ) + logger.warning( + "The following part of your input was truncated because CLIP can only handle sequences up to" + f" {self.tokenizer.model_max_length} tokens: {removed_text}" + ) + + if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask: + attention_mask = text_inputs.attention_mask.to(device) + else: + attention_mask = None + + prompt_embeds = self.text_encoder( + text_input_ids.to(device), + attention_mask=attention_mask, + ) + prompt_embeds = prompt_embeds[0] + + if self.text_encoder is not None: + prompt_embeds_dtype = self.text_encoder.dtype + elif self.unet is not None: + prompt_embeds_dtype = self.unet.dtype + else: + prompt_embeds_dtype = prompt_embeds.dtype + + prompt_embeds = prompt_embeds.to(dtype=prompt_embeds_dtype, device=device) + + bs_embed, seq_len, _ = prompt_embeds.shape + # duplicate text embeddings for each generation per prompt, using mps friendly method + prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1) + prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1) + + # get unconditional embeddings for classifier free guidance + if do_classifier_free_guidance and negative_prompt_embeds is None: + uncond_tokens: List[str] + if negative_prompt is None: + uncond_tokens = [""] * batch_size + elif prompt is not None and type(prompt) is not type(negative_prompt): + raise TypeError( + f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !=" + f" {type(prompt)}." + ) + elif isinstance(negative_prompt, str): + uncond_tokens = [negative_prompt] + elif batch_size != len(negative_prompt): + raise ValueError( + f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:" + f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches" + " the batch size of `prompt`." + ) + else: + uncond_tokens = negative_prompt + + # textual inversion: procecss multi-vector tokens if necessary + if isinstance(self, TextualInversionLoaderMixin): + uncond_tokens = self.maybe_convert_prompt(uncond_tokens, self.tokenizer) + + max_length = prompt_embeds.shape[1] + uncond_input = self.tokenizer( + uncond_tokens, + padding="max_length", + max_length=max_length, + truncation=True, + return_tensors="pt", + ) + + if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask: + attention_mask = uncond_input.attention_mask.to(device) + else: + attention_mask = None + + negative_prompt_embeds = self.text_encoder( + uncond_input.input_ids.to(device), + attention_mask=attention_mask, + ) + negative_prompt_embeds = negative_prompt_embeds[0] + + if do_classifier_free_guidance: + # duplicate unconditional embeddings for each generation per prompt, using mps friendly method + seq_len = negative_prompt_embeds.shape[1] + + negative_prompt_embeds = negative_prompt_embeds.to(dtype=prompt_embeds_dtype, device=device) + + negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1) + negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1) + + # For classifier free guidance, we need to do two forward passes. + # Here we concatenate the unconditional and text embeddings into a single batch + # to avoid doing two forward passes + prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds]) + + return prompt_embeds + + # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker + def run_safety_checker(self, image, device, dtype): + if self.safety_checker is None: + has_nsfw_concept = None + else: + if torch.is_tensor(image): + feature_extractor_input = self.image_processor.postprocess(image, output_type="pil") + else: + feature_extractor_input = self.image_processor.numpy_to_pil(image) + safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device) + image, has_nsfw_concept = self.safety_checker( + images=image, clip_input=safety_checker_input.pixel_values.to(dtype) + ) + return image, has_nsfw_concept + + def prepare_extra_step_kwargs(self, generator, eta): + # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature + # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. + # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502 + # and should be between [0, 1] + + accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) + extra_step_kwargs = {} + if accepts_eta: + extra_step_kwargs["eta"] = eta + + # check if the scheduler accepts generator + accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys()) + if accepts_generator: + extra_step_kwargs["generator"] = generator + return extra_step_kwargs + + def check_inputs( + self, + prompt, + height, + width, + callback_steps, + gligen_phrases, + gligen_boxes, + negative_prompt=None, + prompt_embeds=None, + negative_prompt_embeds=None, + ): + if height % 8 != 0 or width % 8 != 0: + raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.") + + if (callback_steps is None) or ( + callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0) + ): + raise ValueError( + f"`callback_steps` has to be a positive integer but is {callback_steps} of type" + f" {type(callback_steps)}." + ) + + if prompt is not None and prompt_embeds is not None: + raise ValueError( + f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to" + " only forward one of the two." + ) + elif prompt is None and prompt_embeds is None: + raise ValueError( + "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined." + ) + elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)): + raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}") + + if negative_prompt is not None and negative_prompt_embeds is not None: + raise ValueError( + f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:" + f" {negative_prompt_embeds}. Please make sure to only forward one of the two." + ) + + if prompt_embeds is not None and negative_prompt_embeds is not None: + if prompt_embeds.shape != negative_prompt_embeds.shape: + raise ValueError( + "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but" + f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`" + f" {negative_prompt_embeds.shape}." + ) + + if len(gligen_phrases) != len(gligen_boxes): + ValueError( + "length of `gligen_phrases` and `gligen_boxes` has to be same, but" + f" got: `gligen_phrases` {len(gligen_phrases)} != `gligen_boxes` {len(gligen_boxes)}" + ) + + # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents + def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None): + shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor) + if isinstance(generator, list) and len(generator) != batch_size: + raise ValueError( + f"You have passed a list of generators of length {len(generator)}, but requested an effective batch" + f" size of {batch_size}. Make sure the batch size matches the length of the generators." + ) + + if latents is None: + latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype) + else: + latents = latents.to(device) + + # scale the initial noise by the standard deviation required by the scheduler + latents = latents * self.scheduler.init_noise_sigma + return latents + + def enable_fuser(self, enabled=True): + for module in self.unet.modules(): + if type(module) is GatedSelfAttentionDense: + module.enabled = enabled + + def draw_inpaint_mask_from_boxes(self, boxes, size): + inpaint_mask = torch.ones(size[0], size[1]) + for box in boxes: + x0, x1 = box[0] * size[0], box[2] * size[0] + y0, y1 = box[1] * size[1], box[3] * size[1] + inpaint_mask[int(y0) : int(y1), int(x0) : int(x1)] = 0 + return inpaint_mask + + def crop(self, im, new_width, new_height): + width, height = im.size + left = (width - new_width) / 2 + top = (height - new_height) / 2 + right = (width + new_width) / 2 + bottom = (height + new_height) / 2 + return im.crop((left, top, right, bottom)) + + def target_size_center_crop(self, im, new_hw): + width, height = im.size + if width != height: + im = self.crop(im, min(height, width), min(height, width)) + return im.resize((new_hw, new_hw), PIL.Image.LANCZOS) + + @torch.no_grad() + @replace_example_docstring(EXAMPLE_DOC_STRING) + def __call__( + self, + prompt: Union[str, List[str]] = None, + height: Optional[int] = None, + width: Optional[int] = None, + num_inference_steps: int = 50, + guidance_scale: float = 7.5, + gligen_scheduled_sampling_beta: float = 0.3, + gligen_phrases: List[str] = None, + gligen_boxes: List[List[float]] = None, + gligen_inpaint_image: Optional[PIL.Image.Image] = None, + negative_prompt: Optional[Union[str, List[str]]] = None, + num_images_per_prompt: Optional[int] = 1, + eta: float = 0.0, + generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, + latents: Optional[torch.FloatTensor] = None, + prompt_embeds: Optional[torch.FloatTensor] = None, + negative_prompt_embeds: Optional[torch.FloatTensor] = None, + output_type: Optional[str] = "pil", + return_dict: bool = True, + callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, + callback_steps: int = 1, + cross_attention_kwargs: Optional[Dict[str, Any]] = None, + ): + r""" + The call function to the pipeline for generation. + + Args: + prompt (`str` or `List[str]`, *optional*): + The prompt or prompts to guide image generation. If not defined, you need to pass `prompt_embeds`. + height (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`): + The height in pixels of the generated image. + width (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`): + The width in pixels of the generated image. + num_inference_steps (`int`, *optional*, defaults to 50): + The number of denoising steps. More denoising steps usually lead to a higher quality image at the + expense of slower inference. + guidance_scale (`float`, *optional*, defaults to 7.5): + A higher guidance scale value encourages the model to generate images closely linked to the text + `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. + gligen_phrases (`List[str]`): + The phrases to guide what to include in each of the regions defined by the corresponding + `gligen_boxes`. There should only be one phrase per bounding box. + gligen_boxes (`List[List[float]]`): + The bounding boxes that identify rectangular regions of the image that are going to be filled with the + content described by the corresponding `gligen_phrases`. Each rectangular box is defined as a + `List[float]` of 4 elements `[xmin, ymin, xmax, ymax]` where each value is between [0,1]. + gligen_inpaint_image (`PIL.Image.Image`, *optional*): + The input image, if provided, is inpainted with objects described by the `gligen_boxes` and + `gligen_phrases`. Otherwise, it is treated as a generation task on a blank input image. + gligen_scheduled_sampling_beta (`float`, defaults to 0.3): + Scheduled Sampling factor from [GLIGEN: Open-Set Grounded Text-to-Image + Generation](https://arxiv.org/pdf/2301.07093.pdf). Scheduled Sampling factor is only varied for + scheduled sampling during inference for improved quality and controllability. + negative_prompt (`str` or `List[str]`, *optional*): + The prompt or prompts to guide what to not include in image generation. If not defined, you need to + pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`). + num_images_per_prompt (`int`, *optional*, defaults to 1): + The number of images to generate per prompt. + eta (`float`, *optional*, defaults to 0.0): + Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies + to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers. + generator (`torch.Generator` or `List[torch.Generator]`, *optional*): + A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make + generation deterministic. + latents (`torch.FloatTensor`, *optional*): + Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image + generation. Can be used to tweak the same generation with different prompts. If not provided, a latents + tensor is generated by sampling using the supplied random `generator`. + prompt_embeds (`torch.FloatTensor`, *optional*): + Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not + provided, text embeddings are generated from the `prompt` input argument. + negative_prompt_embeds (`torch.FloatTensor`, *optional*): + Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If + not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument. + output_type (`str`, *optional*, defaults to `"pil"`): + The output format of the generated image. Choose between `PIL.Image` or `np.array`. + return_dict (`bool`, *optional*, defaults to `True`): + Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a + plain tuple. + callback (`Callable`, *optional*): + A function that calls every `callback_steps` steps during inference. The function is called with the + following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`. + callback_steps (`int`, *optional*, defaults to 1): + The frequency at which the `callback` function is called. If not specified, the callback is called at + every step. + cross_attention_kwargs (`dict`, *optional*): + A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in + [`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py). + guidance_rescale (`float`, *optional*, defaults to 0.7): + Guidance rescale factor from [Common Diffusion Noise Schedules and Sample Steps are + Flawed](https://arxiv.org/pdf/2305.08891.pdf). Guidance rescale factor should fix overexposure when + using zero terminal SNR. + + Examples: + + Returns: + [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: + If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] is returned, + otherwise a `tuple` is returned where the first element is a list with the generated images and the + second element is a list of `bool`s indicating whether the corresponding generated image contains + "not-safe-for-work" (nsfw) content. + """ + # 0. Default height and width to unet + height = height or self.unet.config.sample_size * self.vae_scale_factor + width = width or self.unet.config.sample_size * self.vae_scale_factor + + # 1. Check inputs. Raise error if not correct + self.check_inputs( + prompt, + height, + width, + callback_steps, + gligen_phrases, + gligen_boxes, + negative_prompt, + prompt_embeds, + negative_prompt_embeds, + ) + + # 2. Define call parameters + if prompt is not None and isinstance(prompt, str): + batch_size = 1 + elif prompt is not None and isinstance(prompt, list): + batch_size = len(prompt) + else: + batch_size = prompt_embeds.shape[0] + + device = self._execution_device + # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2) + # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1` + # corresponds to doing no classifier free guidance. + do_classifier_free_guidance = guidance_scale > 1.0 + + # 3. Encode input prompt + prompt_embeds = self._encode_prompt( + prompt, + device, + num_images_per_prompt, + do_classifier_free_guidance, + negative_prompt, + prompt_embeds=prompt_embeds, + negative_prompt_embeds=negative_prompt_embeds, + ) + + # 4. Prepare timesteps + self.scheduler.set_timesteps(num_inference_steps, device=device) + timesteps = self.scheduler.timesteps + + # 5. Prepare latent variables + num_channels_latents = self.unet.in_channels + latents = self.prepare_latents( + batch_size * num_images_per_prompt, + num_channels_latents, + height, + width, + prompt_embeds.dtype, + device, + generator, + latents, + ) + + # 5.1 Prepare GLIGEN variables + max_objs = 30 + if len(gligen_boxes) > max_objs: + warnings.warn( + f"More that {max_objs} objects found. Only first {max_objs} objects will be processed.", + FutureWarning, + ) + gligen_phrases = gligen_phrases[:max_objs] + gligen_boxes = gligen_boxes[:max_objs] + # prepare batched input to the PositionNet (boxes, phrases, mask) + # Get tokens for phrases from pre-trained CLIPTokenizer + tokenizer_inputs = self.tokenizer(gligen_phrases, padding=True, return_tensors="pt").to(device) + # For the token, we use the same pre-trained text encoder + # to obtain its text feature + _text_embeddings = self.text_encoder(**tokenizer_inputs).pooler_output + n_objs = len(gligen_boxes) + # For each entity, described in phrases, is denoted with a bounding box, + # we represent the location information as (xmin,ymin,xmax,ymax) + boxes = torch.zeros(max_objs, 4, device=device, dtype=self.text_encoder.dtype) + boxes[:n_objs] = torch.tensor(gligen_boxes) + text_embeddings = torch.zeros( + max_objs, self.unet.cross_attention_dim, device=device, dtype=self.text_encoder.dtype + ) + text_embeddings[:n_objs] = _text_embeddings + # Generate a mask for each object that is entity described by phrases + masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype) + masks[:n_objs] = 1 + + repeat_batch = batch_size * num_images_per_prompt + boxes = boxes.unsqueeze(0).expand(repeat_batch, -1, -1).clone() + text_embeddings = text_embeddings.unsqueeze(0).expand(repeat_batch, -1, -1).clone() + masks = masks.unsqueeze(0).expand(repeat_batch, -1).clone() + if do_classifier_free_guidance: + repeat_batch = repeat_batch * 2 + boxes = torch.cat([boxes] * 2) + text_embeddings = torch.cat([text_embeddings] * 2) + masks = torch.cat([masks] * 2) + masks[: repeat_batch // 2] = 0 + if cross_attention_kwargs is None: + cross_attention_kwargs = {} + cross_attention_kwargs["gligen"] = {"boxes": boxes, "positive_embeddings": text_embeddings, "masks": masks} + + # Prepare latent variables for GLIGEN inpainting + if gligen_inpaint_image is not None: + # if the given input image is not of the same size as expected by VAE + # center crop and resize the input image to expected shape + if gligen_inpaint_image.size != (self.vae.sample_size, self.vae.sample_size): + gligen_inpaint_image = self.target_size_center_crop(gligen_inpaint_image, self.vae.sample_size) + # Convert a single image into a batch of images with a batch size of 1 + # The resulting shape becomes (1, C, H, W), where C is the number of channels, + # and H and W are the height and width of the image. + # scales the pixel values to a range [-1, 1] + gligen_inpaint_image = self.image_processor.preprocess(gligen_inpaint_image) + gligen_inpaint_image = gligen_inpaint_image.to(dtype=self.vae.dtype, device=self.vae.device) + # Run AutoEncoder to get corresponding latents + gligen_inpaint_latent = self.vae.encode(gligen_inpaint_image).latent_dist.sample() + gligen_inpaint_latent = self.vae.config.scaling_factor * gligen_inpaint_latent + # Generate an inpainting mask + # pixel value = 0, where the object is present (defined by bounding boxes above) + # 1, everywhere else + gligen_inpaint_mask = self.draw_inpaint_mask_from_boxes(gligen_boxes, gligen_inpaint_latent.shape[2:]) + gligen_inpaint_mask = gligen_inpaint_mask.to( + dtype=gligen_inpaint_latent.dtype, device=gligen_inpaint_latent.device + ) + gligen_inpaint_mask = gligen_inpaint_mask[None, None] + gligen_inpaint_mask_addition = torch.cat( + (gligen_inpaint_latent * gligen_inpaint_mask, gligen_inpaint_mask), dim=1 + ) + # Convert a single mask into a batch of masks with a batch size of 1 + gligen_inpaint_mask_addition = gligen_inpaint_mask_addition.expand(repeat_batch, -1, -1, -1).clone() + + num_grounding_steps = int(gligen_scheduled_sampling_beta * len(timesteps)) + self.enable_fuser(True) + + # 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline + extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta) + + # 7. Denoising loop + num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order + with self.progress_bar(total=num_inference_steps) as progress_bar: + for i, t in enumerate(timesteps): + # Scheduled sampling + if i == num_grounding_steps: + self.enable_fuser(False) + + if latents.shape[1] != 4: + latents = torch.randn_like(latents[:, :4]) + + if gligen_inpaint_image is not None: + gligen_inpaint_latent_with_noise = ( + self.scheduler.add_noise(gligen_inpaint_latent, torch.randn_like(gligen_inpaint_latent), t) + .expand(latents.shape[0], -1, -1, -1) + .clone() + ) + latents = gligen_inpaint_latent_with_noise * gligen_inpaint_mask + latents * ( + 1 - gligen_inpaint_mask + ) + + # expand the latents if we are doing classifier free guidance + latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents + latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) + + if gligen_inpaint_image is not None: + latent_model_input = torch.cat((latent_model_input, gligen_inpaint_mask_addition), dim=1) + + # predict the noise residual + noise_pred = self.unet( + latent_model_input, + t, + encoder_hidden_states=prompt_embeds, + cross_attention_kwargs=cross_attention_kwargs, + ).sample + + # perform guidance + if do_classifier_free_guidance: + noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) + noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) + + # compute the previous noisy sample x_t -> x_t-1 + latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample + + # call the callback, if provided + if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0): + progress_bar.update() + if callback is not None and i % callback_steps == 0: + callback(i, t, latents) + + if not output_type == "latent": + image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False)[0] + image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype) + else: + image = latents + has_nsfw_concept = None + + if has_nsfw_concept is None: + do_denormalize = [True] * image.shape[0] + else: + do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept] + + image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize) + + # Offload last model to CPU + if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None: + self.final_offload_hook.offload() + + if not return_dict: + return (image, has_nsfw_concept) + + return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) diff --git a/src/diffusers/pipelines/versatile_diffusion/modeling_text_unet.py b/src/diffusers/pipelines/versatile_diffusion/modeling_text_unet.py index fe9455a19bf0..e5083df286a2 100644 --- a/src/diffusers/pipelines/versatile_diffusion/modeling_text_unet.py +++ b/src/diffusers/pipelines/versatile_diffusion/modeling_text_unet.py @@ -153,6 +153,62 @@ def get_up_block( raise ValueError(f"{up_block_type} is not supported.") +class FourierEmbedder(nn.Module): + def __init__(self, num_freqs=64, temperature=100): + super().__init__() + + self.num_freqs = num_freqs + self.temperature = temperature + + freq_bands = temperature ** (torch.arange(num_freqs) / num_freqs) + freq_bands = freq_bands[None, None, None] + self.register_buffer("freq_bands", freq_bands, persistent=False) + + def __call__(self, x): + x = self.freq_bands * x.unsqueeze(-1) + return torch.stack((x.sin(), x.cos()), dim=-1).permute(0, 1, 3, 4, 2).reshape(*x.shape[:2], -1) + + +class PositionNet(nn.Module): + def __init__(self, positive_len, out_dim, fourier_freqs=8): + super().__init__() + self.positive_len = positive_len + self.out_dim = out_dim + + self.fourier_embedder = FourierEmbedder(num_freqs=fourier_freqs) + self.position_dim = fourier_freqs * 2 * 4 # 2: sin/cos, 4: xyxy + + if isinstance(out_dim, tuple): + out_dim = out_dim[0] + self.linears = nn.Sequential( + nn.Linear(self.positive_len + self.position_dim, 512), + nn.SiLU(), + nn.Linear(512, 512), + nn.SiLU(), + nn.Linear(512, out_dim), + ) + + self.null_positive_feature = torch.nn.Parameter(torch.zeros([self.positive_len])) + self.null_position_feature = torch.nn.Parameter(torch.zeros([self.position_dim])) + + def forward(self, boxes, masks, positive_embeddings): + masks = masks.unsqueeze(-1) + + # embedding position (it may includes padding as placeholder) + xyxy_embedding = self.fourier_embedder(boxes) # B*N*4 -> B*N*C + + # learnable null embedding + positive_null = self.null_positive_feature.view(1, 1, -1) + xyxy_null = self.null_position_feature.view(1, 1, -1) + + # replace padding with learnable null embedding + positive_embeddings = positive_embeddings * masks + (1 - masks) * positive_null + xyxy_embedding = xyxy_embedding * masks + (1 - masks) * xyxy_null + + objs = self.linears(torch.cat([positive_embeddings, xyxy_embedding], dim=-1)) + return objs + + # Copied from diffusers.models.unet_2d_condition.UNet2DConditionModel with UNet2DConditionModel->UNetFlatConditionModel, nn.Conv2d->LinearMultiDim, Block2D->BlockFlat class UNetFlatConditionModel(ModelMixin, ConfigMixin): r""" @@ -298,6 +354,7 @@ def __init__( conv_in_kernel: int = 3, conv_out_kernel: int = 3, projection_class_embeddings_input_dim: Optional[int] = None, + attention_type: str = "default", class_embeddings_concat: bool = False, mid_block_only_cross_attention: Optional[bool] = None, cross_attention_norm: Optional[str] = None, @@ -556,6 +613,7 @@ def __init__( only_cross_attention=only_cross_attention[i], upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, resnet_skip_time_act=resnet_skip_time_act, resnet_out_scale_factor=resnet_out_scale_factor, cross_attention_norm=cross_attention_norm, @@ -579,6 +637,7 @@ def __init__( dual_cross_attention=dual_cross_attention, use_linear_projection=use_linear_projection, upcast_attention=upcast_attention, + attention_type=attention_type, ) elif mid_block_type == "UNetMidBlockFlatSimpleCrossAttn": self.mid_block = UNetMidBlockFlatSimpleCrossAttn( @@ -645,6 +704,7 @@ def __init__( only_cross_attention=only_cross_attention[i], upcast_attention=upcast_attention, resnet_time_scale_shift=resnet_time_scale_shift, + attention_type=attention_type, resnet_skip_time_act=resnet_skip_time_act, resnet_out_scale_factor=resnet_out_scale_factor, cross_attention_norm=cross_attention_norm, @@ -670,6 +730,14 @@ def __init__( block_out_channels[0], out_channels, kernel_size=conv_out_kernel, padding=conv_out_padding ) + if attention_type == "gated": + positive_len = 768 + if isinstance(cross_attention_dim, int): + positive_len = cross_attention_dim + elif isinstance(cross_attention_dim, tuple) or isinstance(cross_attention_dim, list): + positive_len = cross_attention_dim[0] + self.position_net = PositionNet(positive_len=positive_len, out_dim=cross_attention_dim) + @property def attn_processors(self) -> Dict[str, AttentionProcessor]: r""" @@ -1012,6 +1080,12 @@ def forward( # 2. pre-process sample = self.conv_in(sample) + # 2.5 GLIGEN position net + if cross_attention_kwargs is not None and cross_attention_kwargs.get("gligen", None) is not None: + cross_attention_kwargs = cross_attention_kwargs.copy() + gligen_args = cross_attention_kwargs.pop("gligen") + cross_attention_kwargs["gligen"] = {"objs": self.position_net(**gligen_args)} + # 3. down is_controlnet = mid_block_additional_residual is not None and down_block_additional_residuals is not None @@ -1331,6 +1405,7 @@ def __init__( use_linear_projection=False, only_cross_attention=False, upcast_attention=False, + attention_type="default", ): super().__init__() resnets = [] @@ -1367,6 +1442,7 @@ def __init__( use_linear_projection=use_linear_projection, only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: @@ -1569,6 +1645,7 @@ def __init__( use_linear_projection=False, only_cross_attention=False, upcast_attention=False, + attention_type="default", ): super().__init__() resnets = [] @@ -1607,6 +1684,7 @@ def __init__( use_linear_projection=use_linear_projection, only_cross_attention=only_cross_attention, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: @@ -1711,6 +1789,7 @@ def __init__( dual_cross_attention=False, use_linear_projection=False, upcast_attention=False, + attention_type="default", ): super().__init__() @@ -1747,6 +1826,7 @@ def __init__( norm_num_groups=resnet_groups, use_linear_projection=use_linear_projection, upcast_attention=upcast_attention, + attention_type=attention_type, ) ) else: diff --git a/src/diffusers/utils/dummy_torch_and_transformers_objects.py b/src/diffusers/utils/dummy_torch_and_transformers_objects.py index df8009dd0e27..dbddf6e2c593 100644 --- a/src/diffusers/utils/dummy_torch_and_transformers_objects.py +++ b/src/diffusers/utils/dummy_torch_and_transformers_objects.py @@ -602,6 +602,21 @@ def from_pretrained(cls, *args, **kwargs): requires_backends(cls, ["torch", "transformers"]) +class StableDiffusionGLIGENPipeline(metaclass=DummyObject): + _backends = ["torch", "transformers"] + + def __init__(self, *args, **kwargs): + requires_backends(self, ["torch", "transformers"]) + + @classmethod + def from_config(cls, *args, **kwargs): + requires_backends(cls, ["torch", "transformers"]) + + @classmethod + def from_pretrained(cls, *args, **kwargs): + requires_backends(cls, ["torch", "transformers"]) + + class StableDiffusionImageVariationPipeline(metaclass=DummyObject): _backends = ["torch", "transformers"] diff --git a/tests/pipelines/stable_diffusion/test_stable_diffusion_gligen.py b/tests/pipelines/stable_diffusion/test_stable_diffusion_gligen.py new file mode 100644 index 000000000000..19d44e0cd1d9 --- /dev/null +++ b/tests/pipelines/stable_diffusion/test_stable_diffusion_gligen.py @@ -0,0 +1,143 @@ +# coding=utf-8 +# Copyright 2023 HuggingFace Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import numpy as np +import torch +from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer + +from diffusers import ( + AutoencoderKL, + DDIMScheduler, + StableDiffusionGLIGENPipeline, + UNet2DConditionModel, +) +from diffusers.utils.testing_utils import enable_full_determinism + +from ..pipeline_params import ( + TEXT_TO_IMAGE_BATCH_PARAMS, + TEXT_TO_IMAGE_IMAGE_PARAMS, + TEXT_TO_IMAGE_PARAMS, +) +from ..test_pipelines_common import PipelineKarrasSchedulerTesterMixin, PipelineLatentTesterMixin, PipelineTesterMixin + + +enable_full_determinism() + + +class GligenPipelineFastTests( + PipelineLatentTesterMixin, PipelineKarrasSchedulerTesterMixin, PipelineTesterMixin, unittest.TestCase +): + pipeline_class = StableDiffusionGLIGENPipeline + params = TEXT_TO_IMAGE_PARAMS | {"gligen_phrases", "gligen_boxes"} + batch_params = TEXT_TO_IMAGE_BATCH_PARAMS + image_params = TEXT_TO_IMAGE_IMAGE_PARAMS + image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS + + def get_dummy_components(self): + torch.manual_seed(0) + unet = UNet2DConditionModel( + block_out_channels=(32, 64), + layers_per_block=2, + sample_size=32, + in_channels=4, + out_channels=4, + down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"), + up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"), + cross_attention_dim=32, + attention_type="gated", + ) + # unet.position_net = PositionNet(32,32) + scheduler = DDIMScheduler( + beta_start=0.00085, + beta_end=0.012, + beta_schedule="scaled_linear", + clip_sample=False, + set_alpha_to_one=False, + ) + torch.manual_seed(0) + vae = AutoencoderKL( + block_out_channels=[32, 64], + in_channels=3, + out_channels=3, + down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"], + up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"], + latent_channels=4, + sample_size=128, + ) + torch.manual_seed(0) + text_encoder_config = CLIPTextConfig( + bos_token_id=0, + eos_token_id=2, + hidden_size=32, + intermediate_size=37, + layer_norm_eps=1e-05, + num_attention_heads=4, + num_hidden_layers=5, + pad_token_id=1, + vocab_size=1000, + ) + text_encoder = CLIPTextModel(text_encoder_config) + tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") + + components = { + "unet": unet, + "scheduler": scheduler, + "vae": vae, + "text_encoder": text_encoder, + "tokenizer": tokenizer, + "safety_checker": None, + "feature_extractor": None, + } + return components + + def get_dummy_inputs(self, device, seed=0): + if str(device).startswith("mps"): + generator = torch.manual_seed(seed) + else: + generator = torch.Generator(device=device).manual_seed(seed) + inputs = { + "prompt": "A modern livingroom", + "generator": generator, + "num_inference_steps": 2, + "guidance_scale": 6.0, + "gligen_phrases": ["a birthday cake"], + "gligen_boxes": [[0.2676, 0.6088, 0.4773, 0.7183]], + "output_type": "np", + } + return inputs + + def test_gligen(self): + device = "cpu" # ensure determinism for the device-dependent torch.Generator + components = self.get_dummy_components() + sd_pipe = StableDiffusionGLIGENPipeline(**components) + sd_pipe = sd_pipe.to(device) + sd_pipe.set_progress_bar_config(disable=None) + + inputs = self.get_dummy_inputs(device) + image = sd_pipe(**inputs).images + image_slice = image[0, -3:, -3:, -1] + + assert image.shape == (1, 64, 64, 3) + expected_slice = np.array([0.5069, 0.5561, 0.4577, 0.4792, 0.5203, 0.4089, 0.5039, 0.4919, 0.4499]) + + assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2 + + def test_attention_slicing_forward_pass(self): + super().test_attention_slicing_forward_pass(expected_max_diff=3e-3) + + def test_inference_batch_single_identical(self): + super().test_inference_batch_single_identical(batch_size=3, expected_max_diff=3e-3)