### $$\frac{du}{dt}(t;\xi)=f(t;\xi),t\in[0,1]$$
### $$u(0;\xi)=b(\xi)$$
#### $\xi=(\xi_1,\xi_2)\sim\mathcal{N}(0,I)$
### $$f(t;\xi)=-\pi\sin(\pi t)-\frac{\pi}{2}\sin(\frac{\pi}{2}t)\xi_1-\frac{\pi}{4}\sin(\frac{\pi}{4}t)\xi_2 $$
### $$b(t;\xi)=1+\xi_1+\xi_2$$
#### Exact solution is $u(t;\xi)=\cos(\pi t)+\cos(\frac{\pi}{2}t)\xi_1+\cos(\frac{\pi}{4}t)\xi_2$

In [18]:
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import grad
from torch.nn.utils import spectral_norm
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams.update({'font.size': 20})

import time
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   
os.environ["CUDA_VISIBLE_DEVICES"]="0"
torch.set_default_dtype(torch.float64)

### 모델

In [19]:
#Feedfoward neural network
class DNN(nn.Module):
    def __init__(self, neurons, activation, spectral=False):
        #neurons: layer size. 
        #For example[2, 32, 32, 2] implies input dimension:2, two hidden layers with width : 32, 32, ouptut dimension: 2
        #activation: activation function of the neural network
        #spectral means whether or not use spectral normalization = spectral_norm in torch.nn.utils
        
        super(DNN, self).__init__()
        self.activation = activation
        self.n_layers = len(neurons)
        self.dense = {}
        self.nn_layers = nn.ModuleList()

        for i in range(self.n_layers-1):
            if spectral:
                self.dense[str(i)] = spectral_norm(nn.Linear(neurons[i], neurons[i+1]))
            else:
                self.dense[str(i)] = nn.Linear(neurons[i], neurons[i+1])
            
            if activation==torch.sin:
                c = torch.sqrt(torch.tensor(6.))
                numerator = torch.sqrt(torch.tensor(neurons[i]).double())
                nn.init.uniform_(self.dense[str(i)].weight, a=-c/numerator, b=c/numerator)
                nn.init.zeros_(self.dense[str(i)].bias)
            else:
                nn.init.xavier_uniform_(self.dense[str(i)].weight)
                nn.init.zeros_(self.dense[str(i)].bias)                
            self.nn_layers.append(self.dense[str(i)])
            
    def forward(self, *args):
        x = torch.cat(args,1)
        activation = self.activation
        for i in range(self.n_layers-2):
            x = activation(self.dense[str(i)](x))
        i = self.n_layers-2
        return self.dense[str(i)](x)

In [20]:
device='cuda'
torch.manual_seed(0)

dim_x = 1
dim_z = 2
f_sensors = 41
b_sensors = 1

