In [None]:
import copy
import csv
import json
import math
import os
import re
import urllib.parse
from typing import Dict, Iterable, Optional

import joblib
import numpy as np
import pandas as pd
import pyteomics.mgf
import requests
import spectrum_utils.spectrum as sus
import tqdm.notebook as tqdm

In [None]:
# Load the cache of previously downloaded spectra (USI -> spectrum) so that
# reruns don't have to query MassIVE again. On a fresh checkout no cache file
# exists yet, in which case an empty cache is used.
# NOTE: joblib.load unpickles the file; only load cache files you created.
spectra_cache = (joblib.load('spectra_cache.joblib')
                 if os.path.isfile('spectra_cache.joblib') else {})

In [None]:
def _query_msv_spectrum(usi: str) -> Optional[sus.MsmsSpectrum]:
    """
    Retrieve a spectrum from MassIVE.

    Spectra that were retrieved before are returned from `spectra_cache`.
    Otherwise the spectrum files matching the USI are looked up on MassIVE and
    the peaks are downloaded from GNPS for the first mzML/mzXML/MGF file that
    yields a non-empty spectrum. If the lookup fails or no file yields a
    spectrum, the PROXI endpoint is queried instead.

    Parameters
    ----------
    usi : str
        The USI of the spectrum to be retrieved.

    Returns
    -------
    Optional[sus.MsmsSpectrum]
        The spectrum, or None if it couldn't be retrieved. The precursor m/z
        and precursor charge of the spectrum are set to 0 if the response
        doesn't report them.
    """
    if usi in spectra_cache:
        return spectra_cache[usi]

    lookup_url = (f'https://massive.ucsd.edu/ProteoSAFe/'
                  f'QuerySpectrum?id={usi}')
    try:
        lookup_request = requests.get(lookup_url, timeout=None)
        lookup_request.raise_for_status()
        spectrum_files = lookup_request.json()['row_data']
    except (requests.exceptions.HTTPError,
            json.decoder.JSONDecodeError,
            KeyError):
        # A failed or malformed lookup is handled the same way as an HTTP
        # error: fall back to PROXI instead of propagating the exception.
        return _query_proxi_spectrum(usi)

    # The scan number is the last colon-separated field of the USI.
    scan = usi.rsplit(':', 1)[-1]
    for spectrum_file in spectrum_files:
        file_descriptor = spectrum_file['file_descriptor']
        if not file_descriptor.lower().endswith(('mzml', 'mzxml', 'mgf')):
            continue
        request_url = (f'https://gnps.ucsd.edu/ProteoSAFe/'
                       f'DownloadResultFile?'
                       f'task=4f2ac74ea114401787a7e96e143bb4a1&'
                       f'invoke=annotatedSpectrumImageText&block=0&'
                       f'file=FILE->{file_descriptor}'
                       f'&scan={scan}&peptide=*..*&force=false&'
                       f'format=JSON&uploadfile=True')
        try:
            spectrum_request = requests.get(request_url, timeout=None)
            spectrum_request.raise_for_status()
            spectrum_dict = spectrum_request.json()
        except (requests.exceptions.HTTPError,
                json.decoder.JSONDecodeError):
            # Try the next candidate file.
            continue
        peaks = spectrum_dict.get('peaks')
        if not peaks:
            # No (or an empty) peak list: try the next candidate file.
            continue
        mz, intensity = zip(*peaks)
        if 'precursor' in spectrum_dict:
            precursor_mz = float(spectrum_dict['precursor'].get('mz', 0))
            charge = int(spectrum_dict['precursor'].get('charge', 0))
        else:
            precursor_mz, charge = 0, 0
        spec = sus.MsmsSpectrum(usi, precursor_mz, charge, mz, intensity)
        # Remove 0 intensity peaks.
        spec.filter_intensity(0.00001)
        spectra_cache[usi] = spec
        return spec

    # None of the files produced a spectrum.
    return _query_proxi_spectrum(usi)


