In [None]:
#9.30.15:45
#!/usr/bin/env python3
"""
CHB-MIT癫痫数据预处理（完整版）
功能：
1. 处理所有EDF文件（无论是否有癫痫发作）
2. 对有发作的文件：提取发作前30分钟为前期（preictal）
3. 对无发作的文件：全部标记为间期（interictal）
4. 自动跳过发作期及前后30分钟的数据
"""

import os
import glob
import numpy as np
import pyedflib
from scipy import signal
import warnings
warnings.filterwarnings("ignore")

class CHBPreprocessor:
    def __init__(self, target_channels, preictal_mins=30, window_secs=30):
        self.target_channels = [ch.replace(' ', '') for ch in target_channels]
        self.preictal_mins = preictal_mins * 60  # 转换为秒
        self.window_secs = window_secs

    def select_channels(self, data, all_channels):
        channel_map = {ch: i for i, ch in enumerate(all_channels)}
        selected = []
        for ch in self.target_channels:
            if ch in channel_map:
                selected.append(data[channel_map[ch]])
            else:
                print(f"警告: 通道 {ch} 不存在，用零填充")
                selected.append(np.zeros(data.shape[1]))
        return np.stack(selected)

    def notch_filter(self, data, fs, freq=60, Q=30):
        b, a = signal.iirnotch(freq, Q, fs)
        return signal.filtfilt(b, a, data)

    def extract_seizure_info(self, patient_id, data_dir):
        """提取所有EDF文件的发作信息（包括无发作的文件）"""
        seizure_info = {}
        
        # 查找summary文件
        summary_files = glob.glob(os.path.join(data_dir, patient_id, "*summary*.txt")) + \
                       glob.glob(os.path.join(data_dir, f"{patient_id}*summary*.txt"))
        
        if not summary_files:
            print(f"警告: 未找到患者 {patient_id} 的summary文件")
            return seizure_info
        
        summary_file = summary_files[0]
        print(f"加载发作时间: {summary_file}")
        
        try:
            with open(summary_file, 'r') as f:
                current_edf = None
                
                for line in f:
                    line = line.strip()
                    
                    if line.startswith('File Name:'):
                        current_edf = line.split(':')[1].strip()
                        seizure_info[current_edf] = []  # 初始化
                        
                    elif line.startswith('Number of Seizures in File:'):
                        num_seizures = int(line.split(':')[1].strip())
                        
                        if num_seizures > 0:
                            for _ in range(num_seizures):
                                # 读取发作开始时间
                                while not line.startswith('Seizure') or 'Start Time' not in line:
                                    line = next(f).strip()
                                start = int(line.split(':')[1].split()[0])
                                
                                # 读取发作结束时间
                                while not line.startswith('Seizure') or 'End Time' not in line:
                                    line = next(f).strip()
                                end = int(line.split(':')[1].split()[0])
                                
                                seizure_info[current_edf].append((start, end))
            
            # 打印统计信息
            seizure_files = [f for f, sz in seizure_info.items() if sz]
            print(f"发现 {len(seizure_files)} 个有发作的文件，{len(seizure_info)-len(seizure_files)} 个无发作文件")
            
        except Exception as e:
            print(f"解析summary文件失败: {str(e)}")
        
        return seizure_info

    def process_all_files(self, patient_id, data_dir, output_dir):
        """处理所有EDF文件（包括有/无发作的）"""
        os.makedirs(output_dir, exist_ok=True)
        
        # 1. 提取发作信息（包含所有文件）
        seizure_info = self.extract_seizure_info(patient_id, data_dir)
        
        # 2. 获取所有EDF文件
        edf_files = sorted(glob.glob(os.path.join(data_dir, patient_id, "*.edf")) + 
                         glob.glob(os.path.join(data_dir, f"{patient_id}_*.edf")))
        
        if not edf_files:
            print(f"未找到 {patient_id} 的EDF文件")
            return

        print(f"\n开始处理患者 {patient_id} 的 {len(edf_files)} 个EDF文件...")
        
        all_windows, all_labels = [], []
        
        for edf_file in edf_files:
            file_name = os.path.basename(edf_file)
            print(f"\n处理文件: {file_name}")
            
            # 3. 读取EDF数据（全文件读取）
            data, fs, channels = self._read_full_edf(edf_file)
            if data is None:
                continue
            
            # 4. 通道选择和滤波
            selected_data = self.select_channels(data, channels)
            filtered_data = np.array([self.notch_filter(ch, fs) for ch in selected_data])
            
            # 5. 获取当前文件的发作时间（若无发作则为空列表）
            file_seizures = seizure_info.get(file_name, [])
            
            # 6. 划分窗口并标注
            windows, labels = self._segment_data(filtered_data, fs, file_seizures)
            
            # 统计标签分布
            if labels:
                interictal = sum(l == 0 for l in labels)
                preictal = sum(l == 1 for l in labels)
                excluded = len(labels) - interictal - preictal
                print(f"  窗口分布 - 间期: {interictal}, 前期: {preictal}, 排除: {excluded}")
            
            all_windows.extend(windows)
            all_labels.extend(labels)
        
        # 7. 保存最终结果
        if all_windows:
            save_path = os.path.join(output_dir, f"{patient_id}_all_processed.npz")
            np.savez(save_path, 
                    windows=np.array(all_windows), 
                    labels=np.array(all_labels))
            print(f"\n已保存结果到 {save_path}")
            print(f"总窗口数: {len(all_windows)}")
            print(f"标签分布 - 间期: {sum(all_labels == 0)}, 前期: {sum(all_labels == 1)}")

    def _read_full_edf(self, edf_path):
        """完整读取EDF文件"""
        try:
            with pyedflib.EdfReader(edf_path) as f:
                channels = [ch.strip().replace(' ', '') for ch in f.getSignalLabels()]
                fs = f.getSampleFrequency(0)
                data = np.vstack([f.readSignal(i) for i in range(len(channels))])
                
                duration = data.shape[1] / fs / 60
                print(f"  读取完成: {duration:.1f}分钟数据, {len(channels)}个通道")
                
                return data, fs, channels
                
        except Exception as e:
            print(f"读取EDF失败: {str(e)}")
            return None, None, None

    def _segment_data(self, data, fs, file_seizures):
        """划分窗口并智能标注"""
        window_size = int(self.window_secs * fs)
        n_samples = data.shape[1]
        
        windows, labels = [], []
        
        for start_sample in range(0, n_samples - window_size + 1, window_size):
            end_sample = start_sample + window_size
            window = data[:, start_sample:end_sample]
            start_sec = start_sample / fs
            
            label = self._classify_window(start_sec, file_seizures)
            if label != -1:  # 不保存排除的窗口
                windows.append(window)
                labels.append(label)
                
        return windows, labels

    def _classify_window(self, start_sec, file_seizures):
        """窗口分类逻辑"""
        end_sec = start_sec + self.window_secs
        
        # 检查是否在某个发作的排除范围内
        for sz_start, sz_end in file_seizures:
            exclude_start = sz_start - 1800  # 发作前30分钟
            exclude_end = sz_end + 1800      # 发作后30分钟
            if start_sec >= exclude_start and end_sec <= exclude_end:
                return -1  # 排除
        
        # 检查是否在某个发作的前期范围内
        for sz_start, _ in file_seizures:
            preictal_start = sz_start - self.preictal_mins
            if start_sec >= preictal_start and end_sec <= sz_start:
                return 1  # 前期
        
        return 0  # 间期

