# 第16章 系列データのモデル化 ― リカレントニューラルネットワーク

## 16.1 系列データ

系列データは**シーケンス**と呼ばれる。

### 16.1.1 系列データのモデル化：順序は大事

シーケンスを扱うときは順序が重要

### 16.1.2 系列データを表現する

リカレントニューラルネットワーク (RNN)の目的は、シーケンスをモデル化することである。

RNNは、過去の情報を記録しておき、その情報に従って新しい事象を処理できる。

### 16.1.3 シーケンスモデルのさまざまなカテゴリ

モデル化タスクを次の3つのカテゴリに分類される可能性がある。<br>
- 多対一
- 一対多
- 多対多

## 16.2 リカレントニューラルネットワーク：シーケンス

### 16.2.1 RNNのループ構造を理解する

標準のフィードフォワードニューラルネットワークでは、情報は入力層から隠れ層へ流れ、隠れ層から出力層へ流れる。

対し、RNNでは、隠れ層の入力は現在の時間刻みの入力層から得られるだけではなく、1つ前の時間刻みの隠れ層からも得られる。

ループを**リカレントエッジ**と呼ぶ。

隠れ層が1つのRNNは、**単層RNN**という呼び方が定着している。

RNNの隠れユニットはそれぞれ、入力層からの事前活性化と1つ前の時間刻み$t-1$の同じ隠れ層からの活性化という2つの入力を受け取る。

### 16.2.2 RNNで活性化を計算する

### 16.2.3 隠れ層の再帰と出力層の再帰

- 隠れ層から隠れ層への再帰
- 出力層から隠れ層への再帰
- 出力層から出力層への再帰

TensorFlow Keras API では`SimpleRNN`を使ってリカレント層を定義できる。<br>
`SimpleRNN`を使ってリカレント層を定義できる。

In [1]:
import tensorflow as tf
tf.random.set_seed(1)

rnn_layer = tf.keras.layers.SimpleRNN(
    units=2, use_bias=True, return_sequences=True)
dummy_input = tf.keras.Input(shape=(None, 5))
rnn_layer(dummy_input)
w_xh, w_oo, b_h = rnn_layer.weights
print("W_xh shape: ", w_xh.shape)
print("W_oo shape:", w_oo.shape)
print("b_h shape:", b_h.shape)

2025-01-08 12:07:46.067479: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


W_xh shape:  (5, 2)
W_oo shape: (2, 2)
b_h shape: (2,)


次に、`rnn_layer`でフォワードパスを呼び出し、各時間刻みの出力を手動で計算し、それらを比較する。

In [3]:
x_seq = tf.convert_to_tensor([[1.0]*5, [2.0]*5, [3.0]*5], dtype=tf.float32)
## SimpleRNNの出力
output = rnn_layer(tf.reshape(x_seq, shape=(1, 3, 5)))
## 出力を手動で計算
out_man = []
for t in range(len(x_seq)):
    xt = tf.reshape(x_seq[t], (1, 5))
    print("Time step {} =>".format(t))
    print("   Input      : ", xt.numpy())
    ht = tf.matmul(xt, w_xh) + b_h
    print("   Hidden     : ", ht.numpy())
    if t > 0:
        prev_o = out_man[t-1]
    else:
        prev_o = tf.zeros(shape=(ht.shape))
    ot = ht + tf.matmul(prev_o, w_oo)
    ot = tf.math.tanh(ot)
    out_man.append(ot)
    print("   Output (manual): ", ot.numpy())
    print("   SimpleRNN output : ", output[0][t].numpy())
    print()

Time step 0 =>
   Input      :  [[1. 1. 1. 1. 1.]]
   Hidden     :  [[-2.6218863   0.89948833]]
   Output (manual):  [[-0.9894949  0.7160487]]
   SimpleRNN output :  [-0.9894949  0.7160487]

