In [1]:
import logging
import os
from collections import OrderedDict

import numpy as np
from pyteomics import mgf

# Mass tolerances, both expressed in Dalton.
# NOTE(review): the names suggest one tolerance for matching individual masses
# and a looser one for comparing prefix masses — confirm against the code that
# consumes these constants (not visible in this part of the notebook).
match_mass_tol = 0.1  # in Da
prefix_mass_tol = 0.5  # in Da

# Names of the de novo sequencing tools handled by this notebook. Each name is
# also the prefix of that tool's result columns (e.g. "<tool> Peptide").
# Every entry needs its own trailing comma: two adjacent string literals are
# silently concatenated by Python into a single element.
tools_list = [
    "AdaNovo",
    "CasanovoV1",
    "CasanovoV2",
    "ContraNovo",
    "DeepNovo",
    "InstaNovo",
    "PepNet",
    "PGPointNovo",
    "pi-HelixNovo",
    "pi-PrimeNovo",
    "pNovo3",
    "PointNovo",
    "SMSNet",
]

# Special tokens that occupy the first three vocabulary slots.
_PAD = "_PAD"
_GO = "_GO"
_EOS = "_EOS"
_START_VOCAB = [_PAD, _GO, _EOS]

# Indices of the special tokens; they match the order of _START_VOCAB.
PAD_ID = 0
GO_ID = 1
EOS_ID = 2
# Residue tokens. Upper-case letters are the standard one-letter amino acid
# codes; lower-case letters are extra tokens whose masses are listed in
# mass_AA (the parsers in this notebook map e.g. 'M(+15.99)' to 'm',
# 'N(+.98)' to 'n' and 'Q(+.98)' to 'q').
vocab_reverse = [
    "A",
    "R",
    "N",
    "n",
    "D",
    "C",
    "E",
    "Q",
    "q",
    "G",
    "H",
    "I",
    "L",
    "K",
    "M",
    "m",
    "F",
    "P",
    "S",
    "T",
    "W",
    "Y",
    "V",
    "d",
    "e",
    "f",
    "g",
    "p",
]

# Full index -> token list (special tokens first), the inverse token -> index
# mapping, and the total number of tokens.
vocab_reverse = _START_VOCAB + vocab_reverse
vocab = dict([(x, y) for (y, x) in enumerate(vocab_reverse)])
vocab_size = len(vocab_reverse)

# Mass constants for small groups and termini.
# NOTE(review): values look like monoisotopic masses in Da — confirm.
mass_H = 1.0078
mass_H2O = 18.0106
mass_NH3 = 17.0265
mass_N_terminus = 1.0078
mass_C_terminus = 17.0027
mass_CO = 27.9949
mass_Phosphorylation = 79.96633

# Mass of every vocabulary token, keyed by the token string used in vocab_reverse.
mass_AA = {
    "_PAD": 0.0,
    "_GO": mass_N_terminus - mass_H,
    "_EOS": mass_C_terminus + mass_H,
    "A": 71.03711,  # 0
    "R": 156.10111,  # 1
    "N": 114.04293,  # 2
    "n": 115.02695,
    "D": 115.02694,  # 3
    "C": 160.03065,  # 103.00919 + 57.02146 (the parsers map 'C(+57.02)' to plain 'C')  # 4
    "E": 129.04259,  # 5
    "Q": 128.05858,  # 6
    "q": 129.0426,
    "G": 57.02146,  # 7
    "H": 137.05891,  # 8
    "I": 113.08406,  # 9
    "L": 113.08406,  # 10
    "K": 128.09496,  # 11
    "M": 131.04049,  # 12
    "m": 147.0354,
    "F": 147.06841,  # 13
    "P": 97.05276,  # 14
    "S": 87.03203,  # 15
    "T": 101.04768,  # 16
    "W": 186.07931,  # 17
    "Y": 163.06333,  # 18
    "V": 99.06841,  # 19
    # NOTE(review): "d" is 0.001 larger than "f" + "e" (43.005814 - 17.026549 = 25.979265) — confirm which value is intended.
    "d": 25.980265, #Carbamylation and NH3 loss
    "e": -17.026549, # NH3 loss
    "f": 43.005814, # Carbamylation
    "g": 42.010565, # Acetylation
    "p": 111.032, # "Q"-17.026549
}

# Masses ordered by vocabulary index, as a Python list and as a float32 array.
mass_ID = [mass_AA[vocab_reverse[x]] for x in range(vocab_size)]
mass_ID_np = np.array(mass_ID, dtype=np.float32)
# Mass of glycine, kept as the minimum residue mass.
mass_AA_min = mass_AA["G"]  # 57.02146

In [12]:
##antibody data process
import os
def _reformat_mgf_dir(in_path, out_path, antibody):
    """Rewrite every MGF file in ``in_path`` into ``out_path`` with a normalised header block."""
    for file in os.listdir(in_path):
        with open(in_path+file,'r') as fr, open(out_path+file,'w') as fw:
            for line in fr:
                if 'TITLE=' in line:
                    # The title is split on ', ' and ': '; the value of the first
                    # field is used as the sample name and the value of the last
                    # field as the scan number.
                    tmp = line.strip().split(', ')
                    scan = tmp[-1].split(': ')[-1]
                    title = 'TITLE='+antibody+':'+tmp[0].split(': ')[-1]+':'+scan #antibody+sample+scan
                elif 'RTINSECONDS=' in line:
                    rt = line
                elif 'PEPMASS=' in line:
                    # Keep only the first space-separated field of the PEPMASS line.
                    mass = line.strip().split(' ')[0]
                elif 'CHARGE=' in line:
                    # The header block is emitted when CHARGE is reached, so TITLE,
                    # RTINSECONDS and PEPMASS must precede CHARGE in the input.
                    fw.write(title+'\n')
                    fw.write(mass+'\n')
                    fw.write(line)
                    fw.write('SCANS='+scan+'\n')
                    fw.write(rt)
                    fw.write('SEQ=AAAAAAA'+'\n')
                elif line.strip():
                    # Every other non-blank line is copied through unchanged.
                    fw.write(line)


def _merge_mgf_dir(in_path, out_file):
    """Concatenate all MGF files in ``in_path`` into ``out_file`` with consecutive SCANS numbers."""
    idx = 1
    with open(out_file,'w') as fw:
        # Sorted so that the scan numbering is reproducible between runs.
        for file in sorted(os.listdir(in_path)):
            with open(in_path+file,'r') as fr:
                for line in fr:
                    if 'SCANS=' in line:
                        fw.write('SCANS='+str(idx)+'\n')
                        idx += 1
                    else:
                        fw.write(line)