def process_patient(patient_id, data_dir, output_base_dir="processed_data"):
    TARGET_CHANNELS = ['F7-T7', 'T7-P7', 'F3-C3', 'F8-T8', 'F4-C4', 'P7-O1']
    
    processor = CHBPreprocessor(
        target_channels=TARGET_CHANNELS,
        preictal_mins=30,
        window_secs=30
    )
    
    output_dir = os.path.join(output_base_dir, patient_id)
    processor.process_all_files(patient_id, data_dir, output_dir)

if __name__ == "__main__":
    DATA_DIR = "/root/autodl-tmp/epli"
    OUTPUT_DIR = "/root/final_results"
    
    # 示例处理（可修改为其他患者ID）
    process_patient("chb13", DATA_DIR, OUTPUT_DIR)

加载发作时间: /root/autodl-tmp/epli/chb13/chb13-summary.txt
发现 8 个有发作的文件，25 个无发作文件

开始处理患者 chb13 的 33 个EDF文件...

处理文件: chb13_02.edf
  读取完成: 60.0分钟数据, 28个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_03.edf
  读取完成: 60.0分钟数据, 28个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_04.edf
  读取完成: 60.0分钟数据, 25个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_05.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_06.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_07.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_08.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_09.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_10.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_11.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_12.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 排除: 0

处理文件: chb13_13.edf
  读取完成: 60.0分钟数据, 22个通道
  窗口分布 - 间期: 120, 前期: 0, 

In [1]:
#V16:05--------------修改间期逻辑，只有有发作期的才提取了
#!/usr/bin/env python3
"""
EDF癫痫数据处理完整流程
功能：
1. 安全读取EDF文件（带内存监控）
2. 通道选择与陷波滤波
3. 基于summary文件精确提取发作时间
4. 划分前期(preictal)/间期(interictal)窗口
5. 保存为压缩的NPZ格式
"""

import os
import glob
import numpy as np
import pyedflib
from scipy import signal
import gc
import psutil
import warnings
from typing import Dict, List, Tuple

warnings.filterwarnings("ignore")

