1) Import the required modules and libraries.

In [None]:
import pastaq as pq
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.colors as colors
from collections import defaultdict
import ms_entropy as me
from pathlib import Path
import re
from itertools import combinations
import os
import ms_entropy as me
from pathlib import Path
from sklearn.metrics import root_mean_squared_error
import ast


2) Import the helper functions for PASTAQ

In [None]:
# helper functions for PASTAQ demonstration
import math as math
import numpy as np
import matplotlib.pyplot as plt

# helper functions
# Function to calculate the Euclidean distance
def euclidean_distance(peak1, peak2):
    peak1x = peak1.fitted_rt
    peak1y = peak1.fitted_mz
    peak2x = peak2.fitted_rt + peak2.rt_delta
    peak2y = peak2.fitted_mz
    return math.sqrt(((peak1x - peak2x)/(peak1.fitted_sigma_rt + peak2.fitted_sigma_rt)) ** 2 + ((peak1y - peak2y)/(peak1.fitted_sigma_mz + peak2.fitted_sigma_mz)) ** 2)

# Function to find the closest peak
def find_closest_peak(reference_peak, peaks, mz_tolerance = 1, rt_tolerance = 1):
    closest_peak = None
    min_distance = float('inf')
    
    for peak in peaks:
        distance = euclidean_distance(reference_peak, peak)
        if distance < min_distance:
            min_distance = distance
            closest_peak = peak
    
    if reference_peak.fitted_mz - mz_tolerance <= closest_peak.fitted_mz <= reference_peak.fitted_mz + mz_tolerance and reference_peak.fitted_rt - rt_tolerance <= closest_peak.fitted_rt + closest_peak.rt_delta <= reference_peak.fitted_rt + rt_tolerance:
        return closest_peak
    else:
        print("Reference peak not found in the peaks list.")
        return None

# get segment indices for a value
def find_segments_indices(values, starts, ends):
    """
    Finds the segment index for each value in the list.
    
    Args:
        values: A list of values to find the segments for.
        starts: A list of segment start values.
        ends: A list of segment end values.
    
    Returns:
        A list of indices corresponding to the segment each value lies in, or -1 if a value is not in any segment.
    """
    # Ensure the lists are of the same length
    if len(starts) != len(ends):
        raise ValueError("The 'starts' and 'ends' lists must have the same length.")
    
    indices = []
    
    for value in values:
        found = False
        for i in range(len(starts)):
            if starts[i] <= value <= ends[i]:
                indices.append(i)
                found = True
                break
        if not found:
            indices.append(-1)
    
    return indices

# interpolate retention time to warped retention time using aligned segments limits
def interpolate_values(values, starts, ends, new_starts, new_ends):
    """
    Interpolates the input values based on corresponding segment index values.
    
    Args:
        values: A list of values to interpolate.
        starts: A list of segment start values.
        ends: A list of segment end values.
        new_starts: A list of new segment start values for interpolation.
        new_ends: A list of new segment end values for interpolation.
    
    Returns:
        A list of interpolated values.
    """
    indices = find_segments_indices(values, starts, ends)
    interpolated_values = []
    
    for value, index in zip(values, indices):
        if index == -1:
            interpolated_values.append(None)  # Value is not in any segment
        else:
            # Perform linear interpolation
            old_start = starts[index]
            old_end = ends[index]
            new_start = new_starts[index]
            new_end = new_ends[index]
            
            # Linear interpolation formula
            if old_end != old_start:  # Avoid division by zero
                interpolated_value = new_start + ((value - old_start) / (old_end - old_start)) * (new_end - new_start)
            else:
                interpolated_value = new_start  # If old_start == old_end, use new_start
            
            interpolated_values.append(interpolated_value)
    
    return interpolated_values

# calculate Gaussian peak with mean, sigma, and height
def gaussian(x, mean, sigma, height):
    return height * np.exp(-((x - mean) ** 2) / (2 * sigma ** 2))

# plot Gaussian peak
def plot_gaussian(mean, sigma, height):
    x = np.linspace(mean - 4 * sigma, mean + 4 * sigma, 1000)
    y = gaussian(x, mean, sigma, height)
    plt.plot(x, y, label='fitted Gaussian peak', color='orange', alpha=0.75, linestyle='-', linewidth=1)

