In [None]:
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import math
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"


In [None]:
# Build the Transformer regression model
class Transformer(nn.Module):
    """Transformer-encoder regressor.

    The input is projected to ``hidden_dim`` with a linear layer, positional
    encoding is added, the result goes through a stack of
    ``nn.TransformerEncoderLayer`` blocks (sequence-first layout, i.e. the
    PyTorch default ``batch_first=False``), and the output of the last
    sequence position is mapped to ``output_dim`` values.

    Args:
        input_dim: Number of features per sequence position.
        output_dim: Number of regression targets.
        hidden_dim: Model width (``d_model``) of the encoder.
        num_layers: Number of stacked encoder layers.
        num_heads: Number of attention heads per layer.
    """

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads):
        super(Transformer, self).__init__()
        self.input_embedding = nn.Linear(input_dim, hidden_dim)
        self.positional_encoding = PositionalEncoding(hidden_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(hidden_dim, num_heads),
            num_layers
        )
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Run the model.

        Args:
            x: Either ``(seq_len, batch, input_dim)`` or ``(batch, input_dim)``.
                A 2-D input is treated as a batch of length-1 sequences.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        # A 2-D (batch, features) input used to be broadcast against the
        # (seq, 1, d_model) positional table into a (batch, batch, d_model)
        # tensor, so every sample was evaluated as a `batch`-long sequence.
        # That only matched training when the batch size was 1. Make the
        # sequence axis explicit so any batch size behaves the same way.
        if x.dim() == 2:
            x = x.unsqueeze(0)
        x = self.input_embedding(x)
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        # Keep only the output of the last sequence position.
        x = x[-1, :, :]
        x = self.output_layer(x)
        return x



class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    A table of shape ``(max_len, 1, d_model)`` is precomputed and stored as the
    non-trainable buffer ``pe``. Even feature indices hold sines and odd
    feature indices hold cosines of the position scaled by a geometric
    frequency schedule.

    Args:
        d_model: Feature width of the tensors the encoding is added to.
        dropout: Dropout probability applied after the addition.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the frequency vector. Without the trim the assignment raises a
        # shape-mismatch error. For even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # (max_len, d_model) -> (max_len, 1, d_model): broadcasts over dim 1.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding of the first ``x.size(0)`` positions to ``x``, then apply dropout.

        Dimension 0 of ``x`` is used as the position axis.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Read the sample table from disk, then keep only its last 3000 rows (all columns).
# NOTE(review): the path is an absolute, machine-specific location.
data = pd.read_csv("D:/Lily/NO.1/NEWdata/LHS/LHS_20000.csv")
data = data.iloc[-3000:, :]

def split_and_encode(peptide_sequence):
    """Encode a peptide string as a list of Unicode code points.

    Every character, alphabetic or not, is mapped with ``ord``. The former
    ``isalpha`` check selected between two identical branches and has been
    removed.

    Args:
        peptide_sequence: Iterable of single characters (typically a ``str``).

    Returns:
        list[int]: One code point per character, in input order.
    """
    return [ord(char) for char in peptide_sequence]

# Encode each peptide string as a list of code points, one DataFrame column per position.
# NOTE(review): if the peptides differ in length, shorter rows are padded with
# NaN by the DataFrame constructor — confirm all sequences have equal length.
data['encoded_pep'] = data['pep'].apply(split_and_encode)
X = pd.DataFrame(data['encoded_pep'].tolist())

target = data[['logP', 'AP', 'PI']]

# Character -> code lookup, in first-seen order (printed for inspection only).
encoding_dict = {}
for original_sequence, encoded_sequence in zip(data['pep'], data['encoded_pep']):
    for original_char, encoded_char in zip(original_sequence, encoded_sequence):
        encoding_dict.setdefault(original_char, encoded_char)

print("编码对应字典：")
for key, value in encoding_dict.items():
    print(f"{key}: {value}")

# Prepare the training data
X = np.array(X)
y = np.array(target)

# Chronological 80 / 10 / 10 split boundaries (train / validation / test).
length = y.shape[0]
train_length = int(0.8 * length)
val_length = int(0.9 * length)

