In [1]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
# `import torch.nn as nn, optim` would try to import a top-level module named
# `optim`; the optimizers used in this notebook (optim.Adam) live in torch.optim.
import torch.optim as optim
from config import *
from load_data import getTrainingSet, getValidationSet, getTestData
from torch.utils.data import DataLoader

# Preferred compute device: CUDA when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class FeedForwardNN(nn.Module):
    """Fully connected network: (Linear -> ReLU -> Dropout) per hidden layer, then a linear head.

    Hidden layers are registered as ``fc1..fcN`` with matching ``dropout1..dropoutN``
    modules; the output layer is registered as ``out``.

    Args:
        input_size: Number of input features consumed by the first linear layer.
        size_of_hidden_layers: Non-empty sequence with the width of every hidden layer.
        output_size: Number of values produced by the output layer.
        dropout_rate: Dropout probability applied after each hidden activation.
    """

    def __init__(self, input_size, size_of_hidden_layers, output_size, dropout_rate=0):
        super(FeedForwardNN, self).__init__()
        self.size_of_hidden_layers = size_of_hidden_layers

        # widths[k] is the input width of hidden layer k+1, widths[k+1] its output width.
        widths = [input_size, *size_of_hidden_layers]
        for idx in range(len(size_of_hidden_layers)):
            setattr(self, f'fc{idx+1}', nn.Linear(widths[idx], widths[idx + 1]))
            setattr(self, f'dropout{idx+1}', nn.Dropout(dropout_rate))

        self.out = nn.Linear(size_of_hidden_layers[-1], output_size)
        self.relu = nn.ReLU()

    def forward(self, x, sequence_length=None, input_size=None):
        """Run the network on a batch.

        Args:
            x: Input batch. A tensor with more than two dimensions whose last
                dimension does not match the first layer's ``in_features`` is
                flattened to ``(batch, -1)`` first, so ``(batch, seq, features)``
                batches work when the network was sized for ``seq * features``.
            sequence_length: Accepted and ignored, so the model can be called as
                ``model(x, sequence_length, input_size)`` like the other networks.
            input_size: Accepted and ignored (see ``sequence_length``).

        Returns:
            Tensor whose last dimension is ``output_size``.
        """
        if x.dim() > 2 and x.shape[-1] != self.fc1.in_features:
            x = x.flatten(start_dim=1)

        for idx in range(len(self.size_of_hidden_layers)):
            fc = getattr(self, f'fc{idx+1}')
            dropout = getattr(self, f'dropout{idx+1}')
            x = dropout(self.relu(fc(x)))
        return self.out(x)

In [None]:
class LSTM_Node(nn.Module):
    """A single LSTM cell step built from four linear gates.

    Every gate reads the concatenation of the current input and the previous
    hidden state and produces a ``hidden_size``-wide vector.
    """

    def __init__(self, input_size, hidden_size):
        super(LSTM_Node, self).__init__()
        gate_in_features = input_size + hidden_size
        self.forget_gate = nn.Linear(gate_in_features, hidden_size)
        self.input_gate = nn.Linear(gate_in_features, hidden_size)
        self.cell_gate = nn.Linear(gate_in_features, hidden_size)
        self.output_gate = nn.Linear(gate_in_features, hidden_size)

    def forward(self, x, h, c):
        """Advance the cell by one time step.

        Args:
            x: Input for this step; concatenated with ``h`` along dimension 1.
            h: Previous hidden state.
            c: Previous cell state.

        Returns:
            Tuple ``(h_new, c_new)`` with the updated hidden and cell state.
        """
        gate_input = torch.cat([x, h], dim=1)

        keep = torch.sigmoid(self.forget_gate(gate_input))      # how much of the old cell state survives
        write = torch.sigmoid(self.input_gate(gate_input))      # how much of the candidate is written
        candidate = torch.tanh(self.cell_gate(gate_input))      # proposed new cell content
        expose = torch.sigmoid(self.output_gate(gate_input))    # how much of the cell state is emitted

        next_c = keep * c + write * candidate
        next_h = expose * torch.tanh(next_c)
        return next_h, next_c
    
