<a href="https://colab.research.google.com/github/Baekhyunjung/study_nlp/blob/main/nlp_camp/study_10%EC%9E%A5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import copy
import math
from struct import *

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils

class Encoder(nn.Module):
    """Bidirectional multi-layer LSTM encoder.

    Each direction gets ``hidden_size // 2`` units, so for an even
    ``hidden_size`` the concatenated forward/backward output has
    ``hidden_size`` features.

    Args:
        word_vec_dim: Feature size of each input vector.
        hidden_size: Total output feature size across both directions.
        n_layers: Number of stacked LSTM layers.
        dropout_p: Dropout probability applied between LSTM layers.
    """

    def __init__(self, word_vec_dim, hidden_size, n_layers=4, dropout_p=.2):
        super().__init__()
        self.rnn = nn.LSTM(
            word_vec_dim,
            hidden_size // 2,
            num_layers=n_layers,
            dropout=dropout_p,
            bidirectional=True,
            batch_first=True,
        )

    def forward(self, emb):
        """Run the LSTM over a batch.

        Args:
            emb: Either a batch-first tensor, or a tuple ``(x, lengths)``
                where ``x`` is a padded batch-first tensor and ``lengths`` is
                a tensor with the true length of every sequence. With the
                default ``enforce_sorted=True`` of ``pack_padded_sequence``
                the lengths must be sorted in decreasing order.

        Returns:
            ``(y, h)``: ``y`` is the (re-padded) LSTM output and ``h`` is the
            ``(h_n, c_n)`` state tuple returned by ``nn.LSTM``.
        """
        has_lengths = isinstance(emb, tuple)

        if has_lengths:
            x, lengths = emb
            # Pack so the LSTM skips padded time-steps. The torch RNN helpers
            # are used explicitly: ``pack``/``unpack`` would otherwise resolve
            # to the unrelated functions exported by ``struct``.
            x = nn.utils.rnn.pack_padded_sequence(
                x, lengths.tolist(), batch_first=True)
        else:
            x = emb

        y, h = self.rnn(x)

        if has_lengths:
            # Back to a padded tensor; the returned lengths are not needed.
            y, _ = nn.utils.rnn.pad_packed_sequence(y, batch_first=True)

        return y, h


In [None]:
class Generator(nn.Module):
    """Linear projection followed by a log-softmax over the last dimension."""

    def __init__(self, hidden_size, output_size):
        super().__init__()
        # hidden_size -> output_size projection, then normalisation in log space.
        self.output = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        """Return log-probabilities computed from ``x`` along the last axis."""
        logits = self.output(x)
        return self.softmax(logits)

In [None]:
# Per-class loss weights: start with a weight of 1 for each of the output_size classes.
loss_weight = torch.ones(output_size)
# Zero the weight at index data_loader.PAD so targets with that index add nothing
# to the loss (presumably PAD is the padding-token index; verify in data_loader).
loss_weight[data_loader.PAD] = 0.
# Negative log-likelihood loss, summed (not averaged) over all elements.
crit = nn.NLLLoss(weight=loss_weight, reduction='sum')

In [None]:
import torch

# Two sequences of different lengths (3 and 2).
a = [torch.tensor(seq) for seq in ([1, 2, 3], [3, 4])]
# Right-pad the shorter sequence with zeros; the batch dimension comes first.
b = torch.nn.utils.rnn.pad_sequence(a, batch_first=True)
print(b)

# Pack the padded batch using the true lengths, so the padding is dropped.
print(torch.nn.utils.rnn.pack_padded_sequence(b, [3, 2], batch_first=True))

tensor([[1, 2, 3],
        [3, 4, 0]])
PackedSequence(data=tensor([1, 3, 2, 4, 3]), batch_sizes=tensor([2, 2, 1]), sorted_indices=None, unsorted_indices=None)


In [None]:
dic = {'computer': 9, 'dog': 2, 'cat': 3}