def mAbdata_process(path, antibody):
    """Normalise and merge the HCD and EThcD MGF files of one antibody.

    For each fragmentation type (``HCD``, ``EThcD``) the files in
    ``<path>/process/<type>/`` are rewritten into ``<path>/process1/<type>/``
    with the header order TITLE, PEPMASS, CHARGE, SCANS, RTINSECONDS, SEQ, and
    then concatenated into ``<path>/process1/spectrum_<antibody>_<type>.mgf``.
    In the merged file the SCANS values are renumbered 1, 2, 3, ... so that each
    spectrum has a unique scan number.

    :param path: directory that contains the ``process`` folder
    :param antibody: antibody name; used as title prefix and in the merged file name
    :return: None (results are written to disk)
    """
    out_root = path+'/process1'
    if os.path.exists(out_root):
        print("folder existed:", out_root)

    for fragmentation in ('HCD', 'EThcD'):
        in_dir = path+'/process/'+fragmentation+'/'
        out_dir = out_root+'/'+fragmentation+'/'
        # Paths are built explicitly (no str.replace) so that a parent directory
        # whose name contains "process" is left untouched; exist_ok also covers a
        # pre-existing process1 folder that lacks the sub-folders.
        os.makedirs(out_dir, exist_ok=True)
        _reformat_mgf_dir(in_dir, out_dir, antibody)
        _merge_mgf_dir(out_dir, out_root+'/spectrum_'+antibody+'_'+fragmentation+'.mgf')

In [None]:
import pandas as pd
# Load the database-search PSM table and keep only the columns used below.
df=pd.read_csv('/mnt/data_nas/jwb/antibody-gw-finalData/131_Ab_EThcD_filtered_DB_search_psm_only3mod_Trypsin.csv')
df=df[['Peptide','-10lgP','Mass','Length','ppm','m/z','RT','Area','Scan','Accession','PTM','AScore','Z','Protease','Antibody']]
# Spectrum key in the form "<Antibody>:<Scan>".
# NOTE(review): the string concatenation requires 'Scan' to be a string column — confirm its dtype.
df['Spectrum Name']=df['Antibody']+':'+df['Scan']
# Translate the modification notation to the single-letter tokens used in this
# notebook (oxidised M -> m, deamidated Q/N -> q/n, C(+57.02) -> C) and
# replace I with L.
df["Modified Sequence"]=[str(i).replace('M(+15.99)', 'm').replace('Q(+.98)', 'q').replace('N(+.98)', 'n').replace(' ','').replace('C(+57.02)', 'C').replace('I','L') for i in df['Peptide']]
# Write the reduced table next to the input file.
df_new=df[['Spectrum Name','Modified Sequence','Length','m/z','Z','Protease']]
df_new.to_csv('/mnt/data_nas/jwb/antibody-gw-finalData/131_Ab_EThcD_filtered_DB_search_psm_only3mod_Trypsin_final.csv',index=False)

In [None]:
import pandas as pd
import numpy as np
def process_casanovoV1(casanovo_path,mgf_file):
    """Convert a Casanovo mzTab result (integer ``PSM_ID`` column) into a result CSV.

    The integer in the ``PSM_ID`` column is used as a 0-based position in the
    list of ``TITLE=`` lines of ``mgf_file`` to recover the spectrum name.

            :param
                casanovo_path: path to the mztab file of Casanovo
                mgf_file: path to the MGF file whose TITLE order matches the PSM_ID values
            :return
                None. Writes ``CasanovoV1_result.csv`` with the columns
                Spectrum Name, CasanovoV1 Peptide, CasanovoV1 Score, CasanovoV1 aaScore.
            :raises
                ValueError: if the mztab file contains no line starting with "PSH"
    """
    with open(mgf_file,'r') as fr:
        titles = [line.strip().split('TITLE=')[-1] for line in fr if 'TITLE=' in line]

    casanovo_out = casanovo_path.replace('casanovo_denovo.mztab','CasanovoV1_result.csv')
    if casanovo_out == casanovo_path:
        # The expected file name was not found in the path; writing to
        # casanovo_out would overwrite the input, so fall back to its directory.
        casanovo_out = os.path.join(os.path.dirname(casanovo_path), 'CasanovoV1_result.csv')

    # Locate the PSM table header (the first line starting with "PSH").
    casanovo_header = None
    with open(casanovo_path) as f:
        for i, line in enumerate(f):
            if line.startswith("PSH"):
                casanovo_header = i
                break
    if casanovo_header is None:
        raise ValueError(f"No PSH header line found in {casanovo_path}")

    # skip_blank_lines=False makes pandas count lines the same way enumerate()
    # did above; blank lines become all-NaN rows and are dropped by the filter.
    df_deno = pd.read_csv(casanovo_path, sep="\t", header=casanovo_header, skip_blank_lines=False)
    df_denovo = df_deno[df_deno['sequence'].notna()].copy()

    # Out-of-range indices (including negative ones) give an empty spectrum name.
    spectrum_names = [titles[x] if 0 <= x < len(titles) else None
                      for x in df_denovo['PSM_ID'].astype(int)]

    # (s * 100 + 100) / 2 maps a score of -1 to 0 and a score of 1 to 100.
    scores = [int((s * 100 + 100)/2) for s in df_denovo['search_engine_score[1]']]

    # Comma-separated per-residue scores -> space-separated integers (score * 100).
    aa_scores = ["" if pd.isna(raw) else
                 " ".join(str(int(float(tok) * 100)) for tok in str(raw).replace(',', ' ').split())
                 for raw in df_denovo['opt_ms_run[1]_aa_scores']]

    peptides = [str(i).replace('M(+15.99)', 'm').replace('Q(+.98)', 'q').replace('N(+.98)', 'n').replace(' ','').replace('C(+57.02)', 'C').replace('I','L')
                for i in df_denovo['sequence']]

    name = ['Spectrum Name','CasanovoV1 Peptide','CasanovoV1 Score','CasanovoV1 aaScore']
    df = pd.DataFrame(list(zip(spectrum_names, peptides, scores, aa_scores)), columns=name)
    df.to_csv(casanovo_out,index=False)
# Convert the Casanovo result of the 131ab_EThcd run using the hard-coded mzTab and MGF paths.
process_casanovoV1('/mnt/data_nas/jwb/AbNovoBench/denovo/casanovo3/131ab_EThcd/casanovo_denovo.mztab','/mnt/data_nas/jwb/antibody-gw-finalData/131Ab_20250213_DBsearch_reformed_EThcD_only3mod_Trypsin_DENOVO.mgf')

