In [None]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer
from tensorflow.keras import Sequential
import tensorflow.keras.layers as nn

from tensorflow import einsum
from einops import rearrange
from einops.layers.tensorflow import Rearrange

import math
from inspect import isfunction

from functools import partial
from tqdm import tqdm

import numpy as np
import os
import cv2
import pathlib
from glob import glob

import time
from tensorflow.python.data.experimental import AUTOTUNE
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import pandas as pd
import skimage
from skimage.metrics import structural_similarity
import time
from skimage import filters, img_as_ubyte,morphology,measure
from scipy.ndimage import label
import copy
import random

In [None]:
### 模型########
def scaled_dot_product_attention(Q, K, V,
                                 dropout_rate=0.1,
                                 training=True,
                                 scope="scaled_dot_product_attention"):
    '''
    Q: Packed queries. 3d tensor. [N, T_q, d_k].
    K: Packed keys. 3d tensor. [N, T_k, d_k].
    V: Packed values. 3d tensor. [N, T_k, d_v].
    key_masks: A 2d tensor with shape of [N, key_seqlen]
    causality: If True, applies masking for future blinding
    dropout_rate: A floating point number of [0, 1].
    training: boolean for controlling droput
    scope: Optional scope for `variable_scope`.
    '''
    d_k = Q.get_shape().as_list()[-1]

    # dot product
    outputs = tf.matmul(Q, tf.transpose(K, [0, 2, 1]))  # (N, T_q, T_k)

    # scale
    outputs /= d_k ** 0.5

    # softmax
    outputs = tf.nn.softmax(outputs)

    # dropout
    outputs = tf.keras.layers.Dropout(dropout_rate)(outputs)

    # weighted sum (context vectors)
    outputs = tf.matmul(outputs, V)  # (N, T_q, d_v)

    return outputs
    
def multihead_attention(queries, keys, values,
                        num_heads=8, 
                        dropout_rate=0.1,
                        training=True,
                        scope="multihead_attention"):
    '''
    queries: A 3d tensor with shape of [N, T_q, d_model].
    keys: A 3d tensor with shape of [N, T_k, d_model].
    values: A 3d tensor with shape of [N, T_k, d_model].
    key_masks: A 2d tensor with shape of [N, key_seqlen]
    num_heads: An int. Number of heads.
    dropout_rate: A floating point number.
    training: Boolean. Controller of mechanism for dropout.
    causality: Boolean. If true, units that reference the future are masked.
    scope: Optional scope for `variable_scope`.
        
    Returns
      A 3d tensor with shape of (N, T_q, C)  
    '''
    d_model = queries.get_shape().as_list()[-1]
    # Linear projections
    Q = tf.keras.layers.Dense(d_model)(queries) # (N, T_q, d_model) 
    K = tf.keras.layers.Dense(d_model)(keys) # (N, T_k, d_model)
    V = tf.keras.layers.Dense(d_model)(values)# (N, T_v, d_model)
    
    # Split and concat
    Q_ = tf.concat(tf.split(Q, num_heads, axis=2), axis=0) # (h*N, T_q, d_model/h)
    K_ = tf.concat(tf.split(K, num_heads, axis=2), axis=0) # (h*N, T_k, d_model/h)
    V_ = tf.concat(tf.split(V, num_heads, axis=2), axis=0) # (h*N, T_v, d_model/h)

    # Attention
    outputs =  scaled_dot_product_attention(Q_, K_, V_, dropout_rate, training)

    # Restore shape
    outputs = tf.concat(tf.split(outputs, num_heads, axis=0), axis=2 ) # (N, T_q, d_model)

    # Residual connection
    outputs = tf.keras.layers.add([queries,outputs])

    # Normalize
    outputs = tf.keras.layers.LayerNormalization()(outputs)
 
    return outputs

def ff(inputs, num_units, scope="positionwise_feedforward"):
    '''
    inputs: A 3d tensor with shape of [N, T, C].
    num_units: A list of two integers.
    scope: Optional scope for `variable_scope`.
    Returns:
      A 3d tensor with the same shape and dtype as inputs
    '''
    # Inner layer
    outputs = tf.keras.layers.Dense(num_units[0], activation=tf.nn.relu)(inputs) 

    # Outer layer
    outputs = tf.keras.layers.Dense(num_units[1])(outputs)

    # Residual connection
    outputs = tf.keras.layers.concatenate([outputs, inputs],axis=-1)

    # Normalize
    outputs = tf.keras.layers.LayerNormalization()(outputs)
    
    return outputs

