## Section 1: Encoder

In [None]:
import torch
import torch.nn as nn
from math import sqrt
import torch.nn.functional as F

In [None]:
class Configuration:
    """Hyper-parameter container for the encoder/decoder built in this notebook.

    Every setting is a plain class attribute, so an instance created with
    ``Configuration()`` exposes the defaults below and individual values can
    be overridden on the instance afterwards.
    """

    # --- Embedding tables -------------------------------------------------
    vocab_size = 30522              # rows of the token embedding table
    max_position_embeddings = 512   # rows of the position embedding table
    dim_token_emb = 768
    type_vocab_size = 2
    pad_token_id = 0
    position_embedding_type = "absolute"

    # --- Transformer stack ------------------------------------------------
    model_type = "encoder"          # MultiHeadAttention picks its head class from this
    hidden_size = 768               # model width used by Linear/LayerNorm/Embedding layers
    intermediate_size = 3072        # inner width of the feed-forward block
    num_attention_heads = 12
    num_hidden_layers = 12
    hidden_act = "gelu"
    layer_norm_eps = 1e-12

    # --- Regularisation / initialisation -----------------------------------
    hidden_dropout_prob = 0.1       # dropout rate of the feed-forward block
    attention_probs_dropout_prob = 0.1
    classifier_dropout = None
    initializer_range = 0.02

    # --- Runtime flags ------------------------------------------------------
    gradient_checkpointing = False
    use_cache = True


In [None]:
# Shared hyper-parameter object passed to every module constructor below.
config = Configuration()

In [None]:
# Notebook display: show the configured token-embedding dimension.
config.dim_token_emb

768

In [None]:
def scaled_dot_product_attention(query, key, value):
    """Compute unmasked scaled dot-product attention.

    The similarity of every query row with every key row is divided by the
    square root of the query's last dimension, normalised with a softmax
    over the key axis and used to average the rows of ``value``.

    Args:
        query: 3-D tensor (``torch.bmm`` only accepts 3-D inputs).
        key: 3-D tensor whose last dimension matches ``query``.
        value: 3-D tensor with one row per key row.

    Returns:
        The attention-weighted combination of ``value`` rows, one per query row.
    """
    scale = sqrt(query.size(-1))
    attn_logits = torch.bmm(query, key.transpose(1, 2)) / scale
    attn_probs = F.softmax(attn_logits, dim=-1)
    return torch.bmm(attn_probs, value)

