<a href="https://colab.research.google.com/github/Crispy12138/Graduation_project/blob/main/Encoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, MultiHeadAttention, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import numpy as np



# Parameters from Table 1
T10 = 300  # K
F10 = 5    # m^3/h
V1 = 1     # m^3
T1s = 402  # K
CA1s = 1.95  # kmol/m^3
CA10s = 4    # kmol/m^3
Q1s = 0.0  # kJ/hr
k0 = 8.46e6  # m^3/kmol h
Cp = 0.231  # kJ/kg K
rhoL = 1000 # kg/m^3
R = 8.314  # kJ/kmol K
E = 5e4    # kJ/kmol
DeltaH = -1.15e4  # kJ/kmol

T20 = 300  # K
F20 = 5    # m^3/h
V2 = 1     # m^3
T2s = 402  # K
CA2s = 1.95  # kmol/m^3
CA20s = 4    # kmol/m^3
Q2s = 0.0  # kJ/hr


delta_CA0_min , delta_CA0_max=  -3.5, 3.5
delta_Q_min ,delta_Q_max = -5e5, 5e5
delta_CA_min ,delta_CA_max = -1.95, 2
delta_T_min , delta_T_max = -100, 100

# defination of Lyapunov function V(x) = xTPx
P = np.mat([[1060,22],[22,0.52]])
rou = 380.0

# filter points of x within the ellipse
def x_calculator (initial_number,r):
    x_list = []
    conc = np.linspace(delta_CA_min, delta_CA_max, initial_number)
    temp = np.linspace(delta_T_min,  delta_T_max,  initial_number)
    for i in range(conc.size):
        for j in range(temp.size):
            x = np.array([[conc[i]],[temp[j]]])
            Vx = x.T @ P @ x
            if Vx <= r:
                x_list.append(x.flatten())
    final_x = np.array(x_list).T
    return final_x

# disrupt x points -> x points of CSTR 2
def x_transform(x1):
    x2 = x1.copy().T
    np.random.shuffle(x2)
    x2=x2.T
    return x2

# calculate the raw values of input
def XY_calculator(h,time_step,x01,x02,number_of_point):

    delta_CA10 =np.linspace(delta_CA0_min,delta_CA0_max, number_of_point)
    delta_CA20 =x_transform(delta_CA10)

    delta_Q1   =np.linspace(delta_Q_min , delta_Q_max,   number_of_point)
    delta_Q2   =x_transform(delta_Q1)
    t = np.arange(0,(time_step*h),h)

    X = np.zeros((number_of_point,time_step,8))
    Y = np.zeros((number_of_point,time_step,4))

    for i in range(number_of_point):

        CA1 = x01[0][i] + CA1s
        T1 = x01[1][i] + T1s
        CA2 = x02[0][i] + CA2s
        T2 = x02[1][i] + T2s

        CA10 = delta_CA10[i] + CA10s
        Q1 = delta_Q1[i] + Q1s
        CA20 = delta_CA20[i] + CA20s
        Q2 = delta_Q2[i] + Q2s

        for j in range(time_step):
            X[i,j] = [x01[0][i], x01[1][i], x02[0][i], x02[1][i], \
                      delta_CA10[i], delta_Q1[i],delta_CA20[i], delta_Q2[i]]
            Y[i,j] = [CA1-CA1s, T1-T1s , CA2-CA2s, T2-T2s]
            r1 = k0 * np.exp(-E / (R * T1)) * CA1**2
            r2 = k0 * np.exp(-E / (R * T2)) * CA2**2
            CA1+= h * ((F10 / V1) * (CA10 - CA1) - r1)
            T1 += h * ((F10 / V1) * (T10 - T1) + (-DeltaH * r1) / (rhoL * Cp) + Q1 / (rhoL * Cp * V1))
            CA2+= h * ((F20 / V2) * CA20 + (F10 / V2) * CA1 - (F10 + F20) / V2 * CA2 - r2)
            T2 += h * ((F20 / V2) * T20 + (F10 / V2) * T1 - (F10 + F20) / V2 * T2 + (-DeltaH * r2) / (rhoL * Cp) + Q2s / (rhoL * Cp * V2))

    return X,Y


In [None]:
x01 = x_calculator(1000,rou)
x02 = x_transform(x01)

