In [1]:
import os
import matplotlib.pyplot as plt
import librosa
import librosa.display
import scipy.io.wavfile
import numpy as np
from keras.models import Sequential, Model
from keras.layers.convolutional import Conv2D, UpSampling2D
from keras.layers import LeakyReLU, Input, Dense, Reshape, Flatten, Dropout, Activation
from tensorflow.keras.optimizers import Adam
from numpy.random import randn
from numpy.random import choice
# NOTE: the data-path and save helpers below could be plain functions instead of class methods.

In [10]:
class wavPreprocessing:
    """Stateless helpers that locate wav files and turn them into mel-spectrograms.

    Every helper is a static method, so it can be called on the class itself
    (``wavPreprocessing.wavfiles(folder)``) as well as on an instance or a
    subclass instance.
    """

    mel_spec_frame_size = 512  # STFT window (n_fft) in samples; the hop length is half of it
    highest_db = 80.0          # the dB spectrogram is divided by this value before it is returned

    @staticmethod
    def datapath_classes(datapath):
        """
        Returns the directories found inside the first sub-folder of ``datapath``.

        ``datapath`` is scanned for sub-folders (any whose name contains "MACOS"
        is ignored), the first one reported by ``os.scandir`` is opened and every
        directory inside it is returned.

        :param datapath: Main data path
        :return: List of directory paths found one level below the first sub-folder
        :raises IndexError: if ``datapath`` contains no usable sub-folder
        """
        # Archives unpacked on Ubuntu/Windows may carry a macOS metadata folder; skip it.
        subfolders = [f.path for f in os.scandir(datapath)
                      if f.is_dir() and "MACOS" not in f.name]
        return [f.path for f in os.scandir(subfolders[0]) if f.is_dir()]

    @staticmethod
    def wavfiles(folder):
        """
        Returns the list of .wav files in a directory.

        :param folder: Directory to search for wav files in.
        :return: List of paths whose name ends with ".wav".
        """
        return [f.path for f in os.scandir(folder) if f.path.endswith(".wav")]

    @staticmethod
    def wav_to_melspec(wavfile, n_mels=64, plot=False):
        """
        Loads a wav file and converts it to a scaled dB mel-spectrogram.

        The signal is loaded with ``sr=None``, peak-normalised, zero-padded to
        ``fs`` samples when shorter, and converted with a window of
        ``mel_spec_frame_size`` samples and a hop of half that size. The power
        spectrogram is converted to dB (``ref=1.0``) and divided by ``highest_db``.

        :param wavfile: Path of the wav file to load.
        :param n_mels: Number of mel bands.
        :param plot: When True, display the spectrogram with matplotlib.
        :return: Tuple ``(melspec, fs)`` with the scaled spectrogram and the sampling rate.
        """
        sig, fs = librosa.load(wavfile, sr=None)

        # Normalize audio to between -1.0 and +1.0. A silent (all-zero) signal
        # is left untouched: dividing by a zero peak would fill it with NaN.
        peak = np.max(np.abs(sig), axis=0) if sig.size else 0.0
        if peak > 0:
            sig /= peak

        n_fft = wavPreprocessing.mel_spec_frame_size
        hop_length = int(n_fft / 2)

        if len(sig) < fs:  # pad if less than a second
            padded_array = np.zeros(fs)
            padded_array[:np.shape(sig)[0]] = sig
            sig = padded_array

        melspec = librosa.feature.melspectrogram(y=sig,
                                                 sr=fs,
                                                 center=True,
                                                 n_fft=n_fft,
                                                 hop_length=hop_length,
                                                 n_mels=n_mels)

        if plot:
            plt.figure(figsize=(8, 6))
            plt.xlabel('Time')
            plt.ylabel('Mel-Frequency')
            librosa.display.specshow(librosa.power_to_db(melspec, ref=np.max),
                                     y_axis='mel',
                                     fmax=fs/2,
                                     sr=fs,
                                     hop_length=hop_length,
                                     x_axis='time')
            plt.colorbar(format='%+2.0f dB')
            plt.title('Mel spectrogram')
            plt.tight_layout()
            plt.show()

        melspec = librosa.power_to_db(melspec, ref=1.0)
        melspec = melspec / wavPreprocessing.highest_db  # scale by max dB

        return melspec, fs

    @staticmethod
    def check_melspec(melspec, fs, path):
        """
        Pads a spectrogram that has fewer than 64 columns and saves it to disk.

        Missing columns are filled with -1. The (possibly padded) spectrogram is
        then written through ``StoreMelspec(melspec, fs, path).save_melspec()``.
        Spectrograms with more than 64 columns are saved and returned unchanged.

        :param melspec: 2-D spectrogram array.
        :param fs: Sampling rate stored alongside the spectrogram.
        :param path: Path of the source wav file, used to derive the output names.
        :return: The spectrogram that was saved.
        """
        min_frames = 64
        if melspec.shape[1] < min_frames:  # the melspec size is wrong
            n_rows, n_cols = np.shape(melspec)
            padded_array = np.full((n_rows, min_frames), -1.0)
            padded_array[:, :n_cols] = melspec
            melspec = padded_array
        StoreMelspec(melspec, fs, path).save_melspec()
        return melspec