# plot mass spectra considering that only non-eros values are included in the spectra
def plot_msSpectra(mz, intensity, norm_mz_diff = 0.0035, diffFactor = 1.3, scanIdx = 0):
    """
    Plotting mass spectra, expecting spectra where 0 intensity values were omitted.
    
    Args:
        mz: A list of mz values of the mass spectra.
        intensity: A list of intensity (non-zero) values of the mass spectra.
        norm_mz_diff: Difference between two adjacent mz at measurement point corresponding to the original sampling frequency of the mass spectra.
        diffFactor: Tolerance factor allowing to vary sampling frequency of mass spectra. It is typically set to 30% (factor 1.3).
    
    Returns:
        Figure object as fig.
    """
    spectra = {'mz': mz, 'intensity': intensity}
    newSpectra = {'mz': [], 'intensity': []}
    idxSpectra = 0
    for i in range(1, len(spectra['mz'])):
        diff = spectra['mz'][i]-spectra['mz'][i-1]
        if diff > norm_mz_diff*diffFactor:
            if diff < norm_mz_diff*2:
                newSpectra['mz'].insert(idxSpectra, spectra['mz'][i-1] + diff/2)
                newSpectra['intensity'].insert(idxSpectra, 0)
                idxSpectra += 1
                print(norm_mz_diff*diffFactor, diff, norm_mz_diff*diffFactor + diff, norm_mz_diff)
                newSpectra['mz'].insert(idxSpectra, spectra['mz'][i])
                newSpectra['intensity'].insert(idxSpectra, spectra['intensity'][i])
                idxSpectra += 1
            else:
                newSpectra['mz'].insert(idxSpectra, spectra['mz'][i-1] + norm_mz_diff)
                newSpectra['intensity'].insert(idxSpectra, 0)
                idxSpectra += 1
                newSpectra['mz'].insert(idxSpectra, spectra['mz'][i] - norm_mz_diff)
                newSpectra['intensity'].insert(idxSpectra, 0)
                idxSpectra += 1
                newSpectra['mz'].insert(idxSpectra, spectra['mz'][i])
                newSpectra['intensity'].insert(idxSpectra, spectra['intensity'][i])
                idxSpectra += 1
        else:
            newSpectra['mz'].insert(idxSpectra, spectra['mz'][i])
            newSpectra['intensity'].insert(idxSpectra, spectra['intensity'][i])
            if (diff/diffFactor) > norm_mz_diff:
                norm_mz_diff = diff
            idxSpectra += 1

    fig = plt.figure(figsize=(25, 6), facecolor='white')  # Set the figure size and white background
    plt.plot(newSpectra['mz'], newSpectra['intensity'], color = 'red', marker='', linestyle='-')  # Plot mz vs. intensity
    plt.xlabel('m/z')  # Set the x-axis label
    plt.ylabel('Intensity')  # Set the y-axis label
    plt.title('Mass Spectrum of scan {}' .format(scanIdx))  # Set the title
    plt.grid(False)  # Show grid
    return fig

3) Define the parameters (the parameter file used here is labelled as "TOF_parameters")

In [None]:
pastaq_parameters = pq.TOF_parameters(instrument='TOF', avg_fwhm_rt=4)

4) Input the files in the format of .mzML or .mzXML. The user can either select one sample to be used as the reference or they can set reference to 'false' for all samples and PASTAQ will determine which sample should be used as the reference. NB: this pipeline was run using a modified version of the PASTAQ __init__ py, it may be necessary to change 1350 to 1363 in the __init__ py file for PASTAQ version 0.11.2 to the lines of code provided here in the .py file labelled "Change_to_features_PQ".

In [None]:
input_files = [
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleX1.mzXML', 'group': 'a'},
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleX2.mzXML', 'group': 'a'},
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleX3.mzXML', 'group': 'a'},
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleY1.mzXML', 'group': 'b'},
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleY2.mzXML', 'group': 'b'},
    {'reference': False, 'raw_path': r'C:\Users\Folder\Subfolder\sampleY3.mzXML', 'group': 'b'}
]

5) DDA_Pipeline()
Run the pipeline. Force override will overwrite any files that are already in the output directory. Changing "save_grid" to true might be useful if you want to look at your samples as graphs/ EICs etc, but I think it might take a very long time to run if you have a lot of samples.

In [None]:
dda_pipeline = pq.dda_pipeline(
    pastaq_parameters,
    input_files,
    output_dir="pastaq",
    force_override=False,
    save_grid=False,
)

2) Import the MS2 information (.ms2 files) output from PASTAQ's DDA pipeline (these will be in the folder called "raw"). Fill in the path to these files as they appear on your device.

In [None]:
input_files = [{'raw_path': r"C:\Users\pastaq\raw\sampleX1.ms2"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleX2.ms2"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleX3.ms2"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY1.ms2"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY2.ms2"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY3.ms2"}
               ]

In [None]:
output_dir = r"C:\Users\pastaq"

3) Import the csv file called "feature_clusters_annotations" output from PASTAQ's CSV file (it will be in the folder called "quant"). It can be read using the read_csv function provided by PANDAS.

In [None]:
feature_clusters_annotations_csv = pd.read_csv(r"C:\Users\pastaq\quant\feature_clusters_annotations.csv")

4) combine_multiple_samples()
Run the combine_multiple_samples() function to link the MS2 information to the csv file containing the feature clusters. This function will skip and therefore remove any features that have no linked MSMS. If you wish to keep these features that have no linked MSMS, the function will have to be manually adjusted. The 'stem' of the input file will be used to obtain the file name. The data is extracted using the pre-existing read_raw_data() function from PASTAQ. 

In [None]:
# Function to link the MS2 information to the csv file containing the feature clusters, this function will skip and therefore remove any features that have no linked MSMS
def combine_multiple_samples(feature_clusters_annotations_csv, input_files, output_dir):
    # Preprocess: build a lookup dictionary to avoid filtering the DataFrame each time
    annotations_lookup = defaultdict(list)

    for _, row in feature_clusters_annotations_csv.iterrows():
        if pd.notnull(row['msms_id']):
            key = (row['file_id'], row['msms_id'])
            annotations_lookup[key].append(row)

    combined_multiple_samples = []

    for file in input_files:
        if 'stem' not in file:
            base_name = os.path.splitext(os.path.basename(file['raw_path']))[0]
            file['stem'] = base_name
        stem = file['stem']
        in_path = os.path.join(output_dir, 'raw', f"{stem}.ms2")

        if not os.path.exists(in_path):
            continue

        raw_data = pq.read_raw_data(in_path)

        for scan in raw_data.scans:
            scan_number = scan.scan_number # The scan number is the same number as the msms_id  number 
            key = (stem, scan_number)
            annotations = annotations_lookup.get(key)

            if not annotations:
                continue

            ms2_mz = scan.mz
            ms2_intensity = scan.intensity
            ms2_rt = scan.retention_time

            # Remove any scans that have no valid spectra (if necessary)
            if not ms2_mz or not ms2_intensity or len(ms2_mz) != len(ms2_intensity):
                continue

            # Convert to numpy array for faster sorting
            mz_array = np.array(ms2_mz)
            intensity_array = np.array(ms2_intensity)
            sorted_indices = np.argsort(mz_array)
            mz_intensity_pairs = list(zip(mz_array[sorted_indices], intensity_array[sorted_indices]))
            ms2_peaks = np.array(mz_intensity_pairs, dtype=np.float32)

            for row in annotations:
                combined_multiple_samples.append({
                    'cluster_id': row['cluster_id'],
                    'file_id': row['file_id'],
                    'feature_id': row['feature_id'],
                    'peak_id': row['peak_id'],
                    'msms_id': row['msms_id'],
                    'ms2_rt': ms2_rt,
                    'charge_state': row['charge_state'],
                    'ms2_peaks' : ms2_peaks
                })


    return combined_multiple_samples