In [None]:
import pandas as pd
import numpy as np
def process_casanovoV2(casanovo_path,mgf_file):
    """Convert a Casanovo mzTab result (``spectra_ref`` index) into a result CSV.

    The number that follows ``ms_run[1]:index=`` in the ``spectra_ref`` column is
    used as a 0-based position in the list of ``TITLE=`` lines of ``mgf_file``
    to recover the spectrum name.

            :param
                casanovo_path: path to the mztab file of Casanovo
                mgf_file: path to the MGF file whose TITLE order matches the spectra_ref indices
            :return
                None. Writes ``CasanovoV2_result.csv`` with the columns
                Spectrum Name, CasanovoV2 Peptide, CasanovoV2 Score, CasanovoV2 aaScore.
            :raises
                ValueError: if the mztab file contains no line starting with "PSH"
    """
    with open(mgf_file,'r') as fr:
        titles = [line.strip().split('TITLE=')[-1] for line in fr if 'TITLE=' in line]

    casanovo_out = casanovo_path.replace('denovo.mztab','CasanovoV2_result.csv')
    if casanovo_out == casanovo_path:
        # The expected file name was not found in the path; writing to
        # casanovo_out would overwrite the input, so fall back to its directory.
        casanovo_out = os.path.join(os.path.dirname(casanovo_path), 'CasanovoV2_result.csv')

    # Locate the PSM table header (the first line starting with "PSH").
    casanovo_header = None
    with open(casanovo_path) as f:
        for i, line in enumerate(f):
            if line.startswith("PSH"):
                casanovo_header = i
                break
    if casanovo_header is None:
        raise ValueError(f"No PSH header line found in {casanovo_path}")

    # skip_blank_lines=False makes pandas count lines the same way enumerate()
    # did above; blank lines become all-NaN rows and are dropped by the filter.
    df_deno = pd.read_csv(casanovo_path, sep="\t", header=casanovo_header, skip_blank_lines=False)
    df_denovo = df_deno[df_deno['sequence'].notna()].copy()

    spectrum_index = [int(item.replace('ms_run[1]:index=', '')) for item in df_denovo['spectra_ref']]
    # Out-of-range indices (including negative ones) give an empty spectrum name.
    spectrum_names = [titles[x] if 0 <= x < len(titles) else None for x in spectrum_index]

    # (s * 100 + 100) / 2 maps a score of -1 to 0 and a score of 1 to 100.
    scores = [int((s * 100 + 100)/2) for s in df_denovo['search_engine_score[1]']]

    # Comma-separated per-residue scores -> space-separated integers (score * 100).
    aa_scores = ["" if pd.isna(raw) else
                 " ".join(str(int(float(tok) * 100)) for tok in str(raw).replace(',', ' ').split())
                 for raw in df_denovo['opt_ms_run[1]_aa_scores']]

    peptides = [str(i).replace('M(+15.99)', 'm').replace('Q(+.98)', 'q').replace('N(+.98)', 'n').replace(' ','').replace('C(+57.02)', 'C').replace('I','L')
                for i in df_denovo['sequence']]

    name = ['Spectrum Name','CasanovoV2 Peptide','CasanovoV2 Score','CasanovoV2 aaScore']
    df = pd.DataFrame(list(zip(spectrum_names, peptides, scores, aa_scores)), columns=name)
    df.to_csv(casanovo_out,index=False)

# Convert the Casanovo (casanovo4 folder) result of the 131ab_EThcd run using the hard-coded paths.
process_casanovoV2('/mnt/data_nas/jwb/AbNovoBench/denovo/casanovo4/131ab_EThcd/denovo.mztab','/mnt/data_nas/jwb/antibody-gw-finalData/131Ab_20250213_DBsearch_reformed_EThcD_only3mod_Trypsin_DENOVO.mgf')

In [None]:
def process_pNovo3(pnovo_path):
    """Convert a pNovo3 result file into ``pNovo3_result.csv``.

    The input is read as a header-less tab-separated table; columns 0, 1, 4 and
    5 are used as spectrum title, peptide, peptide score and per-residue scores.

            :param
                pnovo_path: path to the result file of pNovo3
            :return
                None. Writes ``pNovo3_result.csv`` next to the input file with the
                columns Spectrum Name, pNovo3 Peptide, pNovo3 Score, pNovo3 aaScore.
    """
    # .copy() so that the column assignments below act on an independent frame
    # instead of a slice of the frame returned by read_csv.
    pnovo_df = pd.read_csv(pnovo_path, sep="\t", header=None)[[0, 1, 4, 5]].copy()
    pnovo_df.columns = ['pNovo3 Index', 'pNovo3 Peptide', 'pNovo3 Score', 'pNovo3 aaScore']

    pnovo_df['Spectrum Name'] = [i.split('TITLE=')[-1] for i in pnovo_df['pNovo3 Index']]

    # pNovo3 modification letters -> tokens of this notebook; I is replaced by L.
    pnovo_df['pNovo3 Peptide'] = [str(i).replace('a', 'n').replace('b', 'q').replace('B', 'q').replace('c', 'm').replace('I','L')
                                  for i in pnovo_df['pNovo3 Peptide']]

    # Scores are truncated to integers without rescaling.
    pnovo_df['pNovo3 Score'] = [int(i) for i in pnovo_df['pNovo3 Score']]
    pnovo_df['pNovo3 aaScore'] = [" ".join(str(int(float(j))) for j in i.replace(',', ' ').split())
                                  for i in pnovo_df['pNovo3 aaScore']]

    pnovo_df = pnovo_df[['Spectrum Name', 'pNovo3 Peptide', 'pNovo3 Score', 'pNovo3 aaScore']]
    pnovo_df.to_csv(os.path.join(os.path.dirname(pnovo_path), 'pNovo3_result.csv'),index=False)
# process_pNovo3 takes a single path to the result file, so the directory and
# the file name are joined before the call.
process_pNovo3(os.path.join('/mnt/data/luoling/benchmark_docker/131Ab_hcd_denovo/pNovo', 'results.res'))

In [17]:
def process_PepNet(pepnet_path):
    """Convert a PepNet result table into ``PepNet_result.csv``.

    The tab-separated input must provide the columns TITLE, DENOVO, Score and
    Positional Score.

            :param
                pepnet_path: path to the tsv result file of PepNet
            :return
                None. Writes ``PepNet_result.csv`` next to the input file with the
                columns Spectrum Name, PepNet Peptide, PepNet Score, PepNet aaScore.
    """
    table = pd.read_csv(pepnet_path, sep='\t')

    # Keep whatever follows 'TITLE=' as the spectrum name.
    table['Spectrum Name'] = [raw.split('TITLE=')[-1] for raw in table['TITLE'].tolist()]

    # I and L are treated as the same residue.
    table['PepNet Peptide'] = [pep.replace('I','L') for pep in table['DENOVO'].tolist()]

    # Peptide score multiplied by 100 and truncated to an integer.
    table['PepNet Score'] = [int(val*100) for val in table['Score'].tolist()]

    # "[a, b, c]" -> "A B C" where each value is multiplied by 100 and truncated.
    scaled_rows = []
    for raw in table['Positional Score'].tolist():
        cleaned = raw.replace('[', '').replace(']', '').replace(',', ' ')
        scaled_rows.append(" ".join([str(int(float(tok)*100)) for tok in cleaned.split()]))
    table['PepNet aaScore'] = scaled_rows

    out_file = os.path.join(os.path.dirname(pepnet_path), 'PepNet_result.csv')
    table[['Spectrum Name', 'PepNet Peptide', 'PepNet Score', 'PepNet aaScore']].to_csv(out_file,index=False)

