In [2]:

from torch.utils.data import TensorDataset, DataLoader

import numpy as np
import pandas as pd


## ------------------ Load the VDJdb export ------------------
# Path to the tab-separated VDJdb dump; edit this to point at your copy of the file.
file_path = 'vdjdb.txt'

with open(file_path, 'r', encoding='utf-8') as file:
    # The first line carries the column names.
    header = file.readline().rstrip('\r\n').split('\t')
    # Remove only the line terminator from each record. A bare strip() would also
    # swallow leading/trailing tabs, dropping empty first/last fields so that
    # zip(header, ...) silently loses keys. Blank lines are skipped because they
    # would otherwise become records that lack most columns.
    tcr_data = [dict(zip(header, line.rstrip('\r\n').split('\t')))
                for line in file
                if line.strip()]
print(header)

['complex.id', 'gene', 'cdr3', 'v.segm', 'j.segm', 'species', 'mhc.a', 'mhc.b', 'mhc.class', 'antigen.epitope', 'antigen.gene', 'antigen.species', 'reference.id', 'method', 'meta', 'cdr3fix', 'vdjdb.score', 'web.method', 'web.method.seq', 'web.cdr3fix.nc', 'web.cdr3fix.unmp']


In [3]:
# -------------- Cleaning, step 1: keep only the attributes used downstream --------------
wanted_columns = ('cdr3', 'antigen.epitope', 'vdjdb.score')
selected_data = [{column: entry[column] for column in wanted_columns}
                 for entry in tcr_data]

# -------------- Cleaning, step 2: build a DataFrame and drop zero-score rows --------------
# 'vdjdb.score' is still a string at this point, hence the comparison against '0'.
# NOTE: no de-duplication is performed here; only the score filter is applied.
df_raw = pd.DataFrame(selected_data)
has_nonzero_score = df_raw['vdjdb.score'] != '0'
df_clean = df_raw.loc[has_nonzero_score].reset_index(drop=True)

In [4]:
# Rows whose vdjdb.score is '0' form the negative class; every other row is positive.
is_zero_score = df_raw['vdjdb.score'] == '0'
neg_data = df_raw[is_zero_score].reset_index(drop=True)
pos_data = df_raw[~is_zero_score].reset_index(drop=True)
# Count of all positive rows, before any balancing.
num_positive_samples = len(pos_data)

# Draw as many negatives as there are positives. DataFrame.sample raises ValueError
# when asked for more rows than exist, so cap the request at the smaller class size.
samples_per_class = min(num_positive_samples, len(neg_data))
neg_data_sampled = neg_data.sample(n=samples_per_class, random_state=42).reset_index(drop=True)
if samples_per_class < num_positive_samples:
    # Fewer negatives than positives: downsample the positives too so the classes
    # stay balanced. When negatives are plentiful, positives are left untouched.
    pos_data = pos_data.sample(n=samples_per_class, random_state=42).reset_index(drop=True)

# Attach the binary target: 0 = negative, 1 = positive.
neg_data_sampled = neg_data_sampled.assign(label=0)
pos_data = pos_data.assign(label=1)

In [5]:
# Stack sampled negatives on top of the positives and renumber the rows 0..n-1.
balanced_dataset = pd.concat([neg_data_sampled, pos_data], axis=0, ignore_index=True)

In [6]:
# Show the balanced frame as the cell output (negatives first, then positives).
balanced_dataset

Unnamed: 0,cdr3,antigen.epitope,vdjdb.score,label
0,CAVIGTTDSWGKLQF,KLGGALQAK,0,0
1,CAFMMNYGGSQGNLIF,KLGGALQAK,0,0
2,CASSGAGGEVFF,SYIGSINNI,0,0
3,CAASSLYGQNFVF,LLWNGPMAV,0,0
4,CARPPETQYF,ELAGIGILTV,0,0
...,...,...,...,...
24823,CASSQGSGGNEQFF,FPQPEQPFPWQP,2,1
24824,CAASVLYGSSNTGKLIF,QLQPFPQPELPY,2,1
24825,CASSIVGSGGYNEQFF,QLQPFPQPELPY,2,1
24826,CAPQGATNKLIF,PQQPFPQPEQPFP,2,1


