In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import moviepy.editor as mp
import librosa
import os
import json

import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.datasets import mnist

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (Dense, Reshape, Dropout, LeakyReLU, Flatten, Input, 
                                     BatchNormalization, Conv2D, Conv2DTranspose)
from sklearn.preprocessing import MinMaxScaler

from IPython.display import Audio, display, HTML, IFrame

from warnings import filterwarnings
filterwarnings('ignore')

class FileFormatError(IOError):
    """Raised when a media file cannot be handled because its format is not supported."""

In [2]:
def convert_to_wav(file_name:str):
    '''
        Return the path of a WAV version of `file_name`, converting it if needed.

        file_name: path of a 'wav', 'mp3', 'mp4' or 'wmv' file (extension is
                   matched case-insensitively)
        returns: `file_name` itself for WAV input; otherwise the path of a 32-bit PCM
                 WAV written to <cwd>/tmp/<base name>.wav
        raises: FileFormatError for any other extension, or for a video without audio
    '''
    stem, ext = os.path.splitext(file_name)
    ext = ext.lower()

    # Nothing to convert (and no tmp directory needed) for WAV input.
    if ext == '.wav':
        return file_name
    if ext not in ('.mp3', '.mp4', '.wmv'):
        raise FileFormatError("Supported Formats are 'wav', 'mp3', 'mp4', 'wmv'.")

    tmp_dir = os.path.join(os.getcwd(), 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)
    # Only the base name is joined: os.path.join drops tmp_dir entirely when the
    # second argument is an absolute path, and nested relative paths would point
    # at sub-directories of tmp that do not exist.
    # Caveat: two sources with the same base name map to the same output file.
    wav_path = os.path.join(tmp_dir, os.path.basename(stem) + '.wav')

    is_audio_only = ext == '.mp3'
    clip = mp.AudioFileClip(file_name) if is_audio_only else mp.VideoFileClip(file_name)
    try:
        audio = clip if is_audio_only else clip.audio
        if audio is None:
            raise FileFormatError(f"No audio track found in '{file_name}'.")
        audio.write_audiofile(wav_path, codec='pcm_s32le')
    finally:
        # Release the reader resources held by the clip even if writing fails.
        clip.close()
    return wav_path

def prepare_dataset(dataset_path, json_path, sample_rate=22050, n_mfcc=13, hop_length=512, n_fft=2048, samples_to_consider=60):
    '''
        Walk `dataset_path`, compute MFCCs for every sufficiently long audio file
        and write the result to `json_path`.

        dataset_path: root directory, traversed recursively with os.walk
        json_path: path of the JSON file to write
        sample_rate: rate the audio is resampled to on load; also given to the MFCC
                     computation (any later inversion must use the same rate)
        n_mfcc: number of MFCC coefficients per frame
        hop_length: forwarded to librosa.feature.mfcc
        n_fft: forwarded to librosa.feature.mfcc
        samples_to_consider: clip length in seconds; files shorter than this are
                             skipped, longer ones are truncated to it

        The JSON holds four lists: 'mappings' (directory name for each label),
        'labels', 'MFCCs' (one frames-by-n_mfcc nested list per file) and 'files'.
        A file's label is the os.walk index of its directory minus one, so files
        directly under `dataset_path` get -1.
    '''
    n_samples = int(samples_to_consider * sample_rate)
    data = dict(mappings=[], labels=[], MFCCs=[], files=[])
    last_announced = 0
    for i, (dirpath, dirnames, filenames) in enumerate(os.walk(dataset_path)):
        # Sorting in place fixes the traversal order, so labels are reproducible.
        dirnames.sort()
        if i > 0:
            # Keep mappings[label] aligned with label == i - 1.
            data['mappings'].append(os.path.basename(dirpath))

        for f in sorted(filenames):
            file_path = os.path.join(dirpath, f)
            file_path = convert_to_wav(file_path)
            signal, sr = librosa.load(file_path, sr=sample_rate)

            if len(signal) < n_samples:
                continue
            signal = signal[:n_samples]

            # `y` must be passed by keyword in current librosa, and `sr` must be
            # forwarded or the mel filter bank silently assumes 22050 Hz.
            MFCCs = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=n_mfcc,
                                         hop_length=hop_length, n_fft=n_fft)
            data['labels'].append(i-1)
            data['MFCCs'].append(MFCCs.T.tolist())
            data['files'].append(file_path)
            # Announce the first accepted file of each sub-directory.
            if i != last_announced:
                print(f'{file_path}: {i-1}')
                last_announced = i
    with open(json_path, 'w') as fp:
        json.dump(data, fp, indent=4)

