In [29]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CRNN(nn.Module):
    #CNN + RNN
    def __init__(self, input_len, input_dim, hidden_size, layer_num, dropout=0.5, bidirectional=True, kernel_size=3):
        super(CRNN, self).__init__()
        self.input_len = input_len
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.kernel_size = kernel_size
        self.layer_num = layer_num
        self.dropout = dropout
        #embedding
        self.bidirectional = bidirectional
        self.conv1 = nn.Conv1d(self.input_dim, self.hidden_size, self.kernel_size, padding=self.kernel_size // 2)
        self.rnn = nn.GRU(self.hidden_size, self.hidden_size, self.layer_num, batch_first=True, dropout=self.dropout, bidirectional = self.bidirectional)
        self.out = nn.Linear(self.hidden_size * (1 + int(self.bidirectional)), 1)
        self.init_weights()

    def init_weights(self):
        torch.nn.init.xavier_uniform_(self.conv1.weight)
        torch.nn.init.xavier_uniform_(self.out.weight)

    def forward(self, x):
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = F.relu(x)
        x = x.transpose(1, 2)
        _, hn = self.rnn(x)
        h = hn[-(1 + int(self.bidirectional)):]
        x = torch.cat(h.split(1), dim=-1).squeeze(0)
        x = self.out(x)
        return x
        


class CLSTM(nn.Module):
    #CNN + LSTM
    def __init__(self, input_len, input_dim, hidden_size, layer_num, dropout=0.5, bidirectional=True, kernel_size=3):
        super(CLSTM, self).__init__()
        self.input_len = input_len
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.kernel_size = kernel_size
        self.layer_num = layer_num
        self.dropout = dropout
        #embedding
        self.bidirectional = bidirectional
        self.conv1 = nn.Conv1d(self.input_dim, self.hidden_size, self.kernel_size, padding=self.kernel_size // 2)
        self.rnn = nn.LSTM(self.hidden_size, self.hidden_size, self.layer_num, batch_first=True, dropout=self.dropout, bidirectional = self.bidirectional)
        self.out = nn.Linear(self.hidden_size * (1 + int(self.bidirectional)), 1)
        self.init_weights()

    def init_weights(self):
        torch.nn.init.xavier_uniform_(self.conv1.weight)
        torch.nn.init.xavier_uniform_(self.out.weight)

    def forward(self, x):
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = F.relu(x)
        x = x.transpose(1, 2)
        _, (hn, _) = self.rnn(x)
        h = hn[-(1 + int(self.bidirectional)):]
        x = torch.cat(h.split(1), dim=-1).squeeze(0)
        x = self.out(x)
        return x

class RCNN(nn.Module):
    #RNN + CNN
    def __init__(self, input_len, input_dim, hidden_size, layer_num, dropout=0.5, bidirectional=True, kernel_size=3):
        super(RCNN, self).__init__()
        self.input_len = input_len
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.kernel_size = kernel_size
        self.layer_num = layer_num
        self.dropout = dropout
        #embedding
        self.bidirectional = bidirectional
        self.rnn = nn.GRU(self.input_dim, self.hidden_size, self.layer_num, batch_first=True, dropout=self.dropout, bidirectional = self.bidirectional)
        self.conv1 = nn.Conv1d(self.hidden_size * (1 + int(self.bidirectional)), self.hidden_size, self.kernel_size, padding=self.kernel_size // 2)
        self.out = nn.Linear(self.hidden_size * self.input_len, 1)
        self.init_weights()

    def init_weights(self):
        torch.nn.init.xavier_uniform_(self.conv1.weight)
        torch.nn.init.xavier_uniform_(self.out.weight)
        
    def forward(self, x):
        x,_ = self.rnn(x)
        print(x.shape)
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = F.relu(x)
        x = x.view(x.size(0), -1)
        x = self.out(x)
        return x

class LCNN(nn.Module):
    #LSTM + CNN
    def __init__(self, input_len, input_dim, hidden_size, layer_num, dropout=0.5, bidirectional=True, kernel_size=3):
        super(LCNN, self).__init__()
        self.input_len = input_len
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.kernel_size = kernel_size
        self.layer_num = layer_num
        self.dropout = dropout
        #embedding
        self.bidirectional = bidirectional
        self.rnn = nn.LSTM(self.input_dim, self.hidden_size, self.layer_num, batch_first=True, dropout=self.dropout, bidirectional = self.bidirectional)
        self.conv1 = nn.Conv1d(self.hidden_size * (1 + int(self.bidirectional)), self.hidden_size, self.kernel_size, padding=self.kernel_size // 2)
        self.out = nn.Linear(self.hidden_size * self.input_len, 1)
        self.init_weights()

    def init_weights(self):
        torch.nn.init.xavier_uniform_(self.conv1.weight)
        torch.nn.init.xavier_uniform_(self.out.weight)

    def forward(self, x):
        x,_ = self.rnn(x)
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = F.relu(x)
        x = x.view(x.size(0), -1)
        x = self.out(x)
        return x

In [30]:
num_samples = 1000
#test the model
X = torch.randn(num_samples, 4 )
Y = torch.randn(num_samples, 1)


In [31]:
#data loader
day = torch.randint(0, 30, (num_samples, 1))
class MyDataset(torch.utils.data.Dataset):

    def __init__(self, X, y, day, input_len=128):
        #the input data is a 1d array, indicate the minute of the day
        self.X = X
        self.y = y
        self.day = day
        self.input_len = input_len

    def __getitem__(self, index):
        #output previous self.input_len minutes data and target value
        #if the there is no enough data in the same day, pad with 0
        d = self.day[index]
        start = index - self.input_len
        if start < 0:
            start = 0
        if self.day[start] != d:
            while self.day[start] != d:
                start += 1
            #pad with 0 before start
        if index - start < self.input_len:
            x = torch.zeros(self.input_len, self.X.shape[1])
            x[self.input_len - index + start: self.input_len] = self.X[start: index].clone()
        else:
            x = self.X[start: index]
            
        y = self.y[index].clone()
        if x.shape != (self.input_len, self.X.shape[1]):
            print(x.shape, index, start)
        return x, y
    def __len__(self):
        return len(self.X)
    
mydataset = MyDataset(X, Y, day, input_len=120)
train_index = range(0, int(num_samples * 0.8))
test_index = range(0, int(num_samples * 0.8))
#test_index = range(int(num_samples * 0.8), num_samples)

train_dataset = torch.utils.data.Subset(mydataset, train_index)
test_dataset = torch.utils.data.Subset(mydataset, test_index)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)



In [32]:
model = CRNN(input_len = 120, input_dim = 4, hidden_size = 50,  layer_num = 1, bidirectional = True, dropout = 0.5)
def correlation_t(x, y):
    return torch.sum(x * y) / (torch.sqrt(torch.sum(x * x)) * torch.sqrt(torch.sum(y * y)))

from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

optimizer = optim.Adam(model.parameters(), lr=0.001)
model.train()
for epoch in range(100):
    prediction = []
    ground_truth = []
    for i, (x, y) in enumerate(train_loader):
        print(x.shape, y.shape)
        optimizer.zero_grad()
        p = model(x)
        loss = F.mse_loss(p, y)
        prediction.append(p)
        ground_truth.append(y)
        loss.backward()
        optimizer.step()
        if i % 100 == 0:
            print(epoch, i, loss.item())
    prediction = torch.cat(prediction, dim=0)
    ground_truth = torch.cat(ground_truth, dim=0)
    print(prediction.shape, ground_truth.shape)
    print(epoch, correlation_t(prediction, ground_truth).item())




torch.Size([128, 120, 4]) torch.Size([128, 1])
0 0 1.002792239189148
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([32, 120, 4]) torch.Size([32, 1])
torch.Size([800, 1]) torch.Size([800, 1])
0 0.0037265142891556025
torch.Size([128, 120, 4]) torch.Size([128, 1])
1 0 1.1141608953475952
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])
torch.Size([128, 120, 4]) torch.Size([128, 1])


KeyboardInterrupt: 