In [43]:
import os
import pickle
from string import ascii_lowercase

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.layers.experimental import preprocessing

In [None]:
# Load the pickled model inputs (x) and targets (y) from the working directory.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open('./x.pkl', 'rb') as inputs_file:
    x = pickle.load(inputs_file)
with open('./y.pkl', 'rb') as targets_file:
    y = pickle.load(targets_file)

In [2]:
# Append a trailing channel axis to x, then drop the last two samples from
# both the inputs and the targets.
# NOTE(review): the reason for discarding the final two samples is not stated — confirm.
X = tf.expand_dims(x, axis=-1)[:-2].numpy()
Y = y[:-2]

In [3]:
# Display the shape of the input array.
X.shape

(800, 584, 129, 1)

### 归一化

In [4]:
# Standardize X using one mean and one standard deviation computed over
# every element of the array.
feature_mean = tf.math.reduce_mean(X)
feature_std = tf.math.reduce_std(X)
X = (X - feature_mean) / feature_std

In [5]:
# Display the shape of the target array.
Y.shape

(800, 435)

### 建立DataSet

In [7]:
# Build an (input, target) dataset from the in-memory arrays, cache it and
# let tf.data choose the prefetch buffer size.
AUTOTUNE = tf.data.AUTOTUNE
audio_ds = (
    tf.data.Dataset.from_tensor_slices((X, Y))
    .cache()
    .prefetch(AUTOTUNE)
)

### 搭建模型

