# This code is intended solely for testing purposes and is not to be used for any commercial activities.

In [14]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


## 产生数据

In [15]:
import numpy as np
import itertools

def gaussian(x, A, cen, sigma):
    ln2 = np.log(2)
    PI = np.pi
    amplitude = A  # 振幅乘以浓度
    return (amplitude * np.sqrt(4 * ln2) / (sigma * np.sqrt(PI))) * np.exp(-4 * ln2 * (x - cen)**2 / (sigma**2))

def generate_data(areas, centers, sigma, noise_level):
    X_data = []
    y_data = []
    x = np.linspace(0, 100, 100)  # x轴范围和点数

    # 生成所有可能的峰组合
    combinations = list(itertools.product(areas, repeat=len(centers)))

    for combination in combinations:
        y_total = np.zeros_like(x)
        for area, center in zip(combination, centers):
            y_total += gaussian(x, area, center, sigma)
        
        # 加入噪声
        y_total += np.random.normal(0, noise_level, x.shape)
    
        X_data.append(x.tolist())
        y_combination = []
        for yi in y_total:
            y_combination.append([yi, combination[0], centers[0], sigma])
        y_data.append(y_combination)
    
    return np.array(X_data), np.array(y_data)


## 产生数据并存入excel

In [16]:
import os
def generate_data_set(area, center, sigma, noise_level, file_name="gaussian_peaks.xlsx"):
    
    # 删除之前的数据
    if os.path.exists(file_name):
        os.remove(file_name)

    # 生成数据
    X_data, y_data = generate_data(area, center, sigma, noise_level)

    # 查看前三个样本
    """
    for i in range(3):  
        print(f"Labels for Sample {i+1}:")
        for j, label in enumerate(y_data[i]):
            print(f"  Y: {label[0]}, Area: {label[1]}, Center: {label[2]}, Sigma: {label[3]}")
    """

    # 导出数据到Excel
    max_rows_per_sheet = 1048576  # Excel单个工作表的最大行数
    rows_per_sample = 100  # 每个样本的行数
    samples_per_sheet = max_rows_per_sheet // rows_per_sample

    # 转换元数据为DataFrame
    meta_data_list = []
    for i in range(len(y_data)):
        sample_number = i + 1
        for j, label in enumerate(y_data[i]):
            meta_data_list.append({
                "Sample": sample_number,
                "Point": j + 1,
                "Y": label[0],
                "Area": label[1],
                "Center": label[2],
                "Sigma": label[3]
            })

    df_meta = pd.DataFrame(meta_data_list)

    # 转换实际数据为DataFrame
    data_list = []
    for i in range(len(X_data)):
        sample_number = i + 1
        for x_value in X_data[i]:
            data_list.append({
                "Sample": sample_number,
                "X": x_value
            })

    df_data = pd.DataFrame(data_list)

    # 将数据分成多个工作表
    with pd.ExcelWriter(file_name) as writer:
        df_meta.to_excel(writer, sheet_name="Metadata", index=False)
        
        for start in range(0, len(X_data), samples_per_sheet):
            end = min(start + samples_per_sheet, len(X_data))
            df_data_subset = df_data[df_data["Sample"].between(start + 1, end)]
            df_data_subset.to_excel(writer, sheet_name=f"Data_{start // samples_per_sheet + 1}", index=False)

    print(f"Data successfully exported to {file_name}")

    # 自动打开Excel文件（根据需要取消注释）
    # os.system("start EXCEL.EXE gaussian_peaks.xlsx")

    return X_data, y_data



In [17]:
def test_combinations(Training_area, Training_center, sigma, noise_level):
    # 生成所有可能的峰组合
    combinations = list(itertools.product(Training_area, repeat=len(Training_center)))
    x = np.linspace(0, 100, 100)  # x轴范围和点数

    for combination in combinations:
        y_total = np.zeros_like(x)
        label = []
        for area, center in zip(combination, Training_center):
            y_total += gaussian(x, area, center, sigma)
            label.append({
                "area": area, 
                "center": center, 
                "sigma": sigma, 
                "noise_level": noise_level
            })
    
    # 打印所有生成的组合
    for combination in combinations:
        print(combination)
