In [None]:
%pip install torchtext==0.6.0
%%capture

In [None]:
import spacy

# Load the spaCy pipelines whose tokenizers are used to split raw sentences.
spacy_en = spacy.load("en_core_web_sm")  # English tokenization
spacy_de = spacy.load("de_core_news_sm")  # German tokenization

In [None]:
# Sanity check: run the English tokenizer on a sample sentence and print
# every token together with its position.
tokenized = spacy_en.tokenizer("I am a graduate student.")
for i, token in enumerate(tokenized):
    print(f"index {i}: {token.text}")

In [None]:
def tokenize_de(text):
    """Split a German string into a list of token strings using the spaCy German tokenizer."""
    doc = spacy_de.tokenizer(text)
    return [piece.text for piece in doc]

def tokenize_en(text):
    """Split an English string into a list of token strings using the spaCy English tokenizer."""
    doc = spacy_en.tokenizer(text)
    return [piece.text for piece in doc]

In [None]:
from torchtext.data import Field, BucketIterator

# Source (German) field: lower-cased tokens wrapped in <sos>/<eos>, batch dimension first.
SRC = Field(
    tokenize=tokenize_de,
    init_token="<sos>",
    eos_token="<eos>",
    lower=True,
    batch_first=True,
)

# Target (English) field: same settings, with the English tokenizer.
TRG = Field(
    tokenize=tokenize_en,
    init_token="<sos>",
    eos_token="<eos>",
    lower=True,
    batch_first=True,
)

In [None]:
from torchtext.datasets import Multi30k

# Load the Multi30k train/validation/test splits; ".de" files are processed
# with SRC and ".en" files with TRG.
train_dataset, valid_dataset, test_dataset = Multi30k.splits(
    exts=(".de", ".en"),
    fields=(SRC, TRG),
)

In [None]:
# Number of examples in each split.
print(f"training dataset size: {len(train_dataset.examples)}")
print(f"validation dataset size: {len(valid_dataset.examples)}")
print(f"testing dataset size: {len(test_dataset.examples)}")

# Show the source ('src') and target ('trg') sides of training example 30.
example_30 = vars(train_dataset.examples[30])
print(example_30['src'])
print(example_30['trg'])

In [None]:
# Build both vocabularies from the training split only, with min_freq=2
# (torchtext's minimum-frequency threshold for including a token).
for field in (SRC, TRG):
    field.build_vocab(train_dataset, min_freq=2)

print(f"len(SRC): {len(SRC.vocab)}")
print(f"len(TRG): {len(TRG.vocab)}")

# Look up the indices of a few tokens. Per the notebook author's notes:
# an out-of-vocabulary word -> 0, padding -> 1, <sos> -> 2, <eos> -> 3.
for probe_token in ("abcabc", TRG.pad_token, "<sos>", "<eos>", "hello", "world"):
    print(TRG.vocab.stoi[probe_token])

In [None]:
import torch

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

BATCH_SIZE = 128

# These can be used much like the iterator of an ordinary data loader.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset),
    batch_size=BATCH_SIZE,
    device=device,
)

In [None]:
# Inspect the first training batch only.
for batch in train_iterator:
    src = batch.src
    trg = batch.trg

    print(f"first batch size: {src.shape}")

    # Print the vocabulary index of every token of the first sentence in the batch.
    # The fields use batch_first=True, so src is [batch_size, seq_len].
    # A separate name is used for the position so it does not shadow a batch index.
    for position in range(src.shape[1]):
        print(f"index {position}: {src[0][position].item()}")

    # Only the first batch is needed.
    break

Embedding: [num_words, d_model]

Q, K: [num_words, d_k]

V: [num_words, d_v]

Attention(Q, K, V): [num_words, d_v]

Concat(head_1, ..., head_h): [num_words, h * d_v]

MultiHeadAttention(Q, K, V): [num_words, d_model]

In [75]:
import torch.nn as nn

