Installation - if you're not using Colab, you will need to install `tensorflow` and `tensorflow-datasets` as well

In [None]:
!pip install git+https://github.com/am1tyadav/teal pydub -q

Restart kernel after installing packages

In [None]:
import IPython

# Shut the kernel down with restart=True so the freshly installed packages
# can be imported. Continue with the next cell once the kernel is back up.
IPython.Application.instance().kernel.do_shutdown(True)

Import TensorFlow, TFDS and Teal

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models
from IPython.display import Audio
import tensorflow_datasets as tfds
import teal
import matplotlib.pyplot as plt

Download the Spoken Digit dataset

In [None]:
# Load the dataset through TFDS, using ./tmp as the data directory.
# The returned object is indexed by split name further down (`dataset["train"]`).
dataset = tfds.load("spoken_digit", data_dir="./tmp")

In [None]:
# Audio / spectrogram configuration shared by the cells below.
SAMPLE_RATE = 8000  # sample rate used for playback (`Audio(rate=...)`) and the mel layers
DURATION = 1  # clip length; multiplied by SAMPLE_RATE to get the sample count
SAMPLE_LEN = DURATION * SAMPLE_RATE  # fixed number of samples per example
N_FFT = 1024  # `n_fft` passed to the teal STFT / mel layers
HOP_LEN = 256  # `hop_length` passed to the teal STFT layers
N_MELS = 28  # `n_mels` passed to the teal mel-spectrogram layers


def generate_noise():
    """Return SAMPLE_LEN float32 samples drawn uniformly from [-0.005, 0.005)."""
    amplitude = 0.005
    return tf.random.uniform(
        (SAMPLE_LEN, ),
        minval=-amplitude,
        maxval=amplitude,
        dtype=tf.float32,
    )
# Disabled alternative: a sine tone with a random frequency in [400, 800)
# and a random amplitude in [0.01, 0.1), built with NumPy.
#     f = np.random.randint(400, 800)
#     x = np.arange(SAMPLE_LEN)
#     return np.random.uniform(0.01, 0.1, size=(1, )) * np.sin(2 * np.pi * f * x / SAMPLE_RATE)


def process_example_audio(example):
    """Turn one dataset example into a fixed-length (clean, noisy) waveform pair.

    Args:
        example: Mapping with an "audio" entry holding a 1-D tensor of samples.
            Samples are divided by 32768 — presumably 16-bit PCM; confirm
            against the dataset.

    Returns:
        Tuple ``(output, noisy)`` of float32 tensors of shape ``(SAMPLE_LEN,)``:
        the truncated / zero-padded waveform, and that waveform with
        ``generate_noise()`` added.
    """
    audio = example["audio"]
    audio = tf.cast(audio, dtype=tf.float32) / 32768.

    # Slicing past the end is a no-op, so this only shortens clips that are
    # longer than SAMPLE_LEN and needs no data-dependent branch.
    audio = audio[:SAMPLE_LEN]

    # Zero-pad the tail up to SAMPLE_LEN. The amount is 0 for clips that were
    # already long enough, in which case tf.pad leaves the audio unchanged.
    padding = SAMPLE_LEN - tf.shape(audio)[0]
    output = tf.pad(audio, [[0, padding]])

    # The length is now always SAMPLE_LEN; record it as the static shape so
    # downstream batching and model inputs see (SAMPLE_LEN,) instead of (None,).
    output = tf.ensure_shape(output, (SAMPLE_LEN, ))

    noisy = output + generate_noise()
    return output, noisy

In [None]:
BATCH_SIZE = 5

# `dataset` (the object returned by `tfds.load`) is deliberately NOT rebound
# here: each stage gets its own name so this cell can be re-run on its own
# without `dataset["train"]` failing on an already-transformed dataset.
raw_train_ds = dataset["train"]

TOTAL_EXAMPLES = len(raw_train_ds)
NUM_TRAIN = int(0.7 * TOTAL_EXAMPLES)  # 70% of the examples go to training
NUM_VALID = TOTAL_EXAMPLES - NUM_TRAIN  # the remainder is used for validation

# Each element becomes a (clean, noisy) pair of fixed-length waveforms.
paired_ds = raw_train_ds.map(process_example_audio)

print(f"Splitting dataset into {NUM_TRAIN} training examples and {NUM_VALID} validation examples")

# First NUM_TRAIN examples for training, everything after that for validation.
train_ds = paired_ds.take(NUM_TRAIN).batch(BATCH_SIZE)
valid_ds = paired_ds.skip(NUM_TRAIN).batch(BATCH_SIZE)

