In [None]:
#default_exp io.psm_reader.psm_reader

In [None]:
#hide
%reload_ext autoreload
%autoreload 2

In [None]:
#export
# for nbdev_build_docs
# import os
# __file__ = os.path.expanduser('~/Workspace/AlphaBase/alphabase/io/psm_reader/psm_reader.py')

In [None]:
#hide
# notebook does not have __file__
import alphabase.io.psm_reader.psm_reader
__file__ = alphabase.io.psm_reader.psm_reader.__file__


# Base structure for PSM readers

In [None]:
#export
import pandas as pd
import numpy as np
import alphabase.peptide.mobility as mobility
from alphabase.peptide.precursor import (
    update_precursor_mz, reset_precursor_df
)

In [None]:
#export

def translate_other_modification(
    mod_str: str, 
    mod_dict: dict
)->str:
    '''
    Map every modification name in `mod_str` to its AlphaBase name.

    Args:
        mod_str (str): modification names of another search engine,
            joined by ';', e.g. 'ModA;ModB'. An empty (falsy) value
            means "no modification".
        mod_dict (dict): lookup table from the other engine's names to
            AlphaBase names, e.g. for pFind
            {'Phospho[S]': 'Phospho@S', 'Oxidation[M]': 'Oxidation@M'}.

    Returns:
        str: the translated names joined by ';', "" for an empty
            `mod_str`, or pd.NA as soon as one of the names is not a
            key of `mod_dict`.
    '''
    if not mod_str:
        return ""
    other_names = mod_str.split(';')
    # A single unknown modification invalidates the whole entry.
    if any(name not in mod_dict for name in other_names):
        return pd.NA
    return ";".join([mod_dict[name] for name in other_names])

def keep_modifications(
    mod_str: str, 
    mod_set: set
)->str:
    '''
    Accept `mod_str` only if each of its modifications is in `mod_set`.

    Args:
        mod_str (str): modification names joined by ';',
            e.g. 'Oxidation@M;Phospho@S'. An empty (falsy) value
            means "no modification".
        mod_set (set): the modification names that are allowed.

    Returns:
        str: `mod_str` unchanged when all of its modifications are in
            `mod_set`, "" for an empty `mod_str`, otherwise pd.NA.
    '''
    if not mod_str:
        return ""
    if all(name in mod_set for name in mod_str.split(';')):
        return mod_str
    return pd.NA


In [None]:
#hide
# Known modifications are translated one by one and re-joined with ';'.
assert 'a' == translate_other_modification('A', {'A':'a','B':'b'})
assert 'b' == translate_other_modification('B', {'A':'a','B':'b'})
assert 'a;a' == translate_other_modification('A;A', {'A':'a','B':'b'})
assert 'a;b' == translate_other_modification('A;B', {'A':'a','B':'b'})
# An empty modification string stays empty.
assert '' == translate_other_modification('', {'A':'a','B':'b'})
# One unknown modification turns the whole entry into pd.NA.
assert translate_other_modification('A;C', {'A':'a','B':'b'}) is pd.NA
# keep_modifications returns the input unchanged or pd.NA.
assert 'A;B' == keep_modifications('A;B', {'A','B'})
assert '' == keep_modifications('', {'A','B'})
assert keep_modifications('A;C', {'A','B'}) is pd.NA

In [None]:
#export
from alphabase.yaml_utils import load_yaml
import os
import copy

# Settings of the readers (e.g. their `modification_mapping`), loaded from
# the `psm_reader.yaml` file that sits in the same folder as this module.
psm_reader_yaml = load_yaml(
    os.path.join(os.path.dirname(__file__), 'psm_reader.yaml')
)

## PSMReaderBase 

`PSMReaderBase` is the base abstract class for all readers. It defines the basic procedures for importing other search engine results into AlphaBase format.

The main entry method is `import_file(filename)`, and it will generate `self._psm_df` (or property `self.psm_df`) after `import_file()`.

In the `import_file()` method, we designed five steps to load result files into AlphaBase format:

1. `origin_df = self._load_file(filename)`. We load result files into a dataframe without doing any file conversion. As different search engines have different file formats, some of them are not in a tabular format. **All subclasses of `PSMReaderBase` need to re-implement this method**.

