# Imports

In [3]:
import requests
import os
import shutil
import zipfile
import tensorflow as tf
import os
import numpy as np
import librosa
import glob
import datetime
import matplotlib.pyplot as plt
import soundfile as sf

# Download dataset

In [4]:
def download_ds(url, download_path, target_path):
    """Download a zip archive (if not already present) and extract it.

    Args:
        url: HTTP(S) location of the zip archive.
        download_path: Local file path of the archive. If a file already
            exists there (for example fetched beforehand with ``wget``) it is
            reused and no request is made.
        target_path: Directory the archive contents are extracted into.

    Returns:
        None. On a non-200 response a message is printed and nothing is
        extracted.

    Side effects:
        Removes ``download_path`` after a successful extraction.
    """
    if not os.path.exists(download_path):
        # The context manager releases the streamed connection in every exit path.
        with requests.get(url, stream=True) as response:
            if response.status_code != 200:
                print(f"Failed to download file. Status code: {response.status_code}")
                return

            # Write to a side file first so an interrupted transfer never leaves
            # a truncated archive under the final name.
            partial_path = download_path + '.part'
            with open(partial_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    file.write(chunk)
            os.replace(partial_path, download_path)
        print("Download completed successfully.")

    with zipfile.ZipFile(download_path, 'r') as zip_ref:
        zip_ref.extractall(target_path)
    print("Extraction completed successfully.")

    os.remove(download_path)

## Free Music Archive (FMA)

In [5]:
# Fetch the FMA "small" archive (about 7.2 GB per the recorded wget log) and store it as train_ds.zip.
!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip -O train_ds.zip

--2024-05-29 16:40:18--  https://os.unil.cloud.switch.ch/fma/fma_small.zip
Resolving os.unil.cloud.switch.ch (os.unil.cloud.switch.ch)... 86.119.28.16, 2001:620:5ca1:201::214
Connecting to os.unil.cloud.switch.ch (os.unil.cloud.switch.ch)|86.119.28.16|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7679594875 (7.2G) [application/zip]
Saving to: ‘train_ds.zip’


2024-05-29 16:45:37 (23.0 MB/s) - ‘train_ds.zip’ saved [7679594875/7679594875]



In [6]:
# Remote location of the FMA "small" archive.
train_ds_url = 'https://os.unil.cloud.switch.ch/fma/fma_small.zip'

# Local file name of the archive (same name the wget cell writes to).
download_path = 'train_ds.zip'

# Directory the archive is extracted into.
train_ds_path = 'train_ds'

# Only download/extract when the target directory does not exist yet.
if not os.path.exists(train_ds_path):
    download_ds(train_ds_url, download_path, train_ds_path)

KeyboardInterrupt: 

# Constants

In [36]:
# --- Training configuration ---
BATCH_SIZE = 32
NUM_BATCHES = 4
TRAIN_DS_SIZE = NUM_BATCHES * BATCH_SIZE  # number of tracks taken from the dataset
EPOCHS = 2
LATENT_DIM = 128
LEARNING_RATE = 0.00005
LEARING_RATE = LEARNING_RATE  # original (misspelled) name, kept as an alias for existing references
OPTIMIZER = 'adam'
LOSS = 'mse'


# --- Audio / STFT configuration ---
AUDIO_SAMPLE_RATE = 22050  # sample rate passed to librosa.load
N_FFT = 2048  # FFT window size in samples; the spectrogram has N_FFT // 2 + 1 frequency bins
HOP_LENGTH = N_FFT // 4  # samples between successive STFT frames
TRACK_DURATION = 10 # seconds
MAX_AUDIO_LENGTH = AUDIO_SAMPLE_RATE * TRACK_DURATION  # clip length in samples (not STFT frames)
WINDOW = "hann"

# Preprocess dataset

In [8]:
def compute_spectrogram(y, sr, n_fft=N_FFT, hop_length=HOP_LENGTH):
    """Return the dB-scaled STFT magnitude of ``y``, referenced to its own peak.

    Args:
        y: Audio signal handed to ``librosa.stft``.
        sr: Sample rate; not used by the computation.
        n_fft: FFT window size.
        hop_length: Hop between successive frames.

    Returns:
        The result of ``librosa.amplitude_to_db`` with ``ref=np.max``.
    """
    magnitude = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))
    return librosa.amplitude_to_db(magnitude, ref=np.max)

