In [2]:
%%html
<style type='text/css'>
.CodeMirror{
font-size: 22px;
</style>

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import numpy as np
from Cardiac_electrophysiology import *

In [5]:
# Load data
s = np.load('s_50_1000.npy')
v = np.load('v_50_1000.npy')

print(np.shape(s))
print(np.shape(v))

(100, 3, 51, 51)
(100, 51, 51)


In [6]:
# Build the input tensor
DataModel = CE_DataModel()
x1 = np.linspace(0,1,51)
t = np.linspace(0,10,100)
u_t = np.logical_and(t >= 3, t <= 4)
x_mesh = DataModel.dx(x1,x1)
u_x_mesh = (x_mesh < 0.03)
u = u_t[:,np.newaxis,np.newaxis] * u_x_mesh[np.newaxis,:]

In [7]:
# Separate the dataset
s_train = s[:,:,-22:,-22:]
v_train = v[:,-22:,-22:]

# Build v_xx
dlt_x = x1[1]-x1[0]
v_train_xx = np.zeros(np.shape(v_train))
for i in range(1, np.shape(v_train)[2]-1):
    v_train_xx[:,:,i] = (v_train[:,:,i-1] + v_train[:,:,i+1] - 2*v_train[:,:,i])/dlt_x**2
v_train_xx = v_train_xx[:,1:-1,1:-1]

# Build v_yy
dlt_y = x1[1]-x1[0]
v_train_yy = np.zeros(np.shape(v_train))
for i in range(1, np.shape(v_train)[1]-1):
    v_train_yy[:,i,:] = (v_train[:,i-1,:] + v_train[:,i+1,:] - 2*v_train[:,i,:])/dlt_y**2
v_train_yy = v_train_yy[:,1:-1,1:-1]

# Select v_train, s_train
v_train = v_train[:,1:-1,1:-1]
s_train = s_train[:,:,1:-1,1:-1]

In [8]:
# Build training data
v1 = np.reshape(v_train[1:,:,:],(-1, 1))
v0 = np.reshape(v_train[:-1,:,:],(-1,1))
lacev_x1 = np.reshape(v_train_xx[1:,:,:],(-1,1))
lacev_x0 = np.reshape(v_train_xx[:-1,:,:],(-1,1))
lacev_y1 = np.reshape(v_train_yy[1:,:,:],(-1,1))
lacev_y0 = np.reshape(v_train_yy[:-1,:,:],(-1,1))

lace_train = np.concatenate((lacev_x0,lacev_x1,lacev_y0,lacev_y1),axis=1)

m1 = np.reshape(s_train[1:,0,:,:],(-1,1))
m0 = np.reshape(s_train[:-1,0,:,:],(-1,1))
n1 = np.reshape(s_train[1:,1,:,:],(-1,1))
n0 = np.reshape(s_train[:-1,1,:,:],(-1,1))
h1 = np.reshape(s_train[1:,2,:,:],(-1,1))
h0 = np.reshape(s_train[:-1,2,:,:],(-1,1))

x_train = np.concatenate((v0,m0,n0,h0),axis=1)
y_train = np.concatenate((v1,m1,n1,h1),axis=1)
# y_train = np.zeros((len(v1),4))
# y_train[:,0],y_train[:,1],y_train[:,2],y_train[:,3] = v1,m1,n1,h1
# x_train = np.zeros((len(v0),4))
# x_train[:,0],x_train[:,1],x_train[:,2],x_train[:,3] = v0,m0,n0,h0

u_train = np.reshape(u[:-1,-21:-1,-21:-1],(-1,1))

print(np.shape(x_train))
print(np.shape(lace_train))

(39600, 4)
(39600, 4)


In [9]:
# Linear Regression
from tensorflow.keras.layers import Layer, Dense
from tensorflow.keras.layers import Input, Add, Multiply, Lambda, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam
import numpy as np
import tensorflow as tf
tf.keras.backend.set_floatx('float64')

linear_target_dim = 4
output_dim = 1
inputs = Input((linear_target_dim,))
layer_linear = Dense(output_dim, use_bias=False)
outputs = layer_linear(inputs)
model_linear = Model(inputs=inputs, outputs=outputs)
tf.cast(model_linear.trainable_weights[0],tf.float32)-tf.ones((linear_target_dim, output_dim))

2023-02-14 10:00:15.302218: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-14 10:00:22.257118: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-14 10:00:22.289561: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


<tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[-0.09606534],
       [-1.159835  ],
       [-1.5355389 ],
       [-1.8860911 ]], dtype=float32)>