Time step 1 =>
   Input      :  [[2. 2. 2. 2. 2.]]
   Hidden     :  [[-5.2437725  1.7989767]]
   Output (manual):  [[-0.9999793  0.5926438]]
   SimpleRNN output :  [-0.9999793  0.5926438]

Time step 2 =>
   Input      :  [[3. 3. 3. 3. 3.]]
   Hidden     :  [[-7.8656597  2.698465 ]]
   Output (manual):  [[-1.          0.92117715]]
   SimpleRNN output :  [-1.          0.92117715]



### 16.2.4 長期的な相互作用の学習

**BPTT**は新しい課題をもたらしている。

**勾配消失**と**勾配発散**問題が発生する。

この問題に対する解決策が2つある。
- 勾配刈り込み
- T-BPTT
- 超短期記憶

### 16.2.5 LSTMのメモリセル

LSTMは勾配消失問題を解決する方法として1997年に提唱された。

LSTMの構成要素は**メモリセル**である。メモリセルは基本的には標準的なRNNの隠れ層を表す。リカレントエッヂに関連付けられる値は**セル状態**と呼ばれる。

## 16.3 リカレントニューラルネットワークの実装：TensorFlowでのシーケンスモデルの構築

2つの一般的な問題にRNNを適用する。

1. 感情分析
2. 言語モデルの構築

### 16.3.1 プロジェクト1: IMDb映画レビューの感情分析

多対一のアーキテクチャに基づいて感情分析のために多層RNNを実装する。

RNNモデルにデータを与える前に、前処理をいくつか適用しておく必要がある。

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import pandas as pd

df = pd.read_csv('movie_data.csv', encoding='utf-8')

In [1]:
## 手順1：Datasetを作成
target = df.pop('sentiment')
ds_raw = tf.data.Dataset.from_tensor_slices(
    (df.values, target.values))
## 調査
for ex in ds_raw.take(3):
    tf.print(ex[0].numpy()[0][:50], ex[1])
    

NameError: name 'df' is not defined

このDatasetオブジェクトを訓練データセット、テストデータセット、検証データセットに分割できる<br>
データセット全体は50,000個のデータを含んでいる。<br>
最初の25,000個のデータを評価のためにとっておき、次の20,000個のデータを訓練に、5,000個のデータを検証に使うことにする。

In [None]:
tf.random.set_seed(1)
ds_raw = ds_raw.shuffle(
    50000, reshuffle_each_iteration=False)
ds_raw_test = ds_raw.take(25000)
ds_raw_train_valid = ds_raw.skip(25000)
ds_raw_train = ds_raw_train_valid.take(20000)
ds_raw_valid = ds_raw_train_valid.skip(20000)

In [1]:
## 手順2：一意なトークン（単語）を特定
from collections import Counter
tokenizer = tfds.features.text.Tokenizer()
token_counts = Counter()
for example in ds_raw_train:
    tokens = tokenizer.tokenize(example[0].numpy()[0])
    token_counts.update(tokens)
print("Vocab-size:", len(token_counts))

一意の単語をそれぞれ一意な整数にマッピングする。<br>
`tensorflow_datasets`パッケージには、`TokenTextEncoder`というクラスがすでに含まれている。<br>
まず、`TokenTextEncoder`クラスのコンストラクタに一意なトークンを渡して`encoder`オブジェクトを作成する。

In [None]:
## 手順3：一意なトークンを整数にエンコード
encoder = tfds.features.text.TokenTextEncoder(token_counts)
example_str = "This is an example!"
print(encoder.encode(example_str))

1つ目の関数は、Eager Executionモードが有効であるかのように入力テンソルを扱う

In [1]:
## 手順3-A：変換用の関数を定義
def encode(text_tensor, label):
    text = text_tensor.numpy()[0]
    encoded_text = encoder.encode(text)
    return encoded_text, label

2つ目の関数は、`tf.py_function`を使って1つ目の関数をラッピングし、TensorFlow演算子に変換する。