class EpilepsyEDFProcessor:
    def __init__(self, 
                 target_channels: List[str],
                 preictal_mins: int = 30,
                 window_secs: int = 30,
                 notch_freq: float = 60.0,
                 notch_quality: float = 30.0,
                 max_mem_gb: float = 4.0):
        """
        初始化处理器
        :param target_channels: 目标通道列表（如['F7-T7', 'F3-C3']）
        :param preictal_mins: 前期定义时长（分钟）
        :param window_secs: 分析窗口长度（秒）
        :param notch_freq: 陷波滤波器中心频率（Hz）
        :param notch_quality: 陷波滤波器Q值
        :param max_mem_gb: 最大允许内存（GB）
        """
        self.target_channels = [ch.replace(' ', '') for ch in target_channels]
        self.preictal_secs = preictal_mins * 60
        self.window_secs = window_secs
        self.notch_freq = notch_freq
        self.notch_quality = notch_quality
        self.max_mem_bytes = max_mem_gb * 1024**3
        self.chunk_size = 3600  # 基础分块大小（样本数）

    def check_memory(self) -> bool:
        """检查当前内存是否安全"""
        mem = psutil.virtual_memory()
        return mem.available > self.max_mem_bytes * 0.2  # 保留20%缓冲

    def parse_summary_file(self, summary_path: str) -> Dict[str, List[Tuple[int, int]]]:
        """
        精确解析summary文件
        :return: {edf文件名: [(发作开始1,结束1), (发作开始2,结束2),...]}
        """
        seizure_info = {}
        current_file = None
        
        with open(summary_path, 'r') as f:
            for line in f:
                line = line.strip()
                
                if line.startswith('File Name:'):
                    current_file = line.split(':')[1].strip()
                    seizure_info[current_file] = []
                    
                elif line.startswith('Number of Seizures in File:'):
                    num_seizures = int(line.split(':')[1].strip())
                    if num_seizures > 0:
                        for _ in range(num_seizures):
                            # 定位到发作开始行
                            while not line.startswith('Seizure') or 'Start Time' not in line:
                                line = next(f).strip()
                            start = int(line.split(':')[1].split()[0])
                            
                            # 定位到发作结束行
                            while not line.startswith('Seizure') or 'End Time' not in line:
                                line = next(f).strip()
                            end = int(line.split(':')[1].split()[0])
                            
                            seizure_info[current_file].append((start, end))
        
        # 验证结果
        valid_files = [f for f, sz in seizure_info.items() if sz]
        print(f"解析完成: 共{len(valid_files)}个有发作的文件, {sum(len(sz) for sz in seizure_info.values())}次发作")
        return seizure_info

    def safe_read_edf(self, edf_path: str) -> Tuple[np.ndarray, float, List[str]]:
        """
        安全读取EDF文件（带内存保护）
        :return: (数据数组, 采样率, 实际通道列表)
        """
        if not self.check_memory():
            raise MemoryError("内存不足，终止读取")

        try:
            with pyedflib.EdfReader(edf_path) as f:
                fs = f.getSampleFrequency(0)
                all_channels = [ch.strip() for ch in f.getSignalLabels()]
                n_samples = f.getNSamples()[0]
                
                # 初始化数据数组
                data = np.zeros((len(self.target_channels), n_samples), dtype=np.float32)
                
                # 分通道读取
                for i, target_ch in enumerate(self.target_channels):
                    if target_ch not in all_channels:
                        print(f"警告: 通道 {target_ch} 不存在，用零填充")
                        continue
                        
                    # 分块读取（避免大数组峰值）
                    ch_idx = all_channels.index(target_ch)
                    for start in range(0, n_samples, self.chunk_size):
                        end = min(start + self.chunk_size, n_samples)
                        data[i, start:end] = f.readSignal(ch_idx, start, end - start)
                        
                        # 定期检查内存
                        if start % (10 * self.chunk_size) == 0 and not self.check_memory():
                            raise MemoryError("内存不足，终止读取")
                
                return data, fs, all_channels
                
        except Exception as e:
            print(f"EDF读取失败: {str(e)}")
            raise

    def preprocess_data(self, 
                       data: np.ndarray, 
                       fs: float,
                       all_channels: List[str]) -> np.ndarray:
        """
        数据预处理流程：
        1. 通道选择
        2. 陷波滤波
        """
        # 通道选择（已在读取时完成）
        
        # 陷波滤波（分块处理）
        filtered_data = np.zeros_like(data)
        for i in range(data.shape[0]):
            if not self.check_memory():
                raise MemoryError("内存不足，终止滤波")
                
            # 设计滤波器
            b, a = signal.iirnotch(self.notch_freq, self.notch_quality, fs)
            
            # 分块滤波（避免内存峰值）
            for start in range(0, data.shape[1], self.chunk_size):
                end = min(start + self.chunk_size, data.shape[1])
                filtered_data[i, start:end] = signal.filtfilt(b, a, data[i, start:end])
        
        return filtered_data

    def segment_data(self,
                    data: np.ndarray,
                    fs: float,
                    seizure_times: List[Tuple[int, int]]) -> Tuple[np.ndarray, np.ndarray]:
        """
        划分数据窗口并标注类别
        :return: (窗口数据, 标签)
        """
        window_size = int(self.window_secs * fs)
        windows = []
        labels = []
        
        for start in range(0, data.shape[1] - window_size + 1, window_size):
            if not self.check_memory():
                raise MemoryError("内存不足，终止分段")
                
            # 计算当前窗口时间（秒）
            window_start_sec = start / fs
            window_end_sec = window_start_sec + self.window_secs
            
            # 分类窗口
            label = self.classify_window(window_start_sec, window_end_sec, seizure_times)
            if label != -1:  # 不保存排除窗口
                windows.append(data[:, start:start+window_size])
                labels.append(label)
        
        return np.array(windows), np.array(labels)

    def classify_window(self, start_sec: float, end_sec: float, seizure_times: List[Tuple[int, int]]) -> int:
        """
        修正后的窗口分类逻辑：
        -1: 排除区（发作期 + 发作后30分钟）
         0: 间期
         1: 前期（发作前30-60分钟）
        """
        for sz_start, sz_end in seizure_times:
            # 排除区：发作期 + 发作后30分钟
            if sz_start <= start_sec <= (sz_end + 1800):
                return -1

            # 前期：发作前30-60分钟（注意不与排除区重叠）
            preictal_start = sz_start - 3600  # 60分钟前
            preictal_end = sz_start - 1800    # 30分钟前
            if preictal_start <= start_sec < preictal_end:
                return 1

        return 0  # 间期
    def process_edf_file(self,
                         edf_path: str,
                         output_path: str,
                         seizure_times: List[Tuple[int, int]]) -> bool:
        """
        处理单个EDF文件完整流程
        """
        try:
            # 1. 安全读取
            print(f"开始处理: {os.path.basename(edf_path)}")
            data, fs, all_channels = self.safe_read_edf(edf_path)
            
            # 2. 预处理
            filtered_data = self.preprocess_data(data, fs, all_channels)
            del data  # 释放原始数据
            gc.collect()
            
            # 3. 划分窗口
            windows, labels = self.segment_data(filtered_data, fs, seizure_times)
            del filtered_data
            gc.collect()
            
            # 4. 保存结果
            np.savez_compressed(
                output_path,
                windows=windows,
                labels=labels,
                fs=fs,
                channels=self.target_channels,
                window_secs=self.window_secs
            )
            print(f"处理成功: {output_path} (窗口数: {len(windows)})")
            return True
            
        except Exception as e:
            print(f"处理失败: {str(e)}")
            return False