In [10]:
# Koopman_DL
from tensorflow.keras.layers import Layer, Dense
from tensorflow.keras.layers import Input, Add, Multiply, Lambda, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam
import numpy as np
import tensorflow as tf
tf.keras.backend.set_floatx('float64')

class DicNN(Layer):
    """
    Trainable disctionries
    """
    
    def __init__(self, layer_sizes=[64, 64], n_psi_train=22, **kwargs):
        """_summary_

        Args:
            layer_sizes (list, optional): Number of unit of hidden layer, activation = 'tanh'. Defaults to [64, 64].
            n_psi_train (int, optional): Number of unit of output layer. Defaults to 22.
        """
        super(DicNN, self).__init__(**kwargs)
        self.layer_sizes = layer_sizes
        self.input_layer = Dense(self.layer_sizes[0], name='Dic_input', use_bias=False)
        self.hidden_layers = [Dense(layer_sizes[i], activation='tanh', name='Dic_hidden_%d'%i) for i in range(len(layer_sizes))]        
        self.output_layer = Dense(n_psi_train, name='Dic_output')
        self.n_psi_train = n_psi_train
        
    def call(self, inputs):
        psi_x_train = self.input_layer(inputs)
        for layer in self.hidden_layers:
            psi_x_train = psi_x_train + layer(psi_x_train)
        outputs = self.output_layer(psi_x_train)
        return outputs
    
    def get_config(self):
        config = super(DicNN, self).get_config()
        config.update({
            'layer_sizes': self.layer_sizes,
            'n_psi_train': self.n_psi_train
        })
        return config

class PsiNN(Layer):
    """Concatenate constant, data and trainable dictionaries together as [1, data, DicNN]

    """
    
    def __init__(
        self,
        dic_trainable=DicNN,
        layer_sizes=[64,64],
        n_psi_train=22,
        **kwargs):
        super(PsiNN, self).__init__(**kwargs)
        self.layer_sizes = layer_sizes
        self.dic_trainable = dic_trainable
        self.n_dic_customized = n_psi_train
        self.dicNN = self.dic_trainable(
            layer_sizes=self.layer_sizes,
            n_psi_train=self.n_dic_customized)
    
    def call(self, inputs):
        constant = tf.ones_like(tf.slice(inputs, [0, 0], [-1, 1]))
        psi_x_train = self.dicNN(inputs)
        outputs = Concatenate()([constant, inputs, psi_x_train])
        return outputs
    
    def generate_B(self, inputs):
        target_dim = inputs.shape[-1]
        self.basis_func_number = self.n_dic_customized + target_dim + 1
        # Form B matrix
        self.B = np.zeros((self.basis_func_number, target_dim))
        for i in range(0, target_dim):
            self.B[i + 1][i] = 1
        return self.B

    def get_config(self):
        config = super(PsiNN, self).get_config()
        config.update({
            'dic_trainable': self.dic_trainable,
            'layer_sizes': self.layer_sizes,
            'n_psi_train': self.n_dic_customized
        })
        return config
    
class KNN(Layer):
    def __init__(self, n_K=27, layer_sizes=[32, 32], **kwargs):
        self.layer_sizes = layer_sizes
        self.n_K = n_K
        self.input_layer = Dense(self.layer_sizes[0], name='K_input')
        self.hidden_layers = [Dense(layer_sizes[i], activation='tanh', name='K_hidden_%d'%i) for i in range(len(layer_sizes))]
        self.output_layer = Dense(self.n_K**2, name='K_output')
        
    def call(self, inputs_u):
        K = self.input_layer(inputs_u)
        for layer in self.hidden_layers:
            K = K + layer(K)
#         outputs = K
        outputs = self.output_layer(K)
#         return outputs
        return tf.reshape(outputs,(-1,self.n_K,self.n_K))

In [11]:
target_dim = 4
u_dim = 1

In [12]:
dic = PsiNN()
knn = KNN()

In [13]:
# Build Model
inputs_x = Input((target_dim,))
inputs_y = Input((target_dim,))
inputs_u = Input((u_dim,))
        
model_psi = Model(inputs=inputs_x, outputs=dic.call(inputs_x))
psi_x = model_psi(inputs_x)
psi_y = model_psi(inputs_y)
        