def normalize_spectrogram(S_db):
    """Min-max scale a spectrogram into the [0, 1] range.

    Args:
        S_db: NumPy array of spectrogram values.

    Returns:
        Array of the same shape with the minimum mapped to 0 and the maximum
        to 1. A constant input (for example digital silence) has no range to
        scale by and yields all zeros instead of NaN from a 0/0 division.
    """
    lowest = S_db.min()
    value_range = S_db.max() - lowest
    if value_range == 0:
        # Keep a floating dtype comparable to what the division would produce.
        return np.zeros_like(S_db, dtype=np.result_type(S_db, np.float32))
    return (S_db - lowest) / value_range

def pad_or_truncate_spectrogram(S_db_norm, max_length=MAX_AUDIO_LENGTH):
    """Force axis 1 of a 2-D array to exactly ``max_length`` columns.

    Shorter inputs are zero-padded on the right; longer (or equal) inputs are
    sliced to the first ``max_length`` columns.

    NOTE(review): the default is MAX_AUDIO_LENGTH, which is a sample count
    (AUDIO_SAMPLE_RATE * TRACK_DURATION), while axis 1 of an STFT output counts
    frames. The recorded run shows batches 220500 columns wide - confirm this
    width is intended before training on it.
    """
    missing = max_length - S_db_norm.shape[1]
    if missing > 0:
        return np.pad(S_db_norm, ((0, 0), (0, missing)), mode='constant')
    return S_db_norm[:, :max_length]

def prepare_input_for_autoencoder(S_db_norm):
    """Append a trailing axis of length 1 to the array (shape ``s`` becomes ``s + (1,)``)."""
    arr = np.asanyarray(S_db_norm)
    return arr.reshape(arr.shape + (1,))

def load_audio_as_spectrogram(file_path, duration=TRACK_DURATION):
    """Load the start of an audio file and turn it into an autoencoder input.

    Args:
        file_path: Path of the audio file to read.
        duration: Seconds of audio to load from the start of the file.
            Defaults to TRACK_DURATION so the clip length follows the
            notebook configuration instead of a separate literal.

    Returns:
        Tuple ``(input_data, y)``: the normalised, padded spectrogram with a
        trailing channel axis, and the mono waveform it was computed from.
    """
    y, sr = librosa.load(file_path, sr=AUDIO_SAMPLE_RATE, mono=True, duration=duration)
    S_db = compute_spectrogram(y, sr, hop_length=HOP_LENGTH)
    S_db_norm = normalize_spectrogram(S_db)
    # Width comes from pad_or_truncate_spectrogram's default max_length.
    S_db_norm = pad_or_truncate_spectrogram(S_db_norm)
    input_data = prepare_input_for_autoencoder(S_db_norm)
    return input_data, y

def revert_spectrogram(S_db_norm, sr=AUDIO_SAMPLE_RATE, hop_length=HOP_LENGTH):
    """Map a [0, 1]-scaled spectrogram back to a waveform.

    The input is rescaled linearly to the [-80, 0] range, converted from dB to
    amplitude, and passed through the inverse STFT. ``sr`` is not used.
    """
    amplitude = librosa.db_to_amplitude(S_db_norm * 80 - 80)
    return librosa.istft(amplitude, hop_length=hop_length)

def get_all_mp3_paths(root_dir):
    """Return every ``.mp3`` file below ``root_dir`` (recursively), sorted.

    ``glob`` yields entries in arbitrary, filesystem-dependent order; sorting
    makes slices such as ``mp3_paths[:N]`` select the same files on every run.

    Args:
        root_dir: Directory to search.

    Returns:
        Sorted list of matching paths; empty when nothing matches.
    """
    pattern = os.path.join(root_dir, '**/*.mp3')
    return sorted(glob.glob(pattern, recursive=True))

