In [7]:
# imports
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import time
from collections import defaultdict

In [8]:
# calculate du^ n / dt^n for arbitrary n and use this to create a loss term
def compute_derivative(model, t, norm_term):
    
    # compute derivative of outputs with respect to inputs 
    deriv_to_t = torch.autograd.functional.jacobian(model, t, create_graph=True).sum(axis=2)
              
    return deriv_to_t

# build the loss function
def loss(x, A, v, model): # A is matrix of coefs and v is initial conditions
    
    # loss results
    loss_dict = {}
            
    # create the trial solution
    model_result = lambda t: model(t)[0]
    u = model_result(x).unsqueeze(dim = -1)

    A_times_u = torch.matmul(A, u)
    
    # print("u shape: ", u.shape)
    # print("A_times_u: ", A_times_u.shape)

    # the line below is calling the model_outputs function which returns the solution of the network... we then sum across axis=2 to get the derivatives with respect to each u
    du_dt = compute_derivative(model_result, x, 0)
    
    # print("du_dt shape: ", du_dt.shape)

    L_t_term = du_dt + A_times_u
    L_t = torch.matmul(L_t_term.mT, L_t_term)
        
    u_0 = model_result(torch.tensor([[0]], dtype=torch.float32))[0].unsqueeze(dim =-1)
    L_0_term = u_0 - v
    L_0 = torch.matmul(L_0_term.T, L_0_term)
    
    L = torch.mean(L_t) + L_0
    
    # normalize loss and use it to compute the normalization factor
    # norm_term1, norm_term2 = 1 / As[0] ** 2, 1 / As[1] ** 2 ---> these are now 1 / v[0] **2 (I.C.)
    
    loss_dict['L_D'] = L
    loss_dict['L_total'] = L

    return loss_dict


In [9]:
# class to build the network 
class BuildNetwork(nn.Module):
    def __init__(self, input_size, h_size1, h_size2, h_size3, output_size):
        super(BuildNetwork, self).__init__()
        self.l1 = nn.Linear(input_size, h_size1)
        self.tanh = nn.Tanh()
        self.l2 = nn.Linear(h_size1, h_size2)
        self.l3 = nn.Linear(h_size2, h_size3)
        self.output = nn.Linear(h_size3, output_size)
        
    def forward(self, x):
        result = self.l1(x)
        result = self.tanh(result)
        result = self.l2(result)
        result = self.tanh(result)
        result = self.l3(result)
        h = self.tanh(result)
        result = self.output(h)
        
        return result, h

In [10]:
# train and evaluate the model
def run_model(iterations, x_range, A, v, hid_lay, num_equations): # A is matrix of coefs and v is initial conditions
          
    # build the neural net model
    model = BuildNetwork(1, hid_lay[0], hid_lay[1], hid_lay[2], num_equations) # (1, 64, 64, 128, 2)
    # set-up the optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    # create a learning rate scheduler
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)
    
    # extract the min and max range of x values to sample
    min_x, max_x = x_range
    
    # create a random number generator for loss computation
    rng = np.random.default_rng()
    
    # store loss and mse values
    loss_history = defaultdict(list)
    MSEs = []
    
    start_time = time.time()
#     curr_A_indx = -1 NOTE: commented out until we do transfer learning
    
    for i in range(iterations):
        
        print("Iteration: ", i)
        
        if i % 2000 == 0: 
#             curr_A_indx += 1 NOTE: commented out until we do transfer learning
#             A = A_vals[curr_A_indx] NOTE: commented out until we do transfer learning
#             print(f"Iteration {i} completed")
            pass
        
        # every batch, randomly sample from min and max range
        x = torch.arange(min_x, max_x, 0.001, requires_grad=True)
        x = x[rng.integers(low=0, high=len(x), size=500)]
        x = x.reshape(-1, 1)

        # forward: compute loss
        curr_loss = loss(x, A, v, model)
        
        # store individual loss terms for plotting
        loss_history['LD_losses'].append(curr_loss['L_D'].item())
        loss_history['Ltotal_losses'].append(curr_loss['L_total'].item())
            
        # backward: backpropagation
        curr_loss['L_total'].backward()
        
        # update weights and reset the gradients
        optimizer.step() 
        optimizer.zero_grad() 
        
        # learning rate update
        # scheduler.step()
        
        # compute the mse
        with torch.no_grad():
            # TODO: true solutions will have to be passed in and generalize it to n equations
            e_term1, e_term2 = np.exp(-1.1 * x), np.exp(2.2 * x)
            true_sol1 = (0.5 * A[0] * e_term1 * (e_term2 + 1) - (0.5 * A[1] * e_term1 * (e_term2 - 1))).numpy()
            network_sol1 = model(x)[0][:, 0].unsqueeze(dim=1).numpy()
            true_sol2 = (0.5 * A[1] * e_term1 * (e_term2 + 1) - (0.5 * A[0] * e_term1 * (e_term2 - 1))).numpy()
            network_sol2 = model(x)[0][:, 1].unsqueeze(dim=1).numpy()
            
        current_mse = np.mean((true_sol1 - network_sol1) ** 2) + np.mean((true_sol2 - network_sol2) ** 2)
        MSEs.append(current_mse)
   
    end_time = time.time()
    total_time = end_time - start_time
    
    print(f"Model Training Complete in{total_time: .3f} seconds")
    
    return loss_history, model, total_time, MSEs

