# 天津大学——蛋白质设计大赛 

## 基于BERT的深度学习方法：荧光蛋白强度的预测与进化

### 简介

本项目旨在通过文献综述和深度学习技术，识别并进化荧光蛋白关键突变位点，并基于BERT架构精确预测荧光强度。在广泛的文献查询识别荧光蛋白中的关键突变位点基础上，结合最新的深度学习技术进行预测与进化。项目通过训练ProteinBERT模型来实现这一目标，将优化后的荧光蛋白强度数据集分为训练集、验证集和测试集，并使用自定义回调函数记录和绘制训练过程中的性能指标，最终使用该模型进行准确预测。该方法不仅提高了预测精度，还为荧光蛋白设计和应用提供了新的思路。



### 环境设置

首先，我们需要导入必要的库和模块。

In [None]:
import os
import pandas as pd
from tensorflow import keras
from sklearn.model_selection import train_test_split
from proteinbert import OutputType, OutputSpec, FinetuningModelGenerator, load_pretrained_model, finetune, InputEncoder
from sklearn.metrics import mean_squared_error, mean_absolute_error
import numpy as np
import matplotlib.pyplot as plt

### 加载数据

我们将数据集从CSV文件中加载，并将其分为训练集、验证集和测试集。

In [None]:
BENCHMARKS_DIR = './data'
DATASET_NAME = 'GFP_data_with_full_sequences'

OUTPUT_TYPE = OutputType(False, 'numeric')
OUTPUT_SPEC = OutputSpec(OUTPUT_TYPE, None)

train_set_file_path = os.path.join(BENCHMARKS_DIR, f'{DATASET_NAME}.train.csv')
train_set = pd.read_csv(train_set_file_path)
train_set, valid_set = train_test_split(train_set, test_size=0.1, random_state=0)

test_set_file_path = os.path.join(BENCHMARKS_DIR, f'{DATASET_NAME}.test.csv')
test_set = pd.read_csv(test_set_file_path)

print(f'{len(train_set)} training set records, {len(valid_set)} validation set records, {len(test_set)} test set records.')

### 加载预训练模型

我们加载预训练的ProteinBERT模型，并为微调创建模型生成器。

In [None]:
pretrained_model_generator, input_encoder = load_pretrained_model()
model_generator = FinetuningModelGenerator(pretrained_model_generator, OUTPUT_SPEC, dropout_rate=0.5)

### 准备验证数据

我们对验证集进行编码，并提取亮度标签。

In [None]:
X_val_encoded = input_encoder.encode_X(valid_set['Full Sequence'].values, seq_len=241)
y_val = valid_set['Brightness'].values

### 自定义回调函数

定义一个自定义回调函数，用于记录每个epoch的均方误差（MSE）和平均绝对误差（MAE），并生成相关图表。

In [None]:
class PerformanceCallback(keras.callbacks.Callback):
    def __init__(self, X_val, y_val, log_dir='./logs'):
        super().__init__()
        self.X_val = X_val
        self.y_val = y_val
        self.log_file = os.path.join(log_dir, 'training_log.txt')
        self.mse_plot_file = os.path.join(log_dir, 'mse_plot.png')
        self.mae_plot_file = os.path.join(log_dir, 'mae_plot.png')
        self.plot_file = os.path.join(log_dir, 'training_plot.png')
        self.mse = []
        self.mae = []
        os.makedirs(log_dir, exist_ok=True)
        self.initialize_log_file()

    def initialize_log_file(self):
        with open(self.log_file, 'w') as f:
            f.write(f'New Training Session\n{"="*20}\n')

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X_val)
        mse = mean_squared_error(self.y_val, y_pred)
        mae = mean_absolute_error(self.y_val, y_pred)
        self.mse.append(mse)
        self.mae.append(mae)
        with open(self.log_file, 'a') as f:
            f.write(f'Epoch {epoch + 1}: MSE = {mse}, MAE = {mae}\n')
        self.plot_mse()
        self.plot_mae()
        self.plot_metrics()

    def plot_mse(self):
        plt.figure()
        plt.plot(range(1, len(self.mse) + 1), self.mse, label='MSE')
        plt.xlabel('Epoch')
        plt.ylabel('MSE')
        plt.title('Mean Squared Error over epochs')
        plt.legend()
        plt.savefig(self.mse_plot_file)
        plt.close()

    def plot_mae(self):
        plt.figure()
        plt.plot(range(1, len(self.mae) + 1), self.mae, label='MAE')
        plt.xlabel('Epoch')
        plt.ylabel('MAE')
        plt.title('Mean Absolute Error over epochs')
        plt.legend()
        plt.savefig(self.mae_plot_file)
        plt.close()

    def plot_metrics(self):
        plt.figure()
        plt.plot(range(1, len(self.mse) + 1), self.mse, label='MSE')
        plt.plot(range(1, len(self.mae) + 1), self.mae, label='MAE')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('MSE and MAE over epochs')
        plt.legend()
        plt.savefig(self.plot_file)
        plt.close()

### 微调模型

定义训练回调函数，并开始微调模型。

In [None]:
training_callbacks = [
    keras.callbacks.ReduceLROnPlateau(patience=1, factor=0.25, min_lr=1e-05, verbose=1),
    keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
    PerformanceCallback(X_val_encoded, y_val)
]

finetuned_model = finetune(
    model_generator,
    input_encoder,
    OUTPUT_SPEC,
    train_set['Full Sequence'],
    train_set['Brightness'],
    valid_set['Full Sequence'],
    valid_set['Brightness'],
    seq_len=241,
    batch_size=128,
    max_epochs_per_stage=75,
    lr=1e-04,
    begin_with_frozen_pretrained_layers=True,
    lr_with_frozen_pretrained_layers=1e-02,
    n_final_epochs=10,
    final_seq_len=241,
    final_lr=1e-05,
    callbacks=training_callbacks
)

### 保存模型

微调完成后，保存最终的模型。

In [None]:
finetuned_model.save('./saved_model')

### 使用训练后的模型进行预测

加载微调后的模型并对新的数据集进行预测。

In [None]:
# 指定数据集的路径和名称
PREDICTION_DIR = './predict/predict_data'
PREDICTION_RESULT_DIR = './predict/predict_result'
DATASET_NAME = 'seq'

# 加载数据集
test_set_file_path = os.path.join(PREDICTION_DIR, f'{DATASET_NAME}.predict.csv')
test_set = pd.read_csv(test_set_file_path)

# 加载预训练模型和输入编码器
pretrained_model_generator, input_encoder = load_pretrained_model()

# 加载模型
model = keras.models.load_model('./saved_model')

# 编码测试集序列
X_test_encoded = input_encoder.encode_X(test_set['Full Sequence'].values, seq_len=241)

# 使用模型进行预测
predictions = model.predict(X_test_encoded, batch_size=32).flatten()

# 输出预测结果
for i, prediction in enumerate(predictions):
    print(f'序列 {i+1} 的预测发光强度: {prediction}')

# 创建一个DataFrame来保存序列和预测的发光强度
results_df = pd.DataFrame({
    'Full Sequence': test_set['Full Sequence'].values,
    'Predicted Brightness': predictions
})

# 将结果保存到CSV文件
results_df.to_csv(os.path.join(PREDICTION_RESULT_DIR, f'{DATASET_NAME}.predictions.csv'), index=False)

print(f'预测结果已保存到 {os.path.join(PREDICTION_RESULT_DIR, DATASET_NAME + "predictions.csv")}')
