# Data Preprocessing

In [1]:
from audiosep.data import split_data
import mutagen
import librosa
from scipy.io import wavfile
import librosa
import os
import math
import numpy as np
import tempfile
import soundfile as sf
from audiosep.data import load_data, split_data

In [2]:
%load_ext autoreload
%autoreload 2

In [106]:
def get_mfcc(path, n_mfcc= 13, n_fft= 2048, hop_length= 512, num_segments= 10):

    SAMPLE_RATE = 22050
    duration = 30
    samples_per_track = SAMPLE_RATE * duration

    num_samples_per_segment = int(samples_per_track / num_segments)
    expected_nmfcc_vectors_per_segment = math.ceil(num_samples_per_segment / hop_length)
    
    if type(path) == np.ndarray:
        signal = path
    else:
        signal, sr = librosa.load(path, sr=SAMPLE_RATE)

    # process by segment, extracting mfcc and storing data
    for s in range(num_segments):
        start_sample = num_samples_per_segment * s 
        finish_sample = start_sample + num_samples_per_segment

        mfcc = librosa.feature.mfcc(signal[start_sample: finish_sample],
                                    sr=SAMPLE_RATE,
                                    n_fft= n_fft,
                                    n_mfcc= n_mfcc,
                                    hop_length = hop_length)
        mfcc = mfcc.T
       
        if len(mfcc) == expected_nmfcc_vectors_per_segment:
            return mfcc

## sub sample 10

# Test Model

In [8]:
X_train, X_val, X_test, y_train, y_val, y_test = split_data()

In [9]:
print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

(5397, 130, 13, 1)
(1350, 130, 13, 1)
(2249, 130, 13, 1)


In [10]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization, Conv2D, MaxPool2D, Dense, Flatten, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers

In [11]:
def initialize_conv(input_shape):
    # create model
    model = Sequential()
    
    # 1st cond layer
    model.add(Conv2D(32, (3, 3), activation= 'relu', input_shape=input_shape))
    model.add(MaxPool2D((3, 3), strides= (2, 2), padding= 'same'))
    model.add(BatchNormalization())
    
    # 2nd conv layer
    model.add(Conv2D(32, (3, 3), activation= 'relu'))
    model.add(MaxPool2D((3, 3), strides= (2, 2), padding= 'same'))
    model.add(BatchNormalization())
    
    # 3rd conv layer
    model.add(Conv2D(32, (2, 2), activation= 'relu'))
    model.add(MaxPool2D((2, 2), strides= (2, 2), padding= 'same'))
    model.add(BatchNormalization())
    
    # flatten to 1D array and feed to dense
    model.add(Flatten())
    model.add(Dense(64, activation= 'relu'))
    model.add(Dropout(0.3))
    
    # output layer
    model.add(Dense(9, activation= 'softmax'))
    
    # compile model
    model.compile(loss='sparse_categorical_crossentropy',
              optimizer=Adam(learning_rate=0.0003),
              metrics=['accuracy'])
    
    return model

In [12]:
input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3])

In [24]:
input_shape

(130, 13, 1)

In [13]:
cnn_model = initialize_conv(input_shape)
cnn_model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 128, 11, 32)       320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 64, 6, 32)         0         
_________________________________________________________________
batch_normalization (BatchNo (None, 64, 6, 32)         128       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 62, 4, 32)         9248      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 31, 2, 32)         0         
_________________________________________________________________
batch_normalization_1 (Batch (None, 31, 2, 32)         128       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 30, 1, 32)         4

In [14]:
es = EarlyStopping(patience=10, restore_best_weights=True)

history = cnn_model.fit(X_train, y_train,
                      epochs= 30,
                      batch_size= 32,
                      validation_data= (X_val, y_val),
                      verbose=2,
                      callbacks= [es])