In [None]:
## 3-B：encode関数をラッピングしてTensorFlow演算子に変換
def encode_map_fn(text, label):
    return tf.py_function(encode, inp=[text, label], Tout=(tf.int64, tf.int64))

ds_train = ds_raw_train.map(encode_map_fn)
ds_valid = ds_raw_valid.map(encode_map_fn)
ds_test = ds_raw_test.map(encode_map_fn)
tf.random.set_seed(1)
for example in ds_train.shuffle(1000).take(5):
    print("Sequence length:", example[0].shape)

TensorFlowには、さまざまな形状の要素で構成されたデータセットをバッチに分割するために、`padded_batch`という別のメゾットが用意されている。このメゾットは、1つのバッチにまとめられる一連のシーケンスをプレースホルダ値(0)で自動的にパディングする。結果として、バッチ内のシーケンスがすべて同じ形状になる。

In [None]:
## 小さなサブセットを取得
ds_subset = ds_train.take(8)
for example in ds_subset:
    print("Individual size:", example[0].shape)

In [None]:
## このサブセットをバッチに分割
ds_batched = ds_subset.padded_batch(4, padded_shapes=([-1], []))
for batch in ds_batched:
    print("Batch dimension:", batch[0].shape)

3つのデータセットをバッチサイズ32のバッチに分割しよう

In [None]:
train_data = ds_train.padded_batch(32, padded_shapes=([-1],[]))
valid_data = ds_valid.padded_batch(32, padded_shapes=([-1],[]))
test_data = ds_test.padded_batch(32, padded_shapes=([-1], []))

特徴量の**埋め込み**について説明しておこう。

単語のインデックスを入力特徴量に変換する方法は何種類ある。<br>
実数値の要素を持つ固定サイズのベクトルに各単語をマッピングすること。<br>
有限サイズのベクトルを使って無数の実数を表すことができる。

埋め込みには、one-hotエンコーディングよりも有利な点が2つある。
- 次元の呪いの影響を抑制する特徴量空間の次元削減
- ニューラルネットワークの埋め込み層が最適化可能であることによる顕著な特徴量の抽出

`tf.keras.layers.Embedding`を利用すれば、埋め込み層を作成するのは簡単だ。

In [None]:
from tensorflow.keras.layers import Embedding
model = tf.keras.Sequential()
model.add(Embedding(input_dim=100, output_dim=6,
                    input_length=20, name="embed-layer"))
model.summary()

**RNNモデルを構築する**

kerasの`Sequential`クラスを利用すれば、埋め込み層、RNNのリカレント層、そして全結合の非リカレント層を組み合わせることができる。
- `SimpleRNN` 通常のRNN層。つまり、全結合リカレント層
- `LSTM` LSTM RNN層。長期的な依存関係を捕捉するのに役立つ。
- `GRU` GRUをリカレント層。LSTMに代わる手法として提案されている。

`input_dim=1000`, `output_dim=32`の埋め込み層から始まるRNNモデルを構築する。<br>
次に、`SimpleRNN`型の2つのリカレント層を追加する。<br>
最後に、出力層として非リカレントの全結合層を追加する。

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import SimpleRNN
from tensorflow.keras.layers import Dense
model = Sequential()
model.add(Embedding(input_dim=1000, output_dim=32))
model.add(SimpleRNN(32, return_sequences=True))
model.add(SimpleRNN(32))
model.add(Dense(1))
model.summary()

**感情分析のためのRNNモデルを構築する**

