In [13]:
#Cosmetic only: inject a CSS rule so the notebook's ".container" element uses the full browser width.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [1]:
#Audio-data augmentation
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift
#Various audio processing functionalities
import librosa
#Just to find the ceil 
import math
#Visualization of model history
import matplotlib.pyplot as plt
#Memory maps and various computations involving arrays
import numpy as np
#File read/information retrieval
import os
#CSV file manipulation
import pandas as pd
#To shuffle and split the dataset. We use it here to split indices of memory maps.
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow.keras as keras
import time
from tensorflow.python.keras.saving import hdf5_format
import h5py
import gc

##### Assumptions 
- Every class in the csv file should contain a workable number of songs. To balance the dataset, the number of songs taken per class for training and testing is determined by the class that has the fewest songs.
- Do not define variables named X, y, X_train, X_validation, X_test, y_train, y_validation or y_test in this notebook. The preprocessing code uses these names as globals, so your own variables will either be deleted or will prevent the memory maps from being created.
- A newly created memory map is filled with zeros in the shape given at creation. This is used to work out how many rows of data were actually written.

In [2]:
def Augment_Song(signal,sample_rate,n_augmentations):
    """
    Return the original signal followed by up to five augmented copies of it.

    signal -> audio samples; handed unchanged to every audiomentations transform.
    sample_rate -> sample rate forwarded to every transform.
    n_augmentations -> number of augmented copies to append. Values <= 0 return only
        the original signal; values above 5 behave like 5.

    The augmentations are taken in a fixed order and each one is applied to the
    ORIGINAL signal (they are not chained): gaussian noise, time stretch, pitch shift,
    shift, and finally a composition of all four with milder settings.
    Only the gaussian noise step uses p=1.0; the others use p=0.5 (presumably the
    probability that audiomentations applies the transform - confirm against its docs).
    """
    #Factories instead of instances: a transform is only constructed when it is really needed.
    augmentation_factories=(
        #Gaussian Noise
        lambda: AddGaussianNoise(min_amplitude=0.001,max_amplitude=0.015,p=1.0),
        #Time Stretch
        lambda: TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),
        #Pitch Shift
        lambda: PitchShift(min_semitones=-4, max_semitones=4, p=0.5),
        #Shift
        lambda: Shift(min_fraction=-0.5, max_fraction=0.5, p=0.5),
        #Combined Transforms
        lambda: Compose([
            AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
            TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),
            PitchShift(min_semitones=-1, max_semitones=1, p=0.5),
            Shift(min_fraction=-0.2, max_fraction=0.2, p=0.5),
        ]),
    )
    #Original always comes first.
    versions=[signal]
    for already_added,build_transform in enumerate(augmentation_factories):
        #Stop as soon as the requested number of augmentations has been produced.
        if(n_augmentations<=already_added):
            break
        versions.append(build_transform()(signal, sample_rate=sample_rate))
    return versions

In [3]:
def create_csv(DATASET_PATH,CSV_PATH):
    """
    Scan a dataset directory and write a csv file describing every audio file in it.

    DATASET_PATH -> Directory which will have folders (classes) of audio files.
    CSV_PATH -> Path of csv file
    CSV file structure : ['Filename','Genre','Path']
        Filename is the name of the audio file including extension
        Genre will be the name of the parent folder of the audio file (title-cased). This assumes audio files are stored in folders having their respective class (genre) name.
        Path will be the path of the audio file including the filename as well.

    Only the immediate sub-folders of DATASET_PATH and the files directly inside them are listed.
    Raises FileNotFoundError if DATASET_PATH is not an existing directory.
    """
    if(not os.path.isdir(DATASET_PATH)):
        #Without this check next(os.walk(...)) would leak a bare StopIteration.
        raise FileNotFoundError(f"Dataset directory not found : {DATASET_PATH}")
    #Rows are collected in a plain list and turned into a DataFrame once;
    #appending to a DataFrame row by row re-allocates it on every insert.
    records=list()
    #Sorted so that the csv is identical no matter in which order the file system lists entries.
    for genre_folder in sorted(next(os.walk(DATASET_PATH))[1]):
        genre_directory=os.path.join(DATASET_PATH,genre_folder)
        #Default triple covers a folder that can't be listed (os.walk then yields nothing).
        for file in sorted(next(os.walk(genre_directory),(genre_directory,[],[]))[2]):
            records.append([file,genre_folder.title(),os.path.join(genre_directory,file)])
    df=pd.DataFrame(records,columns=["Filename","Genre","Path"])
    df.to_csv(CSV_PATH,index=False)
    print(f"Path of the created csv file : {CSV_PATH}. If the dataset doesn't change in the future, Pass the path of that csv file as the function parameter to avoid writing csv file over and over again.")