def batch_process(data_dir: str,
                  output_dir: str,
                  target_channels: List[str],
                  patients: List[str] = None):
    """
    批量处理EDF文件
    """
    os.makedirs(output_dir, exist_ok=True)
    
    # 初始化处理器
    processor = EpilepsyEDFProcessor(target_channels)
    
    # 查找患者目录
    if patients is None:
        patients = sorted([d for d in os.listdir(data_dir) if d.startswith('chb')])
    
    for patient_id in patients:
        patient_dir = os.path.join(data_dir, patient_id)
        if not os.path.isdir(patient_dir):
            continue
            
        print(f"\n处理患者: {patient_id}")
        
        # 查找summary文件
        summary_files = glob.glob(os.path.join(patient_dir, "*summary*.txt"))
        if not summary_files:
            print(f"警告: 未找到 {patient_id} 的summary文件")
            continue
            
        # 解析发作信息
        seizure_info = processor.parse_summary_file(summary_files[0])
        
        # 处理EDF文件
        edf_files = glob.glob(os.path.join(patient_dir, "*.edf"))
        for edf_file in edf_files:
            file_name = os.path.basename(edf_file)
            output_path = os.path.join(
                output_dir,
                f"{patient_id}_{file_name.replace('.edf', '.npz')}"
            )
            
            # 获取当前文件的发作时间
            current_seizures = seizure_info.get(file_name, [])
            
            # 处理文件
            processor.process_edf_file(
                edf_path=edf_file,
                output_path=output_path,
                seizure_times=current_seizures
            )

if __name__ == "__main__":
    # 配置参数
    DATA_DIR = "/root/autodl-tmp/epli"
    OUTPUT_DIR = "/root/processed_results"
    TARGET_CHANNELS = ['F7-T7', 'T7-P7', 'F3-C3', 'F8-T8', 'F4-C4', 'P7-O1']
    
    # 示例：处理chb13和chb14
    batch_process(
        data_dir=DATA_DIR,
        output_dir=OUTPUT_DIR,
        target_channels=TARGET_CHANNELS,
        patients=["chb23"]
    )


处理患者: chb23
解析完成: 共3个有发作的文件, 7次发作
开始处理: chb23_06.edf
处理成功: /root/processed_results/chb23_chb23_06.npz (窗口数: 186)
开始处理: chb23_07.edf
处理成功: /root/processed_results/chb23_chb23_07.npz (窗口数: 85)
开始处理: chb23_08.edf
处理成功: /root/processed_results/chb23_chb23_08.npz (窗口数: 222)
开始处理: chb23_09.edf
处理成功: /root/processed_results/chb23_chb23_09.npz (窗口数: 265)
开始处理: chb23_10.edf
处理成功: /root/processed_results/chb23_chb23_10.npz (窗口数: 480)
开始处理: chb23_16.edf
处理成功: /root/processed_results/chb23_chb23_16.npz (窗口数: 480)
开始处理: chb23_17.edf
处理成功: /root/processed_results/chb23_chb23_17.npz (窗口数: 419)
开始处理: chb23_19.edf
处理成功: /root/processed_results/chb23_chb23_19.npz (窗口数: 480)
开始处理: chb23_20.edf
处理成功: /root/processed_results/chb23_chb23_20.npz (窗口数: 166)


In [5]:
data = np.load("/root/processed_results/chb23_chb23_06.npz")
print(data['windows'].shape)  # 应为 (n_windows, n_channels, window_samples)
print(np.unique(data['labels'], return_counts=True))  # 查看标签分布

(186, 6, 7680)
(array([0, 1]), array([126,  60]))


In [7]:
data = np.load("/root/processed_results/chb23_chb23_07.npz")
print(data['windows'].shape)  # 应为 (n_windows, n_channels, window_samples)
print(np.unique(data['labels'], return_counts=True))  # 查看标签分布

(85, 6, 7680)
(array([0]), array([85]))


In [6]:
#V16:05--------------
#!/usr/bin/env python3
"""
EDF癫痫数据处理完整流程
功能：
1. 安全读取EDF文件（带内存监控）
2. 通道选择与陷波滤波
3. 基于summary文件精确提取发作时间
4. 划分前期(preictal)/间期(interictal)窗口
5. 保存为压缩的NPZ格式
"""

import os
import glob
import numpy as np
import pyedflib
from scipy import signal
import gc
import psutil
import warnings
from typing import Dict, List, Tuple

warnings.filterwarnings("ignore")