In [7]:
combined_multiple_samples = combine_multiple_samples(feature_clusters_annotations_csv, input_files, output_dir)

5) Input the files containing the feature information output from PASTAQ's DDA pipeline (.features files). They will be in the folder labelled "features". This may be redundant as I have accidentally run it without running the input files and the function was able to find and extract the .features files without specifying the input files. However, I have included the step here in case anyone runs into problems with leaving it out.

In [None]:
input_files = [{'raw_path': r"C:\Users\pastaq\raw\sampleX1.features"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleX2.features"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleX3.features"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY1.features"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY2.features"},
               {'raw_path': r"C:\Users\pastaq\raw\sampleY3.features"}
               ]

In [None]:
output_dir = r"C:\Users\pastaq"

6) link_features()
Run the function that links the features to the large dataframe now containing both the MS2 information and the feature cluster information. 

In [None]:
# Function to link the MS1 features to the large dataframe now containing both the MS2 information and the feature cluster informations:
def link_features(combined_multiple_samples, input_files, output_dir):
    # Preprocess: build a lookup dictionary to avoid filtering the DataFrame each time
    annotations_lookup = defaultdict(list)
    linked_features = []

    for item in combined_multiple_samples:
        if pd.notnull(item['feature_id']):
            key = (item['file_id'], item['feature_id'])
            annotations_lookup[key].append(item)

    for file in input_files:
        if 'stem' not in file:
            base_name = os.path.splitext(os.path.basename(file['raw_path']))[0]
            file['stem'] = base_name
        stem = file['stem']
        in_path_features = os.path.join(output_dir, 'features', f"{stem}.features")

        if not os.path.exists(in_path_features):
            print('missing feature file/s')
            continue

        features = pq.read_features(in_path_features)

        for feature in features:
            id = feature.id
            key = (stem, id)
            annotations = annotations_lookup.get(key)

            if not annotations:
                continue

            if isinstance(annotations, list):
                for annotation in annotations:
                    linked_features.append({
                        'cluster_id': annotation['cluster_id'],
                        'file_id': annotation['file_id'],
                        'feature_id': id,
                        'feature_peak_ids': feature.peak_ids,
                        'peak_id' : annotation['peak_id'],
                        'msms_id': annotation['msms_id'],
                        'precursor_mz' : feature.monoisotopic_mz,
                        'precursor_rt': feature.monoisotopic_rt,
                        'precursor_intensity': feature.monoisotopic_height,
                        'precursor_vol': feature.monoisotopic_volume,
                        'total_intensity': feature.total_height,
                        'total_volume': feature.total_volume,
                        'average_ms1_mz': feature.average_mz,
                        'average_rt': feature.average_rt,
                        'charge_state': feature.charge_state,
                        'ms2_sample_peaks': annotation['ms2_peaks'],                  
                        })

    return linked_features


In [11]:
linked_features = link_features(combined_multiple_samples=combined_multiple_samples, input_files=input_files, output_dir=output_dir)

7) average_msms()
This step is optional, please proceed to markdown box 11 if you do not wish to do this. Run the function that finds and averages out the top MSMS linked to one peak. Here "top" is defined by the highest entropy similarity score calculated by MS entropy. The merge threshhold is the threshold that determines when two MS2 values should be added together and averaged out. Top_n is a parameter that determines the number of top spectra to be considered. The code was developed using a top_n of 3 and a mz_merge_thresh of 0.01. The "linked_features" are first converted to a dataframe using PANDAS.

In [None]:
df = pd.DataFrame(linked_features)

In [None]:
# Function to calculate the dot product for the function that finds the best average MS2 value for MS1 peaks with multiple linked MS2 values
def dot_product_with_tolerance(mz1, int1, mz2, int2, tol=0.02):
    matched1, matched2 = [], []
    for i, m1 in enumerate(mz1):
        for j, m2 in enumerate(mz2):
            if abs(m1 - m2) <= tol:
                matched1.append(int1[i])
                matched2.append(int2[j])
                break
    if not matched1:
        return 0.0
    s1 = np.array(matched1); s2 = np.array(matched2)
    if np.linalg.norm(s1) == 0 or np.linalg.norm(s2) == 0:
        return 0.0
    s1 /= np.linalg.norm(s1); s2 /= np.linalg.norm(s2)
    return float(np.dot(s1, s2))

