<a href="https://colab.research.google.com/github/Syunta-SEKI/Transformer-Learning/blob/main/Train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so the notebook can read and write files stored there
drive.mount('/content/drive')

In [None]:
!pip install janome
!pip install torch==2.1.0
!pip install torchtext==0.16.0
!pip install torchvision==0.16.0

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torchtext.vocab import vocab
import torchtext.transforms as T
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import math
import janome
from janome.tokenizer import Tokenizer
import spacy
from collections import Counter

In [None]:
!pip install xlrd

In [None]:
import pandas as pd
# Load the sentence workbook from Drive. header=None keeps the first row as data and
# gives the columns integer labels; later cells read column 0 as Japanese and column 1 as English.
df = pd.read_excel("/content/drive/MyDrive/Transformer/JEC_basic_sentence_v1-3.xls", header = None)

In [None]:
# Tokenizer for Japanese text (janome)
j_t = Tokenizer()
def j_tokenizer(text):
    """Split a Japanese sentence into a list of surface-form tokens (wakati mode)."""
    return list(j_t.tokenize(text, wakati=True))

# Tokenizer for English text (spaCy small English pipeline)
e_t = spacy.load('en_core_web_sm')
def e_tokenizer(text):
    """Split an English sentence into a list of token strings using spaCy's tokenizer only."""
    doc = e_t.tokenizer(text)
    return [word.text for word in doc]

# Tokenize every sentence: column 0 with the Japanese tokenizer, column 1 with the English tokenizer
texts = df.iloc[:,0].apply(j_tokenizer)
targets = df.iloc[:,1].apply(e_tokenizer)

# Quick visual check of the tokenized Japanese sentences
print(texts)

In [None]:
# Count Japanese token frequencies and build the source vocabulary
j_list = [token for sentence in texts for token in sentence]
j_counter = Counter(j_list)
j_v = vocab(j_counter, specials=(['<unk>', '<pad>', '<bos>', '<eos>']))   # register the special tokens
j_v.set_default_index(j_v['<unk>'])   # unknown tokens map to <unk>

# Count English token frequencies and build the target vocabulary
e_list = [token for sentence in targets for token in sentence]
e_counter = Counter(e_list)
e_v = vocab(e_counter, specials=(['<unk>', '<pad>', '<bos>', '<eos>']))   # register the special tokens
e_v.set_default_index(e_v['<unk>'])   # unknown tokens map to <unk>

enc_vocab_size = len(j_v)
dec_vocab_size = len(e_v)
print(enc_vocab_size, dec_vocab_size)   #6446 6072

In [None]:
# Every sentence in a batch must have the same length, so each language is capped at 14 tokens
# (the final tensors are 16 long once <bos> and <eos> are added).
j_word_count = 14
e_word_count = 14

# Japanese pipeline: token strings -> fixed-length index tensor
j_text_transform = T.Sequential(
  T.VocabTransform(j_v),   # map tokens to vocabulary indices
  T.Truncate(j_word_count),   # keep at most 14 tokens per sentence
  T.AddToken(token=j_v['<bos>'], begin=True),   # prepend the <bos> index
  T.AddToken(token=j_v['<eos>'], begin=False),   # append the <eos> index
  T.ToTensor(),   # convert to a tensor
  T.PadTransform(j_word_count + 2, j_v['<pad>'])   # pad with <pad> up to the fixed length 14 + 2 = 16
)

# English pipeline: same steps, using the English vocabulary
e_text_transform = T.Sequential(
  T.VocabTransform(e_v),   # map tokens to vocabulary indices
  T.Truncate(e_word_count),   # keep at most 14 tokens per sentence
  T.AddToken(token=e_v['<bos>'], begin=True),   # prepend the <bos> index
  T.AddToken(token=e_v['<eos>'], begin=False),   # append the <eos> index
  T.ToTensor(),   # convert to a tensor
  T.PadTransform(e_word_count + 2, e_v['<pad>'])   # pad with <pad> up to the fixed length 14 + 2 = 16
)

