This notebook gives an overview of the LSTM neural network simulation.

---------------------------------------------------------------------------------------------

Install packages: use either conda OR pip (not both).

In [None]:
# Installing packages using CONDA (recommended)

In [None]:
# Create the conda environment described in environment_test.yml; the environment
# is called "test_notebook_env" (this might take a few minutes)
!conda env create -f environment_test.yml

In [None]:
# List the available conda environments to check that the new one was created
!conda env list

In [None]:
# NOTE(review): `!` runs the command in a temporary subshell, so this activation
# presumably does not change the environment of the running kernel - the
# environment still has to be selected as the notebook kernel (see the note
# below this cell). TODO confirm and drop this cell if it has no effect.
!conda activate test_notebook_env

In [None]:
# Show the list of conda environments again (same listing as `conda env list`)
!conda info --envs

Select the conda environment in the top right (in VS Code)

In [None]:
# Installing packages using pip

In [None]:
# Install the virtualenv tool that is used in the next cell to create the virtual environment
!pip install virtualenv

In [None]:
# Create a virtual environment in the folder "venv"
!virtualenv venv

In [None]:
# Install all packages (this might take a few minutes).
# `%pip` (instead of `!pip`) installs into the environment of the running kernel,
# so select the venv as kernel before running this cell.
# `torch` only needs to be listed once, and `torch.utils` is a submodule that
# ships inside `torch` - it is not something to install separately.
%pip install matplotlib torch numpy icecream tqdm pandas


Select the venv environment in the top right (in VS Code)

In [None]:
# Importing necessary libraries
import matplotlib.pyplot as plt
import torch
from torch import nn
import numpy as np
from icecream import ic
from tqdm import tqdm
from get_data import *
from dataloader import *

Data:

In [None]:
# Load the data that was generated with MATLAB/Simulink.
# What the individual options do is described in get_data.py.
data_tensor = get_data(
    path="save_data_test3.csv",
    timesteps_from_data=0,
    skip_steps_start=0,
    skip_steps_end=0,
    drop_half_timesteps=False,
    normalise_s_w=False,
    rescale_p=False,
    num_inits=0,
)

# Show an example of a simulation run
visualise(data_tensor, num_inits=100)

LSTM - neural network


In [None]:
 #Define the LSTM model class

# Work in double precision: float64 becomes torch's default floating point dtype
torch.set_default_dtype(torch.float64)

# Run on the first CUDA GPU if one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
print(device)

class LSTMmodel(nn.Module):
    """
    LSTM network for derivative estimation: an LSTM stack followed by a linear
    read-out that is applied to the LSTM output of every time step.
    """

    def __init__(self, input_size, hidden_size, out_size, layers):
        """
        Build the network.

        Args:
        - input_size: Number of features per time step of the input
        - hidden_size: Number of features of the LSTM hidden state
        - out_size: Number of features per time step of the output
        - layers: Number of stacked LSTM layers
        """
        super().__init__()

        # Remember the sizes on the instance
        self.hidden_size = hidden_size
        self.input_size = input_size

        # Recurrent part; with batch_first=True batched inputs are (batch, time, feature)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=layers,
            batch_first=True,
        )

        # Read-out from the hidden features to the output features
        self.linear = nn.Linear(in_features=hidden_size, out_features=out_size)

    def forward(self, seq):
        """
        Run a sequence through the LSTM and the linear read-out.

        Args:
        - seq: Input sequence

        Returns:
        - pred: Read-out of the LSTM output for every time step
        - hidden: State returned by the LSTM (tuple of hidden and cell state)
        """
        recurrent_out, final_state = self.lstm(seq)
        return self.linear(recurrent_out), final_state


Functions for training and testing the model:

In [None]:

