In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.num_heads = num_heads
        self.embed_dim = embed_dim

        assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
        self.depth = embed_dim // num_heads

        self.wq = nn.Linear(embed_dim, embed_dim)
        self.wk = nn.Linear(embed_dim, embed_dim)
        self.wv = nn.Linear(embed_dim, embed_dim)
        self.dense = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(0.1)

    def split_heads(self, x, batch_size):
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.permute(0, 2, 1, 3)

    def forward(self, v, k, q, mask=None):
        batch_size = q.size(0)

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        matmul_qk = torch.matmul(q, k.transpose(-2, -1))
        depth = k.size(-1)
        logits = matmul_qk / torch.sqrt(torch.tensor(depth, dtype=torch.float32))

        if mask is not None:
            logits += (mask * -1e9)

        attention_weights = F.softmax(logits, dim=-1)
        output = torch.matmul(attention_weights, v)
        output = output.permute(0, 2, 1, 3).contiguous()
        output = output.view(batch_size, -1, self.embed_dim)

        output = self.dropout(output)
        return self.dense(output)

class TransformerDoubleEncoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, input_vocab_size, output_vocab_size, max_seq_length):
        super(TransformerDoubleEncoder, self).__init__()

        self.input_embedding = nn.Embedding(input_vocab_size, d_model)
        self.attn = MultiHeadSelfAttention(num_layers, num_heads)
        self.output_embedding = nn.Embedding(output_vocab_size, d_model)
        self.attn2 = MultiHeadSelfAttention(d_model, num_heads)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_length, d_model))
        
        self.encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)
        
        ])
        self.decoder_layers = nn.ModuleList([
            nn.TransformerDecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        
        self.linear = nn.Linear(d_model, output_vocab_size)
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self, src, tgt):
        src = self.input_embedding(src) + self.positional_encoding[:, :src.size(1), :]
        atg = self.attn(self.input_embedding(src)+self.positional_encoding[:, :src.size(1), :])
        tgt = self.output_embedding(tgt) + self.positional_encoding[:, :tgt.size(1), :]
        
        for layer in self.encoder_layers:
            src = layer(src)
        
        for layer in self.encoder_layers:
            atg = layer(atg)
        
        for layer in self.decoder_layers:
            tgt = layer(tgt, src)
        
        output = self.linear(tgt)
        return self.softmax(output)

# Parámetros del modelo
num_layers = 64  # Número de capas
d_model = 2048  # Dimensionalidad de los embeddings
num_heads = 64  # Número de cabezas de atención
d_ff = 8192  # Dimensionalidad de la capa feed-forward
input_vocab_size = 600000  # Tamaño del vocabulario de entrada
output_vocab_size = 600000  # Tamaño del vocabulario de salida
max_seq_length = 256  # Longitud máxima de la secuencia

# Crear el modelo
model = TransformerDoubleEncoder(num_layers, d_model, num_heads, d_ff, input_vocab_size, output_vocab_size, max_seq_length)

# Contar el número de parámetros
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Number of parameters: {num_params}")

In [17]:
from torch import optim
import torch.nn as nn
import numpy as np
import torch
import torch.nn.functional as F

def ids_from_chars(text):
    vocab = list(set(''.join([char for sentence in corpus for char in sentence])))
    vocab.sort()
    vocab_to_id = {char: idx for idx, char in enumerate(vocab)}
    return [vocab_to_id[char] for char in text if char in vocab_to_id]

def text_from_ids(ids):
    vocab = list(set(''.join([char for sentence in corpus for char in sentence])))
    vocab.sort()
    id_to_vocab = {idx: char for idx, char in enumerate(vocab)}
    return ''.join([id_to_vocab[idx] for idx in ids if idx in id_to_vocab])

corpus = ['User: this is a test prompt.', 'Model: Oh, okay!']
# Define the train function
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train(corpus, epochs):
    for epoch in range(epochs):
        for input_example, target_example in dataset:
            input_example = torch.tensor(input_example).long()
            target_example = torch.tensor(target_example).long()
            optimizer.zero_grad()
            if len(input_example.shape) == 1:
                input_example = input_example.unsqueeze(0)
            if len(target_example.shape) == 1:
                target_example = target_example.unsqueeze(0)
            predictions = model(input_example, target_example)
            loss = criterion(predictions, target_example)
            loss.backward()
            optimizer.step()
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')
# Define the chat function
def chat(input_text):
    input_ids = ids_from_chars(np.array([input_text]))
    input_ids = torch.tensor(input_ids).unsqueeze(0)
    predictions = model(input_ids)
    sampled_indices = torch.multinomial(torch.softmax(predictions, dim=-1), num_samples=1)
    sampled_indices = sampled_indices.squeeze(-1).numpy()
    output_text = text_from_ids(sampled_indices)
    return output_text

# Example usage
corpus = ['User: this is a test prompt.', 'Model: Oh, okay!']
vocab_size = len(set(corpus))
embedding_dim = 256
rnn_units = 1024
seq_length = 100


# Create a dataset from the corpus
dataset = [(ids_from_chars(np.array([input_text])), ids_from_chars(np.array([target_text]))) for input_text, target_text in zip(corpus, corpus[1:]+[''])]

train(corpus, epochs=20)

print(chat('User: this is a test prompt.'))

# Example usage

vocab_size = len(set(corpus))
embedding_dim = 256
rnn_units = 1024
seq_length = 100

train(corpus, epochs=20)

RuntimeError: Expected tensor for argument #1 'indices' to have one of the following scalar types: Long, Int; but got torch.FloatTensor instead (while checking arguments for embedding)

In [3]:
from torchinfo import summary

In [4]:
summary(model)

Layer (type:depth-idx)                                       Param #
TransformerDoubleEncoder                                     524,288
├─Embedding: 1-1                                             1,228,800,000
├─MultiHeadSelfAttention: 1-2                                --
│    └─Linear: 2-1                                           4,160
│    └─Linear: 2-2                                           4,160
│    └─Linear: 2-3                                           4,160
│    └─Linear: 2-4                                           4,160
│    └─Dropout: 2-5                                          --
├─Embedding: 1-3                                             1,228,800,000
├─MultiHeadSelfAttention: 1-4                                --
│    └─Linear: 2-6                                           4,196,352
│    └─Linear: 2-7                                           4,196,352
│    └─Linear: 2-8                                           4,196,352
│    └─Linear: 2-9                     