In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
normal = pd.read_excel('normaldata.xlsx', header = None).to_numpy()
random = pd.read_excel('randomdata.xlsx', header = None).to_numpy()

normalt = pd.read_excel('normalt.xlsx', header = None).to_numpy()
randomt = pd.read_excel('randomt.xlsx', header = None).to_numpy()

normalkapa = pd.read_excel('normalkapa.xlsx', header = None).to_numpy()
randomkapa = pd.read_excel('randomkapa.xlsx', header = None).to_numpy()

normalkapaori = pd.read_excel('normalkapaori.xlsx', header = None).to_numpy()
randomkapaori = pd.read_excel('randomkapaori.xlsx', header = None).to_numpy()

normal = torch.from_numpy(normal).float().to(device)
random = torch.from_numpy(random).float().to(device)

normalt = torch.from_numpy(normalt).float().to(device)
randomt = torch.from_numpy(randomt).float().to(device)

normalkapa = torch.from_numpy(normalkapa).float().to(device)
randomkapa = torch.from_numpy(randomkapa).float().to(device)

normalkapaori = torch.from_numpy(normalkapaori).float().to(device)
randomkapaori = torch.from_numpy(randomkapaori).float().to(device)

In [6]:
# get lengths of all tubes
normallength = (normalt[:, 9] + normalt[:, 19] + normalt[:, 29]) * normal[:, 4] + 1 / normal[:, 6] * normal[:, 11] / 180 * np.pi + 1 / normal[:, 7] * normal[:, 12] / 180 * np.pi
randomlength = (randomt[:, 109] + randomt[:, 119]) * random[:, 4]

normallength = normallength.unsqueeze(1)
randomlength = randomlength.unsqueeze(1)

normallength_normalize = normallength / 1000
randomlength_normalize = randomlength / 1000

In [7]:
index1 = [0, 1, 2, 3, 4, 5, 13]
index2 = [0, 1, 2, 3, 4, 5, 19]

feature1 = normal[:, index1]
kapaori1 = normalkapaori * 100
kapa1 = normalkapa * 100

feature2 = random[:, index2]
kapaori2 = randomkapaori * 100
kapa2 = randomkapa * 100

ranges = [(20, 35), (0.075, 0.1), (1.3, 1.6), (1.05, 1.35), (40, 100), (0.05, 0.25), (0, 0.001)]
feature_normalize1 = torch.zeros_like(feature1)
feature_normalize2 = torch.zeros_like(feature2)
for i in range(7):
    minvalue, maxvalue = ranges[i]
    feature_normalize1[:, i] = (feature1[:, i] - minvalue) / (maxvalue - minvalue)
    feature_normalize2[:, i] = (feature2[:, i] - minvalue) / (maxvalue - minvalue)

feature_normalize1 = torch.hstack((feature_normalize1, normallength_normalize))
feature_normalize2 = torch.hstack((feature_normalize2, randomlength_normalize))

t1 = torch.linspace(0, 1, 1150).reshape(1150, 1)
t_s = t1.to(device)

frac_num1 = int(kapaori1.shape[0] * 0.7)
frac_num2 = int(kapaori2.shape[0] * 0.7)
# frac_num2 = int(kapaori2.shape[0] * 0.8)
train_feature1, test_feature1 = feature_normalize1[:frac_num1], feature_normalize1[frac_num1:]
train_feature2, test_feature2 = feature_normalize2[:frac_num2], feature_normalize2[frac_num2:]

train_kapaori1, test_kapaori1 = kapaori1[:frac_num1], kapaori1[frac_num1:]
train_kapaori2, test_kapaori2 = kapaori2[:frac_num2], kapaori2[frac_num2:]

train_kapa1, test_kapa1 = kapa1[:frac_num1], kapa1[frac_num1:]
train_kapa2, test_kapa2 = kapa2[:frac_num2], kapa2[frac_num2:]

train_feature = torch.vstack((train_feature1, train_feature2))
test_feature = torch.vstack((test_feature1, test_feature2))
train_kapaori = torch.vstack((train_kapaori1, train_kapaori2))
test_kapaori = torch.vstack((test_kapaori1, test_kapaori2))
train_kapa = torch.vstack((train_kapa1, train_kapa2))
test_kapa = torch.vstack((test_kapa1, test_kapa2))