In [24]:
def create_test_csv(CSV_PATH,TEST_CSV_PATH,test_percentage=10,random_state=None):
    """
    Create the test csv file, or bring an existing one to the required size per genre.

    CSV_PATH -> csv file of the whole dataset. Needs at least the columns 'Genre' and 'Path'.
    TEST_CSV_PATH -> csv file holding the test records. Created if it doesn't exist.
    test_percentage -> size of the test set per genre, as a percentage of the number of
        songs in the SMALLEST genre of CSV_PATH (every genre gets the same number of test songs).
    random_state -> optional seed forwarded to DataFrame.sample to make the selection reproducible.

    If TEST_CSV_PATH exists, each genre present in CSV_PATH is reconciled separately:
    surplus test records are dropped at random, missing ones are drawn at random from
    the records of CSV_PATH that are not in the test csv yet (matched by 'Path').
    Test records of genres that are absent from CSV_PATH are left untouched.

    Raises ValueError if CSV_PATH has no records.
    """
    df=pd.read_csv(CSV_PATH)
    df=df.drop_duplicates(['Path'])
    if(df.empty):
        raise ValueError(f"No records found in {CSV_PATH}. Cannot create a test set.")
    smallest_genre_size=int(df["Genre"].value_counts().min())
    #Real percentage. The old "min/test_percentage" only matched a percentage for the value 10 and divided by zero for 0.
    required_test_set_size=int(smallest_genre_size*test_percentage/100)
    print(f"Aiming for test size : {required_test_set_size}")
    genres=list(df["Genre"].unique())
    try:
        test_df=pd.read_csv(TEST_CSV_PATH)
    except FileNotFoundError:
        #No test csv yet -> draw a fresh random sample of the required size from every genre.
        #Column layout kept as before: these columns first, then any other column of the dataset csv.
        template_columns=['Filename','Artist','Title','Genre','Path','Source']
        output_columns=template_columns+[column for column in df.columns if column not in template_columns]
        samples=[df.loc[df["Genre"]==genre].sample(n=required_test_set_size,random_state=random_state) for genre in genres]
        test_df=pd.concat(samples,ignore_index=True,sort=False).reindex(columns=output_columns)
        test_df.to_csv(TEST_CSV_PATH,index=False)
        print(f"Path of the created test csv file : {TEST_CSV_PATH}. Pass the path for future iterations.")   
    else:
        #Unique index labels are needed below to drop sampled rows by label.
        test_df=test_df.drop_duplicates(['Path']).reset_index(drop=True)
        for genre in genres:
            genre_rows=test_df["Genre"]==genre
            current_test_set_size_for_genre=int(genre_rows.sum())
            if(current_test_set_size_for_genre==0):
                print(f"Genre : {genre} doesn't exist in test csv file.")
            if(current_test_set_size_for_genre>required_test_set_size):
                print(f"Test csv contains {current_test_set_size_for_genre} records for {genre} which is more than required size : {required_test_set_size}. Removing {current_test_set_size_for_genre-required_test_set_size} records from test.")
                #Taking a random sample equal to the extra elements in test and removing it from test.
                surplus=test_df.loc[genre_rows].sample(n=current_test_set_size_for_genre-required_test_set_size,random_state=random_state)
                test_df=test_df.drop(surplus.index)
            elif(current_test_set_size_for_genre<required_test_set_size):
                print(f"Required test size is : {required_test_set_size} for {genre} which is more than current test size : {current_test_set_size_for_genre}. Adding {required_test_set_size-current_test_set_size_for_genre} records to test.")
                #Records of this genre that are not part of the test csv yet. Matching on 'Path' only,
                #so differing extra columns between the two csv files can't break the comparison.
                unused=df.loc[(df["Genre"]==genre) & (~df["Path"].isin(test_df["Path"]))]
                #Then taking a random sample so that our test size reaches required size.
                addition=unused.sample(n=required_test_set_size-current_test_set_size_for_genre,random_state=random_state)
                test_df=pd.concat([test_df,addition],ignore_index=True,sort=False)
        test_df.to_csv(TEST_CSV_PATH,index=False)
    display(test_df)

