<a href="https://colab.research.google.com/github/ai-phasepro/LRECA/blob/main/LRECA_test_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **LRECA test Demo**

## **Preparation**

**Before you start using the demos, please run code cell 0.1 and code cell 0.2 first, which you only need to run once if they run with no error.**

## **Test Data Input**

Then choose to run **code cell 1.1 or code cell 1.2** according to your test data, If you test **a single sequence**, please replace the sequence **‘MSGGG...RSRT’** in the code of code cell 1.1 with your sequence, and then run **code cell 1.1**.

if you test **multiple sequences**, please **upload the txt file** of one sequence per line to colab and **modify the path(pos_test_dir & neg_test_dir)** of the test file to **run code cell 1.2**.

## **Define the model and run the tests**

Then, run **code cell 2** to define the model (which is usually only run once when testing multiple times in a runtime),

Finally, run **code cell 3** to **run the model** and output the test results.

## **output**

For **single** sequences the test results are displayed in the the last line  of the code cell 3.The output of the example is “'y_score': 0.9564854502677917, 'y_pre': 1”, in which ‘y_score’ is the predicted value of the input sequence and ‘y_pre’ is the predicted label (1 represents LLPS positive, and 0 represents LLPS negative).

For test results for **multiple** sequences,

the **predicted results for each sequence** are output in **classification_output/personal_output/personal_test_roc_1.csv **

the **test metrics** are output in
**classification_output/personal_output/result_1.csv**
Note: the output test metrics in result_1.csv are meaningful only when the sequences in the upload files under the ‘pos_test_dir’ and ‘neg_test_dir’ paths have been determined to be LLPS positive and negative, respectively. Otherwise, the result_1.csv has no reference value.


## **help**

### **How to upload a test fileset to colab**

Here are two ways to upload a test dataset file.

1. Click on the **"File" icon** on **the left sidebar**, and then click on "**Upload to session storge**" at the top of the pop-up file sidebar to upload a file.

2. Run **code cell 0.3** and select file upload

See the official [Google Colab instructions](https://colab.research.google.com/notebooks/io.ipynb#scrollTo=7Z2jcRKwUHqV) for more file uploading options

### **When testing multiple sequences using the txt file format, do I have to split the positive and negative data to upload?**

**Not required**, distinguishing between positives and negatives for test set files is a performance metric that facilitates statistical modeling. If you are testing an unlabeled dataset or just want to see the predictions of the sequences, **you can set only one of the paths (pos_test_dir / neg_test_dir) to the path of the file to be tested and set the other to an empty string ("")**, in **which case the performance metrics are meaningless**. In this case, the performance metrics are meaningless. You can see the results for each sequence in the sequence prediction output file (classification_output/personal_output/personal_test_roc_1.csv).



In [None]:
# 0.1 First run the code grid to download the data needed for the demo and unzip it.
!gdown "https://drive.google.com/uc?export=download&id=1AfIylZdDgd7yZYD2seVw9998SpBVM0BW"
!unzip LRECA_testdata.zip

Downloading...
From: https://drive.google.com/uc?export=download&id=1AfIylZdDgd7yZYD2seVw9998SpBVM0BW
To: /content/LRECA_testdata.zip
  0% 0.00/4.92M [00:00<?, ?B/s]100% 4.92M/4.92M [00:00<00:00, 106MB/s]
Archive:  LRECA_testdata.zip
  inflating: model_mydata_1.pt       
  inflating: neg_word_list_1479.txt  
  inflating: neg_word_list_mydata_test.txt  
  inflating: pos_word_list_mydata_all_1507.txt  
  inflating: pos_word_list_mydata_test.txt  
  inflating: requirements.txt        


In [18]:
# 0.2 Configure the runtime environment
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu # if using GPU runtime,please comment out this line
!pip install -r requirements.txt

Looking in indexes: https://download.pytorch.org/whl/cpu


In [None]:
# 0.3 upload file to colab
from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))