In [3]:
class ReconstructAudio(wavPreprocessing):
    """Turns a scaled dB mel-spectrogram back into audio and writes it as a wav file."""

    def __init__(self, melspec, fs, folder, file):
        """
        :param melspec: Scaled dB mel-spectrogram (dB values divided by ``highest_db``).
        :param fs: Sampling frequency used for the inversion and for the wav file.
        :param folder: Directory in which the ``recon`` output folder is created.
        :param file: File name of the wav file written inside ``folder/recon``.
        """
        self.melspec = melspec
        self.fs = fs
        self.folder = folder
        self.file = file
        # Mirror the class-level constants of the parent on the instance.
        self.highest_db = super().highest_db
        self.mel_spec_frame_size = super().mel_spec_frame_size

    def reconstruct(self, plot=False):
        """
        Reconstructs audio from the stored spectrogram and writes it to
        ``<folder>/recon/<file>`` as 32-bit float samples.

        The last spectrogram column is dropped before the inversion. The stored
        spectrogram itself is not modified, so the method can be called again.

        :param plot: When True, plot the reconstructed waveform.
        """
        trimmed = self.melspec[:, :-1]
        signal = self.reconstruct_wav(plot=plot, melspec=trimmed)

        recon_dir = os.path.join(self.folder, 'recon')
        os.makedirs(recon_dir, exist_ok=True)  # reuse the folder when it already exists
        recon_wav_filename = os.path.join(recon_dir, self.file)
        # must state type to read on windows
        scipy.io.wavfile.write(recon_wav_filename, self.fs, signal.astype(np.float32))

    def reconstruct_wav(self, plot, melspec=None):
        """
        Given a mel-spectrogram, and a target sampling frequency, reconstructs audio.

        :param plot: Flag to either plot the wav or not.
        :param melspec: Scaled dB mel-spectrogram to invert; defaults to ``self.melspec``.
        :return: The reconstructed raw samples of an audio signal.
        """
        if melspec is None:
            melspec = self.melspec

        # Convert the normed image back to the dB range, then dB back to power.
        # A new array is built on purpose: an in-place multiply would overwrite
        # the caller's data, because the input is often a view of a larger array.
        power = librosa.db_to_power(np.asarray(melspec) * self.highest_db)

        sig = librosa.feature.inverse.mel_to_audio(M=power,
                                                   sr=self.fs,
                                                   center=True,
                                                   n_fft=self.mel_spec_frame_size,
                                                   hop_length=int(self.mel_spec_frame_size/2))

        # Normalize audio to between -1.0 and +1.0; skip a silent signal so that
        # a zero peak does not turn every sample into NaN.
        peak = np.max(np.abs(sig), axis=0) if sig.size else 0.0
        if peak > 0:
            sig = sig / peak

        if len(sig) < self.fs:  # pad if less than a second
            padded_array = np.zeros(self.fs)
            padded_array[:np.shape(sig)[0]] = sig
            sig = padded_array

        if plot:
            plt.figure(figsize=(10, 4))
            plt.xlabel('Sample')
            plt.ylabel('Amplitude')
            plt.plot(sig)
            plt.title('Waveform')
            plt.tight_layout()
            plt.show()

        return sig
    