In [11]:
# # function to plot the neural network vs exact solution
# def plot_solution(min_x, max_x, As, lambda_vals, trained_model, axis):
    
#     u1_t = lambda t: trained_model(t)[0][:, 0]
#     u2_t = lambda t: trained_model(t)[0][:, 1]
    
#     # plot the solution accuracies
#     xx = np.linspace(min_x, max_x, 200)[:, None]

#     with torch.no_grad():
#         yy1 = u1_t(torch.Tensor(xx)).numpy()
#         yy2 = u2_t(torch.Tensor(xx)).numpy()

#     # TODO: extract this plotting code out of here and make it something that is passed in
#     # TODO: make the plotting generalize to n equations (i.e. labels will have to be passed in)
#     e_term1, e_term2 = np.exp(-1.1 * xx), np.exp(2.2 * xx)
#     yt1 = (0.5 * As[0] * e_term1 * (e_term2 + 1) - (0.5 * As[1] * e_term1 * (e_term2 - 1)))
#     yt2 = (0.5 * As[1] * e_term1 * (e_term2 + 1) - (0.5 * As[0] * e_term1 * (e_term2 - 1)))

#     axis.plot(xx, yt1, label='Actual Solution $u_1(t)$')
#     axis.plot(xx, yt2, label='Actual Solution $u_2(t)$')
#     axis.plot(xx, yy1, '--', label='Neural Network Solution $u_1(t)$')
#     axis.plot(xx, yy2, '--', label='Neural Network Solution $u_2(t)$')
#     axis.set_title("u(t) vs t for Actual and Approximation")
#     axis.set_xlabel('$t$')
#     axis.set_ylabel('$u(t)$')
#     axis.legend(loc='best')
    
# # function to plot the overall loss of the network solution
# def plot_total_loss(iterations, train_losses, axis, loss_label):
#     axis.plot(range(iterations), train_losses, label=loss_label)
#     axis.set_yscale("log")
#     axis.set_title("Total Loss vs Iterations")
#     axis.set_xlabel('Iterations')
#     axis.set_ylabel('Loss')
#     axis.legend(loc='best')
    
# # function to plot the MSEs
# def plot_mse(iterations, mses, axis):
#     axis.plot(range(iterations), mses, label='MSE')
#     axis.set_yscale("log")
#     axis.set_title("MSE vs Iterations")
#     axis.set_xlabel('Iterations')
#     axis.set_ylabel('MSE')
#     axis.legend(loc='best')
    
# # wrapper function to plot the solution and the overall loss of the network solution
# def plot_loss_mse_and_solution(min_x, max_x, As, lambda_vals, iterations, 
#                                trained_model, train_losses, loss_label, mses):
#     fig, axs = plt.subplots(1, 3,  tight_layout=True, figsize=(24, 8))
    
#     plot_total_loss(iterations=iterations, train_losses=train_losses, 
#                     axis=axs[0], loss_label=loss_label)
#     plot_solution(min_x=min_x, max_x=max_x, As=As, lambda_vals=lambda_vals, 
#                   trained_model=trained_model, axis=axs[1])
#     plot_mse(iterations=iterations, mses=mses, axis=axs[2])
    
#     plt.show()
    

In [12]:
# set-up differential equation parameters for network
x_range = [0, 2]
A = torch.tensor([[0, 1.1], [1.1, 0]])
v = torch.tensor([[0.75], [0.25]])
hid_lay = [64, 64, 128]
num_equations = 2
iterations = 10

# run model which has a loss of the MSE (i.e. no higher order terms)
loss_history_w1, trained_model_w1, time_w1, MSE_w1  = run_model(iterations=iterations, x_range=x_range, A=A, v=v, hid_lay=hid_lay, num_equations=num_equations)


Iteration:  0
Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Model Training Complete in 13.223 seconds


In [None]:
# Iteration:  0
# Iteration:  1
# Iteration:  2
# Iteration:  3
# Iteration:  4
# Iteration:  5
# Iteration:  6
# Iteration:  7
# Iteration:  8
# Iteration:  9
# Model Training Complete in 13.223 seconds