In [None]:
embedding_dim = 20
vocab_size = len(token_counts) + 2
tf.random.set_seed(1)
## モデルを構築
bi_lstm_model = tf.keras.Sequential([
    tf.keras.layers.Embedding(
        input_dim=vocab_size, output_dim=embedding_dim,
        name='embed-layer'),
    tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(64, name='lstm-layer'),
        name='bidir-lstm'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
bi_lstm_model.summary()
## コンパイルと訓練
bi_lstm_model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=["accuracy"])
history = bi_lstm_model.fit(
    train_data, validation_data=valid_data, epochs=10)
## テストデータでの評価
test_results = bi_lstm_model.evaluate(test_data)
print("Test Acc.: {:.2f}%".format(test_results[1]*100))

前処理ステップ２～４（手順２～４）をまとめた`preprocess_datasets`というヘルパー関数を定義する。

In [None]:
from collections import Counter

def preprocess_datasets(ds_raw_train, ds_raw_valid, ds_raw_test,
                        max_seq_length=None, batch_size=32):
    ## 手順2：一意なトークンを特定（手順1はすでに実行されている）
    tokenizer = tfds.features.text.Tokenizer()
    token_counts = Counter()

    for example in ds_raw_train:
        tokens = tokenizer.tokenize(example[0].numpy()[0])
        if max_seq_length is not None:
            tokens = tokens[-max_seq_length:]
        token_counts.update(tokens)

    print("Vocab-size:", len(token_counts))

    ## 手順3：テキストをエンコード
    encoder = tfds.features.text.TokenTextEncoder(token_counts)

    def encode(text_tensor, label):
        text = text_tensor.numpy()[0]
        encoded_text = encoder.encode(text)
        if max_seq_length is not None:
            encoded_text = encoded_text[-max_seq_length:]
        return encoded_text, label

    def encode_map_fn(text, label):
        return tf.py_function(encode, inp=[text, label], Tout=(tf.int64, tf.int64))

    ds_train = ds_raw_train.map(encode_map_fn)
    ds_valid = ds_raw_valid.map(encode_map_fn)
    ds_test = ds_raw_test.map(encode_map_fn)

    ## 手順4：データセットをバッチ分割
    train_data = ds_train.padded_batch(batch_size, padded_shapes=([-1], []))
    valid_data = ds_valid.padded_batch(batch_size, padded_shapes=([-1], []))
    test_data = ds_test.padded_batch(batch_size, padded_shapes=([-1], []))

    return (train_data, valid_data, test_data, len(token_counts))

さまざまなアーキテクチャを使ったモデルの構築を容易にするために、ヘルパー関数(`build_rnn_model`)をもう1つ定義する

In [None]:
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.layers import SimppleRNN
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import GRU

def build_rnn_model(embedding_dim, vocab_size, recurrent_type="SimpleRNN",
                    n_recurrent_units=64, n_recurrent_layers=1,
                    bidirectional=True):
    tf.random.set_seed(1)
    ## モデルを構築
    model = tf.keras.Sequential()
    model.add(Embedding(input_dim=vocab_size, output_dim=embedding_dim, name="embed-layer"))
    for i in range(n_recurrent_layers):
        return_sequences = (i < n_recurrent_layers-1)
        if recurrent_type == "SimpleRNN":
            recurrent_layer = SimpleRNN(units=n_recurrent_units,
                                        return_sequences=return_sequences,
                                        name="simprnn-layer-{}".format(i))
        if recurrent_type == "LSTM":
            recurrent_layer = LSTM(units=n_recurrent_units, 
                                   return_sequences=return_sequences,
                                   name="lstm-layer-{}".format(i))
        if recurrent_type == "GRU":
            recurrent_layer = GRU(units=n_recurrent_units,
                                  return_sequences=return_sequences,
                                  name="gru-layer-{}".format(i))
        if bidirectional:
            recurrent_layer = Bidirectional(recurrent_layer,
                                            name="bidir-" + recurrent_layer.name)
        model.add(recurrent_layer)

    model.add(tf.keras.layers.Dense(64, activation="relu"))
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
    return model

In [None]:
batch_size = 32
embedding_dim = 20
max_seq_length = 100
train_data, valid_data, test_data, n = preprocess_datasets(
    ds_raw_train, ds_raw_valid, ds_raw_test,
    max_seq_length=max_seq_length, batch_size=batch_size)
vocab_size = n + 2
rnn_model = build_rnn_model(embedding_dim, vocab_size,
                            recurrent_type="SimpleRNN",
                            n_recurrent_units=64,
                            n_recurrent_layers=1,
                            bidirectional=True)
rnn_model.summary()

In [None]:
rnn_model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                  metrics=["accuracy"])
history = rnn_model.fit(train_data, validation_data=valid_data, epochs=10)