# Function to find the best average MS2 value for MS1 peaks with multiple linked MS2 values
def average_msms(df, top_n=3, mz_tolerance=0.02, mz_merge_thresh=0.01):
    required = ['peak_id','cluster_id','file_id','feature_id','msms_id',
                'ms2_rt','charge_state','ms2_sample_peaks','cent_mz','cent_intensity','precursor_mz',
                'precursor_rt','precursor_intensity','precursor_vol','total_intensity','total_volume',
                'average_ms1_mz','average_rt']

    assert all(c in df.columns for c in required), "Missing required columns"

    results = []
    for (peak_id, file_id), group in df.groupby(['peak_id', 'file_id']):
        if len(group) < 2:
            row = group.iloc[0]
            results.append({
                'cluster_id': row['cluster_id'],
                'file_id' : file_id,
                'feature_id': row['feature_id'],
                'peak_id': peak_id,
                'avg_ms2_retention_time': row['ms2_rt'],
                'total_num_msms': 1,
                'charge_state': row['charge_state'],
                'avg_ms2_cent_peaks': list(zip(row['cent_mz'], row['cent_intensity'])),
                'avg_raw_ms2_sample_peaks': row['ms2_sample_peaks'],
                'avg_precursor_mz': row['precursor_mz'],
                'precursor_mz_list': row['precursor_mz'],
                'avg_precursor_rt': row['precursor_rt'],
                'precursor_rt_list': row['precursor_rt'],
                'avg_precursor_intensity' : row['precursor_intensity'],
                'precursor_intensity_list' : row['precursor_intensity'],
                'avg_precursor_vol': row['precursor_vol'],
                'precursor_vol_list': row['precursor_vol'],
                'dot_product_list': [],
                'avg_dot_product': 0.0,
                'entropy_similarity_list': [],
                'avg_entropy_similarity': 0.0,
                'ms2_sample_peaks' : row['ms2_sample_peaks']
            })
            continue

        msms_list = []
        for (_, r1), (_, r2) in combinations(group.iterrows(), 2):
            dot_product = dot_product_with_tolerance(r1['cent_mz'], r1['cent_intensity'],
                                            r2['cent_mz'], r2['cent_intensity'],
                                            tol=mz_tolerance)
            try:
                entropy_similarity = me.calculate_entropy_similarity(r1['ms2_sample_peaks'], r2['ms2_sample_peaks'])
            except Exception:
                entropy_similarity = None
            msms_list.append({
                'cluster_id': r1['cluster_id'],
                'feature_id': r1['feature_id'],
                'dot_product': dot_product,
                'entropy_similarity': entropy_similarity,
                'ms2_rt': r1['ms2_rt'],
                'cent_pairs': list(zip(r1['cent_mz'], r1['cent_intensity'])),
                'ms2_sample_peaks' : r1['ms2_sample_peaks'],
                'charge_state': r1['charge_state'],
                'precursor_mz': r1['precursor_mz'],
                'precursor_rt': r1['precursor_rt'],
                'precursor_intensity' : r1['precursor_intensity'],
                'precursor_vol': r1['precursor_vol'],
            })

        # filter out missing entropies
        msms_list = [m for m in msms_list if m['entropy_similarity'] is not None]
        if not msms_list:
            continue

        msms_list.sort(key=lambda x: x['entropy_similarity'], reverse=True)
        top_msms = msms_list[:top_n]

        all_cent_peaks = np.concatenate([np.array(m['cent_pairs']) for m in top_msms], axis=0)
        sorted_all_cent_peaks = all_cent_peaks[all_cent_peaks[:,0].argsort()]

        groups_current = []
        current = [sorted_all_cent_peaks[0]]
        for mz_i, intensity_i in sorted_all_cent_peaks[1:]:
            if abs(mz_i - current[-1][0]) <= mz_merge_thresh:
                current.append([mz_i, intensity_i])
            else:
                groups_current.append(np.array(current))
                current = [[mz_i, intensity_i]]
        groups_current.append(np.array(current))

        avg_cent_peaks = [[g[:,0].mean(), g[:,1].mean()] for g in groups_current]
        centroided_arr = np.array(avg_cent_peaks)
        centroided_arr_list = centroided_arr.tolist()

        all_raw_peaks = np.concatenate([np.array(m['ms2_sample_peaks']) for m in top_msms], axis=0)
        sorted_all_raw_peaks = all_raw_peaks[all_raw_peaks[:,0].argsort()]

        groups_current_raw = []
        current_raw = [sorted_all_raw_peaks[0]]
        for mz_i, intensity_i in sorted_all_cent_peaks[1:]:
            if abs(mz_i - current[-1][0]) <= mz_merge_thresh:
                current.append([mz_i, intensity_i])
            else:
                groups_current_raw.append(np.array(current_raw))
                current_raw = [[mz_i, intensity_i]]
        groups_current_raw.append(np.array(current_raw))

        avg_raw_peaks = [[g[:,0].mean(), g[:,1].mean()] for g in groups_current_raw]
        raw_arr = np.array(avg_raw_peaks)
        raw_arr_list = raw_arr.tolist()

        results.append({
            'cluster_id': [m['cluster_id'] for m in top_msms],
            'file_id' : file_id,
            'feature_id': [m['feature_id'] for m in top_msms],
            'peak_id': peak_id,
            'avg_ms2_retention_time': np.mean([m['ms2_rt'] for m in top_msms]),
            'total_num_msms': len(msms_list), # changed from (top_msms)
            'dot_product_list': [m['dot_product'] for m in top_msms],
            'avg_dot_product': np.mean([m['dot_product'] for m in top_msms]),
            'entropy_similarity_list': [m['entropy_similarity'] for m in top_msms],
            'avg_entropy_similarity': np.mean([m['entropy_similarity'] for m in top_msms]),
            'avg_ms2_cent_peaks': centroided_arr_list,
            'charge_state': [m['charge_state'] for m in top_msms],
            'precursor_mz_list': [m['precursor_mz'] for m in top_msms],
            'avg_precursor_mz' : np.mean([m['precursor_mz'] for m in top_msms]),
            'precursor_rt_list': [m['precursor_rt'] for m in top_msms],
            'avg_precursor_rt' : np.mean([m['precursor_rt'] for m in top_msms]),
            'precursor_intensity_list': [m['precursor_intensity'] for m in top_msms],
            'avg_precursor_intensity' : np.mean([m['precursor_intensity'] for m in top_msms]),
            'precursor_vol_list': [m['precursor_vol'] for m in top_msms],
            'avg_precursor_vol' : np.mean([m['precursor_vol'] for m in top_msms]),
            'avg_raw_ms2_sample_peaks' : raw_arr_list
        })

    return results