print(x02.shape)
print(x01.shape)
print(x01)
print(x02)

h = 1e-3
time_step=100

number_of_point = np.size(x01,1)
X,Y = XY_calculator(h,time_step,x01,x02,number_of_point)
print(f"shape of X:{np.shape(X)}")
print(f"shape of Y:{np.shape(Y)}")

(2, 183969)
(2, 183969)
[[ -1.71276276  -1.71276276  -1.71276276 ...   1.71136136   1.71136136
    1.71136136]
 [ 71.17117117  71.37137137  71.57157157 ... -71.17117117 -70.97097097
  -70.77077077]]
[[-6.21471471e-01 -1.25625626e-02  2.99799800e-01 ... -1.48343343e+00
  -4.35635636e-01  1.07082082e+00]
 [ 1.07107107e+01 -7.90790791e+00 -1.43143143e+01 ...  7.47747748e+01
   1.03103103e+01 -6.63663664e+01]]
shape of X:(183969, 100, 8)
shape of Y:(183969, 100, 4)


In [None]:
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_percentage_error

scaler_X = preprocessing.StandardScaler().fit(X.reshape(-1, 8))
scaler_y = preprocessing.StandardScaler().fit(Y.reshape(-1, 4))

RNN_input = scaler_X.transform(X.reshape(-1, 8)).reshape(-1,10,8)
RNN_output = scaler_y.transform(Y.reshape(-1, 4)).reshape(-1,10,4)

'''print("RNN_input shape is {}".format(RNN_input.shape))
print("RNN_output shape is {}".format(RNN_output.shape))'''
X_train, X_test, y_train, y_test = train_test_split(RNN_input, RNN_output, test_size=0.3, random_state=123)


print(f"scaler_X_mean= {scaler_X.mean_}")
print(f"scaler_X.var_={scaler_X.var_}")
print(f"scaler_y.mean_={scaler_y.mean_}")
print(f"scaler_y.var_={scaler_y.var_}")


# checking X_train
np.save('X_train.npy',X_train)
np.save('X_test.npy',X_test)
np.save('y_train.npy',y_train)
np.save('y_test.npy',y_test)
np.save('X_mean.npy',scaler_X.mean_)
np.save('X_std.npy',np.sqrt(scaler_X.var_))
np.save('y_mean.npy',scaler_y.mean_)
np.save('y_std.npy',np.sqrt(scaler_y.var_))
X_train = np.load('X_train.npy')
X_test = np.load('X_test.npy')
y_train = np.load('y_train.npy')
y_test = np.load('y_test.npy')

scaler_X_mean= [-9.97469659e-06  1.99417764e-03 -9.97469702e-06  1.99417764e-03
  1.77480227e-17 -1.40483310e-11 -7.76436926e-15 -1.43072114e-10]
scaler_X.var_=[7.35129844e-01 1.49855399e+03 7.35129844e-01 1.49855399e+03
 4.08337773e+00 8.33342393e+10 4.08337773e+00 8.33342393e+10]
scaler_y.mean_=[-0.46500459 23.16266499 -0.0720231   3.59869366]
scaler_y.var_=[7.82186684e-01 9.63372262e+03 5.82574227e-01 1.64912204e+03]


In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import time

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dim_feedforward, rate=0.1):
        super(EncoderLayer, self).__init__()
        self.attention = tf.keras.layers.MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([
            Dense(dim_feedforward, activation='relu'),
            Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs):
        attn_output = self.attention(inputs, inputs, inputs)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = out1 + ffn_output
        return self.layernorm2(ffn_output)


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                            np.arange(d_model)[np.newaxis, :],
                            d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis]
    return tf.cast(pos_encoding, dtype=tf.float32)

def build_transformer_model(input_shape, output_shape, d_model, num_heads, dim_feedforward):
    original_inputs = Input(shape=input_shape)
    position, _ = input_shape
    projection_layer = tf.keras.layers.Dense(units=d_model, activation='linear')
    projected_inputs = projection_layer(original_inputs)
    inputs_with_pe = positional_encoding(position, d_model) + projected_inputs
    transformer_layer = EncoderLayer(d_model, num_heads, dim_feedforward)
    x = transformer_layer(inputs_with_pe)
   # print(f"y in transformer_layer={y}")
    x = Dense(output_shape)(x)
    model = Model(original_inputs, x)
    return model