In [None]:
results = rnn_model.evaluate(test_data)
print("Test Acc.: {:.2f}%".format(results[1]*100))

シーケンスを100個のトークンに切り詰め、双方向の`SimpleRNN`層を使った場合の分類正解率は80.70%である。

### 16.3.2 プロジェクト２：文字レベルの言語モデルをTensorFlowで実装する

文字レベルの言語モデルでは、入力は文字のシーケンスに分割され、RNNに1文字ずつ供給される。

RNNは、以前に検出した文字の記憶を基に新しい文字をそれぞれ処理することで、次の文字を予測する。

RNNモデルの実装を次の3つに分けて説明していく。

In [None]:
import numpy as np
# テキストを読み込んで処理
with open("1268-0.txt", "r") as fp:
    text = fp.read()

start_indx = text.find("THE MYSTERIOU ISLAND")
end_indx = text.find("End of the Project Gutenberg")
text = text[start_indx:end_indx]
char_set = set(text)
print("Total Length:", len(text))

In [None]:
print("Unique Characters:", len(char_set))

テキストを数値フォーマットに変換しなければならない。<br>
そこで、各文字から整数へのマッピングを定義する単純なPythonディクショナリ`char2int`を作成する。

文字から整数へのマッピングと、Numpy配列のインデックスを使って逆方向のマッピングを行うためのディクショナリは次のように構築する。

In [None]:
chars_sorted = sorted(char_set)
char2int = {ch: i for i, ch in enumerate(chars_sorted)}
char_array = np.array(chars_sorted)
text_encoded = np.array([char2int[ch] for ch in text], dtype=np.int32)
print("Text encoded shape:", text_encoded.shape)

In [None]:
print(text[:15], "== Encoding ==>", text_encoded[:15])
print(text_encoded[15, 21], "== Reverse ==>", 
      "".join(char_array[text_encoded[15:21]))

Numpy配列 `text_encoded` には、このテキストの全文字のエンコード値が含まれている。<br>
ここで、この配列からTensorFlowの`Dataset`を作成する。

In [None]:
import tensorflow as tf
ds_text_encoded = tf.data.Dataset.from_tensor_slices(text_encoded)
for ex in ds_text_encoded.take(5):
    print("{}".format("".join(char_array[ex.numpy()])))

多クラス分類アプローチに基づき、長さが1のシーケンス（つまり1文字）を入力として新しいテキストを反復的に生成していくことができる。

In [None]:
seq_length = 40
chunk_size = seq_length + 1
ds_chunks = ds_text_encoded.batch(chunk_size, drop_remainder=True)
## xとyを分離するための関数を定義
def split_input_target(chunk):
    input_seq = chunk[:-1]
    target_seq = chunk[1:]
    return input_seq, target_seq

ds_sequences = ds_chunks.map(split_input_target)

この変換後のデータセットでシーケンスをいくつか調べてみよう

In [None]:
for example in ds_sequences.take(2):
    print(" Input  (x): ", repr("".join(char_array[example[0].numpu()]])))
    print("Target  (y): ", repr("".join(char_array[example[1].numpu()]])))
    print()

データセットを準備するための最後の手段は、このデータセットをバッチに分割することである。<br>
このデータセットをバッチに分割するための最初の前処理ステップでは、シーケンスのチャンクを作成した。<br>
各チャンクは1つの文章を表しており、1つの訓練データに相当する。ここでは、これらの訓練データをシャッフルし、入力データを再びバッチに分割する。ただし今回は、各バッチに複数の訓練データが含まれることになる。

In [None]:
BATCH_SIZE = 64
BUFFER_SIZE = 10000
ds = ds_sequences.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

**文字レベルのRNNモデルを構築する**

