In [None]:
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from util import TransformerLayer, TransformerEncoder
from util import set_seed, load_data
from util import train_model, eval_model

In [None]:
import torch
import torch.nn as nn
from transformers import EsmModel, EsmTokenizer
import warnings

# 忽略警告
warnings.filterwarnings("ignore")

class PPIModel(nn.Module):
    def __init__(self, model_name="facebook/esm2_t33_650M_UR50D", hidden_dim=512, dropout=0.1):
        super(PPIModel, self).__init__()
        
        # 加载ESM模型，不添加pooling层
        self.esm = EsmModel.from_pretrained(
            model_name, 
            add_pooling_layer=False  # 关键：不使用pooler层
        )
        self.tokenizer = EsmTokenizer.from_pretrained(model_name)
        
        esm_dim = self.esm.config.hidden_size
        
        # 自定义的交互分类器
        self.classifier = nn.Sequential(
            nn.Linear(esm_dim * 2, hidden_dim),  # 两个蛋白质的表示拼接
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, 1),  # 二分类输出
            nn.Sigmoid()
        )
        
    def forward(self, seq_a, seq_b):
        # 获取蛋白质A的表示
        emb_a = self._get_protein_embedding(seq_a)
        # 获取蛋白质B的表示  
        emb_b = self._get_protein_embedding(seq_b)
        
        # 拼接两个表示并分类
        combined = torch.cat([emb_a, emb_b], dim=-1)
        output = self.classifier(combined)
        return output.squeeze()
    
    def _get_protein_embedding(self, sequence):
        """提取蛋白质序列的嵌入表示"""
        inputs = self.tokenizer(
            sequence, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=1024
        )
        
        # 将输入移动到模型所在的设备
        inputs = {k: v.to(next(self.esm.parameters()).device) for k, v in inputs.items()}
        
        with torch.no_grad():
            outputs = self.esm(**inputs)
        
        # 使用平均池化获取整个序列的表示
        sequence_embedding = outputs.last_hidden_state.mean(dim=1)
        return sequence_embedding


In [None]:
model = PPIModel()
    
# 示例蛋白质序列
protein_a = "MALWMRLLPLLALLALWGPDPAAAFVNQHLCGSHLVEALYLVCGERGFFYTPKTRREAED"
protein_b = "GIVEQCCTSICSLYQLENYCN"

# 预测互作概率
with torch.no_grad():
    probability = model(protein_a, protein_b)
    print(f"相互作用概率: {probability.item():.4f}")

In [None]:
# 加载中等规模的ESM-2模型
model_name = "facebook/esm2_t33_650M_UR50D"
tokenizer = EsmTokenizer.from_pretrained(model_name)
model = EsmModel.from_pretrained(model_name)

tokenizer_config.json:   0%|          | 0.00/95.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.txt:   0%|          | 0.00/93.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/724 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.61G [00:00<?, ?B/s]

Some weights of EsmModel were not initialized from the model checkpoint at facebook/esm2_t33_650M_UR50D and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [3]:
# 准备蛋白质序列
sequence = "MALWMRLLPLLALLALWGPDPAAAFVNQHLCGSHLVEALYLVCGERGFFYTPKTRREAED"

# 编码和推理
inputs = tokenizer(sequence, return_tensors="pt", padding=True, truncation=True)


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [4]:
inputs