In [None]:
class AttentionHead(nn.Module):
    """Single attention head with its own query/key/value projections.

    Args:
        embed_dim: size of the last dimension of the incoming hidden states.
        head_dim: output size of each of the three linear projections.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state, enc_out=None):
        """Attend from ``hidden_state`` to itself or to ``enc_out``.

        Args:
            hidden_state: tensor the queries are projected from.
            enc_out: optional tensor to attend to. When omitted the head
                performs self-attention over ``hidden_state``.

        Returns:
            The output of ``scaled_dot_product_attention`` for this head.
        """
        # Queries always come from the states being updated; keys and values
        # come from whatever is attended to. The previous version projected
        # query/key from ``enc_out`` and value from ``hidden_state``, which
        # mixed the two sequences up (and only worked at all when both had
        # the same length).
        kv_source = hidden_state if enc_out is None else enc_out
        return scaled_dot_product_attention(
            self.q(hidden_state), self.k(kv_source), self.v(kv_source))

In [None]:
class MultiHeadAttention(nn.Module):
    """Run several attention heads in parallel and merge their outputs.

    The head class is chosen from ``config.model_type`` when the module is
    built: ``"encoder"`` uses ``AttentionHead``, anything else uses
    ``AttentionHead_mask``.

    Args:
        config: object providing ``hidden_size``, ``num_attention_heads`` and
            ``model_type``.

    Raises:
        ValueError: if ``hidden_size`` is not a multiple of
            ``num_attention_heads`` (the concatenated heads would not match
            the input size of the output projection).
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be divisible by "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads
        # The conditional expression only looks up the class that is needed,
        # so AttentionHead_mask does not have to exist for encoder configs.
        head_cls = (
            AttentionHead if config.model_type == "encoder"
            else AttentionHead_mask
        )
        self.heads = nn.ModuleList(
            [head_cls(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state, enc_out=None):
        """Apply every head and project the concatenated result.

        Args:
            hidden_state: tensor passed to every head as first argument.
            enc_out: optional second argument forwarded to every head
                (``None`` selects the heads' self-attention path).

        Returns:
            ``output_linear`` applied to the head outputs concatenated along
            the last dimension.
        """
        # Both head classes default enc_out to None, so forwarding it
        # unconditionally covers the self- and cross-attention cases.
        head_outputs = [h(hidden_state, enc_out) for h in self.heads]
        return self.output_linear(torch.cat(head_outputs, dim=-1))

In [None]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Args:
        config: object providing ``hidden_size`` (input/output width),
            ``intermediate_size`` (inner width) and ``hidden_dropout_prob``
            (dropout rate applied to the block output).
    """

    def __init__(self, config):
        super().__init__()
        model_width = config.hidden_size
        inner_width = config.intermediate_size
        self.linear_1 = nn.Linear(model_width, inner_width)
        self.linear_2 = nn.Linear(inner_width, model_width)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Return the block output for ``x`` (same last dimension as ``x``)."""
        expanded = self.gelu(self.linear_1(x))
        return self.dropout(self.linear_2(expanded))

In [None]:
class TransformerEncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer receives a layer-normalised copy of its input and its
    output is added back onto the un-normalised input (skip connection).

    Args:
        config: object forwarded to ``MultiHeadAttention`` and ``FeedForward``;
            ``hidden_size`` is also used for the two ``LayerNorm`` modules.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.layer_norm_1 = nn.LayerNorm(width)
        self.layer_norm_2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward sub-layers."""
        # Sub-layer 1: normalise, attend, add the residual.
        attn_out = self.attention(self.layer_norm_1(x))
        x = x + attn_out
        # Sub-layer 2: normalise, feed-forward, add the residual.
        ff_out = self.feed_forward(self.layer_norm_2(x))
        return x + ff_out

In [None]:
# Build a single encoder block from the shared config.
encoder_layer = TransformerEncoderLayer(config)

In [None]:
class Embeddings(nn.Module):
    """Token plus learned position embeddings, then LayerNorm and dropout.

    Args:
        config: object providing ``vocab_size``, ``max_position_embeddings``
            and ``hidden_size``.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # NOTE(review): nn.Dropout() uses the library default rate (p=0.5),
        # not config.hidden_dropout_prob - confirm this is intended.
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        """Embed a batch of token ids.

        Args:
            input_ids: integer tensor indexed as (batch, sequence); the size
                of dimension 1 is taken as the sequence length.

        Returns:
            Normalised, dropout-regularised sum of token and position
            embeddings, one ``hidden_size`` vector per input id.
        """
        # Create position IDs for the input sequence on the same device as
        # the ids, so the lookup also works when the model runs on a GPU.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings (positions broadcast over batch)
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [None]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of encoder blocks.

    Args:
        config: object forwarded to ``Embeddings`` and to every
            ``TransformerEncoderLayer``; ``num_hidden_layers`` sets the
            number of blocks.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        blocks = [TransformerEncoderLayer(config)
                  for _ in range(config.num_hidden_layers)]
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Embed the ids in ``x`` and pass them through every block in order."""
        hidden = self.embeddings(x)
        for block in self.layers:
            hidden = block(hidden)
        return hidden

In [None]:
# Build the full encoder (embeddings + config.num_hidden_layers blocks).
encoder = TransformerEncoder(config)

In [None]:
# Select the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Hand-written batch of two 9-token id sequences, moved to `device` ...
x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
# ... which is immediately replaced by random ids in {0, 1} with shape
# (2, config.max_position_embeddings). No device is given for this tensor,
# so it is NOT moved to `device`.
x = torch.randint(2, (2, config.max_position_embeddings))

In [None]:
# Encode the batch; the result is later passed to the decoder as `enc_out`.
enc_out = encoder(x)

In [None]:
# Notebook display: shape of the encoder output.
enc_out.size()

torch.Size([2, 512, 768])

## Section 2: Decoder

### Do it yourself!

In [None]:
# Switch the shared config to decoder mode: MultiHeadAttention modules built
# from now on use AttentionHead_mask heads. Modules that were already
# constructed (e.g. `encoder`) keep the heads they were created with.
config.model_type = 'decoder'

In [None]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute causally masked scaled dot-product attention.

    Query row ``i`` may only attend to key rows ``j <= i``; scores of later
    key rows are set to ``-inf`` before the softmax so they receive zero
    weight.

    Args:
        query: 3-D tensor (``torch.bmm`` only accepts 3-D inputs).
        key: 3-D tensor whose last dimension matches ``query``.
        value: 3-D tensor with one row per key row.
        mask: accepted for call compatibility only and NOT consulted. The
            causal mask is always built here from the score shape, exactly as
            before (the previous implementation overwrote this argument), and
            existing callers pass a placeholder that does not match the score
            shape. NOTE(review): calls made for encoder-decoder attention are
            therefore causally masked as well - confirm whether that is wanted.

    Returns:
        The attention-weighted combination of ``value`` rows, one per query row.
    """
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)

    # Build the "future position" mask in one vectorised comparison, on the
    # same device as the scores, instead of filling it entry by entry in
    # Python. Shape (q_len, k_len); it broadcasts over the batch dimension.
    q_len, k_len = scores.size(1), scores.size(2)
    q_pos = torch.arange(q_len, device=scores.device).unsqueeze(1)
    k_pos = torch.arange(k_len, device=scores.device).unsqueeze(0)
    future = k_pos > q_pos

    scores = scores.masked_fill(future, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights.bmm(value)

In [None]:
class AttentionHead_mask(nn.Module):
    """Attention head used by the decoder.

    Masking is not done in this class: the decoder-section
    ``scaled_dot_product_attention`` builds its own lower-triangular mask on
    every call.

    Args:
        embed_dim: size of the last dimension of the incoming hidden states.
        head_dim: output size of each of the three linear projections.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state, enc_out=None):
        """Attend from ``hidden_state`` to itself or to ``enc_out``.

        Args:
            hidden_state: tensor the queries are projected from.
            enc_out: optional tensor to attend to (encoder output). When
                omitted the head performs self-attention over ``hidden_state``.

        Returns:
            The output of ``scaled_dot_product_attention`` for this head.
        """
        # Queries always come from the decoder states; keys and values come
        # from the attended sequence. The previous version projected
        # query/key from ``enc_out`` and value from ``hidden_state``.
        kv_source = hidden_state if enc_out is None else enc_out
        # No mask argument is passed: the old ``torch.ones((1, 2, 3))``
        # placeholder was never read by the attention function.
        return scaled_dot_product_attention(
            self.q(hidden_state), self.k(kv_source), self.v(kv_source))

In [None]:
class TransformerDecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer receives a layer-normalised copy of its input and its
    output is added back onto the un-normalised input (skip connection).

    Args:
        config: object forwarded to ``MultiHeadAttention`` and ``FeedForward``;
            ``hidden_size`` is also used for the three ``LayerNorm`` modules.
    """

    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_3 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        # Dedicated weights for encoder-decoder attention. Previously the
        # self-attention module was reused for this step, so both sub-layers
        # shared one set of projections. Created last so the existing
        # sub-modules keep their construction order.
        self.cross_attention = MultiHeadAttention(config)

    def forward(self, x, enc_out):
        """Return ``x`` after the three decoder sub-layers.

        Args:
            x: decoder hidden states.
            enc_out: encoder output attended to by the second sub-layer.
        """
        # Apply layer normalization and then copy input into query, key, value
        hidden_state1 = self.layer_norm_1(x)
        # Apply self-attention with a skip connection
        x = x + self.attention(hidden_state1)

        # Added because this is a decoder: attention over the encoder output
        # ------------------------------------------------------------

        hidden_state2 = self.layer_norm_2(x)
        x = x + self.cross_attention(hidden_state2, enc_out)

        # ------------------------------------------------------------
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_3(x))
        return x

In [None]:
# Build a single decoder block from the shared config (now in decoder mode).
decoder_layer = TransformerDecoderLayer(config)

In [None]:
class TransformerDecoder(nn.Module):
    """Embedding layer followed by a stack of decoder blocks.

    Args:
        config: object forwarded to ``Embeddings`` and to every
            ``TransformerDecoderLayer``; ``num_hidden_layers`` sets the
            number of blocks.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        blocks = [TransformerDecoderLayer(config)
                  for _ in range(config.num_hidden_layers)]
        self.layers = nn.ModuleList(blocks)

    def forward(self, x, enc_out):
        """Embed the ids in ``x`` and run every block, each one given ``enc_out``."""
        hidden = self.embeddings(x)
        for block in self.layers:
            hidden = block(hidden, enc_out)
        return hidden

In [None]:
# Hand-written batch of two 9-token id sequences, moved to `device` ...
y = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
# ... which is immediately replaced by random ids in {0, 1} with shape
# (2, config.max_position_embeddings); this tensor is not moved to `device`.
y = torch.randint(2, (2, config.max_position_embeddings))
# Notebook display: shape of the decoder input batch.
y.shape

torch.Size([2, 512])

In [None]:
# Build the full decoder (embeddings + config.num_hidden_layers blocks).
decoder = TransformerDecoder(config)

In [None]:
# Decode `y` while attending to the encoder output computed earlier.
dec_out = decoder(y, enc_out)

In [None]:
# Notebook display: shape of the decoder output.
dec_out.size()

torch.Size([2, 512, 768])

In [None]:
# Notebook display: print the decoder output tensor.
dec_out

tensor([[[-4.4505, -3.2537,  1.6898,  ..., -1.9110,  4.5027,  0.1361],
         [ 2.2372, -2.7707,  3.4377,  ..., -3.4716,  3.1217, -1.9748],
         [ 1.8093, -3.9605,  4.9819,  ..., -3.6125, -0.2480, -1.2146],
         ...,
         [ 0.2960, -2.5317,  1.1145,  ..., -3.1193,  3.9762,  3.3910],
         [-2.5233, -2.2717,  1.6913,  ..., -2.0713,  3.6716, -1.1244],
         [ 1.4656,  2.3001,  1.3731,  ..., -2.5724,  2.1282,  2.3143]],

        [[-0.7400, -3.8738, -1.1821,  ...,  1.6580, -2.2482, -5.4730],
         [-0.1189,  1.1953, -0.4870,  ..., -1.5399, -2.4965, -1.0879],
         [ 1.3713,  1.0016,  4.1497,  ..., -1.6893,  3.5379,  0.6695],
         ...,
         [ 1.0667,  1.2944,  2.4996,  ..., -3.2825,  2.9498, -0.4780],
         [-2.8186, -1.4097,  2.5078,  ...,  0.2320,  1.4502, -1.2938],
         [-1.9650, -1.4481,  2.0200,  ...,  1.7980, -0.5777, -0.3220]]],
       grad_fn=<AddBackward0>)