class LSTM(nn.Module):
    """Stack of ``LSTM_Node`` cells unrolled over a batch-first sequence.

    Args:
        input_size: Feature width of the input sequence (input of the first layer).
        hidden_size: Width of the hidden and cell state of every layer.
        num_layers: Number of stacked layers; layers after the first consume the
            hidden-state sequence produced by the layer below.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.nodes = nn.ModuleList([LSTM_Node(input_size, hidden_size) if i == 0 else LSTM_Node(hidden_size, hidden_size) for i in range(num_layers)])

    def forward(self, x, state):
        """Run the stack over a full sequence.

        Args:
            x: Tensor of shape ``(batch, sequence_length, input_size)``.
            state: Tuple ``(h, c)`` of initial states, indexed by layer along
                dimension 0. For a single layer the leading dimension is
                squeezed away, as before.

        Returns:
            ``(outputs, (h, c))`` where ``outputs`` stacks the top layer's hidden
            state of every time step along dimension 1. For a single layer
            ``h``/``c`` are the final ``(batch, hidden_size)`` states (unchanged
            legacy shape); for stacked layers they are stacked per layer along
            dimension 0.
        """
        batch_size, sequence_length, input_size = x.shape
        h0, c0 = state

        if self.num_layers == 1:
            layer_states = [(h0.squeeze(0), c0.squeeze(0))]
        else:
            # Indexing yields per-layer views that are only read, never written,
            # so the caller's state tensors are not modified in place.
            layer_states = [(h0[layer], c0[layer]) for layer in range(self.num_layers)]

        layer_input = x
        final_h, final_c = [], []
        for node, (h, c) in zip(self.nodes, layer_states):
            step_outputs = []
            for t in range(sequence_length):
                h, c = node(layer_input[:, t, :], h, c)
                step_outputs.append(h)
            # The hidden-state sequence of this layer is the input of the next one.
            layer_input = torch.stack(step_outputs, 1)
            final_h.append(h)
            final_c.append(c)

        if self.num_layers == 1:
            return layer_input, (final_h[0], final_c[0])
        return layer_input, (torch.stack(final_h, 0), torch.stack(final_c, 0))
    

class LSTM_Network(nn.Module):
    """LSTM encoder followed by a linear head applied to the last time step.

    Args:
        input_size: Feature width of every time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the linear head.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTM_Network, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = LSTM(input_size, hidden_size, num_layers) # this can be switched out with nn.LSTM to see that implementation is correct
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, sequence_length, input_size):
        """Predict from a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, sequence_length, input_size)``.
            sequence_length: Expected size of dimension 1 of ``x``.
            input_size: Expected size of dimension 2 of ``x``.

        Returns:
            Tensor of shape ``(batch, output_size)`` computed from the LSTM
            output at the last time step.

        Raises:
            AssertionError: If ``x`` does not have the expected shape.
        """
        assert x.shape == (x.shape[0], sequence_length, input_size)

        # Allocate the zero initial state next to the input (same device and
        # dtype) instead of relying on a notebook-level global device.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        lstm_out, _ = self.lstm(x, (h0, c0))
        return self.fc(lstm_out[:, -1, :])