In [36]:
class Encoder(layers.Layer):
    """Convolutional feature extractor followed by a GRU.

    The class subclasses ``layers.Layer`` so that the sub-layers stored as
    attributes are tracked by Keras (weights, checkpointing).

    Args:
        units: size of the GRU state.
        batch_size: batch size used by ``initialize_hidden_state``.
        rate: dropout rate for the Dropout layer and for the GRU inputs.
    """

    def __init__(self, units, batch_size, rate=0.2):
        super(Encoder, self).__init__()
        self.batch_sz = batch_size
        self.units = units
        self.conv1 = layers.Conv2D(32, 3, activation='relu')
        self.conv2 = layers.Conv2D(64, 3, activation='relu')
        self.pool1 = layers.MaxPooling2D()
        self.drop = layers.Dropout(rate)
        self.norm = layers.BatchNormalization()
        self.pool2 = layers.GlobalMaxPool2D()
        # The GRU dropout now follows `rate` (it was hard-coded to 0.2, which
        # equals the default). A second GRU that was constructed here and
        # immediately discarded has been removed.
        self.gru = layers.GRU(self.units,
                              return_sequences=True,
                              return_state=True,
                              dropout=rate,
                              recurrent_initializer='glorot_uniform')

    def call(self, eninput, hidden):
        """Encode a batch of inputs.

        Args:
            eninput: input passed to the first Conv2D layer.
            hidden: initial state handed to the GRU.

        Returns:
            ``(output, state)`` as returned by the GRU.
        """
        x = self.conv1(eninput)
        x = self.conv2(x)
        x = self.pool1(x)
        x = self.drop(x)
        x = self.norm(x)
        # Global max pooling removes both spatial axes, leaving one feature
        # vector per example ...
        x = self.pool2(x)
        # ... which is given a length-1 axis at position 1 so the GRU receives
        # a one-step sequence.
        x = tf.expand_dims(x, axis=1)
        output, state = self.gru(x, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        """Return an all-zero GRU state of shape (batch_sz, units)."""
        return tf.zeros((self.batch_sz, self.units))

In [37]:
class Attention(layers.Layer):
    """Additive attention: scores are ``V(tanh(W1(query) + W2(values)))``.

    Args:
        units: width of the two projection layers ``W1`` and ``W2``.
    """

    def __init__(self, units):
        super(Attention, self).__init__()
        self.W1 = layers.Dense(units)
        self.W2 = layers.Dense(units)
        self.V = layers.Dense(1)

    def call(self, query, values):
        """Return ``(context_vector, attention_weights)``.

        Args:
            query: tensor that receives an extra axis at position 1 before
                being projected by ``W1``.
            values: tensor projected by ``W2`` and then averaged with the
                attention weights along axis 1.
        """
        # Insert a length-1 axis so the projected query broadcasts against the
        # projected values along axis 1.
        expanded_query = tf.expand_dims(query, 1)
        projected = self.W1(expanded_query) + self.W2(values)
        # One scalar score per position on axis 1.
        energies = self.V(tf.nn.tanh(projected))
        # Normalise the scores along axis 1.
        weights = tf.nn.softmax(energies, axis=1)
        # Weight the values and sum them over axis 1.
        weighted_sum = tf.reduce_sum(weights * values, axis=1)
        return weighted_sum, weights

In [38]:
class Decoder(layers.Layer):
    """Single-step attention decoder: embedding -> attention context -> GRU -> Dense.

    Args:
        units: size of the GRU state and of the attention projections.
        vocab_size: number of output classes and of embeddable token ids.
        output_dim: embedding dimension.
        rate: dropout rate applied to the GRU inputs.
    """

    def __init__(self, units, vocab_size, output_dim, rate=0.2):
        super(Decoder, self).__init__()
        # The embedding table needs one row per token id. It was created with
        # a single row, so any id other than 0 was out of range.
        # NOTE: checkpoints written with the one-row table will not match
        # this variable's shape.
        self.embedding = layers.Embedding(vocab_size, output_dim)
        self.gru = layers.GRU(units, return_sequences=True,
                              return_state=True, dropout=rate,
                              recurrent_initializer='glorot_uniform')
        self.attn = Attention(units)
        self.dense = layers.Dense(vocab_size)

    # TODO: use beam search and teacher forcing
    def call(self, deinput, hidden, enoutput):
        """Produce logits for one decoding step.

        Args:
            deinput: token ids fed to the embedding layer.
            hidden: query for the attention layer. It is not used as the GRU's
                initial state; the GRU is called without ``initial_state``.
            enoutput: encoder outputs attended over.

        Returns:
            The Dense layer's output after the GRU output has been reshaped to
            two dimensions, ``(-1, vocab_size)``.
        """
        # Context vector computed from the decoder hidden state and the
        # encoder outputs.
        context, _ = self.attn(hidden, enoutput)
        x = self.embedding(deinput)
        # Prepend the context vector to the embedded input along the feature axis.
        x = tf.concat([tf.expand_dims(context, axis=1), x], axis=-1)
        output, _ = self.gru(x)
        # Merge the leading axes so that only the GRU feature axis remains
        # as the last dimension.
        output = tf.reshape(output, (-1, output.shape[2]))
        res = self.dense(output)

        return res

### 自定义损失函数

In [39]:
def compute_ctc_loss(logits, labels, logit_length, label_length):
    """Forward the arguments to ``tf.nn.ctc_loss`` with this notebook's settings.

    Logits are passed batch-major (``logits_time_major=False``) and
    ``blank_index=-1`` is used for the blank label.

    Args:
        logits: logits forwarded as ``logits``.
        labels: labels forwarded as ``labels``.
        logit_length: forwarded as ``logit_length``.
        label_length: forwarded as ``label_length``.

    Returns:
        Whatever ``tf.nn.ctc_loss`` returns for these inputs.
    """
    return tf.nn.ctc_loss(
        labels,
        logits,
        label_length,
        logit_length,
        logits_time_major=False,
        blank_index=-1,
    )

### 定义模型参数

In [40]:
# Adam with default settings performs the weight updates; `train_loss` keeps a
# running mean of the loss values reported by each training step.
optimizer = tf.keras.optimizers.Adam()
train_loss = tf.keras.metrics.Mean(name='train_loss')

### 模型参数初始化

In [41]:
# Hyper-parameters: the GRU size, the number of output classes and the
# embedding width are all 29.
units = 29
vocab_size = 29
batch_size = 32
embedding_dim = 29

# Model components (arguments spelled out by keyword).
encoder = Encoder(units=units, batch_size=batch_size)
attn = Attention(units=10)
decoder = Decoder(units=units, vocab_size=vocab_size, output_dim=embedding_dim)

In [14]:
# The encoder's initial hidden state has a fixed (batch_size, units) shape, so
# a smaller trailing batch would not match it; drop_remainder=True discards any
# such partial batch.
# NOTE: run this cell only once per dataset; re-running it batches the batches.
audio_ds = audio_ds.batch(batch_size, drop_remainder=True)

### 保存模型权重

In [44]:
# Checkpoints go under ./training_checkpoints with the file prefix "ckpt" and
# track the encoder, the decoder and the optimizer.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(
    encoder=encoder,
    decoder=decoder,
    optimizer=optimizer,
)