class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, max_steps, max_dims, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        if max_dims % 2 == 1: max_dims += 1
        p, i = np.meshgrid(np.arange(max_steps), np.arange(max_dims // 2))
        pos_emb = np.empty((1, max_steps, max_dims))
        pos_emb[0, :, ::2] = np.sin(p / 10000 ** (2 * i / max_dims)).T
        pos_emb[0, :, 1::2] = np.cos(p / 10000 ** (2 * i / max_dims)).T
        self.positional_embedding = tf.constant(pos_emb.astype(self.dtype))

    def call(self, inputs):
        shape = tf.shape(inputs)
        return inputs + self.positional_embedding[:, :shape[-2], :shape[-1]]
    
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'dtype': self.dtype,
        })
        return config

def Fatigue_PINN():
    History_input = tf.keras.layers.Input(shape=(15,2)) #历史数据，15个点
    Predicted_input_x = tf.keras.layers.Input(shape=(1)) #预测点的x坐标
    Class_curve = tf.keras.layers.Input(shape=(2,3)) #判定曲线类型
    #####Path1:deep predciton for value 
    ####Attenation部分
    encoder_in = tf.keras.layers.Dense(64)(History_input) #(None,15,64)
    decoder_in = tf.keras.layers.Dense(64)(History_input) #(None,15,64)
    """
    positional_encoding = PositionalEncoding(max_steps=15, max_dims=64)
    encoder_in = positional_encoding(encoder_in)
    decoder_in = positional_encoding(decoder_in)
    """
    layer1 = multihead_attention(encoder_in,encoder_in,decoder_in)
    layer2 = ff(layer1, [128,128])
    
    layer3 = multihead_attention(encoder_in, decoder_in, decoder_in)
    layer4 = multihead_attention(layer3, layer2, decoder_in)
    
    layer5 = ff(layer4, [128,128])
    layer6 = tf.keras.layers.Dense(64, activation=tf.nn.leaky_relu)(layer5)
    layer6 = tf.keras.layers.Flatten()(layer6)
    Layer7 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(layer6)
    ####增量预测
    Increase1 = tf.keras.layers.Dense(64, activation=tf.nn.leaky_relu)(Predicted_input_x)
    ####拼接
    Dense1 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(tf.concat([Layer7,Increase1],-1))
    Dense2 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(Dense1)
    LayNorm = tf.keras.layers.LayerNormalization()(Dense2)
    Dense3 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(LayNorm)
    Output1 = tf.keras.layers.Dense(1,activation="sigmoid")(Dense3) ###输出预测的百分比
    
    #####Path2:shallow predciton for range 
    Lstm1 = tf.keras.layers.LSTM(64,activation='tanh',return_sequences=True)(encoder_in)
    Lstm2 = tf.keras.layers.LSTM(64,activation='tanh',return_sequences=False)(Lstm1)
    Onehot1 = tf.keras.layers.Dense(64)(Class_curve)
    Onehot2 = tf.keras.layers.LSTM(64,activation='tanh',return_sequences=False)(Onehot1)
    Dense_s1 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(tf.concat([Lstm2,Increase1,Onehot2],-1))
    Dense_s2 = tf.keras.layers.Dense(64,activation=tf.nn.leaky_relu)(Dense_s1)
    Output2 = tf.keras.layers.Dense(2,activation='relu')(Dense_s2) ###输出预测的范围上下限
    
    #固化模型 
    model=tf.keras.models.Model(inputs=[History_input,Predicted_input_x,Class_curve],outputs=[Output1,Output2])   
    #model.summary()
    return model

