In [33]:
from typing import Dict, Any, List, Optional
import numpy as np
import mne

In [None]:
class Calculate_v2:
    def get_signal(self, file, labels: List[str]) -> Optional[Dict[str, Any]]:
        """
        signals : np.ndarray or list
        the signals of the chosen channels contained in the EDF.
        signal_headers : list
        one signal header for each channel in the EDF.
        header : dict
        the main header of the EDF file containing meta information.
        """
        # 使用 mne 讀取 EDF 檔案
        raw = mne.io.read_raw_edf(file, preload=True)

        matched_signals = {}

        for label in labels:
            if label in raw.ch_names:
                index = raw.ch_names.index(label)
                signal = raw.get_data(picks=[index])[0]

                # 將 Desaturation 轉為 0/1 型態
                if label == "Desaturation":
                    signal = np.where(signal != 0.0, 1, 0)

                # 建構 signal_header 模擬結構
                signal_header = {
                    "label": label,
                    "dimension": None,
                    "sample_rate": raw.info['sfreq'] if 'sfreq' in raw.info else None,
                    "physical_min": None,
                    "physical_max": None,
                }

                matched_signals[label] = {
                    "signal_header": signal_header,
                    "signal": signal
                }

        return matched_signals if matched_signals else None

    def get_time(self, matched_signals, label, time127):
        matched_signal = matched_signals.get(label)

        if matched_signal is None:
            print(f"Label '{label}' not found in matched signals.")
            return None

        signal = matched_signal.get("signal")

        if signal is None:
            print(f"No signal data found for label '{label}'.")
            return None

        if label == "Saturation":
            time = len(signal) - (time127 / 10)
            return time

    def get_area(self, matched_signals):
        saturation_signal = matched_signals.get("Saturation")["signal"]
        desaturation_signal = matched_signals.get("Desaturation")["signal"]

        # step 1: Expand Saturation Signal (Repeat each value 10 times)
        saturation_signal = np.repeat(saturation_signal, 10)

        # Step 2: Align Desaturation Signal with Expanded Saturation
        min_length = min(len(saturation_signal), len(desaturation_signal))

        # Step 3: Identify Continuous Regions Where Desaturation == 1
        aligned_time = []
        aligned_saturation_signals = []
        regions = []
        current_regions = []
        saturation_127 = 0

        for index in range(min_length):
            if saturation_signal[index] == 127:
                saturation_127 += 1
                continue

            if desaturation_signal[index] == 1:
                aligned_time.append(index)
                aligned_saturation_signals.append(saturation_signal[index])
                current_regions.append(saturation_signal[index])
            else:
                if current_regions:
                    regions.append(current_regions)
                current_regions = []

        if current_regions:
            regions.append(current_regions)

        # Step 4: Compute Area for Each Region
        sum_area = 0
        for region in regions:
            max_value = max(region)
            region_area = sum(max_value - value for value in region)
            sum_area += region_area

        return sum_area, saturation_127

    def cal_result(self, time, area):
        processed_value = (area / 600) / (time / 3600)
        return processed_value


In [34]:
raw = mne.io.read_raw_edf("./data/1122--_S_2024-02-05 - 複製.edf", preload=True)
raw

Extracting EDF parameters from c:\Users\user\Desktop\project\other\hypoxic-burden\data\1122--_S_2024-02-05 - 複製.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 2186999  =      0.000 ... 21869.990 secs...


Unnamed: 0,General,General.1
,Filename(s),1122--_S_2024-02-05 - 複製.edf
,MNE object type,RawEDF
,Measurement date,2024-02-05 at 21:53:00 UTC
,Participant,Stroke
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,06:04:30 (HH:MM:SS)
,Sampling frequency,100.00 Hz
,Time points,2187000
,Channels,Channels


In [35]:
sat_index = raw.ch_names.index("Saturation")
desat_index = raw.ch_names.index("Desaturation")
print(sat_index)
print(desat_index)
saturation_signal = raw.get_data(picks=[sat_index])[0]
desaturation_signal = raw.get_data(picks=[desat_index])[0]
print(saturation_signal)
print(desaturation_signal)

4
6
[127.         127.95925884 128.89848361 ... 124.00453133 125.02237447
 126.02095303]
[-5.32305998e-17  1.88134686e-04  3.57845863e-04 ... -4.92584708e-04
 -3.57876163e-04 -1.88142651e-04]


In [36]:
def get_signal(file, labels: List[str]) -> Optional[Dict[str, Any]]:
    """
    signals : np.ndarray or list, the signals of the chosen channels contained in the EDF.
    signal_headers : list, one signal header for each channel in the EDF.
    header : dict, the main header of the EDF file containing meta information.
    """
    # 使用 mne 讀取 EDF 檔案
    raw = mne.io.read_raw_edf(file, preload=True)

    matched_signals = {}

    # 針對label取值
    for label in labels:
        if label in raw.ch_names:
            index = raw.ch_names.index(label)
            signal = raw.get_data(picks=[index])[0]

            # 將 Desaturation 正規化為 0 / 1 型態
            if label == "Desaturation":
                signal = np.where(np.abs(signal) > 1e-6, 1, 0)

            # 建構 signal_header 模擬結構
            signal_header = {
                "label": label,
                "dimension": None,
                "sample_rate": raw.info['sfreq'] if 'sfreq' in raw.info else None,
                "physical_min": None,
                "physical_max": None,
            }

            matched_signals[label] = {
                "signal_header": signal_header,
                "signal": signal
            }

    return matched_signals if matched_signals else None

In [None]:
def get_area(matched_signals):
    saturation_signal = matched_signals.get("Saturation")["signal"]
    desaturation_signal = matched_signals.get("Desaturation")["signal"]

    # Step 1: Align Saturation and Desaturation signal 
    min_length = min(len(saturation_signal), len(desaturation_signal))

    # Step2: Calculate validate area 
    # - Saturation < 100 Identify Continuous Regions Where Desaturation == 1
    # - Desaturation = 1 (正規化後) Identify Continuous Regions Where Desaturation == 1
    aligned_saturation_signals = []
    aligned_desaturation_signals = []
    current_regions = []
    regions = []

    for index in range(min_length):
        if (saturation_signal[index] < 100) and (desaturation_signal[index] == 1):
            aligned_saturation_signals.append(saturation_signal[index])
            aligned_desaturation_signals.append(desaturation_signal[index])
            current_regions.append(saturation_signal[index])
        else:
            if current_regions:
                regions.append(current_regions)
                current_regions = []

    # 最後一段若有累積 current_regions 也要加進去
    if current_regions:
        regions.append(current_regions)

    # Step 3: Compute total region for each region
    sum_region = 0
    sum_time = 0

    for region in regions:
        max_value = max(region) # 各區域的最高值
        region_time = len(region)

        region_value = sum(max_value - value for value in region)
        sum_region += region_value
        sum_time += region_time

    return sum_region, sum_time

In [41]:
matched_signals = get_signal("./data/1122--_S_2024-02-05 - 複製.edf", ["Saturation","Desaturation"])

Extracting EDF parameters from c:\Users\user\Desktop\project\other\hypoxic-burden\data\1122--_S_2024-02-05 - 複製.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 2186999  =      0.000 ... 21869.990 secs...


In [42]:
area, time = get_area(matched_signals)
print(area)
print(time)

636020.2546280753
1971307