class Multi_Head_Attention_Layer(nn.Module):
    """Multi-head scaled dot-product attention.

    Projects query/key/value into ``n_heads`` sub-spaces, applies scaled
    dot-product attention independently in each head, concatenates the heads
    and applies a final linear projection back to ``d_model``.

    Args:
        d_model: Feature size of the inputs and of the output (e.g. 512).
        d_k: Per-head size of the query/key projections (e.g. 64).
        d_v: Per-head size of the value projection (e.g. 64); must equal
            ``d_model // n_heads``.
        n_heads: Number of attention heads (e.g. 8); must divide ``d_model``.
        dropout_ratio: Dropout probability applied to the attention weights.
        device: Device on which the scaling constant ``sqrt(d_k)`` is stored.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``n_heads`` or
            ``d_v != d_model // n_heads``.
    """

    def __init__(self, d_model, d_k, d_v, n_heads, dropout_ratio, device):
        super().__init__()

        assert d_model % n_heads == 0 and d_model // n_heads == d_v, (
            "d_model must be divisible by n_heads and d_v must equal d_model // n_heads"
        )
        self.d_model = d_model # 512
        self.d_k = d_k # 64
        self.d_v = d_v # 64
        self.n_heads = n_heads # 8

        self.fc_Q = nn.Linear(d_model, d_k * n_heads) # W_Q: [d_model, d_k * n_heads]
        self.fc_K = nn.Linear(d_model, d_k * n_heads) # W_K: [d_model, d_k * n_heads]
        self.fc_V = nn.Linear(d_model, d_v * n_heads) # W_V: [d_model, d_v * n_heads]

        # The concatenated heads have n_heads * d_v features, which the assert
        # above guarantees is equal to d_model.
        self.fc_O = nn.Linear(n_heads * d_v, d_model) # W_O: [n_heads * d_v, d_model]

        self.dropout = nn.Dropout(dropout_ratio)

        # Scaling constant sqrt(d_k), kept on `device` as a plain (non-parameter) tensor.
        self.scale = torch.sqrt(torch.FloatTensor([self.d_k])).to(device)

    def forward(self, query, key, value, mask = None):
        """Compute multi-head attention.

        Args:
            query: Tensor of shape [batch_size, query_len, d_model].
            key: Tensor of shape [batch_size, key_len, d_model].
            value: Tensor of shape [batch_size, key_len, d_model].
            mask: Optional tensor broadcastable to
                [batch_size, n_heads, query_len, key_len]. Positions where
                ``mask == 0`` are excluded from attention.

        Returns:
            A tuple ``(x, attention)`` where ``x`` has shape
            [batch_size, query_len, d_model] and ``attention`` holds the
            softmax weights (before dropout) with shape
            [batch_size, n_heads, query_len, key_len].
        """
        batch_size = query.shape[0]

        Q = self.fc_Q(query) # Q: [batch_size, query_len, d_k * n_heads]
        K = self.fc_K(key)   # K: [batch_size, key_len, d_k * n_heads]
        V = self.fc_V(value) # V: [batch_size, key_len, d_v * n_heads]

        # Split the feature dimension into heads and move the head axis forward.
        Q = Q.view(batch_size, -1, self.n_heads, self.d_k).permute(0, 2, 1, 3) # [batch_size, n_heads, query_len, d_k]
        K = K.view(batch_size, -1, self.n_heads, self.d_k).permute(0, 2, 1, 3) # [batch_size, n_heads, key_len, d_k]
        V = V.view(batch_size, -1, self.n_heads, self.d_v).permute(0, 2, 1, 3) # [batch_size, n_heads, key_len, d_v]

        # Scaled similarity between every query and every key.
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        # energy: [batch_size, n_heads, query_len, key_len]

        # `mask` is a tensor, so it must be compared against None: truth-testing
        # a tensor with more than one element raises a RuntimeError.
        if mask is not None:
            # A very negative score becomes (effectively) zero weight after softmax.
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim = -1) # [batch_size, n_heads, query_len, key_len]

        x = torch.matmul(self.dropout(attention), V)          # [batch_size, n_heads, query_len, d_v]
        x = x.permute(0, 2, 1, 3).contiguous()                # [batch_size, query_len, n_heads, d_v]
        x = x.view(batch_size, -1, self.n_heads * self.d_v)   # [batch_size, query_len, n_heads * d_v]
        x = self.fc_O(x)                                      # [batch_size, query_len, d_model]

        return x, attention