<a href="https://colab.research.google.com/github/elainedias16/TCC/blob/main/Next_step_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [41]:
pip install transformers torch



## Masked Self-Attention

Scaled dot-product attention:
https://paperswithcode.com/method/scaled

In [42]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class Head(nn.Module):
    """Query/key/value projections for a single attention head.

    Each projection is a linear map from ``config.d_model`` features to
    ``config.head_dim`` features; ``config.bias`` toggles the bias term.
    """

    def __init__(self, config):
        super().__init__()
        in_features, out_features = config.d_model, config.head_dim
        use_bias = config.bias
        self.query = nn.Linear(in_features, out_features, bias=use_bias)
        self.key = nn.Linear(in_features, out_features, bias=use_bias)
        self.value = nn.Linear(in_features, out_features, bias=use_bias)

    def forward(self, x):
        """Return the ``(query, key, value)`` projections of ``x`` as a tuple."""
        return self.query(x), self.key(x), self.value(x)



class MaskedSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention with an optional mask.

    Every head projects the input to queries, keys and values, computes
    ``softmax(q @ k^T / sqrt(head_dim)) @ v``, and the per-head results are
    concatenated on the last dimension and passed through a final linear layer.
    """

    def __init__(self, config):
        super().__init__()
        # Validate the configuration before allocating any layers.
        assert config.head_dim * config.num_heads == config.d_model, "d_model must be divisible by num_heads"
        self.num_heads = config.num_heads
        self.head_dim = config.head_dim
        self.dropout = nn.Dropout(config.dropout)
        self.d_model = config.d_model
        self.heads = nn.ModuleList([Head(config) for _ in range(config.num_heads)])
        # Honour config.bias here too, like every other linear layer in the model.
        self.output_linear = nn.Linear(config.d_model, config.d_model, bias=config.bias)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: input whose last dimension is ``d_model``.
            mask: optional tensor broadcastable to the attention scores;
                positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor with the same leading dimensions as ``x`` and last
            dimension ``d_model``.
        """
        scale = math.sqrt(self.head_dim)

        heads_output = []
        for head in self.heads:
            # Head.forward returns (query, key, value) in this order.
            q, k, v = head(x)

            # Scaled dot-product attention: row i scores query i against every key.
            scores = torch.matmul(q, k.transpose(-2, -1)) / scale

            if mask is not None:
                # -inf scores become zero weights after the softmax.
                scores = scores.masked_fill(mask == 0, float('-inf'))

            attn_weights = F.softmax(scores, dim=-1)
            attn_weights = self.dropout(attn_weights)

            heads_output.append(torch.matmul(attn_weights, v))

        # Concatenate all heads' outputs back to d_model features.
        concatenated_output = torch.cat(heads_output, dim=-1)
        # Final linear layer mixing the heads.
        return self.output_linear(concatenated_output)




## Feed Forward Neural Network

output = input * W + **bias**