Kerasの`Sequential`クラスを使ってRNNモデルを構築する。
続いて、訓練パラメータを設定し、この関数を呼び出してRNNモデルを取得する。

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units):
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim),
        tf.keras.layers.LSTM(rnn_units, return_sequences=True),
        tf.keras.layers.Dense(vocab_size)
    ])
    return model

## 訓練パラメータを設定
chrarset_size = len(char_array)
embedding_dim = 256
rnn_units = 512
tf.random.set_seed(1)
model = build_model(vocab_size=charset_size,
                    embedding_dim=embedding_dim,
                    rnn_units=rnn_units)
model.summary()

In [None]:
model.compile(optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
model.fit(ds, epochs=20)

**新しいテキストを生成するためにRNNモデルを評価する**

ソフトマックス関数を利用すれば、これらのロジットを「特定の文字が次の文字になる確率」をすぐに変換できる。シーケンスの次の文字を予測するには、ロジットの値が最も大きい要素を選択すればよい。この選択は、最も確率が高い文字を選択することに等しい。

TensorFlowには、`tf.random.categorical`という関数がすでに定義されている。この関数を利用すれば、カテゴリ分布からデータをランダムに抽出できる。

In [None]:
tf.random.set_seed(1)
logits = [[1.0, 1.0, 1.0]]
print("Probabilities:", tf.math.softmax(logits).numpy()[0]))
samples = tf.random.categorical(logits=logits, num_samples=10)
tf.print(samples.numpy())

与えられたロジットに基づき、これらのカテゴリ値の確率は同じになる。

In [None]:
tf.random.set_seed(1)
logits = [[1.0, 1.0, 3.0]]
print("Probabilities:", tf.math.softmax(logits).numpy()[0]))

In [None]:
samples = tf.random.categorical(logits=logits, num_samples=10)
tf.print(samples.numpy())

`sample`という関数を定義する。この関数は、入力として短い文字列(`starting_str`)を受け取り、新しい文字列(`generated_str`)を生成する

`generated_str`が目的の長さになるで、`generated_str`から最後の`max_input_length`個の文字を取り出し、この文字列を使って新しい文字を生成する。生成したシーケンスを次の要素を生成するための入力として利用するプロセスを**自己回帰**と呼ぶ。

In [None]:
def sample(model, starting_str, len_generated_text=500,
          max_input_length=40, scale_factor=1.0):

    encoded_input = [char2int[s] for s in starting_str]
    encoded_input = tf.reshape(encoded_input, (1, -1))

    generated_str = starting_str

    model.reset_states()
    for i in range(len_generated_text):
        logits = model(encoded_input)
        logits = tf.squeeze(logits, 0)

        scaled_logits = logits * scale_factor
        new_char_indx = tf.random.categorical(scaled_logits, num_samples=1)
        new_char_indx = tf.squeeze(new_char_index)[-1].numpy()
        generated_str += str(char_array[new_char_indx])
        new_char_indx = tf.expand_dims([new_char_indx], 0)
        encoded_input = tf.concat([encoded_input, new_char_indx], axis=1)
        encoded_input = encoded_input[:, -max_input_length:]

    return generated_str

さっそく新しいテキストを生成してみよう。

In [None]:
tf.random.set_seed(1)
print(sample(model, starting_str="The island"))

In [None]:
logits = np.array([[1.0, 1.0, 3.0]])
print("Probabilities before scaling: ",
     tf.math.softmax(logits).numpy()[0])
print("Probabilities after scaling with 0.5: ",
     tf.math.softmax(0.5*logits).numpy()[0])
print("Probabilities after scaling with 0.1: ",
     tf.math.softmax(0.1*logits).numpy()[0])

$\alpha=2.0$: より予測可能

In [None]:
tf.random.set_seed(1)
print(sample(model, starting_str="The island", scale_factor=2.0))

In [None]:
tf.random.set_seed(1)
print(sample(model, starting_str="The island", scale_factor=1.0))