Packages

In [1]:
import torch
import numpy as np
import h5py
import os
from torch.autograd import grad
from torch import nn
import random
os.chdir(os.path.split(os.getcwd())[0])

Read data

In [2]:
with h5py.File(".\\dataset\\unitData.h5", 'r') as f:
  d = list(f["solEvalPoints"])
  # IDs of nodes used as trunk network inputs
  nodeID = list(map(int, d[0]))
  # [x y t] of y points (trunk inputs).
  # matrix with rows [x y t]
  solEvalPoint = torch.tensor(np.concatenate([d[1][:, None], d[2][:, None], d[3][:, None]], axis = 1))
  # DeepONet output targets (G(u)(y)) for all initial conditions
  # matrix with 'initial condition' rows and 'y point' columns
  solutionValue = torch.tensor(f["solutionValues"]).T
del d
print(len(nodeID))
print(solEvalPoint.shape)
print(solutionValue.shape)

1000
torch.Size([1000, 3])
torch.Size([1000, 1000])


  solutionValue = torch.tensor(f["solutionValues"]).T


Data parameters

In [3]:
# number of sensors is equal to number of cases
nCase = solutionValue.shape[0]
nSensor = solutionValue.shape[0]
# value of concentrated load as initial condition
load = torch.tensor(-1.21 * 0.02).to(torch.float32)
evalAmount = solEvalPoint.shape[0] # number of y points (trunk inputs)
nSample = torch.numel(solutionValue) # amount of samples in dataset
print(f"Dataset contains {nSample} samples.")

Dataset contains 1000000 samples.


Organize data

