<a href="https://colab.research.google.com/github/RealMyeong/Going_Deeper_NLP/blob/main/%ED%8A%B8%EB%9E%9C%EC%8A%A4%ED%8F%AC%EB%A8%B8_%EC%BD%94%EB%93%9C_%EB%AC%B4%EC%A7%80%EC%84%B1%EB%B0%98%EB%B3%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 눈감고 코드 작성 가능할 때까지 때려박자.

In [None]:
def positional_encoding(pos, d_model):
  """Build a sinusoidal positional-encoding table.

  Even columns hold ``sin(p / 10000**(2k / d_model))`` and odd columns hold
  ``cos(p / 10000**(2k / d_model))``, where ``p`` is the row (position) and
  ``k = column // 2`` is the index of the sin/cos pair.

  Args:
    pos: Number of positions (rows) to generate.
    d_model: Width of each encoding vector (columns).

  Returns:
    A float64 ``numpy.ndarray`` of shape ``(pos, d_model)``.
  """
  positions = np.arange(pos, dtype=np.float64)[:, np.newaxis]

  # Columns 2k and 2k+1 form one sin/cos pair and must share a frequency,
  # so the exponent is driven by the pair index, not the raw column index.
  pair_index = np.arange(d_model) // 2
  denominators = np.power(10000.0, (2 * pair_index) / d_model)

  sinusoid_table = positions / denominators[np.newaxis, :]
  sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])
  sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])
  return sinusoid_table

class MultiHeadAttention(tf.keras.layers.Layer):
  """Multi-head scaled dot-product attention.

  Q, K and V are each projected with a Dense(d_model) layer, split into
  ``num_heads`` heads of width ``d_model // num_heads``, attended per head,
  merged back to width ``d_model`` and passed through a final Dense(d_model).
  """

  def __init__(self, d_model, num_heads):
    """
    Args:
      d_model: Width of the projections and of the layer output.
      num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
      ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """
    super(MultiHeadAttention, self).__init__()
    # Without this check the reshape in split_heads can still succeed for
    # some sequence lengths and silently mix positions and features.
    if d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")

    self.num_heads = num_heads
    self.d_model = d_model
    self.depth = d_model // self.num_heads
    self.W_q = tf.keras.layers.Dense(d_model)
    self.W_k = tf.keras.layers.Dense(d_model)
    self.W_v = tf.keras.layers.Dense(d_model)

    self.linear = tf.keras.layers.Dense(d_model)

  def scaled_dot_product_attention(self, Q, K, V, mask):
    """Compute softmax(Q K^T / sqrt(d_k)) V.

    Args:
      Q, K, V: Query, key and value tensors; Q and K must agree on the last
        dimension, K and V on the second-to-last.
      mask: Optional tensor broadcastable to the score tensor, or None.
        Assumed to be 1 at positions that must NOT be attended to and 0
        elsewhere -- TODO confirm against the mask-generation code.

    Returns:
      Tuple ``(out, attentions)``: the attended values and the softmax
      weights over the last axis of the scores.
    """
    d_k = tf.cast(K.shape[-1], tf.float32)
    scaled_qk = tf.matmul(Q, K, transpose_b=True) / tf.math.sqrt(d_k)

    if mask is not None:
      # A large negative offset drives masked scores to ~0 after softmax.
      scaled_qk += (mask * -1e9)

    attentions = tf.nn.softmax(scaled_qk, axis=-1)
    out = tf.matmul(attentions, V)
    return out, attentions

  def split_heads(self, x):
    """Reshape (batch, seq, d_model) into (batch, num_heads, seq, depth)."""
    # tf.shape gives the runtime batch size; the static x.shape[0] may be
    # None when the layer is traced with an unknown batch dimension.
    batch_size = tf.shape(x)[0]
    split_x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
    split_x = tf.transpose(split_x, perm=[0,2,1,3])

    return split_x

  def combine_heads(self, x):
    """Inverse of split_heads: (batch, num_heads, seq, depth) -> (batch, seq, d_model)."""
    batch_size = tf.shape(x)[0]
    combined_x = tf.transpose(x, perm=[0,2,1,3])
    # Reshape the transposed tensor so each position gets its own heads
    # concatenated, rather than reshaping the head-major input directly.
    combined_x = tf.reshape(combined_x, (batch_size, -1, self.d_model))

    return combined_x

  def call(self, Q, K, V, mask):
    """Run multi-head attention.

    Args:
      Q, K, V: Input tensors fed to the query/key/value projections.
      mask: Forwarded to scaled_dot_product_attention; may be None.

    Returns:
      Tuple ``(out, attention_weights)``: the projected attention output of
      width ``d_model`` and the per-head attention weights.
    """
    WQ = self.W_q(Q)
    WK = self.W_k(K)
    WV = self.W_v(V)

    WQ_split = self.split_heads(WQ)
    WK_split = self.split_heads(WK)
    WV_split = self.split_heads(WV)

    out, attention_weights = self.scaled_dot_product_attention(WQ_split, WK_split, WV_split, mask)
    out = self.combine_heads(out)
    out = self.linear(out)

    return out, attention_weights