class EpilepsyEDFProcessor:
    def __init__(self, 
                 target_channels: List[str],
                 preictal_mins: int = 30,
                 window_secs: int = 30,
                 notch_freq: float = 60.0,
                 notch_quality: float = 30.0,
                 max_mem_gb: float = 4.0):
        """
        初始化处理器
        :param target_channels: 目标通道列表（如['F7-T7', 'F3-C3']）
        :param preictal_mins: 前期定义时长（分钟）
        :param window_secs: 分析窗口长度（秒）
        :param notch_freq: 陷波滤波器中心频率（Hz）
        :param notch_quality: 陷波滤波器Q值
        :param max_mem_gb: 最大允许内存（GB）
        """
        self.target_channels = [ch.replace(' ', '') for ch in target_channels]
        self.preictal_secs = preictal_mins * 60
        self.window_secs = window_secs
        self.notch_freq = notch_freq
        self.notch_quality = notch_quality
        self.max_mem_bytes = max_mem_gb * 1024**3
        self.chunk_size = 3600  # 基础分块大小（样本数）

    def check_memory(self) -> bool:
        """检查当前内存是否安全"""
        mem = psutil.virtual_memory()
        return mem.available > self.max_mem_bytes * 0.2  # 保留20%缓冲

    def parse_summary_file(self, summary_path: str) -> Dict[str, List[Tuple[int, int]]]:
        """
        精确解析summary文件
        :return: {edf文件名: [(发作开始1,结束1), (发作开始2,结束2),...]}
        """
        seizure_info = {}
        current_file = None
        
        with open(summary_path, 'r') as f:
            for line in f:
                line = line.strip()
                
                if line.startswith('File Name:'):
                    current_file = line.split(':')[1].strip()
                    seizure_info[current_file] = []
                    
                elif line.startswith('Number of Seizures in File:'):
                    num_seizures = int(line.split(':')[1].strip())
                    if num_seizures > 0:
                        for _ in range(num_seizures):
                            # 定位到发作开始行
                            while not line.startswith('Seizure') or 'Start Time' not in line:
                                line = next(f).strip()
                            start = int(line.split(':')[1].split()[0])
                            
                            # 定位到发作结束行
                            while not line.startswith('Seizure') or 'End Time' not in line:
                                line = next(f).strip()
                            end = int(line.split(':')[1].split()[0])
                            
                            seizure_info[current_file].append((start, end))
        
        # 验证结果
        valid_files = [f for f, sz in seizure_info.items() if sz]
        print(f"解析完成: 共{len(valid_files)}个有发作的文件, {sum(len(sz) for sz in seizure_info.values())}次发作")
        return seizure_info

    def safe_read_edf(self, edf_path: str) -> Tuple[np.ndarray, float, List[str]]:
        """
        安全读取EDF文件（带内存保护）
        :return: (数据数组, 采样率, 实际通道列表)
        """
        if not self.check_memory():
            raise MemoryError("内存不足，终止读取")

        try:
            with pyedflib.EdfReader(edf_path) as f:
                fs = f.getSampleFrequency(0)
                all_channels = [ch.strip() for ch in f.getSignalLabels()]
                n_samples = f.getNSamples()[0]
                
                # 初始化数据数组
                data = np.zeros((len(self.target_channels), n_samples), dtype=np.float32)
                
                # 分通道读取
                for i, target_ch in enumerate(self.target_channels):
                    if target_ch not in all_channels:
                        print(f"警告: 通道 {target_ch} 不存在，用零填充")
                        continue
                        
                    # 分块读取（避免大数组峰值）
                    ch_idx = all_channels.index(target_ch)
                    for start in range(0, n_samples, self.chunk_size):
                        end = min(start + self.chunk_size, n_samples)
                        data[i, start:end] = f.readSignal(ch_idx, start, end - start)
                        
                        # 定期检查内存
                        if start % (10 * self.chunk_size) == 0 and not self.check_memory():
                            raise MemoryError("内存不足，终止读取")
                
                return data, fs, all_channels
                
        except Exception as e:
            print(f"EDF读取失败: {str(e)}")
            raise

    def preprocess_data(self, 
                       data: np.ndarray, 
                       fs: float,
                       all_channels: List[str]) -> np.ndarray:
        """
        数据预处理流程：
        1. 通道选择
        2. 陷波滤波
        """
        # 通道选择（已在读取时完成）
        
        # 陷波滤波（分块处理）
        filtered_data = np.zeros_like(data)
        for i in range(data.shape[0]):
            if not self.check_memory():
                raise MemoryError("内存不足，终止滤波")
                
            # 设计滤波器
            b, a = signal.iirnotch(self.notch_freq, self.notch_quality, fs)
            
            # 分块滤波（避免内存峰值）
            for start in range(0, data.shape[1], self.chunk_size):
                end = min(start + self.chunk_size, data.shape[1])
                filtered_data[i, start:end] = signal.filtfilt(b, a, data[i, start:end])
        
        return filtered_data

    def segment_data(self,
                    data: np.ndarray,
                    fs: float,
                    seizure_times: List[Tuple[int, int]]) -> Tuple[np.ndarray, np.ndarray]:
        """
        划分数据窗口并标注类别
        :return: (窗口数据, 标签)
        """
        window_size = int(self.window_secs * fs)
        windows = []
        labels = []
        
        for start in range(0, data.shape[1] - window_size + 1, window_size):
            if not self.check_memory():
                raise MemoryError("内存不足，终止分段")
                
            # 计算当前窗口时间（秒）
            window_start_sec = start / fs
            window_end_sec = window_start_sec + self.window_secs
            
            # 分类窗口
            label = self.classify_window(window_start_sec, window_end_sec, seizure_times)
            if label != -1:  # 不保存排除窗口
                windows.append(data[:, start:start+window_size])
                labels.append(label)
        
        return np.array(windows), np.array(labels)

    def classify_window(self, start_sec: float, end_sec: float, seizure_times: List[Tuple[int, int]]) -> int:
        """
        修正后的窗口分类逻辑：
        -1: 排除区（发作期 + 发作后30分钟）
         0: 间期
         1: 前期（发作前30-60分钟）
        """
        for sz_start, sz_end in seizure_times:
            # 排除区：发作期 + 发作后30分钟
            if sz_start <= start_sec <= (sz_end + 1800):
                return -1

            # 前期：发作前30-60分钟（注意不与排除区重叠）
            preictal_start = sz_start - 3600  # 60分钟前
            preictal_end = sz_start - 1800    # 30分钟前
            if preictal_start <= start_sec < preictal_end:
                return 1

        return 0  # 间期
    def process_edf_file(self,
                         edf_path: str,
                         output_path: str,
                         seizure_times: List[Tuple[int, int]]) -> bool:
        """
        处理单个EDF文件完整流程
        """
        try:
            # 1. 安全读取
            print(f"开始处理: {os.path.basename(edf_path)}")
            data, fs, all_channels = self.safe_read_edf(edf_path)
            
            # 2. 预处理
            filtered_data = self.preprocess_data(data, fs, all_channels)
            del data  # 释放原始数据
            gc.collect()
            
            # 3. 划分窗口
            windows, labels = self.segment_data(filtered_data, fs, seizure_times)
            del filtered_data
            gc.collect()
            
            # 4. 保存结果
            np.savez_compressed(
                output_path,
                windows=windows,
                labels=labels,
                fs=fs,
                channels=self.target_channels,
                window_secs=self.window_secs
            )
            print(f"处理成功: {output_path} (窗口数: {len(windows)})")
            return True
            
        except Exception as e:
            print(f"处理失败: {str(e)}")
            return False