{'input_ids': tensor([[ 0, 20,  5,  4, 22, 20, 10,  4,  4, 14,  4,  4,  5,  4,  4,  5,  4, 22,
          6, 14, 13, 14,  5,  5,  5, 18,  7, 17, 16, 21,  4, 23,  6,  8, 21,  4,
          7,  9,  5,  4, 19,  4,  7, 23,  6,  9, 10,  6, 18, 18, 19, 11, 14, 15,
         11, 10, 10,  9,  5,  9, 13,  2]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [None]:
with torch.no_grad():
    outputs = model(**inputs)
    
print("安装成功！模型输出形状:", outputs.last_hidden_state.shape)

In [None]:
class Featuring(nn.Module):
    def __init__(self, input_dim, feature_dim):
        super().__init__()

        self.feature_dim = feature_dim

        self.conv1 = nn.Conv1d(input_dim, feature_dim, kernel_size=3,
                               padding=1)
        layer1 = TransformerLayer(n_heads=3, d_model=feature_dim,
                                  ff_units=10, dropout=0.2)
        self.encoder1 = TransformerEncoder(layer1, n_layers=2)

        self.conv2 = nn.Conv1d(2 * feature_dim, feature_dim, kernel_size=3,
                               padding=1)
        layer2 = TransformerLayer(n_heads=3, d_model=feature_dim,
                                  ff_units=10, dropout=0.2)
        self.encoder2 = TransformerEncoder(layer2, n_layers=2)

        self.conv3 = nn.Conv1d(2 * feature_dim, feature_dim, kernel_size=3,
                               padding=1)
        layer3 = TransformerLayer(n_heads=3, d_model=feature_dim,
                                  ff_units=10, dropout=0.2)
        self.encoder3 = TransformerEncoder(layer3, n_layers=2)

        self.conv4 = nn.Conv1d(2 * feature_dim, feature_dim, kernel_size=3,
                               padding=1)
        layer4 = TransformerLayer(n_heads=3, d_model=feature_dim,
                                  ff_units=10, dropout=0.2)
        self.encoder4 = TransformerEncoder(layer4, n_layers=2)

        self.conv5 = nn.Conv1d(2 * feature_dim, feature_dim, kernel_size=3,
                               padding=1)

        self.pool = nn.MaxPool1d(kernel_size=3, stride=3)
        # self.adaptive_pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # b, input_dim, 2000

        # First layer
        x = self.conv1(x)  # b, feature_dim, 2000,
        x = self.pool(x)  # b, feature_dim, 666
        x = x.permute(0, 2, 1)  # n, 666, feature_dim
        e = self.encoder1(x)  # b, 666, feature_dim
        x = x.permute(0, 2, 1)  # b, feature_dim, 666
        e = e.permute(0, 2, 1)  # b, feature_dim, 666
        x = torch.cat([e, x], dim=1)  # b, 2*feature_dim, 666

        # Second layer
        x = self.conv2(x)
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        e = self.encoder2(x)
        x = x.permute(0, 2, 1)
        e = e.permute(0, 2, 1)
        x = torch.cat([e, x], dim=1)  # b, 2*feature_dim, 222

        # Third layer
        x = self.conv3(x)
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        e = self.encoder3(x)
        x = x.permute(0, 2, 1)
        e = e.permute(0, 2, 1)
        x = torch.cat([e, x], dim=1)  # b, 2*feature_dim, 77

        # Fourth layer
        x = self.conv4(x)
        x = self.pool(x)
        x = x.permute(0, 2, 1)
        e = self.encoder4(x)
        x = x.permute(0, 2, 1)
        e = e.permute(0, 2, 1)
        x = torch.cat([e, x], dim=1)  # b, 2*feature_dim, 28

        x = self.conv5(x)  # b, feature_dim, 28
        # x = self.adaptive_pool(x) # b, feature_dim, 1
        # x = x.squeeze(-1)  # b, feature_dim
        x = x.permute(0, 2, 1)  # b, 28, feature_dim
        return x


class Classifier(nn.Module):
    def __init__(self, input_dim):
        super().__init__()

        self.fc1 = nn.Linear(input_dim, 2 * input_dim)
        self.bn1 = nn.BatchNorm1d(2 * input_dim)
        self.fc2 = nn.Linear(2 * input_dim, input_dim)
        self.bn2 = nn.BatchNorm1d(input_dim)
        self.fc3 = nn.Linear(input_dim, input_dim // 2)
        self.bn3 = nn.BatchNorm1d(input_dim // 2)
        self.fc4 = nn.Linear(input_dim // 2, input_dim // 4)
        self.bn4 = nn.BatchNorm1d(input_dim // 4)
        self.fc5 = nn.Linear(input_dim // 4, 1)

        self.dropout = nn.Dropout(0.5)
        self.leaky_relu = nn.LeakyReLU(0.3)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.leaky_relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = self.bn2(x)
        x = self.leaky_relu(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.bn3(x)
        x = self.leaky_relu(x)
        x = self.dropout(x)

        x = self.fc4(x)
        x = self.bn4(x)
        x = self.leaky_relu(x)
        x = self.dropout(x)

        x = self.fc5(x)

        x = x.squeeze(-1)
        x = F.sigmoid(x)

        return x


class InteractionModel(nn.Module):
    def __init__(self, featuring, classifier):
        super().__init__()

        self.featuring = featuring
        self.classifier = classifier
        layer = TransformerLayer(n_heads=3,
                                 d_model=2 * self.featuring.feature_dim,
                                 ff_units=10, dropout=0.5)
        self.encoder = TransformerEncoder(layer, n_layers=2)

    def forward(self, x1, x2):
        x1 = self.featuring(x1)  # b, 28, feature_dim
        x2 = self.featuring(x2)  # b, 28, feature_dim
        x = torch.cat((x1, x2), dim=2)  # b, 28, 2*feature_dim
        x = self.encoder(x)
        x = torch.mean(x, dim=1)  # b, 2*feature_dim
        x = self.classifier(x)
        return x

In [None]:

spe = "yeast"

# data_dir = "ppi-data"
# train_file = os.path.join(data_dir, spe, "action/train_action_20.tsv")
# val_file = os.path.join(data_dir, spe, "action/val_action_10.tsv")
# test_file = os.path.join(data_dir, spe, "action/test_action_10.tsv")
# epochs = 10

from google.colab import drive

drive.mount('/content/drive')
data_dir = "drive/MyDrive/ppi-data"
train_file = os.path.join(data_dir, spe, "action/train_action.tsv")
val_file = os.path.join(data_dir, spe, "action/val_action.tsv")
test_file = os.path.join(data_dir, spe, "action/test_action.tsv")
epochs = 50

embedding_h5 = os.path.join(data_dir, spe, "seq/pipr.embedding.h5")

input_dim = 13
feature_dim = 24
batch_size = 32
lr = 0.0001

set_seed(1234)

device = "cuda" if torch.cuda.is_available() else "cpu"

train_loader = load_data(train_file, batch_size, embedding_h5, train=True)
val_loader = load_data(val_file, batch_size, embedding_h5, train=False)
test_loader = load_data(test_file, batch_size, embedding_h5, train=False)


In [None]:
featuring = Featuring(input_dim, feature_dim)
classifier = Classifier(2 * feature_dim)

model = InteractionModel(featuring, classifier)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
train_model(model, train_loader, val_loader, optimizer, epochs, device)

eval_model(model, test_loader, device)