In [3]:
# !pip install pyedflib

In [4]:
import pyedflib
import pandas as pd
import numpy as np
from scipy.signal import butter, filtfilt
import matplotlib.pyplot as plt
import os
import pandas as pd
from sklearn.metrics import cohen_kappa_score
import re

# Trying and building everything on 1 patient
----
## Extract
----
### Signals

In [5]:
# Load every channel of the recording into a dict keyed by channel label.
edf_file = '/kaggle/input/svuh-dataset/files/ucddb002.rec'
edf = pyedflib.EdfReader(edf_file)
try:
    # Channels are read one by one; a repeated label would overwrite the earlier entry.
    signals = {
        edf.getLabel(i): edf.readSignal(i)
        for i in range(edf.signals_in_file)
    }
finally:
    # Use the public close() (as extract_edf_metadata does) and release the
    # file handle even if reading one of the channels fails.
    edf.close()

### Annotations

In [47]:
import pandas as pd
import re

# Respiratory-event annotation file to parse.
# NOTE(review): this path points at patient ucddb006, while the signals cell
# loads ucddb002 -- confirm that mixing patients here is intended.
txt_file = '/kaggle/input/svuh-dataset/files/ucddb006_respevt.txt'

# Names given to the parsed annotation fields, in left-to-right order.
columns = [
    'Time', 'Type', 'PB/CS', 'Duration', 'Low',
    '%Drop', 'Snore', 'Arousal', 'Rate', 'Change',
]

# Step 1: Read the file with a custom function to handle splitting issues
def read_custom_file(txt_file, header_lines=3):
    """Read an annotation text file and normalise its data lines.

    Every line after the header is stripped, runs of whitespace are collapsed
    to a single space, and the two-word token 'PB EVENT' is fused into
    'PBEVENT' so that a later split on spaces keeps it in one field.

    Args:
        txt_file: Path to the annotation text file.
        header_lines: Number of leading lines to discard (default: 3).

    Returns:
        A list with one normalised string per remaining line. Blank lines are
        kept as empty strings.
    """
    with open(txt_file, 'r') as f:
        lines = f.readlines()[header_lines:]

    processed_lines = []
    for line in lines:
        # Collapse every run of whitespace into one space.
        line = re.sub(r'\s+', ' ', line.strip())

        # Fuse 'PB EVENT' into one token. Lookarounds (rather than consuming
        # the neighbouring spaces) also catch the token when it is the first
        # or last thing on the stripped line.
        line = re.sub(r'(?<!\S)PB EVENT(?!\S)', 'PBEVENT', line)

        processed_lines.append(line)

    return processed_lines

from io import StringIO

# Normalise the raw annotation lines (header lines are dropped inside
# read_custom_file), then stitch them back into one text buffer.
processed_lines = read_custom_file(txt_file)
data = "\n".join(processed_lines)

# Parse the buffer as space-separated fields; there is no header row left,
# so the column names come from `columns`.
annotations_1_1 = pd.read_csv(
    StringIO(data),
    sep=' ',
    header=None,
    names=columns,
)

# Show ten random rows as a quick look at the parse result.
annotations_1_1.sample(10)

Unnamed: 0,Time,Type,PB/CS,Duration,Low,%Drop,Snore,Arousal,Rate,Change
43,02:55:54,PBEVENT,PB,12.0,-,-,,,,
66,03:10:44,HYP-O,8,86.8,4,-,-,,,
222,,,,,,,,,,
143,05:18:47,APNEA-C,13,86.8,5.2,-,-,,,
110,05:00:33,APNEA-C,15,88.0,2.9,-,-,,,
85,04:17:40,HYP-C,11,83.9,7,+,-,,,
79,04:04:29,HYP-C,18,88.0,4,-,-,,,
71,03:42:36,HYP-C,23,86.7,2.1,+,-,,,
127,05:09:33,HYP-C,15,87.3,2.7,-,-,,,
35,02:51:03,APNEA-C,12,86.8,3.2,-,+,,,


In [48]:
import re