def batch_process(data_dir: str,
                  output_dir: str,
                  target_channels: List[str],
                  patients: List[str] = None):
    """
    批量处理EDF文件
    """
    os.makedirs(output_dir, exist_ok=True)
    
    # 初始化处理器
    processor = EpilepsyEDFProcessor(target_channels)
    
    # 查找患者目录
    if patients is None:
        patients = sorted([d for d in os.listdir(data_dir) if d.startswith('chb')])
    
    for patient_id in patients:
        patient_dir = os.path.join(data_dir, patient_id)
        if not os.path.isdir(patient_dir):
            continue
            
        print(f"\n处理患者: {patient_id}")
        
        # 查找summary文件
        summary_files = glob.glob(os.path.join(patient_dir, "*summary*.txt"))
        if not summary_files:
            print(f"警告: 未找到 {patient_id} 的summary文件")
            continue
            
        # 解析发作信息
        seizure_info = processor.parse_summary_file(summary_files[0])
        
        # 处理EDF文件
        edf_files = glob.glob(os.path.join(patient_dir, "*.edf"))
        for edf_file in edf_files:
            file_name = os.path.basename(edf_file)
            output_path = os.path.join(
                output_dir,
                f"{patient_id}_{file_name.replace('.edf', '.npz')}"
            )
            
            # 获取当前文件的发作时间
            current_seizures = seizure_info.get(file_name, [])
            
            # 处理文件
            processor.process_edf_file(
                edf_path=edf_file,
                output_path=output_path,
                seizure_times=current_seizures
            )

if __name__ == "__main__":
    # 配置参数
    DATA_DIR = "/root/autodl-tmp/epli"
    OUTPUT_DIR = "/root/processed_results"
    TARGET_CHANNELS = ['F7-T7', 'T7-P7', 'F3-C3', 'F8-T8', 'F4-C4', 'P7-O1']
    
    # 示例：处理chb13和chb14
    batch_process(
        data_dir=DATA_DIR,
        output_dir=OUTPUT_DIR,
        target_channels=TARGET_CHANNELS,
        patients=["chb01","chb02","chb03","chb21","chb20"]
    )


处理患者: chb01
解析完成: 共7个有发作的文件, 7次发作
开始处理: chb01_01.edf
处理成功: /root/processed_results/chb01_chb01_01.npz (窗口数: 120)
开始处理: chb01_02.edf
处理成功: /root/processed_results/chb01_chb01_02.npz (窗口数: 120)
开始处理: chb01_03.edf
处理成功: /root/processed_results/chb01_chb01_03.npz (窗口数: 100)
开始处理: chb01_04.edf
处理成功: /root/processed_results/chb01_chb01_04.npz (窗口数: 59)
开始处理: chb01_05.edf
处理成功: /root/processed_results/chb01_chb01_05.npz (窗口数: 120)
开始处理: chb01_06.edf
处理成功: /root/processed_results/chb01_chb01_06.npz (窗口数: 120)
开始处理: chb01_07.edf
处理成功: /root/processed_results/chb01_chb01_07.npz (窗口数: 120)
开始处理: chb01_08.edf
处理成功: /root/processed_results/chb01_chb01_08.npz (窗口数: 120)
开始处理: chb01_09.edf
处理成功: /root/processed_results/chb01_chb01_09.npz (窗口数: 120)
开始处理: chb01_10.edf
处理成功: /root/processed_results/chb01_chb01_10.npz (窗口数: 120)
开始处理: chb01_11.edf
处理成功: /root/processed_results/chb01_chb01_11.npz (窗口数: 120)
开始处理: chb01_12.edf
处理成功: /root/processed_results/chb01_chb01_12.npz (窗口数: 120)
开始处理: chb01_13.edf

In [8]:
#V16:05--------------
#!/usr/bin/env python3
"""
EDF癫痫数据处理完整流程
功能：
1. 安全读取EDF文件（带内存监控）
2. 通道选择与陷波滤波
3. 基于summary文件精确提取发作时间
4. 划分前期(preictal)/间期(interictal)窗口
5. 保存为压缩的NPZ格式
"""

import os
import glob
import numpy as np
import pyedflib
from scipy import signal
import gc
import psutil
import warnings
from typing import Dict, List, Tuple

warnings.filterwarnings("ignore")

