In [None]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import StratifiedKFold
from sklearn.utils import shuffle
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import gc

# ---------------------------- Reset Keras Session ---------------------------- #
def reset_keras():
    """Clear the global Keras session state, then run a garbage-collection pass."""
    # Imported lazily, as in the rest of this notebook, so the helper stays
    # self-contained.
    from tensorflow.keras import backend as keras_backend

    keras_backend.clear_session()
    gc.collect()

# ---------------------------- ROC Curve Plotting ---------------------------- #
def roc(y_tests, y_test_scores, save_path=None):
    """Draw a ROC curve with its AUC in the legend and display it.

    Args:
        y_tests: Ground-truth binary labels.
        y_test_scores: Predicted scores for the positive class.
        save_path: Optional path; when truthy the figure is written there
            before being shown.

    Note:
        The matplotlib rcParams are modified globally (label sizes, line
        width, font) and are not restored afterwards.
    """
    label_font = {'family': 'arial', 'weight': 'bold', 'size': 20}

    # Global plot styling.
    pylab.rcParams.update({
        'axes.labelsize': '20',
        'xtick.labelsize': '20',
        'ytick.labelsize': '20',
        'lines.linewidth': '4',
    })
    pylab.rcParams['font.family'] = 'sans-serif'
    pylab.rcParams['font.sans-serif'] = ['Arial']
    pylab.rcParams['font.weight'] = 'bold'

    plt.figure(figsize=(7, 7), dpi=300)

    auc_value = roc_auc_score(y_tests, y_test_scores)
    false_pos_rate, true_pos_rate, _ = roc_curve(y_tests, y_test_scores)

    # ROC curve plus the chance diagonal.
    plt.plot(false_pos_rate, true_pos_rate, linewidth=3, color='tomato',
             label='AUC = {:.3f}'.format(auc_value))
    plt.plot([0, 1], [0, 1], linewidth=1, color='grey', linestyle="--")

    axis_ticks = np.linspace(0, 1, 6)
    plt.yticks(axis_ticks)
    plt.xticks(axis_ticks)
    plt.xlim((0, 1))
    plt.ylim((0, 1))

    plt.legend(prop={'size': 20}, loc=4, frameon=False)
    plt.subplots_adjust(left=0.2, right=0.95, top=0.95, bottom=0.2)
    plt.xlabel('1–Specificity', label_font)
    plt.ylabel('Sensitivity', label_font)

    if save_path:
        plt.savefig(save_path)
    plt.show()

# ---------------------------- Custom Layers ---------------------------- #
class MultiHeadSelfAttention(layers.Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values with three dense
    layers, split into ``num_heads`` heads of size ``embed_dim // num_heads``,
    attended per head, re-assembled and passed through a final dense layer.

    Args:
        embed_dim: Width of the projections and of the output.
        num_heads: Number of attention heads; must divide ``embed_dim``.
        **kwargs: Standard ``keras.layers.Layer`` keyword arguments
            (``name``, ``dtype``, ``trainable`` ...).

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        # Forward base-layer kwargs so the layer can be named and can be
        # re-created from its config.
        super(MultiHeadSelfAttention, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-instantiated."""
        config = super(MultiHeadSelfAttention, self).get_config()
        config.update({"embed_dim": self.embed_dim, "num_heads": self.num_heads})
        return config

    def attention(self, query, key, value):
        """Return ``(softmax(QK^T / sqrt(d_k)) V, attention_weights)``."""
        score = tf.matmul(query, key, transpose_b=True)
        # Cast to the score dtype (instead of a hard-coded float32) so the
        # division also works when the layer computes in another float dtype.
        dim_key = tf.cast(tf.shape(key)[-1], score.dtype)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, num_heads, seq_len, projection_dim)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to ``inputs`` of shape (batch_size, seq_len, embed_dim)."""
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)      # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(query, batch_size)  # (batch_size, num_heads, seq_len, projection_dim)
        key = self.separate_heads(key, batch_size)      # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(value, batch_size)  # (batch_size, num_heads, seq_len, projection_dim)
        # The per-head attention weights are computed but not returned.
        attention, _ = self.attention(query, key, value)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(concat_attention)  # (batch_size, seq_len, embed_dim)
        return output

class TransformerBlock(layers.Layer):
    """Transformer encoder block: self-attention and a feed-forward network,
    each followed by dropout, a residual connection and layer normalization.

    Args:
        embed_dim: Width of the block input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the
            feed-forward network.
        **kwargs: Standard ``keras.layers.Layer`` keyword arguments.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.005, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim)]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-instantiated."""
        config = super(TransformerBlock, self).get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

    def call(self, inputs, training=None):
        """Run the block.

        Args:
            inputs: Tensor whose last axis has size ``embed_dim``.
            training: Forwarded to the dropout layers. Defaults to ``None``
                so the layer can also be called without the flag.
        """
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)  # residual + norm
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)     # residual + norm

