In [8]:
import torch
import torch.nn as nn

#Multi-Head Attention Layer

In [9]:
class MultiHeadAttentionLayer(nn.Module):
    def __init__(self,hidden_dim, n_heads, dropout_ratio):
        '''
        hidden_dim : 하나의 단어에 대한 임베딩 차원
        n_heads : head의 개수. 즉, scaled dot product attention의 개수. 이후 concat함.
        dropout_ratio : dropout 비율
        '''

        super().__init__()

        assert hidden_dim % n_heads == 0 #hidden_dim을 n_heads로 나눴을 때 나머지가 0임을 가정.

        self.hidden_dim = hidden_dim #embedding dimension.
        self.n_heads = n_heads #head의 수. 즉, 서로 다른 attention의 수
        self.head_dim = hidden_dim // n_heads #각 head에서의 embedding 차원

        self.fc_q = nn.Linear(hidden_dim, hidden_dim)
        self.fc_k = nn.Linear(hidden_dim, hidden_dim)
        self.fc_v = nn.Linear(hidden_dim, hidden_dim)

        self.fc_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout_ratio = dropout_ratio

        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)

    def forward(self, query, key, value, mask = None):

        # query: [batch_size, query_len, hidden_dim]
        # key: [batch_size, key_len, hidden_dim]
        # value: [batch_size, value_len, hidden_dim]

        batch_size = query.shape[0]

        Q = self.fc_q(query)# Q: [batch_size, query_len, hidden_dim]
        K = self.fc_k(key)# K: [batch_size, key_len, hidden_dim]
        V = self.fc_v(value)# V: [batch_size, value_len, hidden_dim]

        #hidden_dim을 (n_heads x head_dim)으로 나누기
        #n_head개의 서로 다른 attention을 학습할 수 있도록 한다.
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permutate(0, 2, 1, 3)# Q: [batch_size, n_heads, query_len, head_dim]
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permutate(0, 2, 1, 3)# K: [batch_size, n_heads, key_len, head_dim]
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permutate(0, 2, 1, 3)# V: [batch_size, n_heads, value_len, head_dim]

        #attention energy 계산
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale # energy: [batch_size, n_heads, query_len, key_len]

        #mask를 사용하는 경우(decoder)
        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)

        #attention score(각 단어에 대한 확률값)
        attention = torch.softmax(energy, dim = -1) #각 head별 attention이니까 ???
        # attention: [batch_size, n_heads, query_len, key_len]

        #scaled dot product attention
        x = torch.matmul(self.dropout(attention), V) # x: [batch_size, n_heads, query_len, head_dim]

        x = x.permute(0, 2, 1, 3).contiguous() #permute는 non-contiguous한 tensor을 return하기 때문.
        # x: [batch_size, query_len, n_heads, head_dim]

        x = x.view(batch_size, -1, self.hidden_dim) #view로 concat을 한 것과 동일한 효과(n_heads와 head_dim이 다시 하나로 합쳐지니까.)
        #view는 contiguous한 tensor만 입력으로 받음. 그래서 위에서 contiguous
        # x: [batch_size, query_len, hidden_dim]

        x = self.fc_o(x)

        return x, attention #attention 결과값은 나중에 시각화 할 수 있도록.



#PositionWise Feed Forward

In [10]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        '''
        hidden_dim : 하나의 단어에 대한 임베딩 차원. 논문에 따르면 512
        pf_dim : feedforward layer에서의 내부 임베딩 차원. 논문에 따르면 2048
        dropout_ratio : 드랍아웃 비율
        '''
        super().__init__()

        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.relu = nn.ReLU(inplace = True)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x):
        x = self.fc_1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc_2(x)

        return x


#Encoder

In [11]:
class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        '''
        hidden_dim : 하나의 단어에 대한 임베딩 차원
        n_heads : head의 개수. 즉, attention의 개수.
        pf_dim : feedforward 내부에서의 임베딩 차원
        dropout_ratio : 드랍아웃 비율
        '''
        super().__init__()

        self.self_attention_layer_norm = nn.LayerNorm(hidden_dim) #layernorm은 batch 내에서 normalization. BN은 각 batch간 동일한 feature을 norm.
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionWiseFeedForward(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, src, src_mask):
        # src: [batch_size, src_len, hidden_dim]
        # src_mask: [batch_size, src_len]

        _src, _ = self.self_attention(src, src, src, src_mask)

        src = self.self_attention_layer_norm(src + self.dropout(_src)) # src: [batch_size, src_len, hidden_dim]

        _src = self.positionwise_feedforward(src)

        src = self.ff_layer_norm(src + self.dropout(_src)) # src: [batch_size, src_len, hidden_dim]

        return src