In [7]:
## ---------------- One-hot encoding ----------------
# Each of the 20 amino-acid letters maps to a 20-element 0/1 list whose single 1 sits
# at the letter's position in this alphabet.
amino_acid_alphabet = 'ACDEFGHIKLMNPQRSTVWY'
encoding_map = {
    residue: [1 if slot == position else 0 for slot in range(len(amino_acid_alphabet))]
    for position, residue in enumerate(amino_acid_alphabet)
}


def _one_hot(sequence):
    """Return the list of one-hot vectors for every character of ``sequence``."""
    return [encoding_map[residue] for residue in sequence]


cdr3_encoded = [_one_hot(sequence) for sequence in balanced_dataset['cdr3']]
antigen_encoded = [_one_hot(sequence) for sequence in balanced_dataset['antigen.epitope']]

# The encoded matrices have different row counts, so find the longest sequence in each
# column; its length is the padding target used further down.
longest_cdr3 = max(balanced_dataset['cdr3'], key=len)
longest_antigen_epitope = max(balanced_dataset['antigen.epitope'], key=len)

print("最长的cdr3:", longest_cdr3)
print("最长cdr3的长度:", len(longest_cdr3))
print("最长的antigen_epitope:", longest_antigen_epitope)
print("最长antigen_epitope的长度:", len(longest_antigen_epitope))


def padding_sequence(origin, sequence_length, num_features=20):
    """Zero-pad an encoded sequence to a fixed number of rows.

    Args:
        origin: Sequence of per-position vectors, each ``num_features`` long
            (for example a list of one-hot lists). May be empty.
        sequence_length: Number of rows in the returned matrix.
        num_features: Width of every row. Defaults to 20, the size of the
            one-hot vectors used in this notebook.

    Returns:
        A float64 ``numpy`` array of shape ``(sequence_length, num_features)``
        whose first ``len(origin)`` rows hold ``origin`` and whose remaining
        rows are zeros.

    Raises:
        ValueError: If ``origin`` has more rows than ``sequence_length``.
    """
    padded = np.zeros((sequence_length, num_features))
    row_count = len(origin)
    if row_count == 0:
        # Assigning an empty list into a (0, num_features) slice fails to broadcast,
        # so an empty sequence is returned as pure padding.
        return padded
    if row_count > sequence_length:
        raise ValueError(
            "sequence has {} rows but sequence_length is {}".format(row_count, sequence_length)
        )
    padded[:row_count] = origin
    return padded


# Pad every encoded sequence to the longest length found in its own column.
max_cdr3_length = len(longest_cdr3)
max_epitope_length = len(longest_antigen_epitope)
cdr3_encoded_padded = [padding_sequence(encoded, max_cdr3_length) for encoded in cdr3_encoded]
antigen_encoded_padded = [padding_sequence(encoded, max_epitope_length) for encoded in antigen_encoded]

# Flatten each padded matrix into a 1-D vector (a fresh copy per sequence).
cdr3_encoded_padded_flat = list(map(np.ndarray.flatten, cdr3_encoded_padded))
antigen_encoded_padded_flat = list(map(np.ndarray.flatten, antigen_encoded_padded))

# Store the flattened codes alongside the original columns.
balanced_dataset['cdr3_code'] = cdr3_encoded_padded_flat
balanced_dataset['antigen_code'] = antigen_encoded_padded_flat

最长的cdr3: CYSTWRLSCLLLCRDSAGAGSYQLTF
最长cdr3的长度: 26
最长的antigen_epitope: MTEYKLVVVGAVGVGKSALTIQLI
最长antigen_epitope的长度: 24


In [27]:
def _join_codes(row):
    """Concatenate a row's flattened CDR3 and epitope codes into one Python list."""
    return [*row['cdr3_code'], *row['antigen_code']]


balanced_dataset['input'] = balanced_dataset.apply(_join_codes, axis=1)

In [18]:
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
# Convert the inputs and labels to PyTorch tensors.
# Stack the per-row feature lists into one float32 array first; passing a pandas Series
# of Python lists straight to torch.tensor is slow and relies on the Series having a
# default 0..n-1 index.
input_array = np.asarray(balanced_dataset['input'].tolist(), dtype=np.float32)
input_tensor = torch.from_numpy(input_array)
# Insert a length-1 sequence axis: (samples, features) -> (samples, 1, features).
# This is a sequence dimension, not a batch dimension; the DataLoader adds batching.
input_tensor = input_tensor.unsqueeze(1)
labels_tensor = torch.tensor(balanced_dataset['label'].to_numpy())

