improve xml parsing for comet and msgfplus #11

Merged
merged 14 commits into from Dec 19, 2022
321 changes: 180 additions & 141 deletions pyprotista/parsers/ident/comet_2020_01_4_parser.py
@@ -1,67 +1,14 @@
"""Engine parser."""
import multiprocessing as mp
import sys
from io import BytesIO
import xml.etree.ElementTree as etree

import numpy as np
import pandas as pd
import regex as re
from loguru import logger
from lxml import etree
from tqdm import tqdm

from pyprotista.parsers.ident_base_parser import IdentBaseParser


def _mp_specs_init(func, reference_dict, mapping_dict):
func.reference_dict = reference_dict
func.mapping_dict = mapping_dict


def _get_single_spec_df(spectrum):
"""Primary method for reading and storing information from a single spectrum.

Attributes:
reference_dict (dict): dict with reference columns to be filled in
mapping_dict (dict): mapping of engine level column names to ursgal unified column names

Args:
spectrum (xml Element): namespace of single spectrum with potentially multiple PSMs

Returns:
(pd.DataFrame): dataframe containing spectrum information

"""
spectrum = etree.parse(BytesIO(spectrum)).getroot()
spec_records = []
spec_level_dict = _get_single_spec_df.reference_dict.copy()
spec_level_dict["spectrum_id"] = spectrum.attrib["spectrumID"].split("scan=")[-1]

# Iterate children
for psm in spectrum.findall(".//{*}SpectrumIdentificationItem"):
psm_level_dict = spec_level_dict.copy()
psm_level_dict.update(
{
_get_single_spec_df.mapping_dict[k]: psm.attrib[k]
for k in _get_single_spec_df.mapping_dict
if k in psm.attrib
}
)
cv_param_info = {
c.attrib["name"]: c.attrib["value"] for c in psm.findall(".//{*}cvParam")
}
psm_level_dict.update(
{
_get_single_spec_df.mapping_dict[k]: cv_param_info[k]
for k in _get_single_spec_df.mapping_dict
if k in cv_param_info
}
)

spec_records.append(psm_level_dict)
return pd.DataFrame(spec_records)


class Comet_2020_01_4_Parser(IdentBaseParser):
"""File parser for Comet."""

@@ -71,108 +18,213 @@ def __init__(self, *args, **kwargs):
Reads in data file and provides mappings.
"""
super().__init__(*args, **kwargs)
self.version = None
self.fixed_mods = None
self.mod_mass_map = None
self.peptide_lookup = None
self.spec_records = None
self.style = "comet_style_1"

tree = etree.parse(self.input_file)
self.root = tree.getroot()
self.reference_dict["search_engine"] = "comet_" + "_".join(
re.findall(
r"([/d]*\d+)",
self.root.find(".//{*}AnalysisSoftware").attrib["version"],
)
)
self.mapping_dict = {
v: k
for k, v in self.param_mapper.get_default_params(style=self.style)[
"header_translations"
]["translated_value"].items()
}
self.reference_dict.update({k: None for k in self.mapping_dict.values()})

@classmethod
def check_parser_compatibility(cls, file):
"""Assert compatibility between file and parser.

Args:
file (str): path to input file
file (path object): path to input file

Returns:
bool: True if parser and file are compatible

"""
is_mzid = file.as_posix().endswith(".mzid")
is_mzid = file.name.endswith(".mzid")

with open(file.as_posix()) as f:
with open(file) as f:
try:
head = "".join([next(f) for _ in range(10)])
except StopIteration:
head = ""
contains_engine = "Comet" in head
return is_mzid and contains_engine

def _map_mods_and_sequences(self):
Collaborator

Cannot comment on line 105, but there is no need for file.as_posix().

Collaborator (@fu fu, Nov 16, 2022)

The check on whether or not the file is a Comet file should
a) be done on the header only (no need to iterate over 10 lines)
b) if iterating over the file, use for line in f - no need to call next
c) not reset the head variable, as it only exists in local scope and is recreated every time

I would suggest using

with open(file) as i:
    header = i.readline()
if "Comet:EValue" in header:
    ...

to be perfectly explicit

Contributor

I don't think the header is in the first line (XML yada yada), and the try/except is in place because you want to avoid crashes on empty files (e.g. a 0-PSM MS Amanda file).
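
For reference, a minimal sketch of the guarded head read this reply defends, mirroring check_parser_compatibility above; the standalone helper name is illustrative and not part of this PR:

def _read_head(file, n_lines=10):
    """Return up to the first n_lines of a file, or "" for empty/truncated files."""
    with open(file) as f:
        try:
            head = "".join([next(f) for _ in range(n_lines)])
        except StopIteration:
            head = ""  # file shorter than n_lines: no crash, just an empty head
    return head

# usage in the compatibility check:
# contains_engine = "Comet" in _read_head(file)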

Collaborator

as_posix is still in there :)
with open(file.as_posix()) as f: will crash on Windows - why add as_posix to a pathlib object? Also, the docstring says file is a str, so str.as_posix won't work at all :)
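
A minimal illustration of the point, assuming file is a pathlib.Path (the filename is hypothetical):

from pathlib import Path

file = Path("results.mzid")
with open(file) as f:  # open() accepts Path objects directly, no as_posix() needed
    head = f.readline()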

"""Replace internal tags to retrieve sequences and formatted modification strings.
def get_version(self):
"""Retrieve version from xml.

Operations are performed inplace.
Returns:
version (str): file version
Collaborator

The function does not return version as a str but None if elif entry_tag.endswith("AnalysisSoftware"): is never True.

Contributor

I'd also say don't put return in conditionals, but set a value in the conditional and return it at the end.
You can always set None as the default at the beginning if that is the desired behavior.

Contributor Author

I've now set it up exactly as you described.
Returning None is actually never desired - it should always return the version, nothing else...
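
A minimal sketch of the single-return pattern the thread converged on, mirroring what the merged get_version further down in this diff does (details trimmed for illustration):

def get_version(self):
    version = ""  # default set up front, never returned from inside a branch
    for event, entry in etree.iterparse(self.input_file):
        if entry.tag.endswith("AnalysisSoftware"):
            version = "comet_" + "_".join(
                re.findall("[0-9]+", entry.attrib["version"])
            )
            break
        entry.clear()
    return version  # single exit point: the caller always gets the version string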

"""
# Register fixed mods
modifications = self.root.findall(
".//{*}AnalysisProtocolCollection/{*}SpectrumIdentificationProtocol/{*}ModificationParams/{*}SearchModification"
)
fixed_mods = {
sm.attrib["residues"]: sm.find(".//{*}cvParam[@cvRef='UNIMOD']").attrib[
"name"
]
for sm in modifications
if sm.attrib["fixedMod"] == "true"
}
if len(fixed_mods) > 0:
fixed_mod_strings = []
for fm_res, fm_name in fixed_mods.items():
fixed_mod_strings.append(
self.df["sequence"]
.str.split(fm_res)
.apply(
lambda l: ";".join(
[
fm_name + ":" + ind
for ind in (
np.cumsum(list(map(len, l[:-1]))) + range(1, len(l))
).astype(str)
]
)
version = ""
for event, entry in etree.iterparse(self.input_file):
entry_tag = entry.tag

if entry_tag.endswith("cvList"):
if entry_tag != "{http://psidev.info/psi/pi/mzIdentML/1.2}cvList":
logger.warning(
f"{entry_tag}: Wrong mzIdentML version - Parser made for version 1.2!"
)
elif entry_tag.endswith("AnalysisSoftware"):
version = "comet_" + "_".join(
re.findall("[0-9]+", entry.attrib["version"])
)
break
entry.clear()
return version

fixed_mod_strings = (
pd.concat(fixed_mod_strings, axis=1)
.agg(";".join, axis=1)
.str.rstrip(";")
)
def map_mod_mass(self):
"""Retrieve information on modifications from xml.

modification_mass_map = {
sm.attrib["massDelta"]: sm.find(".//{*}cvParam[@cvRef='UNIMOD']").attrib[
"name"
]
for sm in modifications
}
lookup = {}
for pep in self.root.findall(".//{*}Peptide"):
id = pep.attrib.get("id", "")
lookup[id] = {"modifications": []}
lookup[id]["sequence"] = pep.find(".//{*}PeptideSequence").text
for child in pep.findall(".//{*}Modification"):
lookup[id]["modifications"].append(
f"{modification_mass_map[child.attrib['monoisotopicMassDelta']]}:{child.attrib['location']}"
Returns:
fixed_mods (dict): residues and corresponding names of fixed modifications
mod_mass_map (dict): mapping of masses to modification names
"""
fixed_mods = {}
mod_mass_map = {}

mod_name = ""
modification_information = False

for event, entry in etree.iterparse(self.input_file):
entry_tag = entry.tag

if entry_tag.endswith("AdditionalSearchParams"):
modification_information = True
elif modification_information is True:
if entry_tag.endswith("cvParam"):
mod_name = entry.attrib["name"]
elif entry_tag.endswith("SearchModification"):
mod_mass_map[entry.attrib["massDelta"]] = mod_name
if entry.attrib["fixedMod"] == "true":
_key = entry.attrib["residues"]
fixed_mods[_key] = mod_name
elif entry_tag.endswith("ModificationParams"):
break
entry.clear()
return fixed_mods, mod_mass_map

def get_peptide_lookup(self):
"""Retrieve peptide ids with their corresponding sequences and modifications from xml.

Returns:
peptide_lookup (dict): peptide id with corresponding modifications and sequence
"""
peptide_lookup = {}

modifications = []
sequence = {}
peptide_information = False

for event, entry in etree.iterparse(self.input_file):
entry_tag = entry.tag

if entry_tag.endswith("DBSequence"):
peptide_information = True
elif peptide_information is True:
if entry_tag.endswith("PeptideSequence"):
sequence = entry.text
if len(self.fixed_mods) > 0:
sequence_mod_map = self.map_mods_sequences(sequence)
if sequence_mod_map != "":
modifications.append(sequence_mod_map)
elif entry_tag.endswith("Modification"):
mass = entry.attrib["monoisotopicMassDelta"]
location = entry.attrib["location"]
mass_name = self.mod_mass_map[mass]
modifications.append(mass_name + ":" + location)
elif entry_tag.endswith("Peptide"):
peptide_lookup[entry.attrib["id"]] = (
";".join(modifications),
sequence,
)
modifications = []
elif entry_tag.endswith("PeptideEvidence"):
break
entry.clear()
return peptide_lookup

def get_spec_records(self):
"""Retrieve specs from file.

Returns:
spec_records (list): information on PSMs
"""
spec_records = []

spec_results = {}
spec_ident_items = []
spec_information = False

for event, entry in etree.iterparse(self.input_file):
entry_tag = entry.tag

if entry_tag.endswith("Inputs"):
spec_information = True
elif spec_information is True:
if entry_tag.endswith("cvParam"):
if entry.attrib["name"] in self.mapping_dict:
_key = self.mapping_dict[entry.attrib["name"]]
spec_results[_key] = entry.attrib["value"]
elif entry_tag.endswith("SpectrumIdentificationItem"):
for attribute in list(entry.attrib):
if attribute in self.mapping_dict.keys():
_key = self.mapping_dict[attribute]
spec_results[_key] = entry.attrib[attribute]
mods, sequence = self.peptide_lookup[spec_results["sequence"]]
spec_results["modifications"] = mods
spec_results["sequence"] = sequence
spec_ident_items.append(spec_results)
spec_results = {}
elif entry_tag.endswith("SpectrumIdentificationResult"):
for spec_item in spec_ident_items:
spec_item.update(spec_results)
spec_item["spectrum_id"] = entry.attrib["spectrumID"].lstrip(
"scan="
)
spec_records.append(spec_item)
spec_results = {}
spec_ident_items = []
elif entry_tag.endswith("SpectrumIdentificationList"):
break
entry.clear()
return spec_records

def map_mods_sequences(self, sequence):
"""Map fixed_mods and sequence to get corresponding modifications.

fixed_mods needs at least one entry!

Args:
sequence (str): peptide sequence

Returns:
fixed_mod_strings (str): modifications of corresponding sequence
"""
fixed_mod_strings = []

for fm_res, fm_name in self.fixed_mods.items():
fixed_mod_strings.append(
pd.Series(sequence)
.str.split(fm_res)
.apply(
lambda l: ";".join(
[
fm_name + ":" + ind
for ind in (
np.cumsum(list(map(len, l[:-1]))) + range(1, len(l))
).astype(str)
]
)
Collaborator

nearly copy-paste from omssa_2_1_9:translate_mods - refactor?
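
One possible shape for that refactor - a plain helper shared by both parsers; the name and placement are hypothetical, not part of this PR, and the separator handling is simplified compared to the pandas version:

def translate_fixed_mods(sequence, fixed_mods):
    """Return 'Name:position' strings (1-based) for every fixed-mod residue in sequence."""
    parts = []
    for residue, name in fixed_mods.items():
        parts.extend(
            name + ":" + str(i + 1)
            for i, aa in enumerate(sequence)
            if aa == residue
        )
    return ";".join(parts)

# e.g. translate_fixed_mods("ACDCA", {"C": "Carbamidomethyl"})
# -> "Carbamidomethyl:2;Carbamidomethyl:4"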

)
lookup[id]["modifications"] = ";".join(lookup[id]["modifications"])
)

# TODO: check mod left strip
seq_mods = pd.DataFrame(self.df["sequence"].map(lookup).to_list())
self.df.loc[:, "modifications"] = (
seq_mods["modifications"].str.cat(fixed_mod_strings, sep=";").str.strip(";")
fixed_mod_strings = (
pd.concat(fixed_mod_strings, axis=1).agg(";".join, axis=1).str.rstrip(";")
)
self.df.loc[:, "sequence"] = seq_mods["sequence"]
fixed_mod_strings = fixed_mod_strings[0]
return fixed_mod_strings

def unify(self):
"""
@@ -181,25 +233,12 @@ def unify(self):
Returns:
self.df (pd.DataFrame): unified dataframe
"""
spec_idents = [
etree.tostring(e)
for e in self.root.findall(".//{*}SpectrumIdentificationResult")
]
logger.remove()
logger.add(lambda msg: tqdm.write(msg, end=""))
with mp.Pool(
self.params.get("cpus", mp.cpu_count() - 1),
initializer=_mp_specs_init,
initargs=(_get_single_spec_df, self.reference_dict, self.mapping_dict),
) as pool:
chunk_dfs = pool.map(
_get_single_spec_df,
tqdm(spec_idents),
)
logger.remove()
logger.add(sys.stdout)
self.df = pd.concat(chunk_dfs, axis=0, ignore_index=True)
self._map_mods_and_sequences()
self.version = self.get_version()
self.fixed_mods, self.mod_mass_map = self.map_mod_mass()
self.peptide_lookup = self.get_peptide_lookup()
self.spec_records = self.get_spec_records()
self.df = pd.DataFrame(self.spec_records)
self.df["search_engine"] = self.version
self.process_unify_style()

return self.df