In [3]:
# Mount Google Drive so the project folder under /content/drive is reachable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Work from the project folder on Drive: later cells use relative paths
# such as "classical/*.wav" and "generated/...".
%cd /content/drive/MyDrive/diffusion/

/content/drive/MyDrive/diffusion


In [5]:
!pip install librosa==0.8.0
!pip install tensorflow_addons

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
!pip install optuna

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os

import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers
import math
import matplotlib.pyplot as plt
from tqdm import tqdm
import librosa 
from glob import glob
import tensorflow_hub as hub
import soundfile as sf
import optuna
import random
from functools import partial
import warnings
import IPython.display as ipd
warnings.filterwarnings('ignore')

In [8]:
# Show which devices TensorFlow can see (the recorded output lists only a CPU).
tf.config.list_physical_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

In [9]:
# SoundStream mel decoder loaded from TF Hub; later cells call it on the
# log-mel spectrograms produced by calculate_spectrogram.
module = hub.KerasLayer('https://tfhub.dev/google/soundstream/mel/decoder/music/1')

# STFT / mel settings shared by MEL_BASIS and calculate_spectrogram.
SAMPLE_RATE = 16000  # sample_rate of the mel matrix; also used for librosa.load and sf.write
N_FFT = 1024  # fft_length of the STFT
HOP_LENGTH = 320  # frame_step of the STFT
WIN_LENGTH = 640  # frame_length of the STFT
N_MEL_CHANNELS = 128  # num_mel_bins
MEL_FMIN = 0.0  # lower_edge_hertz
MEL_FMAX = int(SAMPLE_RATE // 2)  # upper_edge_hertz: half the sample rate
# Bounds applied to the mel energies before taking the log.
CLIP_VALUE_MIN = 1e-5
CLIP_VALUE_MAX = 1e8

# Weight matrix mapping the N_FFT // 2 + 1 linear STFT bins to N_MEL_CHANNELS mel bins.
MEL_BASIS = tf.signal.linear_to_mel_weight_matrix(
    num_mel_bins=N_MEL_CHANNELS,
    num_spectrogram_bins=N_FFT // 2 + 1,
    sample_rate=SAMPLE_RATE,
    lower_edge_hertz=MEL_FMIN,
    upper_edge_hertz=MEL_FMAX)

def calculate_spectrogram(samples):
    """Return the clipped log-mel spectrogram of `samples`.

    The STFT uses the module-level WIN_LENGTH / HOP_LENGTH / N_FFT settings with
    a Hann window and `pad_end=True`; its magnitude is projected through
    MEL_BASIS, clipped to [CLIP_VALUE_MIN, CLIP_VALUE_MAX] and passed through a
    natural log.

    Args:
        samples: audio samples accepted by `tf.signal.stft`.

    Returns:
        Tensor of log-mel values whose last axis has N_MEL_CHANNELS entries.
    """
    stft_magnitude = tf.abs(
        tf.signal.stft(
            samples,
            frame_length=WIN_LENGTH,
            frame_step=HOP_LENGTH,
            fft_length=N_FFT,
            window_fn=tf.signal.hann_window,
            pad_end=True,
        )
    )

    # Project the linear-frequency bins onto the mel filterbank.
    mel_energies = tf.matmul(stft_magnitude, MEL_BASIS)

    # Clip before the log so zero-energy bins cannot produce -inf.
    clipped = tf.clip_by_value(
        mel_energies,
        clip_value_min=CLIP_VALUE_MIN,
        clip_value_max=CLIP_VALUE_MAX,
    )
    return tf.math.log(clipped)


In [10]:
# glob() returns matches in arbitrary, filesystem-dependent order; sort so that
# index 1 picks the same reference clip on every run.
music_files = sorted(glob("classical/*.wav"))
data, sr = librosa.load(music_files[1], sr=SAMPLE_RATE)

In [11]:
# Round trip for the reference clip: log-mel spectrogram -> SoundStream decoder.
# The spectrogram is wrapped in a list to form a batch of one.
reconstructed_samples = module([calculate_spectrogram(data)])

In [12]:
def sinusoidal_embedding(x):
    """Embed `x` with sines and cosines at log-spaced frequencies.

    Sixteen frequencies are spaced geometrically between 1.0 and 1000.0; the
    sine and cosine responses are concatenated along axis 3, giving 32 values
    on that axis.

    Args:
        x: tensor with at least four axes that broadcasts against the frequency
            vector (get_network passes a (batch, 1, 1, 1) input).

    Returns:
        Tensor holding the sine responses followed by the cosine responses
        along axis 3.
    """
    embedding_dims = 32
    min_frequency, max_frequency = 1.0, 1000.0

    # Evenly spaced in log-space, i.e. geometrically spaced frequencies.
    log_frequencies = tf.linspace(
        tf.math.log(min_frequency),
        tf.math.log(max_frequency),
        embedding_dims // 2,
    )
    angular_speeds = 2.0 * math.pi * tf.exp(log_frequencies)

    phase = angular_speeds * x
    return tf.concat([tf.sin(phase), tf.cos(phase)], axis=3)


def ResidualBlock(width):
    """Return a function that applies a two-convolution residual block.

    Args:
        width: number of output channels of the block.

    Returns:
        `apply(x)`, which builds fresh layers on every call and returns
        `conv(conv(batchnorm(x))) + shortcut(x)`.
    """

    def apply(x):
        # The shortcut needs a 1x1 projection only when the channel count changes.
        channels_match = x.shape[3] == width
        shortcut = x if channels_match else layers.Conv2D(width, kernel_size=1)(x)

        hidden = layers.BatchNormalization(center=False, scale=False)(x)
        hidden = layers.Conv2D(
            width,
            kernel_size=3,
            padding="same",
            activation=keras.activations.swish,
        )(hidden)
        hidden = layers.Conv2D(width, kernel_size=3, padding="same")(hidden)
        return layers.Add()([hidden, shortcut])

    return apply


def DownBlock(width, block_depth):
    """Return a function that applies one encoder stage of the U-Net.

    Args:
        width: channel count of the stage's residual blocks.
        block_depth: number of residual blocks in the stage.

    Returns:
        `apply([x, skips])`, which runs `block_depth` residual blocks, appends
        each block's output to the caller's `skips` list (mutated in place) and
        returns the result after 2x2 average pooling.
    """

    def apply(inputs):
        features, skip_stack = inputs
        for _ in range(block_depth):
            features = ResidualBlock(width)(features)
            # Keep every intermediate output for the matching decoder stage.
            skip_stack.append(features)
        return layers.AveragePooling2D(pool_size=2)(features)

    return apply


def UpBlock(width, block_depth, attention=False):
    """Return a function that applies one decoder stage of the U-Net.

    Args:
        width: channel count of the stage's residual blocks.
        block_depth: number of residual blocks in the stage; one skip tensor is
            popped per block.
        attention: when true, the output of a MultiHeadAttention layer called
            on `(x, skip)` is concatenated as a third branch.

    Returns:
        `apply([x, skips])`, which upsamples `x` by 2 (bilinear), then for each
        block pops the most recent skip tensor from `skips` (mutated in place),
        concatenates it with `x` and applies a residual block.
    """

    def apply(inputs):
        features, skip_stack = inputs
        features = layers.UpSampling2D(size=2, interpolation="bilinear")(features)
        for _ in range(block_depth):
            skip = skip_stack.pop()
            merge = layers.Concatenate()
            branches = [features, skip]
            if attention:
                attended = layers.MultiHeadAttention(
                    num_heads=4, key_dim=1, attention_axes=(1, 2)
                )(features, skip)
                branches.append(attended)
            features = merge(branches)
            features = ResidualBlock(width)(features)
        return features

    return apply


def get_network(widths, block_depth, dim1=256, dim2=128, channels=1, attention=False):
    """Build the residual U-Net that predicts the noise component of its input.

    Args:
        widths: channel counts per resolution level; all but the last get a
            down stage and a mirrored up stage, the last is the bottleneck.
        block_depth: residual blocks per stage.
        dim1, dim2: spatial size of the input.
            NOTE(review): presumably both must be divisible by
            2 ** (len(widths) - 1) so the skip tensors line up after
            pooling/upsampling - confirm.
        channels: channels of the input and of the predicted output.
        attention: if true, the first (lowest-resolution) up stage also
            concatenates an attention branch.

    Returns:
        `keras.Model` named "residual_unet" mapping
        `[noisy_input, noise_variances]` to a tensor with `channels` channels.
    """
    noisy_input = keras.Input(shape=(dim1, dim2, channels))
    noise_variances = keras.Input(shape=(1, 1, 1))

    # Embed the scalar noise variance and tile it over the whole input grid.
    embedded = layers.Lambda(sinusoidal_embedding)(noise_variances)
    embedded = layers.UpSampling2D(size=(dim1, dim2), interpolation="nearest")(embedded)

    features = layers.Conv2D(widths[0], kernel_size=1)(noisy_input)
    features = layers.Concatenate()([features, embedded])

    skip_stack = []
    encoder_widths = widths[:-1]

    # Encoder: every stage pushes its block outputs onto skip_stack.
    for width in encoder_widths:
        features = DownBlock(width, block_depth)([features, skip_stack])

    # Bottleneck at the lowest resolution.
    for _ in range(block_depth):
        features = ResidualBlock(widths[-1])(features)

    # Decoder: stages pop the skips in reverse order.
    for level, width in enumerate(reversed(encoder_widths)):
        use_attention = attention and level == 0
        features = UpBlock(width, block_depth, attention=use_attention)(
            [features, skip_stack]
        )

    # Zero-initialized output convolution.
    outputs = layers.Conv2D(channels, kernel_size=1, kernel_initializer="zeros")(features)

    return keras.Model([noisy_input, noise_variances], outputs, name="residual_unet")


class DDIM(keras.Model):
    """Denoising diffusion implicit model over single-channel 2-D inputs.

    Modified from this tutorial: https://keras.io/examples/generative/ddim/

    Two copies of the U-Net from `get_network` are held: `network`, which the
    optimizer updates in `train_step`, and `ema_network`, whose weights follow
    an exponential moving average of `network`'s weights and which is used
    whenever `denoise` is called with `training=False`.

    Args:
        widths, block_depth, attention: forwarded to `get_network`.
        dim1, dim2: spatial size of the inputs, forwarded to `get_network`.
        min_signal_rate: signal rate at diffusion time 1.
        max_signal_rate: signal rate at diffusion time 0.
        ema: decay factor of the moving average kept in `ema_network`.
    """
    
    def __init__(self, widths, block_depth, attention=False, dim1=256, dim2=128, min_signal_rate=0.02, max_signal_rate=0.95, ema = 0.999):
        """Create the normalizer and the trained / EMA copies of the U-Net."""
        super().__init__()

        # Standardization layer; the notebook fits its statistics by calling
        # `model.normalizer.adapt(...)` before training.
        self.normalizer = layers.Normalization(axis=(2,3))
        self.network = get_network(widths, block_depth, attention=attention, dim1=dim1, dim2=dim2)
        # Copy of the architecture; train_step moves its weights toward `network`'s.
        self.ema_network = keras.models.clone_model(self.network)
        self.min_signal_rate = min_signal_rate
        self.max_signal_rate = max_signal_rate
        self.ema = ema

    def compile(self, **kwargs):
        """Compile the model and create the two running-mean loss trackers."""
        super().compile(**kwargs)

        self.noise_loss_tracker = keras.metrics.Mean(name="noise_loss")
        self.data_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")

    @property
    def metrics(self):
        """Loss trackers reported by `train_step` and `test_step`."""
        return [self.noise_loss_tracker, self.data_loss_tracker]

    def denormalize(self, data):
        """Undo the normalizer's scaling and clip the result to [-128, 128]."""
        data = self.normalizer.mean + data * self.normalizer.variance**0.5
        return tf.clip_by_value(data, -128.0, 128.0)

    def diffusion_schedule(self, diffusion_times):
        """Map diffusion times to `(noise_rates, signal_rates)`.

        The angle is interpolated linearly between acos(max_signal_rate) at
        time 0 and acos(min_signal_rate) at time 1; the signal rate is its
        cosine and the noise rate its sine.
        """
        start_angle = tf.acos(self.max_signal_rate)
        end_angle = tf.acos(self.min_signal_rate)
        diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)
        signal_rates = tf.cos(diffusion_angles)
        noise_rates = tf.sin(diffusion_angles)
        return noise_rates, signal_rates

    def denoise(self, noisy_data, noise_rates, signal_rates, training):
        """Split `noisy_data` into predicted noise and predicted clean data.

        Uses `network` when `training` is true and `ema_network` otherwise.
        The network is conditioned on the noise variance (`noise_rates**2`);
        the data estimate follows from
        `noisy = signal_rate * data + noise_rate * noise`.

        Returns:
            `(pred_noises, pred_data)`.
        """
        if training:
            network = self.network
        else:
            network = self.ema_network
        pred_noises = network([noisy_data, noise_rates**2], training=training)
        pred_data = (noisy_data - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_data

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        """Iteratively denoise `initial_noise` over `diffusion_steps` steps.

        `diffusion_steps` must be at least 1: the step size is
        `1.0 / diffusion_steps` and `pred_data` is only bound inside the loop.

        Returns:
            The data estimate (`pred_data`) produced by the final step.
        """
        num_examples = tf.shape(initial_noise)[0]
        step_size = 1.0 / diffusion_steps

        # important line:
        # at the first sampling step, the "noisy data" is pure noise
        # but its signal rate is assumed to be nonzero (min_signal_rate)
        next_noisy_data = initial_noise
        for step in tqdm(range(diffusion_steps)):
            noisy_data = next_noisy_data

            # separate the current noisy data to its components
            diffusion_times = tf.ones((num_examples, 1, 1, 1)) - step * step_size
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
            pred_noises, pred_data = self.denoise(
                noisy_data, noise_rates, signal_rates, training=False
            )
            # network used in eval mode

            # remix the predicted components using the next signal and noise rates
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(
                next_diffusion_times
            )
            next_noisy_data = (
                next_signal_rates * pred_data + next_noise_rates * pred_noises
            )
            # this new noisy data will be used in the next step

        return pred_data

    def generate(self, num_examples, shape, diffusion_steps):
        """Sample `num_examples` items of the given `shape` from Gaussian noise.

        Args:
            num_examples: number of samples to draw.
            shape: indexable whose first three entries give the per-example shape.
            diffusion_steps: number of reverse-diffusion steps.

        Returns:
            The denormalized (and clipped) generated data.
        """
        # noise -> data -> denormalized data
        initial_noise = tf.random.normal(shape=(num_examples, shape[0], shape[1], shape[2]))
        generated_data = self.reverse_diffusion(initial_noise, diffusion_steps)
        generated_data = self.denormalize(generated_data)
        return generated_data

    def train_step(self, data):
        """Run one optimization step on a batch and update the EMA weights.

        Only the noise-prediction loss is differentiated; the reconstruction
        loss is tracked as a metric.

        Returns:
            Dict mapping each tracker's name to its current value.
        """
        batch_size = tf.shape(data)[0]
        # normalize data to have standard deviation of 1, like the noises
        data = self.normalizer(data, training=True)
        noises = tf.random.normal(shape=tf.shape(data))

        # sample uniform random diffusion times
        diffusion_times = tf.random.uniform(
            shape=(batch_size, 1, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        # mix the data with noises accordingly
        noisy_data = signal_rates * data + noise_rates * noises

        with tf.GradientTape() as tape:
            # train the network to separate noisy data to their components
            pred_noises, pred_data = self.denoise(
                noisy_data, noise_rates, signal_rates, training=True
            )

            noise_loss = self.loss(noises, pred_noises)  # used for training
            data_loss = self.loss(data, pred_data)  # only used as metric

        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))

        self.noise_loss_tracker.update_state(noise_loss)
        self.data_loss_tracker.update_state(data_loss)

        # track the exponential moving averages of weights
        for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
            ema_weight.assign(self.ema * ema_weight + (1 - self.ema) * weight)

        # report the running means of both losses
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Evaluate both losses on a batch using the EMA network.

        Same noising procedure as `train_step`, but `denoise` is called with
        `training=False` and no weights are updated.

        Returns:
            Dict mapping each tracker's name to its current value.
        """
        batch_size = tf.shape(data)[0]
        
        # normalize data to have standard deviation of 1, like the noises
        data = self.normalizer(data, training=False)
        noises = tf.random.normal(shape=tf.shape(data))

        # sample uniform random diffusion times
        diffusion_times = tf.random.uniform(
            shape=(batch_size, 1, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        # mix the data with noises accordingly
        noisy_data = signal_rates * data + noise_rates * noises

        # use the network to separate noisy data to their components
        pred_noises, pred_data = self.denoise(
            noisy_data, noise_rates, signal_rates, training=False
        )

        noise_loss = self.loss(noises, pred_noises)
        data_loss = self.loss(data, pred_data)

        self.data_loss_tracker.update_state(data_loss)
        self.noise_loss_tracker.update_state(noise_loss)

        return {m.name: m.result() for m in self.metrics}

In [13]:
def load_at_interval(x, rate=10_000, feats=256, duration=3.3):
    """Load a fixed-length excerpt of an audio file and return its log-mel spectrogram.

    Args:
        x: pair `(path, offset)` of eager tensors; `path` is a byte string and
            `offset` is the start position handed to `librosa.load`.
        rate: sample rate the audio is resampled to on load.
            NOTE(review): `calculate_spectrogram` builds its mel basis for
            SAMPLE_RATE, so callers presumably should pass `rate=SAMPLE_RATE`.
        feats: unused; kept so existing callers that pass it keep working
            (left over from the earlier MDCT variant).
        duration: excerpt length in seconds.

    Returns:
        The result of `calculate_spectrogram` on exactly `int(rate * duration)`
        samples; shorter excerpts are zero-padded at the end.
    """
    file = x[0].numpy().decode()
    offset = x[1].numpy()
    audio, _ = librosa.load(file, duration=duration, sr=rate, offset=offset)

    target_len = int(rate * duration)
    audio_fill = np.zeros(target_len, dtype=np.float32)
    # Resampling can return a clip slightly longer than target_len (rounding),
    # which made the unguarded slice assignment raise; copy only what fits.
    n_copy = min(len(audio), target_len)
    audio_fill[:n_copy] = audio[:n_copy]

    return calculate_spectrogram(audio_fill)

def load_audio(x,y, rate=10_000, mdct_feats=256, duration=3.3):
    """Graph-compatible wrapper around `load_at_interval`.

    Args:
        x: scalar string tensor with the file path.
        y: scalar tensor with the offset to start loading from.
        rate, duration: forwarded to `load_at_interval`.
        mdct_feats: forwarded to `load_at_interval` as `feats`.

    Returns:
        The `tf.float32` tensor produced by `tf.py_function`.
    """

    def _load_eagerly(path, offset):
        return load_at_interval(
            (path, offset), rate=rate, feats=mdct_feats, duration=duration
        )

    return tf.py_function(_load_eagerly, inp=[x, y], Tout=tf.float32)

def get_files_dataset(
        glob_location,
        total_seconds=2,
        out_len = 5.12,
        hop_size=1,
        max_feats = 2048,
        batch_size=4,
        shuffer_size=1000,
        scale=1,
        rate= 44100,#16_000 maestro, 44100
        mdct_feats=256
    ):
    """Build a batched `tf.data` pipeline of spectrogram excerpts.

    Every file matched by `glob_location` is visited `total_seconds` times, at
    offsets `0, hop_size, 2 * hop_size, ...`. The (file, offset) pairs are
    shuffled, loaded through `load_audio`, given a trailing channel axis,
    cropped to `max_feats` frames, multiplied by `scale`, batched and prefetched.

    Args:
        glob_location: glob pattern (recursive) selecting the audio files.
        total_seconds: number of offsets generated per file.
        out_len: excerpt length in seconds, passed on as `duration`.
        hop_size: integer spacing between consecutive offsets.
        max_feats: number of frames kept along the first axis.
        batch_size: examples per batch.
        shuffer_size: shuffle buffer size.
        scale: multiplier applied to every spectrogram.
        rate: sample rate the audio is loaded at.
            NOTE(review): the default (44100) differs from SAMPLE_RATE, which
            `calculate_spectrogram` assumes - confirm callers pass the right rate.
        mdct_feats: each example is asserted to have `mdct_feats // 2` bins.

    Returns:
        Dataset of float32 batches whose examples have shape
        `(max_feats, mdct_feats // 2, 1)`.
    """
    files = glob(glob_location, recursive=True)

    def file_list_generator():
        # One (path, offset) pair per file for each offset step.
        for step in range(total_seconds):
            offset = step * hop_size
            for file in files:
                yield file, offset

    load_fn = partial(load_audio, duration=out_len, rate=rate, mdct_feats=mdct_feats)

    def add_channel_and_crop(spec):
        return tf.expand_dims(spec, -1)[:max_feats, :, :]*scale

    def pin_static_shape(spec):
        # Fix the static shape that is not known after the py_function load.
        return tf.ensure_shape(spec, (max_feats, mdct_feats//2, 1))

    pair_signature = (
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.int32),
    )
    pairs = tf.data.Dataset.from_generator(
        file_list_generator, output_signature=pair_signature
    )
    pairs = pairs.shuffle(shuffer_size)

    specs = pairs.map(load_fn, num_parallel_calls=tf.data.AUTOTUNE)
    specs = specs.map(add_channel_and_crop)
    specs = specs.map(pin_static_shape)

    return specs.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [14]:
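# Reference hyperparameters, kept for comparison only (nothing here is executed).
# The signal rates and ema equal the defaults of DDIM.__init__; embedding_dims and
# embedding_max_frequency equal the values hard-coded in sinusoidal_embedding.

# min_signal_rate = 0.02
# max_signal_rate = 0.95

# # architecture
# embedding_dims = 32
# embedding_max_frequency = 1000.0
# widths = [32, 64, 96, 128]
# block_depth = 2

# # optimization
# batch_size = 64
# ema = 0.999
# learning_rate = 1e-3
# weight_decay = 1e-4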

In [15]:
!git clone https://github.com/gudgud96/frechet-audio-distance.git

fatal: destination path 'frechet-audio-distance' already exists and is not an empty directory.


In [16]:
!pip install frechet_audio_distance

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [17]:
from frechet_audio_distance import FrechetAudioDistance

In [18]:
class DiffusionTuningObjective:
    """Optuna objective: train a DDIM on the classical clips and score it by FAD.

    Each call samples hyperparameters from the trial, trains a model, generates
    32 spectrograms, decodes them to WAV files and returns the Frechet Audio
    Distance between those files and the `classical` directory (lower is better).
    """

    def __init__(self,):
        """Collect the training files and load one reference clip.

        Raises:
            FileNotFoundError: if no file matches "classical/*.wav".
        """
        # Sorted so the reference clip does not depend on filesystem order.
        self.music_files = sorted(glob("classical/*.wav"))
        if not self.music_files:
            raise FileNotFoundError("no audio files match 'classical/*.wav'")
        # Use this object's own file list instead of the notebook-global
        # `music_files`, which only exists if an earlier cell has been run.
        reference_file = self.music_files[min(1, len(self.music_files) - 1)]
        self.data, self.sr = librosa.load(reference_file, sr=SAMPLE_RATE)

    def get_params(self, trial) -> dict:
        """Sample one hyperparameter configuration from `trial`.

        `suggest_float(..., log=True)` replaces the deprecated
        `suggest_loguniform`, and `step` is passed by keyword.
        """
        return {
            "batch_size": trial.suggest_int("batch_size", 16, 256, step=64),
            "learning_rate": trial.suggest_float("learning_rate", 3e-5, 3e-1, log=True),
            "weight_decay": trial.suggest_float("weight_decay", 1e-5, 1e-1, log=True),
            "ema": trial.suggest_float("ema", 0.95, 0.99),
            "min_signal_rate": trial.suggest_float("min_signal_rate", 0.01, 0.20),
            "max_signal_rate": trial.suggest_float("max_signal_rate", 0.89, 0.98),
            "block_depth": trial.suggest_int("block_depth", 2, 4, step=2),
        }

    def __call__(self, trial):
        """Train and evaluate one configuration.

        Returns:
            The FAD score of the generated audio against the `classical` folder.

        Raises:
            RuntimeError: if the dataset yields no batch.
            optuna.TrialPruned: if the score is not a finite, non-negative number.
        """
        params = self.get_params(trial)
        dataset = get_files_dataset(
            "classical/*.wav",
            out_len=5.12,
            max_feats=256,
            total_seconds=20,
            scale=1,
            batch_size=params['batch_size'],
            # The mel basis, the SoundStream decoder and sf.write below all
            # assume SAMPLE_RATE; loading at get_files_dataset's default rate
            # would train on time-stretched audio.
            rate=SAMPLE_RATE,
        )

        self.shape = None
        for test_batch in dataset.take(1):
            self.shape = test_batch.shape
        if self.shape is None:
            raise RuntimeError("the training dataset produced no batches")

        # NOTE(review): 25 examples per file is kept from the original although
        # the dataset above yields 20 offsets per file - confirm the intended value.
        steps_per_epoch = max(1, (len(self.music_files) * 25) // self.shape[0])

        model = DDIM(
            widths=[32, 64, 96, 128],
            block_depth=params['block_depth'],
            attention=True,
            dim1=self.shape[1],
            dim2=self.shape[2],
            min_signal_rate=params['min_signal_rate'],
            max_signal_rate=params['max_signal_rate'],
            ema=params['ema'],
        )
        model.normalizer.adapt(dataset, steps=100)
        model.compile(
            loss=tf.keras.losses.MSE,
            # Use the sampled values; previously these were hard-coded, so the
            # study tuned two parameters that had no effect on training.
            optimizer=tfa.optimizers.AdamW(
                learning_rate=params['learning_rate'],
                weight_decay=params['weight_decay'],
            ),
        )
        dataset = dataset.cache()
        model.fit(dataset.repeat(), steps_per_epoch=steps_per_epoch, epochs=100)
        specs = model.generate(32, self.shape[1:], 1000)

        EXPERIMENT_NAME = "test_save_model"

        model_results_path = f"generated/{EXPERIMENT_NAME}"
        # makedirs also creates the missing "generated" parent and tolerates
        # an existing directory.
        os.makedirs(model_results_path, exist_ok=True)

        for i in range(len(specs)):
            audio = module([specs[i, :, :, 0]]).numpy().T
            sf.write(f'{model_results_path}/tone{i}.wav', audio, SAMPLE_RATE)

        frechet = FrechetAudioDistance(
            use_pca=False,
            use_activation=False,
            verbose=False
        )
        fad_score = frechet.score(model_results_path, 'classical')

        # A diverged run (e.g. a very large sampled learning rate) can yield a
        # NaN score, and the FAD library presumably signals failure with a
        # negative value (TODO confirm); neither may win a "minimize" study.
        if not math.isfinite(fad_score) or fad_score < 0:
            raise optuna.TrialPruned(f"invalid FAD score: {fad_score}")
        return fad_score

In [None]:
import time

# Run the hyperparameter search (10 trials, lower FAD is better) and time it.
start_time = time.time()
study = optuna.create_study(direction="minimize")
study.optimize(DiffusionTuningObjective(),n_trials=10)

print(f"Number of finished trials: {len(study.trials)}")

# Summarize the best trial found.
print("Best trial:")
trial = study.best_trial

print(f"  Value: {trial.value}")

print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

# Total wall-clock time of the study, in seconds.
print(time.time() - start_time)

[32m[I 2022-12-14 03:40:32,713][0m A new study created in memory with name: no-name-d93604d3-eb2c-4b9f-86ba-85564f34f18f[0m


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100