In [13]:
import os
import re
import random
from collections import defaultdict
# Work from the project root: the relative "./data" and "./experiments" paths
# used later resolve against it, and presumably so does the local
# `experiments` import below. Set EXPLAINABLE_PPI_ROOT to point at a checkout
# in a different location; the original path remains the default.
os.chdir(os.environ.get("EXPLAINABLE_PPI_ROOT", "/home/ys/ExplainablePPI"))
from experiments.utils import Protein, organisms

# Seed the global `random` generator used for protein sampling.
random.seed(100)

In [16]:
def get_proteins(seq_dir, pair_dir, k, min_pos_neighbor=3):
    """Sample up to ``k`` proteins per organism and attach their partners.

    For every organism in ``organisms`` two files are read:

    * ``<seq_dir>/<organism>_test.fasta`` -- one ``name<TAB>sequence`` record
      per line.
    * ``<pair_dir>/<organism>_test.tsv`` -- one ``first<TAB>second<TAB>label``
      record per line; label ``1`` marks an interacting pair, any other
      integer a non-interacting pair.

    Blank lines in either file are skipped. The residue codes ``U``, ``Z``,
    ``O`` and ``B`` are replaced by ``X`` in every sequence, so a protein has
    the same sequence whether it appears as a sampled protein or as a
    neighbor.

    Args:
        seq_dir: Directory containing the per-organism sequence files.
        pair_dir: Directory containing the per-organism pair files.
        k: Maximum number of proteins to sample per organism.
        min_pos_neighbor: A protein is eligible for sampling only if it has
            strictly more than this many distinct positive partners.

    Returns:
        A dict mapping each organism to a list of sampled ``Protein``
        objects. Each one has its partners registered through
        ``add_pos_neighbor`` / ``add_neg_neighbor``. An organism without any
        eligible protein maps to an empty list.

    Raises:
        ValueError: If a record does not have the expected number of
            tab-separated fields or a label is not an integer.
        KeyError: If a protein named in the pair file has no sequence.
    """
    non_standard = re.compile(r"[UZOB]")
    proteins = {}
    for orga in organisms:
        # Sequence file: name -> sequence, with non-standard residues masked.
        pro_seq_dict = {}
        with open(os.path.join(seq_dir, orga + "_test.fasta"), "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                name, seq = line.split("\t")
                pro_seq_dict[name] = non_standard.sub("X", seq)

        # Pair file: keep every labelled pair and collect, per protein, the
        # set of partners it interacts with.
        pairs = []
        pro_nei = defaultdict(set)
        with open(os.path.join(pair_dir, orga + "_test.tsv"), "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                fst, sec, label = line.split("\t")
                is_positive = int(label) == 1
                pairs.append((fst, sec, is_positive))
                if is_positive:
                    pro_nei[fst].add(sec)
                    pro_nei[sec].add(fst)

        # Eligible proteins: more than `min_pos_neighbor` positive partners.
        # Sorting matters: iteration order over a set of strings can differ
        # between interpreter runs, which would make the seeded sample below
        # irreproducible.
        candidates = sorted(
            name for name, neighbors in pro_nei.items()
            if len(neighbors) > min_pos_neighbor
        )
        selected = random.sample(candidates, k=min(len(candidates), k))

        chosen = {
            name: Protein(name=name, seq=pro_seq_dict[name])
            for name in selected
        }

        # Attach each pair to whichever of its two ends was sampled; a pair
        # whose ends were both sampled is attached to both.
        for fst, sec, is_positive in pairs:
            for center, partner in ((fst, sec), (sec, fst)):
                if center not in chosen:
                    continue
                neighbor = Protein(name=partner, seq=pro_seq_dict[partner])
                if is_positive:
                    chosen[center].add_pos_neighbor(neighbor)
                else:
                    chosen[center].add_neg_neighbor(neighbor)

        proteins[orga] = list(chosen.values())
    return proteins

In [17]:
seq_dir = "./data/dscript/processed/seqs"
pairs_dir = "./data/dscript/processed/pairs"
k = 100  # maximum number of proteins sampled per organism


def write_proteins(proteins_by_orga, out_dir):
    """Write one ``.pair`` and one ``.seq`` file per organism into ``out_dir``.

    ``<organism>.pair`` holds ``protein<TAB>neighbor<TAB>label`` lines (label
    1 for positive neighbors, 0 for negative ones). ``<organism>.seq`` holds
    ``name<TAB>sequence`` lines for every protein and every neighbor.

    Args:
        proteins_by_orga: Mapping of organism name to an iterable of protein
            objects exposing ``name``, ``seq``, ``pos_neighbors`` and
            ``neg_neighbors``.
        out_dir: Destination directory; created if it does not exist.
    """
    os.makedirs(out_dir, exist_ok=True)
    for orga, proteins in proteins_by_orga.items():
        seqs = []
        pairs = []
        for pro in proteins:
            seqs.append(f"{pro.name}\t{pro.seq}\n")
            # Positive neighbors first, then negative ones.
            for label, neighbors in ((1, pro.pos_neighbors), (0, pro.neg_neighbors)):
                for nei in neighbors:
                    pairs.append(f"{pro.name}\t{nei.name}\t{label}\n")
                    # NOTE(review): a sequence line is written for every
                    # occurrence, so the .seq file can contain repeated names.
                    seqs.append(f"{nei.name}\t{nei.seq}\n")
        with open(os.path.join(out_dir, f"{orga}.pair"), "w") as f:
            f.writelines(pairs)
        with open(os.path.join(out_dir, f"{orga}.seq"), "w") as f:
            f.writelines(seqs)


# NOTE(review): both calls sample from the same "*_test" input files, so the
# two splits may share proteins -- confirm this is intended.
train_proteins = get_proteins(seq_dir=seq_dir, pair_dir=pairs_dir, k=k)
test_proteins = get_proteins(seq_dir=seq_dir, pair_dir=pairs_dir, k=k, min_pos_neighbor=0)

write_proteins(train_proteins, "./experiments/proteins/train")
write_proteins(test_proteins, "./experiments/proteins/test")