In [4]:
class acoustDataset(torch.utils.data.Dataset):
    """Dataset subclass to organize data"""
    def __init__(self, evalPoint: torch.tensor, solVal: torch.tensor) -> None:
        super(acoustDataset, self).__init__()
        self.trunkInput = evalPoint
        # target for model output
        self.target = torch.flatten(solVal)
    
    def __getitem__(self, idx):
        """Overwrite method to fetch sample"""
        # branch input load value at specific sensor. 0 everywhere else
        branchInput = torch.zeros(nSensor, dtype = torch.float32)
        branchInput[idx - (idx // nCase) * nCase] = load
        branchInput.requires_grad_(True)
        # index to access tensor with coordinates of evaluation point (trunk inputs)
        i = idx - (idx // evalAmount) * evalAmount
        trunkIn = [self.trunkInput[i, s].unsqueeze(0).requires_grad_(True) for s in range(3)]
        # sample has (case evaluation at sensors, x, y, t, G(u)(y))
        return branchInput, *trunkIn, self.target[idx].unsqueeze(0).requires_grad_(True)
        # return (branchInput, self.trunkInput[i, 0].unsqueeze(0), self.trunkInput[i, 1].unsqueeze(0), self.trunkInput[i, 2].unsqueeze(0), self.target[idx].unsqueeze(0))
    
    def __len__(self):
        """Overwrite method to return size of dataset"""
        return torch.numel(self.target)

In [5]:
dataloader = torch.utils.data.DataLoader(acoustDataset(solEvalPoint, solutionValue),
    batch_size = 64, shuffle = True, num_workers = 0
)
branchBatch, xBatch, yBatch, tBatch, targetBatch = next(iter(dataloader))
print(f"""Batch shapes:
  branch network inputs: {branchBatch.shape}
  trunk network inputs:
    x {xBatch.shape}
    y {yBatch.shape}
    t {tBatch.shape}
  model output targets: {targetBatch.shape}""")

Batch shapes:
  branch network inputs: torch.Size([64, 1000])
  trunk network inputs:
    x torch.Size([64, 1])
    y torch.Size([64, 1])
    t torch.Size([64, 1])
  model output targets: torch.Size([64, 1])


ML parameters

In [6]:
branchDepth = 7 # number of layers in branch network (MLP architecture)
branchWidth = 100 # number of neurons in layers of branch network (MLP architecture)
trunkDepth = 7 # number of layers in trunk network (MLP architecture)
trunkWidth = 100 # number of neurons in layers of trunk network (MLP architecture)
networkOutputDim = 100 # dimension of outputs of networks

Setup model

In [7]:
class branchNet(nn.Module):
    """Branch network definition"""
    def __init__(self, inDim: int, nnDepth: int, nnWidth: int):
        super().__init__()
        # Input layer. Resizes input to desired network width
        self.inputLayer = nn.Linear(inDim, nnWidth)
        # intermediate dense layers. constant dimension
        self.MLPstack = nn.ModuleList([nn.Linear(nnWidth, nnWidth) for _ in range(nnDepth - 2)])
        # output layer. resizes network intermediate representation to networkOutputDim
        self.outputLayer = nn.Linear(nnWidth, networkOutputDim)

    def forward(self, x): # forward pass
        x = self.inputLayer(x)
        for l in self.MLPstack:
            x = l(x)
        return self.outputLayer(x)
    
class trunkNet(nn.Module):
    """Trunk network definition"""
    def __init__(self, nnDepth: int, nnWidth: int):
        super().__init__()
        # Input layer. Resizes input to desired network width.
        # consider trunk network as receiving individual inputs
        # for each dimension (x, y, t)
        self.xCoord = nn.Linear(1, nnWidth)
        self.yCoord = nn.Linear(1, nnWidth)
        self.tCoord = nn.Linear(1, nnWidth)
        # intermediate dense layers. constant dimension
        self.MLPstack = nn.ModuleList([nn.Linear(nnWidth, nnWidth) for _ in range(nnDepth - 2)])
        # output layer. resizes network intermediate representation to networkOutputDim
        self.outputLayer = nn.Linear(nnWidth, networkOutputDim)

    def forward(self, x, y, t): # forward pass
        x = self.xCoord(x)
        y = self.yCoord(y)
        t = self.tCoord(t)
        o = x + y + t
        for l in self.MLPstack:
            o = l(o)
        return self.outputLayer(o)
    
# Branch network. Embeds evaluations of the input functions 'u' at sensors
branchNetwork = branchNet(nSensor, branchDepth, branchWidth)
branchOut = branchNetwork(branchBatch)
# Trunk network. Embeds spatiotemporal coordinates (x, y, t) of
# point y, in which the PDE solution (G(u)(y)) is evaluated
trunkNetwork = trunkNet(trunkDepth, trunkWidth)
trunkOut = trunkNetwork(xBatch, yBatch, tBatch)
# Shapes of outputs of networks
print(f"""Branch - batched input {branchBatch.shape} -> output: {branchOut.shape}.
Trunk - batched input ({xBatch.shape}, {yBatch.shape}, {tBatch.shape}) -> output: {trunkOut.shape}.""")

class PI_deepONet(nn.Module):
    """Class for physics-informed DeepONet"""
    def __init__(self, branch: nn.Module, trunk: nn.Module):
        super().__init__()
        self.branch = branch
        self.trunk = trunk

    def forward(self, case: torch.tensor, x: torch.tensor, y: torch.tensor, t: torch.tensor):
        # transpose trunk output and multiply both for
        # sample-by-sample dot product in diagonal of result
        return torch.diagonal(torch.matmul(self.branch(case), self.trunk(x, y, t).T))
    
model = PI_deepONet(branchNetwork, trunkNetwork)
modelOut = model(branchBatch, xBatch, yBatch, tBatch)
print(f"Batch PI-DeepONet output shape: {modelOut.shape}")

Branch - batched input torch.Size([64, 1000]) -> output: torch.Size([64, 100]).
Trunk - batched input (torch.Size([64, 1]), torch.Size([64, 1]), torch.Size([64, 1])) -> output: torch.Size([64, 100]).
Batch PI-DeepONet output shape: torch.Size([64])


Loss

In [8]:
# outM = model(branchBatch, xBatch, yBatch, tBatch)
# p_x = grad(outM.sum(), xBatch, create_graph = True)[0]
# p_xx = grad(p_x.sum(), xBatch)[0]
# print(p_x.shape)
# print(p_xx.shape)

# def sf(bb, xb, yb, tb):
#     return torch.sum(model(bb, xb, yb, tb))
# torch.autograd.functional.hessian(sf, (branchBatch, xBatch, yBatch, tBatch), create_graph = False, strict = False)

def PDEresidual(case: torch.tensor, evalPoint: torch.tensor, m: torch.Module) -> torch.tensor:
    """PDE residual through AD for physics-informed loss term"""
    p = model(case, evalPoint)
    # p_x = torch.autograd.grad(p, , create_graph=True)[0]

SyntaxError: invalid syntax (660270611.py, line 14)

Optimizer

In [9]:
opt = torch.optim.AdamW(model.parameters())

Train

In [12]:
def train_loop(dataloader, deepONet, loss_fn, optimizer, epochAmount):
    size = len(dataloader.dataset)
    # Set the model to training mode. Added for best practices
    deepONet.train()
    # iterate in batches
    for epoch in range(epochAmount):
        print(f"EPOCH   {epoch}\n")
        for (batch, (sampleCase, sampleX, sampleY, sampleT, sampleG_uy)) in enumerate(dataloader):
            # Compute prediction and loss
            modelOut = deepONet(sampleCase, sampleX, sampleY, sampleT)
            loss = loss_fn(modelOut, sampleG_uy)
            # Backpropagation
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            if batch % 100 == 0:
                # loss, current = loss.item(), (batch + 1) * len(dataloader)
                # print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
                print(f"loss: {loss.item()}")

train_loop(dataloader, model, torch.nn.functional.mse_loss, opt, 1)

EPOCH   0

loss: 0.5413976311683655


  loss = loss_fn(modelOut, sampleG_uy)


loss: 0.005389668978750706
loss: 0.012681028805673122
loss: 0.008139814250171185
loss: 0.00709192780777812
loss: 0.006987951695919037
loss: 0.013568645343184471
loss: 0.017612842842936516
loss: 0.006359223276376724
loss: 0.022259794175624847
loss: 0.016716646030545235
loss: 0.01087149977684021
loss: 0.007696544751524925
loss: 0.00803461018949747
loss: 0.02074422501027584
loss: 0.00559166120365262
loss: 0.00780774699524045
loss: 0.004685216583311558
loss: 0.009502287954092026
loss: 0.017272019758820534
loss: 0.009355535730719566
loss: 0.009582115337252617
loss: 0.005427428521215916
loss: 0.01159230899065733
loss: 0.012830229476094246
loss: 0.004507889971137047
loss: 0.011269123293459415
loss: 0.017415864393115044
loss: 0.008325275033712387
loss: 0.010523764416575432
loss: 0.013874617405235767
loss: 0.01006183959543705
loss: 0.006661584600806236
loss: 0.008424133993685246
loss: 0.005464915186166763
loss: 0.016221430152654648
loss: 0.013726095668971539
loss: 0.017993604764342308
loss: 0.0