def clean_annotations(df):
    """Realign annotation rows whose fields landed in the wrong columns.

    Two repairs are applied per row, both decided from the row's values as
    they were before any repair:

    1. If 'PB/CS' is not a string, or is a string of digits only, the row had
       no PB/CS field: everything from 'PB/CS' onward moves one column to the
       right and 'PB/CS' becomes empty.
    2. If 'Low' is a string starting with '-' or '+', the row had no
       'Low'/'%Drop' fields: everything from 'Low' onward moves two columns to
       the right and 'Low' and '%Drop' become empty.

    Values shifted past the last column are dropped.

    Args:
        df: Annotation DataFrame containing at least the columns 'PB/CS',
            'Low' and '%Drop'. It is modified in place.

    Returns:
        The same DataFrame object, after repair.
    """
    int_pattern = re.compile(r"^\d+$")
    # Raw character class instead of the invalid escapes '\-|\+'.
    sign_pattern = re.compile(r"[-+]")

    pbcs_pos = df.columns.get_loc('PB/CS')
    low_pos = df.columns.get_loc('Low')
    drop_pos = df.columns.get_loc('%Drop')

    # Work on plain Python rows, addressed by position. This avoids using index
    # labels as positions (which breaks on a non-default index) and avoids
    # per-cell writes of strings into numeric columns.
    rows = df.to_numpy(dtype=object).tolist()
    touched = False

    for values in rows:
        pb_cs = values[pbcs_pos]
        low = values[low_pos]  # read before shifting, like the row snapshot of iterrows

        if not isinstance(pb_cs, str) or int_pattern.match(pb_cs):
            # Shift right by one from 'PB/CS'; the last value falls off.
            values[pbcs_pos + 1:] = values[pbcs_pos:-1]
            values[pbcs_pos] = None
            touched = True

        if isinstance(low, str) and sign_pattern.match(low):
            # Shift right by two from 'Low'; the last two values fall off.
            values[low_pos + 2:] = values[low_pos:-2]
            values[low_pos] = None
            values[drop_pos] = None
            touched = True

    if touched:
        # Write back whole columns so pandas re-infers each column's dtype
        # once, instead of upcasting cell by cell.
        first_changed = min(pbcs_pos, low_pos)
        for pos in range(first_changed, len(df.columns)):
            df[df.columns[pos]] = [values[pos] for values in rows]

    return df

# Clean a copy so the raw `annotations_1_1` frame stays available for comparison.
annotations_1_1_cleaned = clean_annotations(annotations_1_1.copy())

  df.iloc[idx, col_idx] = df.iloc[idx, col_idx - 1]


In [49]:
# Random 10-row preview of the cleaned annotations (no random_state is set, so the rows differ per run).
annotations_1_1_cleaned.sample(10)

Unnamed: 0,Time,Type,PB/CS,Duration,Low,%Drop,Snore,Arousal,Rate,Change
179,05:38:17,PBEVENT,PB,14,,,-,-,,
153,05:23:55,HYP-C,,14,85.9,4.1,-,+,,
39,02:53:47,HYP-C,,14,88.0,4.0,-,+,,
105,04:58:12,HYP-C,,13,88.8,3.2,-,-,71.8,10.4
185,05:41:36,HYP-C,,8,85.9,2.1,-,-,,
149,05:21:48,HYP-C,,17,86.7,4.2,-,+,73.2,7.5
29,02:47:43,HYP-C,,15,88.0,2.8,-,+,85.1,5.9
119,05:05:34,APNEA-C,,12,86.8,4.0,-,-,72.3,9.6
78,04:03:56,APNEA-C,,16,85.9,2.9,-,+,,
67,03:26:54,APNEA-C,,15,86.7,2.1,-,-,,


In [63]:
# Spot-check the raw (uncleaned) parse: one regular event row (0) and two PBEVENT rows (171, 191).
annotations_1_1.loc[[0, 171, 191]]

Unnamed: 0,Time,Type,PB/CS,Duration,Low,%Drop,Snore,Arousal,Rate,Change
0,00:12:51,HYP-O,14,88.7,2.1,-,-,,,
171,05:34:06,PBEVENT,PB,12.0,-,-,,,,
191,06:01:15,PBEVENT,PB,12.0,-,-,,,,


## Investigation
----

In [54]:
# Report how many samples each loaded channel holds.
for channel, samples in signals.items():
    print(f"{channel}: {len(samples)}")

Lefteye: 1438080
RightEye: 1438080
EMG: 1438080
C3A2: 2876160
C4A1: 2876160
ECG: 2876160
SpO2: 179760
Sound: 179760
Flow: 179760
Sum: 179760
ribcage: 179760
abdo: 179760
BodyPos: 179760
Pulse: 179760


In [62]:
import pyedflib

def extract_edf_metadata(file_path):
    """
    Extracts basic metadata from an EDF file using pyedflib.

    Args:
      file_path: Path to the EDF file.

    Returns:
      A dictionary with the keys:
        - sampling_frequency: Per-channel sampling frequencies.
        - n_channels: Number of channels in the file.
        - channel_labels: List of channel labels.
        - start_datetime: Start time of the recording.
      Returns None (after printing the error) if the file cannot be read.
    """
    try:
        f = pyedflib.EdfReader(file_path)
        try:
            return {
                'sampling_frequency': f.getSampleFrequencies(),
                'n_channels': f.signals_in_file,
                'channel_labels': f.getSignalLabels(),
                'start_datetime': f.getStartdatetime(),
            }
        finally:
            # Release the file handle even when one of the getters raises.
            f.close()

    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"Error reading EDF file: {e}")
        return None