def train(input_data, model, weight_decay, future_decay, learning_rate=0.001, optimizer=None):
    """
    Train the model for one pass over `input_data` on next-step prediction.

    The model output is added to the state columns of the input window
    (all columns except the first one); the result at the last time step of the
    window is compared with the state columns of the label using the MSE loss.

    Args:
    - input_data: Iterable of (inp, label) batches. Judging from the indexing
      below, inp is (batch, window, features) and label is (batch, features),
      where the first feature is the input signal that is not predicted.
    - model: Module whose forward pass returns (output, hidden)
    - weight_decay: Weight decay passed to the Adam optimizer
    - future_decay: Currently unused; kept so that existing calls keep working
    - learning_rate: Learning rate passed to the Adam optimizer
    - optimizer: Optional optimizer to use. If None (default), a new Adam
      optimizer is created on every call, which resets its running statistics.
      Pass one in to keep the optimizer state across epochs.

    Returns:
    - Mean loss over all batches (NaN if `input_data` yields no batch)
    """
    loss_fn = nn.MSELoss()
    if optimizer is None:
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # Move the batches to wherever the model lives instead of relying on a global
    model_device = next(model.parameters()).device

    model.train()
    batch_losses = []

    for inp, label in input_data:  # inp = (u, x) label = x
        inp = inp.to(model_device)
        label = label.to(model_device)

        # reset the gradient
        optimizer.zero_grad(set_to_none=True)

        # Predict one timestep: the network output is the change of the state
        output, _ = model(inp)
        out = inp[:, :, 1:] + output

        # calculate the error of the prediction after the last step of the window
        loss = loss_fn(out[:, -1, :], label[:, 1:])

        # backpropagation; the graph is rebuilt for every batch, so it does not
        # have to be retained
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    if not batch_losses:
        # nothing to average (e.g. fewer samples than one batch with drop_last=True)
        return float("nan")

    # return the average error of the next step prediction
    return np.mean(batch_losses)


In [None]:

def test(test_data, model, steps=600, ws=10, plot_opt=False):

    #test_data = test_dataloader.get_all_data() 
    model.eval()
    loss_fn = nn.MSELoss()
    test_loss = 0
    test_loss_deriv = 0
    total_loss = 0

    for i, x in enumerate(test_data):
        x=x.to(device)
        if i > 5:
            break

        with torch.inference_mode():

            pred = torch.zeros((steps, 3), device=device)
            pred_next_step = torch.zeros((steps, 3), device=device)

            if ws > 1:
                pred[0:ws, :] = x[0:ws, :]
                pred[:, 0] = x[:, 0]
                pred_next_step[0:ws, :] = x[0:ws, :]
                pred_next_step[:, 0] = x[:, 0]
            else:
                pred[0, :] = x[0, :]
                pred[:, 0] = x[:, 0]
                pred_next_step[0, :] = x[0, :]
                pred_next_step[:, 0] = x[:, 0]

            for i in range(len(x) - ws):

                out, _ = model(pred[i:i+ws, :])
                pred[i+ws, 1:] = pred[i+ws-1, 1:] + out[-1, :]
                pred_next_step[i+ws, 1:] = x[i+ws-1, 1:] + out[-1, :]
            
            test_loss += loss_fn(pred[:, 1], x[:, 1]).detach().cpu().numpy()
            test_loss_deriv += loss_fn(pred[:, 2], x[:, 2]).detach().cpu().numpy()

            total_loss += loss_fn(pred[:, 1:], x[:, 1:]).detach().cpu().numpy()

            if plot_opt:
                figure , axs = plt.subplots(1,3,figsize=(16,9))
            
                axs[0].plot(pred.detach().cpu().numpy()[:, 1], color="red", label="pred")
                axs[0].plot(pred_next_step.detach().cpu().numpy()[:, 1], color="green", label="next step from data")
                axs[0].plot(x.detach().cpu().numpy()[:, 1], color="blue", label="true", linestyle="dashed")
                axs[0].set_title("position")
                axs[0].grid()
                axs[0].legend()

                axs[1].plot(pred.detach().cpu().numpy()[:, 2], color="red", label="pred")
                axs[1].plot(pred_next_step.detach().cpu().numpy()[:, 2], color="green", label="next step from data")
                axs[1].plot(x.detach().cpu().numpy()[:, 2], color="blue", label="true", linestyle="dashed")
                axs[1].set_title("speed")
                axs[1].grid()
                axs[1].legend()

                axs[2].plot(x.detach().cpu().numpy()[:,0], label="pressure")
                axs[2].set_title("pressure")
                axs[2].grid()
                axs[2].legend()

                plt.grid(True)
                plt.legend()
                plt.show()
            
    return np.mean(test_loss), np.mean(test_loss_deriv), np.mean(total_loss)


Main training loop:

generate parameters,
prepare training/testing data,
start training loop

In [None]:
# set some parameters for learning

                    #window_size, h_size,  l_num,  epochs, learning_rate,  part_of_data,   weight_decay,   percentage_of_data    future_decay      batch_size
parameter_sets  =    [4,             5 ,      1,       10,       0.001,           100,           1e-5,               0.8,               0.3 ,           64]