2. `self._translate_columns(origin_df)`. We translate columns in `origin_df` into AlphaBase columns by `self.column_mapping`. `self.column_mapping` provides a flexible way for developers to extract their required columns. 

3. `self._load_modifications(origin_df)`. As different search engines have different representations of modifications, we use this method to extract the modifications into `self._psm_df['mods']` and `self._psm_df['mod_sites']`. Note that the modification names are still in the other search engine's format. **All subclasses of `PSMReaderBase` need to re-implement this method**.

4. `self._translate_modifications()`. Convert modification names into AlphaBase names (`unimod_name@AA`). For most of the search engines, we need a dict (`self.modification_mapping`) to map the search engine's modification format into the AlphaBase format (`unimod_name@AA`, where `unimod_name` is the `<umod:mod title=...>` attribute in the unimod xml file). **All subclasses of `PSMReaderBase` need to re-implement this method**.

5. `self._post_process(origin_df)`. Any required post-processing steps. For example, we remove unknown modifications here.

### Other results must be converted into the alphabase dataframe with required columns:
1. `sequence` (str): AA sequence, for example, 'ATMYPEDR'.
2. `mods` (str): modification names, separated by ';'. For example, 'Oxidation@M', 'Acetyl@Protein N-term;Oxidation@M'.
3. `mod_sites` (str): modification sites, separated by ';'. For example, '3', '0;3'. The N-term site is 0, and the C-term site is -1, and all other modification sites start from 1.
4. `nAA` (int): number of AA in the sequence, could be set by `df['nAA']=df.sequence.str.len()`.
5. `charge` (int): precursor charge states.
6. `rt` (float): retention time (RT) of peptides, in minutes by default.
7. `rt_norm` (float): RT normalized by the maximum value, could be set by `df['rt_norm'] = df.rt/df.rt.max()`.
### and optional columns:
8. `ccs` (float): collisional cross section (CCS) value, required for IM data.
9. `mobility` (float): precursor ion mobility value, required for IM data.
11. `precursor_mz` (float): precursor m/z value.
12. `proteins` (str): protein names, separated by ';'.
13. `genes` (str): gene names, separated by ';'.
14. `protein_ids` (str): protein ids or uniprot ids, separated by ';'.
15. `score` (float): PSM score. The larger the better PSMs, meaning that `E-value` or `P-value` scores must be `-log()`.
16. `fdr` (float): FDR or q-value.
17. `raw_name` (str): Raw file name.
18. `spec_idx` (int): spectrum index starting from 0 in RAW data. For thermo RAW, it is also Scan number - 1. We can use it to locate the MS2 spectrum for identification.
19. `query_id` (int or str): the unique id that identifies not only the spectrum (`spec_idx`), but also the precursor or MS1 isotope index. It could be `query_idx` in alphapept.
20. `decoy`: 0 if the peptide is target match, otherwise 1.

In [None]:
#export