class Dataset(torch.utils.data.Dataset):
  """Sentence-pair dataset producing encoder input and shifted decoder input/target.

  The base class is spelled out as ``torch.utils.data.Dataset`` so that re-running
  this cell does not make the class inherit from its own previous definition
  (the class name shadows the imported ``Dataset``).

  Args:
    df: DataFrame whose first column holds source sentences and whose second
      column holds target sentences (tokenized with ``j_tokenizer`` /
      ``e_tokenizer`` from the notebook).
    j_text_transform: callable turning a batch of source token lists into a tensor.
    e_text_transform: callable turning a batch of target token lists into a tensor.
    j_vocab: optional source vocabulary used by ``max_word``; when omitted the
      notebook-level ``j_v`` is used.
    e_vocab: optional target vocabulary used by ``max_word``; when omitted the
      notebook-level ``e_v`` is used.
  """

  def __init__(
      self,
      df,
      j_text_transform,
      e_text_transform,
      j_vocab=None,
      e_vocab=None,
      ):

    self.texts = df.iloc[:,0].apply(j_tokenizer)
    self.targets = df.iloc[:,1].apply(e_tokenizer)
    self.j_text_transform = j_text_transform
    self.e_text_transform = e_text_transform
    # Previously never assigned, which made max_word() raise AttributeError.
    self.j_v = j_vocab
    self.e_v = e_vocab

  def max_word(self):
    """Return (source vocabulary size, target vocabulary size)."""
    # Fall back to the notebook-level vocabularies when none were passed in.
    src_vocab = self.j_v if self.j_v is not None else j_v
    tgt_vocab = self.e_v if self.e_v is not None else e_v
    return len(src_vocab), len(tgt_vocab)

  def __getitem__(self, i):
    """Return the i-th example (by position) as a dict of tensors.

    Keys: "text" (encoder input), "dec_input" (target without its last
    position) and "dec_target" (target without its first position).
    """
    # .iloc makes the lookup positional, so a DataFrame with a non-default
    # index (e.g. after filtering rows) still works with DataLoader indices.
    text = self.texts.iloc[i]
    text = self.j_text_transform([text]).squeeze(0)

    target = self.targets.iloc[i]
    target = self.e_text_transform([target]).squeeze(0)

    dec_input = target[:-1]
    dec_target = target[1:]   # decoder target is the input shifted by one position
    data = {"text": text, "dec_input": dec_input, "dec_target": dec_target}
    return data

  def __len__(self):
    """Number of sentence pairs."""
    return len(self.texts)


In [None]:
BATCH_SIZE = 8

# Wrap the sentence pairs in a shuffling loader; incomplete final batches are dropped
dataset = Dataset(df, j_text_transform, e_text_transform)
data_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    num_workers=4,
)

# Pull one batch and show the first example of each tensor as a sanity check
data = next(iter(data_loader))
text = data["text"]
dec_input = data["dec_input"]
target = data["dec_target"]
for sample in (text[0], dec_input[0], target[0]):
    print(sample)

In [None]:
#Transformerモデルの実装
import math

class InputEmbeddings(nn.Module):
    """Token embedding lookup whose output is scaled by sqrt(d_model)."""

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Embed token indices ``x`` and multiply the vectors by sqrt(d_model)."""
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to the input, then applies dropout.

    Args:
        d_model: size of the last (feature) dimension; odd values are supported.
        seq_len: number of positions for which encodings are precomputed.
        dropout: dropout probability applied after the addition.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        pe = torch.zeros(seq_len, d_model)
        # (seq_len, 1) column of positions 0 .. seq_len - 1
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float()*(-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so the
        # cosine half is trimmed; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # (1, seq_len, d_model) so it broadcasts over the batch dimension
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the encodings for the first ``x.shape[1]`` positions to ``x`` and apply dropout."""
        # `pe` is a buffer, not a parameter, so it is never trained and needs no detach().
        x = x + self.pe[:, :x.shape[1], :]
        return self.dropout(x)