model_k = Model(inputs=inputs_u,outputs=knn.call(inputs_u))
# outputs = tf.matmul(psi_x,model_k(inputs_u)) - psi_y
psi_x = tf.expand_dims(psi_x, 1)
psi_y = tf.expand_dims(psi_y, 1)
outputs = tf.matmul(psi_x,model_k(inputs_u))-psi_y
outputs = tf.reshape(outputs,(tf.shape(psi_x)[0],-1))
model_KoopmanDL = Model(inputs=[inputs_x, inputs_y, inputs_u], outputs=outputs)

In [14]:
# Pre-train
lr = 0.01
opt = tf.keras.optimizers.Adam(learning_rate=lr)
model_KoopmanDL.compile(optimizer=opt, loss='mse')
model_linear.compile(optimizer=opt, loss='mse')

In [15]:
iters = 5
epochs = [2,10]
zeros_data_y_train = tf.zeros_like(dic.call(y_train))
for i in range(iters):
    model_k.trainable = True
    model_psi.trainable = False
#     print(model_KoopmanDL.trainable_weights)
    model_KoopmanDL.fit([x_train, y_train, u_train], zeros_data_y_train, epochs=epochs[0])
    model_k.trainable = False
    model_psi.trainable = True
#     print(model_KoopmanDL.trainable_weights)
    model_KoopmanDL.fit([x_train, y_train, u_train], zeros_data_y_train, epochs=epochs[1])

Epoch 1/2
Epoch 2/2
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/2
Epoch 2/2
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/2
Epoch 2/2
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/2
Epoch 2/2
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/2
Epoch 2/2
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [16]:
# model_k.trainable = False
# model_psi.trainable = False
# model_KoopmanDL.summary()

In [17]:
def predict_v_koopman(model_psi, model_k, x_train, u_train):
    psi_x = model_psi(x_train)
    psi_x = tf.expand_dims(psi_x, 1)
    outputs = tf.matmul(psi_x,model_k(u_train))
    outputs = tf.reshape(outputs,(tf.shape(psi_x)[0],-1))
    return outputs[:,1]

In [18]:
def compute_linear_weight(x,y):
    y = tf.expand_dims(y, 1)
    A = tf.matmul(tf.transpose(x),x)
    b = tf.matmul(tf.transpose(x),y)
    w = tf.matmul(tf.linalg.inv(A),b)
    y_pred = tf.matmul(x,w)
    return w, y_pred

In [19]:
# Hybrid Koopman
model_k.trainable = True
model_psi.trainable = False
err = 1e-8
linear_epochs = 10
koopman_epochs = 20

# Linear Weight
linear_weight = tf.cast(tf.zeros(linear_target_dim),tf.float64)

# Set initial v_koopman
v_koopman = predict_v_koopman(model_psi, model_k, x_train, u_train)
psi_x = model_psi(x_train)
psi_x = tf.expand_dims(psi_x, 1)
outputs = tf.matmul(psi_x,model_k(u_train))
outputs = tf.reshape(outputs,(tf.shape(psi_x)[0],-1))
y_train = y_train-x_train
y_v = np.reshape(y_train[:,0], (1,-1))[0]
y_v = tf.convert_to_tensor(y_v, tf.float64)
print(outputs[:,1])
mu_pred = tf.cast(1e8*tf.ones((linear_target_dim, output_dim)),tf.float64)
# print(tf.shape(tf.reshape(v_linear, (1,-1))[0]))
while (tf.norm(mu_pred - linear_weight)>err):
    mu_pred = linear_weight
    y_linear = y_v-v_koopman
    linear_weight, v_linear = compute_linear_weight(lace_train,y_linear)
    print(linear_weight)
    y_train[:,0] = y_v-tf.reshape(v_linear, (1,-1))[0]
    model_KoopmanDL.fit([y_train, y_train, u_train], zeros_data_y_train, epochs=koopman_epochs)
    v_koopman = predict_v_koopman(model_psi, model_k, x_train, u_train)

tf.Tensor(
[299.63221681 299.63221681 299.63221681 ... 276.38290105 276.37993507
 276.37815033], shape=(39600,), dtype=float64)
tf.Tensor(
[[ 0.23770637]
 [-0.24250945]
 [ 1.12667662]
 [25.58069629]], shape=(4, 1), dtype=float64)
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
tf.Tensor(
[[  1.39365395]
 [ -1.4217908 ]
 [  6.71368008]
 [149.82146295]], shape=(4, 1), dtype=float64)
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
tf.Tensor(
[[  1.40610229]
 [ -1.43449147]
 [  6.75380179]
 [151.17867124]], shape=(4, 1), dtype=float64)
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/

