## Import Libraries and Functions and Define the Model

In [41]:
# Physics Informed Neural Network with Taylor Series
import time
from utils import *
import math
import torch.nn as nn
import os
import warnings
from miscFun import *
'''
N_INPUT: The number of input bio-z dimensions for one heartbeat
N_FEAT: The number of physiological features
N_EXT: The number of features extracted by the CNN
'''
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'  # 解决 cuBLAS 上下文警告
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # 屏蔽部分 TensorFlow/PyTorch 冗余日志（如果存在）
# os.environ['CUDA_VISIBLE_DEVICES'] = '2' # 设置可见的 GPU 设备

warnings.filterwarnings("ignore", category=UserWarning, message="To copy construct from a tensor")
warnings.filterwarnings("ignore")

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
    
class Configs:
    def __init__(self):
        # General parameters
        self.begin_order = 1
        self.down_sampling_layers = 1 
        self.down_sampling_window = 2
        self.channel_independence = 1 
        self.task_name = 'regression'  # or 'short_term_forecast', 'imputation', 'anomaly_detection', 'regression'
        self.seq_len = 32  # Input sequence length
        self.label_len = 48  # Label length for forecasting
        self.pred_len = 24  # Prediction length
        self.e_layers = 2  # Number of encoder layers
        self.d_model = 512  # Dimension of the model
        self.embed = 'timeF'  # Embedding type, e.g., 'timeF' for time features
        self.freq = 's'  # Frequency of the data, e.g., 'h' for hourly data
        self.dropout = 0.1  # Dropout rate
        self.top_k = 1
        self.d_ff = 512  # Dimension of the feedforward network model
        self.num_kernels = 6 # Number of kernels in the Inception block
        self.enc_in = 32  # Number of input features (for forecasting and imputation)
        self.output_attention = False  # Whether to output attention weights
        self.factor = 1 # 'attn factor'
        self.n_heads = 8  # Number of heads in the multi-head attention
        self.moving_avg = 25
        self.activation = 'gelu'  # Activation function
        self.use_future_temporal_feature = 0 
        self.use_norm = 1

