In [None]:
# 创建文件夹
import os
import sys

def create_folder(folder_name):
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
        print("文件夹创建成功")
    else:
        print("文件夹已存在")
pwd="/home/jhzhai/Nucleation/peek-find/cool700/nc2600"
# pwd="/home/jhzhai/Nucleation/peek-find/cool800/nc1600"
# pwd="/home/jhzhai/Nucleation/peek-find/cool900/nc1200"
# pwd="/home/jhzhai/Nucleation/peek-find/cool1000/nc800"
# pwd="/home/jhzhai/Nucleation/peek-find/cool1100/nc600"
create_folder(pwd)

In [None]:
import os

# Define the folder structure
folder_structure = [
    "PEM4Fe/src",
    "PEM4Fe/src/__init__.py",
    "PEM4Fe/src/config.py",
    "PEM4Fe/src/data_processing.py",
    "PEM4Fe/src/fitting.py",
    "PEM4Fe/src/peak_detection.py",
    "PEM4Fe/src/visualization.py",
    "PEM4Fe/src/output.py",
    "PEM4Fe/src/main.py",
    "PEM4Fe/tests",
    "PEM4Fe/tests/__init__.py",
    "PEM4Fe/tests/test_config.py",
    "PEM4Fe/tests/test_data_processing.py",
    "PEM4Fe/tests/test_fitting.py",
    "PEM4Fe/tests/test_peak_detection.py",
    "PEM4Fe/tests/test_visualization.py",
    "PEM4Fe/tests/test_output.py",
    "PEM4Fe/config.json",
    "PEM4Fe/requirements.txt",
    "PEM4Fe/README.md"
]

# Create the folders and files
for path in folder_structure:
    if path.endswith('.py') or path.endswith('.json') or path.endswith('.txt') or path.endswith('.md'):
        # Create file
        with open(path, 'w') as f:
            pass
    else:
        # Create directory
        os.makedirs(path, exist_ok=True)

print("文件夹结构已创建")


以下是进一步优化代码的建议，整理为列表形式：

---

### **路径依赖性改进**
1. **参数化路径**：
   - 使用配置文件（如 JSON、YAML）或命令行参数传递路径，避免硬编码路径。
   - 示例：通过 `argparse` 模块让用户在运行脚本时指定路径。
2. **动态路径检测**：
   - 检测当前运行环境的路径，并根据环境自动调整路径设置。

---

### **默认参数配置**
1. **参数动态调整**：
   - 引入基于数据特征的动态参数调整（如根据数据分布自动设置 `width_threshold` 和 `num_density`）。
2. **参数文件化**：
   - 将所有默认参数放入配置文件，方便统一管理和修改。
3. **用户可调参数**：
   - 提供命令行或交互界面，让用户动态修改关键参数。

---

### **性能优化**
1. **启用并行处理**：
   - 使用 `joblib` 的 `Parallel` 和 `delayed` 并行化对每一列数据的处理。
   - 示例：将 `process_column` 调用包裹在 `Parallel` 中处理。
2. **数据分块处理**：
   - 对大数据集按时间或行分块处理，减少内存占用。
3. **优化绘图性能**：
   - 仅在需要时生成图像，避免不必要的重复计算和绘制。

---

### **异常处理改进**
1. **文件异常**：
   - 捕获文件读取异常（如文件不存在、格式不正确），并输出清晰的错误提示。
2. **参数异常**：
   - 检查用户输入参数是否合理（如 `width_threshold` 和 `num_density` 是否超出范围）。
3. **拟合异常**：
   - 对指数拟合失败的列标记为警告，而不是终止整个脚本。
4. **日志记录**：
   - 使用 `logging` 模块记录异常信息和运行状态，方便调试。

---

### **模块化设计**
1. **功能拆分**：
   - 将指数拟合、峰值检测和绘图功能提取为独立模块或类。
   - 示例：创建 `FittingModule` 和 `PeakDetectionModule` 两个独立模块。
2. **增加复用性**：
   - 将通用功能（如数据清洗、异常处理）设计为工具函数，支持其他项目复用。
3. **测试覆盖**：
   - 针对模块化后的代码，编写单元测试以覆盖所有主要功能。

---

### **用户体验改进**
1. **增强可视化**：
   - 添加更多图表交互性（如 `plotly` 支持放大和查看详细数据点）。
   - 在分析图中添加标注和颜色区分，提高可读性。
2. **动态运行反馈**：
   - 在处理大数据时，显示每个列的处理进度（如使用 `tqdm` 模块）。
3. **错误修复提示**：
   - 如果出现错误，提供修复建议，而不仅仅是报错信息。

---

### **数据格式扩展**
1. **支持更多数据格式**：
   - 增加对 CSV、Excel 和 HDF5 等数据格式的支持。
2. **灵活数据输入**：
   - 让用户指定数据格式和列名，而不是强制依赖固定命名规则。

---

### **代码维护与扩展**
1. **添加注释和文档**：
   - 为每个函数添加详细注释，包括参数解释和返回值说明。
   - 提供完整的用户文档，指导如何使用脚本及调整参数。
2. **代码版本控制**：
   - 使用 `git` 或其他版本控制工具，管理代码的修改历史。
3. **持续集成**：
   - 引入自动化测试（如 `GitHub Actions`），确保代码修改不会引入新问题。

---

### **总结**
通过以上优化措施，代码的可移植性、性能、可读性和用户体验将大大提升，同时为未来的功能扩展奠定基础。如果有任何具体问题，欢迎进一步讨论！

In [7]:
import os
import sys
import yaml
import json
import logging
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from scipy.optimize import curve_fit
from scipy.optimize import minimize
from scipy.signal import find_peaks
from scipy.ndimage import gaussian_filter1d
import argparse

# -------------------------- 配置模块 -------------------------- #

In [8]:
# -------------------------- 配置模块 -------------------------- #
def load_config(config_file):
    """
    加载配置文件（支持 JSON 和 YAML）。
    
    参数:
    - config_file (str): 配置文件的路径。
    
    返回:
    - dict: 包含配置参数的字典。
    """
    _, ext = os.path.splitext(config_file)
    try:
        with open(config_file, 'r') as file:
            if ext == '.json':
                return json.load(file)
            elif ext in ('.yaml', '.yml'):
                return yaml.safe_load(file)
            else:
                raise ValueError("不支持的配置文件格式，请使用 JSON 或 YAML。")
    except FileNotFoundError:
        raise FileNotFoundError(f"配置文件 {config_file} 不存在，请检查路径。")
    except (json.JSONDecodeError, yaml.YAMLError):
        raise ValueError(f"配置文件 {config_file} 格式错误，无法解析。")

In [None]:
#测试
# config = load_config("/home/jhzhai/Nucleation/peek-find/config.yaml")  # 或者 "config.json"
config = load_config("C:\\Users\\husky\\Desktop\\peek-find\\PEM4Fe\\data\\config.yaml")  # 或者 "config.json"
print(config)

