<h1 style="font-family: Helvetica; font-size: 28px"> Loading Necessary Libraries</h1>

In [1]:
import os
import pickle
import librosa
from pydub import AudioSegment
import soundfile as sf
from matplotlib import pyplot as plt
import librosa.display
import numpy as np
import io
from keras.models import load_model

In [2]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

# Remove the following line
tf.compat.v1.disable_eager_execution()

# Code continues...


<h1 style="font_family:Helvetica; font-size:28px"> Segment Wav files</h1> 

In [3]:
def Segment(input_folder, output_folder):
    segment_length = 2.97
    hop_length = 2
    
    for filename in os.listdir(input_folder):
        if filename.endswith(".wav"):
            ## Set input and output file paths
            input_path = os.path.join(input_folder, filename)
            output_path = os.path.join(output_folder, filename)

            # Load audio file using librosa
            y, sr = librosa.load(input_path, sr=None)

            # Calculate segment frame and sample lengths
            segment_frames = int(segment_length * sr)
            hop_frames = int(hop_length * sr)
            total_frames = len(y)
            total_segments = int((total_frames - segment_frames) / hop_frames) + 1

            # Segment audio using a sliding window
            for i in range(total_segments):
                # Calculate start and end frame indices for current segment
                start_frame = i * hop_frames
                end_frame = start_frame + segment_frames

                # Extract audio segment
                y_segment = y[start_frame:end_frame]

                # Set output file path for current segment
                output_segment_path = output_path.replace(".wav", f"_segment{i}.wav")

                # Save audio segment to file
                #librosa.output.write_wav(output_segment_path, y_segment, sr)
                sf.write(output_segment_path, y_segment, sr)

<h1 style="font-family: Helvetica; font-size: 28px"> Load the files</h1>

In [4]:
class Loader:

    def __init__(self, sample_rate, duration, mono):
        self.sample_rate = sample_rate
        self.duration = duration
        self.mono = mono

    def load(self, file_path):
        signal = librosa.load(file_path,
                              sr=self.sample_rate,
                              duration=self.duration,
                              mono=self.mono)[0]
        return signal

<h1 style="font-family: Helvetica; font-size: 28px"> Padding if necessary </h1>

In [5]:
class Padder:

    def __init__(self, mode="constant"):
        self.mode = mode

    def left_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (num_missing_items, 0),
                              mode=self.mode)
        return padded_array

    def right_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (0, num_missing_items),
                              mode=self.mode)
        return padded_array


<h1 style="font-family: Helvetica; font-size: 28px">Extracting log_spectrogram</h1>

In [6]:
class LogSpectrogramExtractor:

    def __init__(self, frame_size, hop_length):
        self.frame_size = frame_size
        self.hop_length = hop_length

    def extract(self, signal):
        stft = librosa.stft(signal,
                            n_fft=self.frame_size,
                            hop_length=self.hop_length)[:-1]
        spectrogram = np.abs(stft)
        log_spectrogram = librosa.amplitude_to_db(spectrogram)
        return log_spectrogram

<h1 style="font-family: Helvetica; font-size: 28px">Min_Max Normaliser</h1> 

In [7]:
class MinMaxNormaliser:
    """MinMaxNormaliser applies min max normalisation to an array."""

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        norm_array = (array - array.min()) / (array.max() - array.min())
        norm_array = norm_array * (self.max - self.min) + self.min
        return norm_array

    def denormalise(self, norm_array, original_min, original_max):
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


<h1 style="font-family: Helvetica; font-size: 28px">Saver</h1>  

In [8]:
class Saver:

    def __init__(self, feature_save_dir, min_max_values_save_dir):
        self.feature_save_dir = feature_save_dir
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature(self, feature, file_path):
        save_path = self._generate_save_path(file_path)
        np.save(save_path, feature)
        return file_path
    def save_min_max_values(self, min_max_values):
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._save(min_max_values, save_path)

    @staticmethod
    def _save(data, save_path):
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    def _generate_save_path(self, file_path):
        file_name = os.path.split(file_path)[1]
        save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
        return save_path


<h1 style="font-family: Helvetica; font-size: 28px">Extracting log_spectrogram</h1>