# process_PepNet takes a single path to the result file, so the directory and
# the file name are joined before the call.
process_PepNet(os.path.join('/mnt/data_nas/jwb/AbNovoBench/denovo/pepnet/131ab_EThcd', 'denovo.tsv'))

In [None]:
def process_DeepNovo(deepnovo_path, mgf_file):
    """Convert a DeepNovo result table into ``DeepNovo_result.csv``.

    Scan numbers of the result table are translated to spectrum titles with the
    SCANS/TITLE pairs read from ``mgf_file`` (paired by order of appearance).

            :param
                deepnovo_path: path to the result file of DeepNovo
                mgf_file: path to the MGF file that provides the TITLE= and SCANS= lines
            :return
                None on success; ``DeepNovo_result.csv`` is written next to the
                result file with the columns Spectrum Name, DeepNovo Peptide,
                DeepNovo Score, DeepNovo aaScore. An empty DataFrame is returned
                when a file cannot be accessed (IOError).
    """
    try:
        titles = []
        scans = []
        with open(mgf_file,'r') as fr:
            for line in fr:
                if 'TITLE=' in line:
                    titles.append(line.strip().split('TITLE=')[-1])
                elif 'SCANS=' in line:
                    scans.append(int(line.strip().split('SCANS=')[-1]))
        scan_title_dict = dict(zip(scans, titles))

        deepnovo_df = pd.read_csv(deepnovo_path, sep="\t", header=0)
        # .copy() so that the column assignments below act on an independent frame.
        deepnovo_df = deepnovo_df[['scan', 'predicted_score', 'predicted_sequence',  'predicted_position_score']].copy()

        # Known issue noted by the original author: a bug in DeepNovo (v.PNAS)
        # can produce a misaligned table such as
        #   scan    output_score    output_seq  aa_score
        #   1       NaN             NaN         NaN
        #   NaN     24              S,E,L       12, 41, 12
        #   2       NaN ...
        # Such rows are not repaired here.
        deepnovo_df['scan'] = [scan_title_dict[i] for i in deepnovo_df['scan']]

        # change peptide from P,E,P,T,I,D to PEPTLD and translate the modification names
        deepnovo_df['predicted_sequence'] = [
            str(i).replace(",", "").replace("I", "L").replace("Cmod", "C").replace("Mmod", "m").replace("Nmod", "n").replace("Qmod", "q")
            for i in deepnovo_df['predicted_sequence']
        ]
        deepnovo_df.columns = ['Spectrum Name','DeepNovo Score', 'DeepNovo Peptide', 'DeepNovo aaScore']

        # peptide score: exp(score) * 100 truncated to an integer, 0 for missing scores
        deepnovo_df['DeepNovo Score'] = [int(np.exp(i) * 100) if not pd.isna(i) else 0
                                         for i in deepnovo_df['DeepNovo Score']]

        # amino acid scores: same transformation applied to every comma-separated value
        deepnovo_aascore = [str(i).replace(",", " ").split() for i in deepnovo_df['DeepNovo aaScore']]
        deepnovo_aascore = [[str(int(np.exp(float(j)) * 100)) if j not in ['nan', 'NaN', '', None] else '' for j in i]
                            for i in deepnovo_aascore]
        deepnovo_df['DeepNovo aaScore'] = [" ".join(i) for i in deepnovo_aascore]

        deepnovo_df = deepnovo_df[['Spectrum Name','DeepNovo Peptide', 'DeepNovo Score', 'DeepNovo aaScore']]
        deepnovo_df.to_csv(os.path.join(os.path.dirname(deepnovo_path),'DeepNovo_result.csv'), index=False)
    except IOError:
        # No module-level ``logger`` is defined in the notebook, so the logger is
        # obtained here; otherwise this handler would fail with a NameError.
        logging.getLogger(__name__).warning(f"DeepNovo results not accessible. Make sure they are placed in {deepnovo_path}")
        return pd.DataFrame()
# Convert the DeepNovo result of the 131ab_EThcd run; the MGF passed here is the one in the DeepNovo folder.
process_DeepNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/deepnovo2/131ab_EThcd/denovo.deepnovo_denovo','/mnt/data_nas/jwb/AbNovoBench/denovo/deepnovo2/131ab_EThcd/131ab_EThcd_forDeepNovo.mgf')

In [None]:
import numpy as np

def process_pointnovo(pointnovo_path, mgf_file):
    """Convert a PointNovo result table into ``PointNovo_result.csv``.

    The ``feature_id`` values of the result table are translated to spectrum
    titles with the SCANS/TITLE pairs read from ``mgf_file`` (paired by order
    of appearance).

            :param
                pointnovo_path: path to the result file of PointNovo
                mgf_file: path to the MGF file that provides the TITLE= and SCANS= lines
            :return
                None on success; ``PointNovo_result.csv`` is written next to the
                result file with the columns Spectrum Name, PointNovo Peptide,
                PointNovo Score, PointNovo aaScore. An empty DataFrame is returned
                when a file cannot be accessed (IOError).
    """
    try:
        titles = []
        scans = []
        with open(mgf_file,'r') as fr:
            for line in fr:
                if 'TITLE=' in line:
                    titles.append(line.strip().split('TITLE=')[-1])
                elif 'SCANS=' in line:
                    scans.append(int(line.strip().split('SCANS=')[-1]))
        scan_title_dict = dict(zip(scans, titles))

        pointnovo_df = pd.read_csv(pointnovo_path, sep="\t", header=0)
        pointnovo_df['PSM_ID'] = [scan_title_dict[i] for i in pointnovo_df['feature_id']]
        # .copy() so that the column assignments below act on an independent frame.
        pointnovo_df = pointnovo_df[['PSM_ID', 'predicted_sequence', 'predicted_score', 'predicted_position_score']].copy()

        # change peptide from P,E,P,T,I,D to PEPTLD and translate the modification names
        pointnovo_df['predicted_sequence'] = [
            str(i).replace(",", "").replace("I", "L").replace("N(Deamidation)", "n").replace("Q(Deamidation)", "q").replace("C(Carbamidomethylation)", "C").replace("M(Oxidation)", "m")
            for i in pointnovo_df['predicted_sequence']
        ]
        pointnovo_df.columns = ['Spectrum Name','PointNovo Peptide', 'PointNovo Score', 'PointNovo aaScore']

        # peptide score: exp(score) * 100 truncated to an integer, 0 for missing scores
        pointnovo_df['PointNovo Score'] = [int(np.exp(i) * 100) if not pd.isna(i) else 0
                                           for i in pointnovo_df['PointNovo Score']]

        # amino acid scores: same transformation applied to every comma-separated value
        pointnovo_aascore = [str(i).replace(",", " ").split() for i in pointnovo_df['PointNovo aaScore']]
        pointnovo_aascore = [[str(int(np.exp(float(j)) * 100)) if j not in ['nan', 'NaN', '', None] else '' for j in i]
                             for i in pointnovo_aascore]
        pointnovo_df['PointNovo aaScore'] = [" ".join(i) for i in pointnovo_aascore]
        pointnovo_df.to_csv(os.path.join(os.path.dirname(pointnovo_path),'PointNovo_result.csv'), index=False)
    except IOError:
        # No module-level ``logger`` is defined in the notebook, so the logger is
        # obtained here; otherwise this handler would fail with a NameError.
        logging.getLogger(__name__).warning(f"PointNovo results not accessible. Make sure they are placed in {pointnovo_path}")
        return pd.DataFrame()