In [5]:
def load_data(df,MEMORY_MAPPING_PATH,ERROR_PATH,SAMPLE_RATE,TRACK_DURATION,n_mfcc,n_fft,hop_length,n_segments,n_augmentations):
    """
    Extract mfcc arrays for every song in df and write them to X.mymemmap / y.mymemmap.

    df -> DataFrame with the columns 'Filename', 'Genre' and 'Path', one row per audio file.
    MEMORY_MAPPING_PATH -> Directory in which X.mymemmap and y.mymemmap get (re)created.
    ERROR_PATH -> csv file to which unreadable files and skipped segments are appended
        (columns : ['Filename','Genre','Path','Segment','Error']).
    SAMPLE_RATE, TRACK_DURATION -> passed to librosa.load as sr and duration.
    n_mfcc, n_fft, hop_length -> passed to librosa.feature.mfcc.
    n_segments -> number of equally sized segments every (augmented) signal is cut into.
    n_augmentations -> passed to Augment_Song.

    X.mymemmap is created (float64) with room for len(df)*(n_augmentations+1)*n_segments
    segments. Valid segments are written back to back from row 0, so rows that were never
    written keep the initial zeros at the end of the map. y.mymemmap (uint8) holds the
    position of the song's genre in the sorted list of genres present in df.

    On any exception or keyboard interrupt both memory map files are deleted before the
    exception is re-raised, since their content would be incomplete.
    Returns nothing.
    """
    #Bound up front so that the cleanup below can always release them.
    temp_X=None
    temp_y=None

    def discard_partial_maps():
        #The references have to go before the files can be removed : as long as a memory map object
        #is alive the file stays in use (WinError on Windows when trying to delete or recreate it).
        nonlocal temp_X,temp_y
        temp_X=None
        temp_y=None
        gc.collect()
        clean(MEMORY_MAPPING_PATH,"files",file_list=["X","y"])

    try:
        #Deleting memory map files so that we don't face issues when creating memory maps here.
        clean(MEMORY_MAPPING_PATH,"files",file_list=["X","y"])
        try:
            error_df=pd.read_csv(ERROR_PATH)
        except FileNotFoundError:
            error_df=pd.DataFrame(list(),columns=['Filename','Genre','Path','Segment','Error'])

        def log_error(row,segment,message):
            error_df.loc[len(error_df.index)]=[row["Filename"],row["Genre"],row["Path"],segment,message]

        genre_list=sorted(list(df["Genre"].unique()))
        #Label of a genre = its position in the sorted genre list.
        genre_to_label={genre:label for label,genre in enumerate(genre_list)}
        print(f"Classes present in the dataframe : {genre_list}")
        SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION
        samples_per_segment = int(SAMPLES_PER_TRACK / n_segments)
        n_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)
        expected_shape=(n_mfcc_vectors_per_segment,n_mfcc)

        max_input_size=len(df.index)*(n_augmentations+1)*n_segments
        temp_X = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X.mymemmap"), dtype='float64', mode='w+', shape=(max_input_size, n_mfcc_vectors_per_segment, n_mfcc))
        temp_y = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y.mymemmap"), dtype='uint8', mode='w+', shape=(max_input_size,))
        print("Empty memory maps X.mymemmap and y.mymemmap created.")

        #viable array count = next free row of the memory maps
        count=0
        #Reference array used to recognise segments of a silent (all zero) signal.
        #NOTE(review): the constant is presumably what librosa returned for a zero signal with the
        #settings used when this was written - confirm it still holds for other parameters/versions.
        test_null_signal_array=np.zeros(expected_shape)
        test_null_signal_array[:,0]=-1131.370849898476
        #Started once, before the loop, so the reported duration covers the whole run
        #(and the name is defined even when df has no rows).
        starting_time=time.time()
        #enumerate keeps the audio file count correct even when a file is skipped.
        for song_count,(_,row) in enumerate(df.iterrows(),start=1):
            label=genre_to_label[row['Genre']]
            print(f"{song_count}. {row['Filename']} : {label}")
            try:
                signal, sample_rate = librosa.load(row['Path'], sr=SAMPLE_RATE,duration=TRACK_DURATION)
            except Exception as e:
                log_error(row,"",str(e))
                print("ERROR : Couldn't read file")
                continue
            for augmented_signal in Augment_Song(signal,sample_rate,n_augmentations):
                for d in range(n_segments):
                    start = samples_per_segment * d
                    finish = start + samples_per_segment
                    segment=augmented_signal[start:finish]
                    if(len(segment)==0):
                        #Track ended before this segment starts. There is nothing to extract, and handing
                        #an empty array to librosa could abort the whole run instead of just this segment.
                        problem="Segment is empty since the track is shorter than expected."
                    else:
                        #n_mfcc values beyond 128 produces 128 mfcc only.
                        mfcc = librosa.feature.mfcc(y=segment, sr=sample_rate, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
                        mfcc = mfcc.T
                        #The shape check makes sure the shape of the input is consistent. Everything
                        #stored can be assumed to be (n_mfcc_vectors_per_segment,n_mfcc).
                        if(mfcc.shape!=expected_shape):
                            problem="Shape of mfcc array not consistent with our calculation"
                        elif(np.all(np.equal(test_null_signal_array,mfcc))):
                            problem="mfcc array equal to array produced if signal is a zero array."
                        elif(np.any(np.isnan(mfcc))):
                            problem="Null value present in mfcc array."
                        elif(np.any(np.isinf(mfcc))):
                            problem="Inf value present in mfcc array."
                        else:
                            problem=None
                    if(problem is not None):
                        print(f"ERROR : Segment {d} skipped.")
                        log_error(row,d,problem)
                        continue
                    temp_X[count,:]=mfcc
                    temp_y[count]=label
                    count+=1
        error_df.to_csv(ERROR_PATH,index=False)
        ending_time=time.time()
        #Write everything to disk explicitly instead of relying on garbage collection to do it.
        temp_X.flush()
        temp_y.flush()
        temp_X=None
        temp_y=None
        gc.collect()
        print("Data loaded : ",ending_time-starting_time,"seconds",end="\n")
    except Exception as e:
        print("Exception occured :",e)
        discard_partial_maps()
        #Bare raise keeps the original exception type and traceback.
        raise
    except KeyboardInterrupt:
        #If user interrupts this process, X and y can have corrupt values since it was in the middle of adding data. 
        #We are removing the memory map files as well as releasing the variables for safety.
        discard_partial_maps()
        raise KeyboardInterrupt("Deleted memory maps and variables from memory since values will be corrupt")

In [6]:
def clean(MEMORY_MAPPING_PATH,s,file_list=None):
    """
    Remove memory map variables from the global namespace and/or memory map files from disk.

    MEMORY_MAPPING_PATH -> Directory containing the "<name>.mymemmap" files.
    s -> What to clean :
        "variables" deletes the global variables named in file_list,
        "files" deletes the files <name>.mymemmap for every name in file_list,
        "full" does both (variables first, so the files are no longer referenced by them).
        Any other value does nothing.
    file_list -> Names to clean. Defaults to X, y and the train/validation/test maps.

    Names or files that don't exist are skipped silently. Other errors raised while
    deleting a file are not handled here.
    """
    if(file_list is None):
        file_list=["X","y","X_train","X_validation","X_test","y_train","y_validation","y_test"]
    if(s in ("variables","full")):
        #locals() can't be used since it returns a copy of the directory not the directory itself like globals.
        #So for our usecase, we have to use globals [may be dangerous.]
        myVars = globals()
        for variable in file_list:
            #pop with a default : a name that isn't defined is simply skipped.
            myVars.pop(variable,None)
        #Collect right away so that memory maps without references get closed.
        gc.collect()
    if(s in ("files","full")):
        for file in file_list:
            try:
                os.remove(os.path.join(MEMORY_MAPPING_PATH,file+".mymemmap"))
            except FileNotFoundError:
                continue

In [23]:
def _count_written_rows(memory_map):
    """Return the number of leading rows that hold data, i.e. one past the last row that is not all zeros."""
    #Rows are written back to back from row 0, so only the tail can still be untouched (zero).
    for row in range(len(memory_map)-1,-1,-1):
        if(np.any(memory_map[row])):
            return row+1
    return 0


def _copy_rows(source_X,source_y,indices,target_X,target_y):
    """Copy the rows listed in indices (in that order) from the source maps to the target maps and flush them."""
    #One row at a time so the whole set never has to fit in memory.
    for position,source_index in enumerate(indices):
        target_X[position]=source_X[source_index]
        target_y[position]=source_y[source_index]
    target_X.flush()
    target_y.flush()


def Preprocess(MEMORY_MAPPING_PATH,ERROR_PATH,CSV_PATH,TEST_CSV_PATH,SAMPLE_RATE,TRACK_DURATION,n_mfcc,n_fft,hop_length,n_segments,n_augmentations):
    """
    Build the train, validation and test memory maps from the dataset csv file.

    MEMORY_MAPPING_PATH -> Directory in which the memory maps are written.
    ERROR_PATH -> csv file listing unreadable files/skipped segments. Deleted and rewritten here.
    CSV_PATH -> csv file of the whole dataset (columns 'Filename','Genre','Path').
    TEST_CSV_PATH -> csv file of the test records. Created/updated through create_test_csv (10 percent).
    Remaining parameters are forwarded to load_data.

    Steps :
        1. Records listed in the test csv are removed from the dataset (matched by 'Path').
        2. The rest is balanced : every genre keeps as many randomly chosen songs as the smallest genre has.
        3. mfcc arrays of the balanced set are extracted (with augmentation) and split 80/20
           into X_train/y_train and X_validation/y_validation.
        4. mfcc arrays of the test records are extracted WITHOUT augmentation into X_test/y_test.

    Side effects : uses the global names X, y, X_train, X_validation, X_test, y_train,
    y_validation, y_test while running and deletes them when done. On an exception or a
    keyboard interrupt all memory map files are deleted as well before re-raising.
    """
    Function_start_time=time.time()
    
    #Globals so that clean() is able to delete the references before it deletes the files.
    global X
    global y
    global X_train 
    global X_validation
    global X_test
    global y_train
    global y_validation
    global y_test 
    
    df=pd.read_csv(CSV_PATH)
    print("CSV file read.")
    df=df.drop_duplicates(['Path'])
    
    test_percentage=10
    create_test_csv(CSV_PATH,TEST_CSV_PATH,test_percentage)
    test_df=pd.read_csv(TEST_CSV_PATH)
    #Keeping only songs that are not part of the test set. Matching on 'Path' alone so that
    #differing extra columns of the two csv files can't let a test song slip into training.
    df=df.loc[~df["Path"].isin(test_df["Path"])]
    if(df.empty):
        raise ValueError("No records left for training after removing the test records.")
    songs_per_genre=int(df["Genre"].value_counts().min())
    #Taking a random sample of the same size from every genre.
    balanced_df=pd.concat([df.loc[df["Genre"]==genre].sample(n=songs_per_genre) for genre in list(df["Genre"].unique())],ignore_index=True,sort=False)
    print(f"Dataset balanced to have only {songs_per_genre} data points per class.")
    #No shuffling needed here : train_test_split shuffles on segment level below.
    
    #delete memory mapping if present in directory
    file_list=["X","y","X_train","X_validation","X_test","y_train","y_validation","y_test"]
    clean(MEMORY_MAPPING_PATH,"full",file_list)
    try:
        os.remove(ERROR_PATH)
    except FileNotFoundError:
        pass
    try:
        SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION
        samples_per_segment = int(SAMPLES_PER_TRACK / n_segments)
        n_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)
        #Loading for training set. The BALANCED frame has to be loaded : the map is opened with its size right below.
        load_data(balanced_df,MEMORY_MAPPING_PATH,ERROR_PATH,SAMPLE_RATE,TRACK_DURATION,n_mfcc,n_fft,hop_length,n_segments,n_augmentations)
        max_input_size=len(balanced_df.index)*(n_augmentations+1)*n_segments
        X = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X.mymemmap"), dtype='float64', mode='r', shape=(max_input_size, n_mfcc_vectors_per_segment, n_mfcc))
        y = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y.mymemmap"), dtype='uint8', mode='r', shape=(max_input_size,))
        #Getting the actual size of the training set since we might encounter segment errors and unreadable files.
        actual_size=_count_written_rows(X)
        index_list=np.arange(0, actual_size, 1, dtype=int)
        #Splitting from training set, the validation set.
        validation_ratio=0.2
        train_indices,validation_indices=train_test_split(index_list,test_size=validation_ratio,shuffle=True)
        print(f"Splitting X in the ratio : {validation_ratio}. \n(Training + Validation) set size : {actual_size}. \nTrain set size : {len(train_indices)} \nValidation set size : {len(validation_indices)}")

        #Creating empty memory maps for train and validation
        X_train = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X_train.mymemmap"), dtype='float64', mode='w+', shape=(len(train_indices), n_mfcc_vectors_per_segment, n_mfcc))
        X_validation = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X_validation.mymemmap"), dtype='float64', mode='w+', shape=(len(validation_indices), n_mfcc_vectors_per_segment, n_mfcc))
        y_train = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y_train.mymemmap"), dtype='uint8', mode='w+', shape=(len(train_indices),))
        y_validation = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y_validation.mymemmap"), dtype='uint8', mode='w+', shape=(len(validation_indices),))
        print("Empty memory maps created for train and validation")

        #Placing respective values in validation and train memory maps :
        #row k of a split is the row of X named by the k-th index of that split.
        _copy_rows(X,y,train_indices,X_train,y_train)
        _copy_rows(X,y,validation_indices,X_validation,y_validation)
        print("Train and validation memory maps written.")
        #X and y have to be released before load_data recreates the files.
        clean(MEMORY_MAPPING_PATH,"variables",["X","y"])
        
        #Avoiding augmentation here since we assume test to be the actual real representation that we want to perform well on.
        load_data(test_df,MEMORY_MAPPING_PATH,ERROR_PATH,SAMPLE_RATE,TRACK_DURATION,n_mfcc,n_fft,hop_length,n_segments,0)
        #Written with 0 augmentations, so the map holds exactly one version per song.
        max_input_size=len(test_df.index)*n_segments
        X = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X.mymemmap"), dtype='float64', mode='r', shape=(max_input_size, n_mfcc_vectors_per_segment, n_mfcc))
        y = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y.mymemmap"), dtype='uint8', mode='r', shape=(max_input_size,))
        actual_size=_count_written_rows(X)
        print(f"Test set size : {actual_size}")
        X_test = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"X_test.mymemmap"), dtype='float64', mode='w+', shape=(actual_size, n_mfcc_vectors_per_segment, n_mfcc))
        y_test = np.memmap(os.path.join(MEMORY_MAPPING_PATH,"y_test.mymemmap"), dtype='uint8', mode='w+', shape=(actual_size,))
        X_test[:]=X[:actual_size]
        y_test[:]=y[:actual_size]
        X_test.flush()
        y_test.flush()
        print("Test memory map written.")
        clean(MEMORY_MAPPING_PATH,"variables",file_list)
    except Exception as e:
        print("Exception occured : ",e)
        clean(MEMORY_MAPPING_PATH,"full",file_list)
        #Bare raise keeps the original exception type and traceback.
        raise
    except KeyboardInterrupt:
        clean(MEMORY_MAPPING_PATH,"full",file_list)
        raise KeyboardInterrupt("Deleted memory maps and variables from memory since values will be corrupt")
    Function_end_time=time.time()
    print(f"\n\n\nPreprocessing Execution Time : {Function_end_time-Function_start_time}")

