<a href="https://colab.research.google.com/github/Drausio72/Retrieval-Based-Guitar-Conversion-RGC/blob/main/RGC_Incrememental.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title **1. Mount Drive and Install Libraries**
# Colab-only setup: the google.colab package is provided by the Colab runtime.
from google.colab import drive
# Mount Google Drive at /content/drive/. The dataset, checkpoint and model
# paths used by later cells all live under /content/drive/MyDrive.
drive.mount('/content/drive/')
# Install the audio-processing (librosa) and deep-learning (tensorflow)
# packages. scikit-learn is imported below but not installed here --
# presumably it is already present in the runtime; confirm if run elsewhere.
!pip install librosa tensorflow

import os
import librosa
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Model
from sklearn.metrics import mean_squared_error

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
#@title **2. Define Data Paths and Model Name**
# Name used to build the checkpoint folder and the saved-model file name.
model_name = "DI_Model"

# Root folder on Drive that contains every audio sub-directory.
base_path = "/content/drive/MyDrive/AudioData"

# Training pairs: clean recordings and their distorted counterparts.
clean_path, distorted_path = (
    os.path.join(base_path, folder)
    for folder in ("Clean_Files", "Distorted_Files")
)

# Validation paths
validation_clean_path, validation_distorted_path = (
    os.path.join(base_path, folder)
    for folder in ("Validation_Clean_Files", "Validation_Distorted_Files")
)

In [None]:
#@title **3. Define Utility Functions for Data Preparation and Loading**
def load_audio_files(path, sr=44100, duration=5, pad=True):
    """Load one audio file as mono and optionally force it to a fixed length.

    Args:
        path: Path of the audio file to read.
        sr: Target sample rate handed to ``librosa.load``. ``None`` keeps the
            file's native rate; the rate actually returned by the loader is
            what the fixed length is computed from.
        duration: Maximum number of seconds to read. May be fractional.
            ``None`` reads the whole file; no fixed length exists in that
            case, so padding/trimming is skipped.
        pad: When true (and ``duration`` is given), pad or trim the signal to
            exactly ``round(sample_rate * duration)`` samples so that every
            file yields an array of the same length.

    Returns:
        1-D NumPy array holding the audio samples.
    """
    audio, sample_rate = librosa.load(path, sr=sr, mono=True, duration=duration)
    if pad and duration is not None:
        # fix_length needs an integer sample count; sr * duration is a float
        # for fractional durations and undefined when sr is None.
        target_len = int(round(sample_rate * duration))
        audio = librosa.util.fix_length(audio, size=target_len)
    return audio

def audio_to_spectrogram(signal, n_fft=2048, hop_length=512):
    """Convert an audio waveform to a dB magnitude spectrogram scaled to [0, 1].

    Args:
        signal: 1-D array of audio samples.
        n_fft: FFT window size passed to ``librosa.stft``.
        hop_length: Hop between successive frames passed to ``librosa.stft``.

    Returns:
        2-D array: the STFT magnitude in dB (relative to its own maximum),
        shifted so its minimum is 0 and divided by its maximum. A spectrogram
        with no dynamic range (for example pure silence) is returned as all
        zeros instead of NaN.
    """
    S = librosa.stft(signal, n_fft=n_fft, hop_length=hop_length)
    S_db = librosa.amplitude_to_db(np.abs(S), ref=np.max)
    # Min-max normalization: shift so the floor is 0 ...
    S_db = S_db - S_db.min()
    peak = S_db.max()
    # ... then scale so the ceiling is 1. A constant spectrogram has peak == 0
    # after the shift; dividing would turn every value into NaN.
    if peak > 0:
        S_db = S_db / peak
    return S_db

def get_files_from_directory(directory):
    """List all .wav files in the specified directory, sorted by file name.

    ``os.listdir`` returns entries in arbitrary order. Callers pair the clean
    and distorted listings by position, so the result is sorted to make that
    pairing deterministic.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        List of ``directory``-joined paths for entries ending in ``.wav``.
    """
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith('.wav')
    ]


