In [None]:
#| default_exp match.match_utils

# PSM Match functionalities

In [None]:
#| export

import numpy as np
import numba
import pandas as pd
import tqdm
import os

In [None]:
#| export

@numba.njit
def match_closest_peaks(
    spec_mzs:np.ndarray,
    spec_intens:np.ndarray,
    query_mzs:np.ndarray,
    query_mz_tols:np.ndarray,
)->np.ndarray:
    """Matching query mz values against sorted MS2/spec masses, 
    only closest (minimal abs mass error) peaks are returned.

    Parameters
    ----------
    spec_mzs : np.ndarray
        sorted 1-D mz array of the spectrum

    spec_intens : np.ndarray
        1-D intensity array of the spectrum. Not used here

    query_mzs : np.ndarray
        query n-D mz array

    query_mz_tols : np.ndarray
        query n-D mz array tolerance, same shape as query_mzs

    Returns
    -------
    np.ndarray
        Matched indices of spec_mzs, -1 means no peaks were matched.
        Same shape as query_mzs 
    """
    query_left_mzs = query_mzs-query_mz_tols
    query_right_mzs = query_mzs+query_mz_tols
    idxes = np.searchsorted(spec_mzs, query_left_mzs)
    ret_indices = np.empty_like(query_mzs, dtype=np.int32)
    for i,idx in np.ndenumerate(idxes):
        min_merr = 1000000
        closest_idx = -1
        for _idx in range(idx, len(spec_mzs)):
            if spec_mzs[_idx]>query_right_mzs[i]:
                break
            elif spec_mzs[_idx]<query_left_mzs[i]:
                continue
            elif min_merr > abs(spec_mzs[_idx]-query_mzs[i]):
                min_merr = abs(spec_mzs[_idx]-query_mzs[i])
                closest_idx = _idx
        ret_indices[i] = closest_idx
    return ret_indices


@numba.njit
def match_highest_peaks(
    spec_mzs:np.ndarray,
    spec_intens:np.ndarray,
    query_mzs:np.ndarray,
    query_mz_tols:np.ndarray,
)->np.ndarray:
    """Matching query mz values against sorted MS2/spec masses, 
    only highest peaks are returned.

    Parameters
    ----------
    spec_mzs : np.ndarray
        sorted 1-D mz array of the spectrum

    spec_intens : np.ndarray
        1-D intensity array of the spectrum. Not used here

    query_mzs : np.ndarray
        query n-D mz array

    query_mz_tols : np.ndarray
        query n-D mz array tolerance, same shape as query_mzs

    Returns
    -------
    np.ndarray
        Matched indices of spec_mzs, -1 means no peaks were matched.
        Same shape as query_mzs 
    """
    query_left_mzs = query_mzs-query_mz_tols
    query_right_mzs = query_mzs+query_mz_tols
    idxes = np.searchsorted(spec_mzs, query_left_mzs)
    ret_indices = np.empty_like(query_mzs, dtype=np.int32)
    for i,idx in np.ndenumerate(idxes):
        highest = 0
        highest_idx = -1
        for _idx in range(idx, len(spec_mzs)):
            if spec_mzs[_idx]>query_right_mzs[i]:
                break
            elif spec_mzs[_idx]<query_left_mzs[i]:
                continue
            elif highest < spec_intens[_idx]:
                highest = spec_intens[_idx]
                highest_idx = _idx
        ret_indices[i] = highest_idx
    return ret_indices

