再起型ニューラルネットワーク

In [None]:
# 分かち書きを行うためにjanomeというPythonパッケージをインストール
!pip install janome

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd # CSVを含む表形式データを読み込み，分析するためのパッケージ
from janome.tokenizer import Tokenizer # janomeからTokenizerクラスをimportする

train_size = 25000 # 先頭の20,000件を訓練データとして用いることにする
batch_size = 32 # ミニバッチのサイズを設定

# CSVファイルを読み込み
dataset = pd.read_csv('drive/MyDrive/実習B/slice_AI.csv')

# janomeはPythonで書かれた形態素解析器であり単語の分かち書きなどが可能
# なお，速度に難があるため，大量のテキストを処理するときには，MeCabを利用すること
tokenizer = Tokenizer(wakati=True)
# text列の文字列を単語に分割し保存する
dataset['text'] = dataset['text'].map(lambda x: list(tokenizer.tokenize(x)))

# 読み込みんだCSVのうち，先頭の `train_size` 件を訓練データ，残りをテストデータとして用いる
train_dataset, test_dataset = dataset[:train_size], dataset[train_size:]

In [None]:
import torch
import torchtext.transforms as T
from torchtext.vocab import build_vocab_from_iterator
from torchtext.vocab import FastText
from torchtext.vocab import vocab
from torch.utils.data import DataLoader

# 単語埋め込みの読み込み
fasttext = FastText(language="ja")

# fastText用の前処理を行う
fasttext_vocab = vocab(fasttext.stoi)
special_token_id = len(fasttext)
fasttext_vocab.set_default_index(special_token_id)
shape = fasttext.vectors.shape
fasttext_vectors = torch.zeros((shape[0]+1, shape[1]))
fasttext_vectors[:shape[0]] = fasttext.vectors

# テキストの変換方法を定義
text_transform = T.Sequential(
    T.VocabTransform(fasttext_vocab), # 単語をfastTextの辞書で置き換え
    T.ToTensor(padding_value=special_token_id) # (一番最後の整数値+1)という整数値で長さを揃える
)

# 各バッチからテキストとラベルを読み込む方法を定義
def collate_batch(batch):
    texts = text_transform([text for (id, label, text) in batch])
    labels = torch.Tensor([label for (id, label, text) in batch])
    return texts, labels


