In [2]:
import torch
import torch.nn as nn
import random

### RNN module

In [3]:
# Architecture parameters and instantiation:
input_size = 10
hidden_size = 2
num_layers = 5

rnn = nn.RNN(input_size, hidden_size, num_layers)

# Data parameters:
sequence_length = 25
batch_size = 100

# Unbatached forward:
sequence = torch.randn(sequence_length, input_size)
initial_hidden = torch.randn(num_layers, hidden_size)
output, hidden = rnn(sequence, initial_hidden)

print('Unbatched input:')
print('Output shape:', output.shape)
print('Hidden shape:', hidden.shape)

# Batched forward:
batch_seq = torch.randn(sequence_length, batch_size, input_size)
initial_hidden = torch.randn(num_layers, batch_size, hidden_size)
output, hidden = rnn(batch_seq, initial_hidden)

print('\nBatched input:')
print('Output shape:', output.shape)
print('Hidden shape:', hidden.shape)

Unbatched input:
Output shape: torch.Size([25, 2])
Hidden shape: torch.Size([5, 2])

Batched input:
Output shape: torch.Size([25, 100, 2])
Hidden shape: torch.Size([5, 100, 2])


### Sequence classifier

In [4]:
class RNNClassifier(nn.Module):
    
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        
        self.rnn = nn.RNN(input_size, hidden_size, num_layers)
        self.classifier = nn.Linear(hidden_size, num_classes)
    
    def forward(self, input):
        # input.shape = [seq_length, batch_size, input_size]
        # By default, initial hidden state of a RNN forward will be zero if not provided.
        output, hidden = self.rnn(input)  # output.shape = [seq_length, batch_size, hidden_size].
        output = self.classifier(output[-1])  # output.shape = [batch_size, num_classes].
        return nn.Softmax(dim=1)(output)

In [5]:
input_size = 10
hidden_size = 2
num_layers = 5
num_classes = 6

classifier = RNNClassifier(input_size, hidden_size, num_layers, num_classes)

sequence_length = 25
batch_size = 100
batch_seq = torch.randn(sequence_length, batch_size, input_size)

classes_prediction = classifier(batch_seq)

print('Prediction shape:', classes_prediction.shape)

Prediction shape: torch.Size([100, 6])


### Sequence to sequence

In [6]:
class Encoder(nn.Module):
    
    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, num_layers)
        
    def forward(self, input):
        output, hidden = self.rnn(input)
        return output, hidden
    
class Decoder(nn.Module):
    
    def __init__(self, output_size, hidden_size, num_layers):
        super().__init__()
        self.rnn = nn.RNN(output_size, hidden_size, num_layers)
        self.output_fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, input, h0):
        # In autoregressive models (like this one), the length of the input sequence must be 1.
        output, hidden = self.rnn(input, h0)
        # Thus, output.shape = [1, batch_size, hidden_size] and it can be squeezed before the linear layer forward.
        output = self.output_fc(output.squeeze(0))
        return output, hidden


class Seq2Seq(nn.Module):
    
    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super().__init__()
        
        # Encoder and decoder hidden sizes must be equal. Same for the number of layers.
        self.encoder = Encoder(input_size, hidden_size, num_layers)
        self.decoder = Decoder(output_size, hidden_size, num_layers)


    def forward(self, input, target=None, teaching_forcing=0, max_length=30):
        
        # input.shape = [input_seq_length, batch_size, input_size]
        # target.shape = [output_seq_length, batch_size, output_size] or None in testing.
        
        # teaching_forcing indicates the probability to use real target as input in the decoder instead of
        # the previous output (non-autoregressive mode). Must be zero when testing.
        
        if teaching_forcing > 0:
            assert target is not None, 'There is no target for using teaching forcing.'
        
        outputs = []  # will save the output vector at each time (prediction at time t, for each t).
        _, encoder_hidden = self.encoder(input)  # encoder_hidden.shape = [num_layers, batch_size, hidden_size].
        
        
        # The input of the decoder at t=0 will be a null tensor by default (<SOS> in the literature).
        # input_decoder.shape = [1, batch_size, dec_input_size=output_size] (simulation a seq. of length 1):
        # The last hidden states of the encoder are the initial hidden states in the decoder.
        
        output_decoder, hidden_decoder = torch.zeros(input.shape[1], self.decoder.rnn.input_size), encoder_hidden
        
        
        output_length = len(target) if target is not None else max_length
        for t in range(output_length):
            # output_decoder.shape = [batch_size, output_size].
            # hidden_decoder.shape = [num_layers, batch_size, hidden_size].
            
            teacher_forcing = random.random() < teaching_forcing  # to use or not teaching forcing.
            input_decoder = target[t] if teacher_forcing else output_decoder  # teacher forcing uses the real (previous) target as input in the decoder.
            input_decoder = input_decoder.unsqueeze(0)
            
            output_decoder, hidden_decoder = self.decoder(input_decoder, hidden_decoder)
            outputs.append(output_decoder)
            
        return torch.stack(outputs, dim=0)

