In [3]:
import torch
import torch.autograd as autograd         # computation graph
from torch import Tensor                  # tensor node in the computation graph
import torch.nn as nn                     # neural networks
import torch.optim as optim               # optimizers e.g. gradient descent, ADAM, etc.
from torch.nn.parameter import Parameter
from torch.utils.data import random_split
from torch.nn import MSELoss

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.ticker
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt

import numpy as np
import time
# from pyDOE import lhs         #Latin Hypercube Sampling
import scipy.io
from Load_data2 import custom_csv_parser
torch.set_default_dtype(torch.float)
torch.manual_seed(1234)
np.random.seed(1234)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

if device == 'cuda':
    print(torch.cuda.get_device_name())



cpu


In [9]:
def grad(func, var):
    '''
    Computes the gradient of a function with respect to a variable.
    Written by Engsig-Karup, Allan P. (07/05/2024).

    Args:
    func (torch.Tensor): Function to differentiate
    var (torch.Tensor): Variable to differentiate with respect to

    Returns:
    torch.Tensor: Gradient of func with respect to var
    '''
    return torch.autograd.grad(func, var, grad_outputs=torch.ones_like(func), create_graph=True, retain_graph=True)[0] 

class PINN(nn.Module):
    
    def __init__(self, input_dim=1, hidden_dim=20, output_dim=7, num_hidden=1):
        super().__init__()               
        self.input = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.activation = nn.SiLU()

        self.hidden = nn.ModuleList()
        for _ in range(num_hidden):
            self.hidden.append(nn.Linear(in_features=hidden_dim, out_features=hidden_dim))

        self.output = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        # Parameters for the ODEs
        self.tau_1 = torch.tensor([49.0], requires_grad=True, device=device)       # [min]
        self.tau_2 = torch.tensor([47.0], requires_grad=True, device=device)       # [min]
        self.C_I = torch.tensor([20.1], requires_grad=True, device=device)         # [dL/min]
        self.p_2 = torch.tensor([0.0106], requires_grad=True, device=device)       # [min^(-1)]
        self.GEZI = torch.tensor([0.0022], requires_grad=True, device=device)      # [min^(-1)]
        self.EGP_0 = torch.tensor([1.33], requires_grad=True, device=device)       # [(mg/dL)/min]
        self.V_G = torch.tensor([253.0], requires_grad=True, device=device)        # [dL]
        self.tau_m = torch.tensor([47.0], requires_grad=True, device=device)       # [min]
        self.tau_sc = torch.tensor([5.0], requires_grad=True, device=device)       # [min]
        # self.S_I = torch.tensor([0.0081], requires_grad=True, device=device)

    def forward(self, t):
        u = self.activation(self.input(t))

        for hidden_layer in self.hidden:
            u = self.activation(hidden_layer(u))

        u = self.output(u)
        return u

    def _initialize_weights(self):
        # Initialize weights using Xavier initialization and biases to zero
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
    
    def LossODE(self,t,u,d,scalar):

        X = self.forward(t)

        D_1 = X[:,0]
        D_2 = X[:,1]
        I_sc = X[:,2]
        I_p = X[:,3]
        I_eff = X[:,4]
        G = X[:,5]
        G_sc = X[:,6]

        tau_1 = self.tau_1
        tau_2 = self.tau_2
        C_I = self.C_I
        p_2 = self.p_2
        GEZI = self.GEZI
        EGP_0 = self.EGP_0
        V_G = self.V_G
        tau_m = self.tau_m
        tau_sc = self.tau_sc
        # S_I = self.S_I # The special parameter we want to predict.
        ## Define the differential equations

        D_1_t = grad(D_1,t)
        D_2_t = grad(D_2,t)

        I_sc_t = grad(I_sc,t)
        I_p_t = grad(I_p,t)
        I_eff_t = grad(I_eff,t)

        G_t = grad(G,t)
        G_sc_t = grad(G_sc,t)

        Meal_1 = D_1_t - d + (D_1 / tau_m)
        Meal_2 = D_2_t - (D_1 / tau_m) + (D_2 / tau_m)

        Insulin1 = I_sc_t - (u/(tau_1*C_I)) + (I_sc / tau_1)
        Insulin2 = I_p_t - (I_sc / tau_2) + (I_p / tau_2)
        Insulin3 = I_eff_t + p_2 * I_eff - p_2 * S_I * I_p

        Glucose1 = G_t + (GEZI + I_eff) * G - EGP_0 - ((1000 * D_2) / (V_G * tau_m))
        Glucose2 = G_sc_t - (G / tau_sc) + (G_sc / tau_sc)

        loss_ode = (1/scalar[0]*torch.mean((Meal_1)**2)   + 
                    1/scalar[1]*torch.mean(Meal_2**2)     + 
                    1/scalar[2]*torch.mean(Insulin1**2)   + 
                    1/scalar[3]*torch.mean(Insulin2**2)   + 
                    1/scalar[4]*torch.mean(Insulin3**2)   + 
                    1/scalar[5]*torch.mean(Glucose1**2 )  + 
                    1/scalar[6]*torch.mean(Glucose2**2))
        return loss_ode

    def LossData(self,t,data):

        X = self.forward(t)

        D_1 = X[:,0]
        D_2 = X[:,1]
        I_sc = X[:,2]
        I_p = X[:,3]
        I_eff = X[:,4]
        G = X[:,5]
        G_sc = X[:,6]

        D1_data = data["D1"]
        D2_data = data["D2"]
        I_sc_data = data["I_sc"]
        I_p_data = data["I_p"]
        I_eff_data = data["I_eff"]
        G_data = data["G"]
        G_sc_data = data["G_sc"]

        data_loss = (torch.mean((D_1 - D1_data)**2) + 
                     torch.mean((D_2 - D2_data)**2) + 
                     torch.mean((I_sc - I_sc_data)**2) + 
                     torch.mean((I_p - I_p_data)**2) + 
                     torch.mean((I_eff - I_eff_data)**2) + 
                     torch.mean((G - G_data)**2) + 
                     torch.mean((G_sc - G_sc_data)**2))
        return data_loss

    def loss(self,t_train, t_data, u, d, data, scalar):
        loss_ode = self.LossODE(t_train, u, d, scalar)
        loss_data = self.LossData(t_data, data)

        return loss_ode, loss_data

