In [1]:
import os
import numpy as np

import warnings
import tensorflow as tf

from scipy.io import wavfile
from glob import glob

from kapre.composed import get_melspectrogram_layer
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import CSVLogger, ModelCheckpoint
from tensorflow.keras.layers import TimeDistributed, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2

ModuleNotFoundError: No module named 'tensorflow'

In [2]:
# --- Data locations ---------------------------------------------------------
src_root = 'raw_data'
dst_root = 'clean'          # default `src` directory for train()

# --- Audio parameters -------------------------------------------------------
sample_f = 44100            # sample rate handed to the mel-spectrogram layer
dt = 1.0                    # clip length; int(sample_f * dt) samples per clip

# --- Preprocessing settings (not referenced by the cells shown here) --------
threshold = 120             # NOTE(review): presumably a signal-envelope cutoff - confirm
dummy_file = 'finger_snaps_1_4'

# --- Training settings ------------------------------------------------------
model_type = "conv2d"       # used to name the history CSV and the checkpoint
batch_size = 16

In [58]:
def Conv2D(n_classes=2, sf = sample_f, dt = dt):
    """Build and compile the 2D-convolutional audio classifier.

    The network takes raw audio of shape ``(int(sf*dt), 1)``, converts it to a
    decibel-scaled mel spectrogram with kapre, and runs it through five
    convolution stages (the first four followed by 2x2 max-pooling) before a
    small dense head with a softmax output.

    Args:
        n_classes: Number of units in the final softmax layer.
        sf: Sample rate passed to the mel-spectrogram layer.
        dt: Clip length; together with ``sf`` it fixes the input length.

    Returns:
        A compiled ``Model`` (Adam optimizer, categorical cross-entropy loss,
        accuracy metric).
    """
    n_samples = int(sf*dt)
    mel = get_melspectrogram_layer(
        input_shape=(n_samples, 1),
        n_mels=40,
        pad_end=True,
        n_fft=512,
        win_length=400,
        hop_length=160,
        sample_rate=sf,
        return_decibel=True,
        input_data_format="channels_last",
        output_data_format="channels_last",
    )
    # output format : batch, time, frequency, channels

    # Despite its layer name this is a LayerNormalization over axis 2.
    features = LayerNormalization(axis=2, name='batch_norm')(mel.output)

    # One row per convolution stage:
    # (filters, kernel size, activation, conv layer name, pool layer name or None)
    conv_stages = [
        (8,  (7,7), 'tanh', 'conv2d_tanh',   'max_pool_2d_1'),
        (16, (5,5), 'relu', 'conv2d_relu_1', 'max_pool_2d_2'),
        (16, (3,3), 'relu', 'conv2d_relu_2', 'max_pool_2d_3'),
        (32, (3,3), 'relu', 'conv2d_relu_3', 'max_pool_2d_4'),
        (32, (3,3), 'relu', 'conv2d_relu_4', None),
    ]
    for n_filters, kernel, activation, conv_name, pool_name in conv_stages:
        features = layers.Conv2D(n_filters, kernel_size=kernel, activation=activation,
                                 padding='same', name=conv_name)(features)
        if pool_name is not None:
            features = layers.MaxPooling2D(pool_size=(2,2), padding='same',
                                           name=pool_name)(features)

    # Classification head.
    features = layers.Flatten(name='flatten')(features)
    features = layers.Dropout(rate=0.1, name='dropout_2')(features)
    features = layers.Dense(64, activation='relu', activity_regularizer=l2(0.001),
                            name='dense_1')(features)
    probabilities = layers.Dense(n_classes, activation='softmax', name='softmax')(features)

    model = Model(inputs=mel.input, outputs=probabilities, name='2d_convolution')
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [59]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding fixed-size batches of raw audio and one-hot labels.

    Every item is a tuple ``(X, Y)`` of float32 arrays where ``X`` has shape
    ``(batch_size, int(sf*dt), 1)`` and ``Y`` has shape
    ``(batch_size, n_classes)``. Samples are copied from the wav file as read
    (cast to float32, no rescaling). Trailing files that do not fill a complete
    batch are dropped for the epoch.
    """

    def __init__(self,wav_paths,labels,sf,
                    dt, n_classes,batch_size = 32, shuffle = True):
        """Store the dataset description and build the first index order.

        Args:
            wav_paths: Sequence of wav file paths.
            labels: Integer class index for each entry of ``wav_paths``.
            sf: Sample rate; with ``dt`` it fixes the expected clip length.
            dt: Clip length, so that each clip holds ``int(sf*dt)`` samples.
            n_classes: Width of the one-hot label vectors.
            batch_size: Number of clips per batch.
            shuffle: Whether to reshuffle the sample order at each epoch end.
        """
        # Newer Keras versions expect Sequence/PyDataset subclasses to run the
        # base constructor and warn when it is skipped; on older versions this
        # is a harmless no-op.
        super().__init__()
        self.wav_paths = wav_paths
        self.labels = labels
        self.sf = sf
        self.dt = dt
        self.n_classes = n_classes
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of complete batches per epoch."""
        return int(np.floor(len(self.wav_paths)/self.batch_size))

    def __getitem__(self,index):
        """Load batch number ``index``.

        Returns:
            Tuple ``(X, Y)`` as described in the class docstring.

        Raises:
            ValueError: If a wav file does not contain exactly
                ``int(sf*dt)`` samples.
        """
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        wav_paths = [self.wav_paths[k] for k in indexes]
        labels = [self.labels[k] for k in indexes]

        n_samples = int(self.sf*self.dt)
        X = np.empty((self.batch_size,n_samples,1),dtype=np.float32)
        Y = np.empty((self.batch_size, self.n_classes), dtype=np.float32)

        for i, (path, label) in enumerate(zip(wav_paths,labels)):
            _, wav = wavfile.read(path)
            # Fail with the offending file name instead of an opaque numpy
            # broadcasting error (or, for a 1-sample file, a silent broadcast).
            if wav.size != n_samples:
                raise ValueError(
                    f"{path}: expected {n_samples} samples "
                    f"(sf={self.sf}, dt={self.dt}) but read an array of shape {wav.shape}")
            X[i,] = wav.reshape(-1,1)
            Y[i,] = to_categorical(label,num_classes=self.n_classes)
        return X, Y

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.wav_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)

