In [17]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from sklearn.preprocessing import MinMaxScaler

In [13]:
use_GPU = torch.cuda.is_available()
if use_GPU:
    mode = {"name": "cuda", "device": torch.device("cuda")}
else:
    mode = {"name": "cpu", "device": torch.device("cpu")}
error_criterion = nn.MSELoss().to(mode["device"])
loss_criterion = nn.MSELoss().to(mode["device"])

in_dim = 1
hidden_dim = 64
out_dim = 1
sequence_length = 24
batch_size = 128

num_epochs=15
num_workers=11
lr = 1e-3
regularization=1e-6

input = torch.randn(batch_size, sequence_length)
random_data_y = torch.randn(batch_size, 1)
print(input.shape, random_data_y.shape)

torch.Size([128, 24]) torch.Size([128, 1])


In [14]:
X = torch.randn(6027, 24*3)
sequence_length = 24
num_features = int(X.shape[1]/sequence_length)
# reshape X into a 3D tensor with dimensions (number of values, sequence length, number of features)
num_sequences = X.shape[0]
num_features = int(X.shape[1]/sequence_length)
X_3d = X.reshape(X.shape[0], sequence_length, num_features)
X_3d.shape

torch.Size([6027, 24, 3])

In [11]:
if X.shape[-1] != 1:
    # reshape X_train into a 3D tensor with dimensions (number of values, sequence length, number of features)
    num_values = X.shape[0]
    num_features = int(X.shape[1]/sequence_length)
    X_3d = X.reshape(num_values, sequence_length, num_features)
    X_3d = X_3d.astype(np.float32)
    X = X_3d.copy()
    X = torch.tensor(X)
else:
    X = X.unsqueeze(-1)
X.shape

AttributeError: 'Tensor' object has no attribute 'astype'

In [None]:
class LSTM1(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim):
        super(LSTM1, self).__init__()
        self.in_dim = in_dim
        self.hidden_dim = hidden_dim
        self.out_dim = out_dim
        self.sequence_length = sequence_length
        # lstm1, lstm2, linear are all layers in the network
        self.lstm1 = nn.LSTMCell(in_dim, hidden_dim)
        self.lstm2 = nn.LSTMCell(hidden_dim, hidden_dim)
        self.linear = nn.Linear(hidden_dim, hidden_dim)
        self.linear_out = nn.Linear(hidden_dim*sequence_length, out_dim)

        
    def forward(self, y):
        outputs = []
        h_t = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        c_t = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        h_t2 = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        c_t2 = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        
        for time_step in y.split(1, dim=1):
            # N, 1
            h_t, c_t = self.lstm1(time_step, (h_t, c_t)) # initial hidden and cell states
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2)) # new hidden and cell states
            output = self.linear(h_t2) # output from the last FC layer
            outputs.append(output)
        # transform list to tensor    
        outputs = torch.cat(outputs, dim=1)
        out = self.linear_out(outputs)
        return out

In [None]:
class LSTM2(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim):
        super(LSTM2, self).__init__()
        self.in_dim = in_dim
        self.hidden_dim = hidden_dim
        self.out_dim = out_dim

        self.lstm1 = nn.LSTMCell(in_dim, hidden_dim)
        self.lstm2 = nn.LSTMCell(hidden_dim, hidden_dim)

        self.T_A = nn.Linear(sequence_length*hidden_dim, sequence_length)
        
        self.linear = nn.Linear(hidden_dim, hidden_dim)
        self.linear_out = nn.Linear(hidden_dim, out_dim)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=0)
        
    def forward(self, y):
        outputs = []
        h_t = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        c_t = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        h_t2 = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        c_t2 = torch.zeros(y.size(0), self.hidden_dim, dtype=torch.float32)
        
        for time_step in y.split(1, dim=1):
            h_t, c_t = self.lstm1(time_step, (h_t, c_t)) # initial hidden and cell states
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2)) # new hidden and cell states
            output = self.linear(h_t2) # output from the last FC layer
            outputs.append(output)
            
        total_ht = outputs[0]
        for i in range(1, len(outputs)):
            total_ht = torch.cat((total_ht, outputs[i]), 1)

        beta_t =  self.relu(self.T_A(total_ht))
        beta_t = self.softmax(beta_t)

        out = torch.zeros(y.size(0), self.hidden_dim)

        for i in range(len(outputs)):
                      
            out = out + outputs[i]*beta_t[:,i].reshape(out.size(0), 1)

        out = self.linear_out(out)
        
        return out

In [None]:
model = LSTM1(1, 64, 1)
y_pred = model(input)
print(y_pred.shape)
print(random_data_y.shape)
mse_loss = nn.MSELoss()
mse_loss(y_pred, random_data_y)

torch.Size([128, 1])
torch.Size([128, 1])


tensor(1.0076, grad_fn=<MseLossBackward0>)