class PSMReaderBase(object):
    def __init__(self,
        *,
        column_mapping:dict = None,
        modification_mapping:dict = None,
        fdr = 0.01,
        keep_decoy = False,
        **kwargs,
    ):
        """The Base class for all PSMReaders. The key of the sub-classes for different 
        search engine format is to re-define `column_mapping` and `modification_mapping`.
        
        Args:
            column_mapping (dict, optional): 
                A dict that maps alphabase's columns to other search engine's.
                The key of the column_mapping is alphabase's column name, and 
                the value could be the column name or a list of column names
                in other engine's result.
                If it is None, this dict will be init by 
                `self._init_column_mapping()`. The dict values could be 
                either str or list, for example:
                columns_mapping = {
                    'sequence': 'NakedSequence', #str
                    'charge': 'Charge', #str
                    'proteins':['Proteins','UniprotIDs'], # list, the first one found in the result is used.
                }
                Defaults to None.
            modification_mapping (dict, optional): 
                A dict that maps alphabase's modifications to other engine's.
                If it is None, this dict will be init by 
                `self._init_modification_mapping()`. A str is also accepted,
                it is then used as a key of `psm_reader_yaml` to look up 
                the 'modification_mapping' of that entry. The dict values 
                could be either str or list, for example:
                modification_mapping = {
                    'Oxidation@M': 'Oxidation (M)', # str
                    'Phospho@S': ['S(Phospho (STY))','S(ph)','pS'], # list, this reader will automatically detect all of them.
                }
                Defaults to None.
            fdr (float, optional): FDR level to keep PSMs.
                Defaults to 0.01.
            keep_decoy(bool, optional): If keep decoy PSMs in self.psm_df.
                Defaults to False.
            **kwargs: accepted for compatibility with sub-classes, 
                not used by this base class.
        
        Attributes:
            column_mapping (dict): dict structure same as column_mapping in Args.
            modification_mapping (dict): dict structure same as modification_mapping in Args.
                We must use self.set_modification_mapping(new_mapping) to update it.
            rev_mod_mapping (dict): the reversed `modification_mapping`, 
                key is the other engine's modification name and value is 
                the alphabase name.
            _psm_df (pd.DataFrame): the PSM DataFrame after loading from search engines.
            psm_df (pd.DataFrame): the getter of self._psm_df
            keep_fdr (float): Only PSMs with FDR<=keep_fdr are kept in self._psm_df. 
            keep_decoy (bool): If keep decoy PSMs in self.psm_df.
            _min_max_rt_norm (bool): if True, the 'rt_norm' values in self._psm_df 
                will be normalized by rt_norm = (self.psm_df.rt-rt_min)/(rt_max-rt_min).
                It is useful to normalize iRT values as they contain negative values.
                Defaults to False.
        """

        self.set_modification_mapping(modification_mapping)
        
        if column_mapping is not None:
            self.column_mapping = column_mapping
        else:
            self._init_column_mapping()

        self._psm_df = pd.DataFrame()
        self.keep_fdr = fdr
        self.keep_decoy = keep_decoy
        self._min_max_rt_norm = False

    @property
    def psm_df(self)->pd.DataFrame:
        """The PSM dataframe generated by `import_file()`."""
        return self._psm_df

    def set_modification_mapping(self, modification_mapping:dict):
        """
        Set `self.modification_mapping` and rebuild `self.rev_mod_mapping`.

        Args:
            modification_mapping (dict, str or None): 
                None: init by `self._init_modification_mapping()`.
                str: a key of `psm_reader_yaml`, the 'modification_mapping' 
                of that entry is used.
                otherwise: the mapping itself. 
                A deep copy is stored for the str and the dict case, 
                so the argument is never modified by the reader.

        Raises:
            ValueError: if `modification_mapping` is a str but 
                not a key of `psm_reader_yaml`.
        """
        if modification_mapping is None:
            self._init_modification_mapping()
        elif isinstance(modification_mapping, str):
            if modification_mapping in psm_reader_yaml:
                self.modification_mapping = copy.deepcopy(
                    psm_reader_yaml[
                        modification_mapping
                    ]['modification_mapping']
                )
            else:
                raise ValueError(
                    f'Unknown modification mapping: {modification_mapping}'
                )
        else:
            self.modification_mapping = copy.deepcopy(
                modification_mapping
            )
        self._reverse_mod_mapping()

    def _init_modification_mapping(self):
        """Default modification mapping: an empty dict."""
        self.modification_mapping = {}
        
    def _reverse_mod_mapping(self):
        """
        Build `self.rev_mod_mapping` (other engine's name -> alphabase name) 
        from `self.modification_mapping`. List or tuple values are expanded, 
        i.e. each of their items becomes a key.
        """
        self.rev_mod_mapping = {}
        for (
            this_mod, other_mod
        ) in self.modification_mapping.items():
            if isinstance(other_mod, (list, tuple)):
                for _mod in other_mod:
                    self.rev_mod_mapping[_mod] = this_mod
            else:
                self.rev_mod_mapping[other_mod] = this_mod
                
    def _init_column_mapping(self):
        """
        Set the default `self.column_mapping`.

        Raises:
            NotImplementedError: Subclasses must re-implement this method
        """
        raise NotImplementedError(
            f'"{self.__class__}" must implement "_init_column_mapping()"'
        )
    
    def load(self, _file):
        """ Wrapper for import_file() """
        return self.import_file(_file)

    def import_file(self, _file):
        """
        This is the main entry function of PSM readers, 
        it imports the file with following steps:
        --------
        origin_df = self._load_file(_file)
        self._translate_columns(origin_df)
        self._translate_decoy(origin_df)
        self._translate_score(origin_df)
        self._load_modifications(origin_df)
        self._translate_modifications()
        self._post_process(origin_df)
        --------
        Args:
            _file: file path or file stream.

        Returns:
            pd.DataFrame: `self._psm_df`. It is an empty dataframe 
            if the loaded `origin_df` has no rows.
        """
        origin_df = self._load_file(_file)
        if len(origin_df) == 0:
            # nothing to translate, skip all the other steps
            self._psm_df = pd.DataFrame()
        else:
            self._translate_columns(origin_df)
            self._translate_decoy(origin_df)
            self._translate_score(origin_df)
            self._load_modifications(origin_df)
            self._translate_modifications()
            self._post_process(origin_df)
        return self._psm_df

    def _translate_decoy(
        self, 
        origin_df:pd.DataFrame=None
    ):
        """Hook for sub-classes, does nothing in the base class."""
        pass

    def _translate_score(
        self, 
        origin_df:pd.DataFrame=None
    ):
        """Hook for sub-classes, does nothing in the base class."""
        # some scores are evalue/pvalue, it should be translated
        # to -log(evalue), as score is the larger the better
        pass

    def normalize_rt(self):
        """
        Add the 'rt_norm' column to self.psm_df if it has an 'rt' column:
        rt_norm = (rt - min_rt) / (rt.max() - min_rt).
        `min_rt` is rt.min() only if `self._min_max_rt_norm` is True and 
        rt.min() is not positive, otherwise it is 0.
        """
        if 'rt' in self.psm_df.columns:
            min_rt = self.psm_df.rt.min()
            if not self._min_max_rt_norm or min_rt > 0:
                min_rt = 0
            self.psm_df['rt_norm'] = (
                self.psm_df.rt - min_rt
            ) / (self.psm_df.rt.max()-min_rt)

    def norm_rt(self):
        """ Wrapper for normalize_rt() """
        self.normalize_rt()

    def normalize_rt_by_raw_name(self):
        """
        Divide 'rt_norm' by its maximum value within each 'raw_name' group.
        Does nothing without an 'rt' column. 'rt_norm' is created 
        by `norm_rt()` if it is missing, and it is left as it is if 
        there is no 'raw_name' column.
        """
        if 'rt' not in self.psm_df.columns:
            return
        if 'rt_norm' not in self.psm_df.columns:
            self.norm_rt()
        if 'raw_name' not in self.psm_df.columns:
            return
        for raw_name, df_group in self.psm_df.groupby('raw_name'):
            self.psm_df.loc[
                df_group.index,'rt_norm'
            ] = df_group.rt_norm / df_group.rt_norm.max()

    def _load_file(self, filename:str)->pd.DataFrame:
        """
        Load original dataframe from PSM filename. 
        Different search engines may store PSMs in different ways:
        tsv, csv, HDF, XML, ...

        Args:
            filename (str): psm filename

        Raises:
            NotImplementedError: Subclasses must re-implement this method

        Returns:
            pd.DataFrame: loaded dataframe
        """
        raise NotImplementedError(
            f'"{self.__class__}" must implement "_load_file()"'
        )

    def _translate_columns(self, origin_df:pd.DataFrame):
        """
        Translate the dataframe from other search engines 
        to AlphaBase format. For a list of candidate columns in 
        `self.column_mapping`, the first one found in `origin_df` is used. 
        Columns which are not found in `origin_df` are skipped.
        If there is a 'scan_num' but no 'spec_idx' column after the 
        translation, 'spec_idx' is set to scan_num - 1.

        Args:
            origin_df (pd.DataFrame): df of other search engines

        Returns: 
            None. Add information inplace into self._psm_df
        """
        self._psm_df = pd.DataFrame()
        for col, map_col in self.column_mapping.items():
            if isinstance(map_col, str):
                if map_col in origin_df.columns:
                    self._psm_df[col] = origin_df[map_col]
            else:
                for other_col in map_col:
                    if other_col in origin_df.columns:
                        self._psm_df[col] = origin_df[other_col]
                        break
                    
        if (
            'scan_num' in self._psm_df.columns and 
            'spec_idx' not in self._psm_df.columns
        ):
            self._psm_df['spec_idx'] = self._psm_df.scan_num - 1
    

    def _load_modifications(self, origin_df:pd.DataFrame):
        """Read modification information from 'origin_df'. 
        Some of search engines use modified_sequence, some of them
        use additional columns to store modifications and the sites.

        Args:
            origin_df (pd.DataFrame): dataframe of original search engine.

        Raises:
            NotImplementedError: Subclasses must re-implement this method

        Returns: 
            None: Add information inplace into 
            self._psm_df['mods'], self._psm_df['mod_sites'].
        """
        raise NotImplementedError(
            f'"{self.__class__}" must implement "_load_modifications()"'
        )

    def _translate_modifications(self):
        '''
        Translate modifications to AlphaBase format with 
        `self.rev_mod_mapping`.

        No error is raised for unknown modifications: if any modification 
        of a PSM is not in `self.rev_mod_mapping`, its 'mods' value 
        becomes pd.NA (see `translate_other_modification()`). 
        These PSMs are removed in `self._post_process()`.
        '''
        self._psm_df.mods = self._psm_df.mods.apply(
            translate_other_modification, 
            mod_dict=self.rev_mod_mapping
        )

    def _post_process(self, 
        origin_df:pd.DataFrame
    ):
        """
        Set 'nAA' columns, remove unknown modifications 
        and perform other post processings, 
        e.g. get 'rt_norm', remove decoys, filter FDR...

        Args:
            origin_df (pd.DataFrame): the loaded original df, 
                not used by the base class.
        """
        self._psm_df['nAA'] = self._psm_df.sequence.str.len()

        self.normalize_rt_by_raw_name()

        # PSMs with unknown modifications have mods == NA
        self._psm_df = self._psm_df[
            ~self._psm_df['mods'].isna()
        ]

        keep_rows = np.ones(
            len(self._psm_df), dtype=bool
        )
        if 'fdr' in self._psm_df.columns:
            keep_rows &= (self._psm_df.fdr <= self.keep_fdr)
        if (
            'decoy' in self._psm_df.columns 
            and not self.keep_decoy
        ):
            keep_rows &= (self._psm_df.decoy == 0)

        # Take an explicit copy of the filtered rows: the dataframe is 
        # modified below (in-place reset, new columns), which would 
        # otherwise be a chained assignment on a filtered dataframe 
        # (SettingWithCopyWarning in pandas).
        self._psm_df = self._psm_df[keep_rows].copy()
        
        # NOTE(review): reset_precursor_df() is expected to work 
        # in-place as its return value is not used here - confirm.
        reset_precursor_df(self._psm_df)
        
        if 'precursor_mz' not in self._psm_df:
            self._psm_df = update_precursor_mz(self._psm_df)

        # if only one of 'ccs' and 'mobility' is provided, 
        # the other one is calculated from it
        if (
            'ccs' in self._psm_df.columns and 
            'mobility' not in self._psm_df.columns
        ):
            self._psm_df['mobility'] = (
                mobility.ccs_to_mobility_for_df(
                    self._psm_df,
                    'ccs'
                )
            )
        elif (
            'mobility' in self._psm_df.columns and
            'ccs' not in self._psm_df.columns
        ):
            self._psm_df['ccs'] = (
                mobility.mobility_to_ccs_for_df(
                    self._psm_df,
                    'mobility'
                )
            )

    def filter_psm_by_modifications(self, include_mod_set = set([
        'Oxidation@M','Phospho@S','Phospho@T',
        'Phospho@Y','Acetyl@Protein N-term'
    ])):
        '''
        Only keeps PSMs of which all modifications are in 
        `include_mod_set`, and reset the index of self._psm_df. 
        Unmodified PSMs (empty 'mods') are always kept.

        Args:
            include_mod_set (set, optional): modification names 
                (AlphaBase format) to keep. It is only read, not modified.
                Defaults to Oxidation@M, Phospho@S/T/Y 
                and Acetyl@Protein N-term.
        '''
        self._psm_df.mods = self._psm_df.mods.apply(
            keep_modifications, mod_set=include_mod_set
        )
        
        # keep_modifications() returns NA for PSMs to be removed
        self._psm_df.dropna(
            subset=['mods'], inplace=True
        )
        self._psm_df.reset_index(drop=True, inplace=True)