In [None]:
#@title **4. Prepare Data for Training**
def prepare_data(clean_dir, distorted_dir):
    """Build paired model inputs and targets from two folders of .wav files.

    Files are paired by position after sorting each listing, so a clean
    recording and its distorted version must sort to the same index (e.g. by
    sharing a file name).

    Args:
        clean_dir: Folder with the clean (target) recordings.
        distorted_dir: Folder with the distorted (input) recordings.

    Returns:
        Tuple ``(X, y)``: ``X`` stacks the spectrograms of the distorted
        files, ``y`` those of the clean files. Both carry a trailing channel
        axis of size 1, i.e. ``(n_files, freq_bins, frames, 1)``, which is
        the 4-D layout the Conv2D layers of ``build_model`` expect.

    Raises:
        AssertionError: If the two folders hold different numbers of files.
    """
    # Sort explicitly: directory listing order is not guaranteed, and the
    # pairing below is purely positional.
    clean_files = sorted(get_files_from_directory(clean_dir))
    distorted_files = sorted(get_files_from_directory(distorted_dir))
    assert len(clean_files) == len(distorted_files), "Mismatch in file counts between clean and distorted data."

    clean_signals = [load_audio_files(f) for f in clean_files]
    distorted_signals = [load_audio_files(f) for f in distorted_files]

    # Stack the 2-D spectrograms and append the channel axis.
    X_train = np.array([audio_to_spectrogram(signal) for signal in distorted_signals])[..., np.newaxis]
    y_train = np.array([audio_to_spectrogram(signal) for signal in clean_signals])[..., np.newaxis]

    return X_train, y_train



X_train shape: (1, 1025, 431, 1)
y_train shape: (1, 1025, 431, 1)


In [None]:
#@title **5. Define Model Architecture**
def build_model(input_shape):
    """Assemble the convolutional spectrogram-to-spectrogram network.

    The network applies three 64-filter 3x3 ReLU layers (two ``Conv2D``
    followed by one ``Conv2DTranspose``) and finishes with a single-filter
    3x3 ``Conv2D`` using a sigmoid activation. Every layer uses 'same'
    padding.

    Args:
        input_shape: Shape of one sample, without the batch axis.

    Returns:
        An uncompiled Keras ``Model`` mapping the input to the final layer's
        output.
    """
    inputs = Input(shape=input_shape)

    # Hidden stack: identical hyper-parameters, only the layer type varies.
    features = inputs
    for layer_type in (Conv2D, Conv2D, Conv2DTranspose):
        features = layer_type(64, (3, 3), activation='relu', padding='same')(features)

    # Single-channel output squashed by the sigmoid.
    outputs = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(features)
    return Model(inputs, outputs)

# Prepare initial training data: X_train is built from the distorted
# recordings (model input), y_train from the clean recordings (target).
X_train, y_train = prepare_data(clean_path, distorted_path)

# Build and compile the model.
# The per-sample input shape is everything after the leading batch axis.
input_shape = X_train.shape[1:]
model = build_model(input_shape)
# Adam optimizer with a mean-squared-error loss.
model.compile(optimizer='adam', loss='mse')

In [None]:
#@title **6. Set up Checkpoints and Resume Training**
# Per-model checkpoint folder on Drive. The "{epoch:04d}" placeholder is
# filled in by the checkpoint callback with the zero-padded epoch number.
checkpoint_dir = f'/content/drive/MyDrive/Training/{model_name}/'
checkpoint_path = os.path.join(checkpoint_dir, "cp-{epoch:04d}.ckpt")
# exist_ok=True replaces the separate exists() check: there is no gap between
# checking and creating, and re-running the cell is harmless.
os.makedirs(checkpoint_dir, exist_ok=True)

# Resume from the most recent checkpoint recorded in the folder, if there is
# one; otherwise the model keeps its freshly initialized weights.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
    model.load_weights(latest_checkpoint)
    print(f"Resuming training from checkpoint: {latest_checkpoint}")

# Weights-only checkpoint callback; it takes effect only when passed to
# model.fit(..., callbacks=[...]).
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path, save_weights_only=True, verbose=1)


Resuming training from checkpoint: /content/drive/MyDrive/Training/DI_Model/cp-1490.ckpt


