In [1]:
from pyedflib import EdfReader

class EdfReaderWrapper(EdfReader):
    """`EdfReader` subclass that can be used in a `with` statement.

    Entering the block yields the reader itself; leaving it closes the reader.
    """

    def __init__(self, *args, **kwargs):
        # Pure pass-through: all arguments go to EdfReader unchanged.
        super(EdfReaderWrapper, self).__init__(*args, **kwargs)

    def __enter__(self):
        """Return this reader as the object bound by `with ... as`."""
        return self

    def __exit__(self, *exc_info):
        """Close the reader; returns None, so exceptions from the body propagate."""
        super(EdfReaderWrapper, self).close()

In [None]:
import os
import json
import datetime

# Export one JSON timeline description per CHB-MIT patient (chb01 .. chb24).
# Each JSON holds:
#   record_lst          one entry per EDF file named in the patient's summary file
#   seizure_lst         one entry per seizure listed in the summary file
#   unused_rec_idx_lst  indices into record_lst of files named in ignore_lst
outdir = "./chbmit-plotinfo"
data_dir = "/home/zgl/publicdata/CHBMIT/0_data_edf/chbmit/1.0.0"
ignore_lst = [
    "chb16_18.edf", "chb16_19.edf", "chb17c_13.edf", "chb18_01.edf",
    "chb19_01.edf", "chb11_01.edf", "chb12_27.edf", "chb12_28.edf",
    "chb12_29.edf", "chb09_01.edf", "chb15_01.edf",
]

# TODO: replace with isoformat; dt_fmt is not referenced in this cell.
dt_fmt = '%Y-%m-%d %H:%M:%S'

# The final open(..., 'wt') fails if the output directory does not exist yet.
os.makedirs(outdir, exist_ok=True)

for indexPatient in range(1, 24+1):
    patient_id = f"chb{indexPatient:02d}"
    patient_dir = os.path.join(data_dir, patient_id)
    result_obj = {"record_lst": [], "seizure_lst": [], "unused_rec_idx_lst": []}

    with open(os.path.join(patient_dir, f"{patient_id}-summary.txt"), 'r') as f:
        for line in f:
            data = line.split(':')
            if data[0] != "File Name":
                continue
            edfName = data[1].strip()

            # Start time and duration come from the EDF header, not from the summary text.
            with EdfReaderWrapper(os.path.join(patient_dir, edfName)) as pedf:
                startTime = pedf.getStartdatetime()
                startStr = startTime.isoformat()
                endTime = startTime + datetime.timedelta(seconds=pedf.getFileDuration())
                endStr = endTime.isoformat()
                result_obj["record_lst"].append({
                    "file": edfName,
                    "span": [startStr, endStr],
                    # Fixed: the file name is now interpolated instead of the literal text "edfName".
                    "info": f"{edfName} records {startTime} ~ {endTime} \r\n of shape {pedf.signals_in_file, pedf.getNSamples()[0]}"
                })

            # Advance within this file's paragraph until the seizure count or a blank line.
            nextLine = f.readline().strip()
            while len(nextLine) != 0 and not nextLine.startswith('Number of Seizures in File:'):
                nextLine = f.readline().strip()

            if nextLine.startswith('Number of Seizures in File:'):
                for j in range(int(nextLine.split(':')[1])):
                    # Each seizure occupies two lines: start then end, as "<label>: <seconds> seconds".
                    szStartSec = int(f.readline().split(': ')[1].strip().split(' ')[0])
                    szEndSec = int(f.readline().split(': ')[1].strip().split(' ')[0])
                    # Only the `datetime` module is imported in this cell, so qualify timedelta.
                    szStart = startTime + datetime.timedelta(seconds=szStartSec)
                    szEnd = startTime + datetime.timedelta(seconds=szEndSec)
                    result_obj["seizure_lst"].append({
                        # ISO strings (like the record spans) so that json.dump can serialise them.
                        "span": [szStart.isoformat(), szEnd.isoformat()],
                        "info": f'Onset {szStart}, last {szEndSec - szStartSec}s'
                    })

    # Flag ignored recordings by their position in record_lst.
    for k, rec_info in enumerate(result_obj["record_lst"]):
        if rec_info["file"] in ignore_lst:
            result_obj["unused_rec_idx_lst"].append(k)

    with open(os.path.join(outdir, f'{patient_id}.json'), 'wt') as f:
        json.dump(result_obj, f, indent=2)

In [1]:
import mne

class MNEEdfObjWrapper:
    """Context manager around `mne.io.read_raw_edf`.

    The file is read in the constructor; the `with` body receives the raw
    object itself (not this wrapper), and the raw object is closed on exit.
    """

    def __init__(self, *args, **kwargs):
        # Arguments are forwarded verbatim to mne.io.read_raw_edf.
        self.raw = mne.io.read_raw_edf(*args, **kwargs)

    def __enter__(self):
        """Return the loaded raw object for use inside the `with` body."""
        return self.raw

    def __exit__(self, *exc_info):
        """Close the raw object; returns None, so exceptions are not suppressed."""
        self.raw.close()


In [4]:
from typing import List
# TODO: also recognise "Offset" annotations
def _is_possile_sz(s: str, kws: List[str] = ['sz', 'seiz', 'onset', '发作', '癫痫']): 
    s = s.lower()
    if any([ kw in s for kw in kws ]):  return True
    else:                               return False

In [None]:
import os
# Folder whose entries are scanned for recordings.
xuanwu_data_path = "/mnt/share/data/xuanwu_raw_merged/cuiyibing/"
# Folder that receives the generated JSON files; created up front if absent.
plot_args_path = "./cui-mneinfo/"
os.makedirs(plot_args_path, exist_ok=True)
# File names to flag as unused; empty here.
ignore_lst = list()