#test_combinations(Training_area, Training_center, sigma, noise_level)

#X_data, y_data, labels = generate_data_set(Training_area, Training_center, sigma, noise_level,file_name="gaussian_peaks.xlsx")


"""
# 分别画出后10个样本
for i in range(10):
    plt.plot(X_data[i], y_data[i])
    plt.title(f"Generated Data with Peaks {i+1}")
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.show()
"""




'\n# 分别画出后10个样本\nfor i in range(10):\n    plt.plot(X_data[i], y_data[i])\n    plt.title(f"Generated Data with Peaks {i+1}")\n    plt.xlabel("X")\n    plt.ylabel("Y")\n    plt.show()\n'

## 筛选出类别1

In [18]:
def normal_peak_picking(x, y_total, user_scales, noise_level, peak_diag=2):
    all_p1, all_p2, all_p_type = [], [], []
    count_finish = 0

    for idx, (row_x, row_y_data, user_scale) in enumerate(zip(x, y_total, user_scales)):
        row_y = [item[0] for item in row_y_data]  # Extract y values
        min_intensity = noise_level * user_scale
        #print(f"Sample {idx+1}: Minimal peak intensity is set to {min_intensity}")

        count_class1 = 0
    
        p1, p2, p_type = [], [], []
        for i in range(1, len(row_x) - 1):
            if row_y[i] > row_y[i - 1] and row_y[i] > row_y[i + 1] and row_y[i] > min_intensity:
                ndiag = 0
                if i > 1 and row_y[i] > row_y[i - 2]:
                    ndiag += 1
                if i < len(row_x) - 2 and row_y[i] > row_y[i + 2]:
                    ndiag += 1
                if i > 1 and row_y[i] > row_y[i - 2]:
                    ndiag += 1
                if i < len(row_x) - 2 and row_y[i] > row_y[i + 2]:
                    ndiag += 1
                if ndiag >= peak_diag:
                    p1.append(row_x[i])
                    p2.append(row_y[i])
                    p_type.append(1)
                    count_class1 += 1

        #print(f"Sample {idx+1}: Found {count_class1} class1 peaks")
        
        if count_class1 >= 5:  # 修改为 >= 5，以确保至少有 5 个峰值
            count_finish += 1
        all_p1.append(p1)
        all_p2.append(p2)
        all_p_type.append(p_type)

    print(f"Class1 has been successfully classified")
        
    return all_p1, all_p2, all_p_type, count_finish

## 筛选出类别2

In [19]:
from scipy.ndimage import gaussian_laplace

def get_median_width_x(sigmas):
    """Calculate the median width of the Gaussian peaks."""
    return np.median(sigmas)


def laplacing_of_gaussian_convolution(data, sigma):
    """Apply Laplacian of Gaussian convolution to 2D data."""
    return gaussian_laplace(data, sigma=sigma)

