# 這篇程式碼是用來訓練模型
藉由輸入多名講者的資料集，讓模型學習如何分辨多名講者的聲音。<br>
訓練 ResNet50V2 分類模型，然後刪除最後的分類層。<br>
如此便能得到生成**可辨識特定講者的聲音向量**的模型

In [None]:
import os, librosa, random
import tensorflow as tf
import numpy as np

# Build a float-list feature.
def _float_feature(value):
    """Wrap a float, or a list of floats, in a ``tf.train.Feature``.

    Anything that is not already a ``list`` is treated as a single value
    and wrapped in a one-element list first.
    """
    floats = value if isinstance(value, list) else [value]
    float_list = tf.train.FloatList(value=floats)
    return tf.train.Feature(float_list=float_list)


def _int64_feature(value):
    """Wrap an integer, or a list of integers, in a ``tf.train.Feature``.

    Anything that is not already a ``list`` is treated as a single value
    and wrapped in a one-element list first.
    """
    ints = value if isinstance(value, list) else [value]
    int64_list = tf.train.Int64List(value=ints)
    return tf.train.Feature(int64_list=int64_list)


# Package one sample so it can be written to a TFRecord file.
def data_example(data, label):
    """Build a ``tf.train.Example`` holding one feature vector and its label.

    ``data`` is stored under the key ``'data'`` as a float list and
    ``label`` under the key ``'label'`` as an int64 list.
    """
    features = tf.train.Features(
        feature={
            'data': _float_feature(data),
            'label': _int64_feature(label),
        }
    )
    return tf.train.Example(features=features)


def create_data_tfrecord_by_random(data_path, save_path, shards,
                                   sample_rate=22050, clip_seconds=5):
    """Write a TFRecord of MFCC features using randomly placed clips.

    ``data_path`` must contain one sub-directory per class (speaker); every
    file inside a sub-directory is loaded as audio for that class.  For each
    file the intervals returned by ``librosa.effects.split(top_db=20)`` are
    concatenated, and up to ``shards`` clips of ``clip_seconds`` seconds are
    cut at random offsets.  Audio that is not longer than one clip is
    zero-padded and written exactly once, because further shards would be
    identical.

    Args:
        data_path: Directory with one sub-directory per class.
        save_path: Output TFRecord path; an existing file is replaced.
        shards: Maximum number of random clips taken from one audio file.
        sample_rate: Sample rate passed to ``librosa.load``.
        clip_seconds: Length of every clip in seconds.  Changing this (or
            ``sample_rate``) changes the MFCC frame count, so the shape used
            when parsing the TFRecord must be changed to match.
    """
    # os.listdir returns entries in arbitrary order; sorting keeps the
    # label <-> speaker mapping identical between the train and test runs.
    speakers = sorted(
        name for name in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, name))
    )
    try:
        os.remove(save_path)
    except FileNotFoundError:
        print('create {}'.format(save_path))

    clip_len = int(sample_rate * clip_seconds)
    with tf.io.TFRecordWriter(save_path) as writer:
        for label, speaker in enumerate(speakers):
            print('label: ' + str(label))
            speaker_dir = os.path.join(data_path, speaker)
            for index, filename in enumerate(sorted(os.listdir(speaker_dir))):
                print('sample: ' + str(index))
                wav, sr = librosa.load(os.path.join(speaker_dir, filename), sr=sample_rate)
                intervals = librosa.effects.split(wav, top_db=20)
                # Keep only the samples inside the detected intervals.
                if len(intervals):
                    voiced = np.concatenate([wav[start:end] for start, end in intervals])
                else:
                    voiced = np.zeros(0, dtype=np.float32)

                surplus = len(voiced) - clip_len
                report_shape = True
                for _ in range(shards):
                    if surplus > 0:
                        # Longer than one clip: crop at a random offset.
                        offset = random.randint(0, surplus)
                        clip = voiced[offset:offset + clip_len]
                    else:
                        # Too short: pad with zeros up to the clip length.
                        padding = np.zeros(-surplus, dtype=voiced.dtype)
                        clip = np.concatenate((voiced, padding))
                    # 128 MFCC coefficients per frame, matching
                    # create_data_tfrecord_by_order and the 128-row input
                    # expected downstream.
                    mfcc = librosa.feature.mfcc(y=clip, sr=sr, n_mfcc=128, hop_length=256)
                    if report_shape:
                        print('shape: ', mfcc.shape)
                        report_shape = False
                    tf_example = data_example(mfcc.reshape(-1).tolist(), label)
                    writer.write(tf_example.SerializeToString())
                    if surplus <= 0:
                        # A padded clip is deterministic; repeating it would
                        # only duplicate the sample.
                        break