Epoch 1/30
169/169 - 4s - loss: 1.9141 - accuracy: 0.3476 - val_loss: 1.5412 - val_accuracy: 0.4570
Epoch 2/30
169/169 - 3s - loss: 1.4261 - accuracy: 0.4918 - val_loss: 1.2322 - val_accuracy: 0.5548
Epoch 3/30
169/169 - 3s - loss: 1.2225 - accuracy: 0.5533 - val_loss: 1.0704 - val_accuracy: 0.6037
Epoch 4/30
169/169 - 3s - loss: 1.1334 - accuracy: 0.5968 - val_loss: 1.0174 - val_accuracy: 0.6222
Epoch 5/30
169/169 - 3s - loss: 1.0399 - accuracy: 0.6322 - val_loss: 0.9593 - val_accuracy: 0.6548
Epoch 6/30
169/169 - 3s - loss: 0.9378 - accuracy: 0.6607 - val_loss: 0.9362 - val_accuracy: 0.6711
Epoch 7/30
169/169 - 4s - loss: 0.8910 - accuracy: 0.6828 - val_loss: 0.9292 - val_accuracy: 0.6756
Epoch 8/30
169/169 - 4s - loss: 0.8514 - accuracy: 0.6904 - val_loss: 0.8792 - val_accuracy: 0.6970
Epoch 9/30
169/169 - 4s - loss: 0.7865 - accuracy: 0.7284 - val_loss: 0.9058 - val_accuracy: 0.6741
Epoch 10/30
169/169 - 4s - loss: 0.7680 - accuracy: 0.7237 - val_loss: 0.8987 - val_accuracy: 0.6852

## On new wavfile

In [85]:
def split_audio(test_data):
    signal, sr = librosa.load(test_data)

    start = 0
    split_data = {}
    duration = int(librosa.get_duration(signal)) # duration
    
    
    if duration < 30:
        print("Please upload a song of at least 30 seconds")
        
    else:
        batch_size = sr * 30  # get num of vectors per 30 secs
        length = len(signal) # total len of signal array
        num_iter = duration // 30 # model trained with 30 second clips
        # num of times to loop over len 
        for i in range(num_iter):
            # slice 30 seconds--> batch size 
            split_data['batch_'+ str(i+1)] = signal[start: start+batch_size]
            start += batch_size  #update start index

        num_splits = len(split_data)
    
        return split_data, num_splits

# should give the num of splits as well

# possibly average the predictions of each split

In [None]:
#create temp file and save split wavefiles
#path = os.getcwd()
#f = tempfile.TemporaryDirectory(dir= path)
#for key, val in test.items():
#    sf.write(f'{f.name}/{key}.wav', val, 22050)

In [167]:
def predict(model, X):
    
    # reshape X
    if X.ndim == 3:
        X = X[np.newaxis, ...] # to match model input shape
    else:
        X = X[np.newaxis, ..., np.newaxis]
    
    # get predictions
    pred = model.predict(X) # pred is 2D array of probs for each genre class
    
    # extract index with max val
    pred = np.argmax(pred, axis=1)[0]
    
    return pred

In [168]:
def predict_new(model, X):
    
    X, num_x = split_audio(X)
    
    predictions = []

    for key, val in X.items():
        mfcc = get_mfcc(val)
        pred = predict(model, mfcc)
        predictions.append(pred)

    predictions = np.array(predictions)
    
    pred = np.argmax(predictions)
    
    # labels gotten from data.json['mapping']
    genres ={0: "Blues",
         1: "Classical",
         2: "Country",
         3: "Disco",
         4: "Hiphop",
         5: "Metal",
         6: "pop",
         7: "Reggae",
         8: "Rock"
        }
    
    print(f"Predicted genre: {genres.get(pred)}")
    
    return genres.get(pred)
    
    

In [174]:
mine_2 = 'C:/Users/cezea/Desktop/MUSIC/exports/Kulture.mp3'

In [175]:
predict_new(cnn_model, mine_2)



Predicted genre: Metal


'Metal'