In [10]:
def parse_arguments(args=None):
    """
    解析命令行参数，支持 Jupyter Notebook。
    
    参数:
    - args (list): 自定义参数列表（默认为 None，使用 sys.argv）。
    
    返回:
    - argparse.Namespace: 包含解析后的参数对象。
    """
    parser = argparse.ArgumentParser(description="解析脚本运行参数")
    
    # 添加参数
    parser.add_argument("--config", type=str, default="config.json", help="配置文件的路径（默认为 config.json）")
    parser.add_argument("--input_dir", type=str, help="输入数据文件的目录，覆盖配置文件中的 input_dir 参数")
    parser.add_argument("--output_dir", type=str, help="输出结果保存的目录，覆盖配置文件中的 output_dir 参数")
    parser.add_argument("--Nsc", type=float, help="Nsc 参数，覆盖配置文件中的 Nsc 值")
    parser.add_argument("--N0", type=float, help="N0 参数，覆盖配置文件中的 N0 值")
    parser.add_argument("--pressure", type=float, help="pressure 参数，覆盖配置文件中的 pressure 值")
    
    # 如果在 Jupyter Notebook 中运行，传入自定义参数
    if args is None:
        args = sys.argv[1:]  # 默认从命令行读取参数

    # 覆写配置文件并保存yaml文件
    args = parser.parse_args(args)
    if args.input_dir:
        config["input_dir"] = args.input_dir
    if args.output_dir:
        config["output_dir"] = args.output_dir
    if args.Nsc:
        config["Nsc"] = args.Nsc
    if args.N0:
        config["N0"] = args.N0
    if args.pressure:
        config["pressure"] = args.pressure
    # with open(r"C:\Users\husky\Desktop\peek-find\PEM4Fe\data\config.yaml", "w") as f:
    #     yaml.dump(config, f)

    
    return args

In [None]:
#测试
if __name__ == "__main__":
    # 自定义参数列表（模拟命令行）
    # notebook_args = [
    #     "--config", "config.json",
    #     "--input_dir", "/home/jhzhai/Nucleation/peek-find/cool700/nc2600",
    #     "--output_dir", "/home/jhzhai/Nucleation/peek-find/cool700/nc2600",
    #     "--Nsc", "100.0",
    #     "--N0", "50.0"
    # ]
    notebook_args = [
        "--config", "config.json",
        "--pressure","360",
        "--Nsc", "3900.0",
        "--N0", "2600.0",
        

    ]
    args = parse_arguments(notebook_args)
    print(args)

In [12]:
input_dir=args.input_dir
output_dir=args.output_dir
Press=args.pressure

input_dir=config['input_dir']
output_dir=config['output_dir']
Press=config['pressure']


In [13]:
def setup_logging(output_dir="./logs", log_file="app.log", level=logging.INFO):
    """
    设置日志记录。
    
    参数:
    - output_dir (str): 日志文件保存的目录，默认是 "./logs"。
    - log_file (str): 日志文件名，默认保存为 "app.log"。
    - level (int): 日志级别，默认是 INFO。
    
    返回:
    - None
    """
    # 确保日志目录存在
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 完整日志文件路径
    log_file_path = os.path.join(output_dir, log_file)

    # 清理旧的日志配置
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # 配置日志格式
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    # 配置日志记录
    logging.basicConfig(
        level=level,  # 设置日志级别
        format=log_format,  # 设置日志格式
        handlers=[
            logging.FileHandler(log_file_path),  # 将日志写入文件
            logging.StreamHandler()             # 同时输出到控制台
        ]
    )

    # 测试日志
    logging.info(f"日志文件保存路径: {log_file_path}")

In [None]:
#测试
if __name__ == "__main__":
    # 设置日志保存目录和文件名
    log_file = "application.log"
    
    # 调用日志设置函数
    setup_logging(output_dir=output_dir, log_file=log_file, level=logging.DEBUG)

    # 测试日志输出
    logging.debug("DEBUG log test")
    logging.info("INFO log test")
    logging.warning("WARNING log test")
    logging.error("ERROR log test")
    logging.critical("CRITICAL log test")


# -------------------------- 数据处理模块 -------------------------- #

In [61]:
def collect_data_from_files(directory):
    """
    从指定目录中收集所有符合命名规则的文本文件，并合并为一个 DataFrame。
    
    参数:
    - directory (str): 数据文件所在目录路径。
    
    返回:
    - pd.DataFrame: 合并后的数据框，包含时间列 't' 和多个 'N_' 列。
    """
    logging.info(f"开始从目录 {directory} 中收集数据文件...")
    combined_df = pd.DataFrame()
    file_count = 0

    if not os.path.exists(directory):
        logging.error(f"指定的目录 {directory} 不存在。")
        return combined_df

    for filename in os.listdir(directory):
        if filename.startswith('pem-Nc-') and filename.endswith('.txt'):
            filepath = os.path.join(directory, filename)
            try:
                suffix = filename.split('-')[-1].split('.')[0]
                # 使用原始字符串避免转义警告
                df = pd.read_csv(filepath, sep=r'\s+', names=['t', f'N_{suffix}'])
                if combined_df.empty:
                    combined_df = df
                else:
                    combined_df = pd.merge(combined_df, df, on='t', how='outer')
                file_count += 1
                logging.info(f"成功读取文件: {filepath}")
            except Exception as e:
                logging.warning(f"读取文件 {filepath} 失败: {e}")

    if file_count == 0:
        logging.warning(f"目录 {directory} 中没有找到符合命名规则的文件。")
    else:
        logging.info(f"共处理了 {file_count} 个文件。")

    return combined_df

def clean_data(t, N):
    """
    清洗数据，去除无效值（NaN 和无穷值）。
    
    参数:
    - t (pd.Series): 时间序列。
    - N (pd.Series): 数据序列。
    
    返回:
    - pd.Series: 清洗后的时间序列。
    - pd.Series: 清洗后的数据序列。
    """
    logging.info("开始清洗数据...")
    initial_length = len(N)

    # 去除无效值
    clean_mask = np.isfinite(N)
    t_clean = t[clean_mask]
    N_clean = N[clean_mask]

    # 重置索引
    t_clean = t_clean.reset_index(drop=True)
    N_clean = N_clean.reset_index(drop=True)

    cleaned_length = len(N_clean)
    logging.info(f"数据清洗完成：原始数据点 {initial_length} 个，清洗后数据点 {cleaned_length} 个。")
    
    return t_clean, N_clean

def load_clean_save_data(directory_path, output_csv_path):
    """
    加载、清洗并保存数据。
    
    参数:
    - directory_path (str): 数据文件所在目录路径。
    - output_csv_path (str): 保存合并后数据的 CSV 文件名。
    
    返回:
    - pd.DataFrame: 清洗后的数据框。
    - float: 从目录名中提取的 nc 值。
    """
    # 提取目录名中的 nc 值
    base_dir = os.path.basename(directory_path)
    try:
        nc_str = [part for part in base_dir.split('/') if 'nc' in part][0]
        nc = float(nc_str.replace('nc', ''))
    except (IndexError, ValueError):
        logging.error("无法从目录名中提取 'nc' 值，请确保目录名包含 'nc' 后跟数值，例如 'nc1000'")
        nc = None
    
    collected_data = collect_data_from_files(directory_path)

    output_csv_path=output_csv_path+'\pem_N.csv'
    save_to_csv(collected_data, os.path.join(directory_path, output_csv_path))
    logging.info(f"nc={nc}已处理完毕")
    df = pd.read_csv(os.path.join(directory_path, output_csv_path))
    return df, nc

def save_to_csv(dataframe, output_file):
    """
    将 DataFrame 保存为 CSV 文件。
    
    参数:
    - dataframe (pd.DataFrame): 要保存的数据框。
    - output_file (str): 输出 CSV 文件路径。
    """
    dataframe.to_csv(output_file, index=False)
    logging.info(f"数据成功保存到 {output_file}")


In [None]:
#测试
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    collected_data,nc = load_clean_save_data(input_dir,output_dir)

# -------------------------- 拟合模块 -------------------------- #

In [208]:
# -------------------------- 拟合模块 -------------------------- #
# src/fitting.py