In [None]:
# 1.1 Enter a single sequence to be tested, and select a code cell to run as needed(1.1 or 1.2).
single = 1
single_test_sequence = ['MSGGGVIRGPAGNNDCRIYVGNLPPDIRTKDIEDVFYKYGAIRDIDLKNRRGGPPFAFVEFEDPRDAEDAVYGRDGYDYDGYRLRVEFPRSGRGTGRGGGGGGGGGAPRGRYGPPSRRSENRVVVSGLPPSGSWQDLKDHMREAGDVCYADVYRDGTGVVEFVRKEDMTYAVRKLDNTKFRSHEGETAYIRVKVDGPRSPSYGRSRSRSRSRSRSRSRSNSRSRSYSPRRSRGSPRYSPRHSRSRSRT'] # please enter your protein sequence


pos_test_sequence = [protein.lower() for protein in single_test_sequence]
neg_test_sequence = [protein.lower() for protein in single_test_sequence]

In [None]:
# 1.2 Enter the file address of the txt file containing the test sequences, one sequence per line, please select a code cell to run as needed(1.1 or 1.2).
pos_test_dir = "pos_word_list_mydata_test.txt" # Needs to be modified to your test dataset file
neg_test_dir = ""
single = 0
def readdata_test(pos_protein_dir, neg_protein_dir):
    pos_protein_path = pos_protein_dir
    neg_protein_path = neg_protein_dir
    if pos_protein_path == '':
        pos_word_list = []
    else:
      with open(pos_protein_path, 'r') as f:
          pos_word_list = f.read().splitlines()

    if neg_protein_path == '':
        neg_word_list = []
    else:
      with open(neg_protein_path, 'r') as f:
          neg_word_list = f.read().splitlines()

    pos_sequence = [protein.lower() for protein in pos_word_list]
    neg_sequence = [protein.lower() for protein in neg_word_list]
    return pos_sequence, neg_sequence

pos_test_sequence,neg_test_sequence = readdata_test(pos_test_dir, neg_test_dir)

In [None]:
# 2.Define the model
import os
import torch
from torch.utils.data import dataset, dataloader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import numpy as np
import torch.nn.functional as F
from torch import LongTensor, Tensor, from_numpy, max_pool1d, nn, unsqueeze,optim
import argparse
from sklearn.model_selection import StratifiedKFold
from sklearn import metrics
from torch.optim.lr_scheduler import ReduceLROnPlateau
import pandas as pd
import copy
import random
def readdata(pos_protein_dir, neg_protein_dir, length, pos_seed, neg_seed):
    pos_protein_path = pos_protein_dir
    neg_protein_path = neg_protein_dir
    with open(pos_protein_path, 'r') as f:
        pos_word_list = f.read().splitlines()
    f.close
    with open(neg_protein_path, 'r') as f:
        neg_word_list = f.read().splitlines()
    f.close

    np.random.seed(pos_seed)
    np.random.shuffle(pos_word_list)
    np.random.seed(neg_seed)
    np.random.shuffle(neg_word_list)
    if length is not None:
        neg_word_list = neg_word_list[:length]
        pos_word_list = pos_word_list[:length]
    pos_sequence = pos_word_list
    neg_sequence = neg_word_list
    return pos_sequence, neg_sequence



def word2Num(train, test, min=0, max=None, max_features=None):
    dic = {}
    count = {}
    for list0 in train:
        list0 = list0.replace(' ', '')
        for word in list0:
            count[word] = count.get(word, 0) + 1
    if min is not None:
        count = {word:value for word,value in count.items() if value>min}
    if max is not None:
        count = {word:value for word,value in count.items() if value<max}
    if  max_features is not None:
        temp = sorted(count.items(), key=lambda x:x[-1], reverse=True)[:max_features]
        count = dict(temp)
    for word in count:
        dic[word] = len(dic) + 1
    print(dic)
    Num = []
    for list0 in train:
        list0 = list0.replace(' ', '')
        num = []
        for word in list0:
            num.append(dic.get(word))
        Num.append(num)
    print(len(Num))
    Num2 = []
    for list0 in test:
        list0 = list0.replace(' ', '')
        num2 = []
        for word in list0:
            num2.append(dic.get(word))
        Num2.append(num2)
    print(len(Num2))
    return Num, Num2, dic