@numba.njit
def match_profile_peaks(
    spec_mzs:np.ndarray,
    spec_intens:np.ndarray,
    query_mzs:np.ndarray,
    query_mz_tols:np.ndarray,
)->tuple:
    """
    Matching query mz values against sorted MS2/spec profile masses,
    both left- and right-most m/z values are returned.

    Parameters
    ----------
    spec_mzs : np.ndarray
        sorted 1-D mz array of the spectrum

    spec_intens : np.ndarray
        1-D intensity array of the spectrum. Not used here

    query_mzs : np.ndarray
        query n-D mz array

    query_mz_tols : np.ndarray
        query n-D mz array tolerance, same shape as query_mzs

    Returns
    -------
    tuple
        np.ndarray: matched first (left-most) indices, the shape is the same as query_mzs.
        -1 means no peaks are matched for the query mz.

        np.ndarray: matched last (right-most) indices, the shape is the same as query_mzs.
        -1 means no peaks are matched for the query mz.
    """
    query_left_mzs = query_mzs-query_mz_tols
    query_right_mzs = query_mzs+query_mz_tols
    idxes = np.searchsorted(spec_mzs, query_left_mzs)
    first_indices = np.full_like(
        query_mzs, -1, dtype=np.int32
    )
    last_indices = np.full_like(
        query_mzs, -1, dtype=np.int32
    )
    for i,idx in np.ndenumerate(idxes):
        for first_idx in range(idx, len(spec_mzs)):
            if spec_mzs[first_idx]<query_left_mzs[i]:
                continue
            elif spec_mzs[first_idx]>query_right_mzs[i]:
                break
            else:
                first_indices[i] = first_idx
                if first_idx == len(spec_mzs)-1:
                    last_indices[i] = first_idx
                else:
                    for last_idx in range(first_idx+1, len(spec_mzs)):
                        if spec_mzs[last_idx]>query_right_mzs[i]:
                            break
                    last_indices[i] = last_idx-1
                    break
    return first_indices, last_indices


In [None]:
#| hide
spec_masses = np.array([100, 100.1, 199.9, 200.1, 300])
query_masses = np.array([10,100.0, 200, 300, 400])
Da_tols = np.ones_like(query_masses)*0.2
first_indices, last_indices = match_profile_peaks(
    spec_masses, 0, query_masses, Da_tols,
)
print(first_indices, last_indices)
assert np.all(first_indices==[-1, 0,  2,  4, -1])
assert np.all(last_indices==[-1, 1,  3,  4, -1])

[-1  0  2  4 -1] [-1  1  3  4 -1]


In [None]:
#| hide
#unittests
spec_masses = np.arange(10, dtype=float)*100
query_masses = spec_masses
Da_tols = query_masses*20*1e-6
idxes = match_closest_peaks(spec_masses, 0, query_masses, Da_tols)
assert np.all(
    np.arange(10, dtype=np.int32)==
    idxes
)

In [None]:
#| hide
#unittests
spec_masses = np.arange(10, dtype=float)*100
query_masses = spec_masses
Da_tols = query_masses*20*1e-6
spec_intens = np.ones_like(spec_masses)
idxes = match_highest_peaks(spec_masses, spec_intens, query_masses, Da_tols)
assert np.all(
    np.arange(10, dtype=np.int32)==
    idxes
)

In [None]:
#| hide
#unittests
spec_masses = np.arange(10, dtype=float)*100
query_masses = spec_masses.copy()
query_masses[1]+=0.01 # -1
query_masses[3]+=0.003 # spec_masses[3]
query_masses[5]-=0.0099 # spec_masses[5]
query_masses[6]+=0.02 #-1
spec_masses[7] = spec_masses[8]-0.007 # matched[7] = -1
spec_masses[8] += 0.01 # matched[8] = 7 as 7 is closer
Da_tols = query_masses*20*1e-6
assert np.allclose(
    match_closest_peaks(spec_masses, 0, query_masses, Da_tols),
    [0, -1,  2,  3,  4,  5, -1, -1,  7,  9]
)

In [None]:
#| hide
#unittests
spec_masses = np.arange(10, dtype=float)*100
query_masses = np.arange(20).reshape((10,2))*100
Da_tols = query_masses*20*1e-6
target = np.arange(20, dtype=np.int32)
target[10:] = -1
assert np.allclose(
    target.reshape((10,2)),
    match_closest_peaks(spec_masses, 0, query_masses, Da_tols)
)