In [1]:
import csv
import numpy as np
import pandas as pd
import scipy.signal as signal
from scipy.signal import find_peaks, peak_widths, peak_prominences, find_peaks_cwt, argrelextrema
# bokeh plot settings
from bokeh.io import output_notebook, show, save
from bokeh.models import ColumnDataSource, HoverTool, Scatter
from bokeh.plotting import figure
from bokeh.sampledata.autompg import autompg_clean
from bokeh.transform import factor_cmap
from bokeh.layouts import gridplot
output_notebook()

# Append System Path

In [2]:
# Path: tests/peak_detect.ipynb
import os
# find root directory
root = os.path.dirname(os.path.abspath('./'))
# append root directory to sys.path
import sys
sys.path.append(root)

# Read data and pre-process

In [None]:
from tools.get_files import GetFiles
from tools.read_mzxml import read_mzxml


file_finder = GetFiles('/mnt/d/Data/LTQ数据/', 'mzXML')
file_list = file_finder.get_files()
xml_data_list = []
filename_list = []
for file in file_list:
    filename_list.append(file.split('/')[-1])
    xml_data_list.append([read_mzxml(file, env='Production')])

In [None]:
def get_mass_spectrum(data):
    """
    Get the mass spectrum of a specific mass from the data.
    """
    spectrum_list = []
    for i in range(len(data)):
        spectrum = pd.DataFrame({'mz': data[i][0::2], 'intensity': data[i][1::2]},)
        spectrum_list.append(spectrum)
    return spectrum_list

In [None]:
file_data_dict = dict(zip(filename_list, xml_data_list))
spectrums_list = []
for key, value in file_data_dict.items():
    point_list = value[0][0]
    spectrum_list = get_mass_spectrum(point_list)
    spectrums_list.append(spectrum_list)

file_spectrum_dict = dict(zip(filename_list, spectrums_list))

for key, value in file_spectrum_dict.items():
    for id, spectrum in enumerate(value):
        p = figure(title=key, x_axis_label='m/z', y_axis_label='intensity')
        p.line(spectrum['mz'], spectrum['intensity'], line_width=2)
        save_dir = '/mnt/d/Data/LTQ数据/' + os.path.splitext(key)[0] + '/'
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        save(p, filename='{}.html'.format(save_dir + str(id)))


In [3]:
# read data from csv file
data_dir = os.path.join(root, 'data/')
from tools.get_files import GetFiles
file_finder = GetFiles(data_dir, 'csv')
file_list = file_finder.get_files()

  0%|          | 0/10 [00:00<?, ?it/s]100%|██████████| 10/10 [00:00<00:00, 130663.68it/s]


In [4]:
# read csv files
def read_csv(file):
    df = pd.read_csv(file, )
    df.dropna()
    filename = os.path.basename(os.path.splitext(file)[0])
    return df, filename

In [5]:

data_list, filename_list = [], []
for file in file_list:
    data, filename = read_csv(file)
    data = data[['x','y']]
    data_list.append(data)
    filename_list.append(filename)

# Main Content -- Data process and visualization

## Denoise filter definition

In [None]:
# plot data with denoise function
def plot_after_filted(filter, data, title:str):
    if filter.__name__ == 'denoise':
        p = figure(plot_width=800, plot_height=400, title=title + ' with uv_filter')
        filted_data = filter(data['y'], 100)
    else:
        p = figure(plot_width=800, plot_height=400, title=title + ' with ' + filter.__name__)
        filted_data = filter(data['y'])
    p.line(data['x'], data['y'], line_dash = (4, 4), line_width=2, color='skyblue')
    p.line(data['x'], filted_data, line_width=2, color='red')
    return p

In [None]:
from filter import *
uv_denoiser = Denoiser()

# define filter function list
filter_list = [conv_filter, wavelet_noising, savgol_filter, signal.medfilt, signal.wiener, uv_denoiser.denoise]

## Data filter and plot

In [None]:
plot_list = []
for i in range(len(data_list)):
    for filter in filter_list:
        temp_p = plot_after_filted(filter, data_list[i], filename_list[i])
        plot_list.append(temp_p)

grid = gridplot(plot_list, ncols=3, plot_width=400, plot_height=400)
show(grid)

## Define peak find function by using peakutils

In [None]:
def plot_peak(p, peak_index, peak_height, data, title:str):
    p.scatter(data['x'][peak_index], peak_height, size=5, color='darkcyan')
    return p

In [None]:
import peakutils

peak_plot_list = []
for i in range(len(data_list)):
    for filter in filter_list:
        temp_p = plot_after_filted(filter, data_list[i], filename_list[i])
        peak_index = peakutils.indexes(data_list[i]['y'], thres=0.5, min_dist=100)
        peak_height = data_list[i]['y'][peak_index]
        temp_peak_p = plot_peak(temp_p, peak_index, peak_height, data_list[i], filename_list[i])
        peak_plot_list.append(temp_peak_p)