In [60]:
def train(src = dst_root, sf=sample_f, dt=dt, bs = batch_size, model_type = model_type):
    """Train the Conv2D classifier on the wav files found under ``src``.

    ``src`` is expected to contain one sub-directory per class, each holding
    that class's ``.wav`` clips. The files are split 75/25 into training and
    validation sets, the per-epoch history is written to
    ``logs/{model_type}_history.csv`` and the best model (lowest ``val_loss``)
    is saved to ``models/{model_type}.h5``.

    Args:
        src: Root directory of the per-class wav folders.
        sf: Sample rate of the clips.
        dt: Clip length; each clip must hold ``int(sf*dt)`` samples.
        bs: Batch size used for both the training and validation generators.
        model_type: Name used for the log and checkpoint files. It does not
            select the architecture; ``Conv2D()`` is always built.
    """
    # Make sure the output directories exist before the callbacks write to them.
    os.makedirs("logs", exist_ok=True)
    os.makedirs("models", exist_ok=True)
    csv_path = os.path.join("logs",f"{model_type}_history.csv")

    # Only directories are classes; stray files (e.g. .DS_Store) must not
    # inflate the number of output units.
    classes = sorted(d for d in os.listdir(src) if os.path.isdir(os.path.join(src, d)))
    n_classes = len(classes)

    wav_paths = glob('{}/**'.format(src), recursive=True)
    # glob order depends on the filesystem; sort so that the random_state
    # below yields the same split on every machine.
    wav_paths = sorted(x.replace(os.sep, '/') for x in wav_paths if x.endswith('.wav'))
    le = LabelEncoder()
    le.fit(classes)
    # The label of a clip is the name of its parent directory.
    labels = [os.path.split(x)[0].split('/')[-1] for x in wav_paths]
    labels = le.transform(labels)

    wav_train, wav_val, label_train, label_val = train_test_split(wav_paths,labels,test_size = 0.25, random_state=37)

    assert len(label_train) >= bs, "Number of train samples must be >= than batch_size"
    if len(label_val) < bs:
        # The generator only yields complete batches, so this leaves no
        # validation data and therefore no val_loss for the checkpoint.
        warnings.warn(f"Validation set has {len(label_val)} samples, fewer than batch size {bs}; no validation batches will be produced.")
    if len(set(label_train))!= n_classes:
        warnings.warn(f"Found {len(set(label_train))}/{n_classes} classes in training data. Increase data size or change random_state.")
    if len(set(label_val)) != n_classes:
        warnings.warn(f"Found {len(set(label_val))}/{n_classes} classes in validation data. Increase data size or change random_state.")

    # Use the `bs` argument (not the notebook-level batch_size) so the caller's
    # choice actually takes effect.
    tg = DataGenerator(wav_train,label_train,sf,dt,n_classes,bs)
    vg = DataGenerator(wav_val,label_val,sf,dt,n_classes,bs)

    model = Conv2D(n_classes,sf,dt)
    cp = ModelCheckpoint(f"models/{model_type}.h5",monitor = "val_loss", 
                         save_best_only = True, save_weights_only = False,
                         mode="auto", save_freq = "epoch",verbose = 0)
    csv_logger = CSVLogger(csv_path,append=False)
    model.fit(tg,validation_data = vg, epochs = 15, verbose = 1,
                callbacks=[csv_logger,cp])


In [57]:
# Run training with the default arguments bound in train()'s signature.
train()

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
