<a href="https://colab.research.google.com/github/JYP0824/Personal-Project/blob/main/Attention_is_all_you_need.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Encoder

In [15]:
import torch
import math
import torch.nn as nn

In [16]:
class PE(nn.Module):
  """Fixed sinusoidal positional encoding table.

  A ``(seq_len, d_model)`` table is precomputed once in ``__init__``; even
  columns hold ``sin(pos / 10000**(2i / d_model))`` and odd columns hold the
  matching ``cos`` term. The table is stored as a non-persistent buffer, so
  it is never trained and never written to ``state_dict``.

  Args:
    seq_len: Number of positions (rows) to precompute.
    d_model: Width of each positional vector (columns).
    device: Optional device on which the table is created. ``None`` uses
      PyTorch's default device.
  """

  def __init__(self, seq_len, d_model, device=None):
    super(PE, self).__init__()

    encoding = torch.zeros(seq_len, d_model, device=device)

    # Column vector of positions: shape (seq_len, 1).
    pos = torch.arange(0, seq_len, device=device).float().unsqueeze(dim=1)

    # Even column indices 0, 2, 4, ... used as the "2i" exponent term.
    _2i = torch.arange(0, d_model, step=2, device=device).float()
    angle = pos / (10000 ** (_2i / d_model))

    encoding[:, 0::2] = torch.sin(angle)
    # With an odd d_model there is one fewer odd column than even columns.
    encoding[:, 1::2] = torch.cos(angle[:, : d_model // 2])

    # A buffer follows the module through .to()/.cuda() and has no gradient.
    self.register_buffer("encoding", encoding, persistent=False)

  def forward(self, x):
    """Return the first ``x.size(1)`` rows of the encoding table.

    Args:
      x: Tensor whose second dimension is the sequence length. Only its
        shape and device are used, so both ``(batch, seq_len)`` token ids
        and ``(batch, seq_len, d_model)`` embeddings are accepted.

    Returns:
      Tensor of shape ``(x.size(1), d_model)`` on the same device as ``x``.

    Raises:
      ValueError: If the sequence is longer than the precomputed table.
    """
    seq_len = x.size(1)
    if seq_len > self.encoding.size(0):
      raise ValueError(
          f"sequence length {seq_len} exceeds the precomputed "
          f"maximum {self.encoding.size(0)}")
    return self.encoding[:seq_len, :].to(x.device)

In [17]:
class ScaledDotproductAttention(nn.Module):
  """Scaled dot-product attention: ``softmax(q @ k^T / sqrt(d_k)) @ v``."""

  def __init__(self):
    super(ScaledDotproductAttention, self).__init__()

    # Normalise over the key axis (last dimension of the score matrix).
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, q, k, v, mask=None, e=1e-12):
    """Compute attention-weighted values.

    Args:
      q: Query tensor; its last dimension must match the last dimension of ``k``.
      k: Key tensor; the last two dimensions are ``(length, d_k)``.
      v: Value tensor; its second-to-last dimension must match that of ``k``.
      mask: Optional tensor broadcastable to the score tensor. Positions
        where ``mask == 0`` receive (effectively) zero attention weight.
      e: Unused; kept so existing callers that pass it keep working.

    Returns:
      Tuple ``(values, score)``: the weighted sum of ``v`` and the
      attention weights after softmax.
    """
    d_k = k.size(-1)
    # Swap the last two axes so that q @ k_t yields (..., len_q, len_k).
    k_t = k.transpose(-2, -1)
    score = (q @ k_t) / math.sqrt(d_k)

    if mask is not None:
      # The most negative finite value drives the softmax weight to zero
      # without producing NaN when an entire row is masked.
      score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)

    score = self.softmax(score)
    v = score @ v

    return v, score