grid_peak = gridplot(peak_plot_list, ncols=3, width=400, height=400)
show(grid_peak)

In [None]:
## test gaussian fit
for i in range(len(data_list)):
    test = peakutils.gaussian_fit(data_list[i]['x'], data_list[i]['y'], False)
    peak_index = peakutils.indexes(data_list[i]['y'], thres=0.5, min_dist=100)
    peak_height = data_list[i]['y'][peak_index]
    gaussian = peakutils.gaussian(data_list[i]['x'], test[0], test[1], test[2])
    p = figure(plot_width=800, plot_height=400, title=filename_list[i] + ' with gaussian fit')
    p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
    p.line(data_list[i]['x'], gaussian, line_width=2, color='red')
    show(p)

## Temporary gaussian fit definition

In [None]:
def guassian(x, *params):
    num_func = int(len(params)/3)

    y_list = []

    for i in range(num_func):
        y = np.zeros_like(x)
        param_range = list(range(3*i, 3*(i+1), 1))
        amp = params[int(param_range[0])]
        ctr = params[int(param_range[1])]
        wid = params[int(param_range[2])]
        y += amp * np.exp(-(x - ctr)**2 / wid)
        y_list.append(y)
    
    y_sum = np.zeros_like(x)
    for i in y_list:
        y_sum += i

    y_sum += params[-1]

    return y_sum

def peaks_fit(x, *params):
    num_func = int(len(params)/3)

    y_list = []

    for i in range(num_func):
        y = np.zeros_like(x)
        param_range = list(range(3*i, 3*(i+1), 1))
        amp = params[int(param_range[0])]
        ctr = params[int(param_range[1])]
        wid = params[int(param_range[2])]
        y += amp * np.exp(-(x - ctr)**2 / wid) + params[-1]
        y_list.append(y)
    return y_list

## Define peak find function by using findpeaks


In [12]:
from findpeaks import findpeaks
from scipy.optimize import curve_fit

def plot_peak_fit(p, peak_index, peak_height, data, title:str):
    p.scatter(data['x'][peak_index], peak_height, size=5, color='darkcyan')
    return p

def plot_valley_fit(p, valley_index, valley_height, data, title:str):
    p.star(data['x'][valley_index], valley_height, size=5, color='darkcyan')
    return p

In [13]:
# handle peak fit reuslt
def peak_fit_result(result, data):
    df = result['df']
    df['x'] = data['x']
    peak_index= df[lambda df: df['peak'] == True]
    highest_peak = df.loc[peak_index.index.tolist()]['y'].max()
    peak_index_list = peak_index[lambda peak_index: peak_index['y'] > .1 * highest_peak].index.tolist()
    valley_index = df[lambda df: df['valley'] == True]
    valley_index_list = valley_index[lambda valley_index: valley_index['y'] > .1 * highest_peak].index.tolist()
    print('peak index: ', peak_index_list)
    print('valley index: ', valley_index_list)
    return peak_index_list, valley_index_list

In [14]:
grid_fp_list = []
for i in range(len(data_list)):
    # peak_ctr = peakutils.centroid(data_list[i]['x'], data_list[i]['y'])
    fp = findpeaks(method='topology', interpolate=10, lookahead=100, limit=5)
    try:
        fp_result = fp.fit(data_list[i]['y'])
        peak_index, valley_index = peak_fit_result(fp_result, data_list[i])
        peak_height = data_list[i]['y'][peak_index]
        p = figure(plot_width=800, plot_height=400, title=filename_list[i] + ' with peak fit')
        p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
        p = plot_peak_fit(p, peak_index, peak_height, data_list[i], filename_list[i])
        p = plot_valley_fit(p, valley_index, data_list[i]['y'][valley_index], data_list[i], filename_list[i])
        grid_fp_list.append(p)
    except:
        print('no peak found in ', filename_list[i])
grid_fp = gridplot(grid_fp_list, ncols=3, plot_width=400, plot_height=400)
show(grid_fp)