#B is the samples used in Fourier embedding layer
B_size=32
B_x = torch.randn([dim_x,B_size//2], device=device)
B_z = torch.randn([dim_z,B_size//2], device=device)

save_path='WGANSN_PCN_SD'
if not os.path.exists(save_path):
    os.makedirs(save_path)
with open(save_path+'/B_x.pickle', 'wb') as f:
    pickle.dump(B_x, f, pickle.HIGHEST_PROTOCOL)
with open(save_path+'/B_z.pickle', 'wb') as f:
    pickle.dump(B_z, f, pickle.HIGHEST_PROTOCOL)

#We compute network for X and Z seperately
#Size for the basis network (network for Z) = size for the coefficient network (network for X) = [32, 32, 32, 32]
#Finally, we take an inner product for the outputs, which is utilized by a matrix product.
p=32
net_size_ux = [B_size,32,32,p]
net_size_uz = [B_size,32,32,p]
net_size_D = [f_sensors+b_sensors,64,64,1]

#Fourier embedding for X
def FEx(x):
    x_sin = torch.sin(torch.matmul(x,B_x))
    x_cos = torch.cos(torch.matmul(x,B_x))
    return torch.concat([x_cos,x_sin],-1)

#Fourier embedding for Z
def FEz(x):
    x_sin = torch.sin(torch.matmul(x,B_z))
    x_cos = torch.cos(torch.matmul(x,B_z))
    return torch.concat([x_cos,x_sin],-1)

#We use sine activation
acti = torch.sin
#generator network for X
ux = DNN(net_size_ux, activation=acti).to(device)
#generator network for Z
uz = DNN(net_size_uz, activation=acti).to(device)
#discriminator
D = DNN(net_size_D, activation=torch.relu, spectral=True).to(device)

def GU(X, Z):
    X_FE = FEx(X)
    Z = FEz(Z)
    outx = acti(ux(X_FE))
    outz = uz(Z)
    
    #this is the inner product part. If you want to understand why the inner product is utilized by an matrix product, see https://ieeexplore.ieee.org/document/9989352/. 
    out = torch.matmul(outz,outx.T)
    return out

#LHS of the initial condition (u=b)
def b_tilde(X,Z):
    return GU(X,Z)

#LHS of the physics equation (u'=f)
def f_tilde(X,Z):
    u = GU(X,Z)
    
    dummy = torch.ones(u.shape, device=device, requires_grad=True)
    tmp = grad(u, X, grad_outputs=dummy, create_graph=True)[0]
    u_x = grad(tmp[:,0].sum(), dummy, create_graph=True)[0]
    return u_x


### $f, b$의 data

In [21]:
N_train = 1000
#sensor for f
x_f = torch.linspace(0,1,f_sensors, device=device, requires_grad=True).reshape(-1,1)
#sensor for b (only one point)
x_b = torch.zeros([b_sensors,1], device=device)
# xi is the source of the randomness
xi = torch.randn([N_train,2], device=device)

#compute data for f and b
cpnt_x = -np.pi*torch.sin(np.pi*x_f)
cpnt_x = torch.concat([cpnt_x,-np.pi/2*torch.sin(np.pi/2*x_f)],-1)
cpnt_x = torch.concat([cpnt_x,-np.pi/4*torch.sin(np.pi/4*x_f)],-1)
cpnt_x = cpnt_x.reshape(1,f_sensors,3)

cpnt_xi = torch.ones([N_train,1], device=device)
cpnt_xi = torch.concat([cpnt_xi,xi],-1)
cpnt_xi = cpnt_xi.reshape(N_train, 1, 3)

with torch.no_grad():
    f_data = (cpnt_x*cpnt_xi).sum(-1)
    b_data = cpnt_xi.sum(-1)

### 모델 학습

In [22]:
#Z: noise, B: data for b, F: data for f.
#This is the WGAN method with spectral normalization (SN).
#SN is utilized when the discriminator is defined
def compute_lossD(Z, B, F):
    B_gen = b_tilde(x_b, Z)
    F_gen = f_tilde(x_f, Z)

    Dreal = D(B, F)
    Dfake = D(B_gen,F_gen)
    lossD = Dreal.mean()-Dfake.mean()
    
    return -lossD

def compute_lossG(Z):
    B_gen = b_tilde(x_b, Z)
    F_gen = f_tilde(x_f, Z)
    
    Dfake = D(B_gen,F_gen)
    lossG = -Dfake.mean()
    return lossG

def updateD(Z,B,F):
    optimizerD.zero_grad()
    lossD = compute_lossD(Z,B,F)
    lossD.backward()
    optimizerD.step()

def updateG(Z):
    optimizerG.zero_grad()
    lossG = compute_lossG(Z)
    lossG.backward()
    optimizerG.step()

In [23]:
#We use RMSprop for the WGAN model
#The main reason is that the momentum based optimizer is not working for the WGAN
torch.manual_seed(0)

D_params = list(D.parameters())
G_params = list(ux.parameters())+list(uz.parameters())
optimizerD = torch.optim.RMSprop(D_params, lr=2e-4)
optimizerG = torch.optim.RMSprop(G_params, lr=5e-5)

num_epochs = 10000
st = time.time()
for epoch in range(num_epochs):
    z = torch.randn([N_train,dim_z], device=device)
    alpha = torch.randn([N_train,1], device=device)
    updateD(z, b_data, f_data)
    updateG(z)
torch.save(D.state_dict(), save_path+'/paramsD')
torch.save(ux.state_dict(), save_path+'/paramsux')
torch.save(uz.state_dict(), save_path+'/paramsuz')

print(f"{time.time()-st} seconds")

104.71177339553833 seconds


### 결과 (plot은 integrate.ipynb로)

In [24]:
torch.manual_seed(0)

N_test = 20000
m_test = 101
x_test = torch.linspace(0,1,m_test, device=device).reshape(-1,1)
with torch.no_grad():
    z_test = torch.randn([N_test,dim_z],device=device)
    u_gen = GU(x_test, z_test).cpu()
    u_mean, u_std = u_gen.mean(0), u_gen.std(0)

In [None]:
!kill -9 {os.getpid()}