def shoulder_peak_picking(x, y_total, user_scales, noise_level, median_width_x, peak_diag=2):
    all_p1, all_p2, all_p_type = [], [], []

    for idx, (row_x, row_y_data, user_scale) in enumerate(zip(x, y_total, user_scales)):
        row_y = [item[0] for item in row_y_data]  # Extract y values
        min_intensity = noise_level * user_scale
        #print(f"Sample {idx+1}: Minimal peak for shoulder intensity is set to {min_intensity}")

        xdim = len(row_x)
        nshoulder1 = int(median_width_x / 2)

        peak_map = np.zeros(xdim, dtype=int)
        p1, p2, p_type = [], [], []

        shoulder = laplacing_of_gaussian_convolution(row_y, median_width_x / np.sqrt(2))

        for i in range(1, xdim - 1):
            if (shoulder[i] > shoulder[i - 1] and shoulder[i] > shoulder[i + 1] and 
                row_y[i] > min_intensity and peak_map[i] == 0):
                
                ndiag = 0
                if i > 1 and shoulder[i] > shoulder[i - 2]:
                    ndiag += 1
                if i < xdim - 2 and shoulder[i] > shoulder[i + 2]:
                    ndiag += 1
                if i > 1 and shoulder[i] > shoulder[i - 2]:
                    ndiag += 1
                if i < xdim - 2 and shoulder[i] > shoulder[i + 2]:
                    ndiag += 1

                if ndiag >= peak_diag:
                    p1.append(row_x[i])
                    p2.append(row_y[i])
                    p_type.append(2)
                    peak_map[i] = 1  # Mark the peak as detected

        all_p1.append(p1)
        all_p2.append(p2)
        all_p_type.append(p_type)

    print(f"Class2 has been successfully classified")

    return all_p1, all_p2, all_p_type

In [20]:
import numpy as np
import pandas as pd

def save_peaks_to_excel(x, y_total, class1_results, class2_results):
    class1_count = 0
    class2_count = 0
    count_all = 0

    # 创建空的训练数据和目标标签列表
    train_x = []
    train_y = []
    
    p1_class1, p2_class1, p_type_class1, count_all_class1 = class1_results
    p1_class2, p2_class2, p_type_class2 = class2_results

    for idx, (row_x, row_y_data) in enumerate(zip(x, y_total)):
        # 提取附加信息
        areas = [item[1] for item in row_y_data]
        centers = [item[2] for item in row_y_data]
        sigmas = [item[3] for item in row_y_data]

        # 初始化所有点为 class 0
        classes = np.zeros(len(row_y_data), dtype=int)

        # 将已分类的点标记为 class 1
        class1_present = False
        for px, py, pt in zip(p1_class1[idx], p2_class1[idx], p_type_class1[idx]):
            idx_x = np.where(row_x == px)[0]
            if len(idx_x) > 0:
                classes[idx_x[0]] = pt
                class1_present = True
        
        if class1_present:
            class1_count += 1

        # 将已分类的点标记为 class 2
        class2_present = False
        for px, py, pt in zip(p1_class2[idx], p2_class2[idx], p_type_class2[idx]):
            idx_x = np.where(row_x == px)[0]
            if len(idx_x) > 0:
                classes[idx_x[0]] = pt
                class2_present = True
        
        if class2_present:
            class2_count += 1

        # 将附加信息与分类标签一起添加到 train_y
        train_x.append(list(row_x))
        train_y.append([[cls, area, center, sigma] for cls, area, center, sigma in zip(classes, areas, centers, sigmas)])

        count_all += 1

    return class1_count, class2_count, count_all, train_x, train_y


##  Find user_scale and  create training and test set

In [21]:
# Training set
Training_area = [0,1,5,50,95,100]
Training_center = [20,30,40,60,70]
sigma =  8   # 固定半高全宽
noise_level = 1/300  # 噪声水平

"""
Training_area = [0,5,40,60,95,100]
Training_center = [20,30,40,60,70]
sigma =  8   # 固定半高全宽
noise_level = 1/300  # 噪声水平
"""

# Test set
Test_area =  [0,2,30,60,100] 
Test_center = [20,30,40,60,70]  
sigma = 8    # 固定半高全宽
noise_level = 1/300  # 噪声水平

"""
Test_area =  [0,30,60,100] 
Test_center = [20,30,40,60,70]  
sigma = 8    # 固定半高全宽
noise_level = 1/300  # 噪声水平
"""