class PoswiseFeedForwardNet(tf.keras.layers.Layer):
  """Feed-forward sub-layer: Dense(d_ff) with ReLU followed by Dense(d_model)."""

  def __init__(self, d_model, d_ff):
    """
    Args:
      d_model: Number of units of the second (output) Dense layer.
      d_ff: Number of units of the first (ReLU) Dense layer.
    """
    super().__init__()
    self.W_1 = tf.keras.layers.Dense(d_ff, activation='relu')
    self.W_2 = tf.keras.layers.Dense(d_model)

  def call(self, x):
    """Apply W_1 and then W_2 to ``x`` and return the result."""
    return self.W_2(self.W_1(x))


class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: pre-norm self-attention and pre-norm feed-forward,
  each followed by dropout and a residual connection."""

  def __init__(self, d_model, n_heads, d_ff, dropout):
    """
    Args:
      d_model: Model width, passed to the attention and feed-forward layers.
      n_heads: Number of attention heads.
      d_ff: Hidden width of the feed-forward sub-layer.
      dropout: Rate for the Dropout layer shared by both sub-layers.
    """
    super(EncoderLayer, self).__init__()
    self.enc_self_attention = MultiHeadAttention(d_model, n_heads)
    self.ffn = PoswiseFeedForwardNet(d_model, d_ff)
    self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.dropout = tf.keras.layers.Dropout(dropout)

  def call(self, x, mask):
    """Run the encoder block.

    Args:
      x: Input tensor.
      mask: Attention mask forwarded to the self-attention layer.

    Returns:
      Tuple ``(out, enc_attn)``: the block output and the self-attention
      weights.
    """
    # Sub-layer 1: self-attention.
    residual = x
    out = self.norm1(x)
    out, enc_attn = self.enc_self_attention(out, out, out, mask)
    out = self.dropout(out)
    out += residual

    # Sub-layer 2: feed-forward. The skip connection must start from the
    # attention sub-layer's output, not from the original input x.
    residual = out
    out = self.norm2(out)
    out = self.ffn(out)
    out = self.dropout(out)
    out += residual

    return out, enc_attn

class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: self-attention, encoder-decoder attention and
  feed-forward, each pre-normalized and followed by dropout and a residual
  connection."""

  def __init__(self, d_model, num_heads, d_ff, dropout):
    """
    Args:
      d_model: Model width, passed to the attention and feed-forward layers.
      num_heads: Number of attention heads.
      d_ff: Hidden width of the feed-forward sub-layer.
      dropout: Rate for the Dropout layer shared by all three sub-layers.
    """
    super(DecoderLayer, self).__init__()
    self.dec_self_attn = MultiHeadAttention(d_model, num_heads)
    self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)
    self.ffn = PoswiseFeedForwardNet(d_model, d_ff)
    self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.norm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.dropout = tf.keras.layers.Dropout(dropout)

  def call(self, x, enc_out, causality_mask, padding_mask):
    """Run the decoder block.

    Args:
      x: Decoder input tensor.
      enc_out: Encoder output, used as key and value of the
        encoder-decoder attention.
      causality_mask: Mask applied in the encoder-decoder attention.
      padding_mask: Mask applied in the decoder self-attention.

    Returns:
      Tuple ``(out, dec_attn, enc_dec_attn)``: the block output, the
      self-attention weights and the encoder-decoder attention weights.
    """
    # NOTE(review): the self-attention consumes `padding_mask` and the
    # encoder-decoder attention consumes `causality_mask`. The parameter
    # names may not describe what callers actually pass -- verify against
    # the mask-generation code before renaming or swapping them.
    residual = x
    out = self.norm1(x)
    out, dec_attn = self.dec_self_attn(out, out, out, padding_mask)
    out = self.dropout(out)
    out += residual

    residual = out
    out = self.norm2(out)
    out, enc_dec_attn = self.enc_dec_attn(out, enc_out, enc_out, causality_mask)
    out = self.dropout(out)
    out += residual

    residual = out
    out = self.norm3(out)
    out = self.ffn(out)
    out = self.dropout(out)
    out += residual

    return out, dec_attn, enc_dec_attn


