In [1]:
import pandas as pd
from pandas import DataFrame,Series
import math
import copy
from sklearn.preprocessing import MinMaxScaler
from scipy import signal
from numpy import ndarray
from pathlib import Path as P
from typing import Any
from typing import Generator
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
from scipy.special import voigt_profile
import numpy as np
from functools import partial
from scipy.optimize import minimize
import nest_asyncio
nest_asyncio.apply()

# 定义要拟合的函数列表
class peak_funcs:
    
    @staticmethod
    def exp(x, a, b, c):
        return a * np.exp(b * (x-c))
    
    @staticmethod
    def gauss(x, A, mu, sigma):
        return A * np.exp(-(x - mu)**2 / (2 * sigma**2))
    
    @staticmethod
    def lorentz(x, A, mu, gamma):
        return A / (1 + ((x - mu) / gamma)**2)
    
    @staticmethod
    def voigt(x, A, mu, sigma, gamma):
        return A * voigt_profile(x - mu, sigma, gamma)

models = [getattr(peak_funcs, i) for i in dir(peak_funcs) if not i.startswith('__')]

# 定义损失函数
def loss(params, x, y, func):
    y_pred = func(x, *params)
    loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
    return loss_v

# 读取excel
def get_data_from_excel(file:P)-> dict[str, ndarray]:
    filename: str= file.stem
    df: DataFrame = pd.read_excel(file, header=None)
    df.dropna(axis=0, how='all', inplace=True)
    df.dropna(axis=1, how='all', inplace=True)
    mask: DataFrame = df.applymap(lambda x: 'nm' == str(x).strip())    ## 在df中寻找值为字符串'nm'索引
    mask_series: DataFrame | Series = mask.stack()                           ## 二维数据打平转化为series(row,col,value)
    indices: list[Any] = mask_series[lambda x: x].index
    if len(indices) > 1:
        raise Exception(f'{filename}: 存在多个值为nm的单元格{indices}')
    elif len(indices) == 0:
        raise Exception(f'{filename}: 不存在值为nm的单元格')
    else:
        arr: ndarray = df.loc[indices[0][0]:,indices[0][1]:].to_numpy()
        name: str = f'{filename}-{str(arr[0][1])}'
        arr[0,1] = name
        return {filename: arr}

# 数据前处理
def pre_process(data:ndarray)-> ndarray:
    scaler = MinMaxScaler()
    arr_normalized = scaler.fit_transform(data.reshape(-1,1)).reshape(-1)
    arr_normalized = signal.savgol_filter(arr_normalized, window_length=10, polyorder=2)
    return arr_normalized, scaler

## 寻找峰值
def get_peaks(data:ndarray, threshold=10)-> list[int]:

    peaks_normal: ndarray
    _property:dict
    peaks_normal, _property = signal.find_peaks(data, prominence=0.002, distance=10)
    peaks_cwt: ndarray = signal.find_peaks_cwt(data, np.arange(1, 10), min_length=4, min_snr=1)
    ## 合并去重,过滤低值
    peaks_merged: list[Any] = sorted(list(set(peaks_normal.tolist() + peaks_cwt.tolist())))
    peaks=[i for i in peaks_merged if data[i] > 0.05]
    ## 筛选主峰
    diffs = np.diff(peaks)
    separators = np.where(diffs >= threshold)[0] + 1
    subarrays= np.split(peaks, separators)
    peaks=[]
    ## 密集区域稀疏化
    for sub in subarrays:
        if len(sub) == 1:
            sub = sub[0]
        else:
            value_in_peaks_normal =np.array([i for i in sub if i in peaks_normal])
            if len(value_in_peaks_normal) == 0:
                sub = sub.mean()
            else:
                index = np.argmin(value_in_peaks_normal - sub.mean())
                sub= value_in_peaks_normal[index]
        peaks.append(sub)
    print('peaks:',peaks)
    return peaks