# The function is defined as process_pointnovo (lower-case); the previous
# spelling process_PointNovo raised a NameError.
process_pointnovo('/mnt/data_nas/jwb/AbNovoBench/denovo/pointnovo/131ab_EThcd/131ab_EThcd_forPointNovo.deepnovo_denovo','/mnt/data_nas/jwb/AbNovoBench/denovo/pointnovo/131ab_EThcd/131ab_EThcd_forPointNovo.mgf')

In [None]:
def process_HelixNovo(pinovo_path):
    """Convert a Pi-HelixNovo result file into ``pi-HelixNovo_result.csv``.

    The input is read as a header-less tab-separated table with three columns:
    title, sequence and score.

            :param
                pinovo_path: path to the result file of Pi-HelixNovo
            :return
                None on success; ``pi-HelixNovo_result.csv`` is written next to the
                result file with the columns Spectrum Name, pi-HelixNovo Peptide,
                pi-HelixNovo Score. An empty DataFrame is returned when the file
                cannot be accessed (IOError).
    """
    try:
        pinovo_df = pd.read_csv(pinovo_path, sep="\t", header=None)
        pinovo_df.columns = ["Title", "sequence", "Score"]

        # Keep whatever follows 'TITLE=' as the spectrum name.
        pinovo_df['Title'] = [i.split('TITLE=')[-1] for i in pinovo_df['Title']]
        pinovo_df['pi-HelixNovo Peptide'] = [
            str(i).replace('M(+15.99)', 'm').replace('Q(+.98)', 'q').replace('N(+.98)', 'n').replace(' ', '').replace('C(+57.02)', 'C').replace('I','L')
            for i in pinovo_df['sequence']
        ]

        # peptide score multiplied by 100 and truncated to an integer
        pinovo_df['pi-HelixNovo Score'] = [int(i * 100) for i in pinovo_df['Score']]
        pinovo_df['Spectrum Name'] = pinovo_df['Title']

        pinovo_df = pinovo_df[['Spectrum Name', 'pi-HelixNovo Peptide', "pi-HelixNovo Score"]]
        pinovo_df.to_csv(os.path.join(os.path.dirname(pinovo_path),'pi-HelixNovo_result.csv'), index=False)
    except IOError:
        # No module-level ``logger`` is defined in the notebook, so the logger is
        # obtained here; otherwise this handler would fail with a NameError.
        logging.getLogger(__name__).warning(f"Pi-HelixNovo results not accessible. Make sure they are placed in {pinovo_path}")
        return pd.DataFrame()

# Convert the pi-HelixNovo result of the 131ab_EThcd run using the hard-coded path.
process_HelixNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/pi-helixnovo/131ab_EThcd/denovo_denovo.txt')

In [None]:
import pandas as pd
import numpy as np
def process_AdaNovo(adanovo_path,mgf_file):
    """Convert an AdaNovo mzTab result into ``AdaNovo_result.csv``.

    The number that follows ``ms_run[1]:index=`` in the ``spectra_ref`` column is
    used as a 0-based position in the list of ``TITLE=`` lines of ``mgf_file``
    to recover the spectrum name.

            :param
                adanovo_path: path to the mztab file of AdaNovo
                mgf_file: path to the MGF file whose TITLE order matches the spectra_ref indices
            :return
                None. Writes ``AdaNovo_result.csv`` with the columns
                Spectrum Name, AdaNovo Peptide, AdaNovo Score, AdaNovo aaScore.
            :raises
                ValueError: if the mztab file contains no line starting with "PSH"
    """
    with open(mgf_file,'r') as fr:
        titles = [line.strip().split('TITLE=')[-1] for line in fr if 'TITLE=' in line]

    adanovo_out = adanovo_path.replace('denovo.mztab','AdaNovo_result.csv')
    if adanovo_out == adanovo_path:
        # The expected file name was not found in the path; writing to
        # adanovo_out would overwrite the input, so fall back to its directory.
        adanovo_out = os.path.join(os.path.dirname(adanovo_path), 'AdaNovo_result.csv')

    # Locate the PSM table header (the first line starting with "PSH").
    adanovo_header = None
    with open(adanovo_path) as f:
        for i, line in enumerate(f):
            if line.startswith("PSH"):
                adanovo_header = i
                break
    if adanovo_header is None:
        raise ValueError(f"No PSH header line found in {adanovo_path}")

    # skip_blank_lines=False makes pandas count lines the same way enumerate()
    # did above; blank lines become all-NaN rows and are dropped by the filter.
    df_deno = pd.read_csv(adanovo_path, sep="\t", header=adanovo_header, skip_blank_lines=False)
    df_denovo = df_deno[df_deno['sequence'].notna()].copy()

    spectrum_index = [int(item.replace('ms_run[1]:index=', '')) for item in df_denovo['spectra_ref']]
    # Out-of-range indices (including negative ones) give an empty spectrum name.
    spectrum_names = [titles[x] if 0 <= x < len(titles) else None for x in spectrum_index]

    # (s * 100 + 100) / 2 maps a score of -1 to 0 and a score of 1 to 100.
    scores = [int((s * 100 + 100)/2) for s in df_denovo['search_engine_score[1]']]

    # Comma-separated per-residue scores -> space-separated integers (score * 100).
    aa_scores = ["" if pd.isna(raw) else
                 " ".join(str(int(float(tok) * 100)) for tok in str(raw).replace(',', ' ').split())
                 for raw in df_denovo['opt_ms_run[1]_aa_scores']]

    peptides = [str(i).replace('M(+15.99)', 'm').replace('Q(+.98)', 'q').replace('N(+.98)', 'n').replace(' ','').replace('C(+57.02)', 'C').replace('I','L')
                for i in df_denovo['sequence']]

    name = ['Spectrum Name','AdaNovo Peptide','AdaNovo Score','AdaNovo aaScore']
    df = pd.DataFrame(list(zip(spectrum_names, peptides, scores, aa_scores)), columns=name)
    df.to_csv(adanovo_out,index=False)