# Fit the scalers on the training rows only. Fitting on the full array leaks
# the min/max of the validation and test rows into training and makes the
# reported validation/test scores optimistic.
scaler_X = MinMaxScaler()
scaler_X.fit(X[:train_length])
sc_X = scaler_X.transform(X)

scaler_y = MinMaxScaler()
scaler_y.fit(y[:train_length])
sc_y = scaler_y.transform(y)

x_train = sc_X[:train_length, :]
y_train = sc_y[:train_length, :]
x_val = sc_X[train_length:val_length, :]
y_val = sc_y[train_length:val_length, :]
x_test = sc_X[val_length:, :]
y_test = sc_y[val_length:, :]

x_train = torch.tensor(x_train, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.float32)
x_val = torch.tensor(x_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)

# batch_size=1: every optimisation step sees a single sample.
train_dataset = TensorDataset(x_train, y_train)
train_dataloader = DataLoader(train_dataset, batch_size=1)

# Model hyper-parameters
input_dim = x_train.size(1)
output_dim = y_train.size(1)
hidden_dim = 128
num_layers = 2
num_heads = 4

# Seed torch so weight initialisation and dropout are repeatable across runs.
SEED = 2024
torch.manual_seed(SEED)

# Create the model, optimizer and loss
model = Transformer(input_dim, output_dim, hidden_dim, num_layers, num_heads)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.MSELoss()
best_model_path = 'best_model.pth'

In [None]:
losses, val_losses = [], []
# Lowest validation loss seen so far; used to decide when to checkpoint.
best_val_loss = float('inf')
# Train and validate the model
num_epochs = 30
for epoch in range(num_epochs):
    model.train()
    # Accumulate a sample-weighted sum so the logged value is the mean loss
    # over the whole epoch. Previously only the last mini-batch's loss was
    # recorded, which is one sample when batch_size=1.
    running_loss = 0.0
    samples_seen = 0
    for batch_input, batch_target in train_dataloader:
        optimizer.zero_grad()
        output = model(batch_input)
        loss = criterion(output, batch_target)
        loss.backward()
        optimizer.step()

        batch_size = batch_input.size(0)
        running_loss += loss.item() * batch_size
        samples_seen += batch_size
    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    train_loss = running_loss / max(samples_seen, 1)

    # Validation loss on the full validation split
    model.eval()
    with torch.no_grad():
        val_output = model(x_val)
        val_loss = criterion(val_output, y_val).item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
    losses.append(float(train_loss))
    val_losses.append(float(val_loss))
    # Checkpoint the weights whenever the validation loss improves
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved at epoch {epoch+1} with val loss {val_loss:.4f}")

In [None]:
# Draw the per-epoch training and validation loss curves on one axes.
fig, ax = plt.subplots(figsize=(12, 4), dpi=200)
epoch_axis = range(num_epochs)
for curve, curve_label in ((losses, 'loss'), (val_losses, 'val_loss')):
    ax.plot(epoch_axis, curve, label=curve_label)
ax.grid()
ax.legend()
plt.show()

In [None]:
# Rebuild the architecture and restore the checkpoint with the lowest validation loss.
best_model = Transformer(input_dim, output_dim, hidden_dim, num_layers, num_heads)
best_model.load_state_dict(torch.load(best_model_path))
best_model.eval()

# Predict the test split without tracking gradients, then score it with the training criterion.
with torch.no_grad():
    test_output = best_model(x_test)
test_loss = criterion(test_output, y_test).item()
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Undo the target scaling so values are shown in their original units.
# Tensors are converted to NumPy explicitly rather than relying on implicit
# coercion inside scikit-learn.
inv_test_y = scaler_y.inverse_transform(y_test.detach().cpu().numpy())
inv_train_y = scaler_y.inverse_transform(y_train.detach().cpu().numpy())
inv_pred_test_y = scaler_y.inverse_transform(test_output.detach().cpu().numpy())

