## 导库

In [8]:
import os
import sys
import warnings

import keras
import librosa # 音频处理库
import numpy as np
import pandas as pd
import sklearn
import sklearn.preprocessing
from keras.callbacks import LearningRateScheduler
from keras.layers import Dense, Dropout, Activation, Flatten,LSTM,TimeDistributed,Bidirectional
from keras.layers import Convolution2D, MaxPooling2D
from keras.models import Sequential
from keras.utils import to_categorical
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, log_loss
# 分层采样的K折交叉 确保训练集，测试集中各类别样本的比例与原始数据集中相同
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split 
from sklearn.model_selection import StratifiedKFold, KFold
from tqdm import tqdm

warnings.filterwarnings('ignore')

In [9]:
# Absolute path of the kernel's working directory; the notebook reads and
# writes its .npy feature caches relative to it.
dir_base = os.path.abspath(os.curdir)
dir_base

'C:\\Users\\14595\\Desktop\\baby婴儿啼哭识别'

## 处理训练数据

In [19]:
# Training data directory: one sub-directory per class label, resolved against
# the working directory instead of a machine-specific absolute path. The
# trailing separator is kept on purpose because other cells build file paths
# by plain string concatenation (``path + label``).
DATA_PATH = os.path.join(os.getcwd(), 'train', '')


def get_labels(path=DATA_PATH):
    """List the class labels found under ``path``.

    Every sub-directory of ``path`` is one class. The names are sorted so that
    the label -> index mapping is the same on every run and every platform
    (``os.listdir`` returns entries in arbitrary order), and plain files are
    skipped so they are never mistaken for a class.

    Args:
        path: Directory whose sub-directories are named after the classes.

    Returns:
        A tuple ``(labels, label_indices, one_hot)`` where ``labels`` is the
        sorted list of class names, ``label_indices`` is
        ``np.arange(len(labels))`` and ``one_hot`` is
        ``to_categorical(label_indices)``.
    """
    labels = sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )
    print(labels)
    label_indices = np.arange(0, len(labels))  # [0, 1, ..., len(labels) - 1]
    return labels, label_indices, to_categorical(label_indices)

def more_feat(wave, sr):
    """Stack several frame-level spectral descriptors of one audio clip.

    Args:
        wave: Audio samples of the clip (the callers pass the array returned
            by ``librosa.load``).
        sr: Sampling rate of ``wave``.

    Returns:
        A 2-D array whose rows are, in order: zero-crossing rate, spectral
        centroid, chroma (STFT based) and spectral bandwidth, with one column
        per analysis frame.

    Note:
        Earlier versions passed ``sr`` as the second positional argument of
        ``zero_crossing_rate``, where it was interpreted as ``frame_length``.
        Feature caches written before this fix contain different
        zero-crossing values and should be regenerated.
    """
    # zero_crossing_rate has no sampling-rate parameter; it works on frames of
    # samples, so only the signal is passed.
    zcr = librosa.feature.zero_crossing_rate(y=wave)
    # Keyword arguments keep these calls valid on librosa releases where the
    # audio argument is keyword-only.
    centroid = librosa.feature.spectral_centroid(y=wave, sr=sr)
    chroma = librosa.feature.chroma_stft(y=wave, sr=sr)
    bandwidth = librosa.feature.spectral_bandwidth(y=wave, sr=sr)
    # spectral_contrast and tonnetz were tried in an earlier revision and are
    # intentionally left out of the stack.
    return np.vstack((zcr, centroid, chroma, bandwidth))

# Every clip is zero-padded or truncated to this many samples so that all
# feature matrices share one shape.
_CLIP_SAMPLES = 360000