In [8]:
mean = np.mean(train_kapaori.cpu().numpy(), axis=0)
centered_data = train_kapaori.cpu().numpy() - mean[None, :]
covariance_matrix = np.cov(centered_data, rowvar=False)
eigenvalues, eigenvectors = np.linalg.eigh(covariance_matrix)
basis = eigenvectors

basis = torch.from_numpy(basis).float().to(device)

In [9]:
x_train = (train_kapaori, train_feature, t_s)
y_train = train_kapa
x_test = (test_kapaori, test_feature, t_s)
y_test = test_kapa

In [10]:
class CustomDataset(Dataset):
    def __init__(self, data, label):
        self.data = data
        self.label = label

    def __len__(self):
        return len(self.data[0])
    
    def __getitem__(self, idx):
        x1 = self.data[0]
        x2 = self.data[1]
        x3 = self.data[2]
        x11 = x1[idx]
        x22 = x2[idx]
        x33 = x3
        labels = self.label[idx]
        x = (x11, x22, x33)
        return x, labels

train_loader = DataLoader(CustomDataset(x_train, y_train), batch_size = 64, shuffle = True)
test_loader = DataLoader(CustomDataset(x_test, y_test), batch_size = 64, shuffle = False)

In [11]:
class FNN(nn.Module):
    def __init__(self, layer_sizes):
        super().__init__()
        self.dense = nn.ModuleList()
        for i in range(1, len(layer_sizes) - 1):
            self.dense.append(
                nn.Linear(in_features = layer_sizes[i - 1], out_features = layer_sizes[i])
            )
            self.dense.append(nn.Tanh())
        self.dense.append(
            nn.Linear(in_features = layer_sizes[-2], out_features = layer_sizes[-1])
        )
        # self.dense.append(nn.Tanh())

    def forward(self, inputs):
        y = inputs
        for layer in self.dense:
            y = layer(y)
        return y

In [13]:
class MIONet(nn.Module):
    def __init__(self, branch1, branch2, branch3, branch4, branch5, basis, layers1, layers2, layers3):
        super().__init__()
        self.branch1 = branch1
        self.branch2 = branch2
        self.branch3 = branch3
        self.branch4 = branch4
        self.branch5 = branch5
        self.basis = basis
        self.linears1 = torch.nn.ModuleList([torch.nn.Linear(layers1[i], layers1[i+1]) for i in range(len(layers1)-1)])
        self.linears2 = torch.nn.ModuleList([torch.nn.Linear(layers2[i], layers2[i+1]) for i in range(len(layers2)-1)]) 
        self.linears3 = torch.nn.ModuleList([torch.nn.Linear(layers3[i], layers3[i+1]) for i in range(len(layers3)-1)]) 
        self.layers1 = layers1
        self.layers2 = layers2
        self.layers3 = layers3
        self.activation = nn.Tanh()
        self.loss_function = nn.MSELoss(reduction = 'mean')
        # self.b = nn.parameter.Parameter(torch.zeros(1, 1150))
        self.w = nn.parameter.Parameter(torch.tensor(1.0))

    def forward0(self, inputs):
        x_func1, x_func2, x_loc = inputs
        if len(x_loc.shape) == 3:
            x_loc = x_loc[0, :]
        else:
            x_loc = x_loc
        x_loc = torch.hstack((x_loc, torch.sin(x_loc * 2 * torch.pi * self.w), torch.cos(x_loc * 2 * torch.pi * self.w), torch.sin(2 * x_loc * 2 * torch.pi * self.w), torch.cos(2 * x_loc * 2 * torch.pi * self.w)))
        H1 = self.branch1(x_func1)
        H2 = self.branch2(x_func2)
        H3 = self.branch3(x_loc)
        H3 = self.branch4(torch.transpose(H3, 0, 1)) # [1, 100]

        return H1, H2, H3
    
    def forward1(self, inputs):
        H1, H2, H3 = self.forward0(inputs)
        h = inputs[0]

        for i in range(len(self.layers1) - 2):
            z = self.activation(self.linears1[i](h))
            # h = ((1 - z) * H1 + z * H2) * H3
            h1 = (1 - z) * H1 + z * H3
            h2 = (1 - z) * H2 + z * H3 
            h = h1 + h2
        a = self.linears1[-1](h)
        return a
    
    def forward2(self, inputs):
        H1, H2, H3 = self.forward0(inputs)
        h = inputs[1]
  
        for i in range(len(self.layers2) - 2):
            z = self.activation(self.linears2[i](h))
            # h = ((1 - z) * H1 + z * H2) * H3
            h1 = (1 - z) * H1 + z * H3
            h2 = (1 - z) * H2 + z * H3 
            h = h1 + h2
        a = self.linears2[-1](h)
        return a

    def forward3(self, inputs):
        x_loc = inputs[2]
        if len(x_loc.shape) == 3:
            x_loc = x_loc[0, :]
        else:
            x_loc = x_loc

        x_loc = torch.hstack((x_loc, torch.sin(x_loc * 2 * torch.pi * self.w), torch.cos(x_loc * 2 * torch.pi * self.w), torch.sin(2 * x_loc * 2 * torch.pi * self.w), torch.cos(2 * x_loc * 2 * torch.pi * self.w)))
        h = x_loc

        # basis2 = compute_eigenvectors(inputs[0])

        for i in range(len(self.layers3) - 2):
            z = self.activation(self.linears3[i](h)) # [50, 50]
            h = z
        a = self.linears3[-1](h)
        return a

    def forward(self, inputs):
        x_func2 = inputs[1]
        x_loc = inputs[2]
        if len(x_loc.shape) == 3:
            x_loc = x_loc[0, :]
        else:
            x_loc = x_loc
        
        y_func1 = self.forward1(inputs)
        y_func2 = self.forward2(inputs)
        y_loc = self.forward3(inputs)
        x_merger = torch.mul(y_func1, y_func2)
        y_func = x_merger
        y_loc = torch.cat((self.basis, y_loc), axis = 1)
        b = self.branch5(x_func2)
        # y_loc = y_loc
        y1 = torch.einsum("ip, jp -> ij", y_func, y_loc)
        y = y1 + b

        return y
    
    def loss(self, inputs, outputs):
        out = self.forward(inputs)
        loss1 = self.loss_function(out, outputs)
        loss2 = torch.mean(out[:, 0] ** 2) + torch.mean(out[:, -1] ** 2)

        diff1 = (out[:, :-2] - 2 * out[:, 1:-1] + out[:, 2:]) / 10
        smooth_loss = torch.mean(diff1 ** 2)
        return loss1 + loss2 + smooth_loss