# Convert the AdaNovo result of the 131ab_EThcd run using the hard-coded mzTab and MGF paths.
process_AdaNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/adanovo/131ab_EThcd/denovo.mztab','/mnt/data_nas/jwb/antibody-gw-finalData/131Ab_20250213_DBsearch_reformed_EThcD_only3mod_Trypsin_DENOVO.mgf')

In [None]:
import pandas as pd
import numpy as np
def process_InstaNovo(instanovo_path,mgf_file):
    """Convert an InstaNovo prediction CSV into ``InstaNovo_result.csv``.

    ``scan_number`` is used as a position in the list of ``TITLE=`` lines of
    ``mgf_file`` to recover the spectrum name.

            :param
                instanovo_path: path to the prediction csv file of InstaNovo
                mgf_file: path to the MGF file whose TITLE order matches scan_number
            :return
                None. The output path is the input path with
                'predictions_before_refinement.csv' replaced by 'InstaNovo_result.csv';
                columns are Spectrum Name, InstaNovo Peptide, InstaNovo Score,
                InstaNovo aaScore.
    """
    with open(mgf_file,'r') as handle:
        titles = [row.strip().split('TITLE=')[-1] for row in handle.readlines() if 'TITLE=' in row]

    instanovo_out = instanovo_path.replace('predictions_before_refinement.csv','InstaNovo_result.csv')

    # Rows without a prediction are dropped.
    raw_table = pd.read_csv(instanovo_path)
    kept = raw_table[raw_table['predictions'].notna()].copy()

    # Positions beyond the end of the title list give an empty spectrum name.
    positions = [int(value) for value in kept['scan_number']]
    spectrum_names = [titles[pos] if pos < len(titles) else None for pos in positions]

    # Log-probabilities are turned into exp(x) * 100, truncated to integers.
    peptide_scores = [int(np.exp(logp) * 100) for logp in kept['log_probabilities'].tolist()]

    residue_scores = []
    for raw in kept['token_log_probabilities'].tolist():
        tokens = raw.replace(',', ' ').replace('[','').replace(']','').split()
        residue_scores.append(" ".join([str(int(np.exp(float(tok)) * 100)) for tok in tokens]))

    # UNIMOD annotations -> tokens of this notebook; I is replaced by L.
    peptides = []
    for pred in kept['predictions'].tolist():
        text = str(pred)
        text = text.replace('M[UNIMOD:35]', 'm').replace('Q[UNIMOD:7]', 'q').replace('N[UNIMOD:7]', 'n')
        text = text.replace('C[UNIMOD:4]', 'C').replace('I','L')
        peptides.append(text)

    header = ['Spectrum Name','InstaNovo Peptide','InstaNovo Score','InstaNovo aaScore']
    records = list(zip(spectrum_names, peptides, peptide_scores, residue_scores))
    pd.DataFrame(records, columns=header).to_csv(instanovo_out,index=False)
# Convert the InstaNovo predictions of the 131ab_EThcd run using the hard-coded CSV and MGF paths.
process_InstaNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/instanovo_new/131ab_EThcd/predictions_before_refinement.csv','/mnt/data_nas/jwb/antibody-gw-finalData/131Ab_20250213_DBsearch_reformed_EThcD_only3mod_Trypsin_DENOVO.mgf')

In [None]:
import pandas as pd
import numpy as np
def process_ContraNovo(contranovo_path):
    """Convert a ContraNovo mzTab result into ``ContraNovo_result.csv``.

    The spectrum name is whatever follows ``TITLE=`` in the ``spectra_ref``
    column, with every ']' removed.

            :param
                contranovo_path: path to the mztab file of ContraNovo
            :return
                None. Writes ``ContraNovo_result.csv`` with the columns
                Spectrum Name, ContraNovo Peptide, ContraNovo Score, ContraNovo aaScore.
            :raises
                ValueError: if the mztab file contains no line starting with "PSH"
    """
    contranovo_out = contranovo_path.replace('denovo.mztab','ContraNovo_result.csv')
    if contranovo_out == contranovo_path:
        # The expected file name was not found in the path; writing to
        # contranovo_out would overwrite the input, so fall back to its directory.
        contranovo_out = os.path.join(os.path.dirname(contranovo_path), 'ContraNovo_result.csv')

    # Locate the PSM table header (the first line starting with "PSH").
    contranovo_header = None
    with open(contranovo_path) as f:
        for i, line in enumerate(f):
            if line.startswith("PSH"):
                contranovo_header = i
                break
    if contranovo_header is None:
        raise ValueError(f"No PSH header line found in {contranovo_path}")

    # skip_blank_lines=False makes pandas count lines the same way enumerate()
    # did above; blank lines become all-NaN rows and are dropped by the filter.
    df_deno = pd.read_csv(contranovo_path, sep="\t", header=contranovo_header, skip_blank_lines=False)
    df_denovo = df_deno[df_deno['sequence'].notna()].copy()

    spectrum_names = [i.split('TITLE=')[-1].replace(']','') for i in df_denovo['spectra_ref']]

    # (s * 100 + 100) / 2 maps a score of -1 to 0 and a score of 1 to 100.
    scores = [int((s * 100 + 100)/2) for s in df_denovo['search_engine_score[1]']]

    # Comma-separated per-residue scores -> space-separated integers (score * 100).
    aa_scores = ["" if pd.isna(raw) else
                 " ".join(str(int(float(tok) * 100)) for tok in str(raw).replace(',', ' ').split())
                 for raw in df_denovo['opt_ms_run[1]_aa_scores']]

    # Mass-shift notation -> tokens of this notebook. The order of the
    # replacements matters because several patterns share the '-17.027' and
    # '+43.006' substrings.
    peptides = [str(i).replace('M+15.995', 'm').replace('Q+0.984', 'q').replace('N+0.984', 'n').replace(' ','').replace('C+57.021', 'C').replace('-17.027Q','p').replace('+43.006-17.027','d').replace('-17.027','e').replace('+43.006','f').replace('+42.011','g').replace('I','L')
                for i in df_denovo['sequence']]

    name = ['Spectrum Name','ContraNovo Peptide','ContraNovo Score','ContraNovo aaScore']
    df = pd.DataFrame(list(zip(spectrum_names, peptides, scores, aa_scores)), columns=name)
    df.to_csv(contranovo_out,index=False)