[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [410, 429, 441]
valley index:  [423, 440]
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [256]
valley index:  []
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [279, 299]
valley index:  [290]
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [287, 307]
valley index:  [301]
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1

## Curve fit with gaussian

In [16]:
# curve fit
from lmfit.models import GaussianModel, LinearModel, ExponentialModel
grid_cf_list = []
for i in range(len(data_list)):
    exp_mod = ExponentialModel(prefix='exp_')
    pars = exp_mod.guess(data_list[i]['y'], x=data_list[i]['x'])
    gauss_main = GaussianModel(prefix='gauss_main_')
    pars.update(gauss_main.make_params())
    fp = findpeaks(method='topology', interpolate=10, lookahead=100, limit=5)
    try:
        fp_result = fp.fit(data_list[i]['y'])
        peak_index, valley_index = peak_fit_result(fp_result, data_list[i])

        pars['gauss_main_center'].set(data_list[i]['x'][peak_index[0]], min=data_list[i]['x'][peak_index[0]]-50, max=data_list[i]['x'][peak_index[0]]+50)
        pars['gauss_main_sigma'].set(5, min=0.1, max=50)
        pars['gauss_main_amplitude'].set(10, min=0, max=150)
        p = figure(plot_width=800, plot_height=400, title=filename_list[i] + ' with curve fit')
        p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
        if len(peak_index) > 1:
            gauss_side = GaussianModel(prefix='gauss_side_')
            pars.update(gauss_side.make_params())
            pars['gauss_side_center'].set(data_list[i]['x'][peak_index[1]], min=data_list[i]['x'][peak_index[1]]-50, max=data_list[i]['x'][peak_index[1]]+50)
            pars['gauss_side_sigma'].set(5, min=0.1, max=50)
            pars['gauss_side_amplitude'].set(10, min=0, max=150)
            mod = exp_mod + gauss_main + gauss_side
            out = mod.fit(data_list[i]['y'], pars, x=data_list[i]['x'])
            comps = out.eval_components(x=data_list[i]['x'])
            p.line(data_list[i]['x'], comps['gauss_side_'], line_width=2, color='lightcoral')
        else:
            mod = exp_mod + gauss_main
            out = mod.fit(data_list[i]['y'], pars, x=data_list[i]['x'])
            comps = out.eval_components(x=data_list[i]['x'])

        main_peak_width = signal.peak_widths(data_list[i]['y'], peak_index, rel_height=0.5)
        print('main peak width: ', main_peak_width.T)
        # fwhm = [out.result.params['gauss_main_fwhm'].value, out.result.params['gauss_side_fwhm'].value]
        # print(fwhm)
        # break
        # print(out.fit_report())
        p.line(data_list[i]['x'], comps['gauss_main_'], line_width=2, color='darkcyan')
        peak_height = data_list[i]['y'][peak_index]
        p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
        p = plot_peak_fit(p, peak_index, peak_height, data_list[i], filename_list[i])
        p = plot_valley_fit(p, valley_index, data_list[i]['y'][valley_index], data_list[i], filename_list[i])
        grid_cf_list.append(p)
    except:
        print('no peak found in ', filename_list[i])
grid_cf = gridplot(grid_cf_list, ncols=3, plot_width=400, plot_height=400)
show(grid_cf)

    

[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [410, 429, 441]
valley index:  [423, 440]
no peak found in  432.3-7
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [256]
valley index:  []
no peak found in  432.3-3
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [279, 299]
valley index:  [290]
no peak found in  432.3-5
[findpeaks] >Finding peaks in 1d-vector using [topology] method..
[findpeaks] >Interpolating 1d-vector by factor 10
[findpeaks] >Detect peaks using topology method with limit at 5.
peak index:  [287, 307]
valley index:  [301]
no peak found in

In [None]:
comps

## Curve fit with Lorentzian

In [None]:
# curve fit
from lmfit.models import GaussianModel, LinearModel, ExponentialModel, LorentzianModel
grid_cf_list = []
for i in range(len(data_list)):
    exp_mod = ExponentialModel(prefix='exp_')
    pars = exp_mod.guess(data_list[i]['y'], x=data_list[i]['x'])
    loren_main = LorentzianModel(prefix='loren_main_')
    pars.update(loren_main.make_params())
    fp = findpeaks(method='topology', interpolate=10, lookahead=100, limit=5)
    try:
        fp_result = fp.fit(data_list[i]['y'])
        peak_index, valley_index = peak_fit_result(fp_result, data_list[i])
        gaussian_fit_result = peakutils.gaussian_fit(data_list[i]['x'], data_list[i]['y'], False)
        pars['loren_main_center'].set(gaussian_fit_result[1], min=data_list[i]['x'][peak_index[0]]-50, max=data_list[i]['x'][peak_index[0]]+50)
        pars['loren_main_sigma'].set(gaussian_fit_result[2], min=0.1, max=50)
        pars['loren_main_amplitude'].set(gaussian_fit_result[0], min=0, max=150)
        mod = exp_mod + loren_main
        out = mod.fit(data_list[i]['y'], pars, x=data_list[i]['x'])
        comps = out.eval_components(x=data_list[i]['x'])
        print(out.fit_report())
        p = figure(plot_width=800, plot_height=400, title=filename_list[i] + ' with curve fit')
        p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
        p.line(data_list[i]['x'], out.best_fit, line_width=2, color='darkcyan')
        peak_height = data_list[i]['y'][peak_index]
        p.line(data_list[i]['x'], data_list[i]['y'], line_dash = (4, 4), line_width=2, color='skyblue')
        p = plot_peak_fit(p, peak_index, peak_height, data_list[i], filename_list[i])
        p = plot_valley_fit(p, valley_index, data_list[i]['y'][valley_index], data_list[i], filename_list[i])
        grid_cf_list.append(p)
    except:
        print('no peak found in ', filename_list[i])
grid_cf = gridplot(grid_cf_list, ncols=3, plot_width=400, plot_height=400)
show(grid_cf)