In [10]:
class PreprocessingPipeline:
    
    def __init__(self):
        #self.padder = None
        self.extractor = None
        self.normaliser = None
        self.saver = None
        self.min_max_values = {}
        self._loader = None
        self._num_expected_samples = None

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        self._loader = loader
        self._num_expected_samples = int(loader.sample_rate * loader.duration)

    def process(self, audio_files_dir):
        for root, _, files in os.walk(audio_files_dir):
            for file in files:
                file_path = os.path.join(root, file)
                self._process_file(file_path)
                print(f"Processed file {file_path}")
        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        feature = self.extractor.extract(signal)
        norm_feature = self.normaliser.normalise(feature)
        save_path = self.saver.save_feature(norm_feature, file_path)
        self._store_min_max_value(save_path, feature.min(), feature.max())

    def _is_padding_necessary(self, signal):
        if len(signal) < self._num_expected_samples:
            return True
        return False

    def _apply_padding(self, signal):
        num_missing_samples = self._num_expected_samples - len(signal)
        padded_signal = self.padder.right_pad(signal, num_missing_samples)
        return padded_signal

    def _store_min_max_value(self, save_path, min_val, max_val):
        self.min_max_values[save_path] = {
            "min": min_val,
            "max": max_val
        }

<h1 style="font-family: Helvetica; font-size: 28px">Call Segement Function</h1>

In [11]:
input_folder = r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\class wav"
seg_out = r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\wav new shape"
#Segment(input_folder, seg_out)

<h1 style="font-family: Helvetica; font-size: 28px"> Instantiate all objects</h1>

In [10]:
FRAME_SIZE = 512
HOP_LENGTH = 256
DURATION = 2.97 # in seconds
SAMPLE_RATE = 22050
MONO = True

SPECTROGRAMS_SAVE_DIR = r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\n"
MIN_MAX_VALUES_SAVE_DIR = r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\min_max_values"
FILES_DIR =r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\wav_segments"

# instantiate all objects
loader = Loader(SAMPLE_RATE, DURATION, MONO)
padder = Padder()
log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
min_max_normaliser = MinMaxNormaliser(0, 1)
saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

preprocessing_pipeline = PreprocessingPipeline()
preprocessing_pipeline.loader = loader
preprocessing_pipeline.padder = padder
preprocessing_pipeline.extractor = log_spectrogram_extractor
preprocessing_pipeline.normaliser = min_max_normaliser
preprocessing_pipeline.saver = saver

preprocessing_pipeline.process(FILES_DIR)

NameError: name 'SPECTROGRAMS_SAVE_DIR' is not defined

# VAE MODEL

In [23]:
from keras import backend as K

In [24]:
def _calculate_reconstruction_loss(y_target, y_predicted):
    error = y_target - y_predicted
    reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
    return reconstruction_loss


def calculate_kl_loss(model):
    # wrap `_calculate_kl_loss` such that it takes the model as an argument,
    # returns a function which can take arbitrary number of arguments
    # (for compatibility with `metrics` and utility in the loss function)
    # and returns the kl loss
    def _calculate_kl_loss(*args):
        kl_loss = -0.5 * K.sum(1 + model.log_variance - K.square(model.mu) -
                               K.exp(model.log_variance), axis=1)
        return kl_loss
    return _calculate_kl_loss

