# Style Transcoders

In [1]:
# Autoreloading makes development easier
%load_ext autoreload
%autoreload 2

In [2]:
# Import libraries
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as krs
import tensorflow_probability as tfp
import json
from sklearn.model_selection import train_test_split
from tools.audio_tools import read_audio, write_audio, play_audio
from tools.feature_tools import compute_mels, compute_imels, compute_mfcc, compute_imfcc, load_data, normalize_features, denormalize_features
from tools.constants import npy_classical_path, npy_jazz_path, models_path
from tools.plot_tools import make_figax, plot_history, plot_audio, plot_spectral_feature
from tools.tensorflow_tools import tune_hyperparameters, load_optimal_params

2023-12-12 19:24:41.884573: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-12-12 19:24:42.113330: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-12-12 19:24:42.113377: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-12-12 19:24:42.114631: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-12 19:24:42.217851: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-12-12 19:24:42.219852: I tensorflow/core/platform/cpu_feature_guard.cc:182] This Tens

In [9]:
import sklearn
import librosa
print(f"{np.__version__ = }")
print(f"{tf.__version__ = }")
print(f"{tfp.__version__ = }")
print(f"{sklearn.__version__ = }")
print(f"{librosa.__version__ = }")
print(__version__)

np.__version__ = '1.25.2'
tf.__version__ = '2.14.0'
tfp.__version__ = '0.22.1'
sklearn.__version__ = '1.3.1'
librosa.__version__ = '0.10.1'


NameError: name '__version__' is not defined

## Data Loading


### Load Audio & Extract Features

In [None]:
n_samples = 10
n_samples = min(n_samples, len(os.listdir(npy_classical_path)), len(os.listdir(npy_jazz_path)))
print(f"{n_samples = }")

X_c_raw = load_data(npy_classical_path, n_samples=n_samples)
X_j_raw = load_data(npy_jazz_path, n_samples=n_samples)

print(f"{X_c_raw.shape = }")
print(f"{X_j_raw.shape = }")

### Train - Validation - Test Split

In [None]:
# Fraction of data to keep apart for validation
test_size = round(0.1 * n_samples)
val_size = test_size
# Perform splits
X_c_raw_train, X_c_raw_test = train_test_split(X_c_raw, test_size=test_size, random_state=1234)
X_c_raw_train, X_c_raw_val = train_test_split(X_c_raw_train, test_size=val_size, random_state=1234)
X_j_raw_train, X_j_raw_test = train_test_split(X_j_raw, test_size=test_size, random_state=1234)
X_j_raw_train, X_j_raw_val = train_test_split(X_j_raw_train, test_size=val_size, random_state=1234)
# Verify split shapes
print(f"{X_c_raw_train.shape = }")
print(f"{X_c_raw_test.shape = }")
print(f"{X_c_raw_val.shape = }")
print(f"{X_j_raw_train.shape = }")
print(f"{X_j_raw_test.shape = }")
print(f"{X_j_raw_val.shape = }")

### Normalize Features

In [None]:
# Perform normalization
X_c_train, X_c_val, X_c_test = normalize_features(X_c_raw_train, X_raw_val=X_c_raw_val, X_raw_test=X_c_raw_test, name="classical")
X_j_train, X_j_val, X_j_test = normalize_features(X_j_raw_train, X_raw_val=X_j_raw_val, X_raw_test=X_j_raw_test, name="jazz")

print(f"{X_c_train.shape }")
print(f"{X_c_val.shape }")
print(f"{X_c_test.shape }")
print(f"{X_j_train.shape }")
print(f"{X_j_val.shape }")
print(f"{X_j_test.shape }")

In [None]:
input_shape = X_c_train.shape[1:]

print(f"{input_shape = }")

## Style Transfer Models

### Test Script

