In [None]:
!pip install transformers

In [None]:
import torch.nn.functional as F
from torch import nn
from math import sqrt
import torch

In [None]:
# self-attention: query, keys e values da mesma sentença
# cross-attention: query: decoder / keys e values do encoder
def scaled_dot_product_attention(query, key, value, mask=None):
	"""Compute softmax(Q K^T / sqrt(d_k)) V.

	The last two dimensions of each input are treated as (seq_len, dim);
	any leading dimensions are batch dimensions, so both
	(batch, seq_len, dim) and (batch, heads, seq_len, dim) inputs work.

	Args:
		query: tensor of shape (..., q_len, dim).
		key: tensor of shape (..., k_len, dim).
		value: tensor of shape (..., k_len, v_dim).
		mask: optional tensor broadcastable to (..., q_len, k_len).
			Positions where ``mask == 0`` are excluded from attention.

	Returns:
		Tensor of shape (..., q_len, v_dim).
	"""
	dim_k = query.size(-1)

	# matmul + transpose(-2, -1) instead of bmm + transpose(1, 2): identical
	# for 3-D inputs, but also accepts extra leading (e.g. head) dimensions.
	scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)
	if mask is not None:
		# -inf scores become zero weights after the softmax.
		scores = scores.masked_fill(mask == 0, float("-inf"))
	weights = F.softmax(scores, dim=-1)
	return torch.matmul(weights, value)

class MaskedAttentionHead(nn.Module):
	"""Single self-attention head with a causal (lower-triangular) mask.

	Input is [batch, seq_len, embed_dim]; output is [batch, seq_len, head_dim].
	"""

	def __init__(self, embed_dim, head_dim):
		super().__init__()
		self.q = nn.Linear(embed_dim, head_dim)
		self.k = nn.Linear(embed_dim, head_dim)
		self.v = nn.Linear(embed_dim, head_dim)

	def forward(self, hidden_state):
		"""Attend over ``hidden_state`` so position i only sees positions <= i."""
		seq_len = hidden_state.size(1)
		# Build the mask on the same device as the input; a CPU mask would make
		# masked_fill fail once the module has been moved to another device.
		mask = torch.tril(
			torch.ones(seq_len, seq_len, device=hidden_state.device)
		).unsqueeze(0)

		attn_outputs = scaled_dot_product_attention(
			self.q(hidden_state), self.k(hidden_state), self.v(hidden_state),
			mask=mask)
		return attn_outputs

class AttentionHead(nn.Module):
	"""Single unmasked self-attention head.

	Maps [batch, seq_len, embed_dim] to [batch, seq_len, head_dim].
	"""

	def __init__(self, embed_dim, head_dim):
		super().__init__()
		# Separate learned projections for queries, keys and values.
		self.q = nn.Linear(embed_dim, head_dim)
		self.k = nn.Linear(embed_dim, head_dim)
		self.v = nn.Linear(embed_dim, head_dim)

	def forward(self, hidden_state):
		"""Project ``hidden_state`` to Q, K, V and apply scaled dot-product attention."""
		queries = self.q(hidden_state)
		keys = self.k(hidden_state)
		values = self.v(hidden_state)
		return scaled_dot_product_attention(queries, keys, values)