class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of rows in the position-embedding table.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Embedding width of both tables.
        **kwargs: Standard ``keras.layers.Layer`` keyword arguments.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super(TokenAndPositionEmbedding, self).__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-instantiated."""
        config = super(TokenAndPositionEmbedding, self).get_config()
        config.update({
            "maxlen": self.maxlen,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config

    def call(self, x):
        """Embed token ids ``x`` and add the embedding of each position index."""
        # Positions run over the actual length of the last axis of x.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

# ---------------------------- Helper Functions ---------------------------- #
def seq2num(seqlist, maxlen=21):
    """Encode peptide strings as fixed-length integer vectors.

    Each character is looked up in a fixed residue table; ``U``, ``X`` and any
    character not in the table map to code 20. Every sequence is truncated to
    ``maxlen`` and right-padded with 20 up to ``maxlen``.

    Args:
        seqlist: Iterable of sequence strings.
        maxlen: Length of each encoded vector.

    Returns:
        ``numpy.ndarray`` built from the list of encoded vectors.
    """
    pad_code = 20
    residue_codes = {
        'A': 8, 'C': 1, 'D': 2, 'E': 3, 'F': 4, 'G': 5, 'H': 6,
        'I': 7, 'K': 0, 'L': 9, 'M': 10, 'N': 11, 'P': 12,
        'Q': 13, 'R': 14, 'S': 15, 'T': 16, 'V': 17, 'W': 18,
        'Y': 19, '*': 20
    }

    encoded = []
    for peptide in seqlist:
        peptide = peptide.replace('U', '*').replace('X', '*')
        # Unknown characters fall back to the padding code; truncate first,
        # then pad whatever is still missing.
        codes = [residue_codes.get(residue, pad_code) for residue in peptide][:maxlen]
        codes.extend([pad_code] * (maxlen - len(codes)))
        encoded.append(codes)
    return np.array(encoded)

def turn_to_float64(feature):
    """Convert ``feature`` to float64 via NumPy and return it as a plain Python list."""
    return np.array(feature, dtype=np.float64).tolist()

# ---------------------------- Model Building Functions ---------------------------- #
def build_transformer_encoder(maxlen, vocab_size, embed_dim, num_heads, ff_dim, rate=0.1):
    """Build the sequence encoder model.

    Pipeline: token + position embedding -> the same TransformerBlock applied
    twice -> global average pooling over the sequence axis -> Dropout(0.05).

    Args:
        maxlen: Input sequence length.
        vocab_size: Size of the token-embedding table.
        embed_dim: Embedding / model width.
        num_heads: Attention heads in the transformer block.
        ff_dim: Hidden width of the block's feed-forward network.
        rate: Dropout rate handed to the transformer block.

    Returns:
        A Keras ``Model`` named ``"Transformer_Encoder"``.
    """
    seq_in = layers.Input(shape=(maxlen,), name="Sequence_Input")
    hidden = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(seq_in)

    # One block instance is reused, so both passes share the same weights.
    # NOTE(review): training=False (inference mode) is hard-coded, so the
    # dropout layers inside the block stay inactive during fit() and `rate`
    # has no effect there - confirm this is intended.
    shared_block = TransformerBlock(embed_dim, num_heads, ff_dim, rate)
    for _ in range(2):
        hidden = shared_block(hidden, training=False)

    pooled = layers.GlobalAveragePooling1D()(hidden)
    pooled = layers.Dropout(0.05)(pooled)
    return Model(inputs=seq_in, outputs=pooled, name="Transformer_Encoder")

def build_autoencoder(encoder, vocab_size, maxlen):
    """Attach a reconstruction head to ``encoder``.

    The encoder output is mapped by one dense layer to ``maxlen * vocab_size``
    logits, reshaped to ``(maxlen, vocab_size)`` and normalized with a softmax
    over the vocabulary axis, giving one distribution over tokens per position.

    Args:
        encoder: Keras model whose ``input`` / ``output`` are reused.
        vocab_size: Number of token classes predicted per position.
        maxlen: Number of sequence positions to reconstruct.

    Returns:
        A Keras ``Model`` named ``"Transformer_Autoencoder"`` with output
        shape ``(batch_size, maxlen, vocab_size)``.
    """
    # Keep the dense layer linear: a softmax here would normalize across all
    # maxlen * vocab_size units at once, so the per-position rows obtained
    # after the reshape would not sum to 1.
    logits = layers.Dense(maxlen * vocab_size)(encoder.output)
    # Reshape to (batch_size, maxlen, vocab_size).
    logits = layers.Reshape((maxlen, vocab_size))(logits)
    # Per-position softmax over the vocabulary axis.
    probabilities = layers.Softmax(axis=-1)(logits)
    autoencoder = Model(inputs=encoder.input, outputs=probabilities, name="Transformer_Autoencoder")
    return autoencoder

def build_dnn_classifier(input_dim):
    """Build the feed-forward classifier used on the encoded features.

    Architecture: Dense(64, selu) -> Dense(16, selu) -> Dropout(0.1) ->
    Dense(2, softmax).

    Args:
        input_dim: Number of input features per sample.

    Returns:
        An uncompiled Keras ``Model`` named ``"DNN_Classifier"``.
    """
    features = layers.Input(shape=(input_dim,), name="Combined_Features")

    hidden = features
    for units in (64, 16):
        hidden = layers.Dense(units, activation='selu')(hidden)
    hidden = layers.Dropout(0.1)(hidden)

    class_probs = layers.Dense(2, activation='softmax')(hidden)
    return Model(inputs=features, outputs=class_probs, name="DNN_Classifier")

# ---------------------------- Main Execution ---------------------------- #
# ---------------------------- Data / Feature I/O Helpers ---------------------------- #
# These helpers are defined before the __main__ section on purpose: the
# training code below calls prepare_data1(), so it must already exist when
# that section runs.
def prepare_data1(transfer_or_not, maxlen=21):
    """Load peptide names, sequences and labels, shuffled with a fixed seed.

    Reads ``Kcr_label.csv`` (columns ``pepname`` and ``label``) plus the
    tab-separated ``pos_Kcr.txt`` / ``neg_Kcr.txt`` files (id, sequence) from
    ``./transfer`` when ``transfer_or_not`` is true, otherwise from ``.``.

    Args:
        transfer_or_not: Selects the data directory (see above).
        maxlen: Length of the ``'*'`` placeholder used for peptides whose
            sequence is missing or empty.

    Returns:
        Tuple ``(peps, data, labels)`` of NumPy arrays, shuffled together
        with ``random_state=42``.
    """
    path = './transfer' if transfer_or_not else '.'
    df = pd.read_csv(os.path.join(path, 'Kcr_label.csv'))
    pepname = df['pepname'].values
    label = df['label'].values  # returned to the caller; not used for the lookup below

    def seq_dic(fileplace):
        """Parse a tab-separated ``id<TAB>sequence`` file into a dict."""
        pepdict = {}
        with open(fileplace, mode='r') as file:
            for peptide in file:
                fields = peptide.rstrip().split('\t')
                pepdict[fields[0]] = fields[1]
        return pepdict

    pos_dict = seq_dic(os.path.join(path, 'pos_Kcr.txt'))
    neg_dict = seq_dic(os.path.join(path, 'neg_Kcr.txt'))

    # Merge positive and negative samples into one lookup so the sequence
    # source is not tied to the label. On an id collision the negative-file
    # entry wins.
    all_dict = {**pos_dict, **neg_dict}

    # Peptides without a (non-empty) sequence get an all-'*' placeholder of
    # length maxlen.
    pep_seq = [all_dict.get(pepID) or '*' * maxlen for pepID in pepname]

    # Convert to NumPy arrays and shuffle them consistently.
    data = np.array(pep_seq)
    labels = np.array(label)
    peps = np.array(pepname)
    data, labels, peps = shuffle(data, labels, peps, random_state=42)

    return peps, data, labels


def getfeatures1(namelist, labels, embed_dim=128):
    """Read back the stored ``.transformer`` feature files under ``./results``.

    Args:
        namelist: Peptide names (one file per name).
        labels: Labels aligned with ``namelist``; label 0 selects the
            negative-data directory, anything else the positive one.
        embed_dim: Length of the zero vector used when a file is missing.

    Returns:
        ``numpy.ndarray`` with one feature row per peptide.
    """
    feature = []
    for i, name in enumerate(namelist):
        if labels[i] == 0:
            fileplace = './results/10features_for_negative_data1'
        else:
            fileplace = './results/10features1'
        transformer_path = os.path.join(fileplace, 'transformer', f"{name}.transformer")
        if os.path.exists(transformer_path):
            with open(transformer_path, mode='r') as file:
                fea = file.read().rstrip().split('\t')
                fea = turn_to_float64(fea)
                feature.append(fea)
        else:
            # Missing file: fall back to an all-zero feature vector.
            feature.append([0.0] * embed_dim)
    return np.array(feature)


def store_code1(peplist, codes, labels, transfer_or_not):
    """Write one tab-separated ``<pepname>.transformer`` file per peptide.

    Files go to ``<root>/10features_for_negative_data1/transformer`` for
    label 0 and ``<root>/10features1/transformer`` otherwise, where ``<root>``
    is ``./transfer`` when ``transfer_or_not`` is true, else ``./results``.
    Every value is followed by a tab and the line ends with a newline.

    Args:
        peplist: Peptide names, aligned with ``codes`` and ``labels``.
        codes: Per-peptide iterable of feature values.
        labels: Per-peptide labels.
        transfer_or_not: Selects the output root directory.
    """
    root = './transfer' if transfer_or_not else './results'
    for i, la in enumerate(labels):
        subdir = '10features_for_negative_data1' if la == 0 else '10features1'
        storehouse = os.path.join(root, subdir, 'transformer')
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(storehouse, exist_ok=True)
        with open(os.path.join(storehouse, f"{peplist[i]}.transformer"), mode='w') as file:
            file.write(''.join(str(co) + '\t' for co in codes[i]) + '\n')


# ---------------------------- Main Execution ---------------------------- #
if __name__ == '__main__':
    # Hyper-parameters and data loading.
    vocab_size = 600  # vocabulary size; adjust as needed
    maxlen = 21
    embed_dim = 128  # embedding width per token
    num_heads = 4  # number of attention heads
    ff_dim = 64  # hidden width of the transformer feed-forward network
    dropout_rate = 0.1  # dropout rate

    # 1. Prepare the data.
    trans_or_not = False  # set to True to read from ./transfer
    namelist, data, label = prepare_data1(trans_or_not)
    data = seq2num(data, maxlen=maxlen)

    # 2. Define the Transformer encoder.
    encoder = build_transformer_encoder(maxlen, vocab_size, embed_dim, num_heads, ff_dim, dropout_rate)
    encoder.summary()

    # 3. Build the autoencoder model.
    autoencoder = build_autoencoder(encoder, vocab_size, maxlen)
    autoencoder.summary()

    # 4. Compile the autoencoder.
    autoencoder.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=Adam(learning_rate=0.0001),
        metrics=['accuracy']
    )

    # 5. Inputs and targets: for the autoencoding task they are the same.
    x = data  # (num_samples, maxlen)
    y = data  # (num_samples, maxlen)

    # The sparse loss expects integer class ids.
    y = y.astype(np.int32)

    # 6. Train the autoencoder.
    history = autoencoder.fit(
        x, y,
        batch_size=64,
        epochs=50,
        validation_split=0.1,
        callbacks=[
            EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
            ModelCheckpoint('transformer_autoencoder_best.model', save_best_only=True)
        ]
    )

    # 7. Save the Transformer encoder.
    encoder.save('transformer_encoder.model')

# Extract and save features for the dataset selected by trans_or_not
# (False here: files are read from '.' and written under ./results).
# encoding transfer-learning data or pre-training data
# NOTE(review): `maxlen` and `embed_dim` are the globals assigned in the
# __main__ section above; this code fails with NameError if that section
# has not been run.
trans_or_not = False
namelist, data, label = prepare_data1(trans_or_not)
data = seq2num(data, maxlen=maxlen)

# Load the trained Transformer encoder.
encoder = load_model('transformer_encoder.model')
encoder.summary()

# Extract features.
encoded_features = encoder.predict(x=data, batch_size=128)

# Save the encoded features, one file per peptide.
store_code1(namelist, encoded_features, label, trans_or_not)

# Read the features back from disk and save them as a single CSV
# (pepname, feature columns, label).
transformer_feature = getfeatures1(namelist, label, embed_dim=embed_dim)
df_trans = pd.DataFrame(transformer_feature)
df_peps = pd.DataFrame(namelist, columns=['pepname'])
df_labels = pd.DataFrame(label, columns=['label'])
df_transformer = pd.concat([df_peps, df_trans, df_labels], axis=1)
df_transformer.to_csv('./results/transformer_dataset1.csv', index=False)

In [None]:
def getfeatures2(namelist, labels, embed_dim=128):
    """Load the stored ``.transformer`` feature files of the transfer dataset.

    Args:
        namelist: Peptide names (one file per name).
        labels: Labels indexed like ``namelist``; label 0 selects the
            negative-data directory, anything else the positive one.
        embed_dim: Length of the zero vector used when a file is missing.

    Returns:
        ``numpy.ndarray`` with one feature row per peptide.
    """
    rows = []
    for idx, pep in enumerate(namelist):
        base_dir = ('./transfer/10features_for_negative_data1'
                    if labels[idx] == 0 else './transfer/10features1')
        feature_file = os.path.join(base_dir, 'transformer', f"{pep}.transformer")

        if not os.path.exists(feature_file):
            # Missing file: fall back to an all-zero feature vector.
            rows.append([0.0] * embed_dim)
            continue

        with open(feature_file, mode='r') as handle:
            fields = handle.read().rstrip().split('\t')
        rows.append(turn_to_float64(fields))
    return np.array(rows)

# Extract and save features for the transfer-learning dataset
# (trans_or_not = True: files are read from and written under ./transfer).
# encoding transfer-learning data or pre-training data
# NOTE(review): `maxlen` and `embed_dim` are notebook globals assigned in an
# earlier cell; this cell fails with NameError if they are not defined.
trans_or_not = True
namelist, data, label = prepare_data1(trans_or_not)
data = seq2num(data, maxlen=maxlen)

# Load the trained Transformer encoder.
encoder = load_model('transformer_encoder.model')
encoder.summary()

# Extract features.
encoded_features = encoder.predict(x=data, batch_size=128)

# Save the encoded features, one file per peptide.
store_code1(namelist, encoded_features, label, trans_or_not)

# Read the features back from disk and save them as a single CSV
# (pepname, feature columns, label).
transformer_feature = getfeatures2(namelist, label, embed_dim=embed_dim)
df_trans = pd.DataFrame(transformer_feature)
df_peps = pd.DataFrame(namelist, columns=['pepname'])
df_labels = pd.DataFrame(label, columns=['label'])
df_transformer = pd.concat([df_peps, df_trans, df_labels], axis=1)
df_transformer.to_csv('./transfer/transformer_dataset.csv', index=False)

In [None]:
def prepare_data2():
    """Read ``./transfer/experiment_sites.txt`` into aligned name/sequence arrays.

    Each non-blank line must hold ``pepname<TAB>sequence``. If a peptide name
    occurs more than once, it is returned once with the sequence of its last
    occurrence, so the two arrays always have the same length and order.

    Returns:
        Tuple ``(names, sequences)`` of NumPy arrays, aligned index by index.

    Raises:
        ValueError: If a non-blank line does not split into exactly two
            tab-separated fields.
    """
    pep_dict = {}
    with open('./transfer/experiment_sites.txt', mode='r') as file:
        for line in file:
            line = line.rstrip()
            if not line:
                # Skip blank lines (e.g. a trailing empty line).
                continue
            pepname, seq = line.split('\t')
            pep_dict[pepname] = seq

    # Take names and sequences from the same dict so a duplicated name cannot
    # shift the sequences relative to the names.
    return np.array(list(pep_dict.keys())), np.array(list(pep_dict.values()))

def store_code2(peplist, codes):
    """Write one ``<pepname>.transformer`` file per experiment peptide.

    Files are created in ``./transfer/10features_for_experiment/transformer/``.
    Every value is followed by a tab and the line ends with a newline.

    Args:
        peplist: Peptide names, aligned with ``codes``.
        codes: Per-peptide iterable of feature values.
    """
    storehouse = './transfer/10features_for_experiment/transformer/'
    # The target directory is the same for every peptide, so create it once;
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(storehouse, exist_ok=True)
    for i, pepname in enumerate(peplist):
        with open(os.path.join(storehouse, f"{pepname}.transformer"), mode='w') as file:
            file.write(''.join(str(co) + '\t' for co in codes[i]) + '\n')

def getfeatures3(namelist, embed_dim=128):
    """Load the stored ``.transformer`` feature files of the experiment peptides.

    Args:
        namelist: Peptide names (one file per name) looked up in
            ``./transfer/10features_for_experiment/transformer``.
        embed_dim: Length of the zero vector used when a file is missing.
            Taken as a parameter (default 128, matching ``getfeatures1`` and
            ``getfeatures2``) instead of being read from a notebook global.

    Returns:
        ``numpy.ndarray`` with one feature row per peptide.
    """
    fileplace = './transfer/10features_for_experiment'
    feature = []
    for name in namelist:
        transformer_path = os.path.join(fileplace, 'transformer', f"{name}.transformer")
        if os.path.exists(transformer_path):
            with open(transformer_path, mode='r') as file:
                fea = file.read().rstrip().split('\t')
                fea = turn_to_float64(fea)
                feature.append(fea)
        else:
            # Missing file: fall back to an all-zero feature vector.
            feature.append([0.0] * embed_dim)
    return np.array(feature)

# encoding experiment data for predicting
# (unlabelled peptides read from ./transfer/experiment_sites.txt)
namelist, data = prepare_data2()
data = seq2num(data)
# Load the trained Transformer encoder.
encoder = load_model('transformer_encoder.model')
encoder.summary()
# Extract features.
encoded_features = encoder.predict(x=data, batch_size=128)
# Save the encoded features, one file per peptide.
store_code2(namelist, encoded_features)

# Read the features back from disk and save them as a single CSV
# (pepname followed by the feature columns; there is no label column here).
transformer_feature = getfeatures3(namelist)
df_trans = pd.DataFrame(transformer_feature)
df_peps = pd.DataFrame(namelist, columns=['pepname'])
df_transformer = pd.concat([df_peps, df_trans], axis=1)
df_transformer.to_csv('./transfer/10features_for_experiment/transformer_dataset.csv', index=False)

In [None]:
    # Classify using only the Transformer features.
    # NOTE(review): `transformer_feature` and `df_transformer` are notebook
    # globals; they must come from a cell that produced a 'label' column (the
    # experiment-data cell does not), otherwise the lookup below raises KeyError.
    combined_features = transformer_feature
    labels = df_transformer['label'].values

    # Cross-validation.
    sfolder = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    all_loc_pred = []
    all_loc_label = []

    for count, (train_idx, test_idx) in enumerate(sfolder.split(combined_features, labels), start=1):
        x_train, x_test = combined_features[train_idx], combined_features[test_idx]
        y_train, y_test = labels[train_idx], labels[test_idx]

        # One-hot encode the labels; num_classes is pinned so both columns
        # exist even if a split happens to contain a single class.
        y_train_cat = to_categorical(y_train, num_classes=2)
        y_test_cat = to_categorical(y_test, num_classes=2)

        # Build and compile a fresh classifier for every fold. Reusing one
        # model would carry trained weights into the next fold, whose test
        # samples were part of the previous fold's training data.
        dnn = build_dnn_classifier(combined_features.shape[1])
        if count == 1:
            dnn.summary()  # the architecture is identical for every fold

        dnn.compile(
            loss='binary_crossentropy',
            optimizer=Adam(learning_rate=0.0001),
            metrics=['accuracy']
        )

        # Train the classifier.
        # NOTE(review): the test fold is also used as validation data for
        # early stopping, so the reported test metrics are optimistic.
        print(f'---------------- Training Fold {count} -----------------------')
        dnn.fit(
            x_train, y_train_cat,
            batch_size=64,
            epochs=50,
            validation_data=(x_test, y_test_cat),
            callbacks=[
                EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
                ModelCheckpoint(f'dnn_classifier_fold{count}.model', save_best_only=True)
            ]
        )

        # Evaluate the classifier.
        print(f'---------------- Testing Fold {count} ------------------------')
        loss, accuracy = dnn.evaluate(x_test, y_test_cat)
        print(f'\n Test Loss: {loss}')
        print(f'\n Test Accuracy: {accuracy}')

        # Collect the positive-class scores and labels of this fold.
        predictions = dnn.predict(x_test)[:, 1]
        true_labels = y_test_cat[:, 1]
        roc(true_labels, predictions)
        all_loc_pred += predictions.tolist()
        all_loc_label += true_labels.tolist()

    # Plot the overall ROC curve over all folds.
    roc(all_loc_label, all_loc_pred)

Model: "DNN_Classifier"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Combined_Features (InputLa  [(None, 128)]             0         
 yer)                                                            
                                                                 
 dense_7 (Dense)             (None, 64)                8256      
                                                                 
 dense_8 (Dense)             (None, 16)                1040      
                                                                 
 dropout_3 (Dropout)         (None, 16)                0         
                                                                 
 dense_9 (Dense)             (None, 2)                 34        
                                                                 
Total params: 9330 (36.45 KB)
Trainable params: 9330 (36.45 KB)
Non-trainable params: 0 (0.00 Byte)
__________________

INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 2/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 3/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 4/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 5/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 6/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 7/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 8/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 9/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 10/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 11/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 12/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 13/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 14/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 15/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 16/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 17/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 18/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 19/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 20/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 21/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 22/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 23/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 24/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 25/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 26/50


INFO:tensorflow:Assets written to: dnn_classifier_fold1.model/assets


Epoch 27/50