# Suma decimal LSTM seq2seq

Los modelos **Seq2Seq** corresponden a arquitecturas de red ideadas para convertir secuencias de un dominio (por ejemplo, oraciones en inglés) a secuencias en otro dominio (por ejemplo, las mismas oraciones traducidas al español).


Esto se puede usar para la traducción automática o para sistemas de pregunta/respuesta (generar una respuesta en lenguaje natural dada una pregunta en lenguaje natural); en general, es aplicable en cualquier situación en que se necesite generar texto.

<img src="images/seq2seq.png"/>

Hay varias formas de abordar estos problemas, utilizando RNN o utilizando convnets 1D.

Los casos más fáciles se dan cuando las secuencias de entrada y salida tienen la misma longitud. La suma de dígitos en decimal cumplen esta condición. Vamos a crear una red en Keras para abordar este problema en concreto.

## Suma decimal

La entrada a la red será una secuencia de dígitos decimales y en medio el sigo $+$. Cada dígito será como un caracter de una frase. El resultado será una secuencia de caracteres que corresponderá con la suma de los sumandos anteriores.

<img src="images/addition-rnn.png">

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import string

# Definir los caracteres permitidos y la longitud máxima de la cadena
allowed_chars = string.digits + '+ '
len_i_o = 5  # Longitud máxima de la cadena (por ejemplo, "143+13")

# Función para codificar una cadena de caracteres en un tensor one-hot
def string_to_tensor(s):
    tensor = torch.zeros(len_i_o, len(allowed_chars))
    for i, char in enumerate(s):
        tensor[i, allowed_chars.index(char)] = 1
    return tensor

# Función para decodificar un tensor one-hot en una cadena de caracteres
def tensor_to_string(tensor):
    _, max_idx = tensor.max(1)
    return ''.join([allowed_chars[i] for i in max_idx])

# Definir la arquitectura del modelo seq2seq
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.hidden_size = hidden_size
        self.encoder = nn.LSTM(input_size, hidden_size)
        self.decoder = nn.LSTM(hidden_size, hidden_size)
        self.output = nn.Linear(hidden_size, output_size)

    def forward(self, input):
        input_len = input.size(0)
        encoder_hidden = (torch.zeros(1, 1, self.hidden_size), torch.zeros(1, 1, self.hidden_size))
        
        for i in range(input_len):
            _, encoder_hidden = self.encoder(input[i].view(1, 1, -1), encoder_hidden)
        
        decoder_input = torch.tensor([[allowed_chars.index(' ')]])
        decoder_hidden = encoder_hidden

        output_string = []
        for _ in range(len_i_o):
            decoder_output, decoder_hidden = self.decoder(decoder_input.view(1, 1, -1), decoder_hidden)
            output = self.output(decoder_output.view(1, -1))
            _, topi = output.topk(1)
            decoder_input = topi.squeeze().detach()
            output_string.append(allowed_chars[decoder_input.item()])
            if allowed_chars[decoder_input.item()] == ' ':
                break

        return ''.join(output_string)

# Definir hiperparámetros
input_size = len(allowed_chars)
hidden_size = 64
output_size = len(allowed_chars)

# Crear el modelo
model = Seq2Seq(input_size, hidden_size, output_size)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Entrenar el modelo
n_epochs = 10000
for epoch in range(1, n_epochs + 1):
    input_str = '143+13'
    target_str = '156'
    input_tensor = string_to_tensor(input_str)
    target_tensor = string_to_tensor(target_str)

    optimizer.zero_grad()
    output_str = model(input_tensor)
    loss = criterion(output_str.view(-1, len(allowed_chars)), target_tensor.view(-1, len(allowed_chars)))
    loss.backward()
    optimizer.step()

    if epoch % 1000 == 0:
        print(f'Epoch {epoch}/{n_epochs}, Loss: {loss.item():.4f}, Output: {output_str}')