In [15]:
layer_sizes1 = [1150, 2300, 2300]
layer_sizes2 = [8, 2300, 2300]
layer_sizes3 = [5, 1]
layer_sizes4 = [1150, 2300]
layer_sizes5 = [8, 1500, 1500, 1150]

branch1 = FNN(layer_sizes1).to(device)
branch2 = FNN(layer_sizes2).to(device)
branch3 = FNN(layer_sizes3).to(device)
branch4 = FNN(layer_sizes4).to(device)
branch5 = FNN(layer_sizes5).to(device)

layers1 = [1150, 2300, 2300, 2300, 2300, 2300]
layers2 = [8, 2300, 2300, 2300, 2300, 2300]
# layers3 = [3, 50, 50, 50, 50, 50]
layers3 = [5, 1150, 1150, 1150, 1150, 1150]

lr = 1e-5
model = MIONet(branch1, branch2, branch3, branch4, branch5, basis, layers1, layers2, layers3).to(device)
# model = MIONet(branch1, branch2, basis, trunk).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = lr, weight_decay = 0.0003)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 1000, gamma = 0.5)

In [16]:
def initialize_weights(m):
    if hasattr(m, 'weight') and m.weight.dim() > 1:
        torch.nn.init.xavier_normal_(m.weight.data)
model.apply(initialize_weights)