In [4]:
class StoreMelspec:
    """Saves a mel-spectrogram next to its wav file as an .npz archive and a .png image."""

    def __init__(self, melspec, fs, path):
        """
        :param melspec: 2-D spectrogram array to store.
        :param fs: Sampling rate stored alongside the spectrogram.
        :param path: Path of the source wav file; output names are derived from it.
        """
        self.path = path
        self.melspec = melspec
        self.fs = fs

    def save_melspec(self):
        """
        Writes ``<name>.mel.npz`` (keys ``melspec`` and ``fs``) and ``<name>.png``,
        where ``<name>`` is ``self.path`` without its file extension.
        """
        # Strip only the trailing extension; str.replace would also rewrite a
        # ".wav" that happens to appear in a directory name.
        base, _ = os.path.splitext(self.path)

        melspec_filename = base + '.mel'  # np.savez appends ".npz"
        np.savez(melspec_filename, melspec=self.melspec, fs=self.fs)

        melspec_image_filename = base + '.png'
        fig = plt.figure(figsize=(10, 4))
        plt.imshow(self.melspec, origin='lower')
        plt.tight_layout()
        self.save_image(filepath=melspec_image_filename, fig=fig)
        plt.close(fig)  # close this figure explicitly, not whichever is current

    def save_image(self, filepath, fig=None):
        r'''Save the given figure (default: the current one) with no whitespace.

        Example filepath: "myfig.png" or r"C:\myfig.pdf"

        :param filepath: Destination file name passed to ``fig.savefig``.
        :param fig: Figure to save; ``plt.gcf()`` is used when omitted.
        '''
        if fig is None:
            fig = plt.gcf()

        # Adjust the figure that is being saved, which is not necessarily the
        # current pyplot figure.
        fig.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0, hspace=0)
        for ax in fig.axes:
            ax.axis('off')
            ax.margins(0, 0)
            ax.xaxis.set_major_locator(plt.NullLocator())
            ax.yaxis.set_major_locator(plt.NullLocator())
        fig.savefig(filepath, pad_inches=0, bbox_inches='tight')

In [5]:
def wav_preprocessing(preprocessing, path="./assignment_data"):
    """
    Runs the full preprocessing pass over every wav file below ``path``.

    For each directory returned by ``preprocessing.datapath_classes(path)`` and
    each wav file in it, the file is converted to a mel-spectrogram, passed
    through ``check_melspec`` and then reconstructed back to audio with
    ``ReconstructAudio``.

    :param preprocessing: Object providing ``datapath_classes``, ``wavfiles``,
                          ``wav_to_melspec`` and ``check_melspec``.
    :param path: Main data path.
    """
    for class_dir in preprocessing.datapath_classes(path):
        for wav_path in preprocessing.wavfiles(class_dir):
            spec, sample_rate = preprocessing.wav_to_melspec(wav_path)
            spec = preprocessing.check_melspec(spec, sample_rate, wav_path)
            # Only the file name is handed over; the reconstruction decides the folder.
            wav_name = os.path.normpath(os.path.basename(wav_path))
            ReconstructAudio(spec, sample_rate, class_dir, wav_name).reconstruct()

# Preprocessing writes a .mel.npz archive, a .png image and a reconstructed wav
# for every input file, so it only runs on request.
# Set the flag to True on the first run, or whenever the wav data changes.
need_to_preprocess = False
if need_to_preprocess:
    wav_preprocessing(wavPreprocessing)

In [6]:
def training_set_files(datapath="./assignment_data"):
    """
    Returns a list of training data files (NPZ files).

    The folders are taken from ``wavPreprocessing.datapath_classes(datapath)``.
    The result is sorted so that it does not depend on the order in which the
    file system lists the entries.

    :param datapath: Main data path.
    :return: Sorted list of paths ending in ".npz".
    """
    subfolders = wavPreprocessing.datapath_classes(datapath)
    return sorted(
        entry.path
        for folder in subfolders
        for entry in os.scandir(folder)
        if entry.path.endswith(".npz")  # training images
    )
        

In [7]:
# Paths of the saved spectrogram archives (.npz) found under the data folder;
# these are the files the GAN is trained on.
train_set = training_set_files()