def test_combinations(Training_area, Training_center, sigma, noise_level):
    # 生成所有可能的峰组合
    combinations = list(itertools.product(Training_area, repeat=len(Training_center)))
    x = np.linspace(0, 100, 100)  # x轴范围和点数

    for combination in combinations:
        y_total = np.zeros_like(x)
        label = []
        for area, center in zip(combination, Training_center):
            y_total += gaussian(x, area, center, sigma)
            label.append({
                "area": area, 
                "center": center, 
                "sigma": sigma, 
                "noise_level": noise_level
            })
    
    # 打印所有生成的组合
    for combination in combinations:
        print(combination)
#test_combinations(Training_area, Training_center, sigma, noise_level)

# Training set
X_data, y_data = generate_data_set(Training_area, Training_center, sigma, noise_level,file_name="gaussian_peaks_train.xlsx")
train_user_scales = [np.max([item[0] for item in sublist]) for sublist in y_data]


# Test set
X_test_data, y_test_data= generate_data_set(Test_area, Test_center, sigma, noise_level,file_name="gaussian_peaks_test.xlsx")
test_user_scales = [np.max([item[0] for item in sublist]) for sublist in y_test_data]


print(f"X_data shape: {X_data.shape}")
print(f"y_data shape: {y_data.shape}")
"""
# 分别画出后10个样本
for i in range(10):
    plt.plot(X_data[i], y_data[i])
    plt.title(f"Generated Data with Peaks {i+1}")
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.show()
"""

Data successfully exported to gaussian_peaks_train.xlsx
Data successfully exported to gaussian_peaks_test.xlsx
X_data shape: (7776, 100)
y_data shape: (7776, 100, 4)


'\n# 分别画出后10个样本\nfor i in range(10):\n    plt.plot(X_data[i], y_data[i])\n    plt.title(f"Generated Data with Peaks {i+1}")\n    plt.xlabel("X")\n    plt.ylabel("Y")\n    plt.show()\n'

In [22]:
#test shoulder_peak_picking
median_width_x = get_median_width_x(sigma)
#print(y_data.shape)
class1_results = normal_peak_picking(X_data, y_data, train_user_scales, noise_level)
class2_results = shoulder_peak_picking(X_data, y_data,train_user_scales, noise_level,median_width_x)



"""
if os.path.exists("Train_Classification.xlsx"):
    os.remove("Train_Classification.xlsx")
"""

#with pd.ExcelWriter("Train_Classification.xlsx") as writer:

count1,count2,count_all,train_x,train_y = save_peaks_to_excel(X_data, y_data, class1_results, class2_results)



#os.system("start EXCEL.EXE Classification.xlsx")
print(class1_results[0][0])


Class1 has been successfully classified
Class2 has been successfully classified
[3.0303030303030303, 5.050505050505051, 7.070707070707071, 10.101010101010102, 13.131313131313131, 15.151515151515152, 19.191919191919194, 21.212121212121215, 25.252525252525253, 29.292929292929294, 32.323232323232325, 34.343434343434346, 37.37373737373738, 39.3939393939394, 42.42424242424243, 44.44444444444445, 47.47474747474748, 50.505050505050505, 53.535353535353536, 56.56565656565657, 61.61616161616162, 64.64646464646465, 67.67676767676768, 69.6969696969697, 76.76767676767678, 80.80808080808082, 85.85858585858587, 90.90909090909092, 93.93939393939395, 97.97979797979798]


## 产生训练数据


In [23]:
import pandas as pd
"""
def create_training_set(train_x, train_y, file_name="train_data.xlsx"):

    if os.path.exists(file_name):
        os.remove(file_name)

    with pd.ExcelWriter(file_name) as writer:
        for i, (x, y) in enumerate(zip(train_x, train_y)):
            # 将 x 和 y 转换为 pandas DataFrame
            df = pd.DataFrame({
                'X': x,
                'Y': y
            })

            # 将 DataFrame 写入一个新的工作表
            df.to_excel(writer, sheet_name=f'Sample_{i+1}', index=False)



creation = create_training_set(train_x, train_y, file_name="train_data.xlsx")
"""

