In [1]:
from peak_performance import pipeline as pl
from peak_performance import models
from peak_performance import plots
import pandas
import numpy as np
import arviz as az

User information

In [2]:
# 1: Specify the absolute path to the raw data files.
# NOTE(review): this is a machine-specific absolute path; adapt it to your own setup before running.
path_raw_data = r"C:\Users\niesser\Desktop\Local GitLab Repositories\peak-performance\example"

# Obtain a list of raw data file names.
raw_data_files = pl.detect_npy(path_raw_data)
# Alternatively, the list could be given by hand, e.g.
# raw_data_files = ["A1t1R1Part2_1_110_109.9_110.1.npy", "A1t1R1Part2_2_111_109.9_110.1.npy"]
# Here, the names still contain the data type suffix (.npy). The following line
# would remove it if it were uncommented.
# raw_data_files = [file[:-4] for file in raw_data_files]
print(raw_data_files)

# Necessary information from the user.
# The per-signal lists below are sized from raw_data_files so that they always
# contain one entry per detected file instead of assuming exactly five files.

# 2: List with Booleans in the same order as raw_data_files. Set to True for a
#    given signal if the signal contains a double peak, and to False if it
#    contains a single peak. Visually check this beforehand.
double_peak = len(raw_data_files) * [False]

# 3: Set this variable to True if you want to check for peaks before
#    fitting/sampling to potentially save a lot of computation time. If you
#    choose True, you have to provide an expected retention time for each signal.
pre_filtering = True

# 4: In case you set pre_filtering to True, give a retention time estimate
#    (float) for each signal in raw_data_files. In case of a double peak, give
#    two retention times (in chronological order) as a tuple containing two floats.
retention_time_estimate = len(raw_data_files) * [26.2]

# 5: In case you set pre_filtering to True, give a rough estimate of the average
#    peak width in minutes you would expect for your LC-MS method.
peak_width_estimate = 1

# 6: In case you set pre_filtering to True, give a minimum signal-to-noise ratio
#    for a signal to be defined as a peak during pre-filtering.
minimum_sn = 5

['A1t1R1Part2_1_110_109.9_110.1.npy', 'A1t1R1Part2_2_111_109.9_110.1.npy', 'A1t1R1Part2_3_111_110.9_111.1.npy', 'A1t1R1Part2_4_112_110.9_111.1.npy', 'A1t1R1Part2_5_112_111.9_112.1.npy']


Pipeline

In [3]:
# prepare the results data structure and the summary DataFrame
df_summary, path_results = pl.initiate(path_raw_data)
for file in raw_data_files:
    print(f"{file}")
    # parse the data and pull the metadata out of the (standardized) file name
    (
        timeseries,
        acquisition,
        experiment,
        precursor_mz,
        product_mz_start,
        product_mz_end,
    ) = pl.parse_data(path_raw_data, file)
    # instantiate the UserInput class with all given information
    ui = pl.UserInput(
        path_results,
        raw_data_files,
        double_peak,
        retention_time_estimate,
        peak_width_estimate,
        pre_filtering,
        minimum_sn,
        timeseries,
        acquisition,
        experiment,
        precursor_mz,
        product_mz_start,
        product_mz_end,
    )
    # initial guesses, used for pre-filtering and for defining the prior probability distributions
    slope_guess, intercept_guess, noise_guess = models.initial_guesses(
        ui.timeseries[0], ui.timeseries[1]
    )
    # pre-sampling filter (only if selected by the user)
    if pre_filtering:
        prefilter, df_summary = pl.prefiltering(file, ui, noise_guess, df_summary)
        if prefilter:
            print(f"{ui.experiment} survived prefiltering.")
        else:
            # no peak candidates were found: plot the raw data and move on to the next signal
            plots.plot_raw_data(file, ui)
            continue
    # model selection: double peak model if flagged for this signal,
    # otherwise single peaks are first modeled with a skew normal distribution
    pmodel = (
        models.define_model_doublepeak(ui)
        if ui.user_info[file][0]
        else models.define_model_skew(ui)
    )
    # sample the chosen model
    print("pre_sampling")
    idata = pl.sampling(pmodel)
    print("post-sampling")
    # post-sampling filter
    resample, discard, df_summary = pl.postfiltering(file, idata, ui, df_summary)
    print(f"{resample}, {discard}")
    # a discarded peak is only plotted, then the next signal is processed
    if discard:
        plots.plot_posterior(file, ui, idata, True)
        print("discarded")
        continue
    # convergence not yet reached: sample once more with more tuning samples
    if resample:
        print("start resampling")
        idata = pl.sampling(pmodel, tune=4000)
        resample, discard, df_summary = pl.postfiltering(file, idata, ui, df_summary)
        if discard or resample:
            if not discard:
                # signal was flagged for re-sampling a second time, so it is discarded
                # TODO: should this really be discarded or should the contents of idata be added with an additional comment? (would need to add a comment column)
                df_summary = pl.report_add_nan_to_summary(file, ui, df_summary)
            plots.plot_posterior(f"{file}", ui, idata, True)
            continue
    print("after resampling")
    # add inference data to df_summary and save it as an Excel file
    df_summary = pl.report_add_data_to_summary(file, idata, df_summary, ui)
    # posterior predictive sampling
    idata = pl.posterior_predictive_sampling(pmodel, idata)
    # store the inference data object in a zip file
    pl.report_save_idata(idata, ui, file)
    # plot the results
    plots.plot_posterior_predictive(file, ui, idata, False)
    plots.plot_posterior(file, ui, idata, False)