import os
import json, csv
import glob
from datetime import datetime, timedelta
import warnings

# For every sub-directory of xuanwu_data_path, collect all EDF/BDF recordings
# below it and write "<directory name>.json" into plot_args_path with:
#   record_lst          file name, [start, end] span, shape info, raw annotations
#   seizure_lst         annotations whose description matches _is_possile_sz
#   unused_rec_idx_lst  indices into the sorted record_lst of files in ignore_lst
with os.scandir(xuanwu_data_path) as entries:
    for entry in entries:
        # Plain files cannot contain recordings; skipping them avoids writing
        # empty "<file name>.json" outputs.
        if not entry.is_dir():
            continue

        result_obj = {"record_lst": [], "seizure_lst": [], "unused_rec_idx_lst": []}

        # Same four patterns, same order as before; dict.fromkeys drops paths that
        # match more than one pattern (possible on case-insensitive file systems).
        record_fn_lst = list(dict.fromkeys(
            path
            for pattern in ("*.edf", "*.EDF", "*.BDF", "*.bdf")
            for path in glob.glob(os.path.join(entry.path, "**", pattern), recursive=True)
        ))

        last_ch_names = None
        for edf_path in record_fn_lst:
            try:
                print(f"Try loading {edf_path}")
                with MNEEdfObjWrapper(edf_path, preload=False) as raw:
                    start_dt = raw.info['meas_date']
                    if start_dt is None:
                        # Without a start time the recording cannot be placed on the timeline.
                        warnings.warn(f"No measurement date in {edf_path}; recording skipped")
                        continue
                    fs = raw.info['sfreq']
                    # TODO: verify that this duration is still right for discontinuous EDF+D files
                    end_dt = start_dt + timedelta(seconds=(raw.n_times / fs))

                    annotations = list(raw.annotations)

                    # Record entry; spans stay datetimes until after sorting below.
                    result_obj["record_lst"].append({
                        "file": os.path.basename(edf_path),
                        "span": [start_dt, end_dt],
                        "info": f"{os.path.basename(edf_path)} of shape {len(raw.ch_names), raw.n_times}",
                        "annotations": [(float(a['onset']), str(a['description'])) for a in annotations]
                    })

                    # Warn when the channel layout differs from the previous file.
                    if last_ch_names is None:
                        last_ch_names = raw.ch_names
                    elif last_ch_names != raw.ch_names:
                        warnings.warn(f"相较于之前的通道排布发生变化！\n{edf_path}")
                        last_ch_names = raw.ch_names

                    # TODO: fuzzy matching of possible seizure annotations
                    for annt in annotations:
                        if not _is_possile_sz(annt['description']):
                            continue
                        # 'onset' is an offset in seconds from the annotation reference
                        # time ('orig_time'); the seizure starts at reference + onset,
                        # not at the reference time itself.
                        ref_dt = annt['orig_time'] if annt['orig_time'] is not None else start_dt
                        onset_dt = ref_dt + timedelta(seconds=float(annt['onset']))
                        offset_dt = onset_dt + timedelta(seconds=float(annt['duration']))
                        result_obj["seizure_lst"].append({
                            "span": [onset_dt, offset_dt],
                            "info": f"Onset {onset_dt.isoformat()}, last {annt['duration']}s"
                        })

            except ValueError as exp:
                # Best effort: a file that cannot be read is reported and skipped.
                warnings.warn(f"ValueError from {exp}")

        # Order chronologically while spans are still datetimes, then serialise them.
        result_obj["record_lst"].sort(key=lambda obj: obj["span"])
        result_obj["seizure_lst"].sort(key=lambda obj: obj["span"])
        for k, rec_info in enumerate(result_obj["record_lst"]):
            if rec_info["file"] in ignore_lst:
                result_obj["unused_rec_idx_lst"].append(k)
            rec_info["span"] = [rec_info["span"][0].isoformat(), rec_info["span"][1].isoformat()]

        for seiz_info in result_obj["seizure_lst"]:
            seiz_info["span"] = [seiz_info["span"][0].isoformat(), seiz_info["span"][1].isoformat()]

        with open(os.path.join(plot_args_path, f'{entry.name}.json'), "wt") as fout:
            json.dump(result_obj, fout, indent=2)




In [18]:
# Reload a previously exported timeline description from disk.
with open("./cui-mneinfo/cuiyibing_converted.json", "rt") as f:
    result_obj = json.load(f)

# The JSON keeps both span endpoints as ISO strings; turn them back into
# datetime objects, for the recordings first ...
for k, rec_info in enumerate(result_obj["record_lst"]):
    rec_info["span"] = list(map(
        datetime.fromisoformat,
        (rec_info["span"][0], rec_info["span"][1]),
    ))

# ... and then for the seizures.
for seiz_info in result_obj["seizure_lst"]:
    seiz_info["span"] = list(map(
        datetime.fromisoformat,
        (seiz_info["span"][0], seiz_info["span"][1]),
    ))


In [None]:
import plotly.offline as pyo
from timeline import get_pat_timeline
# Build the timeline figure from the record and seizure lists in result_obj.
fig = get_pat_timeline(
    title="Cui YiBing",
    record_seq=result_obj["record_lst"],
    seizure_seq=result_obj["seizure_lst"],
)

# Write the figure to an HTML file (auto_open=False: no browser is launched),
# then also display it inline.
pyo.plot(
    fig,
    filename="./cui-mneinfo/cui-timeline.html",
    # include_plotlyjs="./plotly.min.js",
    auto_open=False,
    image='svg',
    image_width=2560,
    image_height=1440,
)
fig.show()