### 模型训练

In [45]:
@tf.function
def train_step(x, y, enc_hidden):
    """Run one optimisation step on a batch with teacher forcing.

    Args:
        x: batch of encoder inputs.
        y: batch of label sequences; column ``t`` is both the target of step
            ``t`` and the decoder input of the following step.
        enc_hidden: initial hidden state for the encoder GRU.

    The loss is the mean over all decoding steps of the per-step CTC loss.
    Previously the loss was overwritten on every step, so only the last
    timestep contributed to the gradients.
    """
    with tf.GradientTape() as tape:
        # training=True switches dropout and batch normalisation in the
        # sub-layers to training behaviour.
        enc_output, enc_hidden = encoder(x, enc_hidden, training=True)
        dec_hidden = enc_hidden
        # Random start token drawn from the valid id range [0, vocab_size);
        # the batch size is taken from the batch itself.
        dec_input = tf.random.uniform((x.shape[0], 1),
                                      maxval=vocab_size,
                                      dtype=tf.int32)
        step_losses = []
        for t in range(1, y.shape[1]):
            logits = tf.expand_dims(
                decoder(dec_input, dec_hidden, enc_output, training=True), 1)
            label = tf.expand_dims(y[:, t], 1)
            logit_length = [logits.shape[1]] * logits.shape[0]
            label_length = [label.shape[1]] * label.shape[0]
            step_losses.append(tf.reduce_mean(
                compute_ctc_loss(logits, label, logit_length, label_length)))
            # Teacher forcing: the ground-truth token is the next input.
            dec_input = tf.expand_dims(y[:, t], 1)
        loss = tf.add_n(step_losses) / len(step_losses)
    # Variables to optimise.
    variables = encoder.trainable_variables + decoder.trainable_variables
    # Back-propagate and apply the update.
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))
    # Record the loss for epoch-level reporting.
    train_loss(loss)

In [None]:
EPOCHS = 5

for epoch in range(EPOCHS):
    train_loss.reset_states()
    enc_hidden = encoder.initialize_hidden_state()
    # Dedicated loop names keep the arrays loaded into `x` / `y` intact.
    for batch_x, batch_y in audio_ds:
        train_step(batch_x, batch_y, enc_hidden)
    # Save weights every second epoch and always after the last epoch, so the
    # newest checkpoint matches the fully trained model even when EPOCHS is odd.
    if (epoch + 1) % 2 == 0 or epoch + 1 == EPOCHS:
        checkpoint.save(file_prefix=checkpoint_prefix)
    print(f"Epoch{epoch+1},"
          f"Loss:{train_loss.result()}"
         )

x: (32, 290, 62, 64)
x: (32, 290, 62, 64)


### 加载模型权重

In [None]:
# Look up the newest checkpoint in checkpoint_dir and load it into the objects
# tracked by `checkpoint`.
latest_ckpt = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_ckpt)

In [None]:
# `sample_call` is the audio file later passed to generate_audio.
# `transcript` is a lower-cased sentence, presumably the reference text for
# that file — confirm.
transcript = 'BUT IN LESS THAN FIVE MINUTES THE STAIRCASE GROANED BENEATH AN EXTRAORDINARY WEIGHT'.lower()
sample_call = '84-121123-0001.flac'

In [None]:
import librosa

In [None]:
def create_spectrogram(signals):
    '''
    Compute a square-root-compressed magnitude spectrogram.

    :param signals: audio samples handed to ``tf.signal.stft``
    :return: ``|STFT| ** 0.5`` using 200-sample frames, an 80-sample step
             and a 256-point FFT
    '''
    magnitudes = tf.abs(
        tf.signal.stft(signals, frame_length=200, frame_step=80, fft_length=256)
    )
    return tf.math.pow(magnitudes, 0.5)