class DPiKAN(nn.Module):

    def __init__(self, configs, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.down_sampling_window = configs.down_sampling_window
        self.channel_independence = configs.channel_independence
        self.res_blocks = nn.ModuleList([MultiFrequencyResidual(configs)
                                         for _ in range(configs.e_layers)])
        self.add_blocks = nn.ModuleList([MultiFrequencyAdd(configs)
                                         for _ in range(configs.e_layers)])

        self.preprocess = series_decomp(configs.moving_avg)
        self.enc_in = configs.enc_in
        self.use_future_temporal_feature = configs.use_future_temporal_feature

        self.enc_embedding = DataEmbedding_wo_pos(1, configs.d_model, configs.embed, configs.freq,
                                                  configs.dropout)
        self.layer = configs.e_layers
        self.normalize_layers = torch.nn.ModuleList(
            [
                Normalize(1, affine=True, non_norm=True if configs.use_norm == 0 else False)
                for i in range(configs.down_sampling_layers + 1)
            ]
        )
        self.projection_layer = nn.Linear(
            configs.d_model, 1, bias=True)
        # self.predict_layer = nn.Linear(
        #     configs.seq_len,
        #     configs.pred_len,
        # )
        self.predict_layer = nn.Linear(
            configs.seq_len,
            1,
        )

    def __multi_level_process_inputs(self, x_enc):

        down_pool = torch.nn.AvgPool1d(self.configs.down_sampling_window)
        # B,T,C -> B,C,T
        x_enc = x_enc.permute(0, 2, 1)
        x_enc_ori = x_enc
        x_enc_sampling_list = []
        x_enc_sampling_list.append(x_enc.permute(0, 2, 1))
        for i in range(self.configs.down_sampling_layers):
            x_enc_sampling = down_pool(x_enc_ori)
            x_enc_sampling_list.append(x_enc_sampling.permute(0, 2, 1))
            x_enc_ori = x_enc_sampling
        x_enc = x_enc_sampling_list
        return x_enc

    def regression(self, X, adv_flag=False):
        # x_enc, feat_1, feat_2, feat_3 = X[:, :32], X[:, 32], X[:, 33], X[:, 34]
        x_enc = X[:, :32]
        x_enc = x_enc.unsqueeze(2)
        # Moving average
        x_enc = self.__multi_level_process_inputs(x_enc)
        #
        x_list = []
        for i, x in zip(range(len(x_enc)), x_enc):
            B, T, N = x.size()
            # normalize之后维度不对，具体是(B, T, C) -> (B, T, T)，先注释掉了

            x = self.normalize_layers[i](x, 'norm')
            x = x.permute(0, 2, 1).contiguous().reshape(B * N, T, 1)
            x_list.append(x)

        enc_out_list = []
        # do embedding
        for i, x in zip(range(len(x_list)), x_list):
            enc_out = self.enc_embedding(x, None)  # [B,T,C]
            enc_out_list.append(enc_out)
        enc_out_list_numpy = [enc_out.cpu().detach().numpy() for enc_out in enc_out_list]
        # Multi-order KAN representation learning

        for i in range(self.layer):
            enc_out_list = self.res_blocks[i](enc_out_list)
            enc_out_list = self.add_blocks[i](enc_out_list)

        dec_out = enc_out_list[0]

        dec_out = self.projection_layer(dec_out).view(B, -1).contiguous()


        dec_out = self.predict_layer(dec_out)
        dec_out = dec_out.reshape(B, 1)

        # dec_out = self.projection_layer(dec_out)
        # 这里dec的normalize也不对，需要重新改一下
        # dec_out = self.normalize_layers[0](dec_out, 'denorm')

        return dec_out

    def regression_net(self, x_enc, feat_1, feat_2, feat_3):
        # embedding
        return self.regression(torch.cat((x_enc, feat_1, feat_2, feat_3), 1))

    def Physics_net(self, feature, feat_1, feat_2, feat_3):
        # No need to calculate the gradient of adversarial loss
        u = self.regression_net(feature, feat_1, feat_2, feat_3)
        u_feat_1 = torch.autograd.grad(u, feat_1,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0].detach()
        u_feat_2 = torch.autograd.grad(u, feat_2,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0].detach()
        u_feat_3 = torch.autograd.grad(u, feat_3,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0].detach()
        pred_physics = (u[:-1, 0]
                        + (u_feat_1[:-1, 0] * (feat_1[1:, 0] - feat_1[:-1, 0]))
                        + (u_feat_2[:-1, 0] * (feat_2[1:, 0] - feat_2[:-1, 0]))
                        + (u_feat_3[:-1, 0] * (feat_3[1:, 0] - feat_3[:-1, 0]))
                        )
        return u, pred_physics

    def forward(self, X, flag, adv_flag):
        # flag to show calculate physics loss or not
        feature = X[:, :32].clone().detach().requires_grad_(True)
        feat_1 = X[:, 32].clone().detach().requires_grad_(True).unsqueeze(1)
        feat_2 = X[:, 33].clone().detach().requires_grad_(True).unsqueeze(1)
        feat_3 = X[:, 34].clone().detach().requires_grad_(True).unsqueeze(1)

        if flag:
            u, pred_physics = self.Physics_net(feature, feat_1, feat_2, feat_3)
            return u, pred_physics
        else:
            u = self.regression(X)
            return u

    def function(self, X, train_out):
        u, _ = self.regression(X, adv_flag=True)
        pred_loss = torch.mean(torch.square(u - train_out))
        return pred_loss



class PITN(nn.Module):
    def __init__(self, configs):
        super(PITN, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.model = nn.ModuleList([TemporalBlock(configs) for _ in range(configs.e_layers)])
        self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                           configs.dropout)
        self.layer = configs.e_layers
        self.layer_norm = nn.LayerNorm(configs.d_model)
        # add layer_norm_adv for adversarial input
        self.layer_norm_adv = nn.LayerNorm(configs.d_model)
        self.act = F.gelu
        self.dropout = nn.Dropout(configs.dropout)
        self.projection = nn.Linear(512, 64)  # Adjust the output dimension for regression
        self.decision = nn.Linear(67, 1)
        
    def regression(self, X, adv_flag=False):
        # embedding
        x_enc, feat_1, feat_2, feat_3 = X[:, :32], X[:, 32], X[:, 33], X[:, 34]

        x_enc = x_enc.unsqueeze(1)
        feat_1 = feat_1.unsqueeze(1)
        feat_2 = feat_2.unsqueeze(1)
        feat_3 = feat_3.unsqueeze(1)

        enc_out = self.enc_embedding(x_enc)  # [B, T, C]
        # TimesNet
        for i in range(self.layer):
            if adv_flag:
                enc_out = self.layer_norm_adv(self.model[i](enc_out))
            else:
                enc_out = self.layer_norm(self.model[i](enc_out))
        # Output
        # the output transformer encoder/decoder embeddings don't include non-linearity
        output = self.act(enc_out)
        output = self.dropout(output)
        # (batch_size, seq_length * d_model)
        output = output.reshape(output.shape[0], -1)
        hidden_feature = output
        output = self.projection(output)  # (batch_size, 64)
        output = torch.cat((output, feat_1, feat_2, feat_3), 1)
        output = self.decision(output)
        return output, hidden_feature

    def regression_net(self, x_enc, feat_1, feat_2, feat_3):
        # embedding
        return self.regression(torch.cat((x_enc, feat_1, feat_2, feat_3), 1))

    def Physics_net(self, feature, feat_1, feat_2, feat_3):
        # No need to calculate the gradient of adversarial loss
        u, _ = self.regression_net(feature, feat_1, feat_2, feat_3)
        u_feat_1 = torch.autograd.grad(u, feat_1,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0]
        u_feat_2 = torch.autograd.grad(u, feat_2,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0]
        u_feat_3 = torch.autograd.grad(u, feat_3,
                                       grad_outputs=torch.ones_like(u), create_graph=True)[0]
        pred_physics = (u[:-1, 0]
                        + (u_feat_1[:-1, 0] * (feat_1[1:, 0] - feat_1[:-1, 0]))
                        + (u_feat_2[:-1, 0] * (feat_2[1:, 0] - feat_2[:-1, 0]))
                        + (u_feat_3[:-1, 0] * (feat_3[1:, 0] - feat_3[:-1, 0]))
                        )
        return u, pred_physics

    def forward(self, X, flag, adv_flag):
        # flag to show calculate physics loss or not
        self.feature = X[:, :32].clone().detach().requires_grad_(True)
        self.feat_1 = X[:, 32].clone().detach().requires_grad_(True).unsqueeze(1)
        self.feat_2 = X[:, 33].clone().detach().requires_grad_(True).unsqueeze(1)
        self.feat_3 = X[:, 34].clone().detach().requires_grad_(True).unsqueeze(1)

        if flag:
            u, pred_physics = self.Physics_net(self.feature, self.feat_1, self.feat_2, self.feat_3)
            return u, pred_physics
        else:
            u, hidden_feature = self.regression(X)
            return u, hidden_feature

    def function(self, X, train_out):
        u, _ = self.regression(X, adv_flag=True)
        pred_loss = torch.mean(torch.square(u - train_out))
        return pred_loss




### Import a Demo Data

In [42]:
# load an example data for demo
import pandas as pd
df_demo_data = pd.read_pickle('PPG.pkl')

### Preprocess and Prepare the Train/Test Datasets

In [43]:
import numpy as np
from sklearn import preprocessing
import random
import torch.utils.data as Data
import warnings
# Initialize a SEED value to ensure that the random processes in the code can be reproduced.
SEED = 123
# Call the function with seed value
warnings.filterwarnings("ignore", category=DeprecationWarning)


# The keys for the beat data (beat_key), the target (out_key), and the features (feat_keys) are defined
beat_key = 'bioz_beats'
out_key = 'sys'
shift = 2.0
beta = 1
feat_keys = ['phys_feat_1','phys_feat_2','phys_feat_3']

# Data scaling of BP, input beats, and input features
# This scaler standardizes by removing the mean and scaling to unit variance
# This is done to ensure having the same scale, which can improve the performance of machine learning algorithms
scaler_out = preprocessing.StandardScaler().fit(df_demo_data[out_key].to_numpy()[:, None])
mean_value = scaler_out.mean_
std_value = scaler_out.scale_

contra_shift = (shift - mean_value) / (std_value ** 2)
scaler_beats = preprocessing.StandardScaler().fit(np.concatenate(df_demo_data[beat_key].to_numpy())[:, None])
scaler_X = [preprocessing.StandardScaler().fit(df_demo_data[a].to_numpy()[:, None]) for a in feat_keys]

# Apply Scaling
# The scaled versions of the BP, input beats, and input features are then added to the dataframe
df_demo_data.loc[df_demo_data.index, beat_key + '_scaled'] = df_demo_data.apply(
    lambda x: np.concatenate(scaler_beats.transform(x[beat_key][:, None])), axis=1).to_numpy()

def scale_output(row):
    output = np.array([row[out_key]])[:, None]
    output = output.reshape(-1, 1)
    scaled_output = scaler_out.transform(output)
    return np.concatenate(scaled_output)[0]

df_demo_data[out_key + '_scaled'] = df_demo_data.apply(scale_output, axis=1).to_numpy()

# df_demo_data.loc[df_demo_data.index, out_key + '_scaled'] = df_demo_data.apply(
#     lambda x: np.concatenate(scaler_out.transform(np.array([x[out_key]])[:, None]))[0], axis=1).to_numpy()

def transform_scaler_X(x, tmp_key, tmp_count):
    value = np.array([x[tmp_key]])[:, None]
    value = value.reshape(-1, 1)
    scaled_value = scaler_X[tmp_count].transform(value)
    concatenated_value = np.concatenate(scaled_value)
    return concatenated_value

for tmp_key, tmp_count in zip(feat_keys, range(len(feat_keys))):
    # df_demo_data.loc[df_demo_data.index, tmp_key + '_scaled'] = df_demo_data.apply(
    #     lambda x: np.concatenate(scaler_X[tmp_count].transform(np.array([x[tmp_key]])[:, None])), axis=1).to_numpy()
    df_demo_data[tmp_key + '_scaled'] = df_demo_data.apply(
        lambda x, key=tmp_key, count=tmp_count: transform_scaler_X(x, key, count), axis=1).to_numpy()
# Fetch scaled feature names
X_keys = [a + '_scaled' for a in feat_keys]

# Prepare train/test using minimal training the BP
# Fetch data shapes
length_seq_x = df_demo_data.apply(lambda x: len(x[beat_key + '_scaled']), axis=1).unique()[0]

# Set the length of the target to 1
length_seq_y = 1

# Start with all points
# Reshape the scaled beat data into a 2D array where each row corresponds to a sample and each column corresponds to a time point in the beat sequence
# The same is done for the features and the target
all_beats = np.reshape(np.concatenate(df_demo_data[beat_key + '_scaled'].values), (len(df_demo_data), length_seq_x))
[all_feat1, all_feat2, all_feat3] = [df_demo_data[a].values[:, None] for a in X_keys]
all_out = df_demo_data[out_key + '_scaled'].values[:, None]

# Used only for plotting purposes
out_max_rescaled = np.concatenate(scaler_out.inverse_transform(all_out[:, 0][:, None])).max()
out_min_rescaled = np.concatenate(scaler_out.inverse_transform(all_out[:, 0][:, None])).min()
# Given different trials have time gaps, ignore first 3 instances from indices to prevent discontiunity in training
list_all_length = [0]
for _, df_tmp in df_demo_data.groupby(['trial_id']):
    list_all_length.append(len(df_tmp))
ix_ignore_all = np.concatenate(np.array([np.arange(a, a + 3, 1) for a in list(np.cumsum(list_all_length)[:-1])]))
# Update the final indices set
ix_all = list(set(np.arange(len(df_demo_data))) - set(ix_ignore_all))

# Separate train/test based on minimal training criterion
random.seed(0)
bp_dist = df_demo_data[out_key].values

# Find indices for train and test datasets
# The target values are sorted in ascending order, and the sorted indices are split into multiple subsets
# For each subset, a random index is selected as a training index
ix_split = np.split([a for a in np.argsort(bp_dist) if a not in set(ix_ignore_all)], np.cumsum(
    np.histogram(bp_dist[ix_all], bins=np.arange(bp_dist[ix_all].min(), bp_dist[ix_all].max(), 1))[0]))
ix_train = [random.Random(4).choice(a) if len(a) > 0 else -1 for a in ix_split]
ix_train = list(set(ix_train) - set([-1]))
# Test set is all remaining points not used for training
ix_test = list(set(ix_all) - set(ix_train))

 # Build train and test datasets based on the indices
train_beats = all_beats[ix_train, :]
test_beats = all_beats[ix_test, :]
[train_feat1, train_feat2, train_feat3] = [all_feat1[ix_train, :], all_feat2[ix_train, :], all_feat3[ix_train, :]]
[test_feat1, test_feat2, test_feat3] = [all_feat1[ix_test, :], all_feat2[ix_test, :], all_feat3[ix_test, :]]
train_out = all_out[ix_train, :]
test_out = all_out[ix_test, :]
train_out = [float(item) for item in train_out]
test_out = [float(item) for item in test_out]
train_out = torch.tensor(train_out, dtype=torch.float32)
test_out = torch.tensor(test_out, dtype=torch.float32)

train_feat1 = [float(item) for item in train_feat1]
train_feat2 = [float(item) for item in train_feat2]
train_feat3 = [float(item) for item in train_feat3]


### Define model input tensors

In [44]:

# The training, testing, and all data are converted to TensorFlow tensors
# The tensors for the different datasets are grouped into lists 
model_inp = torch.tensor(train_beats, dtype=torch.float32)
feat1_inp = torch.tensor(train_feat1, dtype=torch.float32)
feat2_inp = torch.tensor(train_feat2, dtype=torch.float32)
feat3_inp = torch.tensor(train_feat3, dtype=torch.float32)

inp_feat1 = feat1_inp.unsqueeze(1)
inp_feat2 = feat2_inp.unsqueeze(1)
inp_feat3 = feat3_inp.unsqueeze(1)
train_out = train_out.unsqueeze(1)
test_out = test_out.unsqueeze(1)


inp_comb = torch.cat((model_inp, inp_feat1, inp_feat2, inp_feat3), dim=1)
input_comb_np = inp_comb.clone().detach().numpy()

train_dataset = Data.TensorDataset(inp_comb, train_out)
train_data_iter = Data.DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)