class FittingModule:
    """指数拟合模块"""
    
    def __init__(self, logger=None, config=None):
        """
        初始化拟合模块。
        
        参数:
        - logger (logging.Logger): 可选的日志记录器。如果未提供，将使用根日志记录器。
        - config (dict): 配置字典，包含拟合参数。
        """
        self.logger = logger or logging.getLogger(__name__)
        self.config = config or {}
        self.initial_fit_points = self.config.get('initial_fit_points', 10)
        self.min_r_squared = self.config.get('min_r_squared', 0.95)
        self.max_iterations = self.config.get('max_iterations', 10)
        self.output_dir = self.config.get('output_dir', './output')
        
        logging.info("拟合模块初始化完成, output_dir: %s", self.output_dir)
        
        # 创建输出目录的子文件夹
        self.init_output_dir = os.path.join(self.output_dir, "0_initial")
        os.makedirs(self.init_output_dir, exist_ok=True)
        self.fit_output_dir = os.path.join(self.output_dir, "1_exponential_fit")
        os.makedirs(self.fit_output_dir, exist_ok=True)
    
    def exponential_func(self, t, a, b, c, d):
        """
        定义拟合函数：a * exp(b * (t - c)) + d * t + e
        
        参数:
        - t (array-like): 自变量数据。
        - a, b, c, d, e (float): 拟合参数。
        
        返回:
        - array-like: 函数值。
        """
        return a * np.exp(b * (t - c)**(3/2)) + d
    
    def line_exponential_func(self, t, a, b ,c, d, e, f):
        '''
        定义分段拟合函数：a * exp(b * (t - c)) + d * t
        
        参数:
        - t (array-like): 自变量数据。
        - a, b, c, d, e (float): 拟合参数。
        
        返回:
        - array-like: 函数值。
        '''
        # 找到nb.abs(t-c)最小值的位置
        index = np.argmin(np.abs(t-f))
        N1 = e*np.ones_like(t[:index])
        N2 = a * np.exp(b * (t[index:] - c)**(3/2)) + d
        return np.concatenate((N1,N2))
    

    def find_line(self, new_popt, t_fit, N_fit, new_start_idx, new_end_idx):
        """
        优化拟合参数c和e，使得拟合的R²尽可能大。
        
        参数:
        - new_popt (list): 当前拟合的参数，包含 a, b, c, d, e。
        - new_r_squared (float): 当前拟合的 R² 值。
        - new_start_idx (int): 拟合数据的起始索引。
        - new_end_idx (int): 拟合数据的结束索引。
        
        返回:
        - final_popt (list): 优化后的拟合参数，包含 a, b, c, d, e。
        - final_r_squared (float): 优化后的 R² 值。
        """
        
        # 提取当前拟合参数
        a, b, c, d= new_popt
        e = 0.9999*d
        f = 1.0001*c
        # 优化目标函数，最大化 R²
        def objective(params):
            # 固定 a, b, c, d, 只优化 e 和 f
            e, f = params
            
            # 使用 line_exponential_func 进行拟合
            fitted_N = self.line_exponential_func(t_fit, a, b, c, d, e, f)
            index = np.argmin(np.abs(t_fit-f))
            gap = np.abs(a * np.exp(b * (t_fit[index] - c)**(3/2)) + d - e)

            # 计算拟合残差
            residuals = fitted_N - N_fit
            ss_res = np.sum(residuals**2)
            ss_tot = np.sum((N_fit - np.mean(N_fit))**2)
            
            # 计算 R² 值
            r_squared = 1 - (ss_res / ss_tot)
            
            self.logger.info(f"优化中：参数={params}, R²={r_squared:.4f}")
            
            # 目标是最大化 R²，最小化负的 R²
            return -r_squared*(f/t_fit[-1])*(1-gap/(np.max(N_fit)-np.min(N_fit))) if r_squared >= self.min_r_squared else 0

        # 使用 minimize 优化 c 和 e
        result = minimize(objective, x0=[e, f], bounds=[(0, np.inf), (t_fit[0], t_fit[-1])], method='L-BFGS-B')
        
        # 获取优化后的参数
        optimized_e, optimized_f = result.x
        final_popt = [a, b, c, d, optimized_e, optimized_f]

        # 计算最终的 R²
        fitted_N = self.line_exponential_func(t_fit, *final_popt)

        # 计算拟合残差
        residuals = fitted_N - N_fit
        ss_res = np.sum(residuals**2)
        ss_tot = np.sum((N_fit - np.mean(N_fit))**2)
        final_r_squared = 1 - (ss_res / ss_tot)

        self.logger.debug(f"最后拟合结果：参数={final_popt}, R²={final_r_squared:.4f}")
        return final_popt, final_r_squared, np.argmin(np.abs(t_fit-f)), new_end_idx


    def fit_dynamic(self, t, N, index, initial_fit_points=None, min_r_squared=None, max_iterations=None):
        """
        动态进行指数拟合，逐步扩大拟合范围直到满足置信度要求或达到最大迭代次数。
        
        参数:
        - t (pd.Series or np.ndarray): 时间序列。
        - N (pd.Series or np.ndarray): 数据序列。
        - initial_fit_points (int, optional): 初始拟合的数据点数。
        - min_r_squared (float, optional): 最小的 R² 值要求。
        - max_iterations (int, optional): 最大拟合尝试次数，防止无限循环。
        
        返回:
        - dict or None: 包含拟合参数和 R² 值的字典，若拟合失败则返回 None。
        """
        # 使用传入的参数或配置中的参数
        initial_fit_points = initial_fit_points or self.initial_fit_points
        min_r_squared = min_r_squared or self.min_r_squared
        max_iterations = max_iterations or self.max_iterations
        
        self.logger.info("开始动态指数拟合...\n - 初始拟合点数: %d  - 最小 R²: %.2f  - 最大迭代次数: %d", initial_fit_points, min_r_squared, max_iterations)
        
        # 确保输入为 numpy 数组
        t = np.array(t)
        N = np.array(N)
        self.plot_source(t, N, index)

        total_points = len(t)
        if total_points < initial_fit_points:
            self.logger.error(f"数据点不足，无法进行拟合。需要至少 {initial_fit_points} 个点，当前有 {total_points} 个点。")
            return None
        
        # 初始拟合范围：最后 initial_fit_points 个点
        start_idx = total_points - initial_fit_points
        end_idx = total_points
        new_start_idx = start_idx
        new_end_idx = end_idx

        iteration = 0
        new_popt = None
        new_r_squared = -np.inf
        new_t_fit=t

        
        while iteration < max_iterations and start_idx >= 0:
            self.logger.debug(f"拟合迭代 {iteration + 1}: 使用数据点 {start_idx} 到 {end_idx}（共 {end_idx - start_idx} 个点）")
            t_fit = t[start_idx:end_idx]
            N_fit = N[start_idx:end_idx]
            
            try:
                # 初始参数猜测
                initial_params = [1.0, 0.1, t_fit[0], np.min(N_fit)]
                # 参数边界
                bounds = ([0, 0, 0, 0], [np.inf, np.inf, np.min(t_fit), np.max(N_fit)])

                # # 初始参数猜测
                # initial_params = [np.min(N_fit), 1, 1, np.min(t_fit)]
                # # 参数边界
                # bounds = ([0, 0, 0, 0], [np.max(N_fit), np.inf, np.inf, np.min(t_fit)])
                
                popt, pcov = curve_fit(
                    self.exponential_func, 
                    t_fit, 
                    N_fit, 
                    p0=initial_params, 
                    bounds=bounds, 
                    maxfev=10000
                )
                
                # 计算拟合结果
                residuals = N_fit - self.exponential_func(t_fit, *popt)
                ss_res = np.sum(residuals**2)
                ss_tot = np.sum((N_fit - np.mean(N_fit))**2)
                r_squared = 1 - (ss_res / ss_tot)
                
                self.logger.debug(f"拟合结果：参数={popt}, R²={r_squared:.4f}")
                
                # 检查 R² 是否达到要求
                if r_squared < min_r_squared:
                    self.logger.info(f"拟合成功，R²={new_r_squared:.4f}，使用数据点 {new_start_idx} 到 {new_end_idx}")
                    final_popt, final_r_squared, final_start_idx, final_end_idx= self.find_line(new_popt, t_fit, N_fit, new_start_idx, new_end_idx)
                    # 绘制并保存拟合结果
                    self.plot_fit(t_fit, N_fit, new_t_fit, index, final_popt, final_r_squared, final_start_idx, final_end_idx)
                    return {
                        "index": index,
                        'params': final_popt,
                        'r_squared': final_r_squared,
                        'start_idx': final_start_idx,
                        'end_idx': final_end_idx
                    }
                else:
                    # 保存最佳拟合结果
                    new_popt = popt
                    new_r_squared = r_squared
                    new_start_idx = start_idx
                    new_end_idx = end_idx
                    new_t_fit = t_fit
                
                # 扩大拟合范围
                start_idx = max(0, start_idx - initial_fit_points)
                iteration += 1
                
            except RuntimeError as e:
                self.logger.warning(f"拟合失败在数据点 {start_idx} 到 {end_idx}: {e}")
                # 扩大拟合范围继续尝试
                start_idx = max(0, start_idx - initial_fit_points)
                iteration += 1
            except Exception as e:
                self.logger.error(f"拟合过程中发生未预料的错误: {e}")
                return None
        
        # 如果未达到要求，返回最佳结果并绘图
        if new_r_squared >= min_r_squared:
            self.logger.info(f"拟合成功，R²={r_squared:.4f}，使用数据点 {start_idx} 到 {end_idx}")
            final_popt, final_r_squared, final_start_idx, final_end_idx= self.find_line(new_popt, t_fit, N_fit, new_start_idx, new_end_idx)
            # 绘制并保存拟合结果
            self.plot_fit(t_fit, N_fit, new_t_fit, index, final_popt, final_r_squared, final_start_idx, final_end_idx)
            return {
                "index": index,
                'params': final_popt,
                'r_squared': final_r_squared,
                'start_idx': final_start_idx,
                'end_idx': final_end_idx
            }
        elif iteration >= max_iterations:
            self.logger.error("超过限制次数，所有拟合尝试均失败。")
            return None
    
    def plot_fit(self, t_fit, N_fit, new_t_fit, index, popt, r_squared, start_idx, end_idx):
        """
        绘制拟合曲线并保存图像。
        
        参数:
        - t_fit (np.ndarray): 拟合的时间序列。
        - N_fit (np.ndarray): 拟合的数据序列。
        - popt (list): 拟合参数。
        - r_squared (float): 拟合的 R² 值。
        - start_idx (int): 拟合的起始索引。
        - end_idx (int): 拟合的结束索引。
        
        返回:
        - None
        """
        plt.figure(figsize=(10, 6))
        plt.scatter(t_fit, N_fit, label='Data', color='blue')
        fitted_N = self.line_exponential_func(new_t_fit, *popt)
        plt.plot(new_t_fit, fitted_N, label=f'Fit (R²={r_squared:.4f})', color='red')
        plt.vlines(popt[-1], np.min(N_fit), np.max(N_fit), colors='green', linestyles='dashed', label='Increase Parse')
        plt.title(f'N{index} Exponential Fit: Data points {start_idx} to {end_idx}')
        plt.xlabel('Time(ps)')
        plt.ylabel('N')
        plt.legend()
        plt.grid(True)
        
        # 定义保存路径
        plot_filename = f"1_exponential_fit\{index}-{start_idx}_{end_idx}.png"
        print(plot_filename)
        plot_path = os.path.join(self.output_dir, plot_filename)
        plt.savefig(plot_path)
        plt.close()
        self.logger.info(f"拟合图已保存到 {plot_path}")

    def plot_source(self, t, N, index):
        """
        绘制原始数据并保存图像。
        
        参数:
        - t (np.ndarray): 时间序列。
        - N (np.ndarray): 数据序列。
        - index (int): 数据索引。
        
        返回:
        - None
        """
        plt.figure(figsize=(10, 3))
        plt.plot(t, N, "-o", label='Data', color='blue')
        plt.title(f'N{index} Source Data')
        plt.xlabel('Time(ps)')
        plt.ylabel('N')
        plt.legend()
        plt.grid(True)
        
        # 定义保存路径
        plot_filename = f"0_initial\{index}-source.png"
        plot_path = os.path.join(self.output_dir, plot_filename)
        plt.savefig(plot_path)
        plt.close()
        self.logger.info(f"原始数据图已保存到 {plot_path}")


