In [1]:
import numpy as np
import pandas as pd
import warnings
import pickle
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)

# Install tensorflow if not already installed
# %pip install tensorflow
from tensorflow.keras.regularizers import l2
import tensorflow as tf
import tensorflow.keras.layers as tfl

import tensorflow_addons as tfa
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE
from sklearn.decomposition import PCA

# Load the two pickled training pools (train_dataset_1 is used as the positive
# set, train_dataset_0 as the negative pool).
# NOTE(security): pickle.load can execute arbitrary code embedded in the file;
# only load pickles produced by a trusted pipeline.
with open("features_data/diversity/train_dataset_1.pkl", 'rb') as file:
    positive_set = pickle.load(file)

with open("features_data/diversity/train_dataset_0.pkl", 'rb') as file:
    negative_set_entire = pickle.load(file)
column_names = ['pdb_name','residue','features','label']
# Make sure positive_set and negative_set_entire are DataFrames
# (the pickles may hold plain dicts).
if isinstance(positive_set, dict):
    positive_set = pd.DataFrame.from_dict(positive_set)
if isinstance(negative_set_entire, dict):
    negative_set_entire = pd.DataFrame.from_dict(negative_set_entire)

# Randomly down-sample the negative pool to a fixed multiple of the positive
# count. The multiplier actually applied is 15; the previous comment on this
# line claimed 1.5x -- TODO confirm which ratio is intended.
NEG_TO_POS_RATIO = 15
# Cap at the pool size: DataFrame.sample raises ValueError when n exceeds the
# number of available rows (sampling is without replacement).
n_negative = min(round(len(positive_set) * NEG_TO_POS_RATIO), len(negative_set_entire))
Negative_Samples = negative_set_entire.sample(n=n_negative, random_state=42)

# Combine positive and sampled negative rows into the final training frame,
# re-indexed 0..n-1.
Train_set = pd.concat([positive_set, Negative_Samples], ignore_index=True, axis=0)

# Collect the features and labels of the training set.
np.set_printoptions(suppress=True)
# Pull every per-row feature vector out of the frame positionally and stack
# them into one array (replaces the manual index loop).
X_val = Train_set['features'].tolist()
X_train_orig = np.asarray(X_val)
# Optional experiment (disabled): keep only the T5 and bio features by
# dropping columns 1024..1043, i.e. concatenating [:1024] with [1044:]:
# X_train_orig = np.delete(X_train_orig, np.s_[1024:1044], axis=1)
y_val = Train_set['label'].to_numpy(dtype=float)
Y_train_orig = y_val.reshape(y_val.size,1)

# Shuffle features and labels with one shared permutation. The permutation is
# seeded so that the shuffle -- and therefore the tail slice Keras later takes
# for validation_split -- is the same on every Restart & Run All.
SHUFFLE_SEED = 42
idx = np.random.default_rng(SHUFFLE_SEED).permutation(len(X_train_orig))
X_train,Y_train = X_train_orig[idx], Y_train_orig[idx]
scaler = StandardScaler()
scaler.fit(X_train) # fit on training set only
X_train = scaler.transform(X_train) # apply transform to the training set

# Load the independent test set.
# NOTE(security): pickle.load can execute arbitrary code embedded in the file;
# only load pickles produced by a trusted pipeline.
with open("features_data/diversity/test_dataset.pkl", 'rb') as file:
    Independent_test_set = pickle.load(file)

if isinstance(Independent_test_set, dict):
    Independent_test_set = pd.DataFrame.from_dict(Independent_test_set)
# Collect the features and labels for the independent set.
# tolist() walks the column by position, so this works even when the pickled
# frame does not carry a default 0..n-1 index (the former `['features'][i]`
# lookup was label-based and would raise KeyError in that case).
X_independent = Independent_test_set['features'].tolist()
X_test = np.asarray(X_independent)
y_independent = Independent_test_set['label'].to_numpy(dtype=float)
Y_test = y_independent.reshape(y_independent.size,1)
X_test = scaler.transform(X_test) # apply standardization (transform) to the test set



TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [None]:
import tensorflow as tf
import tensorflow.keras.layers as tfl
from kerastuner import HyperModel
from kerastuner.tuners import BayesianOptimization
from kerastuner import Objective
import keras_tuner