# Convert the ContraNovo result of the 131ab_EThcd run using the hard-coded mzTab path.
process_ContraNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/contranovo/131ab_EThcd/denovo.mztab')

In [None]:
def process_PrimeNovo(primenovo_path):
    """Convert a pi-PrimeNovo result table into ``pi-PrimeNovo_result.csv``.

    The input is read with ``pd.read_table`` and must provide the columns
    label, prediction and score.

            :param
                primenovo_path: path to the result table of pi-PrimeNovo
            :return
                None on success; ``pi-PrimeNovo_result.csv`` is written next to the
                result file with the columns Spectrum Name, pi-PrimeNovo Peptide,
                pi-PrimeNovo Score. An empty DataFrame is returned when the file
                cannot be accessed (IOError).
    """
    try:
        primenovo_df = pd.read_table(primenovo_path)

        # Bracketed mass-shift notation -> tokens of this notebook. The order of
        # the replacements matters because several patterns overlap.
        primenovo_df['pi-PrimeNovo Peptide'] = [
            str(i).replace('M[+15.995]', 'm').replace('Q[+0.984]', 'q').replace('N[+0.984]', 'n').replace(' ', '').replace('C[+57.021]', 'C').replace('[-17.027]-Q', 'p').replace('-[17.027]-Q', 'p').replace('[+43.006-17.027]-','d').replace('+[43.006-17.027]','d').replace('[17.027]-','e').replace('[-17.027]-','e').replace('-[17.027]','e').replace('[+43.006]-','f').replace('+[43.006]','f').replace('[+42.011]-','g').replace('+[42.011]','g').replace('I','L')
            for i in primenovo_df['prediction']
        ]

        # peptide score multiplied by 100 and truncated to an integer
        primenovo_df['pi-PrimeNovo Score'] = [int(i * 100) for i in primenovo_df['score']]
        # Keep whatever follows 'TITLE=' as the spectrum name.
        primenovo_df['Spectrum Name'] = [i.split('TITLE=')[-1] for i in primenovo_df['label']]

        primenovo_df = primenovo_df[['Spectrum Name', 'pi-PrimeNovo Peptide', 'pi-PrimeNovo Score']]
        primenovo_df.to_csv(os.path.join(os.path.dirname(primenovo_path),'pi-PrimeNovo_result.csv'), index=False)
    except IOError:
        # No module-level ``logger`` is defined in the notebook, so the logger is
        # obtained here; otherwise this handler would fail with a NameError.
        logging.getLogger(__name__).warning(f"pi-PrimeNovo results not accessible. Make sure they are placed in {primenovo_path}")
        return pd.DataFrame()

# Convert the pi-PrimeNovo result of the 131ab_EThcd run using the hard-coded path.
process_PrimeNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/pi-primenovo/131ab_EThcd/denovo.tsv')

In [None]:
import numpy as np

def process_PGPointNovo(pgpointnovo_path, mgf_file):
    """Convert a PGPointNovo result table into the benchmark's common CSV layout.

    The result table identifies spectra by ``feature_id``; these ids are
    looked up in a scan -> title map built from the ``SCANS=`` and ``TITLE=``
    lines of ``mgf_file``. Peptides are collapsed from ``P,E,P,T,I,D`` to
    ``PEPTLD`` with modifications as single lower-case letters, and both the
    peptide score and the per-residue scores are transformed with
    ``int(exp(score) * 100)``. The result is written to
    ``PGPointNovo_result.csv`` in the same directory as the input file.

            :param
                pgpointnovo_path: path to the tab-separated result file of PGPointNovo
                mgf_file: path to the MGF file the predictions were made on
            :return
                pgpointnovo_df: dataframe with Spectrum Name, PGPointNovo Peptide,
                    PGPointNovo Score and PGPointNovo aaScore; an empty dataframe
                    when one of the files cannot be read or written
    """

    def _to_peptide(sequence):
        # A spectrum without a prediction yields an empty peptide, not 'nan'.
        if pd.isna(sequence):
            return ''
        return (str(sequence).replace(",", "").replace("I", "L")
                .replace("N(Deamidation)", "n").replace("Q(Deamidation)", "q")
                .replace("C(Carbamidomethylation)", "C").replace("M(Oxidation)", "m"))

    def _to_aascore(position_scores):
        # Comma- or space-separated scores -> space-separated integers;
        # missing entries stay empty.
        tokens = str(position_scores).replace(",", " ").split()
        return " ".join(
            str(int(np.exp(float(tok)) * 100)) if tok not in ['nan', 'NaN', '', None] else ''
            for tok in tokens
        )

    try:
        titles = []
        scans = []
        # Iterate over the handle instead of readlines(): MGF files can be
        # large and only two kinds of line are needed.
        with open(mgf_file, 'r') as fr:
            for line in fr:
                if 'TITLE=' in line:
                    titles.append(line.strip().split('TITLE=')[-1])
                elif 'SCANS=' in line:
                    scans.append(int(line.strip().split('SCANS=')[-1]))
        # Pairs the n-th SCANS line with the n-th TITLE line.
        scan_title_dict = dict(zip(scans, titles))

        raw_df = pd.read_csv(pgpointnovo_path, sep="\t", header=0)

        # Build a fresh frame rather than assigning into a column subset of
        # raw_df, which would raise SettingWithCopyWarning.
        pgpointnovo_df = pd.DataFrame({
            'Spectrum Name': [scan_title_dict[i] for i in raw_df['feature_id']],
            'PGPointNovo Peptide': [_to_peptide(s) for s in raw_df['predicted_sequence']],
            'PGPointNovo Score': [int(np.exp(s) * 100) if not pd.isna(s) else 0
                                  for s in raw_df['predicted_score']],
            'PGPointNovo aaScore': [_to_aascore(s) for s in raw_df['predicted_position_score']],
        })
        pgpointnovo_df.to_csv(os.path.join(os.path.dirname(pgpointnovo_path), 'PGPointNovo_result.csv'), index=False)
        return pgpointnovo_df
    except IOError:
        logger.warning(f"PGPointNovo results not accessible. Make sure they are placed in {pgpointnovo_path}")
        return pd.DataFrame()

# Run the PGPointNovo converter on the 131ab_EThcd result file; the second
# argument is the MGF used to map feature ids (scan numbers) to spectrum titles.
process_PGPointNovo('/mnt/data_nas/jwb/AbNovoBench/denovo/pgpointnovo/131ab_EThcd/denovo.deepnovo_denovo','/mnt/data_nas/jwb/antibody-gw-finalData/131Ab_20250213_DBsearch_reformed_EThcD_only3mod_Trypsin_DENOVO.mgf')