In [25]:
class VAE:
    """
    VAE represents a Deep Convolutional variational autoencoder architecture
    with mirrored encoder and decoder components.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        self.input_shape = input_shape
        self.conv_filters = conv_filters 
        self.conv_kernels = conv_kernels 
        self.conv_strides = conv_strides 
        self.latent_space_dim = latent_space_dim 
        self.reconstruction_loss_weight = 1000000

        self.encoder = None
        self.decoder = None
        self.model = None

        self._num_conv_layers = len(conv_filters)
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()

    def summary(self):
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

        
  

    def compile(self, learning_rate=0.0001):
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                           loss=self._calculate_combined_loss,
                           metrics=[_calculate_reconstruction_loss,
                                    calculate_kl_loss(self)])

    def train(self, x_train, batch_size, num_epochs, callbacks=None):
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True, callbacks=[checkpoint_callback] + callbacks)

    def save(self, save_folder="."):
        self._create_folder_if_it_doesnt_exist(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def load_weights(self, weights_path):
        self.model.load_weights(weights_path)

    def reconstruct(self, images):
        latent_representations = self.encoder.predict(images)
        reconstructed_images = self.decoder.predict(latent_representations)
        return reconstructed_images, latent_representations

    @classmethod
    def load(cls, save_folder="."):
        parameters_path = os.path.join(save_folder, "parameters.pkl")
        with open(parameters_path, "rb") as f:
            parameters = pickle.load(f)
        autoencoder = VAE(*parameters)
        weights_path = os.path.join(save_folder, "weights.h5")
        autoencoder.load_weights(weights_path)
        return autoencoder

    def _calculate_combined_loss(self, y_target, y_predicted):
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        combined_loss = self.reconstruction_loss_weight * reconstruction_loss\
        + kl_loss
        combined_loss=reconstruction_loss
        return combined_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
        return reconstruction_loss

    def _calculate_kl_loss(self, y_target, y_predicted):
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) -
                               K.exp(self.log_variance), axis=1)
        return kl_loss

    def _create_folder_if_it_doesnt_exist(self, folder):
        if not os.path.exists(folder):
            os.makedirs(folder)

    def _save_parameters(self, save_folder):
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        save_path = os.path.join(save_folder, "weights.h5")
        self.model.save_weights(save_path)

    def _build(self):
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_autoencoder(self):
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    def _build_decoder(self):
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        return Input(shape=self.latent_space_dim, name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        num_neurons = np.prod(self._shape_before_bottleneck) # [1, 2, 4] -> 8
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        return Reshape(self._shape_before_bottleneck)(dense_layer)

    def _add_conv_transpose_layers(self, x):
        """Add conv transpose blocks."""
        # loop through all the conv layers in reverse order and stop at the
        # first layer
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x

    def _add_decoder_output(self, x):
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer

    def _build_encoder(self):
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks in encoder."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """Add a convolutional block to a graph of layers, consisting of
        conv 2d + ReLU + batch normalization.
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten data and add bottleneck with Guassian sampling (Dense
        layer).
        """
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            mu, log_variance = args
            epsilon = K.random_normal(shape=K.shape(self.mu), mean=0.,
                                      stddev=1.)
            sampled_point = mu + K.exp(log_variance / 2) * epsilon
            return sampled_point

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x

In [26]:
from tensorflow.keras.callbacks import ModelCheckpoint

# create a ModelCheckpoint callback to save the best model
checkpoint_callback = ModelCheckpoint(
    filepath='E:/E just/Spring 3rd Year/PBL/generation/test generation trial3/epochone/model_weights_best.h5',
    save_weights_only=True,
    monitor='loss',  # monitor validation loss to determine the best model
    save_best_only=True,  # save only the best model
    save_freq='epoch'  # save after each epoch
)


In [27]:
LEARNING_RATE = 0.00005
BATCH_SIZE = 1
EPOCHS = 6
SPECTROGRAMS_PATH = r"E:\E just\Spring 3rd Year\PBL\generation\spoken digits\100_sample_npy"


def load_fsdd(spectrograms_path):
    x_train = []
    for root, _, file_names in os.walk(spectrograms_path):
        for file_name in file_names:
            file_path = os.path.join(root, file_name)
            spectrogram = np.load(file_path)
            if np.count_nonzero(np.isnan(spectrogram)):
                print(f"Avoided")
                continue
            x_train.append(spectrogram)
    x_train = np.array(x_train)
    x_train = x_train[..., np.newaxis] # -> (3500, 256,512 , 1)
    return x_train

def train(x_train, learning_rate, batch_size, epochs, callbacks=None):
    autoencoder = VAE(
        input_shape=(256, 64, 1),
        conv_filters=(512, 256, 128, 64, 32),
        conv_kernels=(3, 3, 3, 3, 3),
        conv_strides=(1, 2, 2, 2, 1),
        latent_space_dim = 128
    )
    autoencoder.summary()
    autoencoder.compile(learning_rate)
    autoencoder.load_weights(r'D:\Education\Semester 6\CSE 321 Project Based Learning on CSE\Generation one genre\Epoch one\model_weights_best.h5')
    autoencoder.train(x_train, batch_size, epochs, callbacks=[])
    return autoencoder


In [28]:
#path = r"D:\Education\Semester 6\CSE 321 Project Based Learning on CSE\Generation one genre\x_train.npy"

In [29]:
#if __name__ == "__main__":
    #x_train = load_fsdd(SPECTROGRAMS_PATH)
    #x_train = np.load(path)
    #autoencoder = train(x_train, LEARNING_RATE, BATCH_SIZE, EPOCHS,callbacks=[])
    #autoencoder.save(r"D:\Education\Semester 6\CSE 321 Project Based Learning on CSE\Generation one genre\Epoch one)

# Post-processing 

In [30]:
import os
import pickle

import numpy as np
import soundfile as sf


In [31]:
class SoundGenerator:
    """SoundGenerator is responsible for generating audios from
    spectrograms.
    """

    def __init__(self, vae, hop_length):
        self.vae = vae
        self.hop_length = hop_length
        self._min_max_normaliser = MinMaxNormaliser(0, 1)

    def generate(self, spectrograms, min_max_values):
        generated_spectrograms, latent_representations = \
            self.vae.reconstruct(spectrograms)
        signals = self.convert_spectrograms_to_audio(generated_spectrograms, min_max_values)
        return signals, latent_representations

    def convert_spectrograms_to_audio(self, spectrograms, min_max_values):
        signals = []
        for spectrogram, min_max_value in zip(spectrograms, min_max_values):
            # reshape the log spectrogram
            log_spectrogram = spectrogram[:, :, 0]
            # apply denormalisation
            denorm_log_spec = self._min_max_normaliser.denormalise(
                log_spectrogram, min_max_value["min"], min_max_value["max"])
            # log spectrogram -> spectrogram
            spec = librosa.db_to_amplitude(denorm_log_spec)
            # apply Griffin-Lim
            signal = librosa.istft(spec, hop_length=self.hop_length)
            # append signal to "signals"
            signals.append(signal)
        return signals

In [32]:
HOP_LENGTH = 256
SAVE_DIR_ORIGINAL =r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\original"
SAVE_DIR_GENERATED = r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\generated"
MIN_MAX_VALUES_PATH = "E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\min_max_values\min_max_values.pkl"
SPECTROGRAMS_PATH =r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\n"

def load_fsdd(spectrograms_path):
    x_train = []
    file_paths = []
    for root, _, file_names in os.walk(spectrograms_path):
        for file_name in file_names:
            file_path = os.path.join(root, file_name)
            spectrogram = np.load(file_path) # (n_bins, n_frames, 1)
            x_train.append(spectrogram)
            file_paths.append(os.path.basename(file_path))  # Use only the file name
    x_train = np.array(x_train)
    x_train = x_train[..., np.newaxis] # -> (3000, 256, 64, 1)
    return x_train, file_paths


def select_spectrograms(spectrograms, file_paths, min_max_values, num_spectrograms=2):
    sampled_specs = []
    sampled_min_max_values = []
    
    sampled_indexes = np.random.choice(range(len(spectrograms)), num_spectrograms)
    
    for index in sampled_indexes:
        file_path = file_paths[index]
        file_path = "E:\\E just\\Spring 3rd Year\\PBL\\generation\\Generation by one genre\\n\\"  + file_path
        if file_path in min_max_values:
            sampled_specs.append(spectrograms[index])
            sampled_min_max_values.append(min_max_values[file_path])
    
    #print(file_paths)
    #print(sampled_min_max_values)
    
    return np.array(sampled_specs), sampled_min_max_values



def save_signals(signals, save_dir, sample_rate=22050):
    for i, signal in enumerate(signals):
        save_path = os.path.join(save_dir, str(i) + ".wav")
        sf.write(save_path, signal, sample_rate)



In [34]:
if __name__ == "__main__":
    
    # initialise sound generator
    vae = VAE.load(r"E:\E just\Spring 3rd Year\PBL\generation\Generation by one genre\model")
    sound_generator = SoundGenerator(vae, HOP_LENGTH)

    # load spectrograms + min max values
    with open(MIN_MAX_VALUES_PATH, "rb") as f:
        min_max_values = pickle.load(f)
    #print(min_max_values)
    specs, file_paths = load_fsdd(SPECTROGRAMS_PATH)

    # sample spectrograms + min max values
    sampled_specs, sampled_min_max_values = select_spectrograms(specs,
                                                                file_paths,
                                                                min_max_values,
                                                                5)
    
    #print(sampled_specs)
    #print(sampled_min_max_values)
    # generate audio for sampled spectrograms
    signals, _ = sound_generator.generate(sampled_specs,
                                          sampled_min_max_values)

    # convert spectrogram samples to audio
    original_signals = sound_generator.convert_spectrograms_to_audio(
        sampled_specs, sampled_min_max_values)

    # save audio signals
    save_signals(signals, SAVE_DIR_GENERATED)
    save_signals(original_signals, SAVE_DIR_ORIGINAL)

  updates=self.state_updates,
