In [17]:
import os
import numpy as np
from scipy.io import wavfile
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.metrics import MeanSquaredError
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense
import tensorflow as tf

In [18]:
from tensorflow.keras.layers import Input,  Activation, Concatenate
from tensorflow.keras.activations import sigmoid, relu
from tensorflow.keras.layers import Conv1D, LeakyReLU, Concatenate, Multiply, Add, Input
from tensorflow.keras.models import Model

def scale_up_plus_model(input_shape=(None, 1), output_units=399, num_stages=3):
    # Input layer
    input_layer = Input(shape=input_shape)

    # Pre-upsampling
    pre_upsampled = Conv1D(filters=16, kernel_size=8, strides=2, padding='same')(input_layer)

    # Initialize previous stage output as pre-upsampling output
    prev_output = pre_upsampled

    # Convolutional layers
    conv_layers = []
    for i in range(num_stages):
        # Concatenate pre-upsampling output and previous stage output
        concat = Concatenate()([prev_output, pre_upsampled])

        # Conv1
        conv1 = Conv1D(filters=16, kernel_size=16, strides=1, activation=LeakyReLU(alpha=0.9), padding='same')(concat)

        # Conv2
        conv2 = Conv1D(filters=1, kernel_size=16, strides=1, padding='same')(conv1)

        # Element-wise addition of pre-upsampling output and Conv2 output
        added = Add()([pre_upsampled, conv2])

        # Conv4
        conv4 = Conv1D(filters=16, kernel_size=1, strides=1, activation='sigmoid', padding='same')(added)

        # Conv3
        conv3 = Conv1D(filters=16, kernel_size=1, strides=1, padding='same')(conv1)

        # Element-wise multiplication of Conv3 and Conv4 output
        multiplied = Multiply()([conv3, conv4])

        # Element-wise addition of Conv1 output and multiplied output
        added_final = Add()([conv1, multiplied])

        # Update previous stage output for the next stage
        prev_output = added_final

        # Output layer
        output = Conv1D(filters=output_units, kernel_size=1, strides=1, padding='same')(added_final)

        # Store the output of this stage
        conv_layers.append(output)

    # Create model
    model = Model(inputs=input_layer, outputs=conv_layers)

    return model

In [19]:
# Set the path to the VCTK dataset
dataset_path = '/kaggle/input/vctk-dataset/VCTK-Corpus/VCTK-Corpus'

# Set the path to save the trained model
save_model_path = '/kaggle/working/'

In [20]:
# Set the training parameters
batch_size = 1
epochs = 10
learning_rate = 0.0001

In [21]:
import os
from scipy.io import wavfile
import librosa

X = []
y = []

i=0
for root, dirs, files in os.walk(os.path.join(dataset_path, 'wav48')):
    if i==200:
        break;
    for file in files:
        if file.endswith('.wav'):
            file_path = os.path.join(root, file)
            if i==200:
                break;
            i+=1
            try:
                sample_rate, audio_data = wavfile.read(file_path)
            except Exception as e:
                print(f"Error loading file: {file_path}")
                print(e)
                continue

            audio_data = audio_data.astype('float32') / 32767.0  # Normalize audio data to the range [-1, 1]

            # Resample the audio data to the desired sample rate (32kHz)
            target_sample_rate = 44100  # or 48000
            resampled_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=target_sample_rate)
    

            # Split the resampled data into low-bandwidth signal (WB) and high-bandwidth signal (SWB)
            wb_data = resampled_data[::2]
            swb_data = resampled_data

            # Append the WB and SWB data to the dataset
            X.append(wb_data)
            y.append(swb_data)

In [22]:
X = np.array(X, dtype=object)
y = np.array(y, dtype=object)

In [23]:
# Check the number of samples before splitting
if len(X) == 0 or len(y) == 0:
    raise ValueError("The dataset contains no samples.")

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.14, random_state=42)

In [24]:
print("y_test shape:", y_test.shape)
print("y_train shape:", y_train.shape)

y_test shape: (29,)
y_train shape: (171,)


In [25]:
# Preprocess target data to obtain individual class labels
unique_labels_train = np.unique([tuple(label) for label in y_train])
unique_labels_test = np.unique([tuple(label) for label in y_test])
unique_labels = np.unique(np.concatenate((unique_labels_train, unique_labels_test)))  # Obtain unique target values in both training and testing data
label_map = {label: i for i, label in enumerate(unique_labels)}  # Create a mapping from target value to label index