In [None]:
# print(0.0522 - 0.2502)
# print(-0.0353 + 0.0705)
# a = torch.ones([500, 2, 1])
# print(torch.matmul(a.mT, a).shape)
# print(torch.matmul(a, a.mT).shape)
# - 0.1804 -0.25

tensor([[0.5288]], grad_fn=<AddBackward0>)

In [None]:
# a = torch.tensor([[1, 2], [3, 4]])
# b = torch.tensor([[[10], [20]], [[30], [40]], [[50], [60]]])
# print(a.shape)
# print(b.shape)

# print(torch.matmul(a, b).shape)

In [None]:
# M_data = [[1, 2], [3, 4]]
# M = torch.tensor(M_data)
# print(M)

# torch.matmul(M, M)
A = torch.tensor([[0, 1.1], [1.1, 0]])
print(A)
v = torch.tensor([0.75, 0.25])
print(v)

In [None]:
A = [0.75, 0.25]
xx = torch.Tensor(np.linspace(0, 2, 200)[:, None])
u_1 = trained_model_w1(xx)[0][:, 0].detach().numpy()
u_2 = trained_model_w1(xx)[0][:, 1].detach().numpy()

e_term1, e_term2 = np.exp(-1.1 * xx), np.exp(2.2 * xx)

yt1 = (0.5 * A[0] * e_term1 * (e_term2 + 1) - (0.5 * A[1] * e_term1 * (e_term2 - 1))).numpy()
yt2 = (0.5 * A[1] * e_term1 * (e_term2 + 1) - (0.5 * A[0] * e_term1 * (e_term2 - 1))).numpy()

plt.scatter(xx, u_1, label='u1')
plt.scatter(xx, yt1, label='sol u1', s=5)
plt.scatter(xx, u_2, label='u2')
plt.scatter(xx, yt2, label='sol u2', s=5)
plt.legend();

In [None]:
# plot overall loss and network solution for model which has a loss of the MSE (i.e. no higher order terms)
print("\n Model 1: Loss = L_D")
plot_loss_mse_and_solution(min_x=min_x, max_x=max_x, As=A_vals[3], lambda_vals=lambda_vals, 
                       iterations=iterations, trained_model=trained_model_w1, 
                       train_losses=loss_history_w1['Ltotal_losses'], 
                       loss_label='Total Loss ($L_D$)', mses=MSE_w1)

In [None]:
# # create a t to take derivative with respect to 
# rng = np.random.default_rng()
# t_eval = torch.arange(0, 2, 0.001, requires_grad=True)
# t_eval = t_eval[rng.integers(low=0, high=2000, size=500)]
# t_eval = t_eval.reshape(-1, 1)

# # forward pass to extract H 
# output, H = trained_model_w1(t_eval) 

# print("H shape before column of 1s: ", H.shape)
# H = torch.cat((torch.ones(len(t_eval), 1), H), 1)
# print("H shape after column of 1s: ", H.shape)

# # compute derivative w/r/t H and multiply it by its transpose
# dH_dt = torch.autograd.functional.jacobian(trained_model_w1 ,t_eval, create_graph=True)[1].sum(axis=2)

# print("dh_dt before adjusting: ", dH_dt.shape)
# dH_dt = torch.cat((torch.zeros(len(t_eval), 1, 1), dH_dt), 1)
# print("dh_dt after adjusting: ", dH_dt.shape)

In [None]:
# # compute dH_dt times dH_dt transpose 
# dH_dt_times_dH_dt_T = torch.matmul(dH_dt, dH_dt.mT)
# print("dH_dt * dH_dt_T: ", dH_dt_times_dH_dt_T.shape)

# # compute dH_dt times H transpose 
# new_H = torch.unsqueeze(H, dim=-1)
# dH_dt_times_H_T = torch.matmul(dH_dt, new_H.mT)
# print("dH_dt * H_T: ", dH_dt_times_H_T.shape)

# # compute H times dH_dt transpose
# H_times_dH_dt_T = torch.matmul(new_H, dH_dt.mT)

# # compute H times H transpose
# H_times_H_T = torch.matmul(new_H, new_H.mT)
# print("H * H_T: ", H_times_H_T.shape)

# # compute the sum of matrices across time
# # w_sum_term = dH_dt_times_dH_dt_T + 2 * lambda_val * dH_dt_times_H_T + lambda_val ** 2 * H_times_H_T
# w_sum_term = dH_dt_times_dH_dt_T + lambda_val * dH_dt_times_H_T + lambda_val * H_times_dH_dt_T + lambda_val ** 2 * H_times_H_T 