def key_value_func(query):
    """Look up ``query`` in ``dic`` as a normalised weighted sum of all values.

    Every key is scored against ``query`` with ``is_same``; the scores are
    normalised to sum to 1 and used to weight the corresponding values.

    Args:
        query: The key to look up.

    Returns:
        The weighted sum of the values, as a float. When no key matches the
        query, ``0.0`` is returned instead of dividing by a zero weight sum.
    """
    weights = [is_same(key, query) for key in dic]

    weight_sum = sum(weights)
    if weight_sum == 0:
        # No key matched: normalising would raise ZeroDivisionError.
        return 0.0

    return sum(
        weight / weight_sum * value
        for weight, value in zip(weights, dic.values())
    )


def is_same(key, query):
    """Return 1.0 when ``key`` equals ``query`` and 0.0 otherwise."""
    return 1. if key == query else .0

In [None]:
# Exact string comparison: 'puppy' differs from 'computer', so the score is 0.0.
query = 'puppy'
is_same('computer', query)

0.0

In [None]:
import torch.nn as nn

In [None]:
class Transformer(nn.Module):
    """Wire embeddings, encoder, decoder and output projection together.

    Args:
        src_embed: Callable applied to ``src`` before the encoder.
        trg_embed: Callable applied to ``trg`` before the decoder.
        encoder: Called as ``encoder(embedded_src, src_mask)``.
        decoder: Called as ``decoder(embedded_trg, trg_mask, encoder_output, src_mask)``.
        fc_layer: Final projection applied to the decoder output.
    """

    def __init__(self, src_embed, trg_embed, encoder, decoder, fc_layer):
        super().__init__()
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.encoder = encoder
        self.decoder = decoder
        self.fc_layer = fc_layer

    def forward(self, src, trg, src_mask, trg_mask):
        """Return log-probabilities (log-softmax over the last dimension)."""
        memory = self.encoder(self.src_embed(src), src_mask)
        decoded = self.decoder(self.trg_embed(trg), trg_mask, memory, src_mask)
        projected = self.fc_layer(decoded)
        return F.log_softmax(projected, dim=-1)

In [None]:
class Encoder(nn.Module):
    """Stack of ``n_layer`` independent deep copies of ``encoder_layer``.

    The copies are held in an ``nn.ModuleList`` so that their parameters are
    registered with this module (visible to ``parameters()``, ``.to()``,
    ``state_dict()``); a plain Python list would hide them.
    """

    def __init__(self, encoder_layer, n_layer):  # n_layer: number of encoder layers
        super().__init__()
        self.layers = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(n_layer)]
        )

    def forward(self, x, mask):
        """Feed ``x`` through every layer in order, passing ``mask`` to each."""
        out = x
        for layer in self.layers:
            out = layer(out, mask)
        return out