In [None]:
results = average_msms(df, top_n=3, mz_tolerance=0.02, mz_merge_thresh=0.01)

8) read_msp_file()
Define the path to and read your MSP file. The function to read the MSP file will automatically convert the retention time from minutes to seconds, if your MSP data is already given in seconds you will have to remove that line of code. This code will also remove any annotations with missing or invalid retention times.

In [None]:
# Function to parse any strings that may be present in the MSP file
def parse_array_from_string(s):
    if isinstance(s, str):
        return np.array([float(x) for x in re.findall(r"[-+]?\d*\.\d+|\d+", s)])
    return np.array([])

# Function to read and retrieve the annotations from an MSP file in the .msp format
def read_msp_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    spectra_data = []
    current_spectrum = {}
    peak_data_started = False
    
    for line in lines:
        line = line.strip()

        if line.startswith("Num Peaks"):
            peak_data_started = True
            continue

        if not peak_data_started:
            if line.startswith("NAME:"):
                if "|" in line:
                    # If the line contains '|', split the line after the '|' character
                    parts = line.split('|')
                    current_spectrum['name'] = parts[0].replace("NAME:", "").strip()  # Remove "NAME:" and strip any extra spaces
                    current_spectrum['saturation'] = parts[1].strip()  # After the '|'
                else:
                    # Otherwise, just use the name if no saturation is specified
                    current_spectrum['name'] = line.split(":", 1)[1].strip()
               
            elif line.startswith("PRECURSORMZ:"):
                current_spectrum['precursor_mz'] = float(line.split(":", 1)[1].strip())
            elif line.startswith("PRECURSORTYPE:"):
                current_spectrum['precursor_type'] = line.split(":", 1)[1].strip()
            elif line.startswith("IONMODE:"):
                current_spectrum['ion_mode'] = line.split(":", 1)[1].strip()              
            elif line.startswith("RETENTIONTIME:"):
                _, raw = line.split(":", 1)
                val = raw.strip()
                if not val:
                    print("[SKIP] empty retention_time")
                    continue
                try:
                    rt = float(val)
                except ValueError:
                        rt_parsed = parse_array_from_string(val)
                        if isinstance(rt_parsed, (list, np.ndarray)) and len(rt_parsed) == 1:
                            rt = float(rt_parsed[0])
                        else:
                            print("[SKIP] array invalid, skipping")
                            continue
                current_spectrum['retention_time'] = rt * 60 # convert to seconds
            elif line.startswith("Name: "):
                if current_spectrum:
                    spectra_data.append(current_spectrum)
                    current_spectrum = {"Name": line.split(":",1)[1].strip()}
            elif line.startswith("FORMULA:"):
                current_spectrum['formula'] = line.split(":", 1)[1].strip()
            elif line.startswith("INCHIKEY:"):
                current_spectrum['inchi_key'] = line.split(":", 1)[1].strip()
            elif line.startswith("SMILES:"):
                current_spectrum['smiles'] = line.split(":", 1)[1].strip()
            elif line.startswith("COMMENT:"):
                current_spectrum['comment'] = line.split(":", 1)[1].strip()

        else:
            try:
                mz, intensity = map(float, line.split())
                current_spectrum.setdefault('peaks', []).append((mz, intensity))
            except ValueError:
                # This is where the spectrum data is stored and new spectrum begins
                if current_spectrum:
                    spectra_data.append(current_spectrum)
                current_spectrum = {}  # Reset for the next spectrum
                peak_data_started = False  # Reset peak reading flag

    # Add last spectrum if it exists
    if current_spectrum:
        spectra_data.append(current_spectrum)
    
    return spectra_data

In [None]:
msp_file = r"C:\Users\path_to_your_msp_file.msp"

In [None]:
msp_data = read_msp_file(msp_file)

9) link_msp() - the function is the link MSP function that is used if the peaks with multiple MSMS have been averaged out.
Link the annotations from the MSP file to the dataframe containing all the information for your samples. The code was developed using an rt tolerance of 8 seconds and an m/z tolerance of 0.025. You can set the tolerance yourself, however, you may have to adjust the "match_score" calculations if you change the tolerance. This function only links annotations to features on the MS1 level, using the precursor rt and precursor m/z to determine a score that measures the weighted difference between the m/z and rt of the sample compared to the reference. This function performs quality checks on both the MS1 and MS2 level. 