# Example usage: read the recording header and print each field on its own line.
file_path = "/kaggle/input/svuh-dataset/files/ucddb002.rec"
metadata = extract_edf_metadata(file_path)

if metadata:
    for heading, field in (
        ("Sampling Frequency:", 'sampling_frequency'),
        ("Number of Channels:", 'n_channels'),
        ("Channel Labels:", 'channel_labels'),
        ("Start Datetime:", 'start_datetime'),
    ):
        print(heading, metadata[field])

Sampling Frequency: [ 64.  64.  64. 128. 128. 128.   8.   8.   8.   8.   8.   8.   8.   8.]
Number of Channels: 14
Channel Labels: ['Lefteye', 'RightEye', 'EMG', 'C3A2', 'C4A1', 'ECG', 'SpO2', 'Sound', 'Flow', 'Sum', 'ribcage', 'abdo', 'BodyPos', 'Pulse']
Start Datetime: 2002-01-01 00:11:04


There are `179760` samples of Pulse for patient 2 and the sampling frequency is `8 Hz`, thus the total recording length is `179760 / 8 = 22470 seconds` (about 6 h 14 min).


Our model is trained on `375` samples per event. At the native `8 Hz`, 375 samples would span `375/8 = 46.875 seconds` _(not ideal)_, so we will upsample the signal to `375/30 = 12.5 Hz` to resolve this mismatch between the model and the data, and get our desired `30 sec` of data in 375 samples.

In [None]:
import pandas as pd
import numpy as np
from scipy.signal import resample

def extract_30sec_clips_and_resample(signal_data, annotations, start_datetime, 
                                    original_sampling_frequency, desired_sampling_frequency, 
                                    clip_duration=30, desired_clip_length=375):
    """
    Extracts fixed-duration clips starting at each annotated event, taken from
    signals resampled to the desired sampling frequency, and forces every clip
    to the same length.

    Every channel in signal_data is treated as sampled at
    original_sampling_frequency; pass only channels recorded at that rate.

    Args:
      signal_data: A dictionary where keys are channel labels and values are
                   NumPy arrays containing the signal data.
      annotations: A pandas DataFrame with a 'start_time' column holding the
                   start of each event, in a form pd.to_datetime turns into a
                   datetime comparable with start_datetime.
                   NOTE(review): the annotation table parsed earlier in this
                   notebook names its column 'Time' and holds clock times
                   only -- confirm how it is converted before calling this.
      start_datetime: Start datetime of the recording.
      original_sampling_frequency: The original sampling frequency of the signals.
      desired_sampling_frequency: The desired sampling frequency.
      clip_duration: Duration of the clip in seconds (default: 30).
      desired_clip_length: Desired number of samples in each clip (default: 375).

    Returns:
      A list with one dictionary per annotation row, mapping each channel
      label to a clip of exactly desired_clip_length samples. Parts of the
      window that fall before the start or after the end of the recording are
      filled with zeros.
    """
    clips = []
    if len(annotations) == 0:
        return clips

    rate_ratio = desired_sampling_frequency / original_sampling_frequency

    # Resampling the full signal does not depend on the event, so do it once
    # per channel instead of once per event.
    resampled_signals = {
        channel_label: resample(signal, int(len(signal) * rate_ratio))
        for channel_label, signal in signal_data.items()
    }

    for _, row in annotations.iterrows():
        event_start_time = pd.to_datetime(row['start_time'])

        # Window bounds in original samples, then mapped to resampled samples.
        start_index = int((event_start_time - start_datetime).total_seconds()
                          * original_sampling_frequency)
        end_index = start_index + (clip_duration * original_sampling_frequency)
        resampled_start_index = int(start_index * rate_ratio)
        resampled_end_index = int(end_index * rate_ratio)

        # A negative bound would wrap around to the end of the signal when
        # slicing. Clamp to zero and remember how many leading samples are
        # missing so they can be zero-filled.
        missing_front = min(max(-resampled_start_index, 0), desired_clip_length)
        slice_start = max(resampled_start_index, 0)
        slice_end = max(resampled_end_index, 0)

        channel_clips = {}
        for channel_label, resampled_signal in resampled_signals.items():
            clip = resampled_signal[slice_start:slice_end]
            if missing_front:
                clip = np.pad(clip, (missing_front, 0), 'constant')
            if len(clip) < desired_clip_length:
                # Pad with zeros if clip is shorter than desired length
                clip = np.pad(clip, (0, desired_clip_length - len(clip)), 'constant')
            elif len(clip) > desired_clip_length:
                # Truncate if clip is longer than desired length
                clip = clip[:desired_clip_length]
            channel_clips[channel_label] = clip

        clips.append(channel_clips)

    return clips

# Example Usage:
# ... (as in the previous example)