def _clip_features(wavfile):
    """Return the standardised feature matrix of a single audio file.

    The clip is loaded as mono at its native sampling rate, padded with zeros
    or cut to ``_CLIP_SAMPLES`` samples, and described by 40 MFCC rows plus the
    rows produced by ``more_feat``. Both groups are standardised row-wise and
    stacked with the ``more_feat`` rows first.
    """
    wave, sr = librosa.load(wavfile, mono=True, sr=None)
    if wave.shape[0] < _CLIP_SAMPLES:
        wave = np.pad(wave, (0, _CLIP_SAMPLES - wave.shape[0]), 'constant')
    wave = wave[:_CLIP_SAMPLES]

    # NOTE(review): the MFCCs use a fixed sr=8000 although the clip is loaded
    # at its native rate (``sr``); kept unchanged here -- confirm the intent.
    mfcc = sklearn.preprocessing.scale(
        librosa.feature.mfcc(y=wave, sr=8000, n_mfcc=40), axis=1)
    extra = sklearn.preprocessing.scale(more_feat(wave, sr), axis=1)
    return np.concatenate((extra, mfcc))


def save_data_to_array(path=DATA_PATH):
    """Extract features for every training clip and cache them per label.

    For each label returned by ``get_labels(path)`` the files in
    ``<path>/<label>`` are converted with ``_clip_features`` and the stacked
    result is written to ``<label>.npy`` in the working directory.

    Args:
        path: Directory that contains one sub-directory per label.
    """
    labels, _, _ = get_labels(path)

    for label in labels:
        label_dir = os.path.join(path, label)
        wavfiles = [os.path.join(label_dir, name) for name in os.listdir(label_dir)]
        vectors = [
            _clip_features(wavfile)
            for wavfile in tqdm(wavfiles, "Saving vectors of label - '{}'".format(label))
        ]
        np.save(label + '.npy', np.stack(vectors))


# Unlabelled test clips, resolved against the working directory instead of a
# machine-specific absolute path.
DATA_TEST_PATH = os.path.join(os.getcwd(), 'test')


def save_data_to_array_test(path=DATA_TEST_PATH):
    """Extract features for every test clip and cache them in ``test.npy``.

    The rows of ``test.npy`` follow the order returned by ``os.listdir(path)``;
    the submission cell pairs predictions with file names using that same
    listing, so the order is deliberately not changed here.

    Args:
        path: Directory that contains the test audio files. (Previously this
            argument was ignored and ``DATA_TEST_PATH`` was always used.)
    """
    wavfiles = [os.path.join(path, name) for name in os.listdir(path)]
    vectors = [
        _clip_features(wavfile)
        for wavfile in tqdm(wavfiles, "Saving vectors of label - '{}'".format('test'))
    ]
    np.save('test.npy', np.stack(vectors))
        

def get_train_test(split_ratio=0.8, random_state=42):
    """Load the cached per-label features and build the matching label vector.

    Reads ``<label>.npy`` from the working directory for every label returned
    by ``get_labels(DATA_PATH)`` and concatenates them along the first axis.
    The class id of a sample is the position of its label in that list.

    Args:
        split_ratio: Unused; kept so existing callers keep working.
        random_state: Unused; kept so existing callers keep working.

    Returns:
        A tuple ``(X, y)``: ``X`` holds all samples stacked along axis 0 and
        ``y`` is a float array with one class id per sample.
    """
    labels, _, _ = get_labels(DATA_PATH)

    # Load every block first and concatenate once; stacking inside the loop
    # would re-copy all previously loaded data on each iteration.
    blocks = [np.load(label + '.npy') for label in labels]
    X = np.vstack(blocks)
    y = np.concatenate([
        np.full(block.shape[0], fill_value=float(class_id))
        for class_id, block in enumerate(blocks)
    ])
    return X, y

## 构建模型