test_feat1 = [float(item) for item in test_feat1]
test_feat2 = [float(item) for item in test_feat2]
test_feat3 = [float(item) for item in test_feat3]

model_inp_test = torch.tensor(test_beats, dtype=torch.float32)
feat1_inp_test = torch.tensor(test_feat1, dtype=torch.float32)
feat2_inp_test = torch.tensor(test_feat2, dtype=torch.float32)
feat3_inp_test = torch.tensor(test_feat3, dtype=torch.float32)


feat1_inp_test = feat1_inp_test.unsqueeze(1)
feat2_inp_test = feat2_inp_test.unsqueeze(1)
feat3_inp_test = feat3_inp_test.unsqueeze(1)

inp_comb_test = torch.cat((model_inp_test, feat1_inp_test, feat2_inp_test, feat3_inp_test), dim=1)

all_feat1 = [float(item) for item in all_feat1]
all_feat2 = [float(item) for item in all_feat2]
all_feat3 = [float(item) for item in all_feat3]

model_inp_all = torch.tensor(all_beats, dtype=torch.float32)
feat1_inp_all = torch.tensor(all_feat1, dtype=torch.float32)
feat2_inp_all = torch.tensor(all_feat2, dtype=torch.float32)
feat3_inp_all = torch.tensor(all_feat3, dtype=torch.float32)