MIONet(
  (branch1): FNN(
    (dense): ModuleList(
      (0): Linear(in_features=1150, out_features=2300, bias=True)
      (1): Tanh()
      (2): Linear(in_features=2300, out_features=2300, bias=True)
    )
  )
  (branch2): FNN(
    (dense): ModuleList(
      (0): Linear(in_features=8, out_features=1500, bias=True)
      (1): Tanh()
      (2): Linear(in_features=1500, out_features=2300, bias=True)
    )
  )
  (branch3): FNN(
    (dense): ModuleList(
      (0): Linear(in_features=5, out_features=1, bias=True)
    )
  )
  (branch4): FNN(
    (dense): ModuleList(
      (0): Linear(in_features=1150, out_features=2300, bias=True)
    )
  )
  (branch5): FNN(
    (dense): ModuleList(
      (0): Linear(in_features=8, out_features=1500, bias=True)
      (1): Tanh()
      (2): Linear(in_features=1500, out_features=1500, bias=True)
      (3): Tanh()
      (4): Linear(in_features=1500, out_features=1150, bias=True)
    )
  )
  (linears1): ModuleList(
    (0): Linear(in_features=1150, out_features=

In [17]:
def mean_absolute_error(y_true, y_pred):
    mae = torch.mean(torch.abs(y_true - y_pred))
    return mae

def standard_deviation_error(y_true, y_pred):
    sdm = torch.std(y_true - y_pred)
    return sdm

In [18]:
steps = 5000
min_loss = 0.0006
loss_list = []
test_loss_list = []
for i in range(steps):
    model.train()
    for inputs, outputs in train_loader:
        optimizer.zero_grad()
        loss = model.loss(inputs, outputs)
        loss.backward(retain_graph = True)
        optimizer.step()
    loss_list.append(loss.item())
    scheduler.step()

    model.eval()
    total_test_loss = 0
    total_test_samples = 0
    with torch.no_grad():
        for inputs, outputs in test_loader:
            test_loss = model.loss(inputs, outputs)
            total_test_loss += test_loss.item() * len(inputs)
            total_test_samples += len(inputs)

        average_test_loss = total_test_loss / total_test_samples
        test_loss_list.append(average_test_loss)

    if (i + 1) % 1000 == 0:
        print('+++++++++++++', i + 1, '+++++++++++++')
        print('loss is: ', loss.detach().cpu().numpy())
        print('test_loss is: ', average_test_loss)

    if loss < min_loss and average_test_loss < 0.0007:
        min_loss = loss
        torch.save(model.state_dict(), 'model1.pth')
        print('min_loss is: ', min_loss.detach().cpu().numpy(), 'loss is', average_test_loss)

+++++++++++++ 1000 +++++++++++++
loss is:  0.019145004
test_loss is:  0.027126555020610493
+++++++++++++ 2000 +++++++++++++
loss is:  0.0005271784
test_loss is:  0.0013933835628752906
min_loss is:  0.00044686018 loss is 0.000687844876665622
min_loss is:  0.00037870606 loss is 0.0006984383799135685
min_loss is:  0.00031623818 loss is 0.0006919117683234314
min_loss is:  0.00029827835 loss is 0.0006957636020767192
min_loss is:  0.00029467142 loss is 0.0006631536719699701
min_loss is:  0.0002825747 loss is 0.0006609913931849102
min_loss is:  0.00027947806 loss is 0.0006737061194144189
min_loss is:  0.00027545524 loss is 0.0006927875219844282
min_loss is:  0.00027437595 loss is 0.0006736946719077727
+++++++++++++ 3000 +++++++++++++
loss is:  0.00030352015
test_loss is:  0.0006754835291455189
min_loss is:  0.00026668716 loss is 0.0006795131484977901
min_loss is:  0.00026645054 loss is 0.0006625154443706075
min_loss is:  0.000250544 loss is 0.0006500892923213542
min_loss is:  0.0002432963 los

In [21]:
model.eval()
y_pred1 = model.forward(x_train)
y_pred2 = model.forward(x_test)
y_pred = torch.vstack((y_pred1, y_pred2))

y_true = torch.vstack((y_train, y_test))

y_pred = y_pred2
y_true = test_kapa

residual = y_pred - y_true
r2_values = torch.empty(y_pred.shape[0])
for i in range(y_pred.shape[0]):
    r2_values[i] = 1 - torch.sum(residual[i, :] ** 2) / torch.sum((y_true[i, :] - torch.mean(y_true[i, :])) ** 2)

average_r2 = r2_values.mean()

mae1 = mean_absolute_error(y_true, y_pred)
sdm1 = standard_deviation_error(y_true, y_pred)
mse = torch.mean((y_true - y_pred) ** 2)

print('mse:', mse)
print('mae:', mae1)
print('sdm:', sdm1)
print('r2:', average_r2)

mse: tensor(0.0005, device='cuda:0', grad_fn=<MeanBackward0>)
mae: tensor(0.0153, device='cuda:0', grad_fn=<MeanBackward0>)
sdm: tensor(0.0233, device='cuda:0', grad_fn=<StdBackward0>)
r2: tensor(0.9512, grad_fn=<MeanBackward0>)