# 訓練データ，および，テストデータ用のデータローダを用意する
train_data_loader = DataLoader(train_dataset.values, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)
test_data_loader = DataLoader(test_dataset.values, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

.vector_cache/wiki.ja.vec: 1.37GB [02:32, 9.00MB/s]                            
100%|██████████| 580000/580000 [00:43<00:00, 13253.32it/s]


In [None]:
# データローダから読み込まれるデータ構造の確認
for i, (texts, labels) in enumerate(train_data_loader):
    print(texts.shape) # textsにどのようなデータが入っているか確認
    for text, label in zip(texts, labels):
        print(text, label)
        break # 1テキストだけで終了
    break # 1テキストだけで終了

torch.Size([32, 64])
tensor([580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000, 580000,
        580000]) tensor(0.)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class TextClassifier(nn.Module):
    def __init__(self):
        super(TextClassifier, self).__init__()

        # 単語埋め込み（FastText）を設定する
        self.embedding = torch.nn.Embedding.from_pretrained(
            embeddings=fasttext_vectors, freeze=False)
        
        emb_size = fasttext_vectors.shape[1]
        self.lstm = nn.LSTM(emb_size, 100) # LSTMを用意する
        self.fc = nn.Linear(100, 1) # 1 x 100 の行列（この場合はベクトル）を含む全結合層を設定

    def forward(self, seq):
        x = self.embedding(seq) # 各単語を単語埋め込みに変換する
        x = x.permute(1, 0, 2) # バッチサイズ x 系列長 x 埋め込みの次元数 -> 系列長 x バッチサイズ x 単語埋め込みの次元数
        output, (h_n, c_n) = self.lstm(x) # LSTMを適用する
        h = h_n.view(-1, 100) # 各系列の最後の入力（単語）に対応する隠れ状態は，1 x バッチサイズ x 隠れ状態の次元数，となっているため，これを，バッチサイズ x 隠れ状態の次元数，と変換する．
        y = self.fc(F.relu(h)) # ReLUをかけてから，全結合層に通す．
        y = y.squeeze() # yは バッチサイズ x 1 という行列になっているため，バッチサイズと同じ次元数を持つベクトルに変換
        return y

In [None]:
import torch
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # CPUもしくはGPUのどちらを使うかを設定
model = TextClassifier() # ニューラルネットワークモデルのインスタンスを生成
model = model.to(device) # CPUもしくはGPUのどちらを設定
optimizer = optim.Adam(model.parameters()) # 基本的な学習方法はミニバッチ勾配降下法ではあるが，その中でもよく用いられるAdamと呼ばれる方法を用いることにする

criterion = nn.BCEWithLogitsLoss() # 二値分類用の交差エントロピーを最小化することにする

epoch_size = 20 # 勾配降下法はすべてのデータでパラメータを更新したら終わりではなく，全データでの更新（=1エポック）を複数回行う必要がある

model.train() # モデルを学習モードに変更

# `epoch_size`の数だけ以下を繰り返す
for epoch in range(epoch_size):
    losses = []
    # イテレータはミニバッチ勾配降下法のために，`batch_size`で指定した数ごとにデータをわけて読み込んでくれる．
    for batch_idx, (texts, labels) in enumerate(train_data_loader):
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad() # 勾配の初期化
        y = model(texts) # 現時点でのモデルの出力を得る
        loss = criterion(y, labels.type(torch.float)) # 交差エントロピーの計算
        loss.backward() # 交差エントロピーの勾配計算
        optimizer.step() # パラメータ更新
        losses.append(loss.item())

    # 現在の交差エントロピーを出力
    print('Epoch: {}\tCross Entropy: {:.6f}'.format(epoch, sum(losses)))


Epoch: 0	Cross Entropy: 339.739955
Epoch: 1	Cross Entropy: 292.401647
Epoch: 2	Cross Entropy: 255.345709
Epoch: 3	Cross Entropy: 206.589820
Epoch: 4	Cross Entropy: 136.217601
Epoch: 5	Cross Entropy: 89.109435
Epoch: 6	Cross Entropy: 76.721182
Epoch: 7	Cross Entropy: 68.483462
Epoch: 8	Cross Entropy: 62.532087
Epoch: 9	Cross Entropy: 60.494997
Epoch: 10	Cross Entropy: 57.860027
Epoch: 11	Cross Entropy: 55.185145
Epoch: 12	Cross Entropy: 55.506827
Epoch: 13	Cross Entropy: 55.487376
Epoch: 14	Cross Entropy: 49.551541
Epoch: 15	Cross Entropy: 49.980563
Epoch: 16	Cross Entropy: 55.254276
Epoch: 17	Cross Entropy: 47.211614
Epoch: 18	Cross Entropy: 96.775977
Epoch: 19	Cross Entropy: 52.284536


In [None]:
correct = 0
model.eval() # モデルを評価モードに変更
for batch_idx, (texts, labels) in enumerate(test_data_loader):
    texts, labels = texts.to(device), labels.to(device)
    y = model(texts) # モデルの出力を得る
    result = torch.sigmoid(y) # `TextClassifier`ではsigmoid関数を適用していなかったのでここで適用
    prediction = result >= 0.5 # `result`ベクトルと同じ次元を持ち，`result`の中で0.5以上である次元がTrue，それ以外がFalseであるベクトルを`prediction`とする
    target = labels == 1 # `labels`ベクトルと同じ次元を持ち，`labels`の中で1である次元がTrue，それ以外がFalseであるベクトルを`target`とする
    correct_num = target.eq(prediction).sum().item() # `prediction`ベクトルと`target`ベクトルでTrue/Falseが一致したものの数を数える
    correct += correct_num

# test_iterator.datasetにはテストデータ全体が入っているので，これの長さはテストデータの事例数となる
print("Accuracy: {:.3f}".format(correct / len(test_data_loader.dataset)))

Accuracy: 0.935