def load_dataset(data_path):
    '''
        Read a dataset JSON file and return its features and labels as arrays.

        data_path: path of the JSON file to read
        returns: (X, y), the arrays built from the 'MFCCs' and 'labels' entries
    '''
    with open(data_path, 'r') as json_file:
        contents = json.load(json_file)
    features = np.array(contents['MFCCs'])
    targets = np.array(contents['labels'])
    return features, targets
    
def convert_to_batches(data, batch_size=10):
    '''
        Turn `data` into a shuffled, batched tf.data pipeline.

        data: tensor-like whose first axis indexes the samples
        batch_size: samples per batch; a trailing incomplete batch is dropped
        returns: a tf.data.Dataset that shuffles with a 100-element buffer,
                 yields full batches and prefetches one batch
    '''
    shuffled = tf.data.Dataset.from_tensor_slices(data).shuffle(buffer_size=100)
    batched = shuffled.batch(batch_size, drop_remainder=True)
    return batched.prefetch(1)

In [None]:
%%time
# D:\A\audio_processing\data\bed
prepare_dataset(r'D:\A\Trimester_4_Mini_Project\data\audio_dataset\karoke\english', 'music_data.json', 44100, 20, samples_to_consider=120)

In [None]:
%%time
# Load the MFCC features (X) and labels (y) from the JSON file written by prepare_dataset.
X, y = load_dataset('music_data.json')

In [None]:
# Inspect the value range of the raw MFCC features.
X.min(), X.max()

In [None]:
# Map MFCC values towards [-1, 1]. The scaler is fitted on the fixed bounds
# -650 and 300 rather than on the data, so values outside them land outside [-1, 1].
scaler = MinMaxScaler((-1, 1))
scaler.fit([[-650], [300]])

# Drop the channel axis added by a previous run of this cell so re-running is safe.
mfcc_frames = X[..., 0] if X.ndim == 4 else X

# The scaler was fitted on a single column, so it must be given single-column
# input: flatten, scale, then restore the original shape.
scaled_X = scaler.transform(mfcc_frames.reshape(-1, 1)).reshape(mfcc_frames.shape)

# Add the trailing channel axis required by the (frames, n_mfcc, 1) model input.
# NOTE(review): X stays unscaled and scaled_X is not what the later cells batch
# for training — confirm that this is intended.
X = mfcc_frames[..., np.newaxis]

In [None]:
# Check the value range of the scaled copy of the features.
scaled_X.min(), scaled_X.max()

In [None]:
# Shapes of the feature array (now with a trailing channel axis) and the label array.
X.shape, y.shape

In [None]:
# Value range of X after the channel axis was added; X itself was not rescaled.
X.min(), X.max()

In [None]:
def gan_model(input_shape:tuple=(10336, 13, 1)):
    '''
        Build and compile a generator/discriminator pair stacked into one model.

        input_shape: shape of one sample; used as the input shape of both the
                     generator and the discriminator
        returns: the compiled Sequential named 'GAN' whose two layers are the
                 generator and the discriminator, in that order

        The generator is a stack of 3x3 'same'-padded Conv2DTranspose layers, each
        followed by BatchNormalization except the final single-filter layer. The
        discriminator mirrors it with Conv2D + Dropout(0.5) and ends in a sigmoid
        unit. No activation is set on any convolution layer. Both summaries are
        printed before returning.
    '''
    kernel = (3,3)

    generator = Sequential(name='generator')
    generator.add(Input(shape=input_shape))
    for n_filters in (64, 32, 16, 8, 4, 2):
        generator.add(Conv2DTranspose(n_filters, kernel, padding='same'))
        generator.add(BatchNormalization())
    # Final layer collapses back to a single channel, without normalisation.
    generator.add(Conv2DTranspose(1, kernel, padding='same'))

    discriminator = Sequential(name='discriminator')
    discriminator.add(Input(shape=input_shape))
    for n_filters in (64, 32, 16, 8, 4, 2, 1):
        discriminator.add(Conv2D(n_filters, kernel, padding='same'))
        discriminator.add(Dropout(0.5))
    discriminator.add(Flatten())
    discriminator.add(Dense(1, activation='sigmoid'))
    discriminator.compile(loss='binary_crossentropy', optimizer='adam')
    # Set after the discriminator's own compile and before the combined model
    # below is compiled.
    discriminator.trainable = False

    combined = Sequential([generator, discriminator], name='GAN')
    combined.compile(loss='binary_crossentropy', optimizer='adam')
    for sub_model in (combined.layers[0], combined.layers[1]):
        sub_model.summary()
    return combined