In [None]:
if __name__ == "__main__":
    # 配置日志记录（如上）
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler()           # 同时输出到控制台
        ]
    )
    # 初始化 FittingModule 实例
    config = load_config("C:\\Users\\husky\\Desktop\\peek-find\\PEM4Fe\\data\\config.yaml")  # 或者 "config.json"
    fitting_module = FittingModule(config=config)
    logging.info("FittingModule 实例已初始化。")

    # 提取时间序列和数据序列
    t = collected_data['t']

    for col in collected_data.columns[1:]:  # 从第2列到结束
        N = collected_data[col]  # 确保 'N' 列名正确
        index = str(col)
        logging.info(f"开始执行动态指数拟合：{col}...")
        # 去除无效值
        t_clean, N_clean = clean_data(t, N)
        fit_result = fitting_module.fit_dynamic(t_clean, N_clean, index)
        
        if fit_result:
            # print(fit_result)
            params = fit_result['params']
            r_squared = fit_result['r_squared']
            logging.info(f"拟合成功。R² = {r_squared:.4f}")
            logging.info(f"拟合参数: a = {params[0]:.4f}, b = {params[1]:.4f}, c = {params[2]:.4f}, d = {params[3]:.4f}, e = {params[4]:.4f}, f = {params[5]:.4f}")
        else:
            logging.error("拟合失败。")
        logging.info("--------------------------------------------------")

# -------------------------- 峰值检测模块 -------------------------- #

In [241]:
import numpy as np
import logging
import os
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.ndimage import gaussian_filter1d