# One panel per target. The grid width follows the number of targets instead
# of a hard-coded 3, so the loop bound and the layout cannot disagree.
n_targets = y_test.shape[1]
plt.figure(figsize=(12,4), dpi=200)
for i in range(n_targets):
    plt.subplot(1, n_targets, i+1)
    sample_axis = range(len(inv_test_y[:,i]))
    plt.scatter(sample_axis, inv_test_y[:,i], label='true_'+target.columns[i], c='red')
    plt.plot(sample_axis, inv_pred_test_y[:,i], label='pred_'+target.columns[i], c='green')
    plt.grid()
    plt.legend()
plt.show()

In [None]:
from sklearn.metrics import mean_squared_error,mean_absolute_error, r2_score,explained_variance_score
# Score each target column separately on the inverse-transformed test values,
# print the scores, and collect them into one table that is also written to CSV.
metrics=[]
for i in range(y_test.shape[1]):
    target_name = target.columns[i]
    truth = inv_test_y[:,i]
    guess = inv_pred_test_y[:,i]
    print("===="*10)
    mse = mean_squared_error(truth, guess)
    mae = mean_absolute_error(truth, guess)
    r2 = r2_score(truth, guess)
    ev = explained_variance_score(truth, guess)
    for tag, score in ((" MSE:", mse), (" MAE:", mae), (" R2:", r2), (" EV:", ev)):
        print(target_name, tag, score)
    metrics.append([target_name, mse, mae, r2, ev])
m_df = pd.DataFrame(metrics)
m_df.columns = ['target','mse','mae','r2','ev']
m_df.to_csv("metrics.csv")
m_df

In [None]:
# plt.figure(figsize=(12,4), dpi=200)
# for i in range(y_test.shape[1]):
#     plt.subplot(1,3,i+1)
#     # inv_test_y[:,i],inv_pred_test_y[:,i]
#     # plt.scatter(inv_test_y[:,i],inv_test_y[:,i])
#     plt.scatter(X[:,0],inv_train_y[:,i])
#     plt.title(target.columns[i]+" Curve Fit")
# plt.show()

In [None]:
# Grouped bar chart: one group per target, one bar per metric.
fig, ax = plt.subplots(figsize=(12,4), dpi=200)
bar_width = 0.2
index = range(len(m_df['target']))
metric_names = ['mse', 'mae', 'r2', 'ev']

# Draw each metric as its own bar series, shifted by one bar width per metric.
for i, metric in enumerate(metric_names):
    bars = ax.bar(
        [x + i * bar_width for x in index],
        m_df[metric],
        bar_width,
        label=metric
    )
    # Annotate every bar with its value. Labels for negative bars (e.g. a
    # negative R2) go below the bar end so they do not overlap the bar.
    for bar in bars:
        yval = bar.get_height()
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            yval,
            f'{yval:.4f}',
            ha='center',
            va='bottom' if yval >= 0 else 'top',
            fontsize=10
        )

# Axis labels and title
ax.set_xlabel('Target')
ax.set_ylabel('Values')
ax.set_title('Metric Comparison for Targets')
# Bars are centred at x + i*bar_width, so the centre of a group is the mean of
# those offsets. The previous fixed offset of one bar width was off-centre for
# four metrics.
group_center = bar_width * (len(metric_names) - 1) / 2
ax.set_xticks([x + group_center for x in index])
ax.set_xticklabels(m_df['target'])
ax.legend()

plt.tight_layout()
ax.grid()
plt.show()

In [None]:
# Put ground truth and predictions side by side and export them.
# Ground truth must come first in the concat so that it lines up with the
# column names below. The previous order (predictions first) wrote the
# predictions under the true-value headers and vice versa.
r1 = pd.concat([pd.DataFrame(inv_test_y), pd.DataFrame(inv_pred_test_y)], axis=1)
r1.columns = ['logP','AP','PI','pred_logP','pred_AP','pred_PI']
# Interleave each target with its prediction for easier reading.
# NOTE: `r2` here rebinds the name used for the R2 score in the metrics cell.
r2 = r1[['logP','pred_logP','AP','pred_AP','PI','pred_PI']]
r2.to_csv("predict_result.csv",index=False)
r2