# Sound Generation with AudioLDM2 and OpenVINO™



## Prerequisites


In [1]:
%pip install -q accelerate diffusers transformers torch gradio --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q "openvino>=2023.2.0"

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Instantiating Generation Pipeline 



In [2]:
from collections import namedtuple
import gc
from functools import partial
from pathlib import Path

from diffusers import AudioLDM2Pipeline
from IPython.display import Audio
import numpy as np
import openvino as ov
import torch

# Load the original PyTorch pipeline; it is used both as a reference run and as
# the source of the sub-models converted in the following sections.
MODEL_ID = "cvssp/audioldm2"
pipe = AudioLDM2Pipeline.from_pretrained(MODEL_ID)

prompt = "birds singing in the forest"
negative_prompt = "Low quality"

# Seed a dedicated generator so that this reference run is reproducible on
# "Restart Kernel -> Run All" instead of producing a different clip every time.
generator = torch.Generator("cpu").manual_seed(0)
audio = pipe(
    prompt,
    negative_prompt=negative_prompt,
    num_inference_steps=100,
    audio_length_in_s=5.0,
    generator=generator,
).audios[0]

# Playback rate handed to the audio widget (also reused by the Gradio demo below).
# NOTE(review): presumably this matches the vocoder's output rate - confirm against the model config.
sampling_rate = 16000
Audio(audio, rate=sampling_rate)

Loading pipeline components...:   0%|          | 0/11 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

## Convert models to OpenVINO Intermediate representation (IR) format 

To obtain an OpenVINO `ov.Model` object instance, we need to pass a model object and example input data for model tracing to the `ov.convert_model` function. The model can then be saved on disk for later deployment using the `ov.save_model` function.

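The pipeline consists of seven important parts, each of which is converted separately below:

* Two text encoders (CLAP and T5) for creating the conditioning from a text prompt.
* Projection model for combining the outputs of the two text encoders.
* GPT-2 language model for generating a sequence of hidden states conditioned on the projected embeddings.
* Unet for step-by-step denoising of the latent audio representation.
* Autoencoder (VAE) for decoding the latent space into a mel-spectrogram.
* Vocoder for converting the mel-spectrogram into the final audio waveform.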

In [3]:
import gc
from functools import partial
from pathlib import Path
from PIL import Image
import openvino as ov
import torch

def cleanup_torchscript_cache() -> None:
    """
    Helper for removing cached model representation.

    Resets TorchScript-related state through private ``torch`` APIs. Takes no
    arguments and returns nothing.
    """
    # Clear the JIT class registry.
    torch._C._jit_clear_class_registry()
    # Swap the store of concrete module types for a freshly created one.
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
    # Clear the class state kept on the Python side of torch.jit.
    torch.jit._state._clear_class_state()

### Text Encoder 
The text encoder is responsible for transforming the input prompt, for example, "birds singing in the forest", into an embedding space that can be consumed by the rest of the pipeline. It is usually a simple transformer-based encoder that maps a sequence of input tokens to latent text embeddings.

The inputs of the first (CLAP) text encoder are the tensor `input_ids`, which contains indexes of tokens from text processed by the tokenizer and padded to the maximum length accepted by the model, and the corresponding `attention_mask`. The wrapper below exposes the model's `get_text_features` method for tracing, so the converted model outputs the text features computed for the prompt.

In [4]:
class ClapEncoderWrapper(torch.nn.Module):
    """Adapter whose ``forward`` delegates to the wrapped encoder's ``get_text_features``."""

    def __init__(self, encoder):
        super().__init__()
        # Switch the wrapped model to evaluation mode before keeping a reference to it.
        encoder.eval()
        self.encoder = encoder

    def forward(self, input_ids, attention_mask):
        """Return the text features computed from ``input_ids`` and ``attention_mask``."""
        text_features = self.encoder.get_text_features(input_ids, attention_mask)
        return text_features

clap_text_encoder_ir_path = Path('./clap_text_encoder.xml')

if clap_text_encoder_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"Text Encoder will be loaded from {clap_text_encoder_ir_path}")
else:
    with torch.no_grad():
        # Example inputs are only used for tracing the wrapped encoder.
        clap_example_inputs = {
            "input_ids": torch.ones((1, 512), dtype=torch.long),
            "attention_mask": torch.ones((1, 512), dtype=torch.long),
        }
        clap_tracing_module = ClapEncoderWrapper(pipe.text_encoder)
        ov_model = ov.convert_model(clap_tracing_module, example_input=clap_example_inputs)
    ov.save_model(ov_model, clap_text_encoder_ir_path)
    cleanup_torchscript_cache()
    print('Text Encoder successfully converted to IR')

Text Encoder will be loaded from clap_text_encoder.xml


### Second text encoder conversion

In [5]:
t5_text_encoder_ir_path = Path('./t5_text_encoder.xml')

if t5_text_encoder_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"Text Encoder will be loaded from {t5_text_encoder_ir_path}")
else:
    pipe.text_encoder_2.eval()
    with torch.no_grad():
        # A short sequence of token ids is enough as the tracing example.
        t5_example_ids = torch.ones((1, 7), dtype=torch.long)
        ov_model = ov.convert_model(pipe.text_encoder_2, example_input=t5_example_ids)
    ov.save_model(ov_model, t5_text_encoder_ir_path)
    cleanup_torchscript_cache()
    print('Text Encoder successfully converted to IR')

Text Encoder will be loaded from t5_text_encoder.xml


### Vocoder conversion

In [6]:
vocoder_ir_path = Path('./vocoder.xml')

if vocoder_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"The Vocoder will be loaded from {vocoder_ir_path}")
else:
    pipe.vocoder.eval()
    with torch.no_grad():
        # Float32 example input used only for tracing.
        vocoder_example_input = torch.ones((1, 700, 64), dtype=torch.float32)
        ov_model = ov.convert_model(pipe.vocoder, example_input=vocoder_example_input)
    ov.save_model(ov_model, vocoder_ir_path)
    cleanup_torchscript_cache()
    print('The Vocoder successfully converted to IR')

The Vocoder will be loaded from vocoder.xml


### GPT-2 conversion

In [7]:
from functools import partial


language_model_ir_path = Path('./language_model.xml')

# Example inputs used only for tracing the language model.
language_model_inputs = {
    "inputs_embeds": torch.randn((1, 12, 768), dtype=torch.float32),
    "attention_mask": torch.ones((1, 12), dtype=torch.int64),
}

if not language_model_ir_path.exists():
    pipe.language_model.config.torchscript = True
    # Disable the key/value cache and dict outputs through the model config.
    # The earlier `pipe.language_model.__call__ = partial(..., kwargs={...})`
    # assignment had no effect: calling an object looks `__call__` up on its
    # class, not on the instance, so those options never reached the model.
    pipe.language_model.config.use_cache = False
    pipe.language_model.config.return_dict = False
    pipe.language_model.eval()
    with torch.no_grad():
        ov_model = ov.convert_model(
            pipe.language_model,  # model instance
            example_input=language_model_inputs,  # inputs for model tracing
        )

    # Make the sequence dimension dynamic (-1) and pin the element types.
    # The shapes below correspond to the attention mask ([1, seq]) and the input
    # embeddings ([1, seq, 768]) from `language_model_inputs`.
    # NOTE(review): this relies on the converted model listing the mask first - confirm if the model changes.
    ov_model.inputs[0].get_node().set_partial_shape(ov.PartialShape([1, -1]))
    ov_model.inputs[0].get_node().set_element_type(ov.Type.i64)
    ov_model.inputs[1].get_node().set_partial_shape(ov.PartialShape([1, -1, 768]))
    ov_model.inputs[1].get_node().set_element_type(ov.Type.f32)

    # Re-run shape/type inference after editing the inputs.
    ov_model.validate_nodes_and_infer_types()

    ov.save_model(ov_model, language_model_ir_path)
    cleanup_torchscript_cache()
    print('The Language Model successfully converted to IR')
else:
    print(f"The Language Model will be loaded from {language_model_ir_path}")

The Projection Model will be loaded from language_model.xml


### Projection model conversion

In [8]:
projection_model_ir_path = Path('./projection_model.xml')

# Example inputs used only for tracing: two hidden-state tensors and their masks.
projection_model_inputs = dict(
    hidden_states=torch.randn((1, 1, 512), dtype=torch.float32),
    hidden_states_1=torch.randn((1, 7, 1024), dtype=torch.float32),
    attention_mask=torch.ones((1, 1), dtype=torch.int64),
    attention_mask_1=torch.ones((1, 7), dtype=torch.int64),
)

if projection_model_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"The Projection Model will be loaded from {projection_model_ir_path}")
else:
    pipe.projection_model.eval()
    with torch.no_grad():
        ov_model = ov.convert_model(pipe.projection_model, example_input=projection_model_inputs)
    ov.save_model(ov_model, projection_model_ir_path)
    cleanup_torchscript_cache()
    print('The Projection Model successfully converted to IR')

The Projection Model will be loaded from projection_model.xml


### UNet conversion 

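The process of UNet model conversion remains the same as for the original Stable Diffusion model, but with respect to the additional inputs used by AudioLDM2: besides `sample` and `timestep`, the UNet receives two sets of encoder hidden states (`encoder_hidden_states` and `encoder_hidden_states_1`) together with the attention mask `encoder_attention_mask_1`.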

In [9]:
unet_ir_path = Path('./unet.xml')

pipe.unet.eval()
# Example inputs used only for tracing the UNet.
unet_inputs = dict(
    sample=torch.randn((2, 8, 75, 16), dtype=torch.float32),
    timestep=torch.tensor(1, dtype=torch.int64),
    encoder_hidden_states=torch.randn((2, 8, 768), dtype=torch.float32),
    encoder_hidden_states_1=torch.randn((2, 7, 1024), dtype=torch.float32),
    encoder_attention_mask_1=torch.ones((2, 7), dtype=torch.int64),
)

if unet_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"Unet will be loaded from {unet_ir_path}")
else:
    with torch.no_grad():
        ov_model = ov.convert_model(pipe.unet, example_input=unet_inputs)

    # Partial shapes keyed by input index; -1 marks a dynamic dimension.
    # Input 1 is left exactly as produced by the conversion.
    unet_partial_shapes = {
        0: (2, 8, -1, 16),
        2: (2, 8, 768),
        3: (2, -1, 1024),
        4: (2, -1),
    }
    for input_index, partial_shape in unet_partial_shapes.items():
        ov_model.inputs[input_index].get_node().set_partial_shape(ov.PartialShape(partial_shape))
    ov_model.validate_nodes_and_infer_types()

    ov.save_model(ov_model, unet_ir_path)

    cleanup_torchscript_cache()
    gc.collect()
    print('Unet successfully converted to IR')

Unet will be loaded from unet.xml


### VAE Decoder conversion 

The VAE model has two parts, an encoder and a decoder. The encoder is used to convert the input (for AudioLDM2, a spectrogram representation of the audio rather than an image) into a low-dimensional latent representation, which serves as the input to the U-Net model. The decoder, conversely, transforms the latent representation back into that spectrogram representation.

During latent diffusion training, the encoder is used to get the latent representations (latents) for the forward diffusion process, which applies more and more noise at each step. During inference, the denoised latents generated by the reverse diffusion process are converted back using the VAE decoder, so we **only need the VAE decoder**. You can find instructions on how to convert the encoder part in a stable diffusion [notebook](../225-stable-diffusion-text-to-image/225-stable-diffusion-text-to-image.ipynb).

In [10]:
vae_ir_path = Path('./vae.xml')


class VAEDecoderWrapper(torch.nn.Module):
    """Adapter whose ``forward`` delegates to the wrapped VAE's ``decode`` method."""

    def __init__(self, vae):
        super().__init__()
        # Switch the wrapped model to evaluation mode before keeping a reference to it.
        vae.eval()
        self.vae = vae

    def forward(self, latents):
        """Return whatever ``vae.decode`` produces for ``latents``."""
        decoded = self.vae.decode(latents)
        return decoded

if vae_ir_path.exists():
    # An IR file from a previous run is reused; nothing is converted.
    print(f"VAE decoder will be loaded from {vae_ir_path}")
else:
    # Only the decode path is traced, through the wrapper defined above.
    vae_decoder = VAEDecoderWrapper(pipe.vae)
    vae_decoder.eval()
    latents = torch.zeros((1, 8, 175, 16))

    with torch.no_grad():
        ov_model = ov.convert_model(vae_decoder, example_input=latents)
        ov.save_model(ov_model, vae_ir_path)
    cleanup_torchscript_cache()
    print('VAE decoder successfully converted to IR')

VAE decoder will be loaded from vae.xml


## Select inference device for AudioLDM2 pipeline 

Select a device from the dropdown list for running inference using OpenVINO.

In [11]:
import ipywidgets as widgets

core = ov.Core()

# Offer every device reported by the OpenVINO runtime plus the "AUTO" entry.
device_options = [*core.available_devices, "AUTO"]

DEVICE = widgets.Dropdown(
    description="Device:",
    options=device_options,
    value="CPU",
    disabled=False,
)

DEVICE

Dropdown(description='Device:', options=('CPU', 'AUTO'), value='CPU')

## Prepare Inference pipeline 


In [12]:
class OVClapEncoderWrapper:
    """Stand-in for the first text encoder that runs a compiled OpenVINO model."""

    def __init__(self, encoder_ir, config):
        # Keep the original config next to the compiled model.
        self.config = config
        self.encoder = core.compile_model(encoder_ir, DEVICE.value)

    def get_text_features(self, input_ids, attention_mask, **_):
        """Run the compiled encoder and return its first output as a torch tensor.

        Extra keyword arguments are accepted and ignored.
        """
        outputs = self.encoder([input_ids, attention_mask])
        return torch.from_numpy(outputs[0])

class OVT5EncoderWrapper:
    """Stand-in for the second text encoder that runs a compiled OpenVINO model."""

    def __init__(self, encoder_ir, config):
        self.config = config
        # Expose the dtype recorded in the config as an attribute of the wrapper.
        self.dtype = config.torch_dtype
        self.encoder = core.compile_model(encoder_ir, DEVICE.value)

    def __call__(self, input_ids, **_):
        """Run the compiled encoder on ``input_ids`` only; other keyword arguments are ignored.

        The first model output is returned as a torch tensor with one extra
        leading dimension, so indexing the result with ``[0]`` yields that output.
        """
        hidden_states = torch.from_numpy(self.encoder(input_ids)[0])
        return hidden_states.unsqueeze(0)
    
class OVVocoderWrapper:
    """Stand-in for the vocoder that runs a compiled OpenVINO model."""

    def __init__(self, vocoder_ir, config):
        self.config = config
        self.vocoder = core.compile_model(vocoder_ir, DEVICE.value)

    def __call__(self, mel_spectrogram, **_):
        """Run the compiled vocoder and return its first output as a torch tensor.

        Extra keyword arguments are accepted and ignored.
        """
        outputs = self.vocoder(mel_spectrogram)
        return torch.from_numpy(outputs[0])
    
class OVProjectionModelWrapper:
    """Stand-in for the projection model that runs a compiled OpenVINO model."""

    def __init__(self, proj_model_ir, config):
        self.config = config
        # Result container with the two fields read from the model outputs.
        self.output_type = namedtuple("ProjectionOutput", ["hidden_states", "attention_mask"])
        self.proj_model = core.compile_model(proj_model_ir, DEVICE.value)

    def __call__(
        self,
        hidden_states,
        hidden_states_1,
        attention_mask,
        attention_mask_1,
        **_
    ):
        """Run the compiled model on both hidden-state/mask pairs.

        The first two model outputs are converted to torch tensors and returned
        as ``output_type(hidden_states, attention_mask)``. Extra keyword
        arguments are accepted and ignored.
        """
        model_inputs = dict(
            hidden_states=hidden_states,
            hidden_states_1=hidden_states_1,
            attention_mask=attention_mask,
            attention_mask_1=attention_mask_1,
        )
        output = self.proj_model(model_inputs)
        projected_states = torch.from_numpy(output[0])
        projected_mask = torch.from_numpy(output[1])
        return self.output_type(projected_states, projected_mask)
    
class OVUnetWrapper:
    """Stand-in for the UNet that runs a compiled OpenVINO model."""

    def __init__(self, unet_ir, config):
        self.config = config
        self.unet = core.compile_model(unet_ir, DEVICE.value)

    def __call__(
        self,
        sample,
        timestep,
        encoder_hidden_states,
        encoder_hidden_states_1,
        encoder_attention_mask_1,
        **_
    ):
        """Run the compiled UNet and return its first output in a one-element tuple.

        Extra keyword arguments are accepted and ignored.
        """
        model_inputs = dict(
            sample=sample,
            timestep=timestep,
            encoder_hidden_states=encoder_hidden_states,
            encoder_hidden_states_1=encoder_hidden_states_1,
            encoder_attention_mask_1=encoder_attention_mask_1,
        )
        output = self.unet(model_inputs)
        prediction = torch.from_numpy(output[0])
        return (prediction, )

class OVVaeDecoderWrapper:
    """Stand-in for the VAE that runs a compiled OpenVINO decoder model."""

    def __init__(self, vae_ir, config):
        self.config = config
        # Result container exposing the decoded tensor as ``.sample``.
        self.output_type = namedtuple("VaeOutput", ["sample"])
        self.vae = core.compile_model(vae_ir, DEVICE.value)

    def decode(self, latents, **_):
        """Run the compiled decoder and wrap its first output in ``output_type``.

        Extra keyword arguments are accepted and ignored.
        """
        decoded = torch.from_numpy(self.vae(latents)[0])
        return self.output_type(decoded)
    
def generate_language_model(
    gpt_2: "ov.CompiledModel",
    inputs_embeds: torch.Tensor,
    attention_mask: torch.Tensor,
    max_new_tokens: int = 8,
    **_
) -> torch.Tensor:
    """
    Generates a sequence of hidden-states from the language model, conditioned on the embedding inputs.

    Parameters:
        gpt_2: compiled language model; called with a dict holding
            ``inputs_embeds`` and ``attention_mask``, and its first output is
            read as the hidden states.
        inputs_embeds: embeddings the generation is conditioned on.
        attention_mask: mask matching ``inputs_embeds`` along the sequence axis.
        max_new_tokens: number of hidden states to generate; a falsy value
            (``None`` or ``0``) falls back to 8.
        **_: extra keyword arguments are accepted and ignored.
    Returns:
        Tensor with the last ``max_new_tokens`` positions of the extended embeddings.
    """
    if not max_new_tokens:
        max_new_tokens = 8
    inputs_embeds = inputs_embeds.cpu().numpy()
    attention_mask = attention_mask.cpu().numpy()
    for _step in range(max_new_tokens):
        # forward pass to get next hidden states
        output = gpt_2({'inputs_embeds': inputs_embeds, 'attention_mask': attention_mask})

        next_hidden_states = output[0]

        # Append the newest hidden state and extend the mask by one position.
        inputs_embeds = np.concatenate([inputs_embeds, next_hidden_states[:, -1:, :]], axis=1)
        # Build the new column with the mask's own dtype: np.ones defaults to
        # float64, which would turn the whole mask into float64 after one step.
        new_mask_column = np.ones((attention_mask.shape[0], 1), dtype=attention_mask.dtype)
        attention_mask = np.concatenate([attention_mask, new_mask_column], axis=1)
    return torch.from_numpy(inputs_embeds[:, -max_new_tokens:, :])


In [16]:
pipe = AudioLDM2Pipeline.from_pretrained(MODEL_ID)
pipe.config.torchscript = True
pipe.config.return_dict = False

# Fix the global NumPy and PyTorch seeds before generation.
np.random.seed(0)
torch.manual_seed(0)

# Replace each PyTorch component with its OpenVINO wrapper, passing along the
# config of the component being replaced.
ov_components = {
    "text_encoder": (OVClapEncoderWrapper, clap_text_encoder_ir_path),
    "text_encoder_2": (OVT5EncoderWrapper, t5_text_encoder_ir_path),
    "projection_model": (OVProjectionModelWrapper, projection_model_ir_path),
    "vocoder": (OVVocoderWrapper, vocoder_ir_path),
    "unet": (OVUnetWrapper, unet_ir_path),
    "vae": (OVVaeDecoderWrapper, vae_ir_path),
}
for component_name, (wrapper_cls, ir_path) in ov_components.items():
    original_config = getattr(pipe, component_name).config
    setattr(pipe, component_name, wrapper_cls(ir_path, original_config))

# The language model is swapped by overriding the pipeline's generation method
# with the helper above, bound to the compiled model.
compiled_language_model = core.compile_model(language_model_ir_path, DEVICE.value)
pipe.generate_language_model = partial(generate_language_model, compiled_language_model)

prompt = "birds singing in the forest"
negative_prompt = "Low quality"
generation_result = pipe(
    prompt,
    negative_prompt=negative_prompt,
    num_inference_steps=200,
    audio_length_in_s=3.0
)
audio = generation_result.audios[0]

sampling_rate = 16000
Audio(audio, rate=sampling_rate)

Loading pipeline components...:   0%|          | 0/11 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

## Try out the converted pipeline
[back to top ⬆️](#Table-of-contents:)

Now, we are ready to start generation. For improving the generation process, we also introduce an opportunity to provide a `negative prompt`. Technically, positive prompt steers the diffusion toward the output associated with it, while negative prompt steers the diffusion away from it.
The demo app below is created using [Gradio package](https://www.gradio.app/docs/interface)

In [19]:
import gradio as gr

def _generate(prompt, negative_prompt, audio_length_in_s, num_inference_steps):
    """Gradio backing function."""
    audio_values = pipe(
        prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        audio_length_in_s=audio_length_in_s
    )
    waveform = audio_values[0].squeeze() * 2**15
    return (sampling_rate, waveform.astype(np.int16))

# Input widgets, created in the order they are shown in the interface.
prompt_box = gr.Textbox(label="Text Prompt")
negative_prompt_box = gr.Textbox(label="Negative Prompt", placeholder="Example: Low quality")
audio_length_slider = gr.Slider(
    minimum=1.0,
    maximum=15.0,
    step=0.25,
    value=7,
    label="Audio Length (s)",
)
inference_steps_slider = gr.Slider(label="Inference Steps", step=5, value=150, minimum=50, maximum=250)

# Each example row follows the order of the inputs above.
demo_examples = [
    ["birds singing in the forest", "Low quality", 7, 150],
    ["The sound of a hammer hitting a wooden surface", "", 4, 200],
]

demo = gr.Interface(
    _generate,
    inputs=[prompt_box, negative_prompt_box, audio_length_slider, inference_steps_slider],
    outputs=["audio"],
    examples=demo_examples,
)

# Try a regular launch first; if it raises, retry with a shared link.
try:
    demo.launch(debug=True)
except Exception:
    demo.launch(share=True, debug=True)

# If you are launching remotely, specify server_name and server_port
# EXAMPLE: `demo.launch(server_name='your server name', server_port='server port in int')`
# To learn more please refer to the Gradio docs: https://gradio.app/docs/

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


  0%|          | 0/150 [00:00<?, ?it/s]

Keyboard interruption in main thread... closing server.