In [None]:
def batch_processing(clean, noisy):
    """Pack a (clean, noisy) pair into a dict keyed by "clean" and "noisy"."""
    return dict(clean=clean, noisy=noisy)


# Convert each (clean, noisy) batch into a dict and cache the result.
# NOTE: `train_ds` / `valid_ds` are rebound from themselves, so this cell is
# not idempotent — re-run the cell that creates them before re-running this one.
train_ds = train_ds.map(batch_processing).cache()
valid_ds = valid_ds.map(batch_processing).cache()

In [None]:
# Pull the first training batch for a quick listen.
examples = next(iter(train_ds.take(1)))

clean = examples["clean"]
noisy = examples["noisy"]

In [None]:
# First clean waveform of the batch.
Audio(clean[0], rate=SAMPLE_RATE)

In [None]:
# The same example with the generated noise added.
Audio(noisy[0], rate=SAMPLE_RATE)

We will create a model using the `tf.keras` Functional API

In [None]:
def log_mel_model():
    """Build the feature extractor that maps a waveform to spectrogram features.

    The model takes a batch of waveforms with SAMPLE_LEN samples each and
    chains the teal layers STFT -> (spectrogram, phase) -> mel spectrogram
    -> dB -> normalization.

    Returns:
        A Keras model named "feature_model" with two outputs:
        the normalized log-mel spectrogram and the phase.
    """
    waveform = layers.Input(shape=(SAMPLE_LEN, ))

    stft = teal.AudioToSTFT(n_fft=N_FFT, hop_length=HOP_LEN)(waveform)
    spectrogram, phase = teal.STFTToSpecAndPhase()(stft)

    to_mel = teal.SpectrogramToMelSpec(
        sample_rate=SAMPLE_RATE, n_fft=N_FFT, n_mels=N_MELS
    )
    mel_spectrogram = to_mel(spectrogram)

    log_mel = teal.PowerToDb()(mel_spectrogram)
    normalized_log_mel = teal.NormalizeSpectrum()(log_mel)

    return models.Model(
        waveform,
        [normalized_log_mel, phase],
        name="feature_model",
    )


def create_autoencoder(feature_model, hidden_units=128, latent_units=8):
    """Build a dense autoencoder over the log-mel spectrogram of a waveform.

    The waveform is first passed through ``feature_model``; the resulting
    log-mel spectrogram is flattened, squeezed through a small bottleneck and
    decoded back to a spectrogram of the same shape. The phase produced by
    ``feature_model`` is passed through untouched.

    Args:
        feature_model: Model mapping a waveform of SAMPLE_LEN samples to
            ``[log_mel_spec, phase]``.
        hidden_units: Width of the hidden dense layer on each side of the
            bottleneck.
        latent_units: Width of the bottleneck layer.

    Returns:
        A Keras model named "autoencoder" with outputs
        ``[predicted_log_mel_spec, phase]``.
    """
    _input = layers.Input(shape=(SAMPLE_LEN, ))
    _log_mel_spec, _phase = feature_model(_input)

    # Per-example spectrogram shape (batch axis dropped). The decoder output is
    # sized from this instead of a hard-coded 28 x 28, so it keeps matching the
    # feature model when the STFT / mel settings change.
    _spec_shape = tuple(_log_mel_spec.shape[1:])

    # Spec Encoder
    _x = layers.Flatten()(_log_mel_spec)
    _flat_units = _x.shape[-1]  # number of spectrogram values per example
    _x = layers.Dense(hidden_units, activation="tanh")(_x)
    _x = layers.LayerNormalization()(_x)
    _x = layers.Dense(latent_units, activation="tanh")(_x)
    _x = layers.LayerNormalization()(_x)

    # Spec Decoder
    _x = layers.Dense(hidden_units, activation="tanh")(_x)
    _x = layers.LayerNormalization()(_x)
    _x = layers.Dense(_flat_units, activation="tanh")(_x)
    _x = layers.Reshape(_spec_shape)(_x)
    _pred_log_mel_spec = teal.NormalizeSpectrum()(_x)

    _model = models.Model(
        _input, [_pred_log_mel_spec, _phase],
        name="autoencoder"
    )
    return _model


# One shared feature extractor: it is embedded in the autoencoder here and is
# also applied to the clean input when the training network is assembled.
_feature_model = log_mel_model()
_base_model = create_autoencoder(_feature_model)