In [None]:
class EncoderLayer(nn.Module):
    """Self-attention followed by a position-wise feed-forward sub-layer.

    Each sub-layer is applied through its own ``ResidualConnectionLayer``,
    which gets a deep copy of ``norm_layer``.

    Args:
        multi_head_attention_layer: Called as ``layer(query, key, value, mask)``.
        position_wise_feed_forward_layer: Called as ``layer(x)``.
        norm_layer: Normalisation module, copied once per residual block.
    """

    def __init__(self, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
        super().__init__()
        self.multi_head_attention_layer = multi_head_attention_layer
        self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
        # ModuleList (not a plain list) so the norm parameters are registered.
        self.residual_connection_layers = nn.ModuleList(
            [ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(2)]
        )

    def forward(self, x, mask):
        """Apply self-attention, then the feed-forward layer to its result."""
        out = self.residual_connection_layers[0](
            x, lambda t: self.multi_head_attention_layer(t, t, t, mask))
        # The second block must consume the attention output, not the raw
        # input; feeding ``x`` here would discard the attention result.
        out = self.residual_connection_layers[1](
            out, self.position_wise_feed_forward_layer)
        return out

In [None]:
def calculate_attention(self, query, key, value, mask):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        self: Unused; kept so the function keeps its method-style signature.
        query, key, value: Tensors whose last two dimensions are
            ``(seq_len, d_k)``; leading dimensions are batch-like.
        mask: Optional tensor broadcastable to the score shape. Positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        The attention-weighted sum of ``value``.
    """
    d_k = key.size(-1)
    # Q x K^T, scaled by sqrt(d_k).
    attention_score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # A large negative score becomes a zero weight after softmax.
        attention_score = attention_score.masked_fill(mask == 0, -1e9)
    attention_prob = F.softmax(attention_score, dim=-1)
    return torch.matmul(attention_prob, value)

In [None]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Total feature size across all heads; must be divisible by ``h``.
        h: Number of attention heads.
        qkv_fc_layer: Projection template, deep-copied into separate query,
            key and value projections (shape ``(d_embed, d_model)``).
        fc_layer: Output projection (shape ``(d_model, d_embed)``).

    Raises:
        ValueError: If ``d_model`` is not divisible by ``h``.
    """

    def __init__(self, d_model, h, qkv_fc_layer, fc_layer):
        super().__init__()
        if d_model % h != 0:
            # Otherwise the reshape into heads could silently mix positions.
            raise ValueError("d_model must be divisible by h")
        self.d_model = d_model
        self.h = h
        self.query_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.key_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.value_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.fc_layer = fc_layer

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Args:
            query, key, value: ``(n_batch, seq_len, d_embed)`` tensors.
            mask: Optional ``(n_batch, seq_len, seq_len)`` tensor; positions
                where it equals 0 are not attended to.

        Returns:
            Tensor of shape ``(n_batch, seq_len, d_embed)``.
        """
        n_batch = query.shape[0]
        d_k = self.d_model // self.h

        def transform(x, fc_layer):
            # (n_batch, seq_len, d_embed) -> (n_batch, h, seq_len, d_k)
            out = fc_layer(x)
            out = out.view(n_batch, -1, self.h, d_k)
            return out.transpose(1, 2)

        query = transform(query, self.query_fc_layer)
        key = transform(key, self.key_fc_layer)
        value = transform(value, self.value_fc_layer)

        if mask is not None:
            # Add a head axis so the mask broadcasts over all heads.
            mask = mask.unsqueeze(1)

        out = self.calculate_attention(query, key, value, mask)  # (n_batch, h, seq_len, d_k)
        # Merge the heads back: transpose makes the tensor non-contiguous, so
        # contiguous() is required before view().
        out = out.transpose(1, 2).contiguous().view(n_batch, -1, self.d_model)
        return self.fc_layer(out)

    def calculate_attention(self, query, key, value, mask):
        """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``."""
        d_k = key.size(-1)
        attention_score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # A large negative score becomes a zero weight after softmax.
            attention_score = attention_score.masked_fill(mask == 0, -1e9)
        attention_prob = F.softmax(attention_score, dim=-1)
        return torch.matmul(attention_prob, value)

In [None]:
def calculate_attention(self, query, key, value, mask):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        self: Unused; kept so the function keeps its method-style signature.
        query, key, value: Tensors whose last two dimensions are
            ``(seq_len, d_k)``; leading dimensions are batch-like.
        mask: Optional tensor broadcastable to the score shape. Positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        The attention-weighted sum of ``value``.
    """
    d_k = key.size(-1)
    scale = math.sqrt(d_k)
    # Q x K^T, scaled by sqrt(d_k).
    attention_score = torch.matmul(query, key.transpose(-2, -1)) / scale
    if mask is not None:
        # A large negative score becomes a zero weight after softmax.
        attention_score = attention_score.masked_fill(mask == 0, -1e9)
    attention_prob = F.softmax(attention_score, dim=-1)
    out = torch.matmul(attention_prob, value)
    return out

In [None]:
class PositionWiseFeedForwardLayer(nn.Module):
    """Two projections with a ReLU and dropout in between.

    Args:
        first_fc_layer: First projection, applied to the input.
        second_fc_layer: Second projection, applied after ReLU and dropout.
        dropout_p: Dropout probability between the two projections. The
            default of 0 leaves the activations unchanged.
    """

    def __init__(self, first_fc_layer, second_fc_layer, dropout_p=0.):
        # Module.__init__ must run before sub-modules can be assigned.
        super().__init__()
        self.first_fc_layer = first_fc_layer
        self.second_fc_layer = second_fc_layer
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, x):
        """Return ``second_fc_layer(dropout(relu(first_fc_layer(x))))``."""
        out = self.first_fc_layer(x)
        out = F.relu(out)
        out = self.dropout(out)
        out = self.second_fc_layer(out)
        return out