In [8]:
def build_model(input_shape,n_classes):
    """Assemble the (uncompiled) CNN used for classification.

    Topology: three blocks of Conv2D(32 filters, relu) -> MaxPooling2D(strides 2, same padding)
    -> BatchNormalization, followed by Flatten, Dense(64, relu), Dropout(0.3) and a softmax
    output layer.

    :param input_shape (tuple): Shape of a single input example, given to the first conv layer
    :param n_classes (int): Number of units of the softmax output layer
    :return model: keras Sequential model
    """
    layers = keras.layers

    #(conv kernel, pooling window) of the 1st, 2nd and 3rd conv block
    block_settings = (
        ((3, 3), (3, 3)),
        ((3, 3), (3, 3)),
        ((2, 2), (2, 2)),
    )

    stack = []
    for block_number, (kernel, pool_window) in enumerate(block_settings):
        #Only the very first layer is told the input shape.
        first_layer_options = {"input_shape": input_shape} if block_number == 0 else {}
        stack.append(layers.Conv2D(32, kernel, activation='relu', **first_layer_options))
        stack.append(layers.MaxPooling2D(pool_window, strides=(2, 2), padding='same'))
        stack.append(layers.BatchNormalization())

    #classifier head: flatten, one dense layer with dropout, softmax output
    stack.append(layers.Flatten())
    stack.append(layers.Dense(64, activation='relu'))
    stack.append(layers.Dropout(0.3))
    stack.append(layers.Dense(n_classes, activation='softmax'))

    model = keras.Sequential()
    for layer in stack:
        model.add(layer)
    return model