def collate_fn(data):
    data.sort(key=lambda tuple: len(tuple[0]), reverse=True)
    data_length = [len(tuple[0]) for tuple in data]
    data_ten, data_label = [], []
    for tuple in data:
        data_ten.append(tuple[0])
        data_label.append(tuple[1])
    data_ten = pad_sequence(data_ten, batch_first=True,padding_value=0)
    data_label = torch.LongTensor(data_label)
    data_length = torch.LongTensor(data_length)
    return data_ten, data_label, data_length


class Mydata(dataset.Dataset):
    def __init__(self, data, label):
        self.data = data
        self.label = label
    def __getitem__(self, idx):
        protein = self.data[idx]
        label = self.label[idx]
        return protein, label
    def __len__(self):
        assert len(self.data)==len(self.label)
        return len(self.data)


class ECALayer(nn.Module):
    """Constructs a ECA module.
    Args:
        channel: Number of channels of the input feature map
        k_size: Adaptive selection of kernel size
    """
    def __init__(self, k_size=5): # 3 5
        super(ECALayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.conv = nn.Conv1d(1, 1, kernel_size=k_size, padding=(k_size - 1) // 2, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, length):
        # feature descriptor on the global spatial information
        b, e, t = x.size()

        for i in range(b):
            x_pack = x[i][: , : length[i]].unsqueeze(0)
            x_avg = self.avg_pool(x_pack)
            if i == 0:
                y = x_avg.clone()
            else:
                y = torch.cat((y, x_avg), dim=0)

        # Two different branches of ECA module
        y = self.conv(y.transpose(-1, -2)).transpose(-1, -2)

        # Multi-scale information fusion
        y = self.sigmoid(y)

        return x * y.expand_as(x)


class RCNN(nn.Module):
    def __init__(self, vocab_size, embedding_num, hidden_dim, num_layers, biFlag, dropout=0.2):
        super(RCNN, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_num = embedding_num
        self.hiddern_dim = hidden_dim
        self.num_layers = num_layers
        if biFlag:
            self.bi_num = 2
        else:
            self.bi_num = 1
        self.biFlag = biFlag
        self.device = torch.device("cuda")
        self.ECABlock= ECALayer()
        self.embedding = nn.Embedding(vocab_size, embedding_num, padding_idx=0)   # 需要添加padding_idx
        self.lstm = nn.LSTM(input_size= embedding_num, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=biFlag)
        self.globalmaxpool = nn.AdaptiveMaxPool1d(1)
        self.linear = nn.Sequential(
            nn.Dropout(dropout),

            nn.Linear(self.bi_num*hidden_dim + embedding_num, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128,32),
            nn.ReLU(),
            nn.Linear(32,2)
        )

    def forward(self, x, length):
        embed = self.embedding(x)
        x = pack_padded_sequence(embed, length.cpu(), batch_first=True)
        x, (ht,ct) = self.lstm(x)
        out, out_len = pad_packed_sequence(x, batch_first=True)
        out = torch.cat((embed, out), 2)
        out = F.relu(out)
        out = out.permute(0, 2, 1)

        out1 = self.ECABlock(out, length)
        out = out + out1

        out = self.globalmaxpool(out).squeeze()
        out = F.relu(out)
        out = self.linear(out)
        return out


def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
# 3.running model
if __name__== '__main__':
    device = torch.device("cpu")
    seed = 1
    set_seed(seed)
    pos_protein_dir = 'pos_word_list_mydata_all_1507.txt'
    neg_protein_dir = 'neg_word_list_1479.txt'
    model_path = 'model_mydata_1.pt'
    list_length = 1479

    pos_seed_list = [20]
    neg_seed_list = [21]


    for i in range(len(pos_seed_list)):
        pos_seed = pos_seed_list[i]
        neg_seed = neg_seed_list[i]
        pos_sequence, neg_sequence = readdata(pos_protein_dir, neg_protein_dir, list_length, pos_seed, neg_seed)

        if not os.path.exists("classification_output/personal_output"):
            os.makedirs("classification_output/personal_output")
        # auc_save_csv = 'classification_output/personal_output/personal_test_roc_{}.csv'.format((i+1))
        result_save_csv = 'classification_output/personal_output/result_{}.csv'.format((i+1))
        # df_test = pd.DataFrame(columns=['y_true', 'y_score'])
        # df_test.to_csv(auc_save_csv, mode='w', index=False)
        df_test = pd.DataFrame(columns=['acc', 'sen', 'spe', 'auc'])
        df_test.to_csv(result_save_csv, index=False)

        neg_num = len(neg_test_sequence)
        pos_num = len(pos_test_sequence)
        print('pos_num=',pos_num)
        print('neg_num=',neg_num)

        neg_num = len(neg_sequence)
        pos_num = len(pos_sequence)

        start = 0
        interval = 0.1
        val_split = 0.1

        total_tp = 0
        total_p = 0
        total_n = 0
        total_tn = 0

        save_sen = []
        save_spe = []
        save_acc = []

        fold = 0
        total_correct, total_F1, total_R, total_precision = [],[],[],[]


        test_pos_seq = pos_test_sequence
        test_neg_seq = neg_test_sequence

        train_val_pos_seq = pos_sequence[:int(pos_num*start)] + pos_sequence[int(pos_num*(start+interval)):]
        train_val_neg_seq = neg_sequence[:int(neg_num * start)] + neg_sequence[int(neg_num * (start + interval)):]
        train_val_pos_num = len(train_val_pos_seq)
        train_val_neg_num = len(train_val_neg_seq)
        np.random.shuffle(train_val_pos_seq)
        np.random.shuffle(train_val_neg_seq)
        val_pos_seq = train_val_pos_seq[:int(train_val_pos_num*val_split)]
        train_pos_seq = train_val_pos_seq[int(train_val_pos_num*val_split):]
        val_neg_seq = train_val_neg_seq[:int(train_val_neg_num*val_split)]
        train_neg_seq = train_val_neg_seq[int(train_val_neg_num*val_split):]

        test_y = np.hstack((np.zeros(shape=(len(test_neg_seq), )),
                        np.ones(shape=(len(test_pos_seq), ))))

        print('test_pos', test_y[test_y == 1].shape)
        print('test_neg', test_y[test_y == 0].shape)

        if (len(test_neg_seq) and len(test_pos_seq)):
          flag = 1
        else:
          flag = 0

        train_seq = train_neg_seq + train_pos_seq
        val_seq = val_neg_seq + val_pos_seq
        train_val_seq = train_seq + val_seq
        test_seq = test_neg_seq + test_pos_seq

        state = np.random.get_state()
        np.random.shuffle(test_seq)
        np.random.set_state(state)
        np.random.shuffle(test_y)

        _, test_num, vocab  = word2Num(train_seq, test_seq)
        test_data_size = len(test_num)


        test_ten = []
        for list1 in test_num:
            test_ten.append(torch.LongTensor(list1))

        test_label_ten = from_numpy(test_y)
        test_label_ten = test_label_ten.type(torch.LongTensor)

        state_dict = torch.load(model_path, map_location=torch.device('cpu'))
        rcnn = RCNN(len(vocab)+1, 1024, 100, 1, True)
        rcnn = rcnn.to(device)
        rcnn.load_state_dict(state_dict)
        rcnn.eval()
        print(rcnn)

        test = Mydata(test_ten, test_label_ten)

        set_seed(seed)
        test_dataloader = dataloader.DataLoader(dataset=test, batch_size=32,shuffle=False, collate_fn=collate_fn)

        test_loss = 0
        y_true_test = []
        y_pre_test = []
        y_score_test = []
        total_labels_test = 0
        with torch.no_grad():
            for input, label, length in test_dataloader:
                input = input.to(device)
                label = label.to(device)
                length = length.to(device)

                output = rcnn(input, length)
                _, predicted = torch.max(output,1)
                y_pre_test.extend(predicted.cpu())
                y_true_test.extend(label.cpu())
                y_score_test.extend(torch.softmax(output, dim=-1)[:, 1].cpu().detach())
                total_labels_test += label.size(0)

                test_correct = metrics.accuracy_score(y_true_test, y_pre_test)
                test_F1 = metrics.f1_score(y_true_test, y_pre_test, average='macro')
                test_R = metrics.recall_score(y_true_test, y_pre_test)
                test_precision = metrics.precision_score(y_true_test, y_pre_test)
                if flag and (not single):
                  test_auc = metrics.roc_auc_score(y_true_test, y_score_test)

                  save_content = 'Test: Correct: %.5f, Precision: %.5f, R: %.5f, F1(macro): %.5f, AUC:%.5f, test_loss: %f' % \
                              (test_correct, test_precision, test_R, test_F1, test_auc, test_loss)
                  print(save_content)

            y_true_data = [i.item() for i in y_true_test]
            y_score_data = [i.item() for i in y_score_test]
            y_pre_data = [i.item() for i in y_pre_test]
            if single:
              print({'Seq':test_seq[0], 'y_pre': np.array(y_pre_data)[0], 'y_score':y_score_data[0]})
            else:
              if not os.path.exists("classification_output/personal_output"):
                  os.makedirs("classification_output/personal_output")
              auc_save_csv = 'classification_output/personal_output/personal_test_roc_{}.csv'.format((i+1))
              df_test = pd.DataFrame(columns=['Seq', 'y_true', 'y_score', 'y_pre'])
              df_test.to_csv(auc_save_csv,mode='w', index=False)
              auc_dict = {'Seq':test_seq , 'y_true':y_true_data, 'y_pre': np.array(y_pre_data), 'y_score':y_score_data}
              auc_score = pd.DataFrame(auc_dict)
              auc_score.to_csv(auc_save_csv, mode='a', header=False, index=False, float_format='%.4f')

              p = np.array(y_pre_data)[np.array(y_true_data) == 1]
              tp = p[p == 1]
              n = np.array(y_pre_data)[np.array(y_true_data) == 0]
              tn = n[n == 0]

              sen = tp.shape[0] / p.shape[0] if p.shape[0] > 0 else 1
              spe = tn.shape[0] / n.shape[0] if n.shape[0] > 0 else 1
              acc = (tp.shape[0] + tn.shape[0]) / (p.shape[0] + n.shape[0])
              if flag:
                auc = metrics.roc_auc_score(y_true_data, y_score_data)
              else:
                auc = 0
              print('sen:', sen)
              print('spe:', spe)
              print('acc:', acc)
              print('auc:', auc)

              # list1 = [test_loss, test_correct, test_F1, test_R, test_precision, (tn.shape[0] / n.shape[0] if n.shape[0] > 0 else 1), test_auc]
              test_dict = {'acc':[acc], 'sen':[sen], 'spe':[spe], 'auc':[auc]}
              # list.extend(list1)
              # data_test = pd.DataFrame([list])
              test_score = pd.DataFrame(test_dict)
              print(test_score)
              test_score.to_csv(result_save_csv, mode='a', header=False, index=False, float_format='%.4f')

pos_num= 147
neg_num= 0
test_pos (147,)
test_neg (0,)
{'m': 1, 's': 2, 'w': 3, 'l': 4, 't': 5, 'e': 6, 'd': 7, 'i': 8, 'r': 9, 'g': 10, 'f': 11, 'y': 12, 'k': 13, 'a': 14, 'h': 15, 'p': 16, 'c': 17, 'v': 18, 'n': 19, 'q': 20}
2398
147
RCNN(
  (ECABlock): ECALayer(
    (avg_pool): AdaptiveAvgPool1d(output_size=1)
    (conv): Conv1d(1, 1, kernel_size=(5,), stride=(1,), padding=(2,), bias=False)
    (sigmoid): Sigmoid()
  )
  (embedding): Embedding(21, 1024, padding_idx=0)
  (lstm): LSTM(1024, 100, batch_first=True, bidirectional=True)
  (globalmaxpool): AdaptiveMaxPool1d(output_size=1)
  (linear): Sequential(
    (0): Dropout(p=0.2, inplace=False)
    (1): Linear(in_features=1224, out_features=128, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.2, inplace=False)
    (4): Linear(in_features=128, out_features=32, bias=True)
    (5): ReLU()
    (6): Linear(in_features=32, out_features=2, bias=True)
  )
)
sen: 0.891156462585034
spe: 1
acc: 0.891156462585034
auc: 0
        acc       sen  spe