In [None]:
def process_SMSNet(smsnet_path, mgf_file):
    """Convert SMSNet output files into the benchmark's common CSV layout.

    SMSNet writes one predicted peptide per line to ``smsnet_path`` and the
    matching per-residue scores to ``smsnet_path + '_prob'``. Lines are paired
    by position with the ``TITLE=`` lines of ``mgf_file``. Each per-residue
    score is transformed with ``int(exp(score) * 100)``; the peptide score is
    the truncated mean of those values. The result is written to
    ``SMSNet_result.csv`` in the same directory as ``smsnet_path``.

            :param
                smsnet_path: path to the result file of SMSNet
                mgf_file: path to the MGF file the predictions were made on
            :return
                smsnet_df: dataframe with Spectrum Name, SMSNet Peptide, SMSNet
                    Score and SMSNet aaScore; an empty dataframe when one of the
                    files cannot be read or written
            :raises
                ValueError: when the number of titles, peptides and score lines differ
    """
    try:
        # Stream the MGF; only the TITLE lines are needed.
        with open(mgf_file, 'r') as fr:
            titles = [line.strip().split('TITLE=')[-1] for line in fr if 'TITLE=' in line]

        # change _rescore and _prob to switch between rescoring and real
        with open(smsnet_path) as f, open(smsnet_path + '_prob') as g:
            # Drop spaces and the "<s>" / "<unk>" tokens, and merge I into L.
            peptides = [
                line.rstrip().replace(" ", "").replace("I", "L").replace("<s>", "").replace("<unk>", "")
                for line in f
            ]

            aa_scores = []
            for line in g:
                tokens = line.split()
                if tokens:
                    aa_scores.append([int(float(np.exp(float(tok)) * 100)) for tok in tokens])
                else:
                    # no prediction for this spectrum
                    aa_scores.append([0])

        # The three sources are paired purely by position, so a length
        # mismatch would silently shift predictions onto the wrong spectrum.
        if not (len(titles) == len(peptides) == len(aa_scores)):
            raise ValueError(
                f"SMSNet inputs are misaligned: {len(titles)} titles, "
                f"{len(peptides)} peptides, {len(aa_scores)} score lines")

        smsnet_df = pd.DataFrame({
            'Spectrum Name': titles,
            'SMSNet Peptide': peptides,
            'SMSNet Score': [int(statistics.mean(scores)) for scores in aa_scores],
            'SMSNet aaScore': [' '.join(str(s) for s in scores) for scores in aa_scores],
        })
        smsnet_df.to_csv(os.path.join(os.path.dirname(smsnet_path), 'SMSNet_result.csv'), index=False)
        return smsnet_df
    except IOError:
        logger.warning(f"SMSNet results not accessible. Make sure they are placed in {smsnet_path}")
        return pd.DataFrame()

# Run the SMSNet converter on the 131ab_EThcd predictions; the function also
# reads the companion '<path>_prob' file and takes spectrum titles from the MGF.
process_SMSNet('/mnt/data_nas/jwb/AbNovoBench/denovo/SMSNet/131ab_EThcd/131ab_EThcd_forSMSNet','/mnt/data_nas/jwb/AbNovoBench/denovo/SMSNet/131ab_EThcd/131ab_EThcd_forSMSNet.mgf')

In [None]:
import pandas as pd
import os

# Merge the per-tool "<tool>_result.csv" files into the DB-search PSM table,
# one group of Peptide / Score / aaScore columns per tool.

# Root directory holding one sub-folder per de novo tool
path = '/mnt/data_nas/jwb/AbNovoBench/denovo'
db_search_df = pd.read_csv('/mnt/data_nas/jwb/antibody-gw-finalData/131_Ab_EThcD_filtered_DB_search_psm_only3mod_Trypsin_final.csv')
db_search_df = db_search_df.set_index("Spectrum Name")

# Tool names and, position by position, the folder each tool's results live in
tools_list = [
    "AdaNovo", "CasanovoV1", "CasanovoV2", "ContraNovo", "DeepNovo", "InstaNovo",
    "PepNet", "PGPointNovo", "pi-HelixNovo", "pi-PrimeNovo", "PointNovo",'SMSNet'
]
tools_fold = [
    "adanovo", "casanovo3", "casanovo4", "contranovo", "deepnovo2", "instanovo_new",
    "pepnet", "pgpointnovo", "pi-helixnovo", "pi-primenovo", "pointnovo",'SMSNet'
]

# Merge the results of every tool
for tool_name, tool_fold in zip(tools_list, tools_fold):
    print(tool_name)
    result_file = os.path.join(path, tool_fold,'131ab_EThcd', f"{tool_name}_result.csv")
    if not os.path.exists(result_file):
        print(f"❌ File not found: {result_file}, skipping.")
        continue

    result_df = pd.read_csv(result_file).set_index("Spectrum Name")

    # Keep only spectra that also occur in the DB-search table. The explicit
    # copy makes the column assignments below act on an independent frame
    # instead of a filtered view (avoids SettingWithCopyWarning).
    result_df = result_df[result_df.index.isin(db_search_df.index)].copy()

    # Column names used by this tool; aaScore is optional
    peptide_col = f"{tool_name} Peptide"
    score_col = f"{tool_name} Score"
    aa_score_col = f"{tool_name} aaScore" if f"{tool_name} aaScore" in result_df.columns else None

    # Fill missing values
    result_df[peptide_col] = result_df[peptide_col].fillna('')
    result_df[score_col] = result_df[score_col].fillna(0)
    if aa_score_col:
        result_df[aa_score_col] = result_df[aa_score_col].fillna('')

    # An empty peptide always gets score 0 and an empty aaScore
    empty_peptide_mask = result_df[peptide_col] == ''
    result_df.loc[empty_peptide_mask, score_col] = 0
    if aa_score_col:
        result_df.loc[empty_peptide_mask, aa_score_col] = ''

    # Columns to merge
    merge_cols = [peptide_col, score_col]
    if aa_score_col:
        merge_cols.append(aa_score_col)

    # Left-join onto the main table.
    # NOTE(review): if a tool reports several rows for one Spectrum Name, this
    # join repeats the DB-search row once per match — confirm whether
    # de-duplication is wanted.
    db_search_df = db_search_df.join(result_df[merge_cols], how="left")

# Write the merged table
output_path = os.path.join(path, "EThcD_Trypsin_DB_search_merged.csv")
db_search_df = db_search_df.reset_index()
db_search_df.to_csv(output_path, index=False)
print(f"✅ 合并完成，输出文件：{output_path}")