KeyboardInterrupt: 

In [9]:
# # Koopman_DL
# from tensorflow.keras.layers import Layer, Dense
# from tensorflow.keras.layers import Input, Add, Multiply, Lambda, Concatenate
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam
# import numpy as np
# import tensorflow as tf
# tf.keras.backend.set_floatx('float64')

# class DicNN(Layer):
#     """
#     Trainable disctionries
#     """
    
#     def __init__(self, layer_sizes=[64, 64], n_psi_train=22,trainable_para=True, **kwargs):
#         """_summary_

#         Args:
#             layer_sizes (list, optional): Number of unit of hidden layer, activation = 'tanh'. Defaults to [64, 64].
#             n_psi_train (int, optional): Number of unit of output layer. Defaults to 22.
#         """
#         super(DicNN, self).__init__(**kwargs)
#         self.layer_sizes = layer_sizes
#         self.input_layer = Dense(self.layer_sizes[0], name='Dic_input',trainable = trainable_para, use_bias=False)
#         self.hidden_layers = [Dense(layer_sizes[i], activation='tanh', name='Dic_hidden_%d'%i, trainable = trainable_para) for i in len(layer_sizes)]        
#         self.output_layer = Dense(n_psi_train, name='Dic_output',trainable = trainable_para)
#         self.n_psi_train = n_psi_train
        
#     def call(self, inputs):
#         psi_x_train = self.input_layer(inputs)
#         for layer in self.hidden_layers:
#             psi_x_train = psi_x_train + layer(psi_x_train)
#         outputs = self.output_layer(psi_x_train)
#         return outputs
    
#     def get_config(self):
#         config = super(DicNN, self).get_config()
#         config.update({
#             'layer_sizes': self.layer_sizes,
#             'n_psi_train': self.n_psi_train
#         })
#         return config

# class PsiNN(Layer):
#     """Concatenate constant, data and trainable dictionaries together as [1, data, DicNN]

#     """
    
#     def __init__(
#         self,
#         dic_trainable=DicNN,
#         layer_sizes=[64,64],
#         n_psi_train=22,
#         trainable_para=True,
#         **kwargs):
#         super(PsiNN, self).__init__(**kwargs)
#         self.layer_sizes = layer_sizes
#         self.dic_trainable = dic_trainable
#         self.trainable_para = trainable_para
#         self.n_dic_customized = n_psi_train
#         self.dicNN = self.dic_trainable(
#             layer_sizes=self.layer_sizes,
#             n_psi_train=self.n_dic_customized,
#             trainable_para=self.trainable_para)
    
#     def call(self, inputs):
#         constant = tf.ones_like(tf.slice(inputs, [0, 0], [-1, 1]))
#         psi_x_train = self.dicNN(inputs)
#         outputs = Concatenate()([constant, inputs, psi_x_train])
#         return outputs
    
#     def generate_B(self, inputs):
#         target_dim = inputs.shape[-1]
#         self.basis_func_number = self.n_dic_customized + target_dim + 1
#         # Form B matrix
#         self.B = np.zeros((self.basis_func_number, target_dim))
#         for i in range(0, target_dim):
#             self.B[i + 1][i] = 1
#         return self.B

#     def get_config(self):
#         config = super(PsiNN, self).get_config()
#         config.update({
#             'dic_trainable': self.dic_trainable,
#             'layer_sizes': self.layer_sizes,
#             'n_psi_train': self.n_dic_customized
#         })
#         return config
    
# class KNN(Layer):
#     def __init__(self, layer_sizes=[32, 32], n_K, trainable_para = True, **kwargs):
#         self.layer_sizes = layer_sizes
#         self.input_layer = Dense(self.layer_sizes[0], use_bias=False, name='K_input',  trainable=trainable_para)
#         self.hidden_layers = [Dense(layer_sizes[i], activation='tanh', name='K_hidden_%d'%i, trainable = trainable_para) for i in len(layer_sizes)]
#         self.output_layer = Dense(n_K**2, name='K_output', trainable = trainable_para)
#         self.n_K = n_K
        