model = build_transformer_model(
    input_shape=(10, 8),
    output_shape=(4),
    d_model=64,
    num_heads=8,
    dim_feedforward=512
)

initial_learning_rate = 0.0001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=True)
adam = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

early_stopping = EarlyStopping(monitor='val_loss', patience=50, min_delta=1e-6, restore_best_weights=True)
model.compile(adam,loss=MeanSquaredError())

start_time = time.time()
history = model.fit(
    X_train, y_train,
    epochs=500,
    batch_size=256,
    validation_split=0.1,
    callbacks=early_stopping
)
end_time = time.time()
total_training_time = end_time - start_time
print(f"total training time ={total_training_time}")

plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
print(model.evaluate(X_test,y_test))
tf.saved_model.save(model, 'my_model_saved2')

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500

KeyboardInterrupt: 

In [None]:
print(model.evaluate(X_test,y_test))
model.load(my_model_saved)

X_train = np.load('X_train.npy')
X_test = np.load('X_test.npy')
y_train = np.load('y_train.npy')
y_test = np.load('y_test.npy')

y_predict = model.predict(X_test)
y_predict_2d = y_predict.reshape(-1, y_predict.shape[-1])
y_predict_inversed = scaler_y.inverse_transform(y_predict_2d)
y_predict = y_predict_inversed.reshape(y_predict.shape)
print(y_predict)


y_test_2d = y_test.reshape(-1, y_test.shape[-1])
y_test_inversed = scaler_y.inverse_transform(y_test_2d)
y_test = y_test_inversed.reshape(y_test.shape)
print(y_test)



X_test_2d = X_test.reshape(-1, X_test.shape[-1])
X_plot_2d = scaler_X.inverse_transform(X_test_2d)
X_plot = X_plot_2d.reshape(X_test.shape)

y = np.linspace(-100, 100, 100000, endpoint=True)

x_upper = list()
x_lower = list()
y_plot = list()

for i in y:
    sqrt = np.sqrt(-2688000 * i**2 + 15772800000)
    if sqrt >= 0:
        y_plot.append(i)
        x_upper.append((-4400 * i + sqrt) / 212000)
        x_lower.append((-4400 * i - sqrt) / 212000)
        pass
    pass


plt.figure(figsize=(10,10))

for i in range(10):
    if i == 0:  # only add label to 1 data point
        plt.plot(X_plot[i, 0, 0], X_plot[i, 0, 1], marker="*", markersize=15, color='blue')
        plt.plot(y_test[i, :, 0], y_test[i, :, 1], color='cyan', lw=2, label='CSTR1_Test')
        plt.plot(y_predict[i, :, 0], y_predict[i, :, 1], color='black', lw=2, ls=':', label='CSTR1_Predicted')
        plt.plot(X_plot[i, 0, 2], X_plot[i, 0, 3], marker="*", markersize=15, color='orange')
        plt.plot(y_test[i, :, 2], y_test[i, :, 3], color='cyan', lw=2, label='CSTR2_Test')
        plt.plot(y_predict[i, :, 2], y_predict[i, :, 3], color='black', lw=2, ls=':', label='CSTR2_Predicted')
    else:
        plt.plot(X_plot[i, 0, 0], X_plot[i, 0, 1], marker="*", markersize=15, color='blue')
        plt.plot(y_test[i, :, 0], y_test[i, :, 1], color='cyan', lw=2)
        plt.plot(y_predict[i, :, 0], y_predict[i, :, 1], color='black', lw=2, ls=':')
        plt.plot(X_plot[i, 0, 2], X_plot[i, 0, 3], marker="*", markersize=15, color='orange')
        plt.plot(y_test[i, :, 2], y_test[i, :, 3], color='cyan', lw=2)
        plt.plot(y_predict[i, :, 2], y_predict[i, :, 3], color='black', lw=2, ls=':')

# plot stability region
plt.plot(x_lower, y_plot, color='steelblue',label='Ωρ')
plt.plot(x_upper, y_plot, color='steelblue')
plt.ylim([-100, 100])
plt.xlim([-2, 2])
plt.ylim([-100, 100])
plt.xlim([-2, 2])

plt.xlabel("C_A - C_As")
plt.ylabel("T - T_s")
plt.legend()
plt.show()