def create_data_tfrecord_by_order(data_path, save_path,
                                  sample_rate=22050, clip_seconds=5,
                                  stride_seconds=1):
    """Write a TFRecord of MFCC features using evenly spaced clips.

    ``data_path`` must contain one sub-directory per class (speaker); every
    file inside a sub-directory is loaded as audio for that class.  For each
    file the intervals returned by ``librosa.effects.split(top_db=20)`` are
    concatenated, and a window of ``clip_seconds`` seconds is slid over the
    result in steps of ``stride_seconds`` seconds.  Every full window becomes
    one sample; audio shorter than one window yields no sample.

    Args:
        data_path: Directory with one sub-directory per class.
        save_path: Output TFRecord path; an existing file is replaced.
        sample_rate: Sample rate passed to ``librosa.load``.
        clip_seconds: Length of every window in seconds.  Changing this (or
            ``sample_rate``) changes the MFCC frame count, so the shape used
            when parsing the TFRecord must be changed to match.
        stride_seconds: Distance between the starts of consecutive windows.

    Raises:
        ValueError: If the stride is shorter than one sample.
    """
    clip_len = int(sample_rate * clip_seconds)
    stride = int(sample_rate * stride_seconds)
    if stride <= 0:
        raise ValueError('stride_seconds must cover at least one sample')

    # os.listdir returns entries in arbitrary order; sorting keeps the
    # label <-> speaker mapping identical between the train and test runs.
    speakers = sorted(
        name for name in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, name))
    )
    try:
        os.remove(save_path)
    except FileNotFoundError:
        print('create {}'.format(save_path))

    with tf.io.TFRecordWriter(save_path) as writer:
        for label, speaker in enumerate(speakers):
            print('label: ' + str(label))
            speaker_dir = os.path.join(data_path, speaker)
            for index, filename in enumerate(sorted(os.listdir(speaker_dir))):
                print('sample: ' + str(index))
                wav, sr = librosa.load(os.path.join(speaker_dir, filename), sr=sample_rate)
                intervals = librosa.effects.split(wav, top_db=20)
                # Keep only the samples inside the detected intervals.
                if len(intervals):
                    voiced = np.concatenate([wav[start:end] for start, end in intervals])
                else:
                    voiced = np.zeros(0, dtype=np.float32)

                # Last start offset that still leaves room for a full window.
                last_start = len(voiced) - clip_len
                report_shape = True
                for start in range(0, last_start + 1, stride):
                    clip = voiced[start:start + clip_len]
                    mfcc = librosa.feature.mfcc(y=clip, sr=sr, n_mfcc=128, hop_length=256)
                    if report_shape:
                        print('shape: ', mfcc.shape)
                        report_shape = False
                    tf_example = data_example(mfcc.reshape(-1).tolist(), label)
                    writer.write(tf_example.SerializeToString())

In [None]:
# Generate the training and test TFRecord files from their source folders.
for source_dir, record_path in (
    ('./source/train/', './data/mfcc_train.tfrecord'),
    ('./source/test/', './data/mfcc_test.tfrecord'),
):
    create_data_tfrecord_by_order(source_dir, record_path)

In [None]:
import tensorflow as tf

# Training configuration.
class_dim = 50          # number of units in the final softmax layer
EPOCHS = 500
BATCH_SIZE = 32
NAME = 'train_mfcc1'    # used to name the model directory and the log file

# ResNet50V2 trained from scratch (no pretrained weights, no top classifier)
# on single-channel inputs with 128 rows and a variable number of columns,
# followed by regularization, global max pooling and a softmax classifier.
layer_stack = [
    tf.keras.applications.ResNet50V2(
        include_top=False,
        weights=None,
        input_shape=(128, None, 1),
    ),
    tf.keras.layers.ActivityRegularization(l2=0.5),
    tf.keras.layers.Dropout(rate=0.5),
    tf.keras.layers.GlobalMaxPooling2D(name='global_max_pooling'),
    tf.keras.layers.Dense(units=class_dim, activation=tf.nn.softmax),
]
model = tf.keras.models.Sequential(layer_stack)

model.summary()

# Optimizer used by the training loop.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50v2 (Functional)     (None, 4, None, 2048)     23558528  
                                                                 
 activity_regularization (Ac  (None, 4, None, 2048)    0         
 tivityRegularization)                                           
                                                                 
 dropout (Dropout)           (None, 4, None, 2048)     0         
                                                                 
 global_max_pooling (GlobalM  (None, 2048)             0         
 axPooling2D)                                                    
                                                                 
 dense (Dense)               (None, 50)                102450    
                                                                 
Total params: 23,660,978
Trainable params: 23,615,538
Non-trainable params: 45,440

In [None]:
def _parse_data_function(example):
    """Parse one serialized example into a dict with 'data' and 'label'.

    'data' is read as a fixed-length float32 vector and 'label' as a scalar
    int64.
    """
    # [May need adjusting] Product of the dimensions of the stored feature
    # matrix; must match what was written to the TFRecord.
    flat_size = 128 * 431
    schema = {
        'data': tf.io.FixedLenFeature([flat_size], tf.float32),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    return tf.io.parse_single_example(example, schema)


def train_reader_tfrecord(data_path, num_epochs, batch_size):
    """Build the training input pipeline from a TFRecord file.

    Records are parsed, shuffled with a 1000-element buffer, repeated
    ``num_epochs`` times, batched, and prefetched.
    """
    dataset = tf.data.TFRecordDataset(data_path).map(_parse_data_function)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.repeat(count=num_epochs)
    dataset = dataset.batch(batch_size=batch_size)
    return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)