class LayerNormalization(nn.Module):
    """Normalizes the last dimension with a single learnable scale and shift.

    Uses ``Tensor.std`` with its default settings and adds ``eps`` to the
    standard deviation (not to the variance) before dividing.
    """

    def __init__(self, eps: float = 10**-6) -> None:
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(1))   # multiplicative scale
        self.bias = nn.Parameter(torch.zeros(1))   # additive shift

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` computed over the last dimension."""
        centered = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centered / spread + self.bias


class FeedForwardBlock(nn.Module):
    """Two linear layers (d_model -> d_ff -> d_model) with ReLU and dropout in between."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()  # registers this object as an nn.Module
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply linear_1 -> ReLU -> dropout -> linear_2."""
        hidden = torch.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: feature size of the inputs and outputs; must be divisible by ``h``.
        h: number of attention heads.
        dropout: dropout probability applied to the attention weights.

    After each forward pass the attention weights are kept in
    ``self.attention_scores``.
    """

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.h = h
        assert d_model % h ==0

        self.d_k = d_model // h   # per-head feature size
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    # attention() uses no instance state, so it is a static method and can also be
    # called from outside as MultiHeadAttentionBlock.attention(...).
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Scaled dot-product attention.

        Args:
            query, key, value: tensors whose last dimension is the per-head size.
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            dropout: optional dropout module applied to the attention weights.

        Returns:
            Tuple ``(weighted values, attention weights)``.
        """
        d_k = query.shape[-1]
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Use the most negative finite value instead of -inf: a row whose keys
            # are all masked would otherwise become NaN after softmax and poison
            # the loss. Rows with at least one visible key are unaffected.
            fill_value = torch.finfo(attention_scores.dtype).min
            attention_scores = attention_scores.masked_fill(mask == 0, fill_value)

        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)

        return (attention_scores @ value), attention_scores

    def _split_heads(self, x):
        """Reshape (batch_size, seq_len, d_model) into (batch_size, h, seq_len, d_k)."""
        return x.view(x.shape[0], x.shape[1], self.h, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, merge the heads and apply the output projection."""
        query = self._split_heads(self.w_q(q))
        key = self._split_heads(self.w_k(k))
        value = self._split_heads(self.w_v(v))

        x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)

        # Merge heads back: (batch_size, h, seq_len, d_k) -> (batch_size, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h*self.d_k)
        return self.w_o(x)


class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``."""

    def __init__(self, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        """Normalize ``x``, run ``sublayer`` on it, apply dropout and add the result back to ``x``."""
        normed = self.norm(x)
        branch = self.dropout(sublayer(normed))
        return x + branch

class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        """Run self-attention (with ``src_mask``) and the feed-forward block over ``x``."""
        attention_residual, feed_forward_residual = self.residual_connections

        def self_attend(normed):
            # query, key and value all come from the same normalized input
            return self.self_attention_block(normed, normed, normed, src_mask)

        x = attention_residual(x, self_attend)
        return feed_forward_residual(x, self.feed_forward_block)

class Encoder(nn.Module):
    """Applies a list of encoder layers in order, then a final layer normalization."""

    def __init__(self, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers   # the stacked encoder layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        """Pass ``x`` through every layer with the same ``mask`` and normalize the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)

class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention over the encoder output, then feed-forward.

    Each of the three sub-layers is wrapped in its own residual connection.
    """

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float):
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run the three sub-layers; ``tgt_mask`` masks self-attention, ``src_mask`` masks cross-attention."""
        self_residual, cross_residual, feed_forward_residual = self.residual_connections

        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, tgt_mask)

        def cross_attend(normed):
            # queries come from the decoder, keys and values from the encoder output
            return self.cross_attention_block(normed, encoder_output, encoder_output, src_mask)

        x = self_residual(x, self_attend)
        x = cross_residual(x, cross_attend)
        return feed_forward_residual(x, self.feed_forward_block)


class Decoder(nn.Module):
    """Applies a list of decoder layers in order, then a final layer normalization."""

    def __init__(self, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers   # the stacked decoder layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Pass ``x`` through every layer with the same encoder output and masks, then normalize."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)



class ProjectionLayer(nn.Module):
    """Linear projection from d_model to vocabulary size, followed by log-softmax."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        # The projection yields one score per vocabulary entry for every position.
        self.proj = nn.Linear(in_features=d_model, out_features=vocab_size)

    def forward(self, x):
        """Return log-probabilities over the vocabulary (last dimension)."""
        scores = self.proj(x)
        return scores.log_softmax(dim=-1)

class Transformer(nn.Module):
    """Container tying together embeddings, positional encodings, encoder, decoder and projection.

    There is no ``forward``; callers use ``encode``, ``decode`` and ``project`` explicitly.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed the source tokens, add positional encodings and run the encoder."""
        embedded = self.src_pos(self.src_embed(src))
        return self.encoder(embedded, src_mask)

    def decode(self, tgt, encoder_output, src_mask, tgt_mask):
        """Embed the target tokens, add positional encodings and run the decoder."""
        embedded = self.tgt_pos(self.tgt_embed(tgt))
        return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to decoder output."""
        return self.projection_layer(x)


def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int = 512, N: int = 6, h: int = 8, dropout: float = 0.1, d_ff: int = 2048) -> Transformer:
    """Assemble a Transformer and initialize its weight matrices.

    Args:
        src_vocab_size: number of entries in the source vocabulary.
        tgt_vocab_size: number of entries in the target vocabulary.
        src_seq_len: number of source positions to precompute encodings for.
        tgt_seq_len: number of target positions to precompute encodings for.
        d_model: feature size used throughout the model.
        N: number of encoder layers and number of decoder layers.
        h: number of attention heads (``d_model`` must be divisible by it).
        dropout: dropout probability used in every sub-module.
        d_ff: hidden size of the feed-forward blocks.

    Returns:
        The constructed ``Transformer``.
    """
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    encoder_blocks = []
    for _ in range(N):
        encoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout)
        encoder_blocks.append(encoder_block)

    decoder_blocks = []
    for _ in range(N):
        decoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        decoder_cross_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block, feed_forward_block, dropout)
        decoder_blocks.append(decoder_block)

    encoder = Encoder(nn.ModuleList(encoder_blocks))
    decoder = Decoder(nn.ModuleList(decoder_blocks))
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)
    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # Xavier-initialize every parameter with more than one dimension (weight matrices
    # and embedding tables); 1-D parameters keep their default initialization.
    # xavier_uniform_ is the in-place successor of the deprecated nn.init.xavier_uniform.
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return transformer



In [None]:
def make_pad_mask(tensor, pad_idx):
    """Build a boolean mask that is False at padding positions.

    tensor: (batch_size, seq_len) token indices
    pad_idx: index of the <pad> token
    return: (batch_size, 1, 1, seq_len) boolean tensor; True = attend, False = ignore
    """
    not_pad = tensor.ne(pad_idx)
    # Insert two singleton axes so the mask broadcasts over heads and query positions.
    return not_pad[:, None, None, :]

def make_subsequent_mask(tensor):
    """Build a causal mask that stops each position from attending to later positions.

    The mask follows the same convention as make_pad_mask and the attention code
    (positions where the mask is False/0 are hidden): entry [i, j] is True when
    key position j <= query position i. The previous version returned the strict
    upper triangle, i.e. True exactly at the future positions, which is the
    opposite of that convention.

    tensor: (batch_size, seq_len)
    return: (batch_size, 1, seq_len, seq_len) boolean tensor on the same device as
        ``tensor``, so it can be combined with a pad mask via ``&``.
    """
    batch_size, seq_len = tensor.size()
    # Lower triangle including the diagonal: current and past positions are visible.
    allowed = torch.ones(seq_len, seq_len, device=tensor.device).tril().bool()
    # Add batch and head axes: (seq_len, seq_len) -> (batch_size, 1, seq_len, seq_len)
    return allowed.unsqueeze(0).unsqueeze(1).repeat(batch_size, 1, 1, 1)


In [None]:
# (1) Build the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Vocabulary sizes obtained above
enc_vocab_size = len(j_v)
dec_vocab_size = len(e_v)

# Fixed sequence length: the per-sentence token limit plus <bos> and <eos> (= 16)
src_seq_len = j_word_count + 2  # 14 + <bos>, <eos>
tgt_seq_len = e_word_count + 2  # 14 + <bos>, <eos>

model = build_transformer(
    src_vocab_size=enc_vocab_size,
    tgt_vocab_size=dec_vocab_size,
    src_seq_len=src_seq_len,
    tgt_seq_len=tgt_seq_len,
    d_model=128,     # kept small (adjust to the available memory)
    N=2,            # number of layers (2 for this demo)
    h=4,            # number of heads (likewise)
    dropout=0.1,
    d_ff=256        # hidden size of the feed-forward layers
).to(device)

# (2) Loss function and optimizer
# ProjectionLayer returns log_softmax output, so NLLLoss is used; <pad> targets are ignored
criterion = nn.NLLLoss(ignore_index=e_v['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# (3) Training loop
num_epochs = 10
model.train()

for epoch in range(num_epochs):
    total_loss = 0.0

    for batch in data_loader:
        # batch is {"text": text, "dec_input": dec_input, "dec_target": dec_target}
        src = batch["text"].to(device)            # (B, src_seq_len)
        dec_in = batch["dec_input"].to(device)    # (B, tgt_seq_len - 1): target without its last position
        dec_tgt = batch["dec_target"].to(device)  # (B, tgt_seq_len - 1): target without its first position

        # --- Padding masks (True = attend, False = ignore) ---
        src_mask = make_pad_mask(src, pad_idx=j_v['<pad>'])
        tgt_pad_mask = make_pad_mask(dec_in, pad_idx=e_v['<pad>'])

        # --- Causal mask ---
        # dec_tgt is dec_in shifted by one position, so without this mask the decoder
        # could attend to the very token it is asked to predict and the loss would be
        # meaningless. The lower-triangular mask lets position i see positions <= i only.
        tgt_len = dec_in.size(1)
        causal_mask = torch.ones(tgt_len, tgt_len, device=device).tril().bool()
        # (B, 1, 1, L) & (L, L) broadcasts to (B, 1, L, L): no future tokens and no padding
        tgt_mask = tgt_pad_mask & causal_mask

        # Forward pass
        encoder_output = model.encode(src, src_mask)
        decoder_output = model.decode(dec_in, encoder_output, src_mask, tgt_mask)
        logits = model.project(decoder_output)  # (B, tgt_seq_len - 1, dec_vocab_size) log-probabilities

        # Loss
        # nn.NLLLoss expects input of shape (N, C, ...), so move the vocabulary axis to dim 1
        loss = criterion(logits.permute(0, 2, 1), dec_tgt)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(data_loader)
    print(f"[Epoch {epoch+1}] loss: {avg_loss:.4f}")

    # --- Save the model weights to Google Drive after every epoch ---
    save_path = f"/content/drive/MyDrive/Transformer/transformer_epoch_{epoch+1}.pth"
    torch.save(model.state_dict(), save_path)
    print(f"Model weights saved to {save_path}")


In [None]:
# Rebuild the model with the same hyperparameters used for training so that the
# saved state_dict matches the parameter shapes.
model = build_transformer(
    src_vocab_size=enc_vocab_size,
    tgt_vocab_size=dec_vocab_size,
    src_seq_len=src_seq_len,
    tgt_seq_len=tgt_seq_len,
    d_model=128,
    N=2,
    h=4,
    dropout=0.1,
    d_ff=256
).to(device)

# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved on a GPU runtime can still be restored on a CPU-only runtime.
model.load_state_dict(torch.load("/content/drive/MyDrive/Transformer/transformer_epoch_10.pth", map_location=device))
print("Model weights loaded successfully!")