In [8]:
class DCGAN(StoreMelspec):
    """GAN that generates 64x64 single-channel mel-spectrogram images.

    The class inherits from ``StoreMelspec`` only to reuse ``save_image``;
    ``StoreMelspec.__init__`` is intentionally not called.

    Based on:
    https://machinelearningmastery.com/how-to-develop-a-generative-adversarial-network-for-an-mnist-handwritten-digits-from-scratch-in-keras/
    """

    def __init__(self, training_set):
        """
        Builds and compiles the discriminator, the generator and the combined model.

        :param training_set: Sequence of paths to .npz files holding a ``melspec`` array.
        """
        # Input shape
        self.img_rows = 64
        self.img_cols = 64
        self.channels = 1
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.latent_dim = 32
        self.trainingset = training_set

        # Build and compile the discriminator. Each compiled model gets its own
        # optimizer instance: the two models update different sets of weights,
        # and reusing one instance across them is rejected by newer Keras
        # releases (NOTE(review): confirm against the Keras version in use).
        self.discriminator = self.build_discriminator(depth=16)
        self.discriminator.compile(loss='binary_crossentropy',
                                   optimizer=Adam(0.0001, 0.5),
                                   metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator(depth=64)
        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img)

        # The combined model (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=Adam(0.0001, 0.5))

    def generate_latent_points(self, latent_dim, n_samples):
        """
        Generates points in latent space as input for the generator.

        :param latent_dim: Size of each latent vector.
        :param n_samples: Number of latent vectors to draw.
        :return: Array of shape ``(n_samples, latent_dim)`` drawn from N(0, 1).
        """
        return np.random.normal(0, 1, (n_samples, latent_dim))

    def build_generator(self, depth=64):
        """
        Defines a Generator network.

        A dense layer is reshaped to 4x4 and upsampled four times (4 -> 64); the
        last convolution has one channel and a tanh activation.

        :param depth: Base width; the hidden layers use ``depth * 4`` channels.
        :return: Keras model mapping a latent vector to an image.
        """
        channels = int(depth * 4)

        model = Sequential()
        model.add(Dense(4 * 4 * channels, input_shape=(self.latent_dim,)))
        model.add(Reshape((4, 4, channels)))
        model.add(LeakyReLU(alpha=0.2))

        # Three identical upsample + convolution stages: 4 -> 8 -> 16 -> 32.
        for _ in range(3):
            model.add(UpSampling2D())
            model.add(Conv2D(channels, kernel_size=3, strides=1, padding="same"))
            model.add(LeakyReLU(alpha=0.2))

        # Final stage: 32 -> 64 with a single output channel.
        model.add(UpSampling2D())
        model.add(Conv2D(1, kernel_size=3, strides=1, padding="same"))
        model.add(Activation("tanh"))

        model.summary()
        noise = Input(shape=(self.latent_dim,))
        img = model(noise)

        return Model(noise, img)

    def build_discriminator(self, depth=16):
        """
        Defines a Discriminator network.

        :param depth: Number of filters of the convolution layer.
        :return: Keras model mapping an image to a single sigmoid output.
        """
        model = Sequential()
        model.add(Conv2D(int(depth), kernel_size=3, strides=1, padding="same", input_shape=self.img_shape))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Flatten())
        model.add(Dropout(0.4))
        model.add(Dense(1, activation="sigmoid"))
        model.summary()
        img = Input(shape=self.img_shape)
        validity = model(img)
        return Model(img, validity)

    @staticmethod
    def _load_batch(file_paths):
        """Loads the ``melspec`` array of every .npz file and stacks them with a channel axis."""
        specs = []
        for file_path in file_paths:
            # The context manager closes the archive's file handle after reading.
            with np.load(file_path) as npzfile:
                specs.append(npzfile['melspec'])
        return np.expand_dims(np.asarray(specs), axis=3)

    def train(self, epochs, batch_size=64):
        """
        Trains the discriminator and the generator alternately.

        Each step trains the discriminator on ``batch_size / 2`` real and as many
        generated images, then trains the generator on ``batch_size`` latent
        points. Sample images and audio are saved after every epoch.

        :param epochs: Number of passes over the training files.
        :param batch_size: Generator batch size; the discriminator uses half of it.
        :raises ValueError: if ``batch_size`` is smaller than 2.
        """
        half_batch = int(batch_size / 2)
        if half_batch < 1:
            raise ValueError("batch_size must be at least 2")

        # Load the dataset and shuffle it. Indexing builds a new array, so the
        # sequence handed to the constructor is never reordered.
        X_train = np.asarray(self.trainingset)
        X_train = X_train[np.random.permutation(X_train.shape[0])]

        # steps per epoch
        steps_per_epoch = int(X_train.shape[0] / half_batch)

        for epoch in range(epochs):
            for step in range(steps_per_epoch):
                # ---------------------
                #  Train Discriminator
                # ---------------------
                # Adversarial ground truths
                valid = np.ones((half_batch, 1))
                fake = np.zeros((half_batch, 1))

                # Select next batch of images (and shuffle indexes)
                start = step * half_batch
                idx = np.arange(start, start + half_batch)
                idx = idx[np.random.permutation(idx.shape[0])]
                imgs = self._load_batch(X_train[idx])

                # Sample noise and generate a batch of new images
                noise = self.generate_latent_points(self.latent_dim, half_batch)
                gen_imgs = self.generator.predict(noise)

                # Train the discriminator (real classified as ones and generated as zeros)
                d_loss_real, d_acc_real = self.discriminator.train_on_batch(imgs, valid)
                d_loss_fake, d_acc_fake = self.discriminator.train_on_batch(gen_imgs, fake)
                d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
                d_acc = 0.5 * np.add(d_acc_real, d_acc_fake)

                # ---------------------
                #  Train Generator
                # ---------------------

                # Train the generator (wants discriminator to mistake images as real)
                # Sample noise and generate a batch of new images
                noise = self.generate_latent_points(self.latent_dim, batch_size)
                valid = np.ones((batch_size, 1))
                g_loss = self.combined.train_on_batch(noise, valid)

                # Plot the progress
                print("Epoch: (%d/%d) Step: (%d/%d) [D: loss_R: %f, loss_F: %f, loss: %f, acc_R: %.2f%%, acc_F: %.2f%%, acc.: %.2f%%] [G: loss: %f]"
                       % (epoch, epochs-1, step, steps_per_epoch-1, d_loss_real, d_loss_fake, d_loss, d_acc_real*100, d_acc_fake*100, d_acc*100, g_loss))

            # Save generated image samples at every epoch
            self.save_imgs(epoch)

    def save_imgs(self, epoch, samples=5, fs=16000):
        """
        Generates sample spectrograms and saves each as a .png plus a reconstructed wav.

        Images go to ``images/epoch_<epoch>_sample_<i>.png``; the audio is
        written by ``ReconstructAudio`` with ``folder="images"``.

        :param epoch: Epoch number used in the file names.
        :param samples: Number of samples to generate.
        :param fs: Sampling frequency handed to the audio reconstruction.
        """
        noise = self.generate_latent_points(self.latent_dim, samples)
        gen_imgs = self.generator.predict(noise)

        # Create the output folder once instead of once per sample.
        os.makedirs("images", exist_ok=True)

        for sample_idx in range(samples):
            melspec = gen_imgs[sample_idx, :, :, 0]

            fig = plt.figure(figsize=(10, 4))
            plt.imshow(melspec, origin='lower')
            plt.tight_layout()
            self.save_image(filepath=("images/epoch_%d_sample_%d.png" % (epoch, sample_idx)), fig=fig)
            plt.close(fig)

            # wavwrite method handled by audio reconstruct
            audio_epoch_recons = ReconstructAudio(melspec, fs=fs, folder="images",
                                                  file="epoch_%d_sample_%d.wav" % (epoch, sample_idx))
            audio_epoch_recons.reconstruct()




In [9]:
# Short run: train on the first 15 training files only.
dcgan = DCGAN(training_set=train_set[:15])
dcgan.train(epochs=5, batch_size=8)  # 500 and 32 -- presumably the values intended for a full run; TODO confirm
# TODO: check the last cell so that DCGAN inherits the save and reconstruct methods.

2022-05-20 12:49:59.941430: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-05-20 12:50:00.564134: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudnn.so.8'; dlerror: libcudnn.so.8: cannot open shared object file: No such file or directory
2022-05-20 12:50:00.564160: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1850] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
2022-05-20 12:50:00.597038: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (o

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 64, 64, 16)        160       
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 64, 64, 16)        0         
                                                                 
 flatten (Flatten)           (None, 65536)             0         
                                                                 
 dropout (Dropout)           (None, 65536)             0         
                                                                 
 dense (Dense)               (None, 1)                 65537     
                                                                 
Total params: 65,697
Trainable params: 65,697
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_1"
______________________________