class PeakDetectionModule:
    """峰值检测模块"""
    
    def __init__(self, config=None, logger=None):
        """
        初始化拟合模块。
        
        参数:
        - config (dict): 配置字典，包含拟合参数。
        - logger (logging.Logger): 可选的日志记录器。如果未提供，将使用根日志记录器。
        """
        self.logger = logger or logging.getLogger(__name__)
        self.config = config or {}
        
        # 提取配置中的参数
        self.Nsc = self.config.get('Nsc', 1000)
        self.N0 = self.config.get('N0', 0)

        self.width_threshold = self.config.get('width_threshold', 5)
        self.plateau_size = self.config.get('plateau_size', 1)
        self.num_density = self.config.get('num_density', -1)

        self.window_size = self.config.get('window_size', 5)
        self.tolerance_ratio = self.config.get('tolerance_ratio', 0.1)

        self.output_dir = self.config.get('output_dir', './output')
        
        # 创建输出目录的子文件夹
        self.plot_output_dir = os.path.join(self.output_dir, "2_peak_detection_plots")
        os.makedirs(self.plot_output_dir, exist_ok=True)
        
        self.logger.info("峰值检测模块初始化完成, output_dir: %s", self.output_dir)

    def detect_peaks(self, N, column_name, Nsc=None, width_threshold=None):
        """
        检测数据中的峰值。
        
        参数:
        - N (array-like): 数据序列。
        - Nsc (float, optional): 峰值的阈值。若未提供，则使用 config 中的 Nsc。
        - width_threshold (int, optional): 峰的最小宽度（数据点数量）。若未提供，则使用 config 中的值。
        
        返回:
        - list: 包含每个峰的信息的列表，每个元素是一个字典，包含峰的范围、宽度和均值。
        """
        Nsc = Nsc or self.Nsc
        width_threshold = width_threshold or self.width_threshold
        plateau_size=self.plateau_size

        self.logger.info("开始检测峰值...")

        # 寻找峰值
        peaks, properties = find_peaks(N, height=self.Nsc, width=width_threshold, plateau_size=plateau_size)
        peaks_info = []

        for i, peak in enumerate(peaks):
            width = int(properties['widths'][i])
            if width >= width_threshold:
                left_ips = int(properties['left_ips'][i])
                right_ips = int(properties['right_ips'][i])
                peak_range = range(left_ips, right_ips + 1)
                N_peak = N[peak_range]
                peak_mean = np.mean(N_peak)
                self.logger.debug(f"峰值 {peak}: left={left_ips}, right={right_ips}, width={width}, mean={peak_mean:.4f}")
                if peak_mean > Nsc:
                    peaks_info.append({
                        'column': column_name,
                        'peak_index': peak,
                        'left': left_ips,
                        'right': right_ips,
                        'width': width,
                        'mean': peak_mean,
                        'indices': peak_range
                    })

        self.logger.info(f"检测到 {len(peaks_info)} 个峰值。")
        return peaks_info

    def plot_peaks(self, ax, t, N, peaks_info, column_name):
        """
        绘制峰值检测结果，并保存图像。
        
        参数:
        - ax (matplotlib.axes._axes.Axes): 子图对象，用于绘制数据。
        - t (array-like): 时间序列。
        - N (array-like): 数据序列。
        - peaks_info (list): 包含峰值信息的列表。
        - column_name (str): 数据列名。
        """
        self.logger.info(f"绘制峰值检测图 for {column_name}...")
        
        ax.plot(t, N, label="Original Data", color="blue")
        
        # 绘制峰值
        for peak_info in peaks_info:
            peak_t = t[peak_info['indices']]
            peak_N = N[peak_info['indices']]
            ax.plot(peak_t, peak_N, color="red")
            peak_left = peak_info['left']
            peak_right = peak_info['right']
            ax.fill_between(t[peak_left:peak_right + 1], N[peak_left:peak_right + 1], self.N0, alpha=0.5)
            ax.hlines(peak_info['mean'], t[peak_left], t[peak_right], color="r", linestyle="--")
        

    def plot_density_analysis(self, ax, ax_twin, N, peaks_info, column_name, num_density=-1):
        """
        绘制右侧子图，显示峰值密集度分析和直方图。
        
        参数:
        - ax (matplotlib.axes._axes.Axes): 子图对象，用于绘制密集区域分析。
        - ax_twin (matplotlib.axes._axes.Axes): 共享 y 轴的子图对象，用于绘制直方图和密度曲线。
        - N (pd.Series): 数据序列。
        - Nsc (float): 峰值的阈值。
        - N0 (float): 数据的基准值。
        - column_name (str): 数据列名。
        - directory_path (str): 保存结果的目录路径。
        - num_density (int): 检测的密集区域峰值数量，默认 -1 表示检测所有峰值。
        
        返回:
        - list: 包含密集区域峰值信息的列表。
        """
        self.logger.info(f"开始绘制密集区域分析图 for {column_name}...")
        widths = [info['width'] for info in peaks_info]
        means = [info['mean'] for info in peaks_info]

        Nsc, N0 = self.Nsc, self.N0
        
        if means:
            N_cut = np.max(means)
        else:
            N_cut = Nsc
        ymax = N.max()
        
        if widths and means:
            ax.scatter(widths, means, color='blue')

        # 设置 ax 的 y 轴范围
        ax.set_ylim(Nsc, ymax)
        ax.set_title(f"{column_name} Density Analysis")
        
        # 绘制原始直方图
        ax_twin.hist(N[N >= N_cut], bins=2000, orientation='horizontal', alpha=0.6, color='gray', label='(N | N >= Nsc) Distribution')

        # 计算直方图数据
        N_smooth = N[(N >= N_cut) & (N <= ymax)]
        counts, bin_edges = np.histogram(N_smooth, bins=1000)
        bin_centers = 0.5 * (bin_edges[1:] + bin_edges[:-1])

        # 对直方图数据进行高斯平滑
        sigma = 0.5  # 控制平滑强度
        smoothed_counts = gaussian_filter1d(counts[:np.nonzero(counts)[0][-1] + 1], sigma=sigma)

        # 绘制高斯平滑曲线
        ax_twin.plot(smoothed_counts, bin_centers, color='red', linestyle='-', linewidth=0.5, label='Smoothed Density', alpha=0.8)
        
        # 峰值检测
        peak_indices, properties = find_peaks(smoothed_counts, height=0, plateau_size=0, width=0)

        density_info = []

        if peak_indices.size != 0:
            up = np.array(properties["plateau_sizes"])
            down = np.array(properties["width_heights"])
            high = np.array(properties["peak_heights"])
            sorted_peak_indices = peak_indices[np.argsort((up + down) * 0.5 * high)][::-1]

            # 确保 num_density 不超过 sorted_peak_indices 的长度
            num_density = min(num_density, len(sorted_peak_indices))

            density_place = bin_centers[sorted_peak_indices[:num_density]]
            ax_twin.scatter(smoothed_counts[sorted_peak_indices[:num_density]], bin_centers[sorted_peak_indices[:num_density]], 
                            color='green', label='Density Peaks', marker='x')

            for i, N_forecast in enumerate(density_place):
                density_info.append({
                    'peak_index': i,
                    'forecast': N_forecast,
                    'smoothed_count': smoothed_counts[sorted_peak_indices[i]],
                })
        
        # 添加图例
        ax_twin.legend(fontsize=8)
        
        return density_info
    
    def find_nearest_interval(self, ax, t, N, N_forecast, window_size=None, tolerance_ratio=None):
        """
        寻找 N 中接近 N_forecast 的所有区段，返回多个区段的起止点。

        参数：
        - ax (matplotlib.axes._axes.Axes): 绘图的子图，用于标记结果。
        - t (array-like): 时间序列。
        - N (array-like): 实际数据数组。
        - N_forecast (float): 目标值，寻找接近该值的多个区段。
        - window_size (int): 平滑窗口大小，默认为 5。
        - tolerance_ratio (float): 容差范围比例，默认为 0.01。

        返回：
        - list of dict: 每个区段的信息，包括起止点、均值等。
        """
        window_size = self.window_size or window_size
        tolerance_ratio = self.tolerance_ratio or tolerance_ratio

        # Step 0: 归一化
        N = np.array(N)
        min_N = N.min()
        max_N = N.max()
        Nsc = (self.Nsc - N.min()) / (N.max() - N.min())  # 归一化 Nsc
        N_forecast = (N_forecast - N.min()) / (N.max() - N.min())  # 归一化 N_forecast
        N = (N - N.min()) / (N.max() - N.min())

        tolerance = tolerance_ratio * N_forecast  # 容差范围

        # Step 1: 平滑数据
        N_smooth = pd.Series(N).rolling(window=window_size, center=True).mean().fillna(0).to_numpy()

        # Step 2: 过滤小于 Nsc 的值
        valid_indices = N > Nsc
        N_smooth[~valid_indices] = 0

        # Step 3: 找到所有满足条件的索引
        valid_mask = np.abs(N_smooth - N_forecast) <= tolerance
        valid_indices = np.where(valid_mask)[0]  # 获取满足条件的索引
        self.logger.debug(f"目标 {N_forecast*(max_N-min_N)+min_N}，找到 {len(valid_indices)} 个满足条件的索引")

        if len(valid_indices) == 0:
            self.logger.warning(f"未找到满足条件的区段")
            return None, None, None
        
        # Step 4: 合并连续的有效索引为区段
        intervals = []
        if len(valid_indices) > 0:
            start_idx = valid_indices[0]
            for i in range(1, len(valid_indices)):
                # 判断是否连续
                if valid_indices[i] != valid_indices[i - 1] + 1:
                    # 新区段开始
                    end_idx = valid_indices[i - 1]
                    intervals.append((start_idx, end_idx))
                    start_idx = valid_indices[i]
            # 添加最后一个区段
            intervals.append((start_idx, valid_indices[-1]))

        intervals.sort(key=lambda x: x[0])
        merged_intervals = []
        for current in intervals:
            # 如果 merged_intervals 为空，或者当前区间与最后一个区间不重叠，则加入结果
            if not merged_intervals or merged_intervals[-1][1] < current[0]:
                merged_intervals.append(current)
            else:
                # 否则，合并当前区间和最后一个区间
                merged_intervals[-1] = (merged_intervals[-1][0], max(merged_intervals[-1][1], current[1]))

        # Step 5: 计算每个区段的均值，并返回信息
        interval_info = []
        for start_idx, end_idx in merged_intervals:
            start_idx = start_idx - window_size // 2
            end_idx = end_idx + window_size // 2
            interval_mean = np.mean(N[start_idx:end_idx + 1]*(max_N - min_N) + min_N)
            interval_info.append({
                'start_index': start_idx,
                'end_index': end_idx,
                'mean': interval_mean,
                'std': np.std(N[start_idx:end_idx + 1]*(max_N - min_N) + min_N)
            })
            self.logger.debug(f"区段 [{start_idx}-{end_idx}] 的均值为 {interval_mean:.4f}, 标准差为 {np.std(N[start_idx:end_idx + 1]):.4f}")
            # 可视化标记区段
            ax.fill_between(t[start_idx:end_idx + 1], N[start_idx:end_idx + 1], alpha=0.3, color='green')
        
        return start_idx, end_idx, interval_mean


    def plot_plateaus(self, ax, t, N, density_info, column_name):
        '''
        绘制平台检测结果，并保存图像。

        参数:
        - ax (matplotlib.axes._axes.Axes): 子图对象，用于绘制数据。
        - t (array-like): 时间序列。
        - N (array-like): 数据序列。
        - peaks_info (list): 包含峰值信息的列表。
        - column_name (str): 数据列名。

        返回:
        - list: 包含平台信息的列表。
        '''
        self.logger.info(f"绘制平台检测图 for {column_name}...")

        plateau_info = []
        for i, info in enumerate(density_info):
            N_forecast = info['forecast']
            start_index, end_index, mean= self.find_nearest_interval(ax, t, N, N_forecast)
            if start_index is None or end_index is None:
                continue
            plateau_t = t[start_index:end_index + 1]
            plateau_N = N[start_index:end_index + 1]
            ax.plot(plateau_t, plateau_N, color="green")
            ax.fill_between(plateau_t, plateau_N, np.max(N), alpha=0.3, color='black')
            plateau_info.append({
                "column": column_name,
                "density_index": i,
                "left": start_index,
                "right": end_index,
                "width": end_index - start_index + 1,
                "mean": N_forecast,
                "indices": range(start_index, end_index + 1)
            })

        ax.set_title(f"Peak Detection for {column_name}")
        ax.set_ylim(self.N0*0.9, np.max(N)*1.1)
        ax.set_xlabel("Time(ps)")
        ax.set_ylabel("N")
        ax.legend()
        ax.grid(True)

        return plateau_info


    def plot_analysis_figure(self, t, N, peaks_info, column_name, num_density=None):
        """
        绘制分析图，有两个子图。

        参数:
        - t (pd.Series): 时间序列。
        - N (pd.Series): 数据序列。
        - peaks_info (list): 峰值信息列表。
        - Nsc (float): Nsc 值。
        - N0 (float): 数据的基准值。
        - column_name (str): 数据列名。
        - directory_path (str): 保存结果的目录路径。
        - num_density (int): 检测的密集区域峰值数量，默认 -1 表示检测所有峰值。
        
        返回:
        - tuple: (density_info, N_star)
        """
        N0 = self.N0 or N.min()
        Nsc = self.Nsc or N.mean()
        num_density = self.num_density or -1

        fig, (ax1, ax2) = plt.subplots(1, 2, gridspec_kw={'width_ratios': [5, 1]}, figsize=(14, 6), dpi=300)
        ax2_twin = ax2.twiny()  # 为右侧子图设置共享轴

        # 绘制左侧子图 ax1
        self.plot_peaks(ax1, t, N, peaks_info, column_name)

        # 绘制右侧子图 ax2
        density_info = self.plot_density_analysis(ax2, ax2_twin, N, peaks_info, column_name, num_density)

        plateaus_info = self.plot_plateaus(ax1, t, N, density_info, column_name)

        # 设置整体标题
        plt.suptitle(f'{column_name} Analysis')
        plt.tight_layout()
        plt.subplots_adjust(top=0.88)
        plot_file = os.path.join(self.plot_output_dir, f"{column_name}_peaks.png")
        plt.savefig(plot_file)
        plt.close()
        self.logger.info(f"峰值图已保存到 {plot_file}")

        return plateaus_info


