# CONFIDENTIALITY STATEMENT

We value and respect the intellectual property of all participants in our project. We assure you that any information you submit, including model architecture, Colab code, and checkpoint, will be treated with the utmost confidentiality. We will not disclose or share any information without your explicit permission.

We understand the importance of protecting your research and intellectual property rights, and we will do everything in our power to safeguard them. We appreciate your trust in our project and look forward to seeing your amazing contributions.

If you have any questions or concerns, please do not hesitate to reach out to us.

Thank you for your participation in our project.

Sincerely,

Mathieu Lagrange, CNRS, Ecole Centrale Nantes, Nantes Université

Junwon Lee, Gaudio Lab, Inc. / Korea Advanced Institute of Science & Technology (KAIST)

Modan Tailleur, Laboratoire des sciences du numérique de Nantes

Laurie Heller, Carnegie Mellon University

Keunwoo Choi, Gaudio Lab, Inc.

Brian McFee, New York University

Keisuke Imoto, Doshisha University

Yuki Okamoto, Ritsumeikan University



# DCASE 2024 Task 7 Sound Scene Synthesis
This is the submission template for **DCASE 2024 Task 7 Sound Scene Synthesis**.
Please read the following code carefully and submit your model in a similar manner.


tl;dr -
- Read the existing code blocks and use the existing functions and classes
- Update "CHANGE THIS BLOCK" block to add your model
- Make sure it works by running the following "TEST BLOCK"

In [1]:
"""
DO NOT MODIFY THIS BLOCK.
"""

# Install packages for template code.
! pip install GitPython gdown==5.1.0
# Install packages for Baseline Model.
# If this causes a 'pydevd_plugins' error, simply RESTART the SESSION to solve the problem.
! pip install librosa==0.9.2 pytorch-lightning==2.1.1 transformers==4.30.2 einops==0.7.0 torchlibrosa==0.0.9 ftfy==6.1.1 braceexpand==0.1.7 webdataset==0.2.75 wget==3.2 timm==0.4.12 wandb taming-transformers-rom1504==0.0.6


In [2]:
"""
DO NOT MODIFY THIS BLOCK.
"""
from typing import List,Dict,Tuple
from numpy import ndarray

from abc import ABC, abstractmethod
from tqdm import tqdm
from IPython import display

- You should subclass `SoundSynthesisModel`, defined below.
- You should implement its `synthesize_sounds()` method.

In [3]:
"""
DO NOT MODIFY THIS BLOCK.
YOU SHOULD SUBCLASS `SoundSynthesisModel` to wrap your model.
"""
class SoundSynthesisModel(ABC):
    @abstractmethod
    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        """Synthesize sound examples that correspond to the given text prompts, respectively.

        Args:
            text_prompts (list of strings): text prompts in string enclosed in a list.

        Return:
            sound_samples (dict): A dictionary with text prompts as keys and the corresponding sound samples as values.
                Each value should be a 1-dim array (mono signal) with sample_rate=32000.
                If your model does not operate at 32,000 Hz, please add resampling logic within this method.

        """
        pass
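
The docstring above asks for a mono, 32,000 Hz array and leaves resampling to the submitter. A minimal sketch of such post-processing follows. `to_official_format` and its parameter names are hypothetical and not part of the template. The 2-D input is assumed to be laid out as `(channels, samples)`. `librosa.resample` is only one possible resampler and is imported lazily, so it is needed only when the input rate differs. The 4-second target length is taken from the `duration` value used elsewhere in this notebook.

```python
import numpy as np

OFFICIAL_SR = 32000      # sample rate required by the template
OFFICIAL_DURATION = 4    # seconds, as set elsewhere in this notebook


def to_official_format(waveform: np.ndarray, orig_sr: int) -> np.ndarray:
    """Hypothetical helper: return a mono array of exactly 4 s at 32 kHz."""
    x = np.asarray(waveform, dtype=np.float32)

    # Assumption: 2-D input is (channels, samples); average channels to get mono.
    if x.ndim == 2:
        x = x.mean(axis=0)
    if x.ndim != 1:
        raise ValueError(f"Expected a 1-D or 2-D array, got shape {x.shape}")

    # Resample only when the model's native rate differs from the official one.
    if orig_sr != OFFICIAL_SR:
        import librosa  # imported lazily; not needed for 32 kHz models
        x = librosa.resample(x, orig_sr=orig_sr, target_sr=OFFICIAL_SR)

    # Zero-pad short outputs and truncate long ones to the official length.
    target_len = OFFICIAL_SR * OFFICIAL_DURATION
    if len(x) < target_len:
        x = np.pad(x, (0, target_len - len(x)))
    else:
        x = x[:target_len]
    return x
```

Truncating from the start is the simplest choice here; a model may prefer a different cropping strategy, such as selecting the loudest segment.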

This is a dummy example of a class inheriting the abstract class **SoundSynthesisModel**. This system always outputs a 440 Hz sinusoid.

In [5]:
"""
DO NOT MODIFY THIS BLOCK.
"""
import numpy as np


class SubmittedSystemExample(SoundSynthesisModel):
    """Example of a submitted system. This is a dummy model that generates a sine wave."""
    def __init__(self) -> None:
        super().__init__()
        self.sr: int = 32000 # Official Sample Rate.
        self.duration: int = 4 # Official Duration of the sound, in seconds.
        self.batch_size: int = 1 # Batch size for inference. Batched Inference Recommended, to reduce the overall generation time.

    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        generated_samples: Dict[str, ndarray] = {}

        # batch inference
        for i in range(0, len(text_prompts), self.batch_size):
            start_idx = i
            end_idx = min(i + self.batch_size, len(text_prompts))
            batch = text_prompts[start_idx:end_idx]
            for text_prompt in batch:
                generated_samples[text_prompt] = 0.5 * np.sin(2 * np.pi * 440 * np.arange(self.duration * self.sr) / self.sr)

        assert text_prompts == list(generated_samples.keys())
        return generated_samples
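
As a quick sanity check of the numbers in the dummy system: 4 s at 32,000 Hz gives 4 × 32,000 = 128,000 samples, and the FFT bin spacing is 32,000 / 128,000 = 0.25 Hz, so a 440 Hz tone should peak at bin 440 / 0.25 = 1760. The sketch below repeats the sine expression from `SubmittedSystemExample` on its own and recomputes these values. The variable names are illustrative only.

```python
import numpy as np

sr = 32000      # official sample rate in Hz
duration = 4    # official duration in seconds

# Same expression as in SubmittedSystemExample.synthesize_sounds.
sine = 0.5 * np.sin(2 * np.pi * 440 * np.arange(duration * sr) / sr)

num_samples = len(sine)                      # 4 * 32000 samples
spectrum = np.abs(np.fft.rfft(sine))         # magnitude spectrum of the real signal
peak_bin = int(np.argmax(spectrum))          # index of the strongest frequency bin
peak_hz = peak_bin * sr / num_samples        # convert the bin index back to Hz

print(num_samples, peak_bin, peak_hz)
```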


==========

The code below will be executed by the organizers to generate sounds.

In [7]:
"""
DO NOT MODIFY THIS BLOCK.
This block is used for every submission.
"""
import os
import tqdm
import soundfile as sf
from google.colab import drive

drive.mount('/content/gdrive')
ROOT_PATH = "/content/gdrive/MyDrive/DCASE2024-T7"


text_prompts_list = ["a buzzer is ringing with water in the background",
    "a pig is grunting with water in the background",
    "an alarm of a car door stayin open is ringin with crowd in the background",
    "a small dog is whining with water in the background",
    "a car horn is honking with crowd in the background",
    "a baby is laughing with crowd in the background",
    "a burglar alarm is ringing with traffic in the background"] # Example text prompts from Dev. Set.

# function to read text_prompts_list from 'caption' column of a CSV file
def read_text_prompts_from_csv(filepath: str) -> List[str]:
    """Reads text prompts from a CSV file.

    Args:
        filepath (str): path to the CSV file.

    Returns:
        text_prompts (list of strings): List of text prompts.

    """
    import pandas as pd
    assert os.path.exists(filepath), f"File not found: {filepath}"
    df = pd.read_csv(filepath)
    return df['caption'].tolist()

# text_prompts_list = read_text_prompts_from_csv(os.path.join(ROOT_PATH, 'dataset/dev/caption.csv')) # organizers will update this

SR = 32000  # audio sample-rate in Hz
duration = 4  # audio duration in seconds
submission_idx = 0  # organizers will update this

save_folder = f'submission-{submission_idx:02d}'
print(save_folder)
os.makedirs(os.path.join(ROOT_PATH, save_folder), exist_ok=False)  # set to be False so that we won't overwrite.
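
The block above imports `google.colab`, which is importable only inside Colab. For a local dry run before submission, one possible approach is to fall back to a local folder when that import fails. This is only a sketch for private testing: `resolve_root_path` and the temporary-directory location are hypothetical, and the submitted notebook must keep the organizers' block unchanged.

```python
import os
import tempfile


def resolve_root_path(local_root: str,
                      colab_root: str = "/content/gdrive/MyDrive/DCASE2024-T7") -> str:
    """Hypothetical helper: return the Drive path on Colab, else a local folder."""
    try:
        from google.colab import drive  # only importable inside Colab
    except ImportError:
        # Not on Colab: create and use a local directory instead.
        os.makedirs(local_root, exist_ok=True)
        return local_root
    drive.mount("/content/gdrive")
    return colab_root


# Assumed local location for dry runs; any writable directory would do.
ROOT_PATH = resolve_root_path(os.path.join(tempfile.gettempdir(), "DCASE2024-T7"))
```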




In [None]:
"""
DO NOT MODIFY THIS BLOCK.

How to Download Required Files
  - You will be allowed to download required files (e.g. checkpoints, .py files), if required.
  - There are three ways to download the required files:
  1. Google Drive: by shared google drive link. use 'google_drive_download' function.
  2. Direct Link: by direct link. use 'wget_download' function.
  3. Git Repository: by cloning the repository. use 'git_clone_checkout' function.
      In this case, you must specify the branch and commit_sha.
      DO NOT modify the code after submission. If so, your submission will be DISQUALIFIED.
  - Make sure the link(url) or git repo is public.
  - We recommend using shared links from platforms (e.g., shared Google Drive, Dropbox, or Zenodo links).
  - You can set the download path, ONLY relative to the ROOT_PATH/save_folder.

How to Unpack the Files
  - If the downloaded file is compressed, you should unpack the file using 'unpack_file' function.
  - The file format should be either .tar, .tar.gz, .tar.xz, or .zip. NO OTHER FORMAT SUPPORTED.
  - You can set the unpack path, ONLY relative to the ROOT_PATH/save_folder.
"""

import os
import gdown
from git import Repo


def check_download_file_info(filename: str, shared_url: str, relative_dir: str, url_prefix: str) -> None:
  """ Check the validity of the download_file_info.

  Args:
    filename (str): The name of the file.
    shared_url (str): The shared url of the file.
    relative_dir (str): The relative directory to save the file. Relative to the ROOT_PATH/save_folder.
    url_prefix (str): The required prefix of the shared url.

  Returns:
    None

  """
  if not shared_url.startswith(url_prefix):
    raise ValueError(f"Invalid url: {shared_url}.\nMake sure the url is valid.\nIt should start with \'{url_prefix}\'.")
  if '/' in filename:
    raise ValueError(f"Invalid filename: {filename}.\nMake sure the filename does not contain \'/\'.")
  if relative_dir.startswith('/'):
    raise ValueError(f"Invalid relative_dir: {relative_dir}.\nMake sure the relative_dir is not an absolute path.")


def google_drive_download(filename: str, shared_url: str, relative_dir: str) -> None:
  """ Download the file from the shared link of google drive.

  Args:
    filename (str): The name of the file.
    shared_url (str): The shared url of the file from google drive.
    relative_dir (str): The relative directory to save the file. Relative to the ROOT_PATH/save_folder.

  Returns:
    None

  """
  check_download_file_info(filename, shared_url, relative_dir, 'https://drive.google.com')
  os.makedirs(os.path.join(ROOT_PATH, save_folder, relative_dir), exist_ok=True)
  print(f'Downloading \'{filename}\' from gdrive to {os.path.join(ROOT_PATH, save_folder, relative_dir, filename)}')
  gdown.download(url=shared_url, output=os.path.join(ROOT_PATH, save_folder, relative_dir, filename),
                 quiet=False, fuzzy=True)


def wget_download(filename: str, shared_url: str, relative_dir: str) -> None:
  """ Download the file from the shared link, except google drive, using wget command.

  Args:
    filename (str): The name of the file.
    shared_url (str): The shared url of the file.
    relative_dir (str): The relative directory to save the file. Relative to the ROOT_PATH/save_folder.

  Returns:
    None

  """
  check_download_file_info(filename, shared_url, relative_dir, 'https://')
  os.makedirs(os.path.join(ROOT_PATH, save_folder, relative_dir), exist_ok=True)

  import subprocess
  from IPython.display import display, clear_output
  import time

  command = ['wget', shared_url, '-O', os.path.join(ROOT_PATH, save_folder, relative_dir, filename), '-v']

  process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

  while True:
    output = process.stdout.readline()
    if output == '' and process.poll() is not None:
        break
    if output:
        # Clear the previous output
        clear_output(wait=True)
        # Display the new output
        display(output.strip())
    time.sleep(0.1)  # Add a slight delay to reduce flickering

  rc = process.poll()
  if rc == 0:
      print(f"Download completed successfully (filename: {filename}).")
  else:
      print(f"Download failed with return code {rc} (filename: {filename}).")


def git_clone_checkout(output_dir: str, url: str, branch: str, commit_sha: str) -> None:
  """ Clone the repository and checkout the specific branch and commit.

  Args:
    output_dir (str): The directory to save the repository. Relative to the ROOT_PATH/save_folder.
    url (str): The url of the repository.
    branch (str): The branch name.
    commit_sha (str): The commit sha.

  Returns:
    None

  """
  if not url.startswith('https://'):
    raise ValueError(f"Invalid url: {url}.\nMake sure the url is valid.\nIt should start with \'https://\'")
  # Clone the repository (This will clone the default branch)
  os.makedirs(os.path.join(ROOT_PATH, save_folder, output_dir), exist_ok=True)
  repo = Repo.clone_from(url, os.path.join(ROOT_PATH, save_folder, output_dir))
  # Checkout the specific branch
  repo.git.checkout(branch)
  # Checkout the specific commit
  repo.git.checkout(commit_sha)


def unpack_file(file_path: str, output_dir: str) -> None:
  """ Unpack the file to the specific directory.

  Args:
    file_path (str): The path of the file to be unpacked. Relative to the ROOT_PATH/save_folder.
    output_dir (str): The directory to save the unpacked files. Relative to the ROOT_PATH/save_folder.

  Returns:
    None

  """
  import shutil

  if not os.path.exists(os.path.join(ROOT_PATH, save_folder, file_path)):
    raise ValueError(f"File not found: {os.path.join(ROOT_PATH, save_folder, file_path)}")
  file_format = '.'.join(os.path.basename(file_path).split('.')[1:])
  print(f"Unpacking file {os.path.basename(file_path)}...")

  if file_format == 'tar':
    shutil.unpack_archive(os.path.join(ROOT_PATH, save_folder, file_path), os.path.join(ROOT_PATH, save_folder, output_dir), format='tar')
  elif file_format == 'tar.gz':
    shutil.unpack_archive(os.path.join(ROOT_PATH, save_folder, file_path), os.path.join(ROOT_PATH, save_folder, output_dir), format='gztar')
  elif file_format == 'tar.xz':
    shutil.unpack_archive(os.path.join(ROOT_PATH, save_folder, file_path), os.path.join(ROOT_PATH, save_folder, output_dir), format='xztar')
  elif file_format == 'zip':
    shutil.unpack_archive(os.path.join(ROOT_PATH, save_folder, file_path), os.path.join(ROOT_PATH, save_folder, output_dir), format='zip')
  else:
    raise ValueError(f'Format {file_format} is not supported. Use .tar, .tar.gz, .tar.xz, or .zip format.')
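
Note that `unpack_file` derives the format from everything after the first dot in the file name, so a name such as `model.v2.tar.gz` yields `v2.tar.gz` and is rejected as unsupported. Since the block above must not be modified, the simplest remedy is to avoid extra dots in archive names. The sketch below shows, for illustration only, how suffix matching would identify the format regardless of earlier dots. `detect_archive_format` is a hypothetical helper and not part of the template; the returned strings are the format names accepted by `shutil.unpack_archive`.

```python
from typing import Optional

# Supported suffixes mapped to shutil.unpack_archive format names.
# Longer suffixes come first so that '.tar.gz' is not mistaken for another format.
_SUFFIX_TO_FORMAT = (
    (".tar.gz", "gztar"),
    (".tar.xz", "xztar"),
    (".tar", "tar"),
    (".zip", "zip"),
)


def detect_archive_format(file_path: str) -> Optional[str]:
    """Hypothetical helper: return the shutil format name, or None if unsupported."""
    name = file_path.lower()
    for suffix, fmt in _SUFFIX_TO_FORMAT:
        if name.endswith(suffix):
            return fmt
    return None
```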

### Change only the block below

In [None]:

"""
CHANGE THIS BLOCK; to load your model so that it can be used in the next block.
Without modifying this block, you can check the result of SubmittedSystemExample.

DO NOT load checkpoints from your mounted Google drive directory; because the organizers will NOT be able to mount your drive.
You should perhaps use a shared public URL (e.g. shared Google drive, Dropbox, Zenodo link).
You can use the provided functions (google_drive_download, wget_download, git_clone_checkout, unpack_file) to download the required files.
Refer to the guidelines mentioned in the previous code block.

Any participant who submits an adversarial attempt in code will be DISQUALIFIED.
  - Keep your files WITHIN 'ROOT_PATH/save_folder'.
    - DO NOT use 'ROOT_PATH/save_folder/output' folder, where the generated audio files will be saved.
    - Also, DO NOT change the working directory (e.g., os.chdir('/path/to/some/dir')) in your own code.
  - DO NOT use system commands ("! cd /root" or "os.system('cd /root')", etc.).
    - You are only allowed to install required packages with "! pip install required_package_name".


You CAN install required packages that are not preinstalled in Google Colab, using pip.
Remember that installation time also counts toward the 24-hour generation time limit.
"""

### Install required packages, only if required.
# ! pip install your_package

### Import packages or code here.
import sys
sys.path.append(os.path.join(ROOT_PATH, save_folder))
# from examplePythonFile import example_function

### Download required files.
download_files_google_list = [
    # use shared links from your own google drive
    # (filename, shared_url, relative_dir)
]

download_files_wget_list = [
    # use shared links from other drives
    # (filename, shared_url, relative_dir)
]

for filename, shared_url, relative_dir in download_files_google_list:
  google_drive_download(filename, shared_url, relative_dir)
for filename, shared_url, relative_dir in download_files_wget_list:
  wget_download(filename, shared_url, relative_dir)

### If required, clone the repository and checkout the specific branch and commit.
# git_clone_checkout('path/to/github_repo', 'https://github.com/githubID/repoName.git', 'main', 'commit_sha')
### If required, unpack some files. Only '.zip', '.tar', '.tar.xz', and '.tar.gz' formats are supported.
# unpack_file('path/to/zip', 'path/to/unpack')


### Define and initialize your own model here.
# class YourOwnModel(SoundSynthesisModel):
#     def __init__(self) -> None:
#         super().__init__()
#         self.sr: int = 32000 # Official Sample Rate.
#         self.duration: int = 4 # Official Duration of the sound, in seconds.
#         self.batch_size: int = 1 # Batch size for inference. Batched Inference Recommended, to reduce the overall generation time.

#     def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
#         pass

fss_model = SubmittedSystemExample()
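
To make the commented-out `YourOwnModel` skeleton more concrete, here is a minimal, self-contained sketch of a subclass that runs batched inference and returns prompts in their original order. It is not a reference implementation: the abstract base class is repeated only so the snippet runs on its own, and `hypothetical_generate` is a stand-in that returns noise where a real model call would go.

```python
from abc import ABC, abstractmethod
from typing import Dict, List

import numpy as np
from numpy import ndarray


class SoundSynthesisModel(ABC):
    """Copy of the template's base class, repeated so this sketch runs alone."""
    @abstractmethod
    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        ...


def hypothetical_generate(prompts: List[str], num_samples: int) -> ndarray:
    """Stand-in for a real model: returns a (batch, samples) array of noise."""
    rng = np.random.default_rng(0)
    return 0.1 * rng.standard_normal((len(prompts), num_samples))


class YourOwnModel(SoundSynthesisModel):
    def __init__(self) -> None:
        super().__init__()
        self.sr: int = 32000       # official sample rate
        self.duration: int = 4     # official duration in seconds
        self.batch_size: int = 4   # illustrative batch size

    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        num_samples = self.sr * self.duration
        generated: Dict[str, ndarray] = {}
        # Process the prompts in consecutive chunks of batch_size.
        for start in range(0, len(text_prompts), self.batch_size):
            batch = text_prompts[start:start + self.batch_size]
            waveforms = hypothetical_generate(batch, num_samples)
            # Insert in prompt order so the dict keys match the input list.
            for prompt, waveform in zip(batch, waveforms):
                generated[prompt] = waveform
        return generated
```

Because the result is keyed by prompt text, this sketch assumes the prompts are unique, which matches the key check performed later in the test block.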


### Any model that FAILS to run the block below will be DISQUALIFIED.

In [None]:

"""
TEST BLOCK

DO NOT MODIFY THIS BLOCK.

This block is used for every submission.
Make sure your model passes this block.

Test to make sure it works on someone else's Google account.
Your model MUST generate every audio within 24hrs, which is the maximum running time of Colab Pro+.
(refer to this link: https://research.google.com/colaboratory/faq.html#idle-timeouts).

Any participant's model that fails to run this block will be DISQUALIFIED.
Make sure your model causes NO ERROR while running this code block.
You MUST HEAR one of the generated sounds successfully, through the IPython display widget we provide.
"""
import time
import IPython.display as ipd
import numpy as np


def check_srcs_dict(srcs_dict: Dict[str, np.ndarray], text_prompts_list: List[str], duration: int, SR: int) -> None:
    """ Check if the return value of synthesize_sounds method is valid.

    Args:
        srcs_dict (dict): the return value of synthesize_sounds method.
        text_prompts_list (list of strings): list of text prompts.
        duration (int): duration of the audio in seconds.
        SR (int): sample rate of the audio.

    Returns:
        None
    """
    assert isinstance(srcs_dict, dict), "The return value of synthesize_sounds method should be a dictionary."
    assert all(isinstance(k, str) for k in srcs_dict.keys()), "The keys of dictionary, the return value of \'synthesize_sounds\' method, should be strings (corresponding text prompt)."
    assert all(isinstance(v, np.ndarray) for v in srcs_dict.values()), "The values of dictionary, the return value of \'synthesize_sounds\' method, should be numpy arrays (audio waveform)."
    assert list(srcs_dict.keys()) == text_prompts_list, "The keys of dictionary, the return value of \'synthesize_sounds\' method, should match the input text prompts."
    for _, src in srcs_dict.items():
        assert src.ndim == 1, "The audio waveform should be mono."
        assert len(src) == int(duration * SR), f"The audio waveform should be {duration} seconds long."


start_time = time.time() # measure total inference time.

srcs_dict = fss_model.synthesize_sounds(text_prompts_list)
check_srcs_dict(srcs_dict, text_prompts_list, duration, SR)

os.makedirs(os.path.join(ROOT_PATH, save_folder, 'output'), exist_ok=True)
for src_text, src in tqdm.tqdm(srcs_dict.items()):
    _filepath = os.path.join(ROOT_PATH, save_folder, 'output', f"{src_text}.wav")
    src = src / np.max(np.abs(src)) # peak-normalize the generated output
    sf.write(_filepath, src, SR, subtype='PCM_16')

inference_time = time.time() - start_time
print("Total inference time: ", inference_time)

print('Listen to the generated sound...')
print(f'- prompt: {text_prompts_list[0]}')
ipd.Audio(srcs_dict[text_prompts_list[0]], rate=SR) # listen to the generated result.

100%|██████████| 7/7 [00:00<00:00, 80.44it/s]

Total inference time:  0.11510658264160156
Listen to the generated sound...
- prompt: a buzzer is ringing with water in the background
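
The saving loop in the test block divides each waveform by its maximum absolute value, so an all-zero (silent) output would lead to a division by zero and NaN samples. Because that block cannot be changed, it may be worth checking your own outputs beforehand. The sketch below illustrates the same peak normalization with a guard for silent signals. `peak_normalize` and the `eps` threshold are hypothetical and not part of the template.

```python
import numpy as np


def peak_normalize(src: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """Hypothetical helper: scale so the largest absolute sample becomes 1.0."""
    peak = np.max(np.abs(src))
    # A (near-)silent signal has no meaningful peak; return it unscaled.
    if peak < eps:
        return src.copy()
    return src / peak


# Usage: a quiet signal is scaled up, a silent one is left untouched.
quiet = np.array([0.25, -0.5, 0.1])
silent = np.zeros(8)
normalized = peak_normalize(quiet)
untouched = peak_normalize(silent)
```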





In [None]:
"""
DO NOT MODIFY THIS BLOCK.
Before submitting your notebook, you can go to `your google drive/DCASE2024-T7` to see if the generated files are saved as expected.
"""
print('Done!')

assert submission_idx == 0

Done!


## Additional example (Baseline model)

This baseline example shows how to load a pre-trained model in Colab. You do not have to follow this approach, but it may help if you are unsure how to download files in Colab.

In [6]:
import os
baseline_dir:str = f'{ROOT_PATH}/baseline'
os.makedirs(baseline_dir,exist_ok=True)


You can download the file from Google drive by sharing the link. Remember to make your link public.

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

import os
import gdown
from git import Repo


def check_download_file_info(filename: str, shared_url: str, relative_dir: str, url_prefix: str) -> None:
  if not shared_url.startswith(url_prefix):
    raise ValueError(f"Invalid url: {shared_url}.\nMake sure the url is valid.\nIt should start with \'{url_prefix}\'.")
  if '/' in filename:
    raise ValueError(f"Invalid filename: {filename}.\nMake sure the filename does not contain \'/\'.")
  if relative_dir.startswith('/'):
    raise ValueError(f"Invalid relative_dir: {relative_dir}.\nMake sure the relative_dir is not an absolute path.")


def google_drive_download(filename: str, shared_url: str, relative_dir: str) -> None:
  check_download_file_info(filename, shared_url, relative_dir, 'https://drive.google.com')
  os.makedirs(os.path.join(baseline_dir, relative_dir), exist_ok=True)
  print(f'Downloading \'{filename}\' from gdrive to {os.path.join(baseline_dir, relative_dir, filename)}')
  gdown.download(url=shared_url, output=os.path.join(baseline_dir, relative_dir, filename),
                 quiet=False, fuzzy=True)


def wget_download(filename: str, shared_url: str, relative_dir: str) -> None:
  check_download_file_info(filename, shared_url, relative_dir, 'https://')
  os.makedirs(os.path.join(baseline_dir, relative_dir), exist_ok=True)

  import subprocess
  from IPython.display import display, clear_output
  import time

  command = ['wget', shared_url, '-O', os.path.join(baseline_dir, relative_dir, filename), '-v']

  process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

  while True:
    output = process.stdout.readline()
    if output == '' and process.poll() is not None:
        break
    if output:
        # Clear the previous output
        clear_output(wait=True)
        # Display the new output
        display(output.strip())
    time.sleep(0.1)  # Add a slight delay to reduce flickering

  rc = process.poll()
  if rc == 0:
      print(f"Download completed successfully (filename: {filename}).")
  else:
      print(f"Download failed with return code {rc} (filename: {filename}).")



def git_clone_checkout(output_dir: str, url: str, branch: str, commit_sha: str) -> None:
  if not url.startswith('https://'):
    raise ValueError(f"Invalid url: {url}.\nMake sure the url is valid.\nIt should start with \'https://\'")
  # Clone the repository (This will clone the default branch)
  os.makedirs(os.path.join(baseline_dir, output_dir), exist_ok=True)
  repo = Repo.clone_from(url, os.path.join(baseline_dir, output_dir))
  # Checkout the specific branch
  repo.git.checkout(branch)
  # Checkout the specific commit
  repo.git.checkout(commit_sha)


def unpack_file(file_path: str, output_dir: str) -> None:
  import shutil

  if not os.path.exists(os.path.join(baseline_dir, file_path)):
    raise ValueError(f"File not found: {os.path.join(baseline_dir, file_path)}")
  file_format = '.'.join(os.path.basename(file_path).split('.')[1:])
  print(f"Unpacking file {os.path.basename(file_path)}...")

  if file_format == 'tar':
    shutil.unpack_archive(os.path.join(baseline_dir, file_path), os.path.join(baseline_dir, output_dir), format='tar')
  elif file_format == 'tar.gz':
    shutil.unpack_archive(os.path.join(baseline_dir, file_path), os.path.join(baseline_dir, output_dir), format='gztar')
  elif file_format == 'tar.xz':
    shutil.unpack_archive(os.path.join(baseline_dir, file_path), os.path.join(baseline_dir, output_dir), format='xztar')
  elif file_format == 'zip':
    shutil.unpack_archive(os.path.join(baseline_dir, file_path), os.path.join(baseline_dir, output_dir), format='zip')
  else:
    raise ValueError(f'Format {file_format} is not supported. Use .tar, .tar.gz, .tar.xz, or .zip format.')

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

git_clone_checkout('./AudioLDM-training-finetuning', 'https://github.com/DCASE2024-Task7-Sound-Scene-Synthesis/AudioLDM-training-finetuning.git',
                   'main', 'a6b15e86c3d042832dee08a94beb11819b297e39')

download_files_google_list = [
    # ('checkpoints.tar', 'https://drive.google.com/file/d/1T6EnuAHIc8ioeZ9kB1OZ_WGgwXAVGOZS/view?usp=sharing',
    #  'AudioLDM-training-finetuning/data')
]

download_files_wget_list = [
    ('checkpoints.tar', 'https://www.dropbox.com/scl/fi/he6rqr24y1pc3s94lm8tc/checkpoints.tar?rlkey=5yu046f5uvdijq8eor77ej4fx&dl=0',
     'AudioLDM-training-finetuning/data'),
    ('audioldm-m-full.ckpt', 'https://zenodo.org/records/7884686/files/audioldm-m-full.ckpt',
     'AudioLDM-training-finetuning/data/checkpoints')
]

# for filename, shared_url, relative_dir in download_files_google_list:
#   google_drive_download(filename, shared_url, relative_dir)
for filename, shared_url, relative_dir in download_files_wget_list:
  wget_download(filename, shared_url, relative_dir)

unpack_file('AudioLDM-training-finetuning/data/checkpoints.tar', 'AudioLDM-training-finetuning/data')

In [None]:
'''
DO NOT MODIFY THIS BLOCK.
'''

import yaml
import torch
import importlib
import librosa
import sys
from torch.utils.data import DataLoader
from pytorch_lightning import seed_everything

repo_dir = os.path.join(baseline_dir, 'AudioLDM-training-finetuning')  # root of the cloned baseline repository
sys.path.append(repo_dir)
from audioldm_train.utilities.data.dataset import AudioDataset
from audioldm_train.utilities.model_util import instantiate_from_config


def get_input_with_key(batch, k):
    fname, text, label_indices, waveform, stft, fbank = (
        batch["fname"],
        batch["text"],
        batch["label_vector"],
        batch["waveform"],
        batch["stft"],
        batch["log_mel_spec"],
    )

    ret = {}

    ret["fbank"] = (
        fbank.unsqueeze(1).to(memory_format=torch.contiguous_format).float()
    )
    ret["stft"] = stft.to(memory_format=torch.contiguous_format).float()
    ret["waveform"] = waveform.to(memory_format=torch.contiguous_format).float()
    ret["text"] = list(text)
    ret["fname"] = fname

    for key in batch.keys():
        if key not in ret.keys():
            ret[key] = batch[key]

    return ret[k]

def find_loudest_segment(audio: np.ndarray, sr: int, segment_length: int = 4, hop_length_sec: float = 2.0) -> np.ndarray:
    """
    Find the loudest segment in an audio waveform using Librosa's framing and RMS features.

    Parameters:
    - audio (np.ndarray): The audio waveform as a NumPy ndarray.
    - sr (int): The sampling rate of the audio waveform in Hz.
    - segment_length (int): The length of the segment to find in whole seconds.
    - hop_length_sec (float): The hop length for segment calculation in seconds.

    Returns:
    - np.ndarray: The loudest segment of the audio waveform.
    """
    hop_length_samples = int(sr * hop_length_sec)
    frame_length_samples = int(sr * segment_length)

    rms_values = librosa.feature.rms(y=audio, frame_length=frame_length_samples, hop_length=hop_length_samples, center=False)

    max_rms_index = np.argmax(rms_values)

    start_sample = max_rms_index * hop_length_samples
    end_sample = start_sample + frame_length_samples

    loudest_segment = audio[start_sample:end_sample]

    return loudest_segment


class BaseLineModel(SoundSynthesisModel):
    def __init__(self) -> None:
        super().__init__()

        self.sr: int = 32000 # sampling rate
        self.duration: int = 4 # audio length in seconds
        self.batch_size: int = 8 # batch size in int
        self.loudest_hop_len: float = 2.0 # hop size for function 'find_loudest_segment' in seconds

        config_yaml_path = os.path.join(repo_dir,
                                        'audioldm_train/config/2023_08_23_reproduce_audioldm/audioldm_original_medium.yaml')
        reload_from_ckpt = os.path.join(repo_dir, 'data/checkpoints/audioldm-m-full.ckpt')
        self.configs = yaml.load(open(config_yaml_path, "r"), Loader=yaml.FullLoader)
        self.configs["reload_from_ckpt"] = reload_from_ckpt
        clap_ckpt_path = self.configs["model"]["params"]["cond_stage_config"]["film_clap_cond1"]["params"]["pretrained_path"]
        self.configs["model"]["params"]["cond_stage_config"]["film_clap_cond1"]["params"]["pretrained_path"] = os.path.join(
            baseline_dir,'AudioLDM-training-finetuning', clap_ckpt_path)

        if "seed" in self.configs.keys():
            seed_everything(self.configs["seed"])
        else:
            print("SEED EVERYTHING TO 0")
            seed_everything(0)
        if "precision" in self.configs.keys():
            torch.set_float32_matmul_precision(self.configs["precision"])

        self.latent_diffusion = instantiate_from_config(self.configs["model"])
        checkpoint = torch.load(self.configs["reload_from_ckpt"])
        self.latent_diffusion.load_state_dict(checkpoint["state_dict"], strict=False)

    @torch.no_grad()
    def synthesize_sounds(self, text_prompts: List[str]) -> Dict[str, ndarray]:
        audio_list:List[ndarray] = list()

        dataset_json = {"data": [{'wav': '', 'caption': caption} for caption in text_prompts]}

        if "dataloader_add_ons" in self.configs["data"].keys():
            dataloader_add_ons = self.configs["data"]["dataloader_add_ons"]
        else:
            dataloader_add_ons = []

        val_dataset = AudioDataset(
            self.configs, split="test", add_ons=dataloader_add_ons, dataset_json=dataset_json
        )
        val_loader = DataLoader(
            val_dataset,
            batch_size=self.batch_size,
        )

        guidance_scale = self.configs["model"]["params"]["evaluation_params"][
            "unconditional_guidance_scale"
        ]
        ddim_sampling_steps = self.configs["model"]["params"]["evaluation_params"][
            "ddim_sampling_steps"
        ]
        n_candidates_per_samples = self.configs["model"]["params"]["evaluation_params"][
            "n_candidates_per_samples"
        ]

        self.latent_diffusion.eval()
        self.latent_diffusion = self.latent_diffusion.cuda()

        waveforms_dict = self.generate_sample(
            val_loader,
            unconditional_guidance_scale=guidance_scale,
            ddim_steps=ddim_sampling_steps,
            n_gen=n_candidates_per_samples,
            sampling_rate=self.configs["variables"]["sampling_rate"],
        )

        return_audio_dict = {}
        for text_prompt, waveform in waveforms_dict.items():
          # resample the audio if the model doesn't output 32,000Hz waveform
          if not self.configs['variables']['sampling_rate'] == self.sr:
            waveform = librosa.resample(waveform, orig_sr=self.configs['variables']['sampling_rate'],
                                        target_sr=self.sr)
          # pad or chop the audio if the model doesn't output audio of the official length (self.duration seconds)
          if len(waveform) < self.sr * self.duration:
              waveform = np.pad(waveform, (0, (self.sr * self.duration)-len(waveform)), 'constant', constant_values=0)
          elif len(waveform) > self.sr * self.duration:
              waveform = find_loudest_segment(waveform, sr=self.sr,
                                              segment_length=self.duration, hop_length_sec=self.loudest_hop_len)
          return_audio_dict[text_prompt] = waveform

        assert len(return_audio_dict) == len(text_prompts), f"return_dict {len(return_audio_dict)} prompts {len(text_prompts)}"

        return return_audio_dict

    @torch.no_grad()
    def generate_sample(
        self,
        batchs,
        ddim_steps=200,
        ddim_eta=1.0,
        x_T=None,
        n_gen=1,
        unconditional_guidance_scale=1.0,
        unconditional_conditioning=None,
        use_plms=False,
        **kwargs,
    ) -> Dict[str, np.ndarray]:
        # Generate n_gen times and select the best
        # Batch: audio, text, fnames
        assert x_T is None
        try:
            batchs = iter(batchs)
        except TypeError:
            raise ValueError("The first input argument should be an iterable object")

        if use_plms:
            assert ddim_steps is not None

        use_ddim = ddim_steps is not None

        model = self.latent_diffusion
        waveforms = {}

        with model.ema_scope("Plotting"):
            for i, batch in enumerate(batchs):
                z, c = model.get_input(
                    batch,
                    model.first_stage_key,
                    unconditional_prob_cfg=0.0,
                )

                c = model.filter_useful_cond_dict(c)

                text = get_input_with_key(batch, "text")

                # Generate multiple samples
                batch_size = z.shape[0] * n_gen

                # Generate multiple samples at a time and filter out the best.
                # The condition passed to the diffusion wrapper can have several formats.
                for cond_key in c.keys():
                    if isinstance(c[cond_key], list):
                        # Use `j` so the batch index `i` of the outer loop is not shadowed.
                        for j in range(len(c[cond_key])):
                            c[cond_key][j] = torch.cat([c[cond_key][j]] * n_gen, dim=0)
                    elif isinstance(c[cond_key], dict):
                        for k in c[cond_key].keys():
                            c[cond_key][k] = torch.cat([c[cond_key][k]] * n_gen, dim=0)
                    else:
                        c[cond_key] = torch.cat([c[cond_key]] * n_gen, dim=0)

                text = text * n_gen

                if unconditional_guidance_scale != 1.0:
                    unconditional_conditioning = {}
                    for key in model.cond_stage_model_metadata:
                        model_idx = model.cond_stage_model_metadata[key]["model_idx"]
                        unconditional_conditioning[key] = model.cond_stage_models[
                            model_idx
                        ].get_unconditional_condition(batch_size)

                fnames = list(get_input_with_key(batch, "fname"))

                samples, _ = model.sample_log(
                    cond=c,
                    batch_size=batch_size,
                    x_T=x_T,
                    ddim=use_ddim,
                    ddim_steps=ddim_steps,
                    eta=ddim_eta,
                    unconditional_guidance_scale=unconditional_guidance_scale,
                    unconditional_conditioning=unconditional_conditioning,
                    use_plms=use_plms,
                )

                mel = model.decode_first_stage(samples)

                waveform = model.mel_spectrogram_to_waveform(
                    mel, bs=None, name=fnames, save=False
                )

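                if n_gen > 1:
                    try:
                        best_index = []
                        similarity = model.clap.cos_similarity(
                            torch.FloatTensor(waveform).squeeze(1), text
                        )
                        # Candidates for prompt p sit at indices p, p + B, p + 2B, ...
                        # where B = z.shape[0] is the number of prompts in the batch.
                        for prompt_idx in range(z.shape[0]):
                            candidates = similarity[prompt_idx :: z.shape[0]]
                            max_index = torch.argmax(candidates).item()
                            best_index.append(prompt_idx + max_index * z.shape[0])

                        waveform = waveform[best_index]

                    except Exception as e:
                        print("Warning: while calculating CLAP score (not fatal), ", e)
                        # Fall back to the first candidate of each prompt so that the
                        # number of waveforms still matches the number of prompts.
                        waveform = waveform[: z.shape[0]]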

                text = text[:len(text)//n_gen]
                assert len(text) == waveform.shape[0], f'{len(text)}, {waveform.shape}'
                for idx, text_prompt in enumerate(text):
                    # Each text prompt must map to exactly one waveform.
                    assert text_prompt not in waveforms, f"Duplicate text prompt: {text_prompt}"
                    waveforms[text_prompt] = np.squeeze(waveform[idx], axis=0)

        return waveforms


# DO NOT change the working directory in your own code.
# This is for demonstration purposes only.
repo_dir = os.path.join(baseline_dir, 'AudioLDM-training-finetuning')
os.chdir(repo_dir)
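
When `n_gen > 1`, `generate_sample` above repeats the conditioning `n_gen` times along the batch dimension and keeps, for each prompt, the candidate with the highest CLAP similarity. The index arithmetic can be sketched on its own as follows. This is a minimal illustration, not part of the template: `select_best_candidates` is a hypothetical helper, NumPy stands in for the torch tensors, and the similarity scores are made up.

```python
import numpy as np


def select_best_candidates(similarity: np.ndarray, n_prompts: int) -> list:
    """Return, for each prompt, the flat index of its highest-scoring candidate.

    Hypothetical helper. Assumes the candidates are laid out as n_gen
    consecutive copies of the prompt batch, i.e.
    flat index = prompt_idx + candidate_idx * n_prompts.
    """
    best_index = []
    for prompt_idx in range(n_prompts):
        # Every n_prompts-th entry starting at prompt_idx belongs to this prompt.
        candidates = similarity[prompt_idx::n_prompts]
        candidate_idx = int(np.argmax(candidates))
        best_index.append(prompt_idx + candidate_idx * n_prompts)
    return best_index


# Two prompts with three candidates each (made-up scores).
similarity = np.array([0.1, 0.7, 0.9, 0.2, 0.3, 0.8])
best = select_best_candidates(similarity, n_prompts=2)
print(best)  # flat indices of the selected candidates, one per prompt
```

Indexing the generated waveform array with the returned list then leaves one waveform per prompt, in the original prompt order.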

For fast inference, this code synthesizes 4 sound samples per category.

In [None]:
import time
import IPython.display as ipd
import numpy as np


def check_srcs_dict(srcs_dict: Dict[str, np.ndarray], text_prompts_list: List[str], duration: int, SR: int) -> None:
    """ Check if the return value of synthesize_sounds method is valid.

    Args:
        srcs_dict (dict): the return value of synthesize_sounds method.
        text_prompts_list (list of strings): list of text prompts.
        duration (int): duration of the audio in seconds.
        SR (int): sample rate of the audio.

    Returns:
        None
    """
    assert isinstance(srcs_dict, dict), "The return value of synthesize_sounds method should be a dictionary."
    assert all(isinstance(k, str) for k in srcs_dict.keys()), "The keys of the dictionary returned by 'synthesize_sounds' should be strings (the corresponding text prompts)."
    assert all(isinstance(v, np.ndarray) for v in srcs_dict.values()), "The values of the dictionary returned by 'synthesize_sounds' should be numpy arrays (audio waveforms)."
    assert list(srcs_dict.keys()) == text_prompts_list, "The keys of the dictionary returned by 'synthesize_sounds' should match the input text prompts, in the same order."
    for src in srcs_dict.values():
        assert src.ndim == 1, "The audio waveform should be mono."
        assert len(src) == int(duration * SR), f"The audio waveform should be {duration} seconds long."


start_time = time.time() # measure total inference time

fss_model = BaseLineModel()

srcs_dict = fss_model.synthesize_sounds(text_prompts_list)
check_srcs_dict(srcs_dict, text_prompts_list, duration, SR)

os.makedirs(os.path.join(ROOT_PATH, baseline_dir, 'output'), exist_ok=True)
for src_text, src in tqdm.tqdm(srcs_dict.items()):
    _filepath = os.path.join(ROOT_PATH, baseline_dir, 'output', f"{src_text}.wav")
    peak = np.max(np.abs(src))
    if peak > 0:
        src = src / peak  # peak-normalize the output; silent clips are left as-is to avoid division by zero
    sf.write(_filepath, src, SR, subtype='PCM_16')

inference_time = time.time() - start_time
print("Total inference time: ", inference_time)

print('Listen to the generated sound...')
print(f'- prompt: {text_prompts_list[0]}')
ipd.Audio(srcs_dict[text_prompts_list[0]], rate=SR) # listen to the generated result
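
`check_srcs_dict` above requires every waveform to be a mono, one-dimensional array of exactly `duration * SR` samples. If your own model produces audio of a different length or with several channels, a conversion step along these lines could be applied before returning. This is only a sketch under stated assumptions: `fit_to_length` is a hypothetical helper, multi-channel input is assumed to have shape `(channels, samples)`, and over-long audio is simply truncated here, whereas the baseline uses `find_loudest_segment` for that case.

```python
import numpy as np


def fit_to_length(waveform: np.ndarray, sr: int, duration: float) -> np.ndarray:
    """Return a mono waveform with exactly int(sr * duration) samples.

    Hypothetical helper, not part of the template.
    """
    target_len = int(sr * duration)
    waveform = np.asarray(waveform, dtype=np.float32)
    if waveform.ndim > 1:
        # Assumption: shape is (channels, samples); average the channels to mono.
        waveform = waveform.mean(axis=0)
    if len(waveform) < target_len:
        # Zero-pad at the end when the audio is too short.
        waveform = np.pad(waveform, (0, target_len - len(waveform)))
    # Truncate when the audio is too long (simplification, see the note above).
    return waveform[:target_len]


# Toy inputs with a small sample rate so the example runs quickly.
short = fit_to_length(np.ones(100), sr=100, duration=2)
long_stereo = fit_to_length(np.ones((2, 500)), sr=100, duration=2)
print(short.shape, long_stereo.shape)
```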

--- End of the Template. ---