# CTCを用いた可変長音素認識

これまでの検討では、訓練用データに時間情報付きの音素ラベルが与えられていることが前提でした。しかし、ほとんどの音声データベースには時間情報がなく、発話内容しか与えられていません。これは、音素の時間情報を同定する作業が非常に高コストだからです。

このため、ニューラルネットを用いた音声認識は、これまでは学習時にHMM(隠れマルコフモデル)を用いた従来の音声認識を援用し、はじめに各音素の区間を音声認識によって推定する必要がありました。

これに対し、近年ニューラルネットだけで音声認識を可能とする枠組がいくつか提案されています。CTC (Connectionist Temporal Classification) は、そのようなアルゴリズムの1つです。

CTC は、基本的には LSTM を用いたフレームごとの音素認識に過ぎません。しかし、損失関数が異なります。CTC では、縮約すると正解音素列と一致するような全ての音素列の確率の和をネットワークの出力の「望ましさ」と考え、その対数にマイナスを付けた関数を損失関数とします。例えば、縮約すると /h a i/ という音素列と一致するような長さ5の音素列を全て挙げると

In [0]:
from itertools import combinations, combinations_with_replacement
import numpy as np

def expandstr(str, seqlen):
    l = []
    for expdstrlen in range(len(str), seqlen+1):
        combos = combinations(range(1, expdstrlen), len(str)-1)
        for sp in combos:
            tmp = np.array(sp)
            clens = np.append(tmp,expdstrlen) - np.insert(tmp,0,0)
            bstr = "".join([c * clens[i] for i, c in enumerate(str)])
            bcombos = combinations_with_replacement(range(0,len(str)+1), seqlen-expdstrlen)
            for bcombo in bcombos:
                xstr = ""
                for i in range(len(str)+1):
                    xstr += '_' * bcombo.count(i)
                    if i < len(str):
                        xstr += str[i] * clens[i]
                l.append(xstr)
    return l

In [0]:
expandstr('hai',5)

['__hai',
 '_h_ai',
 '_ha_i',
 '_hai_',
 'h__ai',
 'h_a_i',
 'h_ai_',
 'ha__i',
 'ha_i_',
 'hai__',
 '_haii',
 'h_aii',
 'ha_ii',
 'haii_',
 '_haai',
 'h_aai',
 'haa_i',
 'haai_',
 '_hhai',
 'hh_ai',
 'hha_i',
 'hhai_',
 'haiii',
 'haaii',
 'haaai',
 'hhaii',
 'hhaai',
 'hhhai']

となります。ただし _ はどの音素でもない記号で、ブランクと呼びます。

以下、ネットワークの定義はLSTMの時と同じです。

In [0]:
import chainer
import chainer.links as L
import chainer.functions as F

class RNN(chainer.Chain):

    def __init__(self, n_lstm_layers=1, n_mid_units=100, n_out=41, dropout=0.2):
        super(RNN, self).__init__()

        # パラメータを持つ層の登録
        with self.init_scope():
            self.l1 = L.Linear(None, n_mid_units)
            self.l2 = L.NStepBiLSTM(n_lstm_layers, n_mid_units, n_mid_units, dropout)
            self.l3 = L.Linear(n_mid_units * 2, n_out)

    def __call__(self, x):
        # データを受け取った際のforward計算を書く
        h1 = [ F.relu(self.l1(X)) for X in x ]
        hy, cy, ys = self.l2(None, None, h1)
        h2 = F.concat(ys, axis=0)
        return self.l3(h2)

In [0]:
from chainer.dataset import to_device
from chainer.dataset import concat_examples
import numpy as np

def converter(batch, device):
    # alternative to chainer.dataset.concat_examples
    
    Xs = [to_device(device, X) for X, _, __ in batch]
    ts = [to_device(device, t) for _, t, __ in batch]

    lab_batch = [lab.astype(np.int32) for _, __, lab in batch]
    labs = concat_examples(lab_batch, device, padding=0)
    
    return Xs, ts, labs

## 損失関数

CTCの損失関数は、`connectionist_temporal_classification`関数で計算できます。この関数は、これまでのようにフレームごとでなく発話全体に対して計算されます。各フレームの正解音素ラベル `t` を必要とせず、発話全体の音素ラベル列 `lab` さえあればよい点が重要なポイントです。

In [0]:
import numpy as np
from chainer.backends import cuda