In [None]:
# Test transcoder
def test_transcoder(transcoder, X_test=X_c_test, Y_test=X_j_test, plot_transforms=True, save_path=None):
    # Random picks
    print("Picking samples")
    X = np.expand_dims(X_test[np.random.choice(len(X_test))], axis=0)
    Y = np.expand_dims(Y_test[np.random.choice(len(Y_test))], axis=0)
    print(f"{X.shape = }")
    print(f"{Y.shape = }")
    
    # Transcode
    print("Performing transcode")
    X_hat = transcoder.selfcode(X, XtoX=True)
    Y_hat = transcoder.selfcode(Y, XtoX=False)
    Y_fake = transcoder.transcode(X, XtoY=True)
    X_fake = transcoder.transcode(Y, XtoY=False)
    
    # Reconstruct raw spectra
    print("Denormalizing")
    X_raw = denormalize_features(X, name="classical")
    X_raw_hat = denormalize_features(X_hat, name="classical")
    X_raw_fake = denormalize_features(X_fake, name="classical")
    Y_raw = denormalize_features(Y, name="jazz")
    Y_raw_hat = denormalize_features(Y_hat, name="jazz")
    Y_raw_fake = denormalize_features(Y_fake, name="jazz")
    
    # Reconstruct audio
    print("Reconstructing audio")
    sX = np.squeeze(compute_imels(np.squeeze(X_raw)))
    sX_fake = np.squeeze(compute_imels(np.squeeze(X_raw_fake)))
    sX_hat = np.squeeze(compute_imels(np.squeeze(X_raw_hat)))
    sY = np.squeeze(compute_imels(np.squeeze(Y_raw)))
    sY_fake = np.squeeze(compute_imels(np.squeeze(Y_raw_fake)))
    sY_hat = np.squeeze(compute_imels(np.squeeze(Y_raw_hat)))
    
    if save_path is not None:
        print("Saving audio")
        os.makedirs(save_path, exist_ok = True)
        audio_files = [path for path in os.listdir(save_path) if os.path.splitext(path)[-1] == ".wav"]
        
        if len(audio_files):
            audio_nrs = [int(path.split("_")[0]) for path in audio_files]
            audio_nr = max(audio_nrs) + 1
        else:
            audio_nr = 0
        
        file_name = f"{audio_nr}".zfill(4)
        write_audio(sX, os.path.join(save_path, f"{file_name}_classical.wav"))
        write_audio(sY, os.path.join(save_path, f"{file_name}_jazz.wav"))
        write_audio(sX_hat, os.path.join(save_path, f"{file_name}_classical_hat.wav"))
        write_audio(sY_hat, os.path.join(save_path, f"{file_name}_jazz_hat.wav"))
        write_audio(sX_fake, os.path.join(save_path, f"{file_name}_classical_fake.wav"))
        write_audio(sY_fake, os.path.join(save_path, f"{file_name}_jazz_fake.wav"))
    
    # Plot raw spectra
    if plot_transforms:
        print("Plotting transforms")
        fig, ax = plot_spectral_feature(X_raw)
        ax.set_title("Classical Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_classical.png"), dpi=300, facecolor="white")
        
        fig, ax = plot_spectral_feature(X_raw_hat)
        ax.set_title(r"Classical $\rightarrow$ Classical Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_classical_hat.png"), dpi=300, facecolor="white")
        
        fig, ax = plot_spectral_feature(Y_raw_fake)
        ax.set_title(r"Classical $\rightarrow$ Jazz Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_jazz_fake.png"), dpi=300, facecolor="white")

        fig, ax = plot_spectral_feature(Y_raw)
        ax.set_title("Jazz Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_jazz.png"), dpi=300, facecolor="white")
        
        fig, ax = plot_spectral_feature(Y_raw_hat)
        ax.set_title(r"Jazz $\rightarrow$ Jazz Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_jazz_hat.png"), dpi=300, facecolor="white")
        
        fig, ax = plot_spectral_feature(X_raw_fake)
        ax.set_title(r"Jazz $\rightarrow$ Classical Spectrum")
        if save_path is not None:
            fig.savefig(os.path.join(save_path, f"{file_name}_classical_fake.png"), dpi=300, facecolor="white")

        plt.show() 
    
    # Play audio
    print("Classical audio")
    player = play_audio(sX)
    print("Classical to Classical audio")
    player = play_audio(sX_hat)
    print("Classical to Jazz audio")
    player = play_audio(sY_fake)
    print()
    
    print("Jazz audio")
    player = play_audio(sY)
    print("Jazz to Jazz audio")
    player = play_audio(sY_hat)
    print("Jazz to Classical audio")
    player = play_audio(sX_fake)

### Convolutional Style Transfer

In [None]:
from models.transcoders import Conv2DTranscoder

In [None]:
default_params = {
    "feature_shape": input_shape,
    "compression": 4,
    "kernel_size": 5,
    "conv_depth": 4,
    "input_chans_multiplier": 1,
    "skip_connection": True,
    "pooling_type": "average",
    "h_reg": 1.,
    "kl_reg": 0.,
}
results_path = os.path.join("./results/ConvolutionalTranscoder")
os.makedirs(results_path, exist_ok=True)

In [None]:
# Construct model
transcoder = Conv2DTranscoder(**default_params)
transcoder.compile(optimizer="adam")

In [None]:
# Train model
checkpoint_dir = os.path.join(results_path, "checkpoints_{epoch:02d}")
checkpoint = krs.callbacks.ModelCheckpoint(
    checkpoint_dir,
    verbose=True,
    save_best_only=False,
    save_weights_only=True,
    save_freq="epoch",
)