In [18]:
class MultiHeadAttention(nn.Module):
  """Multi-head attention built on ``ScaledDotproductAttention``.

  Args:
    d_model: Width of the input and output features.
    n_head: Number of attention heads; must divide ``d_model``.

  Raises:
    ValueError: If ``d_model`` is not divisible by ``n_head``.
  """

  def __init__(self, d_model, n_head):
    super(MultiHeadAttention, self).__init__()

    if d_model % n_head != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by n_head ({n_head})")

    # split() needs the head count to reshape the projections.
    self.n_head = n_head
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    self.w_concat = nn.Linear(d_model, d_model)
    self.attention = ScaledDotproductAttention()

  def split(self, tensor):
    """Reshape ``(batch, length, d_model)`` to ``(batch, n_head, length, d_k)``."""
    batch_size, length, d_model = tensor.size()

    d_k = d_model // self.n_head
    # Cut the feature axis into heads first, then move the head axis forward;
    # a bare view() to the target shape would mix positions across heads.
    tensor = tensor.view(batch_size, length, self.n_head, d_k).transpose(1, 2)
    return tensor

  def concat(self, tensor):
    """Inverse of ``split``: ``(batch, n_head, length, d_k)`` to ``(batch, length, d_model)``."""
    batch_size, head, length, d_k = tensor.size()
    d_model = head * d_k

    # transpose() returns a non-contiguous view, which view() cannot reshape.
    tensor = tensor.transpose(1, 2).contiguous().view(batch_size, length, d_model)
    return tensor

  def forward(self, q, k, v, mask=None):
    """Project q/k/v, attend per head, merge the heads and project again.

    Args:
      q, k, v: Tensors of shape ``(batch, length, d_model)``; ``k`` and ``v``
        may have a different length than ``q``.
      mask: Optional mask forwarded unchanged to the attention module.

    Returns:
      Tensor of shape ``(batch, len_q, d_model)``.
    """
    q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)
    q, k, v = self.split(q), self.split(k), self.split(v)

    heads, _ = self.attention(q, k, v, mask=mask)
    merged = self.concat(heads)
    output = self.w_concat(merged)
    return output