In [None]:
#hide
class TestReader(PSMReaderBase):
    """Minimal reader for testing: it only needs a no-op column mapping."""
    def _init_column_mapping(self):
        pass

reader = TestReader(modification_mapping={'A':'a','B':'b'})
# rev_mod_mapping must map each other-engine name back to its AlphaBase name
for mod, other in reader.modification_mapping.items():
    assert other in reader.rev_mod_mapping
    assert reader.rev_mod_mapping[other] == mod

# PSMReaderProvider

To make it easier to create different readers, we design a `Provider` or `Factory` called `PSMReaderProvider` to manage all reader classes. `PSMReaderProvider` is instantiated as a global object `psm_reader_provider`. 

After a subclass of `PSMReaderBase` is defined, for example `AlphaPeptReader`, we can then register it into `psm_reader_provider` by using `psm_reader_provider.register_reader('alphapept', AlphaPeptReader)`. Once we are going to use it, we just need to create an `AlphaPeptReader` object with `psm_reader_provider.get_reader('alphapept')`.

In [None]:
#export

class PSMReaderProvider:
    """
    Factory of PSM readers: reader classes are registered with a 
    (case-insensitive) reader type name and instantiated by that name.
    """
    def __init__(self):
        # lower-cased reader type name -> reader class
        self.reader_dict = {}

    def register_reader(self, reader_type, reader_class):
        """
        Register `reader_class` with the name `reader_type`. The name 
        is stored in lower case, an existing entry is replaced.
        """
        type_key = reader_type.lower()
        self.reader_dict[type_key] = reader_class

    def get_reader(self, 
        reader_type:str,
        *,
        column_mapping:dict=None, 
        modification_mapping:dict=None,
        fdr=0.01, keep_decoy=False,
        **kwargs
    )->PSMReaderBase:
        """
        Create a reader object of the class registered as `reader_type`.

        Args:
            reader_type (str): registered reader name, case-insensitive.
            column_mapping, modification_mapping, fdr, keep_decoy, kwargs: 
                passed as keyword arguments to the reader class.

        Raises:
            KeyError: if `reader_type` has not been registered.

        Returns:
            PSMReaderBase: the new reader object.
        """
        reader_class = self.reader_dict[reader_type.lower()]
        return reader_class(
            column_mapping=column_mapping,
            modification_mapping=modification_mapping,
            fdr=fdr,
            keep_decoy=keep_decoy,
            **kwargs
        )

    def get_reader_by_yaml(self, 
        yaml_dict:dict,
    )->PSMReaderBase:
        """
        Create a reader with the items of `yaml_dict` as keyword 
        arguments of `get_reader()`. A deep copy of `yaml_dict` is 
        passed on, so the new reader does not share objects with it.
        """
        reader_settings = copy.deepcopy(yaml_dict)
        return self.get_reader(**reader_settings)