In [None]:
#ATTENTION - This function is the link MSP function that is used if the peaks with multiple MSMS have been averaged out
def link_msp(results, msp_data, mz_tolerance=0.025, rt_tolerance=8.0):
    linked_msp_features = []

    # Loop through a list of Feature objects
    for feature in results:
        cluster_id = feature['cluster_id']
        feature_id = feature['feature_id']
        peak_id = feature['peak_id']
        avg_precursor_mz = feature['avg_precursor_mz']
        avg_precursor_rt = feature['avg_precursor_rt']
        avg_precursor_intensity = feature['avg_precursor_intensity']
        avg_precursor_volume = feature['avg_precursor_vol']
        avg_ms2_cent_peaks = feature['avg_ms2_cent_peaks']
        avg_raw_ms2_sample_peaks = feature['avg_raw_ms2_sample_peaks']
        charge_state = feature['charge_state']              
        avg_normalized_area = avg_precursor_volume * 114.7977026

        # Find all matching annotations for the current scan
        best_match = None  # To store the best match found

        for annotation in msp_data:
            # Ensure required fields are present in the annotation
            if 'precursor_mz' not in annotation or 'retention_time' not in annotation:
                continue  # Skip this annotation

            else:
                # Calculate mz and rt distances directly for scalars
                mz_distance = np.abs(avg_precursor_mz - annotation['precursor_mz']) # changed this from 'mz' to 'precursor_mz'
                rt_distance = np.abs(avg_precursor_rt - annotation['retention_time'])  # changed this from 'retention_time' to 'precursor_rt' 

                # Apply the tolerance checks
                if mz_distance <= mz_tolerance and rt_distance <= rt_tolerance:
                    # If the distances are within tolerance, calculate the match score with normalization factor
                    match_score = (rt_distance*0.025) + (mz_distance*20) # lower score is better/ closer match; added weighting
                
                    # Calculate mass error in ppm using formula
                    mass_error_ppm = ((annotation['precursor_mz'] - avg_precursor_mz) / annotation['precursor_mz']) * 10**6
                
                    #Calculate root mean squared error for best match
                    y_true_mz = [annotation.get('precursor_mz')]
                    y_pred_mz = [avg_precursor_mz]
                    rmse_mz = root_mean_squared_error(y_true_mz, y_pred_mz)

                    # Convert peaks to numpy arrays for similarity calculation (MS_Entropy)
                    peaks_query = np.array(avg_raw_ms2_sample_peaks, dtype=np.float32) #peaks from given samples
                    peaks_reference = np.array(annotation.get('peaks'), dtype=np.float32)  # peaks from msp
                    if peaks_reference.ndim == 3 and peaks_reference.shape[0] == 1:
                        peaks_reference = peaks_reference[0]  # flatten if needed

                    # Skip if still not 2D
                    if peaks_reference.ndim != 2:
                            continue
                    
                    # Calculate similarity scores (MS_entropy)
                    unweighted_similarity = me.calculate_unweighted_entropy_similarity(peaks_query, peaks_reference)
                    similarity = me.calculate_entropy_similarity(peaks_query, peaks_reference) # entropy based intensity weights are applied to the peaks

                    # Extract m/z and intensity values
                    reference_mz, reference_intensity = zip(*peaks_reference)

                    # Convert to NumPy arrays
                    # Parse the mz and intensity arrays
                    sample_mz, sample_intensity = zip(*avg_ms2_cent_peaks)
                    sample_mz = np.array(sample_mz)
                    sample_intensity = np.array(sample_intensity, dtype=np.float32)
                    reference_mz = np.array(reference_mz)
                    reference_intensity = np.array(reference_intensity, dtype=np.float32)

                    # Match peaks based on m/z values
                    matched_indices = np.searchsorted(reference_mz, sample_mz)

                    # Ensure indices are within bounds (removes any indices that are out of bounds)
                    matched_indices = matched_indices[matched_indices < len(reference_mz)]

                    # Example mass spectra intensities
                    sample_spectrum = np.array(sample_intensity[:len(matched_indices)])
                    reference_spectrum = np.array(reference_intensity[matched_indices])

                    # Normalize both vectors to unit length (L2 norm)
                    sample_spectrum_normalized = sample_spectrum / np.linalg.norm(sample_spectrum)
                    reference_spectrum_normalized = reference_spectrum / np.linalg.norm(reference_spectrum)

                    # Compute dot product (cosine similarity if normalized)
                    if len(sample_spectrum) < 3 or len(reference_spectrum) < 3:
                        dot_product = None
                    else:
                        dot_product = np.dot(sample_spectrum_normalized, reference_spectrum_normalized)
                
                    if best_match is None or match_score < best_match['score']:
                        best_match = {
                            'score': float(match_score),  # Store the match score
                            'name': annotation.get('name', None),
                            'saturation' : annotation.get('saturation', None),
                            'retention_time': annotation.get('retention_time', None),
                            'precursor_mz': annotation.get('precursor_mz', None),
                            'precursor_type': annotation.get('precursor_type', None),
                            'smiles': annotation.get('smiles', None),
                            'msp_peaks': annotation.get('peaks', None),
                            }
        
                        # Create the annotated scan with only the best match
                        linked_msp_feature = {
                            'cluster_id' : cluster_id,
                            'feature_id': feature_id,
                            'peak_id': peak_id,
                            'avg_precursor_mz': avg_precursor_mz,
                            'avg_precursor_intensity' : avg_precursor_intensity,
                            'avg_precursor_rt' : avg_precursor_rt,
                            'avg_precursor_volume' : avg_precursor_volume,
                            'avg_normalized_area' : avg_normalized_area,
                            'charge_state' : charge_state,
                            'avg_ms2_cent_peaks' : avg_ms2_cent_peaks,
                            'avg_raw_ms2_sample_peaks' :  avg_raw_ms2_sample_peaks,
                            'mass_error_ppm' : float(mass_error_ppm),
                            'rmse_mz' : float(rmse_mz),
                            'unweighted_entropy_similarity' : unweighted_similarity,
                            'entropy_similarity' : similarity,
                            'dot_product': float(dot_product) if dot_product is not None else 'NA',
                            'matches': []  # List to hold the top match
                            }

                        # Add the best match if available
                        if best_match:
                            linked_msp_feature['matches'].append(best_match)
                        else:
                            linked_msp_feature['matches'] = None  # No match found
        
                        linked_msp_features.append(linked_msp_feature)

    # Return after all features are processed
    return linked_msp_features