In [None]:
def generate_audio(audio, sample_rate=8000):
    """Load an audio file and return its standardized spectrogram.

    Args:
        audio: path (or other source accepted by ``librosa.load``) of the file.
        sample_rate: sampling rate requested from ``librosa.load``.

    Returns:
        The spectrogram from ``create_spectrogram`` with the mean subtracted
        and divided by the standard deviation, both computed along axis 1.
        Where the standard deviation is zero the result is 0 rather than NaN.

    NOTE(review): the training data in this notebook is standardized with one
    global mean/std, whereas this function uses axis-1 statistics — confirm
    that the mismatch is intended.
    """
    # Load the waveform.
    signal, _ = librosa.load(audio, sr=sample_rate)
    # Build the spectrogram.
    spectro = create_spectrogram(signal)
    # Standardize; divide_no_nan guards against constant rows (e.g. digital
    # silence), whose standard deviation is zero.
    means = tf.math.reduce_mean(spectro, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectro, 1, keepdims=True)
    return tf.math.divide_no_nan(tf.subtract(spectro, means), stddevs)

In [None]:
# Keep the spectrogram under its own name so this cell can be re-run safely,
# then add a leading batch axis and a trailing channel axis. No frame count is
# hard-coded, so audio of any length works.
spectrogram = generate_audio(sample_call)
inputs = spectrogram[tf.newaxis, ..., tf.newaxis]

In [None]:
# Greedy step-by-step decoding of the sample utterance.
# Symbol table and accumulator are defined here so the cell runs on a fresh
# kernel: 26 letters, then space, end marker and blank.
alphabet = list(ascii_lowercase) + [' ', '>', '%']
output_text = ''
max_steps = 435  # number of decoding steps

hidden = [tf.zeros((1, units))]
enc_out, enc_hidden = encoder(inputs, hidden)
dec_hidden = enc_hidden

# The decoder embeds integer token ids of shape (batch, 1). Start from a random
# id, as in training, then feed back each prediction.
# NOTE(review): the decoder returns only logits, so `dec_hidden` cannot be
# updated between steps — confirm whether the decoder should return its state.
dec_input = tf.random.uniform((1, 1), maxval=vocab_size, dtype=tf.int32)
for i in range(max_steps):
    output = decoder(dec_input, dec_hidden, enc_out)
    output = tf.nn.log_softmax(output)
    predicted_id = tf.argmax(output[0]).numpy()
    output_text += alphabet[predicted_id]
    dec_input = tf.reshape(tf.cast(predicted_id, tf.int32), (1, 1))

In [None]:
# Collapse every run of identical consecutive characters to one character.
# Comparing each character with its predecessor keeps the final run too; the
# earlier look-ahead loop always dropped the last character.
a = "".join(
    ch for idx, ch in enumerate(output_text)
    if idx == 0 or ch != output_text[idx - 1]
)

In [None]:
# Display the collapsed string.
a

In [None]:
# Apply log-softmax to `output`.
# NOTE(review): the decoding loop already stores log-softmax values in
# `output`, so this looks redundant — confirm.
output = tf.nn.log_softmax(output)

In [None]:
# Display the shape of `output`.
output.shape

In [None]:
# Display the shape of the first element of `output`.
output[0].shape

In [None]:
from string import ascii_lowercase

In [None]:
# Index of the largest value in the first element of `output`, as a NumPy value.
predicted_id = tf.argmax(output[0]).numpy()

In [None]:
# greedy decoding
# Symbol table: the 26 lowercase letters followed by the space, end and blank
# markers (29 entries in total).
space_token = ' '
end_token = '>'
blank_token = '%'
alphabet = list(ascii_lowercase) + [space_token, end_token, blank_token]
# Reset the decoded text to an empty string.
output_text = ''

In [None]:
# Display the symbol for `predicted_id`.
alphabet[predicted_id]

In [None]:
# greedy decoding
space_token = ' '
end_token = '>'
blank_token = '%'
alphabet = list(ascii_lowercase) + [space_token, end_token, blank_token]

# Take the best class along the last axis. np.atleast_1d lets the same code
# handle a (time, classes) matrix and a single (classes,) vector; iterating a
# vector element by element and calling argmax on each scalar would fail.
best_ids = np.atleast_1d(tf.math.argmax(output[0], axis=-1).numpy())
output_text = ''.join(alphabet[i] for i in best_ids)

# Collapse runs of repeated characters, keeping the final run as well (the
# earlier look-ahead loop always dropped the last character), then strip the
# blank marker.
a = "".join(
    ch for idx, ch in enumerate(output_text)
    if idx == 0 or ch != output_text[idx - 1]
)
a = a.replace('%', '')
print(a)