# 迭代寻找峰值主函数
def iter_peaks(x_data, y_data, iter_num:int|None = None, results:list[dict] = []) -> list[dict]:
    """
    find the best fitting model for each peak.

    Args:
        x_data: The x-axis data points.
        y_data: The y-axis data points.
        iter_num: 最大迭代次数 (optional).
        results: 输出的结果 (optional).

    Returns:
        A list of fitting results, where each result contains:
            - name: The name of the model used for fitting.
            - params: The optimal parameters found for the model.
    """
    try:
        # 识别峰位
        peak_indexs = get_peaks(y_data)
        iter_num = iter_num if iter_num else len(peak_indexs)

        # 计算最高峰位的相关信息
        scale = len(y_data)
        max_peak_index= np.argmax(y_data[peak_indexs])
        max_intensity = y_data[peak_indexs[max_peak_index]]
        center = peak_indexs[max_peak_index] / scale
        _width_scipy=signal.peak_widths(y_data, [peak_indexs[max_peak_index]], rel_height=0.5)[0][0] / scale
        width = _width_scipy if _width_scipy > 0.02 else 0.02

        # 设置不同模型拟合函数和初猜值
        tasks = []
        for model in models:
            initial_func_guess=[]
            if model.__name__ in ['gauss','lorentz']:
                initial_func_guess = [max_intensity,center,width]
            elif model.__name__ == 'voigt':
                initial_func_guess = [max_intensity/4, center, width-0.01, width/2-0.01]
            elif model.__name__ == 'exp':
                initial_func_guess = [1.0, -10.0, -0.01]
            params = {
                'fun': partial(loss, func=model),
                'x0': initial_func_guess,
                'args': (x_data, y_data)
            }
            tasks.append({'name': model.__name__, 'params': copy.deepcopy(params)})

        # 并行加速运行拟合函数，并行失败，待研究
        ## task_results = Parallel(n_jobs=-1)(delayed(minimize)(**task['params']) for task in tasks)
        task_results=[minimize(**task['params']) for task in tasks]

        # 过滤拟合失败的结果
        task_results_filtered= [result for result in task_results if not math.isnan(result.fun)]

        # 选择拟合最好的模型
        optimal_fit_info = min(task_results_filtered, key=lambda x: x.fun)
        optimal_index = task_results.index(optimal_fit_info)
        optimal_params= optimal_fit_info.x
        model_func = models[optimal_index]

        # 保存当前拟合的最优模型参数
        results.append({
            'name': model_func.__name__,
            'params': optimal_params,
        })

        # 初始数据减去拟合函数的值，生成新的待拟合数据
        y_fit= model_func(x_data, *optimal_params)
        y_new = y_data - y_fit

        # 递归拟合上一步的残差, 直至iter_num == 0
        iter_num -= 1
        if iter_num != 0:
            return iter_peaks(x_data, y_new, iter_num, results)
        else:
            return results
    except Exception as e:
        print(f'peak process error in the {iter_num} iteration: {e}')

In [2]:
p_source: Generator[P, None, None] = P('./excel/UV统一格式').glob('**/*.xlsx')
results: list | None = Parallel(n_jobs=-1)(delayed(get_data_from_excel)(i) for i in p_source)
_results=copy.deepcopy(results)

In [3]:
models[2](0.3,*[0.24982069672131227, 0.5336658354114713, 0.26760417134639103])

0.14174720502522295

In [4]:
data = list(_results[5].values())[0][1:,1]

# 前处理数据
y_data, scaler= pre_process(data[::-1])
# scaler.inverse_transform(y_data.reshape(-1, 1)).reshape(-1)
x_data = np.linspace(0, 1, len(y_data))

# 拟合
result = iter_peaks(x_data, y_data)
print(result)

peaks: [4, 31, 60, 75, 95]
peaks: [29, 60, 98]
peaks: [29, 60, 98]


  _width_scipy=signal.peak_widths(y_data, [peak_indexs[max_peak_index]], rel_height=0.5)[0][0] / scale
  _width_scipy=signal.peak_widths(y_data, [peak_indexs[max_peak_index]], rel_height=0.5)[0][0] / scale
  loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))