class Encoder(tf.keras.Model):
  """Stack of ``n_layers`` EncoderLayer blocks applied in sequence."""

  def __init__(self,
               n_layers,
               d_model,
               n_heads,
               d_ff,
               dropout):
    """
    Args:
      n_layers: Number of EncoderLayer blocks.
      d_model, n_heads, d_ff, dropout: Forwarded to every EncoderLayer.
    """
    super(Encoder, self).__init__()
    self.n_layers = n_layers
    self.enc_layers = [EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]

  # `call` must be a method of the class (not nested inside __init__) so
  # that Keras dispatches to it when the model is invoked.
  def call(self, x, mask):
    """Pass ``x`` through every encoder layer.

    Args:
      x: Input tensor for the first layer.
      mask: Attention mask forwarded to every layer.

    Returns:
      Tuple ``(out, enc_attns)``: the output of the last layer and a list
      with the attention weights of each layer, in layer order.
    """
    out = x
    enc_attns = list()
    for enc_layer in self.enc_layers:
      out, enc_attn = enc_layer(out, mask)
      enc_attns.append(enc_attn)

    return out, enc_attns
  
class Decoder(tf.keras.Model):
  """Stack of ``n_layers`` DecoderLayer blocks applied in sequence."""

  def __init__(self,
               n_layers,
               d_model,
               n_heads,
               d_ff,
               dropout):
    """
    Args:
      n_layers: Number of DecoderLayer blocks.
      d_model, n_heads, d_ff, dropout: Forwarded to every DecoderLayer.
    """
    super(Decoder, self).__init__()
    self.n_layers=n_layers
    self.dec_layers = [DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]

  def call(self, x, enc_out, causality_mask, padding_mask):
    """Pass ``x`` through every decoder layer.

    Args:
      x: Input tensor for the first layer.
      enc_out: Encoder output forwarded to every layer.
      causality_mask, padding_mask: Masks forwarded unchanged to every
        layer, in this positional order.

    Returns:
      Tuple ``(out, dec_attns, dec_enc_attns)``: the output of the last
      layer, and two lists holding each layer's self-attention and
      encoder-decoder attention weights, in layer order.
    """
    out = x
    dec_attns = list()
    dec_enc_attns = list()
    for dec_layer in self.dec_layers:
      # Unpack into per-layer names so the accumulator lists are not
      # overwritten by the layer's return values.
      out, dec_attn, dec_enc_attn = dec_layer(out, enc_out, causality_mask, padding_mask)

      dec_attns.append(dec_attn)
      dec_enc_attns.append(dec_enc_attn)
    return out, dec_attns, dec_enc_attns