history = transcoder.fit(X_c_train, X_j_train, epochs=50, shuffle=True, callbacks=[checkpoint], validation_data=(X_c_val, X_j_val), verbose=1)

In [None]:
# Test model
for _ in range(25):
    test_transcoder(transcoder, save_path=os.path.join(results_path, "audio", "model"))

In [None]:
# Save model
transcoder.save_weights(os.path.join(results_path, "model"))

In [None]:
# Load model
transcoder = Conv2DTranscoder(**default_params)
transcoder.load_weights(os.path.join(results_path, "model"))

In [None]:
# Test a certain checkpoint
checkpoints = [1, 2, 3, 5, 10, 20]
n_tests = 5

for checkpoint in checkpoints:
    checkpoint_name = f"checkpoints_{f'{checkpoint}'.zfill(2)}"
    transcoder = Conv2DTranscoder(**default_params)
    transcoder.load_weights(os.path.join(results_path, checkpoint_name))
    
    for _ in range(n_tests):
        test_transcoder(transcoder, save_path=os.path.join(results_path, "audio", checkpoint_name))

### Variational Style Transfer

In [None]:
from models.transcoders import VariationalTranscoder

In [None]:
default_params = {
    "feature_shape": input_shape,
    "compression": 4,
    "kernel_size": 5,
    "conv_depth": 4,
    "input_chans_multiplier": 1,
    "skip_connection": True,
    "pooling_type": "average",
    "h_reg": 1e-5,
    "kl_reg": 1e-12,
}
results_path = os.path.join("./results/VariationalTranscoder")
os.makedirs(results_path, exist_ok=True)

In [None]:
# Construct model
transcoder = VariationalTranscoder(**default_params)
transcoder.compile(optimizer="adam")

In [None]:
# Train model
checkpoint_dir = os.path.join(results_path, "checkpoints_{epoch:02d}")
checkpoint = krs.callbacks.ModelCheckpoint(
    checkpoint_dir,
    verbose=True,
    save_best_only=False,
    save_weights_only=True,
    save_freq="epoch",
)

history = transcoder.fit(X_c_train, X_j_train, epochs=50, callbacks=[checkpoint], shuffle=True, validation_data=(X_c_val, X_j_val), verbose=1)

In [None]:
# Test transcoder
for _ in range(25):
    test_transcoder(transcoder, save_path=os.path.join(results_path, "audio", "model"))

In [None]:
# Save transcoder
transcoder.save_weights(os.path.join(results_path, "model"))

In [None]:
# Load transcoder
transcoder = VariationalTranscoder(**default_params)
transcoder.load_weights(os.path.join(results_path, "model"))

In [None]:
# Test a certain checkpoint
checkpoints = [1, 2, 3, 5, 10, 20]
n_tests = 5

for checkpoint in checkpoints:
    checkpoint_name = f"checkpoints_{f'{checkpoint}'.zfill(2)}"
    transcoder = VariationalTranscoder(**default_params)
    transcoder.load_weights(os.path.join(results_path, checkpoint_name))
    
    for _ in range(n_tests):
        test_transcoder(transcoder, save_path=os.path.join(results_path, "audio", checkpoint_name))

### GAN Style Transfer

In [None]:
from models.transcoders import GANTranscoder, GANDiscriminators
from models.layers import GAN

In [None]:
def create_gan_model(**params):
    # Split generator & discriminator params
    g_params = {}
    d_params = {}
    for key, value in params.items():
        if key[:2] == "g_":
            # Generator param
            g_params[key[2:]] = value
        elif key[:2] == "d_":
            # Discriminator param
            d_params[key[2:]] = value
        else:
            # Shared param
            g_params[key] = value
            d_params[key] = value
    generator = GANTranscoder(**g_params)
    discriminator = GANDiscriminators(**d_params)
    gan = GAN(generator, discriminator)
    return gan

In [None]:
default_params = {
    "feature_shape": input_shape,
    "g_compression": 4,
    "g_kernel_size": 5,
    "g_conv_depth": 4,
    "g_input_chans_multiplier": 1,
    "g_skip_connection": True,
    "g_pooling_type": "average",
    "g_gan_reg": 0.02,
    "g_c_reg": 0.01,
    "g_s_reg": 0.01,
    "g_mode": "adain",
    "g_hidden_activation": "relu",
    "g_use_fake_style": True,
    "d_mlp_layers": 2,
    "d_conv_layers": 2,
    "d_conv_kernel_size": 3,
    "d_conv_pooling_size": 4,
    "d_conv_pooling_type": "max",
}

compile_kwargs={
    "g_optimizer": "adam",
    "d_optimizer": "adam",
}

results_path = os.path.join("./results/GANTranscoder")
os.makedirs(results_path, exist_ok=True)