def calculate_loss(model, batch, blank_symbol, gpu_id=0, train_mode=True):
    numframes = [X.shape[0] for (X, t, lab) in batch]
    
    # lab を行列に変形
    label_length = xp.array([len(lab) for (X, _t, lab) in batch],dtype=np.int32)
    Xs, _, labs = converter(batch, gpu_id) # アラインメント済ラベルは使わない

    with chainer.using_config('train', train_mode), \
         chainer.using_config('enable_backprop', train_mode):
        ys = net(Xs)
    
    # 発話ごとに分割
    split_point = np.cumsum(numframes)[:-1]
    y4utt = F.split_axis(ys, split_point, axis=0)
    input_length = xp.array(numframes, dtype=np.int32)
    
    # 第iフレームの確率行列(B,V)となるよう整形
    y4utt = F.pad_sequence(y4utt)
    y4frame = F.stack(y4utt, axis=1)
    x = [xi for xi in y4frame]
    
    # ロスの計算
    loss = F.connectionist_temporal_classification(x, labs, blank_symbol, input_length, label_length)

    return loss

In [0]:
import random
import numpy as np
import chainer

def reset_seed(seed=0):
    random.seed(seed)
    np.random.seed(seed)
    if gpu_id >= 0:
        chainer.cuda.cupy.random.seed(seed)

In [0]:
import numpy as np
from sklearn.preprocessing import LabelEncoder

with open('phones') as f:
    phones = f.read().splitlines()
le = LabelEncoder()
le.fit(phones)
blank_symbol = np.asscalar(le.transform(['_'])[0])
nsymbol = len(phones)

In [0]:
import numpy as np
from chainer import iterators
from chainer import optimizers
from chainer.backends import cuda
from chainer.cuda import to_cpu
from chainer.dataset import concat_examples

gpu_id = 0 # CPUを用いる場合は、この値を-1にしてください

batchsize = 100
max_epoch = 200

xp = np
if gpu_id >= 0:
    xp = chainer.cuda.cupy

reset_seed(1234) # 乱数の種をセット

train = np.load("MHT-train.npy") # train: 450 x (framelen x 26 , framelen, maxphonenum)
test = np.load("MHT-test.npy")

train_iter = iterators.SerialIterator(train, batchsize)

# dropoutを大きくしないと、訓練データの音素列に過剰に適合するようだ
net = RNN(n_lstm_layers=1, n_mid_units=200, n_out=nsymbol, dropout=0.4)

#optimizer = optimizers.SGD(lr=0.0001).setup(net)
optimizer = optimizers.Adam().setup(net)

if gpu_id >= 0:
    net.to_gpu(gpu_id)
    
while train_iter.epoch < max_epoch:

    # ---------- 学習の1イテレーション ----------
    train_batch = train_iter.next()

    loss = calculate_loss(net, train_batch, blank_symbol, gpu_id, train_mode = True)

    # 勾配の計算
    net.cleargrads()
    loss.backward()

    # バッチ単位で古い記憶を削除し、計算コストを削減する。
    loss.unchain_backward()

    # パラメータの更新
    optimizer.update()
    
    # --------------- ここまで ----------------

    # 1エポック終了ごとにValidationデータに対する予測精度を測って、
    # モデルの汎化性能が向上していることをチェックしよう
    if train_iter.is_new_epoch:  # 1 epochが終わったら
        # ロスの表示
        print('epoch:{:02d} train_loss:{:.04f} '.format(
            train_iter.epoch, float(to_cpu(loss.data))), end='')

        valid_losses = []
        
        loss_valid = calculate_loss(net, test, blank_symbol, gpu_id, train_mode=False)
        valid_losses.append(to_cpu(loss_valid.array))

        print('val_loss:{:.04f}'.format(np.mean(valid_losses)))

epoch:01 train_loss:541.2224 val_loss:401.2343
epoch:02 train_loss:470.5587 val_loss:428.6295
epoch:03 train_loss:427.5812 val_loss:344.4819
epoch:04 train_loss:421.0080 val_loss:328.5205
epoch:05 train_loss:407.3997 val_loss:335.0782
epoch:06 train_loss:386.1239 val_loss:335.0728
epoch:07 train_loss:396.0115 val_loss:322.8152
epoch:08 train_loss:404.2352 val_loss:316.6690
epoch:09 train_loss:399.6893 val_loss:312.7981
epoch:10 train_loss:383.7571 val_loss:311.9659
epoch:11 train_loss:356.4778 val_loss:308.5427
epoch:12 train_loss:371.0016 val_loss:307.5361
epoch:13 train_loss:335.9829 val_loss:303.8176
epoch:14 train_loss:356.9879 val_loss:297.6185
epoch:15 train_loss:366.5684 val_loss:296.2573
epoch:16 train_loss:350.5456 val_loss:294.5177
epoch:17 train_loss:331.8772 val_loss:293.4913
epoch:18 train_loss:346.9295 val_loss:289.7409
epoch:19 train_loss:327.4168 val_loss:285.9658
epoch:20 train_loss:345.1972 val_loss:287.1277
epoch:21 train_loss:357.7995 val_loss:278.7282
epoch:22 trai