class Transformer(tf.keras.Model):
  """Encoder-decoder Transformer: embeddings plus positional encoding,
  an Encoder, a Decoder and a final Dense projection to the target
  vocabulary."""

  def __init__(self,
               n_layers,
               d_model,
               n_heads,
               d_ff,
               src_vocab_size,
               tgt_vocab_size,
               pos_len,
               dropout=0.2,
               shared=True):
    """
    Args:
      n_layers: Number of layers in both the encoder and the decoder.
      d_model: Embedding / model width.
      n_heads: Number of attention heads.
      d_ff: Hidden width of the feed-forward sub-layers.
      src_vocab_size: Vocabulary size of the encoder embedding.
      tgt_vocab_size: Vocabulary size of the decoder embedding and of the
        output projection.
      pos_len: Number of positions in the positional-encoding table.
      dropout: Dropout rate used for the embeddings and inside every layer.
      shared: When True, embeddings are scaled by sqrt(d_model) and the
        output projection is meant to share weights with the decoder
        embedding.
    """
    super(Transformer, self).__init__()
    # Stored as a float32 tensor because it is only used in tf.math.sqrt.
    self.d_model = tf.cast(d_model, tf.float32)
    self.enc_emb = tf.keras.layers.Embedding(src_vocab_size, d_model)
    self.dec_emb = tf.keras.layers.Embedding(tgt_vocab_size, d_model)
    self.pos_encoding = positional_encoding(pos_len, d_model)
    self.dropout = tf.keras.layers.Dropout(dropout)
    self.encoder = Encoder(n_layers, d_model, n_heads, d_ff, dropout)
    self.decoder = Decoder(n_layers, d_model, n_heads, d_ff, dropout)
    self.fc = tf.keras.layers.Dense(tgt_vocab_size)
    self.shared = shared
    # NOTE(review): neither `dec_emb` nor `fc` has been built at this point,
    # so this call probably copies nothing and the two layers are likely
    # not actually tied -- confirm, and tie them after build if intended.
    if shared: self.fc.set_weights(tf.transpose(self.dec_emb.weights))

  def embedding(self, emb, x):
    """Embed token ids and add positional encoding.

    Args:
      emb: Embedding layer to apply (``enc_emb`` or ``dec_emb``).
      x: Token-id tensor; axis 1 is taken as the sequence length.

    Returns:
      The embedded input with the first ``seq_len`` rows of the positional
      table added, after dropout.
    """
    seq_len = x.shape[1]
    out = emb(x)
    if self.shared: out *= tf.math.sqrt(self.d_model)
    # Add a leading batch axis to the table, then keep only the first
    # seq_len positions. Assumes seq_len <= pos_len -- TODO confirm.
    out += self.pos_encoding[np.newaxis, ...][:, :seq_len, :]
    out = self.dropout(out)

    return out

  def call(self, enc_in, dec_in, enc_mask, causality_mask, dec_mask):
    """Run the full model.

    Args:
      enc_in: Source token ids.
      dec_in: Target token ids.
      enc_mask: Mask for the encoder self-attention.
      causality_mask: Passed to the decoder as its ``causality_mask``.
      dec_mask: Passed to the decoder as its ``padding_mask``.

    Returns:
      Tuple ``(logits, enc_attns, dec_attns, dec_enc_attns)``: the output
      projection of the decoder result and the attention weights returned
      by the encoder and decoder.
    """
    enc_in = self.embedding(self.enc_emb, enc_in)
    dec_in = self.embedding(self.dec_emb, dec_in)
    enc_out, enc_attns = self.encoder(enc_in, enc_mask)
    dec_out, dec_attns, dec_enc_attns = self.decoder(dec_in, enc_out, causality_mask, dec_mask)
    logits = self.fc(dec_out)
    return logits, enc_attns, dec_attns, dec_enc_attns