# Map target values to label indices in training data
y_train_preprocessed = np.array([label_map[tuple(label)] for label in y_train], dtype=object)

# Map target values to label indices in testing data
y_test_preprocessed = np.array([label_map[tuple(label)] for label in y_test], dtype=object)

# Convert the preprocessed target data to categorical format
y_train_categorical = to_categorical(y_train_preprocessed)
y_test_categorical = to_categorical(y_test_preprocessed)

In [26]:
unique_labels.shape

(200,)

In [27]:
output_units = 200  # Adjust the number of output units for wideband data
model = scale_up_plus_model(input_shape=(None, 1), output_units=output_units, num_stages=3)

In [28]:
import tensorflow as tf

def Loss_time_total(y_true, y_pred):
    weights = [1.0, 2.0, 3.0]  # Example weights for each stage (adjust as needed)
    loss = 0.0

    for i, w in enumerate(weights):
        # Calculate the time loss for each stage
        stage_loss = Loss_time(y_true[:, i], y_pred[:, i])  # Slice the tensors to get the appropriate dimensions
        weighted_loss = w * stage_loss
        loss += weighted_loss

    return loss


def Loss_frequency_total(y_true, y_pred):
    weights = [0.5, 1.0, 1.5]  # Example weights for each stage (adjust as needed)
    loss = 0.0

    for i, w in enumerate(weights):
        # Calculate the frequency loss for each stage
        stage_loss = Loss_frequency(y_true[:, i], y_pred[:, i])  # Slice the tensors to get the appropriate dimensions
        weighted_loss = w * stage_loss
        loss += weighted_loss

    return loss


def Loss_time(y_true, y_pred):
    # Custom time loss calculation
    loss = tf.reduce_mean(tf.square(y_true - y_pred))
    return loss


def Loss_frequency(y_true, y_pred):
    # Custom frequency loss calculation
    loss = tf.reduce_mean(tf.square(y_true - y_pred))
    return loss

In [29]:
# Compile the model
optimizer = Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer, loss=Loss_time, metrics=[Loss_frequency])

In [30]:
# Set up checkpoints to save the best model during training
checkpoint = ModelCheckpoint(save_model_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', verbose=1)

In [31]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Pad sequences in X_train
X_train_padded = pad_sequences(X_train, padding='post')
X_train_padded = X_train_padded.reshape(X_train_padded.shape[0], X_train_padded.shape[1], 1)

# Pad sequences in X_test
X_test_padded = pad_sequences(X_test, padding='post')
X_test_padded = X_test_padded.reshape(X_test_padded.shape[0], X_test_padded.shape[1], 1)

# Convert X_train to a TensorFlow tensor
X_train_tensor = tf.convert_to_tensor(X_train_padded, dtype=tf.float32)

# Convert y_train_categorical to a TensorFlow tensor
y_train_categorical_tensor = tf.convert_to_tensor(y_train_categorical, dtype=tf.float32)

# Convert X_test to a TensorFlow tensor
X_test_tensor = tf.convert_to_tensor(X_test_padded, dtype=tf.float32)

# Convert y_test_categorical to a TensorFlow tensor
y_test_categorical_tensor = tf.convert_to_tensor(y_test_categorical, dtype=tf.float32)

In [32]:
y_train_categorical = to_categorical(y_train_preprocessed, num_classes=output_units)
y_test_categorical = to_categorical(y_test_preprocessed, num_classes=output_units)

In [33]:
# Train the model
with tf.device(device):
    model.fit(X_train_tensor, y_train_categorical, batch_size=batch_size, epochs=epochs,
              validation_data=(X_test_tensor, y_test_categorical), callbacks=[checkpoint, early_stopping])

Epoch 1/10
Epoch 1: val_loss improved from inf to 0.01501, saving model to /kaggle/working/
Epoch 2/10
Epoch 2: val_loss did not improve from 0.01501
Epoch 3/10
Epoch 3: val_loss did not improve from 0.01501
Epoch 4/10
Epoch 4: val_loss did not improve from 0.01501
Epoch 5/10
Epoch 5: val_loss did not improve from 0.01501
Epoch 6/10
Epoch 6: val_loss did not improve from 0.01501
Epoch 6: early stopping