class MultiHeadAttention(nn.Module):
	"""Multi-head self-attention built from independent ``AttentionHead`` modules.

	``config`` supplies ``hidden_size`` and ``num_attention_heads``; each head
	projects to ``hidden_size // num_attention_heads`` features.
	"""

	def __init__(self, config):
		super().__init__()
		embed_dim = config.hidden_size
		num_heads = config.num_attention_heads
		head_dim = embed_dim // num_heads

		head_list = [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
		self.heads = nn.ModuleList(head_list)
		self.output_linear = nn.Linear(embed_dim, embed_dim)

	def forward(self, hidden_state):
		"""Run every head on ``hidden_state``, concatenate on the last axis, then project."""
		head_outputs = [head(hidden_state) for head in self.heads]
		merged = torch.cat(head_outputs, dim=-1)
		return self.output_linear(merged)

class FeedForward(nn.Module):
	"""Position-wise feed-forward network: Linear -> GELU -> Linear -> Dropout.

	``config`` supplies ``hidden_size``, ``intermediate_size`` and
	``hidden_dropout_prob``. The last dimension goes
	hidden_size -> intermediate_size -> hidden_size.
	"""

	def __init__(self, config):
		super().__init__()
		hidden, inner = config.hidden_size, config.intermediate_size
		self.linear_1 = nn.Linear(hidden, inner)  # expansion layer
		self.linear_2 = nn.Linear(inner, hidden)  # projection back to hidden size
		self.gelu = nn.GELU()
		self.dropout = nn.Dropout(config.hidden_dropout_prob)

	def forward(self, x):
		"""Apply the two-layer MLP to the last dimension of ``x``."""
		expanded = self.gelu(self.linear_1(x))
		return self.dropout(self.linear_2(expanded))

class TransformerEncoderLayer(nn.Module):
	"""Pre-layer-norm encoder block: self-attention then feed-forward, each with a residual."""

	def __init__(self, config):
		super().__init__()
		norm_size = config.hidden_size
		self.layer_norm_1 = nn.LayerNorm(norm_size)
		self.layer_norm_2 = nn.LayerNorm(norm_size)
		self.attention = MultiHeadAttention(config)
		self.feed_forward = FeedForward(config)

	def forward(self, x):
		"""Return ``x`` after the attention and feed-forward sub-layers."""
		# Each sub-layer sees a normalised input; its output is added back to x.
		attn_out = self.attention(self.layer_norm_1(x))
		x = x + attn_out
		ff_out = self.feed_forward(self.layer_norm_2(x))
		return x + ff_out

class Embeddings(nn.Module):
	"""Token + learned absolute position embeddings, followed by LayerNorm and Dropout.

	``config`` supplies ``vocab_size``, ``hidden_size`` and
	``max_position_embeddings``.
	"""

	def __init__(self, config):
		super().__init__()
		self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
		self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)

		self.layer_norm = nn.LayerNorm(config.hidden_size)
		# NOTE(review): nn.Dropout() uses its default p=0.5, whereas FeedForward
		# uses config.hidden_dropout_prob. Kept as-is; confirm this is intended.
		self.dropout = nn.Dropout()

	def forward(self, input_ids):
		"""Embed ``input_ids`` of shape (batch, length) into (batch, length, hidden_size)."""
		seq_length = input_ids.size(1)
		# Create the position ids on the same device as the input; a CPU index
		# tensor would fail in the lookup once the module lives on another device.
		position_ids = torch.arange(
			seq_length, dtype=torch.long, device=input_ids.device
		).unsqueeze(0)  # (1, length)

		token_embeddings = self.token_embeddings(input_ids)  # (batch, length, hidden_size)
		position_embeddings = self.position_embeddings(position_ids)  # (1, length, hidden_size)
		# Add positional information; the (1, ...) tensor broadcasts over the batch.
		embeddings = token_embeddings + position_embeddings
		embeddings = self.layer_norm(embeddings)
		embeddings = self.dropout(embeddings)
		return embeddings

class TransformerEncoder(nn.Module):
	"""Encoder: an ``Embeddings`` module followed by ``config.num_hidden_layers`` encoder blocks."""

	def __init__(self, config):
		super().__init__()
		self.embeddings = Embeddings(config)
		# One TransformerEncoderLayer per configured hidden layer.
		blocks = [TransformerEncoderLayer(config) for _ in range(config.num_hidden_layers)]
		self.layers = nn.ModuleList(blocks)

	def forward(self, x):
		"""Embed ``x`` and pass the result through every encoder block in order."""
		hidden = self.embeddings(x)
		for block in self.layers:
			hidden = block(hidden)
		return hidden

#Tarefas