peaks: [29, 60, 104]
peaks: [29, 49, 104]
[{'name': 'gauss', 'params': array([ 1.38751182, -0.1563219 ,  0.1787239 ])}, {'name': 'gauss', 'params': array([ 0.01334158,  0.6651137 , -0.14661715])}, {'name': 'voigt', 'params': array([ 0.04522934,  0.21267611,  0.05175714, -0.02124526])}, {'name': 'lorentz', 'params': array([0.40651891, 0.1495361 , 0.01006938])}, {'name': 'gauss', 'params': array([ 0.25193942,  0.25804819, -0.01586599])}]


  loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
  loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
  loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
  loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))


In [None]:
# 定义要拟合的函数列表
class peak_funcs:
    
    @staticmethod
    def exp(x, a, b, c):
        return a * np.exp(b * (x-c))
    
    @staticmethod
    def gauss(x, A, mu, sigma):
        return A * np.exp(-(x - mu)**2 / (2 * sigma**2))
    
    @staticmethod
    def lorentz(x, A, mu, gamma):
        return A / (1 + ((x - mu) / gamma)**2)
    
    @staticmethod
    def voigt(x, A, mu, sigma, gamma):
        return A * voigt_profile(x - mu, sigma, gamma)

models = [getattr(peak_funcs, i) for i in dir(peak_funcs) if not i.startswith('__')]

def loss(params, x, y, func):
    y_pred = func(x, *params)
    loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
    return loss_v

tasks = []
for model in models:
    initial_func_guess = [1.0, -10.0, -0.01]
    params = {
        'fun': loss,
        'x0': initial_func_guess,
        'args':(x_data, y_data, model)
    }
    tasks.append({'name': model.__name__, 'params': copy.deepcopy(params)})
# 并行加速运行拟合函数
#task_results=[minimize(**task['params']) for task in tasks]
task_results = Parallel(n_jobs=-1)(delayed(minimize)(**task['params']) for task in tasks)


In [None]:
import numpy as np
from scipy import signal
import copy
_results=copy.deepcopy(results)
data = list(_results[13].values())[0][1:,1]
y_data, scaler= pre_process(data[::-1])
# scaler.inverse_transform(y_data.reshape(-1, 1)).reshape(-1)
x_data = np.linspace(0, 1, len(y_data))
peak_indexs: list[int] = get_peaks(y_data)
plt.plot(x_data,y_data)
plt.plot(x_data[peak_indexs], y_data[peak_indexs], "x")
plt.show()


In [None]:
import numpy as np
from functools import partial
from scipy.optimize import minimize

# 定义要拟合的函数列表
models = [getattr(peak_funcs, i) for i in dir(peak_funcs) if not i.startswith('__')]

# 定义损失函数
def loss(params, x, y, func):
    y_pred = func(x, *params)
    loss_v = np.sum((y - y_pred) + np.maximum(0.001, 10*(y_pred - y)))
    return loss_v

# 前处理数据，获取峰值属性
y_data, scaler= pre_process(data[::-1])
# scaler.inverse_transform(y_data.reshape(-1, 1)).reshape(-1)
x_data = np.linspace(0, 1, len(y_data))