In [None]:
#@title **7. Train and Save Model**
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='./logs')
batch_size = 32  # Start with 32, consider doubling to see effects
epochs = 100  # Start with 100, use early stopping to halt if necessary

# Held-out pairs for validation. EarlyStopping below monitors 'val_loss',
# which Keras only reports when validation data is passed to fit(); without
# it the callback can neither stop training nor restore the best weights.
X_val, y_val = prepare_data(validation_clean_path, validation_distorted_path)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)
# cp_callback (defined in step 6) is included so that an interrupted run
# leaves checkpoints that the resume logic in step 6 can pick up.
model.fit(X_train, y_train,
          validation_data=(X_val, y_val),
          epochs=epochs,
          batch_size=batch_size,
          callbacks=[early_stopping, tensorboard_callback, cp_callback])
final_model_path = f'/content/drive/MyDrive/Models/{model_name}.h5'
# NOTE(review): compiling again after fit() installs a brand-new Adam
# instance, so the saved model and any later fit() on this object start from
# a fresh optimizer state at this learning rate. Confirm this is intended
# rather than a compile that was meant to run before fit().
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)  # Adjust learning rate as needed
model.compile(optimizer=optimizer, loss='mse')
model.save(final_model_path)

print(f"Model saved to {final_model_path}")


In [None]:
#@title **8. Calculate Error Before Incremental Training**
def calculate_error(model, clean_dir, distorted_dir):
    """Return the mean squared error of ``model`` on a clean/distorted folder pair.

    Data loading is delegated to ``prepare_data`` so evaluation uses the same
    preprocessing and file-count check as training.

    Args:
        model: Object with a ``predict(X)`` method (the trained Keras model).
        clean_dir: Folder with the clean (target) recordings.
        distorted_dir: Folder with the distorted (input) recordings.

    Returns:
        The MSE, as a Python float, between the target spectrograms and the
        model's predictions, computed over all elements.

    Raises:
        ValueError: If the folders contain no .wav files.
        AssertionError: From ``prepare_data`` if the folders hold different
            numbers of files.
    """
    X_val, y_val = prepare_data(clean_dir, distorted_dir)
    if len(X_val) == 0:
        raise ValueError(f"No .wav files found in {distorted_dir!r}; cannot compute an error.")

    predictions = model.predict(X_val)
    # Flatten both sides so the comparison is element-wise regardless of
    # whether the arrays carry a trailing channel axis.
    mse = mean_squared_error(np.ravel(y_val), np.ravel(predictions))

    return float(mse)

# Calculate and print error before incremental training.
# The value is kept in error_before as the baseline for the comparison made
# after the incremental training step.
error_before = calculate_error(model, validation_clean_path, validation_distorted_path)
print(f"Error before incremental training: {error_before}")

In [None]:
#@title **9. Incremental Training**
# Example for handling new data periodically
# This is a placeholder; in practice, schedule this as needed
new_clean_path = os.path.join(base_path, "New_Clean_Files")
new_distorted_path = os.path.join(base_path, "New_Distorted_Files")

# Build spectrogram pairs from the new recordings and continue training the
# in-memory model on them for a few epochs. cp_callback is passed so the
# checkpoint callback runs during this fit.
new_X_train, new_y_train = prepare_data(new_clean_path, new_distorted_path)
model.fit(new_X_train, new_y_train, epochs=5, batch_size=10, callbacks=[cp_callback])

# Save the updated model (same final_model_path as the earlier save, so the
# previous file is replaced)
model.save(final_model_path)
print("Model updated with new data and saved again.")

#@title **10. Calculate Error After Incremental Training**
# Calculate and print error after incremental training
error_after = calculate_error(model, validation_clean_path, validation_distorted_path)
print(f"Error after incremental training: {error_after}")

# Calculate percentage improvement relative to the baseline; a positive value
# means the validation error went down.
if error_before:
    error_improvement = ((error_before - error_after) / error_before) * 100
    print(f"Error percentage improvement: {error_improvement:.2f}%")
else:
    # A zero baseline has no meaningful relative change; dividing by it would
    # raise ZeroDivisionError (Python float) or give inf/NaN (NumPy float).
    error_improvement = float("nan")
    print("Error percentage improvement: undefined (baseline error is 0)")