In [43]:
class FeedFoward(nn.Module):
    """Two-layer MLP: expand to ``4 * d_model``, apply ReLU, project back, then dropout."""

    def __init__(self, config):
        super().__init__()
        width = config.d_model
        hidden_width = 4 * width
        self.linear1 = nn.Linear(width, hidden_width, bias=config.bias)
        self.activation = nn.ReLU()
        self.linear2 = nn.Linear(hidden_width, width, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Return the MLP output; the last dimension stays ``d_model``."""
        hidden = self.activation(self.linear1(x))
        return self.dropout(self.linear2(hidden))



## Layer Norm

In [44]:
class LayerNorm(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.norm = nn.LayerNorm(config.d_model, config.bias)

  def forward(self, x):
    self.norm(x)
    return x

## One Decoder

In [45]:
class Decoder(nn.Module):
  """One decoder block: layer norm, masked self-attention, layer norm, feed-forward.

  Both sub-layers are wrapped in an additive skip connection.
  """

  def __init__(self, config):
    super().__init__()
    self.ln_1 = LayerNorm(config)
    self.masked_self_attention = MaskedSelfAttention(config)
    self.ln_2 = LayerNorm(config)
    self.feed_forward = FeedFoward(config)

  def forward(self, x, mask=None):
    """Run the block on ``x``.

    Args:
      x: input tensor; its second-to-last dimension is the sequence length.
      mask: optional attention mask forwarded to the attention layer
        (positions where ``mask == 0`` are hidden). When omitted, a
        lower-triangular (causal) mask is built so position ``i`` only
        attends to positions ``<= i``.

    Returns:
      Tensor with the same shape as ``x``.
    """
    if mask is None:
      # Default to causal masking so a next-token model cannot look ahead.
      seq_len = x.size(-2)
      mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device))

    # NOTE(review): the skip connections add the *normalised* activations;
    # GPT-2-style blocks use x + f(ln(x)) instead. Confirm this is intended.
    x = self.ln_1(x)
    x = x + self.masked_self_attention(x, mask)
    x = self.ln_2(x)
    x = x + self.feed_forward(x)
    return x



## Config

In [46]:
import torch

class Config:
    """Hyperparameters for the small decoder-only Transformer in this notebook."""
    num_heads = 2
    d_model = 8  # input and output vectors have dimension 8
    head_dim = d_model // num_heads  # each head has dimension 4
    dropout = 0.1  # regularisation against overfitting
    bias = True
    # Must cover every id the tokenizer can emit; the notebook tokenizes with
    # the GPT-2 tokenizer, whose vocabulary has 50257 entries.
    vocab_size = 50257
    max_length = 512
    n_layer = 6
    block_size = 1024  # size of the position-embedding table / context window

config = Config()

In [47]:
# Runtime parameters: number of tokens to sample and the compute device.

max_new_tokens = 5

# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

for report_line in (f"Device is {device}", f"Max new tokens is {max_new_tokens}"):
    print(report_line)

Device is cpu
Max new tokens is 5


## Transformer

In [48]:
class Transformer(nn.Module):
  """Decoder-only language model.

  Token and position embeddings are summed, passed through ``config.n_layer``
  ``Decoder`` blocks and a final layer norm, then projected to vocabulary
  logits by ``lm_head``.
  """

  def __init__(self, config):
    super().__init__()
    self.config = config
    self.word_token_embedding = nn.Embedding(config.vocab_size, config.d_model)
    self.position_embedding = nn.Embedding(config.block_size, config.d_model)
    self.dropout = nn.Dropout(config.dropout)
    self.blocks = nn.Sequential(*[Decoder(config) for _ in range(config.n_layer)])
    self.ln = LayerNorm(config)
    self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)

  def forward(self, input_ids):
    """Compute next-token logits.

    Args:
      input_ids: integer tensor of shape (B, T) holding token indices.
        T must not exceed ``config.block_size``, the size of the
        position-embedding table.

    Returns:
      ``(logits, None)`` where ``logits`` has shape (B, T, vocab_size).
      The second element is a placeholder for a loss that is not computed yet.
    """
    device = input_ids.device
    _, T = input_ids.size()

    # Token and positional embeddings; pos_emb is (T, d_model) and broadcasts
    # over the batch dimension.
    tok_emb = self.word_token_embedding(input_ids)
    pos_emb = self.position_embedding(torch.arange(T, device=device))
    x = self.dropout(tok_emb + pos_emb)
    # Transformer blocks
    x = self.blocks(x)
    # Final layer norm
    x = self.ln(x)
    # Projection to vocabulary logits
    logits = self.lm_head(x)
    return logits, None  # TODO: compute the loss later

  @torch.no_grad()
  def generate(self, idx, max_new_tokens):
    """Sample ``max_new_tokens`` tokens autoregressively.

    Args:
      idx: (B, T) tensor of token indices forming the current context.
      max_new_tokens: number of tokens to append.

    Returns:
      (B, T + max_new_tokens) tensor: the context followed by the samples.

    Sampling runs without gradient tracking and in eval mode, so dropout is
    disabled; the module's previous train/eval mode is restored afterwards.
    """
    # Use this model's own configuration rather than a notebook-level global.
    block_size = self.config.block_size
    was_training = self.training
    self.eval()
    try:
      for _ in range(max_new_tokens):
        # crop idx to the last block_size tokens
        idx_cond = idx[:, -block_size:]
        # get the predictions
        logits, _ = self(idx_cond)
        # focus only on the last time step
        logits = logits[:, -1, :]  # becomes (B, vocab_size)
        # apply softmax to get probabilities
        probs = F.softmax(logits, dim=-1)  # (B, vocab_size)
        # sample from the distribution
        idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
        # append sampled index to the running sequence
        idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
    finally:
      self.train(was_training)
    return idx

  @staticmethod
  def print_output(model, tokenizer, idx, max_new_tokens):
    """Generate from ``idx`` with ``model`` and print the token ids and decoded text."""
    generated_tokens = model.generate(idx, max_new_tokens)
    print('Input tokens:')
    print(idx)
    print('-----------------')
    print('Output generated_tokens:')
    print(generated_tokens)
    print('-----------------')
    # Convert the first sequence (prompt plus sampled tokens) back to text
    print('Output genrated_text:')
    generated_text = tokenizer.decode(generated_tokens[0].tolist())
    print(generated_text)



## Run model

In [50]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import GPT2Tokenizer


# Load the tokenizer first so the model's vocabulary can be sized to match it
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Initialise the configuration and the model.
# The embedding table and LM head must cover every id the tokenizer can emit;
# otherwise an id beyond vocab_size fails the embedding lookup.
config = Config()
config.vocab_size = len(tokenizer)

# Seed the RNG so weight initialisation and sampling are reproducible
torch.manual_seed(0)
model = Transformer(config)

# Tokenize the input
prompt = "I like chocolate and"
input_ids = tokenizer(prompt, return_tensors='pt')['input_ids']

print("Inputs ids")
print(input_ids)

# Every token id must fall inside the model's vocabulary. Clamping to
# tokenizer.unk_token_id is not a safe fallback: that id can itself lie
# outside a smaller vocabulary.
assert int(input_ids.max()) < config.vocab_size, "token id outside the model vocabulary"

# Move the model to the device
model = model.to(device)

# Move input_ids to the device
input_ids = input_ids.to(device)
Transformer.print_output(model, tokenizer, input_ids, max_new_tokens)


Inputs ids
tensor([[   40,   588, 11311,   290]])
Input tokens:
tensor([[   40,   588, 11311,   290]])
-----------------
Output generated_tokens:
tensor([[   40,   588, 11311,   290,  1876, 21555, 17622, 13971,  9677]])
-----------------
Output genrated_text:
I like chocolate and invol Bog inhabitants viable FA