feat1_inp_all = feat1_inp_all.unsqueeze(1)
feat2_inp_all = feat2_inp_all.unsqueeze(1)
feat3_inp_all = feat3_inp_all.unsqueeze(1)

inp_comb_all = torch.cat((model_inp_all, feat1_inp_all, feat2_inp_all, feat3_inp_all), dim=1)
ix_all = torch.tensor(ix_all)
ix_train = torch.tensor(ix_train)
ix_test = torch.tensor(ix_test)

test_dataset = Data.TensorDataset(inp_comb_test, test_out)
test_data_iter = Data.DataLoader(test_dataset, batch_size=len(inp_comb), drop_last=False)

### Train PITN model

In [None]:
configs = Configs()
model_PITN = PITN(configs)
import time

# A Deep Neural Network model is initialized with the dimension of the beats, the diemnsion of each feature, and the number of neurons in the first dense layer
N0 = 200

x_train, x_boundary, u_boundary = training_data_latin_hypercube(inp_comb, train_out, N_inner=N0)
x_adv = np.array([]).reshape((0, 35))
x_train = np.vstack([x_train, x_adv])
# No need to retrain, directly generate adversarial samples
x_adv = generate_attack_samples(model_PITN, device, x_train, N0, train_out)
# using diffusion to generate adversarial samples
foward_diffusion = diffusion_foward()
x_diffusion = foward_diffusion.q_sample(x_train, torch.tensor([20]))