In [None]:
# Atenção cruzada
class CrossAttentionHead(nn.Module):
	"""Single cross-attention head.

	Queries are projected from the decoder hidden states; keys and values
	are projected from the encoder hidden states.
	"""

	def __init__(self, embed_dim, head_dim):
		super().__init__()
		self.q = nn.Linear(embed_dim, head_dim)
		self.k = nn.Linear(embed_dim, head_dim)
		self.v = nn.Linear(embed_dim, head_dim)

	def forward(self, e_hidden_state, d_hidden_state):
		"""Attend from ``d_hidden_state`` (queries) over ``e_hidden_state`` (keys/values).

		Note the argument order: encoder states first, decoder states second.
		"""
		queries = self.q(d_hidden_state)
		keys = self.k(e_hidden_state)
		values = self.v(e_hidden_state)
		return scaled_dot_product_attention(queries, keys, values)

# Atenção cruzada com múltiplas cabeças
class MultiHeadCrossAttention(nn.Module):
	"""Multi-head cross-attention built from ``CrossAttentionHead`` modules.

	``config`` supplies ``hidden_size`` and ``num_attention_heads``; each head
	projects to ``hidden_size // num_attention_heads`` features.
	"""

	def __init__(self, config):
		super().__init__()
		embed_dim = config.hidden_size
		num_heads = config.num_attention_heads
		head_dim = embed_dim // num_heads

		self.heads = nn.ModuleList(
			[CrossAttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
		)
		self.output_linear = nn.Linear(embed_dim, embed_dim)

	def forward(self, hidden_state, encoder_hidden_state=None):
		"""Attend from the decoder states over the encoder states.

		Args:
			hidden_state: decoder hidden states (source of the queries).
			encoder_hidden_state: encoder output (source of keys and values).

		Raises:
			ValueError: if ``encoder_hidden_state`` is not provided; each head
				needs both the encoder and the decoder states.
		"""
		if encoder_hidden_state is None:
			raise ValueError(
				"MultiHeadCrossAttention requires encoder_hidden_state "
				"(keys/values) in addition to the decoder hidden_state (queries)."
			)
		# CrossAttentionHead.forward takes (encoder states, decoder states).
		x = torch.cat(
			[h(encoder_hidden_state, hidden_state) for h in self.heads], dim=-1)
		x = self.output_linear(x)

		return x

# Multi-head masked (causal) self-attention
class MultiHeadMaskedAttention(nn.Module):
	"""Multi-head self-attention built from ``MaskedAttentionHead`` modules.

	``config`` supplies ``hidden_size`` and ``num_attention_heads``.
	"""

	def __init__(self, config):
		super().__init__()
		embed_dim = config.hidden_size
		num_heads = config.num_attention_heads
		head_dim = embed_dim // num_heads

		self.heads = nn.ModuleList(
			[MaskedAttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
		)
		self.output_linear = nn.Linear(embed_dim, embed_dim)

	def forward(self, hidden_state):
		"""Run every masked head on ``hidden_state``, concatenate, then project."""
		x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
		x = self.output_linear(x)

		return x

# Decoder block of the Transformer model
class TransformerDecoderLayer(nn.Module):
	"""Pre-layer-norm decoder block.

	Sub-layers, each wrapped in a residual connection: masked self-attention,
	cross-attention over the encoder output, and a feed-forward network.
	"""

	def __init__(self, config):
		super().__init__()
		self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
		self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
		self.layer_norm_3 = nn.LayerNorm(config.hidden_size)

		self.masked_attention = MultiHeadMaskedAttention(config)
		self.cross_attention = MultiHeadCrossAttention(config)
		self.feed_forward = FeedForward(config)

	def forward(self, x, encoder_hidden_state=None):
		"""Apply the decoder block to ``x``.

		Args:
			x: decoder hidden states.
			encoder_hidden_state: encoder output used by the cross-attention
				sub-layer. When ``None`` there is nothing to attend to, so the
				cross-attention sub-layer is skipped.
		"""
		x = x + self.masked_attention(self.layer_norm_1(x))
		if encoder_hidden_state is not None:
			x = x + self.cross_attention(self.layer_norm_2(x), encoder_hidden_state)
		x = x + self.feed_forward(self.layer_norm_3(x))

		return x

# Transformer decoder: a stack of TransformerDecoderLayer blocks
class TransformerDecoder(nn.Module):
	"""Decoder: an ``Embeddings`` module followed by ``config.num_hidden_layers`` decoder blocks."""

	def __init__(self, config):
		super().__init__()
		self.embeddings = Embeddings(config)
		self.layers = nn.ModuleList([
			TransformerDecoderLayer(config) for _ in range(config.num_hidden_layers)
		])

	def forward(self, x, encoder_hidden_state=None):
		"""Embed ``x`` and run it through every decoder block.

		``encoder_hidden_state`` is handed unchanged to each block.
		"""
		x = self.embeddings(x)
		for layer in self.layers:
			x = layer(x, encoder_hidden_state)
		return x

# Transformer
class Transformer(nn.Module):
	"""Encoder-decoder Transformer returning the decoder's final hidden states."""

	def __init__(self, config):
		super().__init__()
		self.encoder = TransformerEncoder(config)
		self.decoder = TransformerDecoder(config)

	def forward(self, x, target):
		"""Encode ``x`` and decode ``target`` conditioned on the encoder output.

		Args:
			x: input (source) sequence.
			target: target sequence fed to the decoder.
		"""
		encoder_hidden_state = self.encoder(x)
		# The decoder consumes the target sequence and attends over the encoder
		# output; previously it was fed ``x`` and the encoder output was unused.
		decoder_hidden_state = self.decoder(target, encoder_hidden_state)

		return decoder_hidden_state

#Testando ...

In [None]:
# Hugging Face
from transformers import AutoConfig, AutoTokenizer

model_ckpt = "bert-base-uncased"

# Fetch the hyperparameters of the checkpoint and derive the per-head size.
config = AutoConfig.from_pretrained(model_ckpt)
print(config)

embed_dim = config.hidden_size
num_heads = config.num_attention_heads
head_dim = embed_dim // num_heads
print("embed_dim:", embed_dim)
print("head_dim:", head_dim)

# Tokenizer that matches the checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

text = "testing my favorite machine learning architecture"

# return_tensors="pt" yields PyTorch tensors; input_ids holds the token ids.
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=True)
print('texto: ', text)
print('inputs.input_ids: ', inputs.input_ids)
print('inputs.input_ids.shape: ',inputs.input_ids.shape)

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

BertConfig {
  "_name_or_path": "bert-base-uncased",
  "architectures": [
    "BertForMaskedLM"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "gradient_checkpointing": false,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "position_embedding_type": "absolute",
  "transformers_version": "4.33.2",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 30522
}

embed_dim: 768
head_dim: 64


Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

texto:  testing my favorite machine learning architecture
inputs.input_ids:  tensor([[ 101, 5604, 2026, 5440, 3698, 4083, 4294,  102]])
inputs.input_ids.shape:  torch.Size([1, 8])


In [None]:
# Smoke test: build the encoder from the downloaded config and run the
# tokenized sentence through it.
encoder = TransformerEncoder(config)
out = encoder(inputs.input_ids)
# For the sentence above this prints torch.Size([1, 8, 768]).
print(out.shape)

torch.Size([1, 8, 768])


In [None]:
# Smoke test for a single cross-attention head.
# The second constructor argument is the per-head size (head_dim), not the
# number of heads.
cross_attention = CrossAttentionHead(embed_dim, head_dim)

layer_norm_1 = nn.LayerNorm(embed_dim)
# Modules are built from `config` and then *called* on the token ids; the
# hidden states are the tensors those calls return.
e_hidden_state = TransformerEncoder(config)(inputs.input_ids)
d_hidden_state = MultiHeadMaskedAttention(config)(
	layer_norm_1(Embeddings(config)(inputs.input_ids)))

# CrossAttentionHead.forward takes (encoder states, decoder states).
attn_outputs = cross_attention(e_hidden_state, d_hidden_state)
print(attn_outputs.shape)

AttributeError: ignored