def _query_proxi_spectrum(usi: str) -> Optional[sus.MsmsSpectrum]:
    """
    Retrieve a spectrum from the MassIVE PROXI endpoint.

    A successfully retrieved spectrum is stored in `spectra_cache`.

    Parameters
    ----------
    usi : str
        The USI of the spectrum to be retrieved.

    Returns
    -------
    Optional[sus.MsmsSpectrum]
        The spectrum, or None if it couldn't be retrieved. The precursor m/z
        and precursor charge of the spectrum will be set to 0.
    """
    request_url = (f'http://massive.ucsd.edu/ProteoSAFe/proxi/v0.1/spectra?'
                   f'resultType=full&usi={urllib.parse.quote_plus(usi)}')
    try:
        spectrum_request = requests.get(request_url, timeout=None)
        spectrum_request.raise_for_status()
        proxi_spectra = spectrum_request.json()
    except (requests.exceptions.HTTPError,
            json.decoder.JSONDecodeError):
        return None
    # The first element of the returned list is used; an empty list or any
    # other payload means that no spectrum is available.
    if not isinstance(proxi_spectra, list) or len(proxi_spectra) == 0:
        return None
    spectrum_dict = proxi_spectra[0]
    if 'mzs' not in spectrum_dict or 'intensities' not in spectrum_dict:
        return None
    mz = [float(m) for m in spectrum_dict['mzs']]
    intensity = [float(intensity)
                 for intensity in spectrum_dict['intensities']]
    # The precursor information is not read from the PROXI response.
    precursor_mz, charge = 0, 0
    spec = sus.MsmsSpectrum(usi, precursor_mz, charge, mz, intensity)
    # Remove 0 intensity peaks.
    spec.filter_intensity(0.00001)
    spectra_cache[usi] = spec
    return spec


def _write_spectra_mgf(filename: str, spectra: Iterable[sus.MsmsSpectrum]) \
        -> None:
    """
    Save spectra to disk in the MGF format.

    Parameters
    ----------
    filename : str
        Path of the MGF file to create (an existing file is overwritten).
    spectra : Iterable[sus.MsmsSpectrum]
        The spectra that should be stored in the MGF file.
    """
    # The conversion is lazy: spectra are only converted while being written.
    spectrum_dicts = _spectra_to_dicts(spectra)
    with open(filename, 'w') as mgf_file:
        pyteomics.mgf.write(spectrum_dicts, mgf_file)
        
        
def _get_charge(adduct: str) -> str:
    match = re.match('^\[.*\](\d?)([+-])$', adduct)
    if match is None:
        return 0
    count, mode = match.groups()
    count = int(count) if count else 1
    return count if mode == '+' else -count


def _spectra_to_dicts(spectra: Iterable[sus.MsmsSpectrum]) -> Iterable[Dict]:
    """
    Lazily turn MsmsSpectrum objects into Pyteomics MGF spectrum dictionaries.

    Besides the standard spectrum fields, each spectrum must carry an `adduct`
    and a `scan` attribute.

    Parameters
    ----------
    spectra : Iterable[sus.MsmsSpectrum]
        The spectra to be converted to Pyteomics MGF dictionaries.

    Returns
    -------
    Iterable[Dict]
        A generator over the Pyteomics MGF dictionaries of the given spectra.
    """
    for spec in spectra:
        # Header fields of the MGF entry; the charge follows from the adduct.
        metadata = dict(title=spec.identifier,
                        pepmass=spec.precursor_mz,
                        charge=_get_charge(spec.adduct),
                        ion=spec.adduct,
                        scans=spec.scan,
                        mslevel=2)
        record = {'params': metadata}
        record['m/z array'] = spec.mz
        record['intensity array'] = spec.intensity
        yield record