# the global provider object in which the reader classes are registered
psm_reader_provider = PSMReaderProvider()

As we have loaded all readers in `psm_reader_provider` within alphabase.io.psm_reader.\__init__.py, we can easily access all registered readers by `psm_reader_provider`.

In [None]:
#hide
from alphabase.io.psm_reader import psm_reader_provider
from alphabase.io.psm_reader import (
    alphapept_reader, maxquant_reader, 
    pfind_reader, dia_psm_reader
)
# Each entry of psm_reader.yaml must create a reader of the matching class.
assert isinstance(psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['alphapept']), alphapept_reader.AlphaPeptReader)
assert isinstance(psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['maxquant']), maxquant_reader.MaxQuantReader)
assert isinstance(psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['diann']), dia_psm_reader.DiannReader)
assert isinstance(psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['spectronaut']), dia_psm_reader.SpectronautReader)
assert isinstance(psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['pfind']), pfind_reader.pFindReader)
reader = psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['diann'])
# Compare as plain lists: a length mismatch then fails the assertion
# directly instead of relying on numpy's element-wise comparison.
assert list(reader.modification_mapping['Phospho@S']) == [
    'S(Phospho (S))',
    'S(Phospho (ST))',
    'S(Phospho (STY))',
    'S(ph)',
    'S(UniMod:21)',
    'pS',
    'S[Phospho (S)]',
    'S[Phospho (ST)]',
    'S[Phospho (STY)]',
    'S[ph]',
    'S[UniMod:21]',
]
# An unknown reader key must raise KeyError; the `else` branch makes the
# check fail if no exception is raised at all.
try:
    psm_reader_provider.get_reader_by_yaml(psm_reader_yaml['unknown'])
except KeyError:
    pass
else:
    raise AssertionError('KeyError expected for an unknown reader')