In [12]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length = 100):
        '''
        input_dim : 하나의 단어에 대한 OHE 차원
        hidden_dim : 하나의 단어에 대한 임베딩 차원
        n_layers : 인코딩 레이어의 개수
        n_heads : 인코딩 레이어에서의 head의 개수. 즉, attention의 개수
        pf_dim : feed forward에서의 차원
        dropout_ratio : dropout 비율
        max_length : 문장 내 최대 단어 개수
        '''
        super().__init__()

        self.tok_embedding = nn.Embedding(input_dim, hidden_dim) #embedding차원으로 바꿔주기
        self.pos_embedding = nn.Embedding(max_length, hidden_dim) #학습가능한 pos embedding(논문과는 다름. 논문은 sin cos 함수 사용)

        self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)]) #layer 수만큼 encoder layer 생성

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, src, src_mask):
        # src: [batch_size, src_len]
        # src_mask: [batch_size, src_len]

        batch_size = src.shape[0] #문장의 개수
        src_len = src.shape[1] #모든 문장들 중 max(len(문장))

        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device) #각 문장마다 적용하기 위해 repeat으로 batch_size만큼 복제.
        # pos: [batch_size, src_len]

        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos)) #단어 임베딩 한 값 + 위치 임베딩 한 값
        # src: [batch_size, src_len, hidden_dim]

        for layer in self.layers:
            src = layer(src, src_mask)

        return src # 마지막 레이어의 출력을 반환

#Decoder

In [13]:
class DecoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        '''
        hidden_dim : 하나의 단어에 대한 embedding dimension
        n_heads : attention 갯수.
        pf_dim : feedforward 내부 임베딩 차원
        dropout_ratio : dropout 비율
        '''
        super().__init__()

        self.self_attention_layer_norm = nn.LayerNorm(hidden_dim)
        self.enc_attention_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.encoder_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionWiseFeedForward(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        # trg: [batch_size, trg_len, hidden_dim]
        # enc_src: [batch_size, src_len, hidden_dim]
        # trg_mask: [batch_size, trg_len]
        # src_mask: [batch_size, src_len]

        #self attention
        _trg, _ = self.self_attention(trg, trg, trg, trg_mask)
        trg = self.self_attention_layer_norm(trg + self.dropout(_trg))
        # trg: [batch_size, trg_len, hidden_dim]

        #encoder attention
        #decoder의 query를 이용해 encoder을 attention
        _trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
        trg = self.enc_attention_layer_norm(trg + self.dropout(_trg))

        #positionwise feedforward
        _trg = self.positionwise_feedforward(trg)
        trg = self.ff_layer_norm(trg + self.dropout(_trg))

        return trg, attention



In [14]:
class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        '''
        output_dim : 하나의 단어에 대한 OHE 차원
        hidden_dim : 하나의 단어에 대한 embedding 차원
        n_layers : decoder layer의 개수
        n_heads : decoder layer내의 attention 개수. multihead.
        pf_dim : feedforward layer에서의 임베딩 차원
        dropout_ratio : dropout 비율
        max_length : 문장 내 최대 단어 개수
        '''
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.fc_out = nn.Linear(hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)


    def forward(self, trg, enc_src, trg_mask, src_mask):
        # trg: [batch_size, trg_len]
        # enc_src: [batch_size, src_len, hidden_dim]
        # trg_mask: [batch_size, trg_len]
        # src_mask: [batch_size, src_len]

        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)

        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))

        for layer in self.layers:
            # 소스 마스크와 타겟 마스크 모두 사용
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        # trg: [batch_size, trg_len, hidden_dim]
        # attention: [batch_size, n_heads, trg_len, src_len]

        output = self.fc_out(trg)

        return output, attention



#Transformer

In [15]:
class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    #소스 문장의 <pad> 토큰 값을 0으로 마스킹.
    def make_src_mask(self, src):
        # src: [batch_size, src_len]

        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # src_mask: [batch_size, 1, 1, src_len]

        return src_mask

    # 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록(이전 단어만 보도록) 만들기 위해 마스크를 사용
    #그리고 <pad> 토큰 값 0으로 마스킹
    def make_trg_mask(self, trg):
        # trg: [batch_size, trg_len]

        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        # trg_pad_mask: [batch_size, 1, 1, trg_len]

        trg_len = trg.shape[1]

        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool() #torch.tril은 하삼각행렬 만들 때 사용

        # trg_sub_mask: [trg_len, trg_len]

        trg_mask = trg_pad_mask & trg_sub_mask #둘 다 1인 경우에만 attention score을 구할 수 있도록.

        # trg_mask: [batch_size, 1, trg_len, trg_len]

        return trg_mask