In [27]:
def get_model(input_shape=(55, 704, 1), num_classes=6):
    """Build and compile the small 2-D CNN used for classification.

    Two blocks of 3x3 convolution (32 filters, ReLU), 2x2 max pooling and
    dropout (0.25) are followed by a flatten layer and a softmax output.

    Args:
        input_shape: Shape of one sample as ``(rows, frames, channels)``.
            Defaults to the shape used by this notebook.
        num_classes: Number of output classes. Defaults to 6.

    Returns:
        A compiled ``Sequential`` model using the Adam optimizer,
        categorical cross-entropy loss and accuracy as metric.
    """
    model = Sequential([
        Convolution2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
        Convolution2D(32, (3, 3), activation='relu'),
        # Same layer as the original MaxPooling2D(2, 2): pool size 2 with
        # stride 2, since strides default to the pool size.
        MaxPooling2D((2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(optimizer='Adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [20]:
# Feature extraction took several minutes in the recorded run (see the
# progress log below), so it is only repeated when a cached .npy file is
# missing. Delete the .npy files to force a rebuild after changing the
# feature code.
train_caches = [label + '.npy' for label in get_labels(DATA_PATH)[0]]
if not all(os.path.exists(cache) for cache in train_caches):
    save_data_to_array()
if not os.path.exists('test.npy'):
    save_data_to_array_test()

Saving vectors of label - 'awake':   0%|                                                       | 0/160 [00:00<?, ?it/s]

['awake', 'diaper', 'hug', 'hungry', 'sleepy', 'uncomfortable']


Saving vectors of label - 'awake': 100%|█████████████████████████████████████████████| 160/160 [00:58<00:00,  2.73it/s]
Saving vectors of label - 'diaper': 100%|████████████████████████████████████████████| 134/134 [00:57<00:00,  2.33it/s]
Saving vectors of label - 'hug': 100%|███████████████████████████████████████████████| 160/160 [01:01<00:00,  2.61it/s]
Saving vectors of label - 'hungry': 100%|████████████████████████████████████████████| 160/160 [00:58<00:00,  2.72it/s]
Saving vectors of label - 'sleepy': 100%|████████████████████████████████████████████| 144/144 [00:56<00:00,  2.53it/s]
Saving vectors of label - 'uncomfortable': 100%|█████████████████████████████████████| 160/160 [01:27<00:00,  1.83it/s]
Saving vectors of label - 'test': 100%|██████████████████████████████████████████████| 228/228 [01:36<00:00,  2.37it/s]


In [21]:
# Load the cached training features (X) and their numeric class ids (Y).
X, Y = get_train_test()

['awake', 'diaper', 'hug', 'hungry', 'sleepy', 'uncomfortable']


In [22]:
# Sanity check of the feature tensor; the recorded run shows (918, 55, 704).
X.shape

(918, 55, 704)

In [23]:
# One class id per sample; the recorded run shows (918,).
Y.shape

(918,)

In [17]:
# Displays the whole label vector: float class ids grouped by class, in the
# order the labels were loaded.
# NOTE(review): this prints every element; a per-class count would be a more
# compact check.
Y

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1.

## 训练模型

In [28]:
# 5-fold stratified cross-validation: every fold keeps the class proportions
# of the full training set.
skf = StratifiedKFold(n_splits=5)

n_classes = 6
epochs = 35      # maximum number of training epochs per fold
batch_size = 8
verbose = 2

# The unlabelled test features are the same for every fold, so they are
# loaded and given their channel axis once, outside the loop.
X_submit = np.load('test.npy')
X_submit = X_submit.reshape(X_submit.shape[0], X_submit.shape[1], X_submit.shape[2], 1)

# Sum of the per-fold class probabilities for every test clip.
test_pred = np.zeros((X_submit.shape[0], n_classes))

for idx, (tr_idx, val_idx) in enumerate(skf.split(X, Y)):
    print(idx)

    # Add the trailing channel axis expected by the convolutional model.
    X_train = X[tr_idx].reshape(len(tr_idx), X.shape[1], X.shape[2], 1)
    X_val = X[val_idx].reshape(len(val_idx), X.shape[1], X.shape[2], 1)
    print(X_train.shape)

    # num_classes is passed explicitly so the one-hot width never depends on
    # which class ids happen to be present in a fold.
    y_train_hot = to_categorical(Y[tr_idx], num_classes=n_classes)
    print(y_train_hot.shape)
    y_val_hot = to_categorical(Y[val_idx], num_classes=n_classes)

    # Fresh, untrained model for every fold.
    model = get_model()

    my_callbacks = [
        # Stop after 10 epochs without validation-loss improvement and roll
        # the model back to its best epoch. Without the rollback the test
        # predictions come from weights that are 10 epochs past the best one
        # (the recorded log shows val_loss rising from 1.73 to 3.57).
        # NOTE(review): some Keras versions only restore the weights when the
        # stop is actually triggered before `epochs` is reached -- confirm
        # for the installed version.
        keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
    ]

    model.fit(X_train, y_train_hot,
              batch_size=batch_size,
              epochs=epochs,
              verbose=verbose,
              validation_data=(X_val, y_val_hot),  # held-out fold used for early stopping
              callbacks=my_callbacks
             )

    # Accumulate this fold's predictions; the sum is arg-maxed later.
    test_pred += model.predict(X_submit)

0
(734, 55, 704, 1)
(734, 6)
Epoch 1/35
92/92 - 18s - loss: 2.1995 - accuracy: 0.1798 - val_loss: 1.7859 - val_accuracy: 0.1685
Epoch 2/35
92/92 - 18s - loss: 1.7335 - accuracy: 0.2766 - val_loss: 1.7366 - val_accuracy: 0.2609
Epoch 3/35
92/92 - 18s - loss: 1.4338 - accuracy: 0.4332 - val_loss: 1.7345 - val_accuracy: 0.2609
Epoch 4/35
92/92 - 18s - loss: 1.0557 - accuracy: 0.6117 - val_loss: 1.8415 - val_accuracy: 0.2935
Epoch 5/35
92/92 - 19s - loss: 0.6786 - accuracy: 0.7493 - val_loss: 2.2799 - val_accuracy: 0.2880
Epoch 6/35
92/92 - 17s - loss: 0.3409 - accuracy: 0.8856 - val_loss: 2.4520 - val_accuracy: 0.2826
Epoch 7/35
92/92 - 19s - loss: 0.1775 - accuracy: 0.9428 - val_loss: 3.0481 - val_accuracy: 0.2989
Epoch 8/35
92/92 - 19s - loss: 0.0953 - accuracy: 0.9714 - val_loss: 3.4261 - val_accuracy: 0.3261
Epoch 9/35
92/92 - 17s - loss: 0.0649 - accuracy: 0.9796 - val_loss: 3.5660 - val_accuracy: 0.2989
Epoch 10/35
92/92 - 19s - loss: 0.0528 - accuracy: 0.9850 - val_loss: 3.5743 - v

## 模型选择

In [None]:
# Optional: replace the cross-validation predictions with predictions from
# saved checkpoints. Only checkpoint files that actually exist are used, so
# the predictions accumulated during training are kept when none is present.
checkpoint_paths = [
    path for path in ['model-0.h5', 'model-2.h5', 'model-6.h5'][:1]
    if os.path.exists(path)
]

if checkpoint_paths:
    # Use the features exactly as they were used for training: no extra
    # rescaling, and the same (samples, rows, frames, 1) layout.
    X_test = np.load('test.npy')
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)

    test_pred = np.zeros((X_test.shape[0], 6))
    for path in checkpoint_paths:
        model.load_weights(path)  # load the saved weights into the current model
        test_pred += model.predict(X_test)
else:
    print('No saved checkpoint found; keeping the cross-validation predictions.')

## 投票

In [None]:
# Class ids were assigned during training from the position of each label in
# get_labels(DATA_PATH), so the same list must be used to turn predicted ids
# back into names. (The previous hard-coded list used a different order than
# the one printed when the training data was loaded.)
labels, _, _ = get_labels(DATA_PATH)

df = pd.DataFrame()
# File names in the same os.listdir order that was used to build test.npy.
df['id'] = os.listdir(DATA_TEST_PATH)
# argmax(1): index of the largest summed probability in each row, i.e. the
# most likely class of each test clip.
df['label'] = [labels[x] for x in test_pred.argmax(1)]
# NOTE(review): the output file name '.csv' has no stem -- confirm the
# intended submission file name.
df.to_csv('.csv', index=None)