In [1]:

from tensorflow.keras import layers
from tensorflow.keras.layers import TimeDistributed, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from kapre.time_frequency import Melspectrogram
from kapre.utils import Normalization2D
from kapre.augmentation import AdditiveNoise
import tensorflow as tf
import os

In [2]:
def Conv1D(N_CLASSES=10, SR=16000, DT=1.0):
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=512, n_hop=160,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Permute((2,1,3), name='permute')(x)
    x = TimeDistributed(layers.Conv1D(8, kernel_size=(4), activation='tanh'), name='td_conv_1d_tanh')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_1')(x)
    x = TimeDistributed(layers.Conv1D(16, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_1')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_2')(x)
    x = TimeDistributed(layers.Conv1D(32, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_2')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_3')(x)
    x = TimeDistributed(layers.Conv1D(64, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_3')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_4')(x)
    x = TimeDistributed(layers.Conv1D(128, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_4')(x)
    x = layers.GlobalMaxPooling2D(name='global_max_pooling_2d')(x)
    x = layers.Dropout(rate=0.1, name='dropout')(x)
    x = layers.Dense(64, activation='relu', activity_regularizer=l2(0.001), name='dense')(x)
    o = layers.Dense(N_CLASSES, activation='softmax', name='softmax')(x)

    model = Model(inputs=i, outputs=o, name='1d_convolution')
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


def Conv2D(N_CLASSES=10, SR=16000, DT=1.0):
    initializer = tf.keras.initializers.TruncatedNormal(seed=None)
    
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=512, n_hop=160,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = AdditiveNoise(power=10., random_gain=True, noise_type='white')(x)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Conv2D(8, kernel_size=(7,7), activation='tanh',kernel_initializer=initializer, padding='same', name='conv2d_tanh')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_1')(x)
    x = layers.Conv2D(16, kernel_size=(5,5), activation='relu',kernel_initializer=initializer, padding='same', name='conv2d_relu_1')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_2')(x)
    x = layers.Conv2D(16, kernel_size=(3,3), activation='relu',kernel_initializer=initializer, padding='same', name='conv2d_relu_2')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_3')(x)
    x = layers.Conv2D(32, kernel_size=(3,3), activation='relu',kernel_initializer=initializer, padding='same', name='conv2d_relu_3')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_4')(x)
    x = layers.Conv2D(32, kernel_size=(3,3), activation='relu',kernel_initializer=initializer, padding='same', name='conv2d_relu_4')(x)
    x = layers.Flatten(name='flatten')(x)
    x = layers.Dropout(rate=0.5, name='dropout')(x)
    x = layers.Dense(64, activation='relu',kernel_initializer=initializer, activity_regularizer=l2(0.001), name='dense')(x)
    o = layers.Dense(N_CLASSES,kernel_initializer=initializer, activation='softmax', name='softmax')(x)

    model = Model(inputs=i, outputs=o, name='2d_convolution')
    model.summary()
    initial_learning_rate = 0.06
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate,
        decay_steps=500,
        decay_rate=0.94,
        staircase=True)
    
    opt = tf.keras.optimizers.Adagrad(learning_rate=lr_schedule)
    
    
    model.compile(optimizer=opt,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


def LSTM(N_CLASSES=10, SR=16000, DT=1.0):
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=512, n_hop=160,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Permute((2,1,3), name='permute')(x)
    x = TimeDistributed(layers.Reshape((-1,)), name='reshape')(x)
    s = TimeDistributed(layers.Dense(64, activation='tanh'),
                        name='td_dense_tanh')(x)
    x = layers.Bidirectional(layers.LSTM(32, return_sequences=True),
                             name='bidirectional_lstm')(s)
    x = layers.concatenate([s, x], axis=2, name='skip_connection')
    x = layers.Dense(64, activation='relu', name='dense_1_relu')(x)
    x = layers.MaxPooling1D(name='max_pool_1d')(x)
    x = layers.Dense(32, activation='relu', name='dense_2_relu')(x)
    x = layers.Flatten(name='flatten')(x)
    x = layers.Dropout(rate=0.2, name='dropout')(x)
    x = layers.Dense(32, activation='relu',
                         activity_regularizer=l2(0.001),
                         name='dense_3_relu')(x)
    o = layers.Dense(N_CLASSES, activation='softmax', name='softmax')(x)

    model = Model(inputs=i, outputs=o, name='long_short_term_memory')
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model

In [3]:
import tensorflow as tf
from tensorflow.keras.callbacks import CSVLogger, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
import os
from scipy.io import wavfile
import pandas as pd
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
#from models import Conv1D, Conv2D, LSTM
from tqdm import tqdm
from glob import glob
import argparse
import warnings
import pyrubberband as pyrb
import matplotlib.pyplot as plt
import random
import math


In [4]:
#audio data augmentation functions\
def save_sample(sample, rate, target_dir, fn):
    fn = fn.split('.wav')[0]
    dst_path = os.path.join(target_dir.split('.')[0], fn+'.wav')
    if os.path.exists(dst_path):
        return
    wavfile.write(dst_path, rate, sample)
    
def check_dir(path):
    if os.path.exists(path) is False:
        os.mkdir(path)   
        
def augmentation():
    src_root = 'data/clean'
    dst_root = 'data/temp_epoch'


    wav_paths = glob('{}/**'.format(src_root), recursive=True)
    wav_paths = [x for x in wav_paths if '.wav' in x]
    dirs = os.listdir(src_root)
    check_dir(dst_root)
    classes = os.listdir(src_root)
    for _cls in classes:
        target_dir = os.path.join(dst_root, _cls)
        check_dir(target_dir)
        src_dir = os.path.join(src_root, _cls)
        for fn in tqdm(os.listdir(src_dir)):
            src_fn = os.path.join(src_dir, fn)
            rate, wav = wavfile.read(src_fn)
            #wav_pitch = manipulation_pitch(wav, rate)
            wav_volum = manipulation_volum(wav_pitch, rate)
            wav = manipulation_noise(wav_volum, rate)
            save_sample(wav, rate, target_dir, fn)

def manipulation_pitch(wav, rate):
    pitch_factor = round(random.uniform(-5.0, 10.0),2)
    signal = pyrb.pitch_shift(wav, rate, pitch_factor)
    return signal

def manipulation_volum(wav, rate):
    volum_factor = round(random.uniform(0.2, 3.0),2)
    signal = np.clip(wav*volum_factor, -1, 1)
    return signal

def manipulation_noise(wav, rate):
    RMS=math.sqrt(np.mean(wav**2))
    noise=np.random.normal(0, RMS, wav.shape[0])
    noise_energy = np.sqrt(noise.dot(noise) / noise.size)
    data_energy = np.sqrt(wav.dot(wav) / wav.size)
    
    coeff = round(random.uniform(0.0, 0.4),2)
    wav += coeff * noise * data_energy / noise_energy
    return wav

In [5]:
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, wav_paths, labels, sr, dt, n_classes,
                 batch_size=32, shuffle=True):
        self.wav_paths = wav_paths
        self.labels = labels
        self.sr = sr
        self.dt = dt
        self.n_classes = n_classes
        self.batch_size = batch_size
        self.shuffle = True
        self.on_epoch_end()


    def __len__(self):
        return int(np.floor(len(self.wav_paths) / self.batch_size))


    def __getitem__(self, index):
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        wav_paths = [self.wav_paths[k] for k in indexes]
        labels = [self.labels[k] for k in indexes]

        # generate a batch of time data
        X, Y = self.__data_generation(wav_paths, labels)
        return X, Y

    def on_epoch_end(self):
        #augmentation()
        self.indexes = np.arange(len(self.wav_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)
            
    def __data_generation(self, wav_paths, labels):
        X = np.empty((self.batch_size, 1, int(self.sr*self.dt)), dtype=np.float32)
        Y = np.empty((self.batch_size, self.n_classes), dtype=np.float32)

        for i, (path, label) in enumerate(zip(wav_paths, labels)):
            #print(path+str(label))
            rate, wav = wavfile.read(path)
            #wav = manipulation_pitch(wav, rate)
            wav = manipulation_volum(wav, rate)
            #wav = manipulation_noise(wav, rate)
            X[i,] = wav.reshape(1, -1)
            Y[i,] = to_categorical(label, num_classes=self.n_classes)
            
        return X, Y
        


def train(args):
    src_root = args.src_root
    sr = args.sample_rate
    dt = args.delta_time
    batch_size = args.batch_size
    model_type = args.model_type
    params = {'N_CLASSES':len(os.listdir(args.src_root)),
              'SR':sr,
              'DT':dt}
    models = {'conv1d':Conv1D(**params),
              'conv2d':Conv2D(**params),
              'lstm':  LSTM(**params)}
    assert model_type in models.keys(), '{} not an available model'.format(model_type)
    csv_path = os.path.join('logs', '{}_history.csv'.format(model_type))

    wav_paths = glob('{}/**'.format(src_root), recursive=True)
    wav_paths = [x.replace(os.sep, '/') for x in wav_paths if '.wav' in x]
    #print(wav_paths)
    classes = sorted(os.listdir(args.src_root))
    le = LabelEncoder()
    le.fit(classes)
    labels = [os.path.split(x)[0].split('/')[-1] for x in wav_paths]
    labels = le.transform(labels)
    wav_train, wav_val, label_train, label_val = train_test_split(wav_paths,
                                                                  labels,
                                                                  test_size=0.2,
                                                                  random_state=0)

    assert len(label_train) >= args.batch_size, 'Number of train samples must be >= batch_size'
    if len(set(label_train)) != params['N_CLASSES']:
        warnings.warn('Found {}/{} classes in training data. Increase data size or change random_state.')
    if len(set(label_val)) != params['N_CLASSES']:
        warnings.warn('Found {}/{} classes in validation data. Increase data size or change random_state.')

    tg = DataGenerator(wav_train, label_train, sr, dt,
                       params['N_CLASSES'], batch_size=batch_size)
    vg = DataGenerator(wav_val, label_val, sr, dt,
                       params['N_CLASSES'], batch_size=batch_size)
    model = models[model_type]
    cp = ModelCheckpoint('models/{}.h5'.format(model_type), monitor='val_loss',
                         save_best_only=True, save_weights_only=False,
                         mode='auto', save_freq='epoch', verbose=1)
    csv_logger = CSVLogger(csv_path, append=False)
    history = model.fit(tg, validation_data=vg,
                  epochs=200, verbose=1,callbacks=[csv_logger, cp])
    # 绘制训练 & 验证的准确率值
    plt.plot(history.history['acc'])
    plt.plot(history.history['val_acc'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

    # 绘制训练 & 验证的损失值
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()
    

In [6]:
%matplotlib inline
if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Audio Classification Training')
    parser.add_argument('--model_type', type=str, default='conv2d',
                        help='model to run. i.e. conv1d, conv2d, lstm')
    parser.add_argument('--src_root', type=str, default='data/clean1',
                        help='directory of audio files in total duration')
    parser.add_argument('--batch_size', type=int, default=64,
                        help='batch size')
    parser.add_argument('--delta_time', '-dt', type=float, default=4.0,
                        help='time in seconds to sample audio')
    parser.add_argument('--sample_rate', '-sr', type=int, default=16000,
                        help='sample rate of clean audio')
    args, _ = parser.parse_known_args()

    train(args)



Model: "2d_convolution"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input (InputLayer)           [(None, 1, 64000)]        0         
_________________________________________________________________
melbands (Melspectrogram)    (None, 128, 400, 1)       296064    
_________________________________________________________________
additive_noise (AdditiveNois (None, 128, 400, 1)       0         
_________________________________________________________________
batch_norm (Normalization2D) (None, 128, 400, 1)       0         
_________________________________________________________________
conv2d_tanh (Conv2D)         (None, 128, 400, 8)       400       
_________________________________________________________________
max_pool_2d_1 (MaxPooling2D) (None, 64, 200, 8)        0         
_________________________________________________________________
conv2d_relu_1 (Conv2D)       (None, 64, 200, 16)    

KeyError: 'acc'

In [7]:
from tensorflow.keras.models import load_model


In [8]:
model = load_model('models/conv2d.h5',
        custom_objects={'Melspectrogram':Melspectrogram,
                        'Normalization2D':Normalization2D,
                        'AdditiveNoise':AdditiveNoise},compile = False)
model.save_weights('models/weights.h5')

int_axis=-1 passed but is ignored, str_axis is used instead.