input_size = 10
output_size = 20
hidden_size = 2
num_layers = 5

model = Seq2Seq(input_size, output_size, hidden_size, num_layers)

input_seq_length = 25
target_seq_length = 35
batch_size = 100

# Evaluation mode:
input = torch.randn(input_seq_length, batch_size, input_size)
target = torch.randn(target_seq_length, batch_size, output_size)
output = model(input, target=target, teaching_forcing=0.5)
print('Output shape:', output.shape)

Output shape: torch.Size([35, 100, 20])


### Encoder-decoder model for PINNs

In [22]:
class Encoder(nn.Module): # ! probar que el número de capas sea el número de variables hidden.
    
    def __init__(self, n_input_vars, n_hidden_vars, rnn_input_size, rnn_hidden_size, num_layers):
        """Encodes a group of points. Points will have static variables (representing the initial hidden
        state of the RNN) and sequential variables (passing through the RNN sequentially).

        Args:
            n_input_vars (int): number of variables to input rnn (for each time).
            n_hidden_vars (int): number of variables to initial hidden state.
            rnn_input_size (int): input rnn dimension (for each tame).
            rnn_hidden_size (int): rnn hidden state dimension.
            num_layers (int): number of layers for the RNN.
        """        
        super().__init__()
        self.hidden_fc = nn.Linear(n_hidden_vars, rnn_hidden_size)
        self.input_fc = nn.Linear(n_input_vars, rnn_input_size)
        self.rnn = nn.RNN(rnn_input_size, rnn_hidden_size, num_layers)
        
    def forward(self, input, h0):
        """Performs an affine transformation of both set of points (in order to allow a larger RNN cell).
        They are then passed through an RNN and the outputs are returned.

        Args:
            input (tensor[seq_length, batch_size, n_input_vars]): batch of sequences of points sampled from input variables.
            h0 (tensor[batch_size, n_hidden_vars]): batch of fixed points samples from hidden variables (one tuple per batch sequence).

        Returns:
            output_rnn (tensor[[seq_length, batch_size, rnn_hidden_size]): last layer outputs (for each time).
            hidden_rnn (tensor[num_layers, batch_size, rnn_hidden_size]): last hidden states (for each layer).
        """        
        hidden_rnn = self.hidden_fc(h0.expand)  # shape: [batch_size, rnn_hidden_size].
        input_rnn = self.input_fc(input.view(batch_size, -1))  # shape: [, rnn_input_size]
        output_rnn, hidden_rnn = self.rnn(input_rnn, hidden_rnn)
        return output_rnn, hidden_rnn
    
class Decoder(nn.Module):
    def __init__(self, n_input_vars, rnn_input_size, rnn_hidden_size, num_layers):
        super().__init__()
        self.input_fc = nn.Linear(n_input_vars, rnn_input_size)
        self.rnn = nn.RNN(rnn_input_size, rnn_hidden_size, num_layers)
        self.output_fc = nn.Linear(rnn_hidden_size, n_input_vars)
        
    def forward(self, input, h0):     
        input = self.input_fc(input)
        output, hidden = self.rnn(input, h0)
        output = self.output_fc(output.squeeze(0))
        return output, hidden