# Pair inputs with labels.
dataset = TensorDataset(input_tensor, labels_tensor)
print(len(dataset))

# 80/20 train/test split. The generator is seeded so the split is reproducible across
# kernel restarts (same seed as the negative sampling earlier in the notebook).
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size], generator=split_generator)

# Loader / training hyper-parameters.
batch_size = 32
num_epochs = 5

# Training batches are reshuffled every epoch; test batches keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

24828


In [25]:
class LSTM(nn.Module):
    """Binary classifier: stacked LSTM -> dropout -> linear layer -> sigmoid.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of output units of the final linear layer.
        dropout: Dropout probability, used between stacked LSTM layers and on the
            last-step output before the linear layer.
    """

    def __init__(self, input_dim=1000, hidden_dim=128, num_layers=2, output_dim=1, dropout=0.5):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # nn.LSTM only applies dropout between stacked layers; with a single layer a
        # non-zero value has no effect and triggers a warning, so pass 0 in that case.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=lstm_dropout)

        self.dropout = nn.Dropout(dropout)

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return sigmoid probabilities of shape (batch, output_dim).

        Args:
            x: Tensor of shape (batch, seq_length, input_dim).
        """
        # With no initial state given, nn.LSTM starts from zeros created with x's
        # device and dtype. Hand-built float32 zeros would break a model converted
        # with .double() or .half().
        out, _ = self.lstm(x)  # out: (batch, seq_length, hidden_dim)

        # Keep only the output of the last time step.
        out = out[:, -1, :]

        out = self.dropout(out)
        out = self.fc(out)
        return torch.sigmoid(out)

In [23]:
def accuracy(prediction, labels):
    """Count correct binary predictions at a 0.5 threshold.

    Args:
        prediction: Tensor of scores; values strictly greater than 0.5 count as class 1.
        labels: Tensor of targets with as many elements as ``prediction``.

    Returns:
        Tuple ``(number_correct, len(labels))``.
    """
    predicted_classes = (prediction > 0.5).long()
    expected_classes = labels.view_as(predicted_classes)
    num_correct = (predicted_classes == expected_classes).sum().item()
    return num_correct, len(labels)

In [26]:
## Build the model
net = LSTM()
## Loss: binary cross-entropy on the sigmoid outputs
criterion = nn.BCELoss()
## Optimizer
optimizer = optim.Adam(net.parameters(), lr=0.001)
## Training loop
for epoch in range(num_epochs):
    # (correct, total) pairs for every training batch seen so far in this epoch.
    train_rights = []
    for batch_idx, (data, target) in enumerate(train_loader):
        # Re-enter training mode each step; the periodic evaluation below switches to eval.
        net.train()
        target_column = target.view(-1, 1).float()
        output = net(data)
        loss = criterion(output, target_column)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_rights.append(accuracy(output, target_column))
        if batch_idx % 100 == 0:
            # Every 100 batches, score the whole test set.
            net.eval()
            val_rights = []
            # no_grad: evaluation needs no autograd graph, which saves memory and time.
            # Separate val_* names keep the training batch variables intact.
            with torch.no_grad():
                for val_data, val_target in test_loader:
                    val_output = net(val_data)
                    val_rights.append(accuracy(val_output, val_target.view(-1, 1).float()))
            # Aggregate (correct, total) over the collected batches.
            train_r = (sum(tup[0] for tup in train_rights), sum(tup[1] for tup in train_rights))
            val_r = (sum(tup[0] for tup in val_rights), sum(tup[1] for tup in val_rights))
            print("当前epoch:{} [{}/{} ({:.2f}%)]\t损失:{:.6f}\t训练集准确率:{:.2f}%\t测试集正确率:{:.2f}%".format(
                epoch, batch_idx * batch_size, len(train_loader.dataset),
                100 * batch_idx / len(train_loader),
                loss.item(),
                100 * train_r[0] / train_r[1],
                100 * val_r[0] / val_r[1]))

