In [15]:
import torch
import torch.nn as nn
import numpy as np
import scipy.io as sp
from functools import partial
from pyDOE import lhs
from torch.autograd import Variable

iter = 0

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [2]:
def set_seed(seed: int = 42):
    """
    Seeding the random variables for reproducibility
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def derivative(dy: torch.Tensor, x: torch.Tensor, order: int = 1) -> torch.Tensor:
    """
    This function calculates the derivative of the model at x_f
    """
    for i in range(order):
        dy = torch.autograd.grad(
            dy, x, grad_outputs=torch.ones_like(dy), create_graph=True, retain_graph=True)[0]
    return dy


$$u_t-u_{xx}=0,\ x\in[0,1], t\in [0,T],$$
$$u(0,x)=\sin (2\pi x),$$
$$u(t,0)=0,$$
$$u_x(t,1)=2\pi e^{-t}.$$
We may define $f$ as $u_t-u_{xx}$.

In [3]:
class Wave(nn.Module):
    """
    Define the SchrodingerNN,
    it consists of 5 hidden layers
    """
    def __init__(self, ):
        # Input layer
        super(Wave, self).__init__()
        self.linear_in = nn.Linear(2, 100)
        # Output layer
        self.linear_out = nn.Linear(100, 1)
        # Hidden Layers
        self.layers = nn.ModuleList(
            [nn.Linear(100, 100) for i in range(5)]
        )
        # Activation function
        self.act = nn.Tanh() # How about LeakyReLU? Or even Swish?

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.linear_in(x)
        for layer in self.layers:
            x = self.act(layer(x))
        x = self.linear_out(x)
        return x

In [11]:
def f(model, x_f, t_f):
    """
    This function evaluates the PDE at collocation points.
    """
    u = model(torch.stack((x_f, t_f), axis = 1)) # Concatenates a seq of tensors along a new dimension
    u_t = derivative(u, t_f, order=1)
    u_xx = derivative(u, x_f, order=2)
    return u_t - u_xx


def mse_f(model, x_f, t_f):
    """
    This function calculates the MSE for the PDE.
    """
    f_u = f(model, x_f, t_f)
    return (f_u ** 2).mean()


def mse_0(model, x_0, u_0):
    """
    This function calculates the MSE for the initial condition.
    u_0 is the real values
    """
    t_0 = torch.zeros_like(x_0) # creating a t_0 with zero values
    f = model(torch.stack((x_0, t_0), axis=1))
    # extracting the u and v values from the model output
    return ((f - u_0) ** 2).mean()

def mse_b(model, t_b):
    """
    This function calculates the MSE for the boundary condition.
    """
    x_b_upper = torch.zeros_like(t_b)
    x_b_upper.requires_grad = True
    u_b_upper = model(torch.stack((x_b_upper, t_b),axis = 1))
    mse_dirichlet = (u_b_upper**2).mean()

    x_b_lower = 2*np.pi*torch.exp(-t_b)
    x_b_lower.requires_grad = True
    u_b_lower = model(torch.stack((x_b_lower, t_b),axis = 1))
    u_x_b_lower = derivative(u_b_lower, x_b_lower, 1)
    mse_neumann = ((u_b_lower - u_x_b_lower)**2).mean()

    return mse_dirichlet + mse_neumann


def init_weights(m):
    """
    This function initializes the weights of the model by the normal Xavier initialization method.
    """
    if type(m) == nn.Linear:
        torch.nn.init.xavier_normal_(m.weight)
        m.bias.data.fill_(0.01)


def closure(model, optimizer, x_f, t_f, x_0, u_0, t):
    """
    The closure function to use L-BFGS optimization method.
    """
    optimizer.zero_grad()
    # evaluating the MSE for the PDE
    loss = mse_f(model, x_f, t_f) + mse_0(model, x_0, u_0) + mse_b(model, t)
    loss.backward(retain_graph=True)
    global iter
    iter += 1
    if iter % 50 == 0:
        print(f" Iteration: {iter}  loss: {loss.item()}")
    #if iter % 100 == 0:
    #    torch.save(model.state_dict(), f'Schrodingers_Equation/models/model_LBFGS_{iter}.pt')
    return loss


def train(model, x_f, t_f, x_0, u_0, t):
    # Initialize the optimizer
    optimizer = torch.optim.LBFGS(model.parameters(),
                                  lr=1,
                                  max_iter=50000,
                                  max_eval=50000,
                                  history_size=50,
                                  tolerance_grad=1e-05,
                                  tolerance_change=0.5 * np.finfo(float).eps,
                                  line_search_fn="strong_wolfe")

    # the optimizer.step requires the closure function to be a callable function without inputs
    # therefore we need to define a partial function and pass it to the optimizer
    closure_fn = partial(closure, model, optimizer, x_f, t_f, x_0, u_0, t)
    optimizer.step(closure_fn)

In [18]:
x_ic = np.random.uniform(low=0, high=1.0, size=100)
t_ic = np.zeros_like(x_ic)

x_bc = np.random.uniform(low=0, high=1.0, size=100)
t_bc = np.random.uniform(low=0, high=1.0, size=100)

u_ic = np.sin(2*np.pi*x_ic)
u_bc = np.zeros_like(u_ic)

train_idx = np.random.randint(low=0, high=199, size=(100))
x_train = np.append(x_ic, x_bc)[train_idx]
t_train = np.append(t_ic, t_bc)[train_idx]
u_train = np.append(u_ic, u_bc)[train_idx]

train_idx = np.random.randint(low=0, high=99, size=(100))
t_x_train = np.random.uniform(low=0.0, high=1.0, size=(100))
t_x_train = t_x_train[train_idx]
x_x_train = np.zeros_like(t_x_train)+1
u_x_train = 2*np.pi*np.exp(-t_x_train)
u_x_train = u_x_train[train_idx]

In [19]:
# Upper and lower bounds of the spatial and temporal domains
lb = np.array([0.0, 0.0])
ub = np.array([1.0, 1.0])
# Number of initial, boundary and collocation points
N_f = 20_000  # collocation points

# Loading the training points

x_ic = Variable(torch.from_numpy(x_ic.astype(np.float32)),requires_grad=True).to(device)
u_ic = Variable(torch.from_numpy(u_ic.astype(np.float32)),requires_grad=True).to(device)
t_x_train = Variable(torch.from_numpy(t_x_train.astype(np.float32)),requires_grad=True).to(device)
# collocation data points using latin hypercube sampling method
c_f = lb + (ub - lb) * lhs(2, N_f)
x_f = torch.from_numpy(c_f[:, 0].astype(np.float32)).to(device)
x_f.requires_grad = True
t_f = torch.from_numpy(c_f[:, 1].astype(np.float32)).to(device)
t_f.requires_grad = True


# Instantiate the model
model = Wave().to(device)
# Apply the initialization function to the model weights
model.apply(init_weights)

# Training the model
model.train()
train(model, x_f, t_f, x_ic, u_ic, t_x_train)
# torch.save(model.state_dict(), 'Schrodingers_Equation/models/model_LBFGS.pt')


RuntimeError: you can only change requires_grad flags of leaf variables.