# Adding noise to the audio data to test how FAD responds

In [4]:
import os
import shutil
from tqdm import tqdm
import numpy as np
import soundfile as sf

In [10]:
def add_noise_to_waveform(data, noise_level=0.2):
    """Return ``data`` with scaled Gaussian noise added to every element.

    One standard-normal sample per element of ``data`` is drawn with
    ``np.random.randn`` (NumPy's global random state), multiplied by
    ``noise_level`` and added to the input. The input array itself is not
    modified; a new array is returned.

    Args:
        data: NumPy array holding the waveform; only its ``shape`` and
            support for ``+`` are relied upon.
        noise_level: Multiplier applied to the standard-normal noise.

    Returns:
        A new array with the same shape as ``data``.
    """
    gaussian_noise = np.random.randn(*data.shape)
    return data + gaussian_noise * noise_level

def process_audio_files(src_directory, dst_directory, noise_level=0.2):
    """Write a noise-added copy of every .wav file in a directory.

    Each file in ``src_directory`` whose name ends in ``'.wav'`` is read,
    passed through ``add_noise_to_waveform`` and written to
    ``dst_directory`` under the same file name and sample rate.
    Sub-directories are not traversed.

    Args:
        src_directory: Directory scanned for ``.wav`` files.
        dst_directory: Directory receiving the noisy files; created
            (including parents) if it does not exist yet.
        noise_level: Forwarded to ``add_noise_to_waveform`` for every file.
            Defaults to 0.2, the helper's own default.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dst_directory, exist_ok=True)

    # Filter before wrapping in tqdm so the progress bar counts only the
    # files that are actually processed. Sorting makes the processing order
    # (and therefore the noise each file receives under a fixed NumPy seed)
    # independent of the file system's listing order.
    wav_filenames = sorted(
        name for name in os.listdir(src_directory) if name.endswith('.wav')
    )

    for filename in tqdm(wav_filenames):
        src_path = os.path.join(src_directory, filename)
        dst_path = os.path.join(dst_directory, filename)

        # Read audio file
        data, samplerate = sf.read(src_path)

        # Add noise to the audio data
        distorted_data = add_noise_to_waveform(data, noise_level=noise_level)

        # Save the distorted audio file.
        # NOTE(review): no subtype is passed to sf.write, so the output
        # encoding is whatever soundfile picks by default for the format,
        # which may differ from the source file's -- confirm this is intended.
        sf.write(dst_path, distorted_data, samplerate)

In [11]:
# Input clips and the folder that receives their noise-added copies
source_directory = '../cache/spotify_sleep_dataset/waveform_small'
destination_directory = '../cache/spotify_sleep_dataset/waveform_small_distorted_2'

# Write a noisy version of every .wav file found in the source folder
process_audio_files(
    src_directory=source_directory,
    dst_directory=destination_directory,
)

100%|██████████| 103/103 [00:18<00:00,  5.70it/s]


# Viewing Mel Spectrograms

In [7]:
import pyarrow.parquet as pq
from datasets import load_dataset, load_from_disk
from torchvision.transforms import Compose, Normalize, ToTensor

# Location of the dataset previously saved to disk, plus the settings the
# later cells read (`vae` is checked by `transforms`).
dataset_name = '../cache/spotify_sleep_dataset/waveform_small/mel_spectrogram_256_256'
dataset_config_name = None
vae = None

# Load the saved dataset and keep only its "train" split.
dataset_splits = load_from_disk(dataset_name, storage_options=dataset_config_name)
dataset = dataset_splits["train"]


In [None]:
    # Image resolution as (height, width), taken from the first sample
    sample_image = dataset[0]["image"]
    resolution = (sample_image.height, sample_image.width)

    # Per-image preprocessing: convert to a tensor, then normalize with
    # mean 0.5 and std 0.5
    augmentations = Compose([
        ToTensor(),
        Normalize([0.5], [0.5]),
    ])

    def transforms(examples):
        """Turn a batch of dataset examples into model inputs.

        Every image in ``examples["image"]`` is passed through
        ``augmentations``; images are first converted to RGB when ``vae`` is
        set and ``vqvae`` is configured for 3 input channels. When
        ``encodings`` is available, the encoding looked up for each entry of
        ``examples["audio_file"]`` is returned as well.

        Returns:
            ``{"input": images}``, plus an ``"encoding"`` list when
            ``encodings`` is not None.
        """
        convert_to_rgb = vae is not None and vqvae.config["in_channels"] == 3
        images = [
            augmentations(image.convert("RGB") if convert_to_rgb else image)
            for image in examples["image"]
        ]

        batch = {"input": images}
        if encodings is not None:
            batch["encoding"] = [
                encodings[file] for file in examples["audio_file"]
            ]
        return batch

    # Apply `transforms` lazily whenever examples are fetched, then batch and
    # shuffle the dataset for training.
    dataset.set_transform(transforms)
    train_dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=train_batch_size, shuffle=True)

    # Always bind `encodings`: `transforms` tests `encodings is not None`, so
    # leaving the name undefined when no encodings file is configured would
    # raise NameError as soon as a batch is fetched. The guard checks the
    # configured path (`args.encodings`), matching the model-selection code
    # that branches on `args.encodings`, rather than the not-yet-loaded
    # `encodings` variable itself.
    encodings = None
    if args.encodings is not None:
        # SECURITY: unpickling can execute arbitrary code -- only load
        # encodings files from a trusted source.
        with open(args.encodings, "rb") as encodings_file:
            encodings = pickle.load(encodings_file)


    if args.from_pretrained is not None:
        # Reuse the mel converter, UNet and (if present) VQ-VAE of an
        # existing pipeline.
        pipeline = AudioDiffusionPipeline.from_pretrained(args.from_pretrained)
        mel = pipeline.mel
        model = pipeline.unet
        if hasattr(pipeline, "vqvae"):
            vqvae = pipeline.vqvae

    else:
        # Build a new UNet: unconditional when no encodings are configured,
        # cross-attention-conditioned on the encodings otherwise.
        # NOTE(review): `vqvae` and `latent_resolution` are not assigned in
        # the visible cells before this point -- confirm they are defined
        # before running this branch.
        unconditional = args.encodings is None

        # Without a VQ-VAE the model works on single-channel images at full
        # resolution; with one it works at the latent resolution with the
        # VQ-VAE's latent channel count.
        if vqvae is None:
            sample_size = resolution
            channels = 1
        else:
            sample_size = latent_resolution
            channels = vqvae.config["latent_channels"]

        # Arguments common to both UNet variants.
        shared_kwargs = dict(
            sample_size=sample_size,
            in_channels=channels,
            out_channels=channels,
            layers_per_block=2,
        )

        if unconditional:
            model = UNet2DModel(
                **shared_kwargs,
                block_out_channels=(128, 128, 256, 256, 512, 512),
                down_block_types=(
                    "DownBlock2D",
                    "DownBlock2D",
                    "DownBlock2D",
                    "DownBlock2D",
                    "AttnDownBlock2D",
                    "DownBlock2D",
                ),
                up_block_types=(
                    "UpBlock2D",
                    "AttnUpBlock2D",
                    "UpBlock2D",
                    "UpBlock2D",
                    "UpBlock2D",
                    "UpBlock2D",
                ),
            )

        else:
            # The cross-attention width is taken from the last dimension of
            # the first stored encoding.
            first_encoding = list(encodings.values())[0]
            model = UNet2DConditionModel(
                **shared_kwargs,
                block_out_channels=(128, 256, 512, 512),
                down_block_types=(
                    "CrossAttnDownBlock2D",
                    "CrossAttnDownBlock2D",
                    "CrossAttnDownBlock2D",
                    "DownBlock2D",
                ),
                up_block_types=(
                    "UpBlock2D",
                    "CrossAttnUpBlock2D",
                    "CrossAttnUpBlock2D",
                    "CrossAttnUpBlock2D",
                ),
                cross_attention_dim=first_encoding.shape[-1],
            )