# Two lists are initialized to keep track of the training and testing loss during each epoch
loss_list_pinn = []
loss_test = []
test_loss_list_pinn = []
loss_total_epoch = []
loss_physics_epoch = []
loss_dnn_epoch = []
criterion = MultiPosConLoss()
print("PITN model training started")
optimizer = optim.Adam(model_PITN.parameters(), lr=10e-4)

best_loss = float("inf")
# Two lists are initialized to keep track of the training and testing loss during each epoch
epochs = 2000
inp_comb_adv = x_adv
for epoch in range(epochs):
    start = time.time()
    train_l_sum = 0.0
    optimizer.zero_grad()
    # Traditional out
    model_PITN.to(device=device)
    # train on clean data
    # if inp_comb and train_out are not tensor, then turn them into tensor
    if not torch.is_tensor(inp_comb):
        inp_comb = torch.tensor(inp_comb, dtype=torch.float32)
    if not torch.is_tensor(train_out):
        train_out = torch.tensor(train_out, dtype=torch.float32)
    inp_comb, train_out = inp_comb.to(device=device), train_out.to(device=device)
    # model_dnn_pinn的参数意思是，输入数据，是否计算physics loss，是否使用auxiliary BN
    output, feature_clean = model_PITN(inp_comb, False, False)
    loss_dnn_clean = output - train_out
    loss_dnn_clean = torch.mean(torch.square(loss_dnn_clean))


    loss_dnn = loss_dnn_clean

    # Physics loss
    inp_comb_all = inp_comb_all.to(device=device)
    y, pred_physics = model_PITN(inp_comb_all, True, False)
    physics_loss_ini = pred_physics - y[1:, 0]
    physics_loss = torch.mean(torch.square(physics_loss_ini[ix_all[:-1].view(-1, 1)]))

    # physics_loss = K.mean(K.square(tf.gather_nd(physics_loss_ini, indices=np.array(ix_all[:-1])[:, None])))
    loss_total = loss_dnn + physics_loss * beta
    loss_total_epoch.append(np.round(loss_total.cpu().detach().numpy(), 4))
    loss_physics_epoch.append(np.round(physics_loss.cpu().detach().numpy(), 4))
    loss_dnn_epoch.append(np.round(loss_dnn.cpu().detach().numpy(), 4))

    loss_total.backward()
    optimizer.step()
    train_l_sum += loss_total.item()
    loss_list_pinn.append(float(loss_dnn.item()))
    loss_final = np.min(loss_list_pinn)

    # model evaluation
    inp_comb_test, test_out = inp_comb_test.to(device=device), test_out.to(device=device)
    pred_out, _ = model_PITN(inp_comb_test, False, False)
    test_loss = pred_out - test_out
    test_loss = torch.mean(torch.square(test_loss))
    # test_loss_list_pinn.append(float(test_loss))
    loss_test.append(float(test_loss))
    
    
    end = time.time()
    # print every 100 epochs
    if epoch % 100 == 0:
        print(
            "epoch: {}, time: {}, loss_dnn:{}, loss_physics: {}, test_loss: {}".format(epoch, round(end - start, 2),
                                                                                       np.round(loss_final, 4),
                                                                                       np.round(
                                                                                           physics_loss.cpu().detach().numpy(),
                                                                                           4), test_loss))

    # if epoch == epochs - 1:
    if (loss_final <= 0.01) | (epoch == epochs - 1):
        # wandb.finish()
        torch.save(model_PITN.state_dict(),
                   "PITN.pth")
        np.save('PITN_test_loss.npy', loss_test)
        np.save('PITN_train_loss.npy', loss_total_epoch)
        print("PITN model training Completed. Epoch %d/%d" % (
            epoch, epochs))
        break