def iter_peaks(x_data, y_data, iter_num:int|None = None, results:list[dict] = []) -> list[dict]:
    
    try:
        # 识别峰位
        peak_indexs = get_peaks(y_data)
        iter_num = iter_num if iter_num else len(peak_indexs)

        # 计算最高峰位的相关信息
        scale = len(y_data)
        max_peak_index= np.argmax(y_data[peak_indexs])
        max_intensity = y_data[peak_indexs[max_peak_index]]
        center = peak_indexs[max_peak_index] / scale
        _width_scipy=signal.peak_widths(y_data, [peak_indexs[max_peak_index]], rel_height=0.5)[0][0] / scale
        width = _width_scipy if _width_scipy > 0.02 else 0.02

        # 设置不同模型拟合函数和初猜值
        tasks = []
        for model in models:
            if model.__name__ in ['gauss','lorentz']:
                initial_func_guess = [max_intensity,center,width]
            elif model.__name__ == 'voigt':
                initial_func_guess = [max_intensity/4, center, width-0.01, width/2-0.01]
            elif model.__name__ == 'exp':
                initial_func_guess = [1.0, -10.0, -0.01]
            params = {
                'fun':partial(loss, func=model),
                'x0':initial_func_guess,
                'args':(x_data, y_data)
            }
            tasks.append({'name': model.__name__, 'params': params})

        # 并行加速运行拟合函数
        task_results = Parallel(n_jobs=-1)(delayed(minimize)(**task['params'])  for task in tasks)

        # 过滤拟合失败的结果
        task_results_filtered= [result for result in task_results if not math.isnan(result.fun)]

        # 选择拟合最好的模型
        optimal_fit_info = min(task_results_filtered, key=lambda x: x.fun)
        optimal_index = task_results.index(optimal_fit_info)
        optimal_params= optimal_fit_info.x
        model_func = models[optimal_index]

        # 初始数据减去拟合函数的值，生成新的待拟合数据
        y_fit= model_func(x_data, *optimal_params)
        y_new = y_data - y_fit

        
        iter_num -= 1
    except Exception as e:
        print(f'peak process error in the {iter_num} iteration: {e}')

    if iter_num != 0:
        # Recursively fit the remaining peaks
        return iter_peaks(x_data, y_new, iter_num, results)
    else:
        return results




## 使用偏函数设定每个peak_fun的损失函数
# loss_partials = [partial(loss, func=model) for model in models]

# initial_func_guess = [1,0.2,0.5]
# result = minimize(loss_partials[1], initial_func_guess, args=(np.linspace(0, 1, len(data_arr)), data_arr))

print(results)

In [None]:
results_index=1
y_pred=models[results_index](np.linspace(0, 1, len(y_data)),*results[results_index].x)#*fit_results[1]['params'])
plt.plot(np.linspace(0, 1, len(y_data)),y_data)
plt.plot(np.linspace(0, 1, len(y_data)), y_pred, "r")
plt.show()
print(models[3])

In [None]:
x_data = np.linspace(0, 1, len(y_data))
new_data = [y_data - models[i](x_data, *v.x) for i,v in enumerate(results)]
errors=[sum(i) for i in new_data]
errors

In [None]:
plt.plot(np.linspace(0, 1, len(y_data)), y_data-y_pred)

In [None]:
import numpy as np
from scipy.optimize import minimize

# 定义要拟合的函数
def func(x, a, b, c):
    return a * np.exp(-b * x) + c

# 生成一些模拟数据
x_data = np.linspace(0, 4, 50)
y = func(x_data, 2.5, 1.3, 0.5)
np.random.seed(1729)
y_noise = 0.2 * np.random.normal(size=x_data.size)
y_data = y + y_noise

# 定义自定义的损失函数
def custom_loss_function(params, x, y):
    a, b, c = params
    y_pred = func(x, a, b, c)
    # 这里可以根据需要定义自己的损失函数，比如最大似然估计等
    # 这里使用简单的平方损失作为示例
    print(y_pred)
    loss = np.sum((y - y_pred)**2)
    return loss

# 使用minimize进行拟合，传入自定义的损失函数
initial_guess = [1.0, 1.0, 1.0]
result = minimize(fun=custom_loss_function, x0=initial_guess, args=(x_data, y))

# 输出拟合的参数
print(result.x)

In [None]:
peak_property

In [None]:
signal.peak_prominences(data[:,1], [248, 269, 289, 305, 383],)

In [None]:
initial_guess = []
for i in peakind:
    width=signal.peak_widths(data_arr, [i], rel_height=0.5)[0][0]
    height=signal.peak_prominences(data_arr, [i])[0][0]
    center = i
    amplitude = height if height != 0 else 0.1
    sigma = width/2.355 if width != 0 else 0.1
    gamma = sigma/2 if width != 0 else 0.1
    initial_guess.extend([center,amplitude,sigma,gamma])
    print(center,amplitude,sigma,gamma)

In [None]:
peak_property,data[:,0][peakind]