# Number of features per sample (length of one row of the scaled 2-D training matrix).
feat_shape = X_train.shape[1]
# CNN hypermodel: builds the network from the hyperparameters it is given.
class CNNHyperModel(HyperModel):
    """Search space for a four-block Conv1D binary classifier.

    Each block is Conv1D -> BatchNormalization -> Dropout. The blocks are
    followed by Flatten, one tunable Dense layer and a single sigmoid output.
    The input length is read from the module-level ``feat_shape``.

    Tunable hyperparameters (names are part of the stored tuner state):
    ``conv{1..4}_filters``, ``conv{1..4}_kernel_size``, ``dropout{1..4}_rate``,
    ``dense_units`` and ``learning_rate``.
    """

    # (min, max, step) of the filter count for conv blocks 1..4.
    _CONV_FILTER_RANGES = (
        (32, 128, 32),
        (64, 256, 64),
        (32, 128, 32),
        (32, 128, 32),
    )

    def build(self, hp):
        """Build and compile one candidate model.

        Args:
            hp: keras-tuner ``HyperParameters`` object used to sample values.

        Returns:
            A compiled ``tf.keras.Sequential`` model that reports a metric
            named exactly ``auc``.
        """
        model = tf.keras.Sequential()

        for block_no, (f_min, f_max, f_step) in enumerate(self._CONV_FILTER_RANGES, start=1):
            # Only the first layer declares the input shape: (features, 1 channel).
            first_layer_kwargs = {'input_shape': (feat_shape, 1)} if block_no == 1 else {}
            model.add(tfl.Conv1D(
                filters=hp.Int(f'conv{block_no}_filters', min_value=f_min, max_value=f_max, step=f_step),
                kernel_size=hp.Int(f'conv{block_no}_kernel_size', min_value=3, max_value=7, step=2),
                activation='relu',
                **first_layer_kwargs
            ))
            model.add(tfl.BatchNormalization())
            model.add(tfl.Dropout(
                rate=hp.Float(f'dropout{block_no}_rate', min_value=0.2, max_value=0.5, step=0.1)
            ))

        # Flatten layer
        model.add(tfl.Flatten())

        # Fully connected layer
        model.add(tfl.Dense(
            units=hp.Int('dense_units', min_value=64, max_value=256, step=64),
            activation='relu'
        ))

        # Output layer
        model.add(tfl.Dense(1, activation='sigmoid'))

        # Compile. The AUC metric is named explicitly: with metrics=['AUC'] the
        # name is auto-generated and can become 'auc_1', 'auc_2', ... when
        # several models are created in one session, which would make the
        # tuner's "val_auc" objective unresolvable.
        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
            ),
            loss='binary_crossentropy',
            metrics=[tf.keras.metrics.AUC(name='auc')]
        )

        return model


# Bayesian-optimisation tuner over the CNNHyperModel search space.
tuner = BayesianOptimization(
    CNNHyperModel(),
    objective=Objective("val_auc", direction="max"),  # maximise validation AUC
    max_trials=10,  # maximum number of hyperparameter configurations to try
    executions_per_trial=1,  # train each configuration once
    directory='keras_tuner_dir',  # where trial logs/checkpoints are stored
    project_name='cnn_hyperparam_tuning4'  # sub-directory / project name
)

# Run the search. Hyperparameters are selected on a hold-out slice of the
# TRAINING data (validation_split), not on the independent test set: choosing
# hyperparameters on (X_test, Y_test) would leak test information into the
# model and inflate the reported test metrics.
tuner.search(
    X_train, Y_train,  # training data
    epochs=10,  # epochs per trial
    validation_split=0.2,  # last 20% of the (already shuffled) training arrays
    batch_size=32  # batch size
)

# Retrieve the best hyperparameter combination.
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best Hyperparameters:", best_hyperparameters.values)

# Train a fresh model with the best hyperparameters. The test set is passed
# here only for per-epoch reporting; nothing is selected or early-stopped on it.
best_model = tuner.hypermodel.build(best_hyperparameters)
best_model.fit(X_train,Y_train, epochs=10, validation_data=(X_test, Y_test))


In [None]:
def CNN_Model():
    """Assemble the fixed-architecture Conv1D classifier (uncompiled).

    Four Conv1D -> BatchNormalization -> Dropout stages (all 'same' padding,
    ReLU), then Flatten, Dense(128), Dense(32) and a one-unit sigmoid output.
    The input shape is (feat_shape, 1), with ``feat_shape`` read from module
    scope at call time.

    Returns:
        tf.keras.Sequential: the assembled model.
    """
    # (filters, kernel_size, dropout_rate) for each convolutional stage.
    conv_stages = (
        (32, 3, 0.2),
        (128, 3, 0.3),
        (32, 5, 0.2),
        (32, 3, 0.3),
    )

    net = tf.keras.Sequential()
    for position, (n_filters, width, drop_rate) in enumerate(conv_stages):
        # Only the very first layer carries the input shape.
        shape_kwargs = {'input_shape': (feat_shape, 1)} if position == 0 else {}
        net.add(tfl.Conv1D(n_filters, width, padding='same', activation='relu', **shape_kwargs))
        net.add(tfl.BatchNormalization())
        net.add(tfl.Dropout(drop_rate))

    net.add(tfl.Flatten())
    # Dense head; no dropout is applied between the dense layers.
    for width in (128, 32):
        net.add(tfl.Dense(width, activation='relu'))
    net.add(tfl.Dense(1, activation='sigmoid'))

    return net