In [None]:
class ResidualConnectionLayer(nn.Module):
    """Add a sub-layer's output to its input, then normalise the sum."""

    def __init__(self, norm_layer):
        super().__init__()
        self.norm_layer = norm_layer

    def forward(self, x, sub_layer):
        """Return ``norm_layer(sub_layer(x) + x)``.

        Args:
            x: Input tensor.
            sub_layer: Callable applied to ``x``; its result must be addable to ``x``.
        """
        residual_sum = sub_layer(x) + x
        return self.norm_layer(residual_sum)

In [None]:
def subsequent_mask(size):
    """Build a causal mask of shape ``(1, size, size)``.

    Entry ``[0, i, j]`` is ``True`` when ``j <= i`` (position ``i`` may attend
    to position ``j``) and ``False`` for later positions.

    Args:
        size: Sequence length.

    Returns:
        A boolean tensor of shape ``(1, size, size)``.
    """
    atten_shape = (1, size, size)
    # Ones strictly above the diagonal mark the blocked (future) positions.
    blocked = torch.triu(torch.ones(atten_shape), diagonal=1)
    return blocked == 0  # invert: blocked -> False, allowed -> True

def make_std_mask(tgt, pad):
    """Combine a padding mask with the causal mask from ``subsequent_mask``.

    Args:
        tgt: Tensor of target token ids; the original comments assume shape
            ``(n_batch, seq_len)``.
        pad: Id marking padding positions in ``tgt``.

    Returns:
        A mask that is true only where the key position is not padding and is
        not after the query position; with the assumed input shape this is
        ``(n_batch, seq_len, seq_len)``.
    """
    # (n_batch, seq_len) -> (n_batch, 1, seq_len) so it broadcasts over rows.
    tgt_mask = (tgt != pad).unsqueeze(-2)
    # type_as matches dtype and device; the deprecated Variable wrapper is not needed.
    causal_mask = subsequent_mask(tgt.size(-1)).type_as(tgt_mask)
    return tgt_mask & causal_mask

In [None]:
class Decoder(nn.Module):
    """Stack of ``n_layer`` independent deep copies of ``sub_layer``.

    The copies are held in an ``nn.ModuleList`` so that their parameters are
    registered with this module; a plain Python list would hide them from
    ``parameters()``, ``.to()`` and ``state_dict()``.
    """

    def __init__(self, sub_layer, n_layer):
        super().__init__()
        self.layers = nn.ModuleList(
            [copy.deepcopy(sub_layer) for _ in range(n_layer)]
        )

    def forward(self, x, mask, encoder_output, encoder_mask):
        """Feed ``x`` through every layer in order.

        Each layer is called as ``layer(out, mask, encoder_output, encoder_mask)``.
        """
        out = x
        for layer in self.layers:
            out = layer(out, mask, encoder_output, encoder_mask)
        return out