In [None]:
# Loss layer
class LossLayer(layers.Layer):
    """Layer that reduces a pair of spectrogram batches to one scalar loss.

    The loss is ``1 + CosineSimilarity + MeanSquaredError`` computed with the
    Keras loss classes. Keras documents its cosine-similarity loss as lying in
    [-1, 1] with -1 meaning most similar, so the leading ``1`` shifts that
    term to be non-negative.
    """

    def call(self, inputs):
        """Compute the loss for ``inputs = (reference_spec, predicted_spec)``."""
        reference_spec, predicted_spec = inputs

        cosine_term = tf.keras.losses.CosineSimilarity()(reference_spec, predicted_spec)
        mse_term = tf.keras.losses.MeanSquaredError()(reference_spec, predicted_spec)

        return 1. + cosine_term + mse_term


# Create siamese network
# Two waveform inputs; their names match the "clean" / "noisy" keys of the
# dict batches produced by `batch_processing`.
_input_clean = layers.Input(shape=(SAMPLE_LEN, ), name="clean")
_input_noisy = layers.Input(shape=(SAMPLE_LEN, ), name="noisy")

# Reference: log-mel spectrogram of the clean audio (phase output ignored).
_spec_clean, _ = _feature_model(_input_clean)
# Prediction: the autoencoder's output spectrogram for the noisy audio.
_spec_noisy, _ = _base_model(_input_noisy)

# Scalar loss comparing the reference spectrogram with the prediction.
_loss = LossLayer()([_spec_clean, _spec_noisy])

_siamese_net = models.Model(
    [_input_clean, _input_noisy],
    [_spec_clean, _spec_noisy],
    name="siamese_net"
)

# The loss is attached to the model itself, so `compile` only needs an
# optimizer and `fit` needs no separate targets.
_siamese_net.add_loss(_loss)
_siamese_net.compile(optimizer="adam")

_siamese_net.summary()

In [None]:
# Train on the dict-structured batches. The returned History object is
# discarded by assigning it to `_`.
_ = _siamese_net.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=10,
    callbacks=[
        # Scale the learning rate by 0.8 after 3 epochs without improvement
        # in the callback's monitored metric (left at its default).
        tf.keras.callbacks.ReduceLROnPlateau(patience=3, factor=0.8),
    ]
)

In [None]:
def invert_log_mel_to_audio(log_mel_spec, phase):
    """Convert a log-mel spectrogram and a phase back into normalized audio.

    Runs the teal inverse chain: dB -> power, mel -> spectrogram,
    (spectrogram, phase) -> STFT, STFT -> audio, then audio normalization.
    New layer instances are created on every call.

    Args:
        log_mel_spec: Log-mel spectrogram(s) to invert.
        phase: Phase to combine with the recovered spectrogram.

    Returns:
        The output of ``teal.NormalizeAudio`` applied to the reconstructed audio.
    """
    mel_power = teal.DbToPower()(log_mel_spec)

    mel_to_spec = teal.MelSpecToSpectrogram(
        sample_rate=SAMPLE_RATE, n_fft=N_FFT, n_mels=N_MELS
    )
    spectrogram = mel_to_spec(mel_power)

    stft = teal.SpecAndPhaseToSTFT()([spectrogram, phase])
    waveform = teal.STFTToAudio(n_fft=N_FFT, hop_length=HOP_LEN)(stft)

    normalized = teal.NormalizeAudio()(waveform)
    return normalized

In [None]:
# Iterator over the first 5 validation batches; advanced one batch at a time below.
valid_examples = iter(valid_ds.take(5))

In [None]:
# Fetch the next validation batch (re-run this cell to step to another batch).
examples = next(valid_examples)

In [None]:
# Split the batch dict into its clean and noisy waveforms.
clean = examples["clean"]
noisy = examples["noisy"]

In [None]:
# Run the autoencoder on the noisy batch: it returns the predicted log-mel
# spectrogram together with the phase extracted from the noisy input.
_pred_log_mel, _phase = _base_model(noisy)
# Rebuild waveforms from the predicted spectrogram and that phase.
_pred_audio = invert_log_mel_to_audio(_pred_log_mel, _phase)

In [None]:
# Position of the example to inspect within the current batch
# (must be smaller than the number of examples in that batch).
index = 3

Audio(clean[index], rate=SAMPLE_RATE)

In [None]:
# Noisy version of the selected example, i.e. the model's input.
Audio(noisy[index], rate=SAMPLE_RATE)

In [None]:
# Predicted log-mel spectrogram for the selected example
# (the trailing `;` suppresses the image object's repr).
plt.imshow(_pred_log_mel[index]);

# Audio reconstructed from the predicted spectrogram and the noisy input's phase.
Audio(_pred_audio[index], rate=SAMPLE_RATE)