if __name__ == "__main__":
    # Check for CUDA availability
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    hidden_dim = 128
    num_hidden_layers = 3

    # Load data and pre-process
    data = custom_csv_parser('../Patient2.csv')
    n_data = len(data["G"])

    # Split data into training and validation
    torch.manual_seed(42)

    indices = torch.randperm(n_data)

    n_train = int(n_data * 0.1)

    train_indices = indices[:n_train]
    val_indices = indices[n_train:]

    # Split the data dictionary 
    data_train = {}
    data_val = {}

    for key in data.keys():
        data_tensor = torch.tensor(data[key], requires_grad=True, device=device)          # Ensure data is a tensor
        data_train[key] = data_tensor[train_indices]
        data_val[key] = data_tensor[val_indices]

    # Define the model
    model = PINN(hidden_dim=hidden_dim, num_hidden=num_hidden_layers).to(device)

    S_I = torch.tensor([0.0], requires_grad=True, device=device)

    optimizer = torch.optim.Adam(list(model.parameters()) + [S_I], lr=1e-3, weight_decay=1e-5)


    num_epoch = 30000


    

      # Collocation points
    ### Options for improvement, try and extrend the collocation points after a large sum of training epochs, T + 5 or something
    
    d_train = data_train["Meal"]
    d_val = data_val["Meal"]
    
    u_train = data_train["Insulin"]
    u_val = data_val["Insulin"]
    
    t_train_data = data_train["t"].reshape(-1, 1)
    t_val_data = data_val["t"].reshape(-1, 1)
    
    T = data["t"][-1]
    t_train = torch.linspace(0, T, n_train, requires_grad=True, device=device).reshape(-1, 1)
    
    
    # Setup arrays for saving the losses
    train_losses = []
    val_losses = []
    learning_rates = []

    scalar = torch.Tensor([data_train["D1"].mean(), data_train["D2"].mean(), data_train["I_sc"].mean(), data_train["I_p"].mean(), data_train["I_eff"].mean(), data_train["G"].mean(), data_train["G_sc"].mean()])

    # Begin training our model
    for epoch in range(num_epoch):
        optimizer.zero_grad()
        loss_ode, loss_data = model.loss(t_train, t_train_data, u_train, d_train, data_train, scalar)
        loss = loss_ode + loss_data
        loss.backward(retain_graph=True)
        optimizer.step()
        # scheduler.step()
        
        # Get current learning rate
        current_lr = optimizer.param_groups[0]['lr']

        # Create the console output and plot
        if epoch % 100 == 0:
            with torch.no_grad():
                model.eval()
                val_loss = model.LossData(t_val_data, data_val)

            train_losses.append(loss.item())
            val_losses.append(val_loss.item())
            learning_rates.append(current_lr)

            # Print training and validation loss
        if epoch % 1000 == 0:
            # print(f"Epoch {epoch}, Loss: {loss.item():.6f}, Val Loss: {val_loss.item():.6f}")
            # print(f"Epoch {epoch}, Loss: {loss.item():.6f}, Val Loss: {val_loss.item():.6f}, S_I: {S_I.item():.6f}")
            # print(f"Epoch {epoch}, Loss: {loss.item():.6f}, Val Loss: {val_loss.item():.6f}, GEZI: {GEZI.item():.6f}, S_I: {S_I.item():.6f}")
            print(f"Epoch {epoch}, Loss ODE: {loss_ode.item():.6f}, Loss data: {loss_data.item():.6f}")#, S_I: {S_I.item():.6f}")

    

    # Plot training and validation losses and learning rate
    plt.figure(figsize=(18, 5))

    # First subplot for losses
    plt.subplot(1, 3, 1)
    epochs = range(0, num_epoch, 100)  # Since we record losses every 100 epochs
    plt.plot(epochs, train_losses, label='Training Loss')
    plt.plot(epochs, val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Losses')
    plt.legend()

    # Second subplot for learning rate
    plt.subplot(1, 3, 2)
    plt.plot(epochs, learning_rates, label='Learning Rate', color='green')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.title('Learning Rate Schedule')
    plt.legend()

    # Third subplot for glucose predictions
    plt.subplot(1, 3, 3)
    t_test = torch.linspace(0, T + 150, 2500, device=device).reshape(-1, 1)
    X_pred = model(t_test)
    G_pred = X_pred[:, 5].detach().cpu().numpy()

    plt.plot(t_test.cpu().numpy(), G_pred, label='Predicted Glucose (G)')
    plt.plot(data["t"], data["G"], label='True Glucose (G)')
    plt.xlabel('Time (t)')
    plt.ylabel('Glucose Level')
    plt.title('Predicted vs True Glucose Levels')
    plt.legend()

    plt.tight_layout()
    plt.show()

Epoch 0, Loss ODE: 32.922703, Loss data: 27352.183594
Epoch 1000, Loss ODE: 26.644047, Loss data: 3.729960


KeyboardInterrupt: 