#     def call(self, inputs_u, inputs_x):
#         K = self.input_layer(inputs_u)
#         for layer in self.hidden_layers:
#             K = K + layer(K)
#         outputs = self.output_layer(K)
#         return inputs_x@np.reshape(outputs,(self.n_K,-1))
    
    
# class KoopmanDLSolver(object):
#     """
#     Koopman model with dictionray
#     """
    
#     def __init__(self, dic=PsiNN, k=KNN, target_dim, u_dim, reg=0.0):
#         """Initializer

#         :param dic: dictionary
#         :type dic: class
#         :param target_dim: dimension of the variable of the equation
#         :type target_dim: int
#         :param reg: the regularization parameter when computing K, defaults to 0.0
#         :type reg: float, optional
#         """
# #         self.dic_train = dic_train # dictionary class
# #         self.dic_func = dic_train.call # dictionary functions
#         self.target_dim = target_dim # x,y dimension
#         self.dic_train = dic(dic_trainable=DicNN,layer_sizes=[64,64],n_psi_train=22,trainable_para=True)
#         self.dic_fix =  dic(dic_trainable=DicNN,layer_sizes=[64,64],n_psi_train=22,trainable_para=False)
#         self.k_train = k(layer_sizes=[32, 32], n_K=1+self.target_dim+22, trainable_para = True)
#         self.k_fix = k(layer_sizes=[32, 32], n_K=1+self.target_dim+22, trainable_para = True)
#         self.u_dim = u_dim # u_dimension
#         self.reg = reg
# #         self.KNN = KNN
# #         self.KNN_func = KNN.call
        
#     def build_model_psi(self):
#         """
#         Build model with trainable dictionary
#         The loss function is ||Psi(y) - K Psi(x)||^2 .
#         """
        
#         inputs_x = Input((self.target_dim,))
#         inputs_y = Input((self.target_dim,))
#         inputs_u = Input((self.u_dim,))
        
#         # model_psi
#         psi_x = self.dic_train.call(inputs_x)
#         psi_y = self.dic_train.call(inputs_y)
        
#         psi_next = self.k_fix.call(inputs_u, psi_x)
        
#         outputs = psi_next - psi_y
#         model_psi = Model(inputs=[inputs_x, inputs_y, inputs_u], outputs=outputs)
#         return model_psi
    
#     def build_model_k(self):
#         inputs_x = Input((self.target_dim,))
#         inputs_y = Input((self.target_dim,))
#         inputs_u = Input((self.u_dim,))
#         # model_k
#         psi_x = self.dic_fix.call(inputs_x)
#         psi_y = self.dic_fix.call(inputs_y)
        
#         psi_next = self.k_train.call(inputs_u, psi_x)
        
#         outputs = psi_next - psi_y
#         model_k = Model(inputs=[inputs_x, inputs_y, inputs_u], outputs=outputs)
        
#         return model_k

# #     def train_psi(self, model, epochs):
# #         """Train the trainable part of the dictionary

# #         :param model: koopman model
# #         :type model: model
# #         :param epochs: the number of training epochs before computing K for each inner training epoch
# #         :type epochs: int
# #         :return: history
# #         :rtype: history callback object
# #         """
# #         history = model.fit(
# #             x=self.data_train,
# #             y=self.zeros_data_y_train,
# #             epochs=epochs,
# #             validation_data=(
# #                 self.data_valid,
# #                 self.zeros_data_y_valid),
# #             batch_size=self.batch_size,
# #             verbose=1)
# #         return history
#     def separate_data(self, data_train):
#         return data_train[0], data_train[1], data_train[2]
        
#     def double_train(self, data_train, model_k, model_psi, epochs, steps, batch_size, lr):
#         self.data_train = data_train
#         self.data_x_train, self.data_y_train, self.data_u_train = self.separate_data(self.data_train)

#         self.zeros_data_y_train = tf.zeros_like(
#             self.dic_func(self.data_y_train))
        
#         self.batch_size = batch_size

#         # Compile the Koopman DL model
#         opt = Adam(lr)
#         self.model_k.compile(optimizer=opt, loss='mse')

#         # Training Loop
#         losses = []
#         for i in range(epochs):
#             # One step for computing K
#             self.K = self.compute_K(self.dic_func,
#                                     self.data_x_train,
#                                     self.data_y_train,
#                                     self.reg)
#             self.model.get_layer('Layer_K').weights[0].assign(self.K)

#             # Two steps for training PsiNN
#             self.history = self.train_psi(self.model, epochs=2)