In [None]:
linked_msp =link_msp(results, msp_data, mz_tolerance=0.025, rt_tolerance=8.0)

10) Save your final results to a CSV file.

In [None]:
from pathlib import Path  
linked_msp_df = pd.DataFrame(linked_msp)
filepath = Path('Annotations/linked_msp.csv')  
filepath.parent.mkdir(parents=True, exist_ok=True)  
linked_msp_df.to_csv(Path('Annotations/linked_msp.csv', index=False))

11) read_msp_file()
Define the path to and read your MSP file. The function to read the MSP file will automatically convert the retention time from minutes to seconds, if your MSP data is already given in seconds you will have to remove that line of code. This code will also remove any annotations with missing or invalid retention times.

In [None]:
# Function to parse any strings that may be present in the MSP file
def parse_array_from_string(s):
    if isinstance(s, str):
        return np.array([float(x) for x in re.findall(r"[-+]?\d*\.\d+|\d+", s)])
    return np.array([])

# Function to read and retrieve the annotations from an MSP file in the .msp format
def read_msp_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    spectra_data = []
    current_spectrum = {}
    peak_data_started = False
    
    for line in lines:
        line = line.strip()

        if line.startswith("Num Peaks"):
            peak_data_started = True
            continue

        if not peak_data_started:
            if line.startswith("NAME:"):
                if "|" in line:
                    # If the line contains '|', split the line after the '|' character
                    parts = line.split('|')
                    current_spectrum['name'] = parts[0].replace("NAME:", "").strip()  # Remove "NAME:" and strip any extra spaces
                    current_spectrum['saturation'] = parts[1].strip()  # After the '|'
                else:
                    # Otherwise, just use the name if no saturation is specified
                    current_spectrum['name'] = line.split(":", 1)[1].strip()
               
            elif line.startswith("PRECURSORMZ:"):
                current_spectrum['precursor_mz'] = float(line.split(":", 1)[1].strip())
            elif line.startswith("PRECURSORTYPE:"):
                current_spectrum['precursor_type'] = line.split(":", 1)[1].strip()
            elif line.startswith("IONMODE:"):
                current_spectrum['ion_mode'] = line.split(":", 1)[1].strip()              
            elif line.startswith("RETENTIONTIME:"):
                _, raw = line.split(":", 1)
                val = raw.strip()
                if not val:
                    print("[SKIP] empty retention_time")
                    continue
                try:
                    rt = float(val)
                except ValueError:
                        rt_parsed = parse_array_from_string(val)
                        if isinstance(rt_parsed, (list, np.ndarray)) and len(rt_parsed) == 1:
                            rt = float(rt_parsed[0])
                        else:
                            print("[SKIP] array invalid, skipping")
                            continue
                current_spectrum['retention_time'] = rt * 60 # convert to seconds
            elif line.startswith("Name: "):
                if current_spectrum:
                    spectra_data.append(current_spectrum)
                    current_spectrum = {"Name": line.split(":",1)[1].strip()}
            elif line.startswith("FORMULA:"):
                current_spectrum['formula'] = line.split(":", 1)[1].strip()
            elif line.startswith("INCHIKEY:"):
                current_spectrum['inchi_key'] = line.split(":", 1)[1].strip()
            elif line.startswith("SMILES:"):
                current_spectrum['smiles'] = line.split(":", 1)[1].strip()
            elif line.startswith("COMMENT:"):
                current_spectrum['comment'] = line.split(":", 1)[1].strip()

        else:
            try:
                mz, intensity = map(float, line.split())
                current_spectrum.setdefault('peaks', []).append((mz, intensity))
            except ValueError:
                # This is where the spectrum data is stored and new spectrum begins
                if current_spectrum:
                    spectra_data.append(current_spectrum)
                current_spectrum = {}  # Reset for the next spectrum
                peak_data_started = False  # Reset peak reading flag

    # Add last spectrum if it exists
    if current_spectrum:
        spectra_data.append(current_spectrum)
    
    return spectra_data

In [None]:
msp_file = r"C:\Users\path_to_your_msp_file.msp"

In [None]:
msp_data = read_msp_file(msp_file)

12) link_msp()
Link the annotations from the MSP file to the dataframe containing all the information for your samples. The code was developed using an rt tolerance of 8 seconds and an m/z tolerance of 0.025. You can set the tolerance yourself, however, you may have to adjust the "match_score" calculations if you change the tolerance. This function only links annotations to features on the MS1 level, using the precursor rt and precursor m/z to determine a score that measures the weighted difference between the m/z and rt of the sample compared to the reference. This function performs quality checks on both the MS1 and MS2 level. 

In [None]:
import os
import pandas as pd
import numpy as np
from collections import defaultdict
import ms_entropy as me
from pathlib import Path
from sklearn.metrics import root_mean_squared_error
import ast