In [None]:
model = LSTM2(1, 64, 1)
y_pred = model(input)
print(y_pred.shape)
print(random_data_y.shape)
mse_loss = nn.MSELoss()
mse_loss(y_pred, random_data_y)

torch.Size([128, 1])
torch.Size([128, 1])


tensor(1.0047, grad_fn=<MseLossBackward0>)

In [None]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size

        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size)

        # define the LSTM layer
        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)

        # define the linear output layer
        self.linear_out = nn.Linear(hidden_size, output_size)

    def forward(self, input):
        # apply the linear input layer
        x = self.linear_in(input)

        # apply batch normalization
        x = self.batch_norm(x.transpose(1,2)).transpose(1,2)

        # apply the LSTM layer
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        lstm_out, _ = self.lstm(x, (h0, c0))

        # check number of features
        if input.size(2) == 1:
            # apply the linear output layer
            out = self.linear_out(lstm_out[:, -1, :])
        else:
            # apply the linear output layer
            out = self.linear_out(lstm_out)

        return out

In [None]:
class TemporalAttention(nn.Module):
    def __init__(self, hidden_size):
        super(TemporalAttention, self).__init__()
        self.hidden_size = hidden_size
        self.attn = nn.Linear(hidden_size, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)
    
    def forward(self, hidden_states):
        # hidden_states shape: (seq_len, batch_size, hidden_size)
        energy = torch.tanh(self.attn(hidden_states))
        attention_weights = torch.softmax(self.v(energy), dim=0)
        context_vector = torch.sum(attention_weights * hidden_states, dim=0)
        return context_vector
        

class LSTMTemporalAttention(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMTemporalAttention, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size

        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size)

        # define the LSTM layer
        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # define the temporal attention layer
        self.temporal_attention = TemporalAttention(hidden_size)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)

        if input_size == 1:
             # define the linear output layer
            self.linear_out = nn.Linear(hidden_size, output_size)
        else:
            # define the linear output layer
            self.linear_out = nn.Linear(hidden_size * 2, output_size) # multiply by 2 to account for the concatenated input and attention output


    def forward(self, input):
        # apply the linear input layer
        x = self.linear_in(input)

        # apply batch normalization
        x = self.batch_norm(x.transpose(1,2)).transpose(1,2)

        # apply the LSTM layer
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        lstm_out, _ = self.lstm(x, (h0, c0))

        # apply temporal attention
        attention_out = self.temporal_attention(lstm_out.transpose(0, 1))

        # check number of features
        if input.size(2) == 1:
            # apply the linear output layer
            out = self.linear_out(attention_out)
        else:
            # concatenate attention output with input
            out = torch.cat((attention_out.unsqueeze(1).repeat(1, x.size(1), 1), x), dim=-1)

            # apply the linear output layer
            out = self.linear_out(out)

        return out