def _batch_annotation_sheet(suspects: pd.DataFrame) -> pd.DataFrame:
    """
    Export the suspects as a spreadsheet that can be used as input for the
    reference spectral library batch creation workflow.

    Documentation: https://ccms-ucsd.github.io/GNPSDocumentation/batchupload/

    Parameters
    ----------
    suspects : pd.DataFrame
        The suspects to be exported.
    charges : Iterable[int]
        The precursor charges of the suspects.
        
    Returns
    -------
    pd.DataFrame
        Annotation sheet to be used for the reference spectral library batch
        creation workflow.
    """
    spec_lib_annotation = pd.DataFrame(index=suspects.index)
    spec_lib_annotation['FILENAME'] = (suspects['SuspectPath']
                                       .str.rsplit('/', 1).str[-1])
    spec_lib_annotation['SEQ'] = '*..*'
    spec_lib_annotation['COMPOUND_NAME'] = (
        'Suspect related to ' + suspects['CompoundName'] +
        ' (predicted molecular formula: ' +
        suspects['MolecularFormula'].fillna('unknown') + ')'
        ' with delta m/z ' + suspects['GroupDeltaMZ'].round(3).astype(str) +
        ' (putative explanation: ' + suspects['Rationale'] +
        '; atomic difference: ' + suspects['AtomicDifference'] + ')')
    spec_lib_annotation['MOLECULEMASS'] = suspects['SuspectPrecursorMZ']
    spec_lib_annotation['INSTRUMENT'] = suspects['Instrument']
    spec_lib_annotation['IONSOURCE'] = suspects['IonSource']
    spec_lib_annotation['EXTRACTSCAN'] = np.arange(1, len(suspects) + 1)
    spec_lib_annotation['SMILES'] = 'N/A'
    spec_lib_annotation['INCHI'] = 'N/A'
    spec_lib_annotation['INCHIAUX'] = 'N/A'
    spec_lib_annotation['CHARGE'] = 0
    spec_lib_annotation['IONMODE'] = suspects['IonMode']
    spec_lib_annotation['PUBMED'] = 'N/A'
    spec_lib_annotation['ACQUISITION'] = 'Crude'
    spec_lib_annotation['EXACTMASS'] = 0
    spec_lib_annotation['DATACOLLECTOR'] = 'Wout Bittremieux'
    spec_lib_annotation['ADDUCT'] = suspects['Adduct']
    spec_lib_annotation['INTEREST'] = 'N/A'
    spec_lib_annotation['LIBQUALITY'] = 4
    spec_lib_annotation['GENUS'] = 'N/A'
    spec_lib_annotation['SPECIES'] = 'N/A'
    spec_lib_annotation['STRAIN'] = 'N/A'
    spec_lib_annotation['CASNUMBER'] = 'N/A'
    spec_lib_annotation['PI'] = 'Pieter Dorrestein'
    return spec_lib_annotation

In [None]:
# Load the unique suspects and derive the USI of each suspect spectrum from
# its dataset identifier, the base name of its file, and its scan number.
suspects_unique = (
    pd.read_csv('../../data/interim/suspects_unique.csv.xz', compression='xz')
    .assign(USI=lambda suspects: (
        'mzspec:' + suspects['Dataset'] + ':' +
        suspects['SuspectPath'].apply(os.path.basename) +
        ':scan:' + suspects['SuspectScanNr'].astype(str))))

In [None]:
# Retrieve all spectra from MassIVE.
while True:
    try:
        # Checkpoint the cache before every attempt so that spectra that were
        # downloaded during a failed attempt are not lost.
        joblib.dump(spectra_cache, 'spectra_cache.joblib')
        spectra = [_query_msv_spectrum(usi) for usi in tqdm.tqdm(
            suspects_unique['USI'], desc='USIs retrieved', unit='USI')]
        break
    # Retry after connection errors while retrieving spectra; already
    # retrieved spectra are served from the cache. Only request errors are
    # retried, so that interrupting the kernel works and programming errors
    # surface instead of looping forever.
    except requests.exceptions.RequestException:
        continue

joblib.dump(spectra_cache, 'spectra_cache.joblib')
joblib.dump(spectra, 'spectra.joblib')

In [None]:
# Load the list of spectra from the `spectra.joblib` file (presumably so that
# the following cells can be run without retrieving the spectra again).
spectra = joblib.load('spectra.joblib')

In [None]:
# Annotate each retrieved spectrum with the adduct of its suspect. A precursor
# m/z or charge of 0 means that the value was not reported: use the suspect's
# precursor m/z and the charge implied by the adduct instead.
for spectrum, precursor_mz, adduct in zip(
        spectra, suspects_unique['SuspectPrecursorMZ'],
        suspects_unique['Adduct']):
    if spectrum is None:
        # This spectrum could not be retrieved.
        continue
    spectrum.adduct = adduct
    if spectrum.precursor_mz == 0:
        spectrum.precursor_mz = precursor_mz
    if spectrum.precursor_charge == 0:
        spectrum.precursor_charge = _get_charge(adduct)

In [None]:
# Export spectra for Sirius molecular formula finding.
os.makedirs('spectra', exist_ok=True)
filenames, instruments, adducts = [], [], []
for i, (spectrum, instrument, adduct) in tqdm.tqdm(enumerate(zip(
        spectra, suspects_unique['Instrument'], suspects_unique['Adduct']))):
    # 'Hybrid FT' instruments are treated as orbitrap instruments.
    if instrument == 'Hybrid FT':
        instrument = 'orbitrap'
    # Only singly charged qTOF and orbitrap spectra are exported.
    if (instrument.lower() in ('qtof', 'orbitrap') and spectrum is not None
            and abs(spectrum.precursor_charge) == 1):
        spectrum.scan = 1
        # The file is named after the row position of the suspect.
        _write_spectra_mgf(os.path.join('spectra', f'{i}.mgf'), [spectrum])
        filenames.append(f'$VSC_SCRATCH/suspect_list/spectra/{i}.mgf')
        instruments.append(instrument.lower())
        adducts.append(adduct)