PITN model training started
epoch: 0, time: 0.72, loss_dnn:1.856, loss_physics: 0.04569999873638153, test_loss: 2.9440460205078125
epoch: 100, time: 0.41, loss_dnn:0.1166, loss_physics: 0.18199999630451202, test_loss: 0.6982888579368591
epoch: 200, time: 0.41, loss_dnn:0.0261, loss_physics: 0.11720000207424164, test_loss: 0.690618634223938


### Train DPiKAN model

In [14]:
configs = Configs()
model_DPiKAN = DPiKAN(configs)

# Two lists are initialized to keep track of the training and testing loss during each epoch
loss_list_pinn = []
loss_test = []
test_loss_list_pinn = []
loss_total_epoch = []
loss_physics_epoch = []
loss_dnn_epoch = []
print("DPiKAN model training started")
optimizer = optim.Adam(model_DPiKAN.parameters(), lr=10e-4)

best_loss = float("inf")
# Two lists are initialized to keep track of the training and testing loss during each epoch
epochs = 2000
for epoch in range(epochs):
    start = time.time()
    train_l_sum = 0.0
    optimizer.zero_grad()
    # Traditional out
    model_DPiKAN.to(device=device)
    # train on clean data
    # if inp_comb and train_out are not tensor, then turn them into tensor
    if not torch.is_tensor(inp_comb):
        inp_comb = torch.tensor(inp_comb, dtype=torch.float32)
    if not torch.is_tensor(train_out):
        train_out = torch.tensor(train_out, dtype=torch.float32)
    inp_comb, train_out = inp_comb.to(device=device), train_out.to(device=device)
    # model_dnn_pinn的参数意思是，输入数据，是否计算physics loss，是否使用auxiliary BN
    output = model_DPiKAN(inp_comb, False, False)
    loss_dnn_clean = output - train_out
    loss_dnn_clean = torch.mean(torch.square(loss_dnn_clean))


    loss_dnn = loss_dnn_clean

    # Physics loss
    inp_comb_all = inp_comb_all.to(device=device)
    y, pred_physics = model_DPiKAN(inp_comb_all, True, False)
    physics_loss_ini = pred_physics - y[1:, 0]
    physics_loss = torch.mean(torch.square(physics_loss_ini[ix_all[:-1].view(-1, 1)]))

    # physics_loss = K.mean(K.square(tf.gather_nd(physics_loss_ini, indices=np.array(ix_all[:-1])[:, None])))
    loss_total = loss_dnn + physics_loss 

    loss_total.backward()
    optimizer.step()
    train_l_sum += loss_total.item()
    loss_list_pinn.append(float(loss_dnn.item()))
    loss_final = np.min(loss_list_pinn)

    # model evaluation
    inp_comb_test, test_out = inp_comb_test.to(device=device), test_out.to(device=device)
    pred_out = model_DPiKAN(inp_comb_test, False, False)
    test_loss = pred_out - test_out
    test_loss = torch.mean(torch.square(test_loss))
    # test_loss_list_pinn.append(float(test_loss))
    loss_test.append(float(test_loss))
    
    
    end = time.time()
    # print every 100 epochs
    if epoch % 100 == 0:
        print(
            "epoch: {}, time: {}, loss_dnn:{}, loss_physics: {}, test_loss: {}".format(epoch, round(end - start, 2),
                                                                                       np.round(loss_final, 4),
                                                                                       np.round(
                                                                                           physics_loss.cpu().detach().numpy(),
                                                                                           4), test_loss))


    # if epoch == epochs - 1:
    if (loss_final <= 0.01) | (epoch == epochs - 1):
        # wandb.finish()
        torch.save(model_DPiKAN.state_dict(),
                   "DPiKAN.pth")
        np.save('DPiKAN_test_loss.npy', loss_test)
        np.save('DPiKAN_train_loss.npy', loss_total_epoch)
        print("DPiKAN model training Completed. Epoch %d/%d" % (
            epoch, epochs))
        break