# Wrapper function to use with TensorFlow
def load_audio_as_spectrogram_wrapper(file_path):
    """Decode a tensor-held path, load its spectrogram, and return it as float32.

    ``file_path`` must expose ``.numpy()`` returning UTF-8 bytes; the waveform
    returned alongside the spectrogram is discarded.
    """
    path_str = file_path.numpy().decode('utf-8')
    features = load_audio_as_spectrogram(path_str)[0]
    return tf.convert_to_tensor(features, dtype=tf.float32)

def prepare_dataset(mp3_paths, batch_size=BATCH_SIZE):
    """Build a batched, prefetched ``tf.data`` pipeline of spectrograms.

    Args:
        mp3_paths: Sequence of audio file paths.
        batch_size: Number of spectrograms per batch. Defaults to the
            notebook-level BATCH_SIZE rather than a separate literal, so the
            pipeline and the shape checks stay in agreement.

    Returns:
        A ``tf.data.Dataset`` yielding float32 batches of spectrograms.
    """
    dataset = tf.data.Dataset.from_tensor_slices(mp3_paths)
    # Loading happens in Python (librosa), hence the py_function bridge.
    dataset = dataset.map(
        lambda x: tf.py_function(load_audio_as_spectrogram_wrapper, [x], tf.float32),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

In [9]:
# Read from the directory the download cell extracts into, rather than a
# separate absolute path that only matches when the working directory is /content.
mp3_paths = get_all_mp3_paths(train_ds_path)
train_ds = prepare_dataset(mp3_paths[:TRAIN_DS_SIZE])
# Per-sample shape: frequency bins, padded width (pad_or_truncate_spectrogram's default), channel.
input_shape = (N_FFT // 2 + 1, MAX_AUDIO_LENGTH, 1)

for data in train_ds:
    print(f"Data shape: {data.shape}")
    # The final batch may be smaller than BATCH_SIZE, so only bound the batch
    # axis and compare the per-sample shape exactly.
    assert 0 < data.shape[0] <= BATCH_SIZE, f"unexpected batch size {data.shape[0]}"
    assert tuple(data.shape[1:]) == input_shape, (
        f"sample shape {tuple(data.shape[1:])} does not match expected {input_shape}"
    )

Data shape: (5, 513, 220500, 1)


AssertionError: 

In [37]:
def load_audio(paths: list, sample_rate: int, duration: float) -> list[np.array]:
  """Load each path as a mono waveform and return the waveforms in input order.

  Every file is read with ``librosa.load`` at ``sample_rate`` and limited to
  ``duration`` seconds; only the sample array of each result is kept.
  """
  return [
    librosa.load(audio_path, sr=sample_rate, duration=duration, mono=True)[0]
    for audio_path in paths
  ]

def generate_spectrogram(signal: np.array, hop_length: int, frame_size: int) -> np.array:
  """Return the magnitude of the STFT of ``signal``.

  ``frame_size`` is passed to ``librosa.stft`` as ``n_fft``; the window type
  comes from the notebook-level WINDOW constant.
  """
  stft_matrix = librosa.stft(signal, hop_length=hop_length, n_fft=frame_size, window=WINDOW)
  return np.abs(stft_matrix)

def show_spectrogram(S: np.array, hop_length: int, frame_size: int, sample_rate: int = AUDIO_SAMPLE_RATE):
  """Plot a magnitude spectrogram on a log-frequency axis in dB.

  Args:
    S: Magnitude spectrogram.
    hop_length: Hop length used to compute ``S``.
    frame_size: FFT size used to compute ``S``.
    sample_rate: Sample rate of the underlying audio. The body previously read
      a name ``sr`` that is not defined in this function or in the visible
      notebook cells; it is now an explicit, defaulted parameter.
  """
  # Import the submodule explicitly; a plain `import librosa` does not
  # guarantee that `librosa.display` is loaded on every librosa release.
  import librosa.display

  plt.figure()
  librosa.display.specshow(librosa.amplitude_to_db(S, ref=np.max), sr=sample_rate, hop_length=hop_length, n_fft=frame_size, x_axis='time', y_axis='log')
  plt.colorbar(format='%+2.0f dB')
  plt.title('Spectrogram')
  plt.show()


def reconstruct_audio(S: np.array, hop_length: int, frame_size: int) -> np.array:
  """Invert an STFT matrix to a waveform and peak-normalise it.

  Args:
    S: STFT matrix handed to ``librosa.istft``.
    hop_length: Hop length used when ``S`` was computed.
    frame_size: FFT size used when ``S`` was computed.

  Returns:
    The reconstructed waveform scaled so its largest absolute sample is 1.
    An all-zero (or empty) reconstruction is returned unscaled, because
    dividing by a zero peak would fill the signal with NaN.

  NOTE(review): when ``S`` holds magnitudes only (as produced by
  ``generate_spectrogram``), the phase is not available to the inverse
  transform - confirm whether a phase-estimating method is wanted instead.
  """
  y_reconstructed = librosa.istft(S, hop_length=hop_length, n_fft=frame_size, window=WINDOW)

  # `initial` makes the reduction well-defined for an empty signal too.
  peak_amplitude = np.max(np.abs(y_reconstructed), initial=0.0)
  if peak_amplitude == 0:
    return y_reconstructed

  return y_reconstructed / peak_amplitude

def save_audio(signal: np.array, output_path: str, sample_rate: int):
  """Write ``signal`` to ``output_path`` at ``sample_rate`` using soundfile.

  The body previously referenced ``output_filename``, ``normalized_y`` and
  ``sr``, none of which are parameters of this function; it now uses its own
  arguments.
  """
  sf.write(output_path, signal, sample_rate)


# Sanity check
Let's define a function for reverting a spectrogram to audio and check whether reverting the train_ds spectrograms reproduces the input audio (the IPython.display module is used for listening to audio in notebooks).

In [38]:
import IPython.display as ipd

# Use the first five tracks for a listening comparison.
validation_samples = mp3_paths[:5]

# NOTE(review): these two lists are never filled in this cell.
validation_spectrograms = []
validation_original_audios = []

signals = load_audio(validation_samples, AUDIO_SAMPLE_RATE, TRACK_DURATION)

for signal in signals:
  # Forward transform: waveform -> magnitude spectrogram.
  S = generate_spectrogram(signal, HOP_LENGTH, N_FFT)

  # Inverse transform with the same hop length and FFT size.
  reconstructed_signal = reconstruct_audio(S, HOP_LENGTH, N_FFT)

  # Render both players so the two versions can be compared by ear.
  print("Original Audio:")
  ipd.display(ipd.Audio(signal, rate=AUDIO_SAMPLE_RATE))

  print("Reconstructed Audio:")
  ipd.display(ipd.Audio(reconstructed_signal, rate=AUDIO_SAMPLE_RATE))


Original Audio:


Reconstructed Audio:


Original Audio:


Reconstructed Audio:


Original Audio:


Reconstructed Audio:


Original Audio:


Reconstructed Audio:


Original Audio:


Reconstructed Audio:


In [None]:
def revert_spectrogram(S_db_norm, sr=AUDIO_SAMPLE_RATE, hop_length=HOP_LENGTH, top_db=80.0):
    """Convert a [0, 1]-scaled dB spectrogram back to a waveform.

    Args:
        S_db_norm: Normalised spectrogram; 0 maps to ``-top_db`` dB and 1 to 0 dB.
        sr: Sample rate; not used by the computation.
        hop_length: Hop length used when the spectrogram was computed.
        top_db: Width of the dB range assumed by the rescaling. Defaults to
            80, the value the previous body hard-coded.

    Returns:
        Waveform produced by ``librosa.istft``.

    The previous body first rescaled the input by its *own* min and max. Those
    are statistics of the already-normalised data, not of the original dB
    spectrogram, so that step was a no-op for data spanning [0, 1] and a
    distortion for anything else (for example model output). It is dropped.
    """
    # Undo the [0, 1] scaling: 0 -> -top_db dB, 1 -> 0 dB.
    S_db = S_db_norm * top_db - top_db
    # Convert dB back to linear amplitude.
    S = librosa.db_to_amplitude(S_db)
    # Inverse STFT.
    return librosa.istft(S, hop_length=hop_length)

# Define model architecture

In [None]:
def build_autoencoder(input_shape):
    """Build and compile a convolutional autoencoder for spectrogram inputs.

    Args:
        input_shape: ``(height, width, channels)`` of one sample.

    Returns:
        A compiled ``tf.keras.Model`` whose output has the same height and
        width as its input, using the notebook-level OPTIMIZER and LOSS.

    The encoder halves both spatial axes three times with ``padding='same'``
    (which rounds up), and the decoder doubles them three times. For sizes that
    are not multiples of 8 the decoder therefore overshoots (1025 -> 1032), so
    the surplus rows/columns are cropped before the output layer. Without the
    crop the reconstruction loss cannot compare output and input.
    """
    inputs = tf.keras.Input(shape=input_shape)
    # Encoder
    x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x)
    x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x)
    x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    encoded = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(encoded)
    x = tf.keras.layers.UpSampling2D((2, 2))(x)
    x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = tf.keras.layers.UpSampling2D((2, 2))(x)
    x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = tf.keras.layers.UpSampling2D((2, 2))(x)

    # Trim what the round-up in the pooling layers added. Axes with an unknown
    # static size are left alone.
    surplus = []
    for decoded_dim, input_dim in zip(x.shape[1:3], input_shape[:2]):
        known = decoded_dim is not None and input_dim is not None
        surplus.append(decoded_dim - input_dim if known else 0)
    if any(surplus):
        x = tf.keras.layers.Cropping2D(((0, surplus[0]), (0, surplus[1])))(x)

    decoded = tf.keras.layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)

    autoencoder = tf.keras.Model(inputs, decoded)
    autoencoder.compile(optimizer=OPTIMIZER, loss=LOSS)
    return autoencoder