'\ndef create_training_set(train_x, train_y, file_name="train_data.xlsx"):\n\n    if os.path.exists(file_name):\n        os.remove(file_name)\n\n    with pd.ExcelWriter(file_name) as writer:\n        for i, (x, y) in enumerate(zip(train_x, train_y)):\n            # 将 x 和 y 转换为 pandas DataFrame\n            df = pd.DataFrame({\n                \'X\': x,\n                \'Y\': y\n            })\n\n            # 将 DataFrame 写入一个新的工作表\n            df.to_excel(writer, sheet_name=f\'Sample_{i+1}\', index=False)\n\n\n\ncreation = create_training_set(train_x, train_y, file_name="train_data.xlsx")\n'

In [24]:
import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor

def generate_data_frame(x, y_data):
    # 提取 Y 值和附加信息
    y_values = [item[0] for item in y_data]
    areas = [item[1] for item in y_data]
    centers = [item[2] for item in y_data]
    sigmas = [item[3] for item in y_data]
        
    # 创建 DataFrame
    return pd.DataFrame({'X': x, 'Y': y_values, 'Area': areas, 'Center': centers, 'Sigma': sigmas})

def create_training_set(train_x, train_y, file_name="train_data.xlsx"):
    if os.path.exists(file_name):
        os.remove(file_name)

    # 生成所有需要写入的数据
    data_to_write = []
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(generate_data_frame, x, y) for x, y in zip(train_x, train_y)]
        for i, future in enumerate(futures):
            df = future.result()
            data_to_write.append((df, f'Sample_{i+1}'))

    with pd.ExcelWriter(file_name, engine='xlsxwriter') as writer:
        for df, sheet_name in data_to_write:
            df.to_excel(writer, sheet_name=sheet_name, index=False)

# 示例调用
create_training_set(train_x, train_y, file_name="train_data.xlsx")

## 产生测试数据

In [25]:
median_width_x = get_median_width_x(sigma)
#print(y_data.shape)
test_class1_results = normal_peak_picking(X_test_data, y_test_data, test_user_scales, noise_level)
test_class2_results = shoulder_peak_picking(X_test_data, y_data,test_user_scales, noise_level,median_width_x)

"""
if os.path.exists("Test_Classification.xlsx"):
    os.remove("Test_Classification.xlsx")

with pd.ExcelWriter("Test_Classification.xlsx") as writer:"""
count1,count2,count_all,test_x,test_y = save_peaks_to_excel(X_test_data, y_test_data, test_class1_results , test_class2_results)
    
#os.system("start EXCEL.EXE Classification.xlsx")
print(count1)
print(count2)
print(count_all)
print(len(test_x))
print(len(test_y))

Class1 has been successfully classified
Class2 has been successfully classified
1024
1024
1024
1024
1024


In [26]:
import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor

def generate_data_for_sheet(x, y_data):
    y_values = [item[0] for item in y_data]
    areas = [item[1] for item in y_data]
    centers = [item[2] for item in y_data]
    sigmas = [item[3] for item in y_data]
    
    # 创建 DataFrame
    return pd.DataFrame({'X': x, 'Y': y_values, 'Area': areas, 'Center': centers, 'Sigma': sigmas})

def create_test_set(train_x, train_y, file_name="test_data.xlsx"):
    if os.path.exists(file_name):
        os.remove(file_name)

    # 生成所有需要写入的数据
    data_to_write = []
    for i, (x, y) in enumerate(zip(train_x, train_y)):
        data_to_write.append((generate_data_for_sheet(x, y), f'Sample_{i+1}'))

    with pd.ExcelWriter(file_name, engine='xlsxwriter') as writer:
        # 并行写入
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(write_sheet, writer, data, sheet_name) for data, sheet_name in data_to_write]
            for future in futures:
                future.result()  # 等待所有线程完成

def write_sheet(writer, data, sheet_name):
    data.to_excel(writer, sheet_name=sheet_name, index=False)


create_test_set(test_x,test_y , file_name="test_data.xlsx")