In [19]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learned scale and shift.

    Args:
        d_model: Size of the last dimension of the inputs.
        eps: Small constant added to the variance for numerical stability.
    """

    def __init__(self, d_model, eps=1e-12):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        """Normalise ``x`` along its last dimension, then apply gamma and beta."""
        # '-1' means last dimension
        mean = x.mean(-1, keepdim=True)
        # Biased (population) variance: stays finite when the last dimension
        # has a single element, unlike the unbiased estimator.
        var = x.var(-1, unbiased=False, keepdim=True)

        # eps sits inside the square root so the denominator is never zero.
        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta

        return out

In [20]:
class FFN(nn.Module):
  """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

  Args:
    d_model: Width of the input and output features.
    hidden: Width of the intermediate layer.
    drop: Dropout probability applied to the hidden activations.
  """

  def __init__(self, d_model, hidden, drop):
    super(FFN, self).__init__()

    self.lr1 = nn.Linear(d_model, hidden)
    self.lr2 = nn.Linear(hidden, d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop)

  def forward(self, x):
    """Map ``(..., d_model)`` to ``(..., d_model)`` through the hidden layer."""
    # Each stage consumes the previous stage's output, not the raw input.
    hidden = self.lr1(x)
    hidden = self.relu(hidden)
    hidden = self.dropout(hidden)
    result = self.lr2(hidden)
    return result

In [21]:
class EncoderLayer(nn.Module):
  """One encoder layer: self-attention and feed-forward sub-layers.

  Each sub-layer output goes through dropout, is added to the sub-layer
  input (residual connection) and is then layer-normalised. Each sub-layer
  has its own normalisation parameters.

  Args:
    d_model: Feature width.
    n_head: Number of attention heads.
    hidden: Hidden width of the feed-forward block.
    drop: Dropout probability.
  """

  def __init__(self, d_model, n_head, hidden, drop):
    super(EncoderLayer, self).__init__()

    self.attention = MultiHeadAttention(d_model, n_head)
    self.norm1 = LayerNorm(d_model)
    self.dropout1 = nn.Dropout(drop)

    self.ffn = FFN(d_model, hidden, drop)
    self.norm2 = LayerNorm(d_model)
    self.dropout2 = nn.Dropout(drop)

  def forward(self, x, src_mask):
    """Run both sub-layers on ``x``.

    Args:
      x: Input features; passed as query, key and value to self-attention.
      src_mask: Mask forwarded to the attention module.

    Returns:
      Tensor with the same shape as ``x``.
    """
    # Sub-layer 1: self-attention.
    residual = x
    x = self.attention(q=x, k=x, v=x, mask=src_mask)
    x = self.norm1(residual + self.dropout1(x))

    # Sub-layer 2: position-wise feed-forward.
    residual = x
    x = self.ffn(x)
    x = self.norm2(residual + self.dropout2(x))

    return x

In [22]:
class Encoder(nn.Module):
  """Token embedding plus positional encoding followed by ``n_layers`` encoder layers.

  Args:
    voc_size: Number of rows in the embedding table.
    d_model: Feature width.
    hidden: Hidden width of each layer's feed-forward block.
    n_layers: Number of stacked ``EncoderLayer`` modules.
    n_head: Number of attention heads per layer.
    seq_len: Number of positions passed to ``PE``.
    drop: Dropout probability used inside each layer.
    device: Accepted for interface compatibility; not used by this class.
  """

  def __init__(self, voc_size, d_model, hidden, n_layers, n_head, seq_len, drop, device):
    super(Encoder, self).__init__()

    # Index 1 is treated as padding by the embedding table.
    self.embed = nn.Embedding(num_embeddings=voc_size, embedding_dim=d_model, padding_idx=1)
    self.pe = PE(seq_len, d_model)
    self.layers = nn.ModuleList([
        EncoderLayer(d_model=d_model, n_head=n_head, hidden=hidden, drop=drop)
        for _ in range(n_layers)
    ])

  def forward(self, x, src_mask):
    """Encode a batch of token ids.

    Args:
      x: Token ids; assumed to be ``(batch, seq_len)`` integers, as
        required by ``nn.Embedding`` and by the two-value unpack in ``PE.forward``.
      src_mask: Mask forwarded to every layer.

    Returns:
      Output of the last encoder layer.
    """
    # PE reads the sequence length from the 2-D token ids, so it must be
    # called before x is replaced by its 3-D embedding.
    pe_x = self.pe(x)
    x = self.embed(x) + pe_x

    for layer in self.layers:
      x = layer(x, src_mask)

    return x

# Decoder

In [23]:
class DecoderLayer(nn.Module):
  """One decoder layer: masked self-attention, cross-attention and feed-forward.

  Each sub-layer output goes through dropout, is added to the sub-layer
  input (residual connection) and is then layer-normalised. Each sub-layer
  has its own normalisation parameters.

  Args:
    d_model: Feature width.
    n_head: Number of attention heads.
    hidden: Hidden width of the feed-forward block.
    drop: Dropout probability.
  """

  def __init__(self, d_model, n_head, hidden, drop):
    super(DecoderLayer, self).__init__()

    self.masked_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
    self.norm1 = LayerNorm(d_model=d_model)
    self.dropout1 = nn.Dropout(drop)

    self.cross_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
    self.norm2 = LayerNorm(d_model=d_model)
    self.dropout2 = nn.Dropout(drop)

    self.ffn = FFN(d_model=d_model, hidden=hidden, drop=drop)
    self.norm3 = LayerNorm(d_model=d_model)
    self.dropout3 = nn.Dropout(drop)

  def forward(self, x_dec, enc, trg_mask, src_mask):
    """Run the decoder sub-layers.

    Args:
      x_dec: Decoder-side features.
      enc: Encoder output used as key and value in cross-attention, or
        ``None`` to skip the cross-attention sub-layer.
      trg_mask: Mask for the self-attention sub-layer.
      src_mask: Mask for the cross-attention sub-layer.

    Returns:
      Tensor with the same shape as ``x_dec``.
    """
    # Sub-layer 1: masked self-attention over the decoder input.
    residual = x_dec
    x = self.masked_attention(q=x_dec, k=x_dec, v=x_dec, mask=trg_mask)
    x = self.norm1(residual + self.dropout1(x))

    # Sub-layer 2: attention over the encoder output, when one is given.
    if enc is not None:
      residual = x
      x = self.cross_attention(q=x, k=enc, v=enc, mask=src_mask)
      x = self.norm2(residual + self.dropout2(x))

    # Sub-layer 3: position-wise feed-forward.
    residual = x
    x = self.ffn(x)
    x = self.norm3(residual + self.dropout3(x))

    return x

In [24]:
class Decoder(nn.Module):
  """Token embedding plus positional encoding, ``n_layer`` decoder layers and an output softmax.

  Args:
    voc_size: Number of rows in the embedding table and width of the output.
    d_model: Feature width.
    seq_len: Number of positions passed to ``PE``.
    hidden: Hidden width of each layer's feed-forward block.
    n_head: Number of attention heads per layer.
    n_layer: Number of stacked ``DecoderLayer`` modules.
    drop: Dropout probability used inside each layer.
    device: Accepted for interface compatibility; not used by this class.
  """

  def __init__(self, voc_size, d_model, seq_len, hidden, n_head, n_layer, drop, device):
    super(Decoder, self).__init__()

    # Index 1 is treated as padding by the embedding table.
    self.embed = nn.Embedding(num_embeddings=voc_size, embedding_dim=d_model, padding_idx=1)
    # Same call form as Encoder; no hard-coded 'cuda'.
    self.pe = PE(seq_len, d_model)
    self.layers = nn.ModuleList([
        DecoderLayer(d_model=d_model, n_head=n_head, hidden=hidden, drop=drop)
        for _ in range(n_layer)
    ])
    self.lr = nn.Linear(d_model, voc_size)
    # Normalise over the vocabulary axis (last dimension).
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, enc, trg_mask, src_mask):
    """Decode a batch of target token ids against the encoder output.

    Args:
      x: Target token ids; assumed to be ``(batch, seq_len)`` integers, as
        required by ``nn.Embedding``.
      enc: Encoder output forwarded to every layer's cross-attention.
      trg_mask: Mask for the layers' self-attention.
      src_mask: Mask for the layers' cross-attention.

    Returns:
      Softmax over the last dimension of the final linear projection,
      i.e. one distribution of width ``voc_size`` per position.
    """
    # PE reads the sequence length from the 2-D token ids, so it must be
    # called before x is replaced by its 3-D embedding.
    pe_x = self.pe(x)
    x = self.embed(x) + pe_x

    for layer in self.layers:
      x = layer(x, enc, trg_mask, src_mask)

    x = self.lr(x)
    output = self.softmax(x)

    return output

In [25]:
class Transformer(nn.Module):
    """Encoder-decoder model that builds its own padding and causal masks.

    Args:
        src_pad_idx: Token id treated as padding in source sequences.
        trg_pad_idx: Token id treated as padding in target sequences.
        trg_sos_idx: Start-of-sequence token id; stored but not used here.
        enc_voc_size: Vocabulary size passed to the encoder.
        dec_voc_size: Vocabulary size passed to the decoder.
        d_model: Feature width.
        n_head: Number of attention heads.
        max_len: Maximum sequence length passed as ``seq_len``.
        ffn_hidden: Hidden width of the feed-forward blocks.
        n_layers: Number of layers in both the encoder and the decoder.
        drop_prob: Dropout probability.
        device: Device forwarded to the encoder and decoder and stored on the model.
    """

    def __init__(self, src_pad_idx, trg_pad_idx, trg_sos_idx, enc_voc_size, dec_voc_size, d_model, n_head, max_len, ffn_hidden, n_layers, drop_prob, device):
        super().__init__()

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.trg_sos_idx = trg_sos_idx

        # Keyword names follow the Encoder / Decoder constructor signatures.
        self.encoder = Encoder(voc_size=enc_voc_size, d_model=d_model, hidden=ffn_hidden,
                               n_layers=n_layers, n_head=n_head, seq_len=max_len,
                               drop=drop_prob, device=device)
        self.decoder = Decoder(voc_size=dec_voc_size, d_model=d_model, seq_len=max_len,
                               hidden=ffn_hidden, n_head=n_head, n_layer=n_layers,
                               drop=drop_prob, device=device)
        self.device = device

    def make_pad_mask(self, q, k, q_pad_idx=None, k_pad_idx=None):
        """Build a boolean mask that is False wherever the query or key is padding.

        Args:
            q: Query-side token ids of shape ``(batch, len_q)``.
            k: Key-side token ids of shape ``(batch, len_k)``.
            q_pad_idx: Padding id for ``q``; defaults to ``self.src_pad_idx``.
            k_pad_idx: Padding id for ``k``; defaults to ``self.src_pad_idx``.

        Returns:
            Bool tensor of shape ``(batch, 1, len_q, len_k)``.
        """
        if q_pad_idx is None:
            q_pad_idx = self.src_pad_idx
        if k_pad_idx is None:
            k_pad_idx = self.src_pad_idx

        # (batch, 1, 1, len_k) & (batch, 1, len_q, 1) broadcasts to
        # (batch, 1, len_q, len_k) without materialising repeated copies.
        k_valid = k.ne(k_pad_idx).unsqueeze(1).unsqueeze(2)
        q_valid = q.ne(q_pad_idx).unsqueeze(1).unsqueeze(3)

        mask = k_valid & q_valid

        return mask

    def make_no_peak_mask(self, q, k):
        """Build a lower-triangular ``(len_q, len_k)`` bool mask (True on and below the diagonal)."""
        len_q, len_k = q.size(1), k.size(1)
        # Built on the input's device so it can be combined with the pad mask.
        mask = torch.ones(len_q, len_k, device=q.device).tril().bool()

        return mask

    def forward(self, src, trg):
        """Run the encoder on ``src`` and the decoder on ``trg``.

        Args:
            src: Source token ids of shape ``(batch, src_len)``.
            trg: Target token ids of shape ``(batch, trg_len)``.

        Returns:
            The decoder output.
        """
        src_mask = self.make_pad_mask(src, src)
        # Queries come from the target, keys from the source.
        src_trg_mask = self.make_pad_mask(trg, src,
                                          q_pad_idx=self.trg_pad_idx,
                                          k_pad_idx=self.src_pad_idx)
        # Target self-attention: hide padding and every later position.
        trg_mask = self.make_pad_mask(trg, trg,
                                      q_pad_idx=self.trg_pad_idx,
                                      k_pad_idx=self.trg_pad_idx) & self.make_no_peak_mask(trg, trg)
        enc_src = self.encoder(src, src_mask)
        output = self.decoder(trg, enc_src, trg_mask, src_trg_mask)

        return output