DPiKAN model training started
epoch: 0, time: 0.35, loss_dnn:1.5506, loss_physics: 0.006200000178068876, test_loss: 121.6598129272461
epoch: 100, time: 0.24, loss_dnn:0.7845, loss_physics: 0.1054999977350235, test_loss: 0.8992868065834045
epoch: 200, time: 0.26, loss_dnn:0.4311, loss_physics: 0.1777999997138977, test_loss: 0.8298974633216858
epoch: 300, time: 0.27, loss_dnn:0.2661, loss_physics: 0.20399999618530273, test_loss: 0.7856473922729492
epoch: 400, time: 0.31, loss_dnn:0.1686, loss_physics: 0.22100000083446503, test_loss: 0.7733451128005981
epoch: 500, time: 0.25, loss_dnn:0.1194, loss_physics: 0.1889999955892563, test_loss: 0.7749971747398376
epoch: 600, time: 0.3, loss_dnn:0.0706, loss_physics: 0.1809999942779541, test_loss: 0.7448967099189758
epoch: 700, time: 0.24, loss_dnn:0.0554, loss_physics: 0.16300000250339508, test_loss: 0.7306909561157227
epoch: 800, time: 0.24, loss_dnn:0.0369, loss_physics: 0.14830000698566437, test_loss: 0.7078142762184143
epoch: 900, time: 0.31,

### Evaluate DPiKAN and PITN models

In [31]:
configs = Configs()
model_DPiKAN = DPiKAN(configs)
model_DPiKAN.eval()
model_DPiKAN.to(device=device)
model_DPiKAN.load_state_dict(torch.load("DPiKAN.pth"))

model_PITN = PITN(configs)
model_PITN.eval()
model_PITN.to(device=device)
model_PITN.load_state_dict(torch.load("PITN.pth"))
inp_comb_test = inp_comb_test.to(device)

if torch.is_tensor(test_out):
    test_out = test_out.cpu().numpy()