In [None]:
class DecoderLayer(nn.Module):
    """Masked self-attention, encoder attention, then feed-forward.

    Each of the three sub-layers is applied through its own
    ``ResidualConnectionLayer``, which gets a deep copy of ``norm_layer``.
    ``ResidualConnectionLayer`` takes only the norm layer at construction and
    receives the sub-layer as a callable at call time.

    Args:
        masked_multi_head_attention_layer: Self-attention over the target,
            called with ``query``, ``key``, ``value`` and ``mask`` keywords.
        multi_head_attention_layer: Attention over ``encoder_output``, called
            with the same keywords.
        position_wise_feed_forward_layer: Called as ``layer(x)``.
        norm_layer: Normalisation module, copied once per residual block.
    """

    def __init__(self, masked_multi_head_attention_layer, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
        super().__init__()
        self.masked_multi_head_attention_layer = masked_multi_head_attention_layer
        self.multi_head_attention_layer = multi_head_attention_layer
        self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
        # ModuleList (not a plain list) so the norm parameters are registered.
        self.residual_connection_layers = nn.ModuleList(
            [ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(3)]
        )

    def forward(self, x, mask, encoder_output, encoder_mask):
        """Run the three sub-layers in order, each with a residual connection."""
        out = self.residual_connection_layers[0](
            x,
            lambda t: self.masked_multi_head_attention_layer(
                query=t, key=t, value=t, mask=mask),
        )
        # Queries come from the decoder; keys and values from the encoder.
        out = self.residual_connection_layers[1](
            out,
            lambda t: self.multi_head_attention_layer(
                query=t, key=encoder_output, value=encoder_output, mask=encoder_mask),
        )
        out = self.residual_connection_layers[2](
            out, self.position_wise_feed_forward_layer)
        return out

In [None]:
class TransformerEmbedding(nn.Module):
    """Chain an embedding module and a positional-encoding module."""

    def __init__(self, embedding, positional_encoding):
        super().__init__()
        # Applied in order: embedding first, positional encoding second.
        stages = (embedding, positional_encoding)
        self.embedding = nn.Sequential(*stages)

    def forward(self, x):
        """Return ``positional_encoding(embedding(x))``."""
        return self.embedding(x)

In [None]:
class Embedding(nn.Module):
    """Embedding lookup with outputs scaled by ``sqrt(d_embed)``.

    Args:
        d_embed: Size of each embedding vector.
        vocab: Sized collection; ``len(vocab)`` sets the number of embeddings.
    """

    def __init__(self, d_embed, vocab):
        super().__init__()
        self.embedding = nn.Embedding(len(vocab), d_embed)
        self.vocab = vocab
        self.d_embed = d_embed

    def forward(self, x):
        """Look up ``x`` and multiply the result by ``sqrt(d_embed)``."""
        scale = math.sqrt(self.d_embed)
        return self.embedding(x) * scale

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    Even feature indices hold ``sin(pos * w_k)`` and odd indices hold
    ``cos(pos * w_k)``, with ``w_k = 10000 ** (-2k / d_embed)``.

    Args:
        d_embed: Feature size of the embeddings.
        max_seq_len: Largest sequence length the table covers.
        dropout_p: Dropout probability applied after the addition. The
            default of 0 leaves the result unchanged.
    """

    def __init__(self, d_embed, max_seq_len=5000, dropout_p=0.):
        super().__init__()
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_embed, 2, dtype=torch.float)
            * -(math.log(10000.0) / d_embed)
        )
        angles = position * div_term  # (max_seq_len, ceil(d_embed / 2))

        encoding = torch.zeros(max_seq_len, d_embed)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_embed there is one fewer odd column than even columns.
        encoding[:, 1::2] = torch.cos(angles[:, :d_embed // 2])

        # A non-persistent buffer follows .to()/.cuda() with the module but
        # stays out of state_dict; it never requires gradients.
        self.register_buffer('encoding', encoding, persistent=False)
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, x):
        """Add the encodings for the first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(n_batch, seq_len, d_embed)``.

        Returns:
            Tensor of the same shape as ``x``.
        """
        # Slice along the position axis (the first axis of the table); the
        # result broadcasts over the batch dimension.
        out = x + self.encoding[:x.size(1)]
        out = self.dropout(out)
        return out