class EpilepsyEDFProcessor:
    def __init__(self, 
                 target_channels: List[str],
                 preictal_mins: int = 30,
                 window_secs: int = 30,
                 notch_freq: float = 60.0,
                 notch_quality: float = 30.0,
                 max_mem_gb: float = 4.0):
        """
        初始化处理器
        :param target_channels: 目标通道列表（如['F7-T7', 'F3-C3']）
        :param preictal_mins: 前期定义时长（分钟）
        :param window_secs: 分析窗口长度（秒）
        :param notch_freq: 陷波滤波器中心频率（Hz）
        :param notch_quality: 陷波滤波器Q值
        :param max_mem_gb: 最大允许内存（GB）
        """
        self.target_channels = [ch.replace(' ', '') for ch in target_channels]
        self.preictal_secs = preictal_mins * 60
        self.window_secs = window_secs
        self.notch_freq = notch_freq
        self.notch_quality = notch_quality
        self.max_mem_bytes = max_mem_gb * 1024**3
        self.chunk_size = 3600  # 基础分块大小（样本数）

    def check_memory(self) -> bool:
        """检查当前内存是否安全"""
        mem = psutil.virtual_memory()
        return mem.available > self.max_mem_bytes * 0.2  # 保留20%缓冲

    def parse_summary_file(self, summary_path: str) -> Dict[str, List[Tuple[int, int]]]:
        """
        精确解析summary文件
        :return: {edf文件名: [(发作开始1,结束1), (发作开始2,结束2),...]}
        """
        seizure_info = {}
        current_file = None
        
        with open(summary_path, 'r') as f:
            for line in f:
                line = line.strip()
                
                if line.startswith('File Name:'):
                    current_file = line.split(':')[1].strip()
                    seizure_info[current_file] = []
                    
                elif line.startswith('Number of Seizures in File:'):
                    num_seizures = int(line.split(':')[1].strip())
                    if num_seizures > 0:
                        for _ in range(num_seizures):
                            # 定位到发作开始行
                            while not line.startswith('Seizure') or 'Start Time' not in line:
                                line = next(f).strip()
                            start = int(line.split(':')[1].split()[0])
                            
                            # 定位到发作结束行
                            while not line.startswith('Seizure') or 'End Time' not in line:
                                line = next(f).strip()
                            end = int(line.split(':')[1].split()[0])
                            
                            seizure_info[current_file].append((start, end))
        
        # 验证结果
        valid_files = [f for f, sz in seizure_info.items() if sz]
        print(f"解析完成: 共{len(valid_files)}个有发作的文件, {sum(len(sz) for sz in seizure_info.values())}次发作")
        return seizure_info

    def safe_read_edf(self, edf_path: str) -> Tuple[np.ndarray, float, List[str]]:
        """
        安全读取EDF文件（带内存保护）
        :return: (数据数组, 采样率, 实际通道列表)
        """
        if not self.check_memory():
            raise MemoryError("内存不足，终止读取")

        try:
            with pyedflib.EdfReader(edf_path) as f:
                fs = f.getSampleFrequency(0)
                all_channels = [ch.strip() for ch in f.getSignalLabels()]
                n_samples = f.getNSamples()[0]
                
                # 初始化数据数组
                data = np.zeros((len(self.target_channels), n_samples), dtype=np.float32)
                
                # 分通道读取
                for i, target_ch in enumerate(self.target_channels):
                    if target_ch not in all_channels:
                        print(f"警告: 通道 {target_ch} 不存在，用零填充")
                        continue
                        
                    # 分块读取（避免大数组峰值）
                    ch_idx = all_channels.index(target_ch)
                    for start in range(0, n_samples, self.chunk_size):
                        end = min(start + self.chunk_size, n_samples)
                        data[i, start:end] = f.readSignal(ch_idx, start, end - start)
                        
                        # 定期检查内存
                        if start % (10 * self.chunk_size) == 0 and not self.check_memory():
                            raise MemoryError("内存不足，终止读取")
                
                return data, fs, all_channels
                
        except Exception as e:
            print(f"EDF读取失败: {str(e)}")
            raise

    def preprocess_data(self, 
                       data: np.ndarray, 
                       fs: float,
                       all_channels: List[str]) -> np.ndarray:
        """
        数据预处理流程：
        1. 通道选择
        2. 陷波滤波
        """
        # 通道选择（已在读取时完成）
        
        # 陷波滤波（分块处理）
        filtered_data = np.zeros_like(data)
        for i in range(data.shape[0]):
            if not self.check_memory():
                raise MemoryError("内存不足，终止滤波")
                
            # 设计滤波器
            b, a = signal.iirnotch(self.notch_freq, self.notch_quality, fs)
            
            # 分块滤波（避免内存峰值）
            for start in range(0, data.shape[1], self.chunk_size):
                end = min(start + self.chunk_size, data.shape[1])
                filtered_data[i, start:end] = signal.filtfilt(b, a, data[i, start:end])
        
        return filtered_data

    def segment_data(self,
                    data: np.ndarray,
                    fs: float,
                    seizure_times: List[Tuple[int, int]]) -> Tuple[np.ndarray, np.ndarray]:
        """
        划分数据窗口并标注类别
        :return: (窗口数据, 标签)
        """
        window_size = int(self.window_secs * fs)
        windows = []
        labels = []
        
        for start in range(0, data.shape[1] - window_size + 1, window_size):
            if not self.check_memory():
                raise MemoryError("内存不足，终止分段")
                
            # 计算当前窗口时间（秒）
            window_start_sec = start / fs
            window_end_sec = window_start_sec + self.window_secs
            
            # 分类窗口
            label = self.classify_window(window_start_sec, window_end_sec, seizure_times)
            if label != -1:  # 不保存排除窗口
                windows.append(data[:, start:start+window_size])
                labels.append(label)
        
        return np.array(windows), np.array(labels)

    def classify_window(self, start_sec: float, end_sec: float, seizure_times: List[Tuple[int, int]]) -> int:
        """
        修正后的窗口分类逻辑：
        -1: 排除区（发作期 + 发作后30分钟）
         0: 间期
         1: 前期（发作前30-60分钟）
        """
        for sz_start, sz_end in seizure_times:
            # 排除区：发作期 + 发作后30分钟
            if sz_start <= start_sec <= (sz_end + 1800):
                return -1

            # 前期：发作前30-60分钟（注意不与排除区重叠）
            preictal_start = sz_start - 3600  # 60分钟前
            preictal_end = sz_start - 1800    # 30分钟前
            if preictal_start <= start_sec < preictal_end:
                return 1

        return 0  # 间期
    def process_edf_file(self,
                         edf_path: str,
                         output_path: str,
                         seizure_times: List[Tuple[int, int]]) -> bool:
        """
        处理单个EDF文件完整流程
        """
        try:
            # 1. 安全读取
            print(f"开始处理: {os.path.basename(edf_path)}")
            data, fs, all_channels = self.safe_read_edf(edf_path)
            
            # 2. 预处理
            filtered_data = self.preprocess_data(data, fs, all_channels)
            del data  # 释放原始数据
            gc.collect()
            
            # 3. 划分窗口
            windows, labels = self.segment_data(filtered_data, fs, seizure_times)
            del filtered_data
            gc.collect()
            
            # 4. 保存结果
            np.savez_compressed(
                output_path,
                windows=windows,
                labels=labels,
                fs=fs,
                channels=self.target_channels,
                window_secs=self.window_secs
            )
            print(f"处理成功: {output_path} (窗口数: {len(windows)})")
            return True
            
        except Exception as e:
            print(f"处理失败: {str(e)}")
            return False