class EDPINN(nn.Module):
    
    def __init__(self, input_vars, hidden_vars, target_vars, rnn_input_size, rnn_hidden_size, num_layers):
        super().__init__()
        self.input_vars = input_vars
        self.hidden_vars = hidden_vars
        self.target_vars = target_vars
        
        self.encoder = Encoder(input_vars, hidden_vars, rnn_input_size, rnn_hidden_size, num_layers)
        self.decoder = Decoder(target_vars, rnn_input_size, rnn_hidden_size, num_layers)

    def forward(self, input_points, hidden_points):
        seq_length, n_samples, input_vars = input_points.shape        
        outputs = torch.empty(seq_length, n_samples, self.target_vars)
        
        # Encoder:
        _, encoder_hidden = self.encoder(input_points, hidden_points)
        
        # Decoder:
        input_decoder, hidden_decoder = torch.zeros(self., n_hidden_vars), encoder_hidden
        for t in range(seq_length):
            input_decoder = input_decoder.unsqueeze(0)
            input_decoder, hidden_decoder = self.decoder(input_decoder, hidden_decoder)
            outputs[t] = input_decoder
            
        return outputs

In [23]:
# Data generation:

# ! dejar al tiempo en la variable secuencial puede permitir estudiar la pinn hasta tiempos más largos.

input_vars = 1
hidden_vars = 2
target_vars = 1
rnn_input_size = 5
rnn_hidden_size = 10
num_layers = 1 # arreglar.

net = AttentionPINN(input_vars, hidden_vars, target_vars, rnn_input_size, rnn_hidden_size, num_layers)

n_samples = 100
seq_length = 20

# num_layers, batch_size, hidden_size
input_points = torch.randn(seq_length, n_samples, input_vars)
hidden_points = torch.randn(n_samples, hidden_vars)

net(input_points, hidden_points).shape

torch.Size([20, 100, 1])

In [None]:
# input_size = 10
# output_size = 20
# hidden_size = 1
# num_layers = 5

# model = Seq2Seq(input_size, output_size, hidden_size, num_layers)

# input_seq_length = 25
# target_seq_length = 35
# batch_size = 100

# # Evaluation mode:
# input = torch.randn(input_seq_length, batch_size, input_size)
# target = torch.randn(target_seq_length, batch_size, output_size)
# output = model(input, target=target, teaching_forcing=0.5)
# print('Output shape:', output.shape)

In [None]:
class Encoder(nn.Module):
    
    def __init__(self, input_vars, hidden_vars, rnn_input_size, rnn_hidden_size, num_layers):
        super().__init__()
        
        self.hidden_fc = nn.Linear(hidden_vars, rnn_hidden_size)
        self.input_fc = nn.Linear(input_vars, rnn_input_size)
        self.rnn = nn.RNN(rnn_input_size, rnn_hidden_size, num_layers)
        
    def forward(self, input_points, hidden_points):
        hidden_points = self.hidden_fc(hidden_points).unsqueeze(0)
        input_points = self.input_fc(input_points)
        output, hidden = self.rnn(input_points, hidden_points)
        return output, hidden
    
class Decoder(nn.Module):
    def __init__(self, target_vars, rnn_input_size, rnn_hidden_size, num_layers):
        super().__init__()
        self.input_fc = nn.Linear(target_vars, rnn_input_size)
        self.rnn = nn.RNN(rnn_input_size, rnn_hidden_size, num_layers)
        self.output_fc = nn.Linear(rnn_hidden_size, target_vars)
        
    def forward(self, input, h0):
        input = self.input_fc(input)
        output, hidden = self.rnn(input, h0)
        output = self.output_fc(output.squeeze(0))
        return output, hidden


class EDPINN(nn.Module):
    
    def __init__(self, input_vars, hidden_vars, target_vars, rnn_input_size, rnn_hidden_size, num_layers):
        super().__init__()
        self.input_vars = input_vars
        self.hidden_vars = hidden_vars
        self.target_vars = target_vars
        
        self.encoder = Encoder(input_vars, hidden_vars, rnn_input_size, rnn_hidden_size, num_layers)
        self.decoder = Decoder(target_vars, rnn_input_size, rnn_hidden_size, num_layers)

    def forward(self, input_points, hidden_points):
        seq_length, n_samples, input_vars = input_points.shape        
        outputs = torch.empty(seq_length, n_samples, self.target_vars)
        
        # Encoder:
        _, encoder_hidden = self.encoder(input_points, hidden_points)
        
        # Decoder:
        input_decoder, hidden_decoder = torch.zeros(self., n_hidden_vars), encoder_hidden
        for t in range(seq_length):
            input_decoder = input_decoder.unsqueeze(0)
            input_decoder, hidden_decoder = self.decoder(input_decoder, hidden_decoder)
            outputs[t] = input_decoder
            
        return outputs