## 音素認識結果の観察

In [0]:
test_utterance_number = 0

if gpu_id >= 0:
    net.to_gpu(gpu_id)

Xs, _, __ = converter(test, gpu_id)

# テストデータを1つ取り出します
X_test = Xs[test_utterance_number]
_, __, lab_test = test[test_utterance_number]

with chainer.using_config('train', False), \
     chainer.using_config('enable_backprop', False):
    y_test = net([X_test])

# Variable形式で出てくるので中身を取り出す
y_test = y_test.array

# 結果をCPUに送る
y_test = to_cpu(y_test)

# 予測確率の最大値のインデックスを見る
pred_label = y_test.argmax(axis=1)

テストデータの0番目 (Jセット第1文) の正解音素列は次の通りです。

In [0]:
le.inverse_transform(lab_test)

  if diff:


array(['sil', 'ch', 'i', 'i', 's', 'a', 'n', 'a', 'u', 'n', 'a', 'g', 'i',
       'y', 'a', 'n', 'i', 'pau', 'n', 'e', 'Q', 'k', 'i', 'n', 'o', 'y',
       'o', 'o', 'n', 'a', 'm', 'o', 'n', 'o', 'g', 'a', 'm', 'i', 'n',
       'a', 'g', 'i', 'r', 'u', 'sil'], dtype='<U3')

これに対し、まず各フレームに対する音素認識結果を見てみます。

In [0]:
le.inverse_transform(pred_label)

  if diff:


array(['sil', 'sil', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_',
       '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_',
       '_', '_', '_', '_', 'ch', 'ch', 'ch', '_', '_', 'i', 'i', '_', '_',
       '_', 'i', 'i', 'i', 'i', 'i', '_', '_', '_', '_', '_', '_', 's',
       's', '_', '_', '_', '_', 'a', 'a', '_', '_', '_', '_', 'n', 'n',
       'n', 'a', 'a', '_', '_', '_', '_', '_', '_', '_', '_', 'u', 'u',
       '_', '_', '_', '_', '_', '_', '_', '_', '_', 'n', 'n', 'a', 'a',
       'a', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', 'g', 'g',
       'g', '_', '_', '_', '_', '_', '_', 'i', 'i', 'i', 'i', '_', '_',
       '_', 'a', 'a', 'a', 'a', '_', '_', '_', '_', '_', '_', '_', '_',
       'n', 'n', '_', '_', '_', '_', '_', '_', '_', '_', '_', 'i', 'i',
       'i', 'i', '_', '_', '_', '_', 'pau', 'pau', '_', '_', '_', '_',
       '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_',
       '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '

多くのフレームで _ (ブランク) が出力されているのがわかります。これは、最も「自信がある」フレームに対してだけ具体的な音素が出力され、音素境界に近い曖昧な部分では何も出力していないものと解釈することができます。

この結果をLSTM等と単純に比較することはできません。CTCの学習には音素の時間情報を使っていないので、こちらの方が不利だからです。

出力された冗長な音素列を縮約することで、次のように最終的な音素認識結果が得られます。

In [0]:
mask = pred_label[:-1] != pred_label[1:]
tmp_label = pred_label[np.append(mask,True)]
mask = tmp_label != blank_symbol
le.inverse_transform(tmp_label[mask])

  if diff:


array(['sil', 'ch', 'i', 'i', 's', 'a', 'n', 'a', 'u', 'n', 'a', 'g', 'i',
       'a', 'n', 'i', 'pau', 'n', 'e', 'Q', 'k', 'i', 'n', 'o', 'y', 'o',
       'o', 'n', 'a', 'm', 'o', 'n', 'o', 'g', 'a', 'm', 'i', 'n', 'a',
       'g', 'i', 'r', 'u', 'sil'], dtype='<U3')