def train_gan(model, epochs, dataset, batch_size=10, input_shape=[10336, 20, 1]):
    '''
        Alternate discriminator and generator updates over `dataset`.

        model: combined model whose two layers are (generator, discriminator)
        epochs: number of passes over `dataset`
        dataset: iterable of batches of real samples
        batch_size: number of noise samples drawn per step
        input_shape: shape of one noise sample fed to the generator (list or tuple)
    '''
    generator, discriminator = model.layers
    noise_shape = [batch_size, *input_shape]  # also works when input_shape is a tuple

    # `dataset` is already batched, so its length is the number of batches.
    try:
        total_batches = len(dataset)
    except TypeError:
        total_batches = '?'

    for epoch in range(epochs):
        print(f'Currently on Epoch {epoch+1}')

        for i, X_batch in enumerate(dataset, start=1):
            if i % 100 == 0:
                print(f'\t Currently on batch number {i} of {total_batches}')

            # DISCRIMINATOR Training Phase
            real_audio = tf.dtypes.cast(X_batch, tf.float32)
            n_real = int(real_audio.shape[0])
            noise = tf.random.normal(shape=noise_shape)
            gen_audio = generator(noise)
            X_fake_vs_real = tf.concat([gen_audio, real_audio], axis=0)
            # Fake samples are labelled 0 and real samples 1; the real labels are
            # sized from the batch itself so they always line up with the inputs.
            y1 = tf.constant([[0.0]]*batch_size + [[1.0]]*n_real)
            discriminator.trainable = True
            discriminator.train_on_batch(X_fake_vs_real, y1)

            # GENERATOR Training Phase
            noise = tf.random.normal(shape=noise_shape)
            # Noise is labelled as real so the generator is pushed to fool the discriminator.
            y2 = tf.constant([[1.0]] * batch_size)
            discriminator.trainable = False
            # Train the model that was passed in, not whatever global happens to be named GAN.
            model.train_on_batch(noise, y2)

In [None]:
# Build the combined model for samples of shape (10336, 20, 1); gan_model also prints both summaries.
GAN = gan_model(input_shape=(10336, 20, 1))

In [None]:
%%time
# Wrap X in a shuffled, batched tf.data pipeline (default batch size of convert_to_batches).
data = convert_to_batches(X)
data

In [None]:
# Peek at a single batch to confirm its shape, then stop iterating.
for d in data:
    print(d.shape)
    break

In [None]:
%%time
# Train for 5 epochs on the batched dataset, using train_gan's default batch size and noise shape.
train_gan(model=GAN, epochs=5, dataset=data)

In [None]:
# Take the first batch; the loop variable `audios` stays bound and is reused by the cells below.
for audios in data:
    print(audios.shape)
    break

In [None]:
# Run the generator (first sub-model of GAN) on the batch and show the shape of its first output sample.
GAN.layers[0](audios)[0].shape

In [None]:
# Apply the scaler's inverse transform to the first generated sample. The scaler
# was fitted on a single column, so flatten to one column before inverting and
# restore the (frames, n_mfcc) layout afterwards.
generated_mfcc = np.array(GAN.layers[0](audios)[0])
inv_data = scaler.inverse_transform(generated_mfcc.reshape(-1, 1)).reshape(generated_mfcc.shape[:2])

In [None]:
# Select the entries equal to +inf; the resulting shape gives how many there are.
inv_data[inv_data == np.inf].shape

In [None]:
# Shape of the inverse-scaled generator output.
inv_data.shape

In [None]:
# Value range of the inverse-scaled generator output.
inv_data.min(), inv_data.max()

In [None]:
# Confirm the container type returned by the scaler's inverse transform.
type(inv_data)

In [None]:
# Shape of the first sample in the batch held in `audios`.
audios[0].shape

In [None]:
# mfcc_to_audio expects (n_mfcc, n_frames). The generator output is
# (frames, n_mfcc, 1), so drop the channel axis and transpose before inverting.
# NOTE(review): the sampling rate used for inversion (left at librosa's default
# here) has to match the one used when the MFCCs were extracted — confirm.
librosa.feature.inverse.mfcc_to_audio(np.array(GAN.layers[0](audios)[0]).reshape(10336, 20).T)

In [None]:
# Invert the first generated sample back to a waveform and play it at 44100 Hz.
# mfcc_to_audio expects (n_mfcc, n_frames), while the generator output is
# (frames, n_mfcc, 1): drop the channel axis and transpose first.
display(Audio(librosa.feature.inverse.mfcc_to_audio(np.array(GAN.layers[0](audios)[0]).reshape(10336, 20).T), rate=44100))