In [None]:
if __name__ == "__main__":
    # 配置日志记录（如上）
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler()           # 同时输出到控制台
        ]
    )
    # 初始化 FittingModule 实例
    config = load_config("C:\\Users\\husky\\Desktop\\peek-find\\PEM4Fe\\data\\config.yaml")  # 或者 "config.json"
    peak_detection_module = PeakDetectionModule(config=config)
    logging.info("PeakDetectionModule 实例已初始化。")

    # 加载数据
    t = collected_data['t']
    Peak = []
    Plateaus = []
    
    # 对每一列数据执行峰值检测
    for col in collected_data.columns[1:]:  # 从第2列到结束
        N = collected_data[col]  # 确保 'N' 列名正确
        index = str(col)
        logging.info(f"开始执行峰值检测：{col}...")

        # 去除无效值
        t_clean, N_clean = clean_data(t, N)
        
        # 执行峰值检测
        peaks_info = peak_detection_module.detect_peaks(N_clean,index)
        
        # 打印检测结果
        if peaks_info:
            logging.info(f"检测到 {len(peaks_info)} 个峰值。")
        else:
            logging.warning(f"{col} 没有检测到峰值。")
        
        # 绘制并保存峰值检测结果
        plateaus_info = peak_detection_module.plot_analysis_figure(t_clean, N_clean, peaks_info, index)

        Peak.append(peaks_info)
        Plateaus.append(plateaus_info)
        logging.info("--------------------------------------------------")