In [9]:
def plot_history(history):
    """Plots accuracy/loss for training/validation set as a function of the epochs

        :param history: Training history of model; history.history must contain "accuracy" and
            "loss". "val_accuracy"/"val_loss" are plotted as well when present.
        :return: None, the figure is shown with plt.show()
    """

    fig, axs = plt.subplots(2)
    metrics = history.history

    # create accuracy subplot
    axs[0].plot(metrics["accuracy"], label="train accuracy")
    # the val_* curves come from validation_data, not from the test set, so label them as such;
    # they are missing altogether if the model was fitted without validation data
    if "val_accuracy" in metrics:
        axs[0].plot(metrics["val_accuracy"], label="validation accuracy")
    axs[0].set_ylabel("Accuracy")
    axs[0].legend(loc="lower right")
    axs[0].set_title("Accuracy eval")

    # create error subplot
    axs[1].plot(metrics["loss"], label="train error")
    if "val_loss" in metrics:
        axs[1].plot(metrics["val_loss"], label="validation error")
    axs[1].set_ylabel("Error")
    axs[1].set_xlabel("Epoch")
    axs[1].legend(loc="upper right")
    axs[1].set_title("Error eval")

    # keeps the title of the lower plot from running into the upper plot
    fig.tight_layout()
    plt.show()