#{'conv1_filters': 32, 'conv1_kernel_size': 3, 'dropout1_rate': 0.4, 'conv2_filters': 192, 'conv2_kernel_size': 5, 'dropout2_rate': 0.2, 'conv3_filters': 128, 'conv3_kernel_size': 5, 'dropout3_rate': 0.30000000000000004, 'dense_units': 128, 'learning_rate': 0.00015872369686433261
# {'conv1_filters': 32, 'conv1_kernel_size': 3, 'dropout1_rate': 0.2, 'conv2_filters': 128, 'conv2_kernel_size': 3, 'dropout2_rate': 0.30000000000000004, 'conv3_filters': 32, 'conv3_kernel_size': 5, 'dropout3_rate': 0.2, 'conv4_filters': 32, 'conv4_kernel_size': 3, 'dropout4_rate': 0.30000000000000004, 'dense_units': 128, 'learning_rate': 0.000735323218543868}


# Feature count per sample; CNN_Model() reads this module-level name.
feat_shape = X_train[0].size
# print("feat_shape",feat_shape)
cnn_model = CNN_Model()

learning_rate = 0.0001
optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate)
# Metrics are created with explicit names. When given as the strings 'AUC',
# 'Precision', 'Recall', Keras auto-generates the names and appends a counter
# ('auc_1', 'precision_2', ...) once other models have been built in the same
# session (e.g. by the tuner cell). The callbacks that monitor 'val_auc' and
# the history columns 'auc' / 'precision' / 'recall' would then not exist.
cnn_model.compile(optimizer=optimizer,
                    loss='binary_crossentropy',
                    metrics=[tf.keras.metrics.AUC(name='auc'),
                             'accuracy',
                             tf.keras.metrics.Precision(name='precision'),
                             tf.keras.metrics.Recall(name='recall')])

cnn_model.summary()

# Train the model.
batch_size = 32 # 32
epochs = 100
# Learning-rate scheduler: halve the LR when validation AUC stops improving.
# mode='max' is required: in 'auto' mode ReduceLROnPlateau only treats monitors
# containing "acc" as higher-is-better, so 'val_auc' would be minimised and the
# LR would be cut while the AUC is still rising.
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_auc',
    mode='max',
    factor=0.5,
    patience=3,
    min_lr=1e-5,
    verbose=1)
# Keep only the best model on disk. No monitor is given, so the callback's
# default (val_loss) decides what "best" means.
# NOTE(review): early stopping below tracks val_auc, so the saved file and the
# restored in-memory weights may come from different epochs -- confirm intent.
checkpoint = tf.keras.callbacks.ModelCheckpoint("myModel/multy1/embedding-pssm-bio15.h5", save_best_only=True)
# Stop when validation AUC has not improved for 5 epochs and roll back to the
# best weights; mode='max' makes the higher-is-better direction explicit.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_auc', mode='max', patience=5, restore_best_weights=True,verbose=1)
history = cnn_model.fit(X_train , Y_train, epochs=epochs, batch_size=batch_size, validation_split=0.2, callbacks=[checkpoint, early_stopping, lr_scheduler])

# One row per epoch, one column per logged metric.
df_loss_auc = pd.DataFrame(history.history)

# Two-column views with the columns relabelled for the plot legends.
legend_labels = ('train', 'validation')
df_loss = df_loss_auc.loc[:, ['loss', 'val_loss']].rename(
    columns=dict(zip(['loss', 'val_loss'], legend_labels)))

df_auc = df_loss_auc.loc[:, ['auc', 'val_auc']].rename(
    columns=dict(zip(['auc', 'val_auc'], legend_labels)))

# Loss and AUC curves.
Model_Loss_plot_title = 'Model Loss'
loss_axes = df_loss.plot(title=Model_Loss_plot_title, figsize=(12, 8))
loss_axes.set(xlabel='Epoch', ylabel='Loss')

Model_AUC_plot_title = 'Model AUC'
auc_axes = df_auc.plot(title=Model_AUC_plot_title, grid=True, figsize=(12, 8))
auc_axes.set(xlabel='Epoch', ylabel='AUC')

# Accuracy, precision and recall curves (train vs. validation).
df_accuracy = pd.DataFrame(history.history)
df_precision = pd.DataFrame(history.history)
df_recall = pd.DataFrame(history.history)
curve_specs = (
    (df_accuracy, 'accuracy', 'Model Accuracy', 'Accuracy'),
    (df_precision, 'precision', 'Model Precision', 'Precision'),
    (df_recall, 'recall', 'Model Recall', 'Recall'),
)
for frame, key, plot_title, y_label in curve_specs:
    metric_axes = frame[[key, 'val_' + key]].plot(title=plot_title, grid=True, figsize=(12, 8))
    metric_axes.set(xlabel='Epoch', ylabel=y_label)


# Evaluate on the independent test set. The result list is indexed here as
# loss first, then the compiled metrics: auc, accuracy, precision, recall.
eval_result = cnn_model.evaluate(X_test, Y_test)
# The last field previously had a trailing comma inside the braces, which made
# it a one-element tuple and printed recall as "(0.1234,)".
print(f"test loss: {round(eval_result[0],4)}, test auc: {round(eval_result[1],4)}, test accuracy: {round(eval_result[2],4)}, test precision: {round(eval_result[3],4)}, test recall: {round(eval_result[4],4)}")
