# In [None]:
# tensorflow 用于ASR的CNN模型代码解读

# 构建CNN模型
class CNNConfig:
    """Hyper-parameters for the ASR CNN, stored as plain class attributes.

    The class object itself (not an instance) is handed to the model and the
    training loop, so every value must remain a class-level attribute.
    """

    # ---- network structure ----
    # Kernel widths; the model builds one conv + max-pool branch per entry.
    filter_sizes = [2, 3, 4, 5]
    # Number of output filters produced by each convolution branch.
    num_filters = 64
    # Number of units in the fully connected layer that follows pooling.
    hidden_dim = 256

    # ---- training procedure ----
    learning_rate = 1e-2
    num_epochs = 100
    batch_size = 256
    # Probability of keeping a unit when dropout is applied.
    dropout_keep_prob = 0.5
    # Report training/validation metrics every this many batch iterations.
    print_per_batch = 100
    # Not referenced by the visible code; presumably a TensorBoard write
    # interval -- TODO confirm.
    save_tb_per_batch = 200
    
class ASRCNN(object):
    """Multi-branch 1-D CNN classifier built as a TensorFlow 1.x static graph.

    Graph layout:
      * ``input_x`` -- float32 placeholder of shape ``[None, width, height]``.
      * ``input_y`` -- float32 placeholder of shape ``[None, num_classes]``
        (compared via ``argmax``, so one-hot style labels are expected).
      * ``keep_prob`` -- scalar float32 placeholder, dropout keep probability.
      * One ``conv1d`` + global max-pool branch per ``config.filter_sizes``
        entry, concatenated, followed by dense ``fc1`` (ReLU), dropout and
        the dense classification layer ``fc2``.

    Attributes exposed after construction: ``logits``, ``y_pred_cls`` (op
    named ``pred``), ``loss``, ``optim`` and ``acc``.
    """

    def __init__(self, config, width, height, num_classes):
        """Build the whole graph in the current default graph.

        Args:
            config: object exposing ``filter_sizes``, ``num_filters``,
                ``hidden_dim`` and ``learning_rate``.
            width: size of axis 1 of the input (the caller passes 20, the
                number of MFCC features).
            height: size of axis 2 of the input (the caller passes 100, the
                maximum utterance length).
            num_classes: number of output classes.
        """
        self.config = config

        # Placeholders for the input features, the labels and the dropout rate.
        self.input_x = tf.placeholder(tf.float32, [None, width, height], name='input_x')
        self.input_y = tf.placeholder(tf.float32, [None, num_classes], name='input_y')
        self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')

        # Swap the last two axes: [batch, width, height] -> [batch, height, width],
        # so the convolution slides along the former `height` axis.
        input_x = tf.transpose(self.input_x, [0, 2, 1])
        pooled_outputs = []

        # One convolution + max-pool branch per configured kernel size.
        for filter_size in self.config.filter_sizes:
            # Prefix the op names of this branch with "conv-maxpool-<size>".
            with tf.name_scope("conv-maxpool-%s" % filter_size):
                conv = tf.layers.conv1d(input_x, self.config.num_filters, filter_size, activation=tf.nn.relu)
                # Max over axis 1 (the convolved axis) leaves one value per
                # filter. `axis` replaces the deprecated `reduction_indices`.
                pooled = tf.reduce_max(conv, axis=1)
                pooled_outputs.append(pooled)

        # Total pooled feature size: num_filters per branch * number of branches.
        num_filters_total = self.config.num_filters * len(self.config.filter_sizes)
        # Concatenate the pooled branch outputs along the feature axis.
        pooled_reshape = tf.reshape(tf.concat(pooled_outputs, 1), [-1, num_filters_total])
        # Fully connected layer on top of the concatenated features.
        fc = tf.layers.dense(pooled_reshape, self.config.hidden_dim, activation=tf.nn.relu, name='fc1')
        # Dropout; keep_prob is the probability of keeping each unit.
        fc = tf.nn.dropout(fc, self.keep_prob)

        # Classifier layer producing one logit per class.
        self.logits = tf.layers.dense(fc, num_classes, name='fc2')
        # Predicted class. Softmax is monotonic, so taking argmax of the raw
        # logits selects the same class without the extra op.
        self.y_pred_cls = tf.argmax(self.logits, 1, name="pred")
        # Loss: mean softmax cross-entropy over the batch.
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)
        self.loss = tf.reduce_mean(cross_entropy)
        # Optimizer step.
        self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)
        # Accuracy: fraction of samples whose predicted class matches the label.
        correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)
        self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
       
# 根据batch_size组合特征与标签
def batch_iter(features, labels, batch_size):
    """Yield consecutive ``(features, labels)`` slices of ``batch_size`` items.

    Only full batches are produced: trailing samples that do not fill a
    complete batch are dropped. The generator asserts on first iteration
    that both inputs have the same length.
    """
    total = len(features)
    assert total == len(labels), \
            "feature and label size do not match!"
    full_batches = int(total / batch_size)
    offsets = (index * batch_size for index in range(full_batches))
    for lo in offsets:
        hi = lo + batch_size
        yield features[lo:hi], labels[lo:hi]

# Directory the checkpoints are written to (created if missing).
path = '/tmp/cnn_model'
os.makedirs(path, exist_ok=True)

# Train the CNN model
width = 20  # mfcc features
height = 100  # (max) length of utterance
classes = 10  # digits

# The config class itself is used as the configuration object.
config = CNNConfig
cnn = ASRCNN(config, width, height, classes)
session = tf.Session()
session.run(tf.global_variables_initializer())
saver = tf.train.Saver(tf.global_variables())
checkpoint_path = os.path.join(path, 'model.ckpt')
print(checkpoint_path)