In [None]:
# Build model
gan = create_gan_model(**default_params)
gan.compile(**compile_kwargs)

In [None]:
# Train model
checkpoint_dir = os.path.join(results_path, "checkpoints_{epoch:02d}")
checkpoint = krs.callbacks.ModelCheckpoint(
    checkpoint_dir,
    verbose=True,
    save_best_only=False,
    save_weights_only=True,
    save_freq="epoch",
)

history = gan.fit(X_c_train, X_j_train, epochs=50, shuffle=True, callbacks=[checkpoint], verbose=1, validation_data=(X_c_val, X_j_val))

In [None]:
# Test model
for _ in range(25):
    test_transcoder(gan.generator, save_path=os.path.join(results_path, "audio", "model"))

In [None]:
# Save transcoder
gan.generator.save_weights(os.path.join(results_path, "model"))

In [None]:
# Load transcoder
gan = create_gan_model(**default_params)
gan.generator.load_weights(os.path.join(results_path, "model"))

In [None]:
# Test a certain checkpoint
checkpoints = [1, 2, 3, 5, 10, 20]
n_tests = 5

for checkpoint in checkpoints:
    checkpoint_name = f"checkpoints_{f'{checkpoint}'.zfill(2)}"
    gan = create_gan_model(**default_params)
    gan.generator.load_weights(os.path.join(results_path, "model"))
    
    for _ in range(n_tests):
        test_transcoder(gan.generator, save_path=os.path.join(results_path, "audio", checkpoint_name))

### MUNIT Style Transfer

In [None]:
from models.transcoders import GANTranscoder, GANDiscriminators
from models.layers import GAN

In [None]:
def create_gan_model(**params):
    # Split generator & discriminator params
    g_params = {}
    d_params = {}
    for key, value in params.items():
        if key[:2] == "g_":
            # Generator param
            g_params[key[2:]] = value
        elif key[:2] == "d_":
            # Discriminator param
            d_params[key[2:]] = value
        else:
            # Shared param
            g_params[key] = value
            d_params[key] = value
    generator = GANTranscoder(**g_params)
    discriminator = GANDiscriminators(**d_params)
    gan = GAN(generator, discriminator)
    return gan

In [None]:
default_params = {
    "feature_shape": input_shape,
    "g_compression": 4,
    "g_kernel_size": 5,
    "g_conv_depth": 4,
    "g_input_chans_multiplier": 1,
    "g_skip_connection": True,
    "g_pooling_type": "average",
    "g_gan_reg": 0.02,
    "g_c_reg": 0.01,
    "g_s_reg": 0.01,
    "g_use_fake_style": True,
    "g_is_munit": True,
    "g_style_dim": 8,
    "g_adain_momentum": 0.1,
    "g_adain_epsilon": 1e-5,
    "d_mlp_layers": 2,
    "d_conv_layers": 2,
    "d_conv_kernel_size": 3,
    "d_conv_pooling_size": 4,
    "d_conv_pooling_type": "max",
}

compile_kwargs={
    "g_optimizer": "adam",
    "d_optimizer": "adam",
}

results_path = os.path.join("./results/MUNITTranscoder")
os.makedirs(results_path, exist_ok=True)

In [None]:
# Build model
gan = create_gan_model(**default_params)
gan.compile(**compile_kwargs)

In [None]:
# Train model
checkpoint_dir = os.path.join(results_path, "checkpoints_{epoch:02d}")
checkpoint = krs.callbacks.ModelCheckpoint(
    checkpoint_dir,
    verbose=True,
    save_best_only=False,
    save_weights_only=True,
    save_freq="epoch",
)

history = gan.fit(X_c_train, X_j_train, epochs=50, shuffle=True, callbacks=[checkpoint], verbose=1, validation_data=(X_c_val, X_j_val))

In [None]:
# Test model
for _ in range(25):
    test_transcoder(gan.generator, save_path=os.path.join(results_path, "audio", "model"))

In [None]:
# Save transcoder
gan.generator.save_weights(os.path.join(results_path, "model"))

In [None]:
# Load transcoder
gan = create_gan_model(**default_params)
gan.generator.load_weights(os.path.join(results_path, "model"))

In [None]:
# Test a certain checkpoint
checkpoints = [1, 2, 3, 5, 10]#, 20]
n_tests = 5

for checkpoint in checkpoints:
    checkpoint_name = f"checkpoints_{f'{checkpoint}'.zfill(2)}"
    gan = create_gan_model(**default_params)
    gan.generator.load_weights(os.path.join(results_path, "model"))
    
    for _ in range(n_tests):
        test_transcoder(gan.generator, save_path=os.path.join(results_path, "audio", checkpoint_name))