In [10]:
def predict(model, X, y):
    """Run the model on one sample and print the target next to the predicted class index.

    :param model: Trained classifier (anything with a predict method)
    :param X: Input data of a single sample, without the batch dimension
    :param y (int): Target, only used for the printed message
    """
    # model.predict() works on batches, so the sample becomes a batch of size one:
    # a new leading axis is inserted, all existing axes stay as they are
    single_sample_batch = X[None, ...]
    class_scores = model.predict(single_sample_batch)

    # the predicted label is the position of the highest score of each batch entry
    print("Target: {}, Predicted label: {}".format(y, np.argmax(class_scores, axis=1)))

In [11]:
def create_dataset(MEMORY_MAPPING_PATH,n_mfcc,n_segments,n_mfcc_vectors_per_segment):
    """
    Open the train/validation/test memory maps and wrap them as batched tf.data datasets.

    MEMORY_MAPPING_PATH -> Directory containing X_train, X_validation, X_test, y_train,
        y_validation and y_test ".mymemmap" files.
    n_mfcc, n_mfcc_vectors_per_segment -> Shape of one stored example is
        (n_mfcc_vectors_per_segment, n_mfcc); also used to derive the number of examples
        from the size of the X files.
    n_segments -> Not used by this function.

    Every example is reshaped to (n_mfcc_vectors_per_segment, n_mfcc, 1) and the datasets
    are batched with batch size 32.
    Returns the tuple (train_batch, validation_batch, test_batch).
    Raises ValueError if one of the six memory map files is missing from the directory.
    """
    started_at=time.time()
    split_names=("train","validation","test")

    def map_path(name):
        return os.path.join(MEMORY_MAPPING_PATH,name+".mymemmap")

    files_in_directory=next(os.walk(MEMORY_MAPPING_PATH))[2]
    required_memory_maps_list=[f"{kind}_{split}.mymemmap" for kind in ("X","y") for split in split_names]
    #If the required maps not in the given directory
    if(any(required not in files_in_directory for required in required_memory_maps_list)):
        raise ValueError('Memory Map directory may be incorrect since required memory maps not found. Use remove=True if memory maps were not created during the previous iterations.')

    #Here 8 is because float64 uses 8 bytes. [float64 uses 64 bits -> 8 bytes]
    bytes_per_example=n_mfcc_vectors_per_segment*n_mfcc*8
    train_size,validation_size,test_size=[int(os.path.getsize(map_path("X_"+split))/bytes_per_example) for split in split_names]
    print(f"Train, Validation, Test Memory maps of sizes {train_size}, {validation_size}, {test_size} read respectively")

    #Read-only views on the files, keyed by split name.
    example_counts=dict(zip(split_names,(train_size,validation_size,test_size)))
    inputs={split:np.memmap(map_path("X_"+split), dtype='float64', mode='r', shape=(example_counts[split], n_mfcc_vectors_per_segment, n_mfcc)) for split in split_names}
    labels={split:np.memmap(map_path("y_"+split), dtype='uint8', mode='r', shape=(example_counts[split],)) for split in split_names}

    def dataset_from(memory_map,element_dtype,element_shape):
        #tf asks the generator function for a fresh iterator every time the dataset is iterated.
        return tf.data.Dataset.from_generator(
            generator=lambda: iter(memory_map),
            output_types=element_dtype,
            output_shapes=element_shape,
        )

    input_example_shape =  inputs["train"][0].shape
    input_dtype = np.float64
    print(f"Input example shape : {input_example_shape} Input datatype : {input_dtype}")
    input_datasets={split:dataset_from(inputs[split],input_dtype,input_example_shape) for split in split_names}

    label_example_shape=labels["train"][0].shape
    label_data_dtype=np.uint8
    print(f"Label example shape : {label_example_shape} Label datatype : {label_data_dtype}")
    label_datasets={split:dataset_from(labels[split],label_data_dtype,label_example_shape) for split in split_names}

    def RESHAPE(tensor_value,label):
        #Adds the trailing channel axis the conv layers work on.
        tensor_value=tf.reshape(tensor_value,(input_example_shape[0], input_example_shape[1],1))
        tensor_value.set_shape(tensor_value.shape)
        return (tensor_value,label)

    #Pair inputs with labels, reshape, then batch - per split.
    batched=[]
    for split in split_names:
        paired=tf.data.Dataset.zip((input_datasets[split], label_datasets[split]))
        batched.append(paired.map(RESHAPE).batch(32))

    print(f"Dataset created : {time.time()-started_at} Seconds")
    return tuple(batched)