# Summary ops are created and merged here; nothing in this section writes them.
tf.summary.scalar("loss", cnn.loss)
tf.summary.scalar("accuracy", cnn.acc)
merged_summary = tf.summary.merge_all()

total_batch = 0
for epoch in range(config.num_epochs):
    print('Epoch:', epoch + 1)
    batch_train = batch_iter(train_features, train_labels, config.batch_size)
    for x_batch, y_batch in batch_train:

        total_batch += 1

        # Run one optimisation step on this batch (dropout enabled).
        train_feed_dict = {
                cnn.input_x: x_batch,
                cnn.input_y: y_batch,
                cnn.keep_prob: config.dropout_keep_prob
                }

        session.run(cnn.optim, feed_dict=train_feed_dict)

        # Periodically report loss and accuracy.
        if total_batch % config.print_per_batch == 0:
            # Metrics on the batch just trained on. NOTE(review): these reuse
            # the training feed, so dropout is still active here.
            train_loss, train_accuracy = session.run(
                    [cnn.loss, cnn.acc], feed_dict=train_feed_dict)

            # Metrics on the validation set. Dropout must be disabled for
            # evaluation, hence keep_prob = 1.0.
            valid_feed_dict = {
                    cnn.input_x: valid_features,
                    cnn.input_y: valid_labels,
                    cnn.keep_prob: 1.0
                    }
            valid_loss, valid_accuracy = session.run(
                    [cnn.loss, cnn.acc], feed_dict=valid_feed_dict)
            print('Steps:' + str(total_batch))
            print('train_loss:' + str(train_loss) +
                    ' train accuracy:' + str(train_accuracy) +
                    '\tvalid_loss:' + str(valid_loss) +
                    ' valid accuracy:' + str(valid_accuracy))

    # One checkpoint per epoch, suffixed with the zero-based epoch index.
    saver.save(session, checkpoint_path, global_step=epoch)

    # Metrics on the test set after each epoch, again with dropout disabled.
    test_feed_dict = {
            cnn.input_x: test_features,
            cnn.input_y: test_labels,
            cnn.keep_prob: 1.0
            }
    test_loss, test_accuracy = session.run([cnn.loss, cnn.acc],
            feed_dict=test_feed_dict)
    print('test_loss:' + str(test_loss) + ' test accuracy:' + str(test_accuracy))

# Record the model directory in the `model_out` file; the context manager
# closes the handle even if the write fails.
print(path)
with open(model_out, 'w', encoding='utf-8') as fh:
    fh.write(path)

###   CNN模型评估
    
# Read the audio files, extract MFCC features and normalize them
def read_test_wave(path, max_len=100):
    """Load every ``.wav`` file in ``path`` and return features and labels.

    The label of each file is the integer value of the first character of
    its file name. Each MFCC matrix is cut or zero-padded along its second
    axis to exactly ``max_len`` columns, so over-long utterances no longer
    make ``np.pad`` fail with a negative pad width.

    Args:
        path: directory containing the ``.wav`` files.
        max_len: number of MFCC columns per sample (defaults to 100, the
            value that was previously hard-coded).

    Returns:
        A ``(features, label)`` tuple: ``features`` is the stacked MFCC
        array after ``mean_normalize``; ``label`` is the list of integer
        labels in the same order.

    Raises:
        ValueError: if a ``.wav`` file name does not start with a digit.
    """
    feature = []
    label = []
    # Sort the listing so the sample order is deterministic across runs.
    for wav in sorted(os.listdir(path)):
        if not wav.endswith(".wav"):
            continue
        ans = int(wav[0])
        wave, sr = librosa.load(os.path.join(path, wav), mono=True)
        # Keyword arguments work on old and new librosa releases alike;
        # recent releases reject the positional form.
        mfcc = librosa.feature.mfcc(y=wave, sr=sr)
        # Truncate first, then zero-pad on the right up to max_len columns.
        mfcc = mfcc[:, :max_len]
        mfcc = np.pad(mfcc, ((0, 0), (0, max_len - mfcc.shape[1])), mode='constant', constant_values=0)
        label.append(ans)
        feature.append(np.array(mfcc))
    features = mean_normalize(np.array(feature))
    return features, label

features, label = read_test_wave(data_path)

# Load the trained CNN model, run it on each sample and print the results
print('loading ASRCNN model...')
# Resolve the newest checkpoint once and derive the meta-graph file from it,
# rather than hard-coding the "-99" step suffix.
latest_ckpt = tf.train.latest_checkpoint(model_path)
if latest_ckpt is None:
    raise ValueError("no checkpoint found in " + str(model_path))
# Import into a fresh graph so tensor names such as "input_x:0" cannot clash
# with a model already built in the default graph of this process.
graph = tf.Graph()
with graph.as_default(), tf.Session(graph=graph) as sess:
    saver = tf.train.import_meta_graph(latest_ckpt + '.meta')
    saver.restore(sess, latest_ckpt)
    input_x = graph.get_tensor_by_name("input_x:0")
    pred = graph.get_tensor_by_name("pred:0")
    keep_prob = graph.get_tensor_by_name("keep_prob:0")
    for sample, truth in zip(features, label):
        # One sample per run; dropout disabled for inference.
        feed_dict = {input_x: sample.reshape(1,20,100), keep_prob: 1.0}
        test_output = sess.run(pred, feed_dict=feed_dict)

        print("="*15)
        print("真实label: %d" % truth)
        print("识别结果为:"+str(test_output[0]))
    print("Congratulation!")