def batch_process(data_dir: str,
                  output_dir: str,
                  target_channels: List[str],
                  patients: List[str] = None):
    """
    批量处理EDF文件
    """
    os.makedirs(output_dir, exist_ok=True)
    
    # 初始化处理器
    processor = EpilepsyEDFProcessor(target_channels)
    
    # 查找患者目录
    if patients is None:
        patients = sorted([d for d in os.listdir(data_dir) if d.startswith('chb')])
    
    for patient_id in patients:
        patient_dir = os.path.join(data_dir, patient_id)
        if not os.path.isdir(patient_dir):
            continue
            
        print(f"\n处理患者: {patient_id}")
        
        # 查找summary文件
        summary_files = glob.glob(os.path.join(patient_dir, "*summary*.txt"))
        if not summary_files:
            print(f"警告: 未找到 {patient_id} 的summary文件")
            continue
            
        # 解析发作信息
        seizure_info = processor.parse_summary_file(summary_files[0])
        
        # 处理EDF文件
        edf_files = glob.glob(os.path.join(patient_dir, "*.edf"))
        for edf_file in edf_files:
            file_name = os.path.basename(edf_file)
            output_path = os.path.join(
                output_dir,
                f"{patient_id}_{file_name.replace('.edf', '.npz')}"
            )
            
            # 获取当前文件的发作时间
            current_seizures = seizure_info.get(file_name, [])
            
            # 处理文件
            processor.process_edf_file(
                edf_path=edf_file,
                output_path=output_path,
                seizure_times=current_seizures
            )

if __name__ == "__main__":
    # 配置参数
    DATA_DIR = "/root/autodl-tmp/epli"
    OUTPUT_DIR = "/root/processed_results"
    TARGET_CHANNELS = ['F7-T7', 'T7-P7', 'F3-C3', 'F8-T8', 'F4-C4', 'P7-O1']
    
    # 示例：处理chb13和chb14
    batch_process(
        data_dir=DATA_DIR,
        output_dir=OUTPUT_DIR,
        target_channels=TARGET_CHANNELS,
        patients=["chb19","chb18","chb14","chb13","chb10","chb09","chb05"]
    )


处理患者: chb19
解析完成: 共3个有发作的文件, 3次发作
开始处理: chb19_01.edf
处理成功: /root/processed_results/chb19_chb19_01.npz (窗口数: 120)
开始处理: chb19_02.edf
处理成功: /root/processed_results/chb19_chb19_02.npz (窗口数: 120)
开始处理: chb19_03.edf
处理成功: /root/processed_results/chb19_chb19_03.npz (窗口数: 120)
开始处理: chb19_04.edf
处理成功: /root/processed_results/chb19_chb19_04.npz (窗口数: 120)
开始处理: chb19_05.edf
处理成功: /root/processed_results/chb19_chb19_05.npz (窗口数: 120)
开始处理: chb19_06.edf
处理成功: /root/processed_results/chb19_chb19_06.npz (窗口数: 120)
开始处理: chb19_07.edf
处理成功: /root/processed_results/chb19_chb19_07.npz (窗口数: 120)
开始处理: chb19_08.edf
处理成功: /root/processed_results/chb19_chb19_08.npz (窗口数: 120)
开始处理: chb19_09.edf
处理成功: /root/processed_results/chb19_chb19_09.npz (窗口数: 120)
开始处理: chb19_10.edf
处理成功: /root/processed_results/chb19_chb19_10.npz (窗口数: 120)
开始处理: chb19_11.edf
处理成功: /root/processed_results/chb19_chb19_11.npz (窗口数: 120)
开始处理: chb19_12.edf
处理成功: /root/processed_results/chb19_chb19_12.npz (窗口数: 120)
开始处理: chb19_13.ed

###05的23edf文件未处理 