In [12]:
def training(MEMORY_MAPPING_PATH=None,ERROR_PATH=None,MODEL_SAVE_DIRECTORY=None,DATASET_PATH=None,CSV_PATH=None,TEST_CSV_PATH=None,remove=True,SAMPLE_RATE=22050,TRACK_DURATION = 30,n_mfcc=13,n_fft=2048,hop_length=512,n_segments=10,n_augmentations=0,epochs=50,learning_rate=0.0001):
    """
    Preprocess the dataset (if required), train the CNN, evaluate it on the test set and save it.

    MEMORY_MAPPING_PATH -> Directory where the mfcc numpy arrays of train,test,validation input and labels are stored. [6 files total]
        Defaults to the directory of the csv file.
    ERROR_PATH -> csv file which will include files that are unreadable or segments inside those files that are corrupt.
        Defaults to Error.csv next to the csv file.
    MODEL_SAVE_DIRECTORY -> The path (file, not directory) where we want to save the model. Defaults to Model.h5 next to the csv file.
    DATASET_PATH -> Directory with one folder per genre (folder name = genre label) containing the audio files.
        Only needed if CSV_PATH is not given : Audio_Dataset.csv is then created inside this directory.
    CSV_PATH -> csv file of the dataset, columns = [Filename,Genre,Path]. If given, DATASET_PATH is not needed
        since the paths of the audio files are read from the csv file.
    TEST_CSV_PATH -> csv file of the test records. Defaults to Test.csv next to the csv file.
    remove -> True : existing memory maps are removed and preprocessing runs again.
        False : preprocessing is skipped and the existing memory maps are used (CSV_PATH and
        MEMORY_MAPPING_PATH have to be given, otherwise preprocessing runs anyway). If the memory
        maps are not in MEMORY_MAPPING_PATH, an error is raised.
    SAMPLE_RATE -> The number of data points per second in the audio file. Default is 22050.
    TRACK_DURATION -> Number of seconds loaded from every audio file. Default is 30.
    n_mfcc, n_fft, hop_length -> mfcc extraction settings. Defaults are 13, 2048, 512.
    n_segments -> Number of segments every track is split into. Default is 10.
    n_augmentations -> Number of augmented versions per training song. Default is 0.
    epochs -> Number of training epochs. Default is 50.
    learning_rate -> Learning rate of the Adam optimiser. Default is 0.0001.

    Raises ValueError if neither DATASET_PATH nor CSV_PATH is given.
    The preprocessing settings have to match the ones the memory maps were created with when remove=False.
    """
    s=time.time()
    #numpy memory mapping gets created, error_csv gets created
    if(remove or (CSV_PATH is None) or (MEMORY_MAPPING_PATH is None)):
        if(CSV_PATH is None and DATASET_PATH is None):
            raise ValueError('Either dataset path or csv path must be given')
        elif(CSV_PATH is None):
            #csv file will be stored in dataset directory itself
            CSV_PATH=os.path.join(DATASET_PATH,"Audio_Dataset.csv")
            create_csv(DATASET_PATH,CSV_PATH)
        #Everything that wasn't given explicitly lives next to the csv file.
        csv_directory=os.path.split(CSV_PATH)[0]
        if(ERROR_PATH is None):
            ERROR_PATH=os.path.join(csv_directory,"Error.csv")
        if(TEST_CSV_PATH is None):
            TEST_CSV_PATH=os.path.join(csv_directory,"Test.csv")
        if(MEMORY_MAPPING_PATH is None):
            MEMORY_MAPPING_PATH=csv_directory
        Preprocess(MEMORY_MAPPING_PATH,ERROR_PATH,CSV_PATH,TEST_CSV_PATH,SAMPLE_RATE,TRACK_DURATION,n_mfcc,n_fft,hop_length,n_segments,n_augmentations)
    if(MODEL_SAVE_DIRECTORY is None):
        MODEL_SAVE_DIRECTORY=os.path.join(os.path.split(CSV_PATH)[0],"Model.h5")
        
    SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION
    samples_per_segment = int(SAMPLES_PER_TRACK / n_segments)
    n_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)
    train_batch,validation_batch,test_batch=create_dataset(MEMORY_MAPPING_PATH,n_mfcc,n_segments,n_mfcc_vectors_per_segment)

    #Sorted genre names : position in this list = class index used as label.
    df=pd.read_csv(CSV_PATH)
    genre_list=sorted(list(df["Genre"].unique()))
    n_classes=len(genre_list)
    del df
    input_shape=(n_mfcc_vectors_per_segment, n_mfcc,1)
    model=build_model(input_shape,n_classes)
    print(f"Model built with input shape : {input_shape} and output classes : {n_classes}")

    optimiser = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimiser,
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    print("Model compiled")
    model.summary()

    history = model.fit(train_batch,epochs=epochs,validation_data=validation_batch,verbose=1)
    # plot accuracy/error for training and validation
    plot_history(history)

    # evaluate model on test set
    test_loss, test_acc = model.evaluate(test_batch, verbose=2)
    print('\nTest accuracy:', test_acc)

    #The settings needed to preprocess audio the same way at prediction time are stored with the model.
    with h5py.File(MODEL_SAVE_DIRECTORY, mode='w') as f:
        hdf5_format.save_model_to_hdf5(model, f)
        f.attrs['Genre_List'] = genre_list
        f.attrs["SAMPLE_RATE"] = SAMPLE_RATE
        f.attrs["TRACK_DURATION"]=TRACK_DURATION
        f.attrs["n_mfcc"]=n_mfcc
        f.attrs["n_fft"]=n_fft
        f.attrs["hop_length"]=hop_length
        f.attrs["n_segments"]=n_segments
    print(f"MODEL SUCESSFULLY SAVED AT {MODEL_SAVE_DIRECTORY}")
    print(f"Training Execution Time : {time.time()-s} Seconds")