In [None]:
class FCN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(FCN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        
        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        
        # define the fully connected layers
        self.fc_layers = nn.ModuleList([
            nn.Linear(hidden_size, hidden_size) for _ in range(num_layers)
        ])

        # define the linear output layer
        self.linear_out = nn.Linear(hidden_size, output_size)
    
    def forward(self, input):
        # apply the linear input layer
        x = self.linear_in(input)

        # apply the batch normalization layer
        x = self.batch_norm(x.transpose(1,2)).transpose(1,2)
        
        # reshape the input to (seq_len, batch_size, hidden_size)
        x = x.transpose(0, 1)
        
        # apply the fully connected layers
        for fc_layer in self.fc_layers:
            x = fc_layer(x)

        # check number of features
        if input.size(2) == 1:
            # apply the linear output layer
            x = self.linear_out(x[-1])
        else:
            # apply the linear output layer
            x = self.linear_out(x).transpose(0,1)

        return x

In [None]:
class FCNTemporalAttention(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(FCNTemporalAttention, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        
        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        
        # define the fully connected layers
        self.fc_layers = nn.ModuleList([
            nn.Linear(hidden_size, hidden_size) for _ in range(num_layers)
        ])

        # define the temporal attention layer
        self.attention = TemporalAttention(hidden_size)
        
        # define the linear output layer
        self.linear_out = nn.Linear(hidden_size, output_size)
        
        # define the linear layer to reshape the output
        self.linear_reshape = nn.Linear(output_size, output_size * 25)
    
    def forward(self, input):
        # apply the linear input layer
        x = self.linear_in(input)

        # apply the batch normalization layer
        x = self.batch_norm(x.transpose(1,2)).transpose(1,2)
        
        # reshape the input to (seq_len, batch_size, hidden_size)
        x = x.transpose(0, 1)
        
        # apply the fully connected layers
        for fc_layer in self.fc_layers:
            x = fc_layer(x)

        # apply the temporal attention layer
        attention_out = self.attention(x)
        
        # apply the linear output layer
        x = self.linear_out(attention_out)

        if not input.size(2) == 1:
            # reshape the output to (batch_size, seq_len, output_size)
            x = self.linear_reshape(x)
            x = x.view(x.size(0), 25, self.output_size)

        return x


In [None]:
# Example usage
model = LSTM(input_size=1, hidden_size=64, num_layers=2, output_size=1)

input_tensor1 = torch.randn(32, 25, 1)
output_tensor1 = model(input_tensor1)

# Example usage
model2 = LSTM(input_size=31, hidden_size=64, num_layers=2, output_size=1)

input_tensor2 = torch.randn(32, 25, 31)
output_tensor2 = model2(input_tensor2)

print(input_tensor1.shape)
print(output_tensor1.shape)

print(input_tensor2.shape)
print(output_tensor2.shape)

torch.Size([32, 25, 1])
torch.Size([32, 1])
torch.Size([32, 25, 31])
torch.Size([32, 25, 1])


In [None]:
# Example usage
model = LSTMTemporalAttention(input_size=1, hidden_size=64, num_layers=2, output_size=1)

input_tensor1 = torch.randn(32, 25, 1)
output_tensor1 = model(input_tensor1)

# Example usage
model2 = LSTMTemporalAttention(input_size=31, hidden_size=64, num_layers=2, output_size=1)

input_tensor2 = torch.randn(32, 25, 31)
output_tensor2 = model2(input_tensor2)

print(input_tensor1.shape)
print(output_tensor1.shape)

print(input_tensor2.shape)
print(output_tensor2.shape)

torch.Size([32, 25, 1])
torch.Size([32, 1])
torch.Size([32, 25, 31])
torch.Size([32, 25, 1])


In [None]:
class FCN1(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(FCN1, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers

        # define the temporal attention layer
        self.attention = TemporalAttention(hidden_dim)

        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size, bias=False)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        
        # define the fully connected layers
        self.fc_layers = nn.ModuleList([
            nn.Linear(hidden_size, hidden_size, bias=False) for _ in range(num_layers)
        ])

        # define the linear output layer
        self.linear_out = nn.Linear(hidden_size, output_size, bias=False)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward (self,input):
        out = self.linear_in(input)
        out = self.sigmoid(out)

        
        # apply the fully connected layers
        for fc_layer in self.fc_layers:
            out = fc_layer(out)
            out = self.sigmoid(out)            

        out = self.linear_out(out)

        if input.size(2) == 1:
            out = out.transpose(0, 1)[-1]

        return out

In [None]:
# Example usage
model = FCN1(1, 64, 2, 1)

input_tensor1 = torch.randn(32, 25, 1)
output_tensor1 = model(input_tensor1)

# Example usage
model2 = FCN1(31, 64, 2, 1)

input_tensor2 = torch.randn(32, 25, 31)
output_tensor2 = model2(input_tensor2)

print(input_tensor1.shape)
print(output_tensor1.shape)

print(input_tensor2.shape)
print(output_tensor2.shape)

torch.Size([32, 25, 1])
torch.Size([32, 1])
torch.Size([32, 25, 31])
torch.Size([32, 25, 1])


In [None]:
# Example usage
model = FCNTemporalAttention(input_size=1, hidden_size=64, num_layers=2, output_size=1)

input_tensor1 = torch.randn(32, 25, 1)
output_tensor1 = model(input_tensor1)

# Example usage
model2 = FCNTemporalAttention(input_size=31, hidden_size=64, num_layers=2, output_size=1)

input_tensor2 = torch.randn(32, 25, 31)
output_tensor2 = model2(input_tensor2)

print(input_tensor1.shape)
print(output_tensor1.shape)

print(input_tensor2.shape)
print(output_tensor2.shape)

torch.Size([32, 25, 1])
torch.Size([32, 1])
torch.Size([32, 25, 31])
torch.Size([32, 25, 1])


In [23]:


class LSTMTest(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMTest, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size

        # define the linear input layer
        self.linear_in = nn.Linear(input_size, hidden_size)

        # define the LSTM layer
        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # define the batch normalization layer
        self.batch_norm = nn.BatchNorm1d(hidden_size)

        # define the linear output layer
        self.linear_out = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # apply the linear input layer
        x = self.linear_in(x)

        # apply batch normalization
        x = self.batch_norm(x.transpose(1,2)).transpose(1,2)

        # apply the LSTM layer
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        lstm_out, _ = self.lstm(x, (h0, c0))

        # apply the linear output layer
        out = self.linear_out(lstm_out[:, -1, :])

        # squeeze the output tensor to shape [batch_size]
        out = out.squeeze()

        return out

In [31]:
# Example usage
model = LSTMTest(input_size=3, hidden_size=64, num_layers=2, output_size=1)

input_tensor1 = torch.randn(32, 25, 3)
output_tensor1 = model(input_tensor1)

print(input_tensor1.shape)
print(output_tensor1.shape)


torch.Size([32, 25, 3])
torch.Size([32])


torch.Size([128, 24])