window_size, h_size, l_num, epochs, learning_rate, part_of_data, weight_decay,  percentage_of_data, future_decay, batch_size = parameter_sets

# seed used for the weight initialisation and for the train/test split
SEED = 1234
# the full-trajectory evaluation is run every EVAL_EVERY epochs
EVAL_EVERY = 10

# Seed torch before the model is built so that the initial weights are reproducible
torch.manual_seed(SEED)

# Initialize the LSTM model
model = LSTMmodel(input_size=3, hidden_size=h_size, out_size=2, layers=l_num).to(device)

# Generate input data (the data is normalized and some timesteps are cut off)
input_data = get_data(path = "save_data_test3.csv",
                        timesteps_from_data=0,
                        skip_steps_start = 0,
                        skip_steps_end = 0,
                        drop_half_timesteps = False,
                        normalise_s_w=True,
                        rescale_p=False,
                        num_inits=part_of_data)

# drop timesteps at the end of each run in the training data
cut_off_timesteps = 500
if input_data.size(dim=1) <= cut_off_timesteps:
    # a negative or zero slice end below would silently keep the wrong timesteps
    raise ValueError(
        f"cut_off_timesteps={cut_off_timesteps} removes every timestep of runs "
        f"that are {input_data.size(dim=1)} timesteps long"
    )

# Split data into train and test sets: a random share of the runs is used for
# training, all remaining runs for testing
np.random.seed(SEED)
num_runs = len(input_data)
num_of_inits_train = int(num_runs*percentage_of_data)
train_inits = np.random.choice(np.arange(num_runs), num_of_inits_train, replace=False)
test_inits = np.setdiff1d(np.arange(num_runs), train_inits)

train_data = input_data[train_inits,:input_data.size(dim=1)-cut_off_timesteps,:]
test_data = input_data[test_inits,:,:]

data_set  = CustomDataset(train_data, window_size=window_size)
train_dataloader = DataLoader(data_set, batch_size=batch_size, pin_memory=True, drop_last=True)


losses = []
average_traj_err_train = []
average_traj_err_test = []

for e in tqdm(range(epochs)):

    loss_epoch = train(train_dataloader, model, weight_decay, future_decay, learning_rate=learning_rate)
    losses.append(loss_epoch)

    # Every few epochs get the error MSE of the true data
    # compared to the network prediction starting from some initial conditions
    if (e+1) % EVAL_EVERY == 0:
        _,_, err_train = test(train_data, model, steps=train_data.size(dim=1), ws=window_size, plot_opt=False)
        _,_, err_test = test(test_data, model, steps=test_data.size(dim=1), ws=window_size, plot_opt=False)
        average_traj_err_train.append(err_train)
        average_traj_err_test.append(err_test)
        # report the current epoch and the actual loss value
        print(f"Epoch: {e+1}, the average next step error was : {loss_epoch}")
        print(f"Average error over full trajectories: training data : {err_train}")
        print(f"Average error over full trajectories: testing data : {err_test}")

Save the trained model

In [None]:
# Save the learned weights (state_dict) of the model.
# Raw string: "\m" is not a valid escape sequence in a normal string literal
# (newer Python versions warn about it); the resulting path is unchanged.
# NOTE: the backslash is a directory separator on Windows only, and the folder
# "Ventil_trained_NNs" has to exist there. The path is kept as it is so that it
# matches the path used for loading the model.
path = r"Ventil_trained_NNs\my_example_model.pth"
torch.save(model.state_dict(), path)

Load the model and plot some tests

In [None]:
# Load the model and test it on the test data

# Raw string: "\m" is not a valid escape sequence in a normal string literal
# (newer Python versions warn about it); the resulting path is unchanged.
path = r"Ventil_trained_NNs\my_example_model.pth"

# Rebuild the network with the same sizes as during training and load the saved weights
model = LSTMmodel(input_size=3, hidden_size=h_size, out_size=2, layers=l_num).to(device)
# NOTE: torch.load unpickles the file - only load model files from a trusted source
model.load_state_dict(torch.load(path, map_location=torch.device(device)))

# Roll the model out over the test runs and plot them; the number of steps is
# taken from the data that is actually evaluated
test_loss, test_loss_deriv, total_loss = test(test_data, model, steps=test_data.size(dim=1), ws=window_size, plot_opt=True)
ic(test_loss, test_loss_deriv, total_loss)