with torch.no_grad():
    pred_out_PITN, _ = model_PITN(inp_comb_test, False, False)
    pred_out_DPiKAN = model_DPiKAN(inp_comb_test, False, False)

pred_out_PITN = pred_out_PITN.cpu().numpy()
pred_out_DPiKAN = pred_out_DPiKAN.cpu().numpy()

np.save('PITN_pred_out.npy', pred_out_PITN)
np.save('DPiKAN_pred_out.npy', pred_out_DPiKAN)

if not isinstance(test_out, np.ndarray):  # 如果 test_out 不是 numpy 数组
    test_out = test_out.numpy()  

test_out = np.array(test_out).reshape(-1, 1)
corr_PITN = np.corrcoef(np.concatenate(test_out[:]), np.concatenate(pred_out_PITN)[:])[0][1]
rmse_PITN = np.sqrt(np.mean(np.square
                           (np.concatenate(scaler_out.inverse_transform(np.concatenate(test_out)[:][:, None]))-
                            np.concatenate(scaler_out.inverse_transform(np.concatenate(pred_out_PITN)[:][:, None])))))


corr_DPiKAN = np.corrcoef(np.concatenate(test_out)[:], np.concatenate(pred_out_DPiKAN)[:])[0][1]
rmse_DPiKAN = np.sqrt(np.mean(np.square(
    np.concatenate(scaler_out.inverse_transform(np.concatenate(test_out)[:][:, None]))-
    np.concatenate(scaler_out.inverse_transform(np.concatenate(pred_out_DPiKAN)[:][:, None])))))

print('#### PITN Performance ####')
print('Corr: %.2f,  RMSE: %.1f'%(corr_PITN, rmse_PITN))

print('#### DPiKAN Performance ####')
print('Corr: %.2f,  RMSE: %.1f'%(corr_DPiKAN, rmse_DPiKAN))

#### PITN Performance ####
Corr: 0.59,  RMSE: 5.5
#### DPiKAN Performance ####
Corr: 0.62,  RMSE: 5.2


### Plot predictions

In [39]:
pred_out_DPiKAN = np.load('DPiKAN_pred_out.npy')
pred_out_PITN = np.load('PITN_pred_out.npy')
ix_test = ix_test.cpu().numpy() if torch.is_tensor(ix_test) else ix_test
s=figure(width=770,height=400,y_range=(out_min_rescaled-20,out_max_rescaled+20))
s.scatter(ix_test, np.concatenate(scaler_out.inverse_transform(pred_out_DPiKAN)), size=7, line_color=None, color=palettes.Colorblind8[5], legend_label='DPiKAN')
s.scatter(ix_test, np.concatenate(scaler_out.inverse_transform(pred_out_PITN)), size=7, line_color=None, color=palettes.Colorblind8[3], legend_label='PITN')
s.line(list(range(len(df_demo_data))),np.concatenate(scaler_out.inverse_transform(all_out[:,0][:,None])),line_width=3,line_color='black',line_alpha=1,line_dash='dashed',legend_label='True BP')
s.xaxis.axis_label='Beat time (s)'
s.yaxis.axis_label='SBP (mmHg)'

output_notebook() 
figure_settings(s)
show(s)


### Calculate Efficiency

In [38]:
from thop import profile
configs = Configs()
model_DPiKAN = DPiKAN(configs)
model_PITN = PITN(configs)
input_seq = torch.randn(1, 35, dtype=torch.float)
flops_DPiKAN_test, params_DPiKAN_test = profile(model_DPiKAN, inputs=(input_seq, False, False))
print("DPiKAN: flops: {}, params: {}".format(flops_DPiKAN_test, params_DPiKAN_test))

print("-----------------------------------------------------------")

flops_PITN, params_PITN = profile(model_PITN, inputs=(input_seq, False, False,))
print("PITN: flops: {}, params: {}".format(flops_PITN, params_PITN))


[INFO] Register zero_ops() for <class 'torch.nn.modules.container.Sequential'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.dropout.Dropout'>.
[INFO] Register count_avgpool() for <class 'torch.nn.modules.pooling.AvgPool1d'>.
[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
DPiKAN: flops: 237600.0, params: 8226.0
-----------------------------------------------------------
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv2d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.container.Sequential'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.dropout.Dropout'>.
[INFO] Register count_normalization() for <class 'torch.nn.modules.normalization.LayerNorm'>.
[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
PITN: flops: 299978819.0, params: 2