# print("W_sum shape before summing across time: ", w_sum_term.shape)
# w_sum_term = w_sum_term.sum(axis=0)
# w_sum_term = w_sum_term / len(t_eval)
# print("W_sum shape after summing across time: ", w_sum_term.shape)

# # compute H_0 and multiply it by its transpose 
# output_0, H_0 = trained_model_w1(torch.tensor([[0.]], dtype=torch.float32, requires_grad=True))
# print("H_0 shape before appending 1: ", H_0.shape)
# H_0 = torch.cat((torch.ones(1, 1), H_0), 1)
# print("H_0 shape after appending 1: ", H_0.shape)

# H_0_T_times_H_0 = torch.matmul(H_0.T, H_0)
# print("H_0_T * H_0 shape: ", H_0_T_times_H_0.shape)

In [None]:
# # build the M matrix and find its inverse 
# M = w_sum_term + H_0_T_times_H_0
# M_inverse = torch.linalg.pinv(M)

# # compute W_out
# w_out = torch.matmul(M_inverse, H_0.T) * 0.75

In [None]:
# tranfer_learned = [torch.matmul(w_out.T, new_H[i]).item() for i in range(500)]
# times = list(t_eval.detach().numpy().squeeze())

# true = [0.75 * np.exp(-lambda_val * time) for time in times]
# plt.scatter(times, tranfer_learned, label = 'Transfer Learning')
# plt.scatter(times, true, label='Analytical', s=10)
# plt.xlabel("t")
# plt.ylabel('y')
# plt.title("Transfer Learning vs Analytical Solution")
# plt.legend();

In [None]:
# start_one_shot_time = time.time()
# rng = np.random.default_rng(12345)
# for i in range(10000):
#     random_u_0 = rng.random()
#     w_out = torch.matmul(M_inverse, H_0.T) * random_u_0
    
# end_one_shot_time = time.time() 
# total_one_shot = end_one_shot_time - start_one_shot_time
# print(f"Total Time to evaluate w_out for 10000 different initial conditions: {total_one_shot: .2f} seconds")

In [None]:
# TODO:

# 1. make code work with 2 linear ODEs

# 2. make the code generalize to a system of n ODEs

# 3. compute the individual terms needed in the w_out calculation for a system

# 4. see how formula works and what can be changed while still doing it in "one shot"

# 5. Are the higher order terms, L_D and L_DD, etc.. still relevant?

# 6. see which parts of the formula are dependent on time and ask Pavlos how that works
    # (changing W_o means that only u_o will change eventually?)

# 7. try using a different diff. equation

# 9. make loss function proper


In [None]:
# residuals = (torch.matmul(w_out.T, dH_dt[:]) + lambda_val * torch.matmul(w_out.T, new_H[:])) ** 2
# residuals = residuals.reshape(500, 1).detach().numpy()

# print(f"Mean of residuals: {residuals.mean()}")
# initial_cond_residuals = (torch.matmul(w_out.T, H_0.T) - 0.75) ** 2
# print(f"Initial Condition Residual: {initial_cond_residuals.item()}")

# plt.plot(residuals);
# plt.xlabel("t")
# plt.ylabel("Residual")
# plt.title("Plot of Residuals");

In [None]:
# print(f"Difference Between Analytical and Tranfer Learned for t ={times[0]: .3f}: {(0.75 * np.exp(-lambda_val * times[0]) - torch.matmul(w_out.T, H[0])).item(): .5f}")
# print(f"Difference Between Analytical and Tranfer Learned for t ={times[100]: .3f}: {(0.75 * np.exp(-lambda_val * times[100]) - torch.matmul(w_out.T, H[100])).item(): .5f}")
# print(f"Difference Between Analytical and Tranfer Learned for t ={times[200]: .3f}: {(0.75 * np.exp(-lambda_val * times[200]) - torch.matmul(w_out.T, H[200])).item(): .5f}")

In [None]:
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA
# H_test = H.detach()
# gm_H = torch.matmul(H_test.T, H_test)
# pca = PCA(n_components=5)
# pca.fit(gm_H)
# print(pca.explained_variance_ratio_)

# Appendix

### Notes:

- $u_t = lambda$ $t: A + t * model(t)$ is outdated, so we use $ u_t = lambda$ $t: A + (1 - exp(-t)) * model(t)$

- We are solving $\frac{du}{dt} + \lambda * u = 0$ where $f = -\lambda * u$

- Old Model Code: model = nn.Sequential(nn.Linear(1, 64), nn.Tanh(), nn.Linear(64, 64), nn.Tanh(), nn.Linear(64,1))

- Old solution parametrization: u_t = lambda t: A + (1 - torch.exp(-t)) * model(t)

- Original normalization term: norm_term = 1 / u_t(torch.tensor([[0]], dtype=torch.float32)) ** 2