filenames = pd.DataFrame({'filename': filenames, 'instrument': instruments,
                          'adduct': adducts})
# Write one file list per job with at most `spectra_per_job` spectra each.
# Slicing by position also works if fewer than `spectra_per_job` spectra were
# exported (no file list is written if there are none).
spectra_per_job = 200
for i, start in enumerate(range(0, len(filenames), spectra_per_job)):
    df = filenames.iloc[start:start + spectra_per_job]
    df.to_csv(f'sirius_filenames_{i}.csv', index=False)

In [None]:
# Bundle the exported spectra and the file lists into xz-compressed archives,
# move both archives to the interim data directory, and remove the local
# working copies.
! tar cJf spectra.tar.xz spectra/
! tar cJf sirius_filenames.tar.xz sirius_filenames_*.csv
! mv spectra.tar.xz ../../data/interim
! mv sirius_filenames.tar.xz ../../data/interim
! rm sirius_filenames_*.csv
! rm -rf spectra/

In [None]:
# Read Sirius molecular formulas.
! tar -xf ../../data/interim/formula.tar.xz

formulas = []
for dir_name in os.listdir('formula'):
    i = os.path.splitext(dir_name)[0]
    filename = os.path.join('formula', dir_name, 'formula_identifications.tsv')
    if os.path.exists(filename):
        formula = pd.read_csv(
            filename, sep='\t', usecols=['molecularFormula'], squeeze=True)
        if len(formula) == 0:
            continue
        else:
            formulas.append((int(i), formula.iloc[0]))
formulas = (pd.DataFrame(formulas, columns=['index', 'MolecularFormula'])
            .sort_values('index').set_index('index'))

! rm -rf formula/

In [None]:
# Attach the SIRIUS molecular formulas to the suspects, matched on the index.
suspects_unique = suspects_unique.merge(
    formulas, how='left', left_index=True, right_index=True)


def _is_supported_adduct(adduct_parts):
    """
    Check whether the split adduct is 'M', an optional part containing 'H2O',
    and a final 'H', 'K', or 'Na'.
    """
    if len(adduct_parts) not in (2, 3):
        return False
    if adduct_parts[0] != 'M' or adduct_parts[-1] not in ('H', 'K', 'Na'):
        return False
    return len(adduct_parts) != 3 or 'H2O' in adduct_parts[1]


# Only keep the formulas for protonated, sodiated, or potassiated adducts,
# with an optional water loss; the formula of all other suspects is cleared.
suspects_unique.loc[
    ~suspects_unique['Adduct'].str.strip('[]+-').str.split('[+-]')
    .apply(_is_supported_adduct),
    'MolecularFormula'] = np.nan

In [None]:
# Select the spectra that can be included in the library.
valid_i, valid_spectra = [], []
for i, (spectrum, precursor_mz, adduct) in enumerate(
        zip(spectra, suspects_unique['SuspectPrecursorMZ'],
            suspects_unique['Adduct'])):
    # Spectra that could not be retrieved.
    if spectrum is None:
        continue
    # Multiply charged spectra.
    if abs(spectrum.precursor_charge) not in (0, 1):
        continue
    # Spectra for which the precursor mass doesn't match for some reason.
    if not math.isclose(spectrum.precursor_mz, precursor_mz, abs_tol=0.01):
        continue
    # Scan numbers are consecutive and 1-based over the valid spectra.
    spectrum.scan = len(valid_spectra) + 1
    spectrum.adduct = adduct
    valid_i.append(i)
    valid_spectra.append(copy.copy(spectrum))

# Export the annotation sheet and MGF for batch library creation.
# https://ccms-ucsd.github.io/GNPSDocumentation/batchupload/
annotation_sheet = _batch_annotation_sheet(
    suspects_unique.iloc[valid_i].reset_index(drop=True))
# All spectra are exported to a single MGF file.
annotation_sheet['FILENAME'] = 'suspect_list_batch_creation.mgf'
annotation_sheet.to_csv(
    '../../data/processed/suspect_list_batch_creation.tsv',
    sep='\t', index=False, na_rep='N/A', quoting=csv.QUOTE_NONE)
_write_spectra_mgf(
    '../../data/processed/suspect_list_batch_creation.mgf', valid_spectra)