In [1]:
!pip install pyopenms=="3.2.0.1" pandas tqdm



In [2]:
import os
import gc
from tqdm import tqdm
import pandas as pd
import pyopenms as oms
from typing import Union, Sequence, Optional, List

type StrPath = os.PathLike | str



In [5]:
class OpenMS_File_Handler:
	def check_ending_experiment(self, file: StrPath) -> bool:
		"""
		Check whether the file has a mzML or mzXML ending.

		:param file: Path to file
		:type file: StrPath
		:return: Ending is mzML or mzXML
		:rtype: bool
		"""
		return (
			file.endswith(".mzML")
			or file.endswith(".MzML")
			or file.endswith(".mzXML")
			or file.endswith(".MzXML")
		)

	def read_experiment(self, experiment_path: StrPath, separator: str = "\t") -> oms.MSExperiment:
		"""
		Read in MzXML or MzML File as a pyopenms experiment. If the file is in tabular format,
		assumes that is is in long form with two columns ["mz", "inty"]

		:param experiment_path: Path to experiment
		:type experiment_path: StrPath
		:param separator: Separator of data, defaults to "\t"
		:type separator: str, optional
		:raises ValueError: The experiment must end with a valid ending.
		:return: Experiment
		:rtype: pyopenms.MSExperiment
		"""
		experiment = oms.MSExperiment()
		if experiment_path.endswith(".mzML") or experiment_path.endswith(".MzML"):
			file = oms.MzMLFile()
			file.load(experiment_path, experiment)
		elif experiment_path.endswith(".mzXML") or experiment_path.endswith(".MzXML"):
			file = oms.MzXMLFile()
			file.load(experiment_path, experiment)
		elif (
			experiment_path.endswith(".tsv")
			or experiment_path.endswith(".csv")
			or experiment_path.endswith(".txt")
		):
			exp_df = pd.read_csv(experiment_path, sep=separator)
			spectrum = oms.MSSpectrum()
			spectrum.set_peaks((exp_df["mz"], exp_df["inty"]))
			experiment.addSpectrum(spectrum)
		else:
			raise ValueError(
				f'Invalid ending of {experiment_path}. Must be in [".MzXML", ".mzXML", ".MzML", ".mzML", ".tsv", ".csv", ".txt"]'
			)
		return experiment

	def load_experiment(
		self, experiment: Union[oms.MSExperiment, StrPath], separator: str = "\t"
	) -> oms.MSExperiment:
		"""
		If no experiment is given, loads and returns it from either .mzML or .mzXML file.
		Collects garbage with gc.collect() to ensure space in the RAM.

		:param experiment: Experiment, or Path to experiment
		:type experiment: Union[oms.MSExperiment, StrPath]
		:param separator: Separator of data, defaults to "\t"
		:type separator: str, optional
		:return: Experiment
		:rtype: pyopenms.MSExperiment
		"""
		gc.collect()
		if isinstance(experiment, oms.MSExperiment):
			return experiment
		else:
			return self.read_experiment(experiment, separator=separator)

	def load_experiments(
		self,
		experiments: Union[Sequence[Union[oms.MSExperiment, StrPath]], StrPath],
		file_ending: Optional[str] = None,
		prefix: str = "",
		separator: str = "\t",
		data_load: bool = True,
	) -> Sequence[Union[oms.MSExperiment, StrPath]]:
		"""
		Load a batch of experiments.

		:param experiments: Experiments, either described by a list of paths or one path as base directory,
		or an existing experiment.
		:type experiments: Union[Sequence[Union[oms.MSExperiment,str]], str]
		:param file_ending: Ending of experiment file, defaults to None
		:type file_ending: Optional[str], optional
		:param prefix: Prefix of file, defaults to ""
		:type prefix: str, optional
		:param separator: Separator of data, defaults to "\t"
		:type separator: str, optional
		:param data_load: Load the data or just combine the base string to a list of full filepaths, defaults to True
		:type data_load: bool, optional
		:return: Experiments
		:rtype: Sequence[Union[oms.MSExperiment,str]]
		"""
		if isinstance(experiments, str):
			if file_ending:
				experiments = [
					os.path.join(experiments, file)
					for file in os.listdir(experiments)
					if file.endswith(file_ending) and file.startswith(prefix)
				]
			else:
				experiments = [
					os.path.join(experiments, file)
					for file in os.listdir(experiments)
					if self.check_ending_experiment(file) and file.startswith(prefix)
				]
		if data_load:
			experiments = [
				self.load_experiment(experiment, separator=separator)
				for experiment in tqdm(experiments)
			]
		return experiments

	def load_name(
		self, experiment: Union[oms.MSExperiment, str], alt_name: Optional[str] = None
	) -> str:
		"""
		Load the name of an experiment.

		:param experiment: Experiment
		:type experiment: Union[oms.MSExperiment, str]
		:param alt_name: Alternative Name if none is found, defaults to None
		:type alt_name: Optional[str], optional
		:raises ValueError: Raises error if no file name is found and no alt_name is given.
		:return: Name of experiment or alternative name
		:rtype: str
		"""
		if isinstance(experiment, str):
			return "".join(experiment.split(".")[:-1])
		else:
			if experiment.getLoadedFilePath():
				return "".join(os.path.basename(experiment.getLoadedFilePath()).split(".")[:-1])
			elif alt_name:
				return alt_name
			else:
				raise ValueError("No file path found in experiment. Please provide alt_name.")

	def load_names_batch(
		self,
		experiments: Union[Sequence[Union[oms.MSExperiment, str]], str],
		file_ending: str = ".mzML",
		prefix: str = "",
	) -> List[str]:
		"""
		If no experiment is given, loads and returns it from either .mzML or .mzXML file.

		:param experiments: Experiments
		:type experiments: Union[Sequence[Union[oms.MSExperiment,str]], str]
		:param file_ending: Ending of experiment file, defaults to ".mzML"
		:type file_ending: str, optional
		:param prefix: Prefix of file, defaults to ""
		:type prefix: str, optional
		:return: List of experiment names
		:rtype: List[str]
		"""
		if isinstance(experiments, str):
			if file_ending:
				return [
					self.load_name(file)
					for file in tqdm(os.listdir(experiments))
					if file.endswith(file_ending) and file.startswith(prefix)
				]
			else:
				return [
					self.load_name(file)
					for file in tqdm(os.listdir(experiments))
					if self.check_ending_experiment(file) and file.startswith(prefix)
				]
		else:
			if isinstance(experiments[0], str):
				return [self.load_name(experiment) for experiment in tqdm(experiments)]
			else:
				return [
					self.load_name(experiment, str(i))
					for i, experiment in enumerate(tqdm(experiments))
				]

	def load_experiments_df(
		self,
		data_dir: str,
		file_ending: str,
		prefix: str = "",
		separator: str = "\t",
		data_load: bool = True,
		table_backend=pd,
	) -> pd.DataFrame:
		"""
		Load a Flow injection analysis dataframe, defining important properties.

		:param data_dir: Data directorsy
		:type data_dir: str
		:param file_ending: Ending of file
		:type file_ending: str
		:param prefix: Prefix of file, defaults to ""
		:type prefix: str, optional
		:param separator: Separator for file, defaults to "\t"
		:type separator: str, optional
		:param data_load: Load data or only return list of experiments, defaults to True
		:type data_load: bool, optional
		:param table_backend: Use pandas or polars as backend, defaults to pd
		:type table_backend: _type_, optional
		:return: _description_
		:rtype: Union[pandas.DataFrame]
		"""
		print("Loading names:")
		names = self.load_names_batch(data_dir, file_ending, prefix=prefix)
		polarities = [{"pos": 1, "neg": -1}.get(name.split("_")[-1]) for name in names]
		print("Loading experiments:")
		experiments = self.load_experiments(
			data_dir, file_ending, prefix=prefix, separator=separator, data_load=data_load
		)
		fia_df = table_backend.DataFrame([names, polarities, experiments])
		fia_df = fia_df.transpose()
		fia_df.columns = ["sample", "polarity", "experiment"]
		return fia_df