# Function to link the annotations from the MSP file to the dataframe containing the MS1 and MS2 informataion for all of the samples provided.
def link_msp(linked_features, msp_data, mz_tolerance=0.025, rt_tolerance=8.0):
    linked_msp_features = []

    # Loop through a list of Feature objects
    for feature in linked_features:
        cluster_id = feature['cluster_id']
        file_id = feature['file_id']
        feature_id = feature['feature_id']
        feature_peak_ids = feature['feature_peak_ids']
        peak_id = feature['peak_id']
        msms_id = feature['msms_id']
        precursor_mz = feature['precursor_mz']
        precursor_rt = feature['precursor_rt']
        precursor_intensity = feature['precursor_intensity']
        precursor_volume = feature['precursor_vol']
        average_ms1_mz = feature['average_ms1_mz']
        average_rt = feature['average_rt']
        charge_state = feature['charge_state']
        ms2_sample_peaks = feature['ms2_sample_peaks']
        total_intensity = feature['total_intensity']
        total_volume = feature['total_volume']
        normalized_area = precursor_volume * 114.7977026 # multiply the volume by the normalisation factor for comparison with MS-Dial

        # Centroid the MS2 spectrum, using the default parameters but with normalize_intensity set to false.
        centroided_peaks = me.clean_spectrum(
            ms2_sample_peaks,
            min_ms2_difference_in_da=0.02,
            normalize_intensity=False
        )

        centroided_arr = np.array(centroided_peaks)
        centroided_arr_list = centroided_arr.tolist()

        # Find all matching annotations
        best_match = None

        for annotation in msp_data:
            if 'precursor_mz' not in annotation or 'retention_time' not in annotation:
                continue

            # Calculate mz and rt distances, these are weighted to give a score between 0 and 1 and to prevent skewing from the retention time.
            mz_distance = np.abs(precursor_mz - annotation['precursor_mz'])
            rt_distance = np.abs(precursor_rt - annotation['retention_time'])

            if mz_distance <= mz_tolerance and rt_distance <= rt_tolerance:
                match_score = (rt_distance * 0.025) + (mz_distance * 20) # A lower score means a closer match between the sample and the reference in the MSP file.
                mass_error_ppm = ((annotation['precursor_mz'] - precursor_mz) / annotation['precursor_mz']) * 1e6

                y_true_mz = [annotation.get('precursor_mz')]
                y_pred_mz = [precursor_mz]
                rmse_mz = root_mean_squared_error(y_true_mz, y_pred_mz)

                peaks_query = np.array(ms2_sample_peaks, dtype=np.float32)
                peaks_reference = np.array([annotation.get('peaks')], dtype=np.float32)
                if peaks_reference.ndim == 3 and peaks_reference.shape[0] == 1:
                    peaks_reference = peaks_reference[0]  # flatten if needed

                # Skip if still not 2D
                if peaks_reference.ndim != 2:
                    continue

                unweighted_similarity = me.calculate_unweighted_entropy_similarity(peaks_query, peaks_reference)
                similarity = me.calculate_entropy_similarity(peaks_query, peaks_reference)

                reference_mz, reference_intensity = zip(*peaks_reference)
                sample_mz, sample_intensity = zip(*centroided_peaks)

                sample_mz = np.array(sample_mz)
                sample_intensity = np.array(sample_intensity, dtype=np.float32)
                reference_mz = np.array(reference_mz)
                reference_intensity = np.array(reference_intensity, dtype=np.float32)

                matched_indices = np.searchsorted(reference_mz, sample_mz)
                matched_indices = matched_indices[matched_indices < len(reference_mz)]

                sample_spectrum = sample_intensity[:len(matched_indices)]
                reference_spectrum = reference_intensity[matched_indices]

                sample_spectrum_normalized = sample_spectrum / np.linalg.norm(sample_spectrum)
                reference_spectrum_normalized = reference_spectrum / np.linalg.norm(reference_spectrum)

                dot_product = None
                if len(sample_intensity) >= 3 and len(reference_intensity) >= 3:
                    dot_product = np.dot(sample_spectrum_normalized, reference_spectrum_normalized)

                if best_match is None or match_score < best_match['score']:
                    best_match = {
                        'score': float(match_score),
                        'name': annotation.get('name', None),
                        'saturation': annotation.get('saturation', None),
                        'retention_time': annotation.get('retention_time', None),
                        'precursor_mz': annotation.get('precursor_mz', None),
                        'precursor_type': annotation.get('precursor_type', None),
                        'smiles': annotation.get('smiles', None),
                        'msp_peaks': annotation.get('peaks', None),
                    }

        linked_msp_feature = {
            'cluster_id': cluster_id,
            'file_id': file_id,
            'feature_id': feature_id,
            'feature_peak_ids': feature_peak_ids,
            'peak_id': peak_id,
            'msms_id': msms_id,
            'average_ms1_mz': average_ms1_mz,
            'total_intensity': total_intensity,
            'average_rt': average_rt,
            'total_volume': total_volume,
            'precursor_mz': precursor_mz,
            'precursor_intensity': precursor_intensity,
            'precursor_rt': precursor_rt,
            'precursor_volume': precursor_volume,
            'normalized_area': normalized_area,
            'charge_state': charge_state,
            'centroided_ms2_peaks': centroided_arr_list,
            'mass_error_ppm': float(mass_error_ppm) if best_match else None,
            'rmse_mz': float(rmse_mz) if best_match else None,
            'unweighted_entropy_similarity': unweighted_similarity if best_match else None,
            'entropy_similarity': similarity if best_match else None,
            'dot_product': float(dot_product) if dot_product is not None else 'NA',
            'matches': [best_match] if best_match else None,
            'raw_ms2_peaks' : ms2_sample_peaks,
            'msp_peaks' : best_match['msp_peaks'] if best_match else None
        }

        linked_msp_features.append(linked_msp_feature)

    return linked_msp_features

In [43]:
linked_msp = link_msp(linked_features, msp_data, mz_tolerance=0.025, rt_tolerance=8.0)

13) Save your final results to a CSV file.

In [None]:
from pathlib import Path  
linked_msp_df = pd.DataFrame(linked_msp)
filepath = Path('Annotations/linked_msp.csv')  
filepath.parent.mkdir(parents=True, exist_ok=True)  
linked_msp_df.to_csv(Path('Annotations/linked_msp.csv', index=False))