In [13]:
#Settings for a run that builds its csv file from a folder of genre sub-folders.
#The feature settings below only take effect if they are passed to training() explicitly.
#NOTE(review): absolute, machine-specific path - adjust before running on another machine.
DATASET_PATH="D:/Downloads/MGR Data/Data/GTZAN/Data/genres_original"
#Sample rate the audio is loaded with
SAMPLE_RATE=22050
#Seconds loaded per track
TRACK_DURATION = 30
#mfcc extraction settings
n_mfcc=13
n_fft=2048
hop_length=512
#Segments per track
n_segments=10
#Augmented versions per training song
n_augmentations=0

In [None]:
#Pass the configured values explicitly; otherwise training() silently falls back to its own defaults.
training(DATASET_PATH=DATASET_PATH,remove=True,SAMPLE_RATE=SAMPLE_RATE,TRACK_DURATION=TRACK_DURATION,n_mfcc=n_mfcc,n_fft=n_fft,hop_length=hop_length,n_segments=n_segments,n_augmentations=n_augmentations)

In [18]:
#Settings for a run that starts from an existing dataset csv file.
#The feature settings below only take effect if they are passed to training() explicitly.
#NOTE(review): absolute, machine-specific paths - adjust before running on another machine.
CSV_PATH="D:/Downloads/MGR Data/Metadata/Datasets/GTZAN/GTZAN.csv"
MEMORY_MAPPING_PATH = "D:/Downloads/MGR Data/Metadata/Datasets/Combined/"
ERROR_PATH="D:/Downloads/MGR Data/Metadata/Datasets/Combined/Error.csv"
MODEL_SAVE_DIRECTORY="D:/Downloads/MGR Data/Models/EXPERIMENT 6.h5"
#Not needed since the csv file exists already
DATASET_PATH=None
#None -> training() puts Test.csv next to the dataset csv file
TEST_CSV_PATH=None
#Sample rate the audio is loaded with
SAMPLE_RATE=22050
#Seconds loaded per track
TRACK_DURATION = 30
#mfcc extraction settings (hop_length differs from training()'s default of 512)
n_mfcc=13
n_fft=2048
hop_length=256
#Segments per track
n_segments=10
#Augmented versions per training song
n_augmentations=0

In [None]:
#Pass the configured values explicitly; otherwise training() silently uses its own defaults (e.g. hop_length=512 instead of the configured 256).
training(MEMORY_MAPPING_PATH,ERROR_PATH,MODEL_SAVE_DIRECTORY,DATASET_PATH,CSV_PATH,TEST_CSV_PATH,remove=True,SAMPLE_RATE=SAMPLE_RATE,TRACK_DURATION=TRACK_DURATION,n_mfcc=n_mfcc,n_fft=n_fft,hop_length=hop_length,n_segments=n_segments,n_augmentations=n_augmentations)