def extract_precursor_info(experiments):
	precursor_infos_files = {}
	for i, row in experiments.iterrows():
		experiment = row["experiment"]
		ms2_spectra = [
			spectrum for spectrum in experiment.getSpectra() if spectrum.getMSLevel() >= 2
		]
		retention_times = []
		precursor_mzs = []
		for ms2_spectrum in ms2_spectra:
			for precursor in ms2_spectrum.getPrecursors():
				retention_times.append(ms2_spectrum.getRT())
				precursor_mzs.append(precursor.getMZ())
		precursor_infos_files[os.path.basename(experiment.getLoadedFilePath())] = pd.DataFrame(
			{"m/z": precursor_mzs, "rt": retention_times}
		)
	return precursor_infos_files


def save_precursor_info(precursor_infos_files, out_path):
	for filename, precursors in precursor_infos_files.items():
		name = ".".join(filename.split(".")[:-1])
		precursors.to_csv(os.path.join(out_path, f"{name}_precursors.tsv"), sep="\t")


def extract_precursor_info_files(in_path, out_path=None, prefix=None, file_ending=".mzML"):
	oms_file_handler = OpenMS_File_Handler()
	if not os.path.isdir(in_path):
		in_path, prefix = os.path.split(in_path)
	if not out_path:
		out_path = in_path

	experiments = oms_file_handler.load_experiments_df(
		data_dir=in_path, file_ending=file_ending, prefix=prefix
	)

	precursor_infos_files = extract_precursor_info(experiments)

	save_precursor_info(precursor_infos_files, out_path)

In [6]:
in_path = os.path.normpath(
	"D:/mine2sirius_pipe/tests/example_files/minimal"
)  # "Path_to_folder/Prefix_of_summary_files"
out_path = os.path.normpath("D:/mine2sirius_pipe/tests/example_files")

extract_precursor_info_files(in_path=in_path, out_path=out_path)

Loading names:


100%|██████████| 12/12 [00:00<?, ?it/s]


Loading experiments:


100%|██████████| 1/1 [00:00<00:00, 28.63it/s]