In [None]:
#####训练########
model = Fatigue_PINN()
model.load_weights("./Fatigue_PINN")
History=[]
Batch_size=4
optimizer = tf.keras.optimizers.Adam(1e-5)
for epoch in range(0,100,1):
    count=0
    Average_loss1=0
    Average_loss2=0
    MinEpochs = int(len(X_data)/Batch_size)
    new_list = np.array([i for i in range (len(X_data))])
    np.random.shuffle(new_list)
    for j in range(MinEpochs):
        count+=1
        tem_list = new_list[j*Batch_size:(j+1)*Batch_size]
        train_X_data = X_data[tem_list]
        train_X_data_increase = X_data_increase[tem_list]
        train_X_data_factor = X_data_factor[tem_list]
        train_Y_data = Y_data[tem_list]
        train_Y_data_con = Y_data_con[tem_list]
        with tf.GradientTape() as tape:
            Output1,Output2 = model([train_X_data,train_X_data_increase,train_X_data_factor]) 
            Predicted_Y = Output2[:,0]+(Output2[:,1]-Output2[:,0])*Output1 
            Loss1 = tf.keras.losses.MAE(train_Y_data,Predicted_Y)
            Loss2 = tf.keras.losses.MAE(train_Y_data_con,Output2)
            Loss  = 10 * Loss1 + Loss2
            
            gradients = tape.gradient(Loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))  
            Average_loss1=Average_loss1 + tf.reduce_mean(Loss1)
            Average_loss2=Average_loss2 + tf.reduce_mean(Loss2)
    Average_loss1 = Average_loss1/count
    Average_loss2 = Average_loss2/count
    History.append([epoch, Average_loss1,Average_loss2])
    tf.print("=>Epoch%4d  loss1:%4.6f loss2:%4.6f" %(epoch, Average_loss1,Average_loss2))   

    model.save_weights("./Fatigue_PINN")

In [None]:
###############for test#################
model  = Fatigue_PINN()
model.load_weights("./Fatigue_PINN")
X_data_test = [] 
X_data_increase_test = []
X_data_factor_test = [] 
Y_data_test = []
Y_data_con_test =[]
for i in range (len(degree25_400_5)-15):
    X_data_test.append(np.transpose([np.array(degree25_400_5.iloc[i:i+15,1])/step_max,np.array(degree25_400_5.iloc[i:i+15,0])]))
    X_data_factor_test.append([[0,0,1],[0,0,1]])
    X_data_increase_test.append(degree25_400_5.iloc[i+15,1]/step_max)
    Y_data_test.append(degree25_400_5.iloc[i+15,0])
    Y_data_con_test.append([1-degree25_para[0]*(degree25_400_5.iloc[i+15,1]**degree25_para[1])-0.1,1-degree25_para[0]*(degree25_400_5.iloc[i+15,1]**degree25_para[1])+0.1])

X_data_test = np.array(X_data_test) 
X_data_increase_test = np.array(X_data_increase_test)
X_data_factor_test = np.array(X_data_factor_test) 
Y_data_test = np.array(Y_data_test)
Y_data_con_test = np.array(Y_data_con_test)
    
predicted_results=[]
for i in range(len(X_data_test)):
    Output1,Output2 = model([X_data_test[i:i+1],X_data_increase_test[i:i+1],X_data_factor_test[i:i+1]]) 
    predicted_results.append(Output2[:,0]+(Output2[:,1]-Output2[:,0])*Output1)
    
plt.scatter(np.squeeze(Y_data_test),np.squeeze(predicted_results))
plt.xlim((0, 1))
plt.ylim((0, 1))
plt.grid()
plt.show()

In [None]:
#####绘制曲线######
model  = Fatigue_PINN()
model.load_weights("./Fatigue_PINN")
initial_data_x = np.zeros(len(degree25_400_5))
initial_data_x[0:20] = degree25_400_5.iloc[0:20,0]
initial_data_inc = degree25_400_5.iloc[:,1]/step_max
for i in range(20,len(initial_data_inc)):
    X_data_test = []
    X_data_factor_test =[]
    X_data_increase_test=[]
    predicted_result = []
    for j in range(11):
        tem =np.random.choice(i, 15, replace=False)
        tem.sort()
        X_data_test.append(np.transpose([np.array(degree25_400_5.iloc[tem,1])/step_max,np.array(initial_data_x[tem])]))
        X_data_factor_test.append([[0,0,1],[0,0,1]])
        X_data_increase_test.append(degree25_400_5.iloc[i,1]/step_max)
    X_data_test = np.array(X_data_test)
    X_data_factor_test = np.array(X_data_factor_test)
    X_data_increase_test= np.array(X_data_increase_test)
    Output1,Output2 = model([X_data_test,X_data_increase_test,X_data_factor_test]) 
    predicted_result.append(Output2[:,0]+(Output2[:,1]-Output2[:,0])*Output1
    initial_data_x[i] = np.median(predicted_result)

plt.plot(initial_data_inc*step_max,initial_data_x)
plt.plot(initial_data_inc*step_max,degree25_400_5.iloc[:,0])
plt.show()
Ontest_set = np.concatenate((np.array(initial_data_inc*step_max)[...,np.newaxis],
                             np.array(initial_data_x)[...,np.newaxis],
                             np.array(degree25_400_5.iloc[:,0])[...,np.newaxis]),-1)