# Evaluar el modelo
input_str = '143+13'
input_tensor = string_to_tensor(input_str)
output_str = model(input_tensor)
print(f'Input: {input_str}, Output: {output_str}')


El generador de datos es correcto.

In [117]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import string

# Clase para generar el conjunto de datos
class DatasetGenerator:

    def __init__(self):
        self.len_input = 7
        self.len_output = 4
        self.allowed_chars = string.digits + '+ '
        
    def string_to_tensor(self, s):
        tensor = torch.zeros(len(s), len(self.allowed_chars))
        for i, char in enumerate(s):
            tensor[i, self.allowed_chars.index(char)] = 1
        return tensor

    def generate_samples(self, num_samples):
        samples = []
        for _ in range(num_samples):
            num1 = random.randint(1, 999)
            num2 = random.randint(1, 999)
            result = num1 + num2
            input_str = str(num1) + '+' + str(num2)
            input_str = input_str.rjust(self.len_input)
            output_str = str(result).rjust(self.len_output)
            samples.append((input_str, output_str))

        x = [self.string_to_tensor(sample[0]) for sample in samples]
        y = [self.string_to_tensor(sample[1]) for sample in samples]
        return torch.stack(x), torch.stack(y)


In [None]:

# Definición del modelo seq2seq
class Seq2SeqModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2SeqModel, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.encoder = nn.LSTM(hidden_size, hidden_size)
        self.decoder = nn.LSTM(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, target_seq):
        input_length = input_seq.size(0)
        target_length = target_seq.size(0)

        encoder_outputs, (encoder_hidden, encoder_cell) = self.encoder(self.embedding(input_seq))
        
        decoder_hidden = encoder_hidden
        decoder_cell = encoder_cell
        
        decoder_outputs = torch.zeros(target_length, 1, self.hidden_size)
        
        for t in range(target_length):
            decoder_output, (decoder_hidden, decoder_cell) = self.decoder(decoder_outputs[t], (decoder_hidden, decoder_cell))
            output = self.fc(decoder_output)
            decoder_outputs[t] = output
        
        return decoder_outputs

# Función para preparar secuencias de entrada y objetivo
def prepare_sequence(sequence, to_ix):
    idxs = [to_ix[ch] for ch in sequence]
    return torch.tensor(idxs, dtype=torch.long).view(-1, 1)

# Hiperparámetros
input_size = 12  # Tamaño del vocabulario (0-9, "+", padding)
hidden_size = 128
output_size = 10  # 0-9
learning_rate = 0.001
num_epochs = 10000
num_samples = 10000

# Generar el conjunto de datos
dataset = DatasetGenerator(num_samples)
data = dataset.generate_samples()

# Crear diccionarios de caracteres a índices
char_to_idx = {char: idx for idx, char in enumerate("0123456789+")}
idx_to_char = {idx: char for char, idx in char_to_idx.items()}

# Crear el modelo y definir la función de pérdida y el optimizador
model = Seq2SeqModel(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Entrenamiento del modelo
for epoch in range(num_epochs):
    total_loss = 0
    for input_seq, target_seq in data:
        input_tensor = prepare_sequence(input_seq, char_to_idx)
        target_tensor = prepare_sequence(target_seq, char_to_idx)
        
        optimizer.zero_grad()
        
        output = model(input_tensor, target_tensor)
        loss = criterion(output.view(-1, output_size), target_tensor.view(-1))
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()

    if (epoch + 1) % 1000 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss}')

# Evaluar el modelo
test_input = "123+456"
test_input_tensor = prepare_sequence(test_input, char_to_idx)
output = model(test_input_tensor, torch.zeros(3, 1, hidden_size))  # Longitud máxima de salida
predicted = torch.argmax(output, dim=2)
predicted_sequence = ''.join([idx_to_char[idx] for idx in predicted.view(-1)])
print(f'Entrada: {test_input}, Predicción: {predicted_sequence}')