In [None]:
results_half=signal.peak_widths(data[:,1], peakind, rel_height=0.5)
plt.plot(data[:,1])
plt.plot(peakind, data[:,1][peakind], "x")
plt.hlines(*results_half[1:], color="C2")
plt.show()

In [None]:
peakind

In [None]:
signal.peak_widths(data[:,1], [185], rel_height=0.5)

In [None]:
signal.peak_prominences(data[:,1],peakind)

In [None]:
results_full = signal.peak_widths(data[:,1], [185], rel_height=1)
results_full

In [None]:
data[:,0][peakind],len(peakind)

In [None]:
import numpy as np
from scipy.optimize import curve_fit
from scipy.special import wofz

# 定义Voigt函数
def voigt(x, center, amplitude, sigma, gamma):
    """
    Voigt函数是高斯函数和洛伦兹函数的卷积。
    center: 峰的中心位置
    amplitude: 峰的高度
    sigma: 高斯分量的标准偏差
    gamma: 洛伦兹分量的半宽度
    """
    z = ((x-center) + 1j*gamma) / (sigma*np.sqrt(2))
    return amplitude * np.real(wofz(z)) / (sigma*np.sqrt(2*np.pi))

# 构建多个Voigt峰的组合函数
def multiple_voigt(x, *params):
    """
    params: 一个包含所有Voigt峰参数的列表，每个Voigt峰需要4个参数: center, amplitude, sigma, gamma
    """
    y = np.zeros_like(x,dtype=np.float64)
    for i in range(0, len(params), 4):
        center = params[i]
        amplitude = params[i+1]
        sigma = params[i+2]
        gamma = params[i+3]
        y += voigt(x, center, amplitude, sigma, gamma)
    return y

# 假设的光谱数据及其噪声
xdata = range(len(data_arr))
ydata = data_arr

# 初始猜测
initial_guess = []
for i in peakind:
    width=signal.peak_widths(data_arr, [i], rel_height=0.5)[0][0]
    height=signal.peak_prominences(data_arr, [i])[0][0]
    amplitude = 20*data_arr[i]
    sigma = width/2.355 if width != 0 else 10
    gamma = sigma/16 if width != 0 else 1
    initial_guess.extend([center,amplitude,sigma,gamma])
    print(center,amplitude,sigma,gamma)

# 执行拟合
popt, pcov = curve_fit(multiple_voigt, xdata, ydata, p0=initial_guess, maxfev=10000,method='trf')

# 输出最优拟合参数
print(popt)

In [None]:
initial_guess = []
for i in peakind:
    width=signal.peak_widths(data_arr, [i], rel_height=0.5)[0][0]
    height=signal.peak_prominences(data_arr, [i])[0][0]
    center = i
    amplitude = 10*data_arr[i]*(1+height)
    sigma = width if width != 0 else 10
    gamma = sigma/100 if width != 0 else 0.02
    initial_guess.extend([center,amplitude,sigma,gamma])
    print(center,amplitude,sigma,gamma)


In [None]:
plt.plot(xdata,ydata, "c")
plt.plot(xdata,multiple_voigt(xdata,*initial_guess), "r")
plt.show()

In [None]:
sss=np.zeros_like([5,2])
type(sss[0])

In [2]:
import logger_config

logger = logger_config.get_logger(__name__)
logger.debug('This is a debug message')

In [3]:
import sys
sys.path

['c:\\Users\\chensq\\Desktop\\webApp\\webdist.1.0\\backend01',
 'F:\\program\\chemoffice\\ChemScript\\Lib',
 'f:\\program\\miniconda\\python311.zip',
 'f:\\program\\miniconda\\DLLs',
 'f:\\program\\miniconda\\Lib',
 'f:\\program\\miniconda',
 '',
 'f:\\program\\miniconda\\Lib\\site-packages',
 'f:\\program\\miniconda\\Lib\\site-packages\\win32',
 'f:\\program\\miniconda\\Lib\\site-packages\\win32\\lib',
 'f:\\program\\miniconda\\Lib\\site-packages\\Pythonwin']