In [None]:
class Conv1d(nn.Module):
    """Hand-written 1-D convolution (cross-correlation) over ``(batch, channels, width)`` input.

    The kernel and bias are registered as ``nn.Parameter`` so that they are
    returned by ``parameters()`` (and therefore trained by an optimizer), stored
    in ``state_dict()`` and moved by ``.to(...)`` together with the owning model.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels (filters).
        kernel_size: Width of each filter.
        stride: Step between consecutive windows.
        padding: Number of zeros added to both ends of the width dimension.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        # Standard-normal kernel, zero bias.
        self.weights = nn.Parameter(torch.randn(out_channels, in_channels, kernel_size))
        self.biases = nn.Parameter(torch.zeros(out_channels))

    def forward(self, x):
        """Convolve ``x`` with the kernel.

        Args:
            x: Tensor of shape ``(batch, in_channels, width)``.

        Returns:
            Tensor of shape ``(batch, out_channels, out_width)`` with
            ``out_width = (width - kernel_size + 2 * padding) // stride + 1``.
        """
        if self.padding > 0:
            # Zero-pad the width dimension so the windows match out_width above.
            x = nn.functional.pad(x, (self.padding, self.padding))

        # (batch, in_channels, out_width, kernel_size): one sliding window per output position.
        windows = x.unfold(2, self.kernel_size, self.stride)

        # Reduce over input channels and kernel taps only; the batch dimension
        # is kept so each sample gets its own response.
        out = torch.einsum('bcwk,ock->bow', windows, self.weights)
        return out + self.biases.view(1, -1, 1)

class CNN_Network(nn.Module):
    """Two kernel-size-2 convolutions followed by a three-layer fully connected head.

    The time steps of the input are used as convolution channels and the
    feature axis as the convolved width.

    Args:
        input_size: Width of the convolved axis (features per time step). Each
            convolution shortens it by one, so it must be at least 3.
        sequence_length: Number of time steps, used as ``in_channels`` of the
            first convolution.
        output_size: Number of values produced by the last linear layer.

    Raises:
        ValueError: If ``input_size`` is too small for the two convolutions.
    """

    def __init__(self,input_size, sequence_length, output_size):
        super(CNN_Network, self).__init__()
        # Two stride-1, unpadded, kernel-size-2 convolutions: width -> width - 2.
        conv_out_width = input_size - 2
        if conv_out_width < 1:
            raise ValueError(
                f'input_size must be at least 3 for two kernel-size-2 convolutions, got {input_size}')

        self.conv1 = Conv1d(in_channels=sequence_length, out_channels=32, kernel_size=2)
        self.conv2 = Conv1d(in_channels=32, out_channels=16, kernel_size=2)
        # 16 channels * remaining width (equals the former hard-coded 16 * 4 for input_size == 6).
        self.fc1 = nn.Linear(16 * conv_out_width, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, output_size)

    def forward(self, x, sequence_length=None, input_size=None):
        """Predict from a batch.

        Args:
            x: Tensor of shape ``(batch, sequence_length, input_size)``.
            sequence_length: Accepted and ignored, so the model can be called as
                ``model(x, sequence_length, input_size)`` like the other networks.
            input_size: Accepted and ignored (see ``sequence_length``).

        Returns:
            Tensor of shape ``(batch, output_size)``.
        """
        x = torch.relu(self.conv1.forward(x))
        x = torch.relu(self.conv2.forward(x))
        # Keep the batch dimension fixed; a size mismatch then fails loudly in
        # fc1 instead of silently re-batching the data.
        x = x.flatten(start_dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

In [None]:
def train_model(model, train_loader, epochs, optimizer, loss_function, sequence_length, input_size,val_loader, verbose=False,save_model_name=None):
    """Train ``model`` and report the training and validation loss after every epoch.

    Args:
        model: Network called as ``model(x, sequence_length, input_size)``.
        train_loader: Iterable of ``(x, y)`` training batches.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer stepping the model's parameters.
        loss_function: Callable ``loss_function(output, target)`` returning a scalar tensor.
        sequence_length: Forwarded to the model.
        input_size: Forwarded to the model.
        val_loader: Batches passed to ``validate`` after each epoch.
        verbose: When true, also print the loss of every 10th batch.
        save_model_name: If given, the state dict is saved to
            ``trained_models/<save_model_name>.pth`` after training.

    Returns:
        List with one entry per epoch, each a list of per-batch loss values.
    """
    import os

    model.train()
    losses = []
    for epoch in range(epochs):
        # The validation pass at the end of an epoch may leave the model in eval
        # mode; switch back so dropout etc. stay active while training.
        model.train()
        epoch_losses = []
        for i, (x, y) in enumerate(train_loader):
            x, y = x.to(device), y.to(device)
            model.zero_grad()
            output = model(x, sequence_length, input_size)
            y = y.view(-1, 1) #TODO: make sure size is 32,1
            loss = loss_function(output, y)
            epoch_losses.append(loss.item())
            loss.backward()
            optimizer.step()
            # `i` already counts batches, so it is reported directly.
            if verbose and i % 10 == 0:
                print(f'Epoch {epoch} Batch {i} loss: {loss.item()}')
        print(f'Epoch {epoch} loss: {np.mean(epoch_losses)}, val_loss: {validate(model, val_loader)}')
        losses.append(epoch_losses)
    if save_model_name is not None:
        # torch.save does not create missing directories.
        os.makedirs('trained_models', exist_ok=True)
        torch.save(model.state_dict(), f'trained_models/{save_model_name}.pth')
    return losses

def validate(network, val_loader):
    """Return the mean MSE of ``network`` over ``val_loader``, ignoring NaN batch losses.

    The network is evaluated in eval mode without gradient tracking and is put
    back into its previous train/eval mode afterwards.

    Args:
        network: Model called as ``network(features, features.shape[1], features.shape[2])``.
        val_loader: Iterable of ``(features, labels)`` batches.

    Returns:
        Mean of the non-NaN per-batch MSE values; NaN when no batch produced a
        finite loss.
    """
    # Run on the device the network lives on (falls back to leaving the batch
    # where it is for parameter-free networks).
    first_param = next(network.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    was_training = network.training
    network.eval()
    batch_losses = []
    try:
        with torch.no_grad():
            for features, labels in val_loader:
                if target_device is not None:
                    features = features.to(target_device)
                o = network(features, features.shape[1], features.shape[2])
                # Give the labels the prediction's shape; (batch,) labels would
                # otherwise be broadcast against (batch, 1) predictions.
                labels = labels.to(o.device).reshape(o.shape)
                loss = torch.nn.functional.mse_loss(o, labels)
                batch_losses.append(loss.item())
    finally:
        network.train(was_training)

    batch_losses = np.array(batch_losses)
    batch_losses = batch_losses[~np.isnan(batch_losses)]
    if batch_losses.size == 0:
        return float('nan')
    return np.mean(batch_losses)

In [None]:
# Data sets and the dimensions derived from the first training sample.
dataset = getTrainingSet(reshape=False)
val_set = getValidationSet()
features = len(dataset.X[0][0])
sequence_length = len(dataset.X[0])
# The feed-forward network sees one sample as a single flat vector.
fnn_input_size = features * sequence_length

# Hyperparameters taken from config.
size_of_hidden_layers = FNN_SIZE_OF_HIDDEN_LAYERS
output_size = OUTPUT_SIZE
fnn_epochs = FNN_EPOCHS
learning_rate = LEARNING_RATE
l2_regularization_rate = L2_REGULARIZATION_RATE
batch_size = BATCH_SIZE

# train_model moves every batch to `device`, so the model has to live there too;
# the optimizer is created afterwards so it tracks the moved parameters.
fnn_model = FeedForwardNN(fnn_input_size, size_of_hidden_layers, output_size).to(device)
loss_function = nn.MSELoss()
fnn_optimizer = optim.Adam(fnn_model.parameters(), lr=learning_rate, weight_decay=l2_regularization_rate)

In [None]:
lstm_input_size = len(dataset.X[0][0])
hidden_size = LSTM_SIZE_OF_HIDDEN_LAYER
num_layers = LSTM_NUM_LAYERS
lstm_epochs = LSTM_EPOCHS


# train_model moves every batch to `device`, so the model has to live there too;
# the optimizer is created afterwards so it tracks the moved parameters.
lstm_model = LSTM_Network(lstm_input_size, hidden_size, num_layers, output_size).to(device)
# NOTE: the spelling `lstm_opimizer` is kept because the LSTM training cell refers to this name.
lstm_opimizer = optim.Adam(lstm_model.parameters(), learning_rate, weight_decay=l2_regularization_rate)
train_dataloader = DataLoader(dataset, batch_size, shuffle=False)

In [None]:
cnn_input_size = lstm_input_size
epochs = CNN_EPOCHS
cnn_learning_rate = CNN_LEARNING_RATE

# train_model moves every batch to `device`, so the model has to live there too;
# the optimizer is created afterwards so it tracks the moved parameters.
model = CNN_Network(cnn_input_size, sequence_length, output_size).to(device)
loss_function = nn.MSELoss()
# Use the CNN-specific learning rate defined above rather than the shared one.
adam = torch.optim.Adam(model.parameters(), lr=cnn_learning_rate, weight_decay=l2_regularization_rate)

In [None]:
# Batched loaders for training and validation; both keep the stored sample order.
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
val_dataloader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

In [None]:
# FNN
# train_model returns only the per-epoch lists of batch losses, so there is a
# single value to bind.
losses = train_model(
    model=fnn_model,
    train_loader=train_dataloader,
    epochs=FNN_EPOCHS,
    optimizer=fnn_optimizer,
    loss_function=loss_function,
    sequence_length=sequence_length,
    input_size=fnn_input_size,
    val_loader=val_dataloader,
    verbose=False,
    save_model_name='fnn_trained_model_vA'
)

In [None]:
# LSTM
# train_model returns only the per-epoch lists of batch losses, so there is a
# single value to bind.
losses = train_model(
    model=lstm_model,
    train_loader=train_dataloader,
    epochs=lstm_epochs,
    optimizer=lstm_opimizer,
    loss_function=loss_function,
    sequence_length=sequence_length,
    input_size=lstm_input_size,
    val_loader=val_dataloader,
    verbose=False,
    save_model_name='lstm_trained_model_vA'
)

In [None]:
# CNN
# train_model returns only the per-epoch lists of batch losses, so there is a
# single value to bind.
losses = train_model(
    model=model,
    train_loader=train_dataloader,
    epochs=epochs,
    optimizer=adam,
    loss_function=loss_function,
    sequence_length=sequence_length,
    input_size=cnn_input_size,
    val_loader=val_dataloader,
    verbose=False,
    save_model_name='cnn_trained_model_vA'
)