# -------------------------- 可视化模块 -------------------------- #

In [238]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

class MainModule:
    """主体模块"""
    def __init__(self, config=None):
        """
        初始化主体模块。
        
        参数:
        - config (dict): 配置字典，包含所需参数。
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self.Nsc = self.config.get('Nsc', 1000)
        self.N0 = self.config.get('N0', 0)
        self.output_dir = self.config.get('output_dir', './output')
        self.overall_path = os.path.join(self.output_dir, "3_overall_analysis_plots")
        os.makedirs(self.overall_path, exist_ok=True)

    def save_results_to_csv(self, Peak, Plateaus):
        """保存分析结果到 CSV 文件"""
        
        directory_path = self.overall_path

        # 构建 DataFrame 并保存为 CSV 文件
        data = []
        for peaks_info, plateaus_info in zip(Peak, Plateaus):
            # 处理峰值信息
            for peak_info in peaks_info:
                data.append({
                    'Column': peak_info['column'],
                    'Peak Index': peak_info['peak_index'],
                    'Left': peak_info['left'],
                    'Right': peak_info['right'],
                    'Mean': peak_info['mean'],
                    'Width': peak_info['width'],
                    'Choice': False,  
                    'PTM': False       
                })
            
            # 处理平台信息
            for plateau_info in plateaus_info:
                data.append({
                    'Column': plateau_info['column'],
                    'Peak Index': plateau_info['density_index'],
                    'Left': plateau_info['left'],
                    'Right': plateau_info['right'],
                    'Mean': plateau_info['mean'],
                    'Width': plateau_info['width'],
                    'Choice': False,  
                    'PTM': True       
                })
        
        # 创建 DataFrame
        df = pd.DataFrame(data)

        # 保存到 CSV 文件
        output_file = os.path.join(directory_path, 'N_star_info.csv')
        df.to_csv(output_file, index=False)
        
        self.logger.info(f"分析结果已保存到 {output_file}")

    def plot_overall_analysis(self, Peak, Plateaus):
        """
        绘制总体分析图

        参数:
        - peaks_info (list): 峰值信息列表。
        - plateaus_info (list): 平台信息列表。
        - N_star (float): 总体均值。
        """ 
        
        directory_path = self.overall_path
        fig, ax = plt.subplots(figsize=(15, 6))

        values_all = np.array([])
        columns = []
        colors = []            

        # 绘制散点图

        for i in range(len(Peak)):
            peaks_info = Peak[i]
            peek_values = [info['mean'] for info in peaks_info]
            plateaus_info = Plateaus[i]
            plateaus_values = [info['mean'] for info in plateaus_info]
    
            if len(peaks_info) != 0:
                info=peaks_info[0]
            elif len(plateaus_info) != 0:
                info=plateaus_info[0]
            else:
                continue
            column_name = info['column']
            columns.append(column_name)

            values_all = np.concatenate((values_all, peek_values, plateaus_values))
            scatter = ax.scatter(i*np.ones_like(peek_values), peek_values, marker='D',s=80)
            
            colors.append(scatter.get_facecolor()[0])  # 记录颜色
            ax.scatter(i*np.ones_like(plateaus_values), plateaus_values, facecolors='none', edgecolors=colors[i], marker='o', s=80)

            # 风琴图
            values= np.concatenate((np.array(peek_values), np.array(plateaus_values)))
            ax.violinplot(values, positions=[i], showmedians=True, showextrema=True)

        N_star = np.mean(values_all)
        ax.axhline(y=N_star, color='k', linestyle='--', label=f'N* = {N_star:.2f}')

        ax.set_xticks(range(len(Peak)))
        ax.set_xticklabels(columns, rotation=90)

        # 添加图例和标题
        ax.set_xlabel('Index')
        ax.set_ylabel('Mean Value')
        ax.set_title(f'Overall Analysis for {len(Peak)} Columns (N*={N_star}, Nsc={self.Nsc}, N0={self.N0})')
        ax.legend()

        # 保存图像
        plot_file = os.path.join(directory_path, 'Overall_analysis.png')
        plt.savefig(plot_file)
        plt.close()

        self.logger.info(f"总体分析图已保存到 {plot_file}")

    def All_PTM(self,show_output=False):
        
        log_file = "application.log"

        # 调用日志设置函数
        setup_logging(output_dir=output_dir, log_file=log_file, level=logging.DEBUG)
        if show_output:
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(levelname)s - %(message)s",
                handlers=[
                    logging.StreamHandler()           # 同时输出到控制台
                ]
            )
        else:
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(levelname)s - %(message)s",
            )

        self.overall_path = os.path.join(self.output_dir, "3_overall_analysis_plots")
        os.makedirs(self.overall_path, exist_ok=True)

        collected_data,nc = load_clean_save_data(input_dir,output_dir)

        # 初始化 FittingModule 实例
        config = load_config("C:\\Users\\husky\\Desktop\\peek-find\\PEM4Fe\\data\\config.yaml")  # 或者 "config.json"
        peak_detection_module = PeakDetectionModule(config=config)
        logging.info("PeakDetectionModule 实例已初始化。")

        # 加载数据
        t = collected_data['t']
        Peak = []
        Plateaus = []

        fitting_module = FittingModule(config=config)
        logging.info("FittingModule 实例已初始化。")

        for col in collected_data.columns[1:]:  # 从第2列到结束
            N = collected_data[col]  # 确保 'N' 列名正确
            index = str(col)
            logging.info(f"\n开始执行动态指数拟合：{col}...")
            # 去除无效值
            t_clean, N_clean = clean_data(t, N)
            fit_result = fitting_module.fit_dynamic(t_clean, N_clean, index)
            
            if fit_result:
                # print(fit_result)
                params = fit_result['params']
                r_squared = fit_result['r_squared']
                logging.info(f"拟合成功。R² = {r_squared:.4f}")
                logging.info(f"拟合参数: a = {params[0]:.4f}, b = {params[1]:.4f}, c = {params[2]:.4f}, d = {params[3]:.4f}, e = {params[4]:.4f}, f = {params[5]:.4f}")
            else:
                logging.error("拟合失败。")
            logging.info("--------------------------------------------------")

        # 对每一列数据执行峰值检测
        for col in collected_data.columns[1:]:  # 从第2列到结束
            N = collected_data[col]  # 确保 'N' 列名正确
            index = str(col)
            logging.info(f"\n开始执行峰值检测：{col}...")

            # 去除无效值
            t_clean, N_clean = clean_data(t, N)
            
            # 执行峰值检测
            peaks_info = peak_detection_module.detect_peaks(N_clean,index)
            
            # 打印检测结果
            if peaks_info:
                logging.info(f"检测到 {len(peaks_info)} 个峰值。")
            else:
                logging.warning(f"{col} 没有检测到峰值。")
            
            # 绘制并保存峰值检测结果
            plateaus_info = peak_detection_module.plot_analysis_figure(t_clean, N_clean, peaks_info, index)

            Peak.append(peaks_info)
            Plateaus.append(plateaus_info)
            logging.info("--------------------------------------------------")
        
        # 保存结果到 CSV 文件
        main_module = MainModule(config=config)
        main_module.save_results_to_csv(Peak, Plateaus)
        main_module.plot_overall_analysis(Peak, Plateaus)
        

In [None]:
if __name__ == "__main__":
    # 配置日志记录（如上）
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler()           # 同时输出到控制台
        ]
    )
    # 初始化 FittingModule 实例
    config = load_config("C:\\Users\\husky\\Desktop\\peek-find\\PEM4Fe\\data\\config.yaml")  # 或者 "config.json"
    peak_detection_module = PeakDetectionModule(config=config)
    logging.info("PeakDetectionModule 实例已初始化。")

    # 加载数据
    t = collected_data['t']
    Peak = []
    Plateaus = []
    
    # 对每一列数据执行峰值检测
    for col in collected_data.columns[1:]:  # 从第2列到结束
        N = collected_data[col]  # 确保 'N' 列名正确
        index = str(col)
        logging.info(f"开始执行峰值检测：{col}...")

        # 去除无效值
        t_clean, N_clean = clean_data(t, N)
        
        # 执行峰值检测
        peaks_info = peak_detection_module.detect_peaks(N_clean,index)
        
        # 打印检测结果
        if peaks_info:
            logging.info(f"检测到 {len(peaks_info)} 个峰值。")
        else:
            logging.warning(f"{col} 没有检测到峰值。")
        
        # 绘制并保存峰值检测结果
        plateaus_info = peak_detection_module.plot_analysis_figure(t_clean, N_clean, peaks_info, index)

        Peak.append(peaks_info)
        Plateaus.append(plateaus_info)
        logging.info("--------------------------------------------------")
    
    # 保存结果到 CSV 文件
    main_module = MainModule(config=config)
    data=main_module.save_results_to_csv(Peak, Plateaus)
    main_module.plot_overall_analysis(Peak, Plateaus)

# -------------------------- 结果输出模块 -------------------------- #

In [5]:
import argparse
import os
import logging
from PEM4Fe.src.PEM4Fe.config import load_config, setup_logging
from PEM4Fe.src.PEM4Fe.data_processing import clean_data, load_clean_save_data
from PEM4Fe.src.PEM4Fe.fitting import FittingModule
from PEM4Fe.src.PEM4Fe.peak_detection import PeakDetectionModule
from PEM4Fe.src.PEM4Fe.output import MainModule

def main(config):

    # --------------------- 解析命令行参数 --------------------- #
    # parser = argparse.ArgumentParser(description="PEM4Fe: Data Analysis Tool")
    # parser.add_argument(
    #     "--config",
    #     type=str,
    #     required=True,
    #     help="Path to the configuration file (YAML or JSON).",
    # )
    # args = parser.parse_args()

    # --------------------- 加载配置文件 --------------------- #
    try:
        config = load_config(config)
        logging.info(f"成功加载配置文件：{config}")
    except Exception as e:
        logging.error(f"加载配置文件失败：{e}")
        return

    # --------------------- 设置日志 --------------------- #
    log_file = "application.log"
    setup_logging(output_dir=config.get("output_dir", "./output"), log_file=log_file)
    logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(levelname)s - %(message)s",
                handlers=[
                    logging.StreamHandler(),           # 同时输出到控制台
                    logging.FileHandler("application.log", encoding="utf-8")
                ]
            )
    # 获取 Logger 对象
    logger = logging.getLogger("Main")
    logger.info("日志记录初始化完成。")


    # --------------------- 加载数据 --------------------- #
    input_dir = config.get("input_dir", "./data")
    output_dir = config.get("output_dir", "./output")
    os.makedirs(output_dir, exist_ok=True)

    try:
        collected_data, nc = load_clean_save_data(input_dir, output_dir)
        logger.info("数据加载成功。")
    except Exception as e:
        logger.error(f"数据加载失败：{e}")
        return

    t = collected_data['t']

    # --------------------- 初始化模块 --------------------- #
    fitting_module = FittingModule(config=config, logger=logger)
    peak_detection_module = PeakDetectionModule(config=config, logger=logger)
    main_module = MainModule(config=config)

    logger.info("模块初始化完成。")

    # --------------------- 执行核心流程 --------------------- #
    peaks_results = []
    plateaus_results = []

    for col in collected_data.columns[1:]:  # 跳过时间列
        logger.info(f"开始处理列：{col}")
        try:
            # 数据清洗
            t_clean, N_clean = clean_data(t, collected_data[col])

            # 动态拟合
            fit_result = fitting_module.fit_dynamic(t_clean, N_clean, index=col)
            if fit_result:
                logger.info(f"拟合成功：R²={fit_result['r_squared']:.4f}")
            else:
                logger.warning(f"{col} 拟合失败，跳过增长检测。")
                continue
            # 峰值检测
            peaks_info = peak_detection_module.detect_peaks(N_clean, col)
            if peaks_info:
                logger.info(f"{col} 检测到 {len(peaks_info)} 个峰值。")
            else:
                logger.warning(f"{col} 未检测到峰值。")

            # 分析和可视化
            plateaus_info = peak_detection_module.plot_analysis_figure(
                t_clean, N_clean, peaks_info, col, fit_result
            )
            if plateaus_info:
                logger.info(f"{col} 检测到 {len(plateaus_info)} 个平台。")
            else:
                logger.warning(f"{col} 未检测到平台。")
            
            peaks_results.append(peaks_info)
            plateaus_results.append(plateaus_info)

        except Exception as e:
            logger.error(f"处理列 {col} 时发生错误：{e}")

    # --------------------- 保存结果 --------------------- #
    try:
        main_module.save_results_to_csv(peaks_results, plateaus_results)
        main_module.plot_overall_analysis(peaks_results, plateaus_results)
        logger.info("结果保存成功。")
    except Exception as e:
        logger.error(f"保存结果失败：{e}")

    logger.info("分析流程完成。")



ModuleNotFoundError: No module named 'PEM4Fe.peak_detection'

In [2]:
# 调用主函数
config_file = r"C:\Users\husky\Desktop\peek-find\PEM4Fe\data\cool1000\config.yaml"  # 配置文件路径
main(config_file)

NameError: name 'main' is not defined

In [18]:
# -------------------------- 结果输出模块 -------------------------- #
def save_results_to_csv(results, output_file):
    """保存分析结果到 CSV 文件"""
    pass

def save_summary_to_log(results, log_file):
    """保存总结到日志文件"""
    pass

# -------------------------- 主逻辑模块 -------------------------- #

In [19]:
# -------------------------- 主逻辑模块 -------------------------- #
def process_column(column, t_clean, N_clean, Nsc, N0, output_dir, fitting_module, peak_module, viz_module):
    """
    处理单列数据：
    - 数据清洗
    - 拟合分析
    - 峰值检测
    - 结果可视化
    """
    pass

def main(config_file):
    """
    主程序入口：
    - 加载配置与参数
    - 数据处理与分析
    - 结果保存与输出
    """
    pass

# -------------------------- 脚本入口 -------------------------- #

In [None]:
# -------------------------- 脚本入口 -------------------------- #
if __name__ == "__main__":
    # 加载日志
    setup_logging()
    
    # 加载配置和参数
    config_file = "config.json"  # 默认配置文件路径
    main(config_file)

In [None]:
_

In [None]:
with open(r"C:\Users\husky\Desktop\peek-find\PEM4Fe\src\PEM4Fe\__init__.py", "rb") as f:
    content = f.read()
    if b'\x00' in content:
        print("Found null byte!")
    else:
        print("No null byte found.")

In [None]:
# 打开文件并移除空字节
file_path = r"C:\Users\husky\Desktop\peek-find\PEM4Fe\src\PEM4Fe\__init__.py"

with open(file_path, "rb") as f:
    content = f.read()

# 移除空字节
clean_content = content.replace(b'\x00', b'')

# 将清理后的内容写回文件
with open(file_path, "wb") as f:
    f.write(clean_content)

print("Null byte removed!")