def test_reader_tfrecord(data_path, batch_size):
    """Build the evaluation input pipeline from a TFRecord file.

    Records are parsed and batched in file order; nothing is shuffled or
    repeated.
    """
    parsed = tf.data.TFRecordDataset(data_path).map(_parse_data_function)
    return parsed.batch(batch_size=batch_size)

In [None]:
# Input pipelines for the training loop and the periodic evaluation.
train_dataset = train_reader_tfrecord(
    data_path='./data/mfcc_train.tfrecord',
    num_epochs=EPOCHS,
    batch_size=BATCH_SIZE,
)
test_dataset = test_reader_tfrecord(
    data_path='./data/mfcc_test.tfrecord',
    batch_size=BATCH_SIZE,
)

In [None]:
import numpy as np
import os

# Custom training loop: trains `model` on `train_dataset`, evaluates on
# `test_dataset` every 100 batches, and saves the model after each evaluation.

# Create the output directories up front so saving and logging cannot fail
# later because a parent directory is missing.
os.makedirs('./models/' + NAME + '/', exist_ok=True)
os.makedirs('./logs/', exist_ok=True)


def _report(message, log_file):
    """Write ``message`` to the log file and echo it to stdout."""
    print(message, file=log_file)
    print(message)


# Number of consecutive batches whose loss did not beat the best loss so far.
no_optim = 0

train_epoch_loss = 0
train_epoch_best_loss = 6

# The context manager closes the log file even if training raises.
with open('./logs/{}.log'.format(NAME), 'w') as mylog:
    for batch_id, data in enumerate(train_dataset):
        # [May need adjusting] Shape of the stored feature matrix.
        sounds = tf.reshape(data['data'], (-1, 128, 431, 1))
        labels = data['label']

        # Forward pass.  training=True puts BatchNormalization and Dropout in
        # training mode; without it the model runs in inference mode.
        # NOTE(review): the ActivityRegularization penalty is exposed through
        # model.losses and is not part of train_loss here — confirm whether it
        # should be added.
        with tf.GradientTape() as tape:
            predictions = model(sounds, training=True)
            train_loss = tf.reduce_mean(
                tf.keras.losses.sparse_categorical_crossentropy(labels, predictions))

        # Accuracy needs no gradients, so it is computed outside the tape.
        train_accuracy = float(tf.reduce_mean(
            tf.keras.metrics.sparse_categorical_accuracy(labels, predictions)))
        train_epoch_loss = float(train_loss)

        # Early stopping: count batches without improvement and stop once the
        # loss has stalled long enough while already being low.
        if train_epoch_loss >= train_epoch_best_loss:
            no_optim += 1
        else:
            no_optim = 0
            train_epoch_best_loss = train_epoch_loss
        stalled_and_low = (
            (no_optim > 300 and train_epoch_loss < 5e-1)
            or (no_optim > 100 and train_epoch_loss < 5e-6)
        )
        if stalled_and_low:
            # The reported index is the batch index of the training stream.
            _report('early stop at %d epoch' % batch_id, mylog)
            break

        # Apply the gradient update.
        gradients = tape.gradient(train_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        if batch_id % 20 == 0:
            _report("Batch %d, Loss %f, Accuracy %f" % (
                batch_id, train_epoch_loss, train_accuracy), mylog)

        if batch_id % 100 == 0 and batch_id != 0:
            # Accumulate per-sample sums so a smaller final batch is weighted
            # correctly in the averages.
            test_loss_sum = 0.0
            test_hit_sum = 0.0
            test_count = 0
            for d in test_dataset:
                # [May need adjusting] Shape of the stored feature matrix.
                test_sounds = tf.reshape(d['data'], (-1, 128, 431, 1))
                test_labels = d['label']

                test_result = model(test_sounds, training=False)
                sample_losses = tf.keras.losses.sparse_categorical_crossentropy(
                    test_labels, test_result)
                sample_hits = tf.keras.metrics.sparse_categorical_accuracy(
                    test_labels, test_result)
                test_loss_sum += float(tf.reduce_sum(sample_losses))
                test_hit_sum += float(tf.reduce_sum(sample_hits))
                test_count += int(sample_losses.shape[0])

            if test_count:
                _report('=================================================', mylog)
                _report("Test, Loss %f, Accuracy %f" % (
                    test_loss_sum / test_count, test_hit_sum / test_count), mylog)
                _report('=================================================', mylog)
            else:
                _report('Test dataset is empty; skipping evaluation', mylog)

            # Save the full model and its weights.
            model.save(filepath='models/' + NAME + '/resnet.h5')
            model.save_weights(filepath='models/' + NAME + '/model_weights.h5')