In [None]:
# Instantiate the model for the per-sample shape defined in the dataset-preparation cell.
autoencoder = build_autoencoder(input_shape)

In [None]:
# Each batch is used as both input and target: the model is trained to reproduce its input.
autoencoder.fit(train_ds.map(lambda x: (x, x)), epochs=EPOCHS)

Epoch 1/2


: 

# Metrics

In [None]:
def spectral_loss(y_true, y_pred, frame_length=256, frame_step=64):
    """Mean absolute difference between the STFT magnitudes of two signals.

    Args:
        y_true: Reference signal tensor.
        y_pred: Predicted signal tensor.
        frame_length: STFT window length passed to ``tf.signal.stft``.
        frame_step: STFT hop passed to ``tf.signal.stft``.

    Returns:
        Scalar tensor: the mean of ``|abs(STFT(y_true)) - abs(STFT(y_pred))|``.

    NOTE(review): ``tf.signal.stft`` frames the last axis of its input, so this
    presumably expects waveforms along that axis rather than the
    (freq, time, 1) spectrogram batches used elsewhere - confirm before using
    it as the training loss.
    """
    def _magnitude(signal):
        return tf.abs(tf.signal.stft(signal, frame_length=frame_length, frame_step=frame_step))

    return tf.reduce_mean(tf.abs(_magnitude(y_true) - _magnitude(y_pred)))

# Save model

In [None]:
# Timestamp the export path so successive runs write to different locations.
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_save_path = f'models/audio_autoencoder_{current_time}'
# NOTE(review): `save_format='tf'` may not be accepted by newer Keras releases - confirm against the installed version.
autoencoder.save(model_save_path, save_format='tf')

INFO:tensorflow:Assets written to: models/audio_autoencoder_20240522_034441\assets


INFO:tensorflow:Assets written to: models/audio_autoencoder_20240522_034441\assets