# save condensed Excel file with area data
pl.report_area_sheet(path_results, df_summary)

A1t1R1Part2_1_110_109.9_110.1.npy
1 survived prefiltering.
pre_sampling


Sampling: [L, alpha, area, baseline_intercept, baseline_slope, mean, noise, std]
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [baseline_intercept, baseline_slope, noise, mean, std, alpha, area]


Sampling 4 chains for 2_000 tune and 2_000 draw iterations (8_000 + 8_000 draws total) took 76 seconds.


post-sampling
False, False
after resampling


Sampling: [L]


A1t1R1Part2_2_111_109.9_110.1.npy
2 survived prefiltering.
pre_sampling


  signal_to_noise_condition = ui.timeseries[1][peak] / noise_width_guess > ui.minimum_sn
  check_preceding_point = ui.timeseries[1][peak - 1] / noise_width_guess > 2
  check_succeeding_point = ui.timeseries[1][peak + 1] / noise_width_guess > 2
  check_succeeding_point = ui.timeseries[1][peak + 1] / noise_width_guess > 2
  check_preceding_point = ui.timeseries[1][peak - 1] / noise_width_guess > 2
Sampling: [L, alpha, area, baseline_intercept, baseline_slope, mean, noise, std]
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [baseline_intercept, baseline_slope, noise, mean, std, alpha, area]


Sampling 4 chains for 2_000 tune and 2_000 draw iterations (8_000 + 8_000 draws total) took 115 seconds.


post-sampling
False, True
discarded
A1t1R1Part2_3_111_110.9_111.1.npy
A1t1R1Part2_4_112_110.9_111.1.npy
4 survived prefiltering.
pre_sampling


  signal_to_noise_condition = ui.timeseries[1][peak] / noise_width_guess > ui.minimum_sn
  check_preceding_point = ui.timeseries[1][peak - 1] / noise_width_guess > 2
  check_succeeding_point = ui.timeseries[1][peak + 1] / noise_width_guess > 2
  check_succeeding_point = ui.timeseries[1][peak + 1] / noise_width_guess > 2
  check_preceding_point = ui.timeseries[1][peak - 1] / noise_width_guess > 2
Sampling: [L, alpha, area, baseline_intercept, baseline_slope, mean, noise, std]
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [baseline_intercept, baseline_slope, noise, mean, std, alpha, area]


Sampling 4 chains for 2_000 tune and 2_000 draw iterations (8_000 + 8_000 draws total) took 142 seconds.


post-sampling
False, True
discarded
A1t1R1Part2_5_112_111.9_112.1.npy
