# EDF File Analysis Notebook

This notebook provides tools for analyzing EDF (European Data Format) files, commonly used for storing biomedical signals like EEG, ECG, and EMG data. We'll perform:

1. EDF file import and metadata extraction
2. Amplitude Envelope Analysis
3. Frequency-Domain Analysis
4. Visualization using heatmaps and interactive plots

## Import Required Libraries

In [2]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mne  # For EDF file handling
import pyedflib  # Alternative EDF library
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy import signal
from scipy.fft import fft, fftfreq
import warnings
warnings.filterwarnings('ignore')  # Suppress warnings

# For interactive plots in Jupyter
import plotly.io as pio
pio.renderers.default = 'notebook'

ModuleNotFoundError: No module named 'pyedflib'

## EDF File Import and Description Function

In [None]:
def import_edf_file(file_path):
    """
    Import an EDF file and return descriptive information and data.
    
    Parameters:
    file_path (str): Path to the EDF file
    
    Returns:
    tuple: (raw_data, data_dict) where raw_data is the MNE Raw object and 
           data_dict contains extracted signals and metadata
    """
    try:
        # Read the EDF file using MNE
        raw = mne.io.read_raw_edf(file_path, preload=True)
        
        # Extract basic information
        info = raw.info
        channels = raw.ch_names
        sampling_freq = raw.info['sfreq']
        n_channels = len(channels)
        duration = raw.n_times / sampling_freq
        
        # Get data as numpy array
        data = raw.get_data()
        
        # Create a dictionary with metadata and signals
        data_dict = {
            'channels': channels,
            'sampling_freq': sampling_freq,
            'n_channels': n_channels,
            'duration': duration,
            'data': data,
            'time_axis': np.arange(0, raw.n_times) / sampling_freq
        }
        
        # Print basic information
        print(f"EDF File Analysis: {file_path}\n")
        print(f"Number of channels: {n_channels}")
        print(f"Channel names: {channels}")
        print(f"Sampling frequency: {sampling_freq} Hz")
        print(f"Duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
        print(f"Total data points: {raw.n_times}")
        
        # Additional metadata if available
        if hasattr(raw, 'annotations') and len(raw.annotations) > 0:
            print(f"\nAnnotations: {len(raw.annotations)} items")
            print(raw.annotations)
            
        # Try to extract and print any additional header information using pyedflib
        try:
            f = pyedflib.EdfReader(file_path)
            header = f.getHeader()
            signal_headers = [f.getSignalHeader(i) for i in range(n_channels)]
            
            print("\nHeader Information:")
            print(f"Patient ID: {header['patientcode']}")
            print(f"Recording Date: {header['startdate']}")
            print(f"Equipment: {header['equipment']}")
            
            # Add to data dictionary
            data_dict['header'] = header
            data_dict['signal_headers'] = signal_headers
            
            f.close()
        except Exception as e:
            print(f"\nCould not extract detailed header information: {e}")
        
        return raw, data_dict
        
    except Exception as e:
        print(f"Error importing EDF file: {e}")
        return None, None

## Signal Processing Functions

### Amplitude Envelope Analysis Function

In [None]:
def amplitude_envelope_analysis(signal_data, fs):
    """
    Calculate the amplitude envelope of a signal using Hilbert transform.
    
    Parameters:
    signal_data (array): Signal data
    fs (float): Sampling frequency in Hz
    
    Returns:
    tuple: (time_axis, envelope)
    """
    # Calculate the analytic signal using Hilbert transform
    analytic_signal = signal.hilbert(signal_data)
    
    # Calculate envelope as the magnitude of the analytic signal
    envelope = np.abs(analytic_signal)
    
    # Create time axis
    time_axis = np.arange(0, len(signal_data)) / fs
    
    return time_axis, envelope

### Frequency-Domain Analysis Function

In [None]:
def frequency_domain_analysis(signal_data, fs, nperseg=1024):
    """
    Perform frequency domain analysis on a signal.
    
    Parameters:
    signal_data (array): Signal data
    fs (float): Sampling frequency in Hz
    nperseg (int): Length of each segment for STFT
    
    Returns:
    dict: Dictionary containing frequency domain data
    """
    # FFT Analysis
    n = len(signal_data)
    yf = fft(signal_data)
    xf = fftfreq(n, 1/fs)[:n//2]
    yf_abs = 2.0/n * np.abs(yf[0:n//2])
    
    # Power Spectral Density using Welch's method
    f_welch, Pxx_welch = signal.welch(signal_data, fs, nperseg=nperseg)
    
    # Short-Time Fourier Transform for time-frequency analysis
    f_stft, t_stft, Zxx = signal.stft(signal_data, fs, nperseg=nperseg)
    
    # Compute spectrogram
    f_spec, t_spec, Sxx = signal.spectrogram(signal_data, fs, nperseg=nperseg)
    
    # Return dictionary with all computed values
    return {
        'fft_freq': xf,
        'fft_magnitude': yf_abs,
        'psd_freq': f_welch,
        'psd_power': Pxx_welch,
        'stft_freq': f_stft,
        'stft_time': t_stft,
        'stft_values': Zxx,
        'spec_freq': f_spec,
        'spec_time': t_spec,
        'spec_values': Sxx
    }

### Time-Series Analysis Functions

#### Autocorrelation and Partial Autocorrelation Analysis

In [None]:
def autocorrelation_analysis(signal_data, max_lags=100, sample_rate=None):
    """
    Calculate and visualize the autocorrelation function (ACF) and 
    partial autocorrelation function (PACF) of a signal.
    
    Parameters:
    signal_data (array): Signal data
    max_lags (int): Maximum number of lags to compute
    sample_rate (float, optional): Sampling rate in Hz. If provided, x-axis will be in seconds
    
    Returns:
    dict: Dictionary containing ACF and PACF values
    """
    from statsmodels.tsa.stattools import acf, pacf
    import plotly.subplots as sp
    
    # Normalize the signal (optional)
    signal_normalized = (signal_data - np.mean(signal_data)) / np.std(signal_data)
    
    # Calculate ACF and PACF
    lag_acf = acf(signal_normalized, nlags=max_lags, fft=True)
    lag_pacf = pacf(signal_normalized, nlags=max_lags, method='ols')
    
    # Create x-axis values
    if sample_rate is not None:
        x_values = np.arange(len(lag_acf)) / sample_rate
        x_label = 'Time lag (seconds)'
    else:
        x_values = np.arange(len(lag_acf))
        x_label = 'Lag'
    
    # Create subplots
    fig = sp.make_subplots(rows=2, cols=1, 
                         subplot_titles=('Autocorrelation Function (ACF)', 
                                        'Partial Autocorrelation Function (PACF)'))
    
    # Add ACF trace
    fig.add_trace(
        go.Scatter(x=x_values, y=lag_acf, mode='lines', name='ACF',
                  line=dict(color='blue')),
        row=1, col=1
    )
    
    # Add confidence intervals for ACF (±1.96/sqrt(n))
    conf_int = 1.96 / np.sqrt(len(signal_normalized))
    fig.add_trace(
        go.Scatter(x=x_values, y=[conf_int] * len(x_values), mode='lines',
                  line=dict(color='red', dash='dash'), name='95% Confidence'),
        row=1, col=1
    )
    fig.add_trace(
        go.Scatter(x=x_values, y=[-conf_int] * len(x_values), mode='lines',
                  line=dict(color='red', dash='dash'), showlegend=False),
        row=1, col=1
    )
    
    # Add PACF trace
    fig.add_trace(
        go.Scatter(x=x_values, y=lag_pacf, mode='lines', name='PACF',
                  line=dict(color='green')),
        row=2, col=1
    )
    
    # Add confidence intervals for PACF
    fig.add_trace(
        go.Scatter(x=x_values, y=[conf_int] * len(x_values), mode='lines',
                  line=dict(color='red', dash='dash'), showlegend=False),
        row=2, col=1
    )
    fig.add_trace(
        go.Scatter(x=x_values, y=[-conf_int] * len(x_values), mode='lines',
                  line=dict(color='red', dash='dash'), showlegend=False),
        row=2, col=1
    )
    
    # Update layout
    fig.update_layout(
        height=700,
        width=900,
        title='Temporal Dependency Analysis',
        xaxis_title=x_label,
        xaxis2_title=x_label,
        yaxis_title='Correlation',
        yaxis2_title='Partial Correlation'
    )
    
    # Calculate the dominant period from ACF
    # (First significant peak after lag 0)
    acf_peaks_indices = signal.find_peaks(lag_acf[1:])[0] + 1  # +1 because we skipped lag 0
    significant_peaks = [i for i in acf_peaks_indices if lag_acf[i] > conf_int]
    
    if len(significant_peaks) > 0:
        dominant_period_lag = significant_peaks[0]
        if sample_rate is not None:
            dominant_period_seconds = dominant_period_lag / sample_rate
            print(f"Dominant period from ACF: {dominant_period_lag} lags ({dominant_period_seconds:.4f} seconds)")
            # Mark the dominant period on the plot
            fig.add_trace(
                go.Scatter(x=[dominant_period_lag/sample_rate], y=[lag_acf[dominant_period_lag]],
                          mode='markers', marker=dict(size=10, color='red'),
                          name='Dominant Period'),
                row=1, col=1
            )
        else:
            print(f"Dominant period from ACF: {dominant_period_lag} lags")
            # Mark the dominant period on the plot
            fig.add_trace(
                go.Scatter(x=[dominant_period_lag], y=[lag_acf[dominant_period_lag]],
                          mode='markers', marker=dict(size=10, color='red'),
                          name='Dominant Period'),
                row=1, col=1
            )
    else:
        print("No significant periodic pattern detected in ACF")
    
    # Show the plot
    fig.show()
    
    # Return the computed values
    return {
        'acf': lag_acf,
        'pacf': lag_pacf,
        'conf_int': conf_int,
        'x_values': x_values
    }

#### Detrended Fluctuation Analysis

Detrended Fluctuation Analysis (DFA) is used to detect long-range correlations in time series data. It's particularly useful for analyzing non-stationary physiological signals like EEG.

In [None]:
def detrended_fluctuation_analysis(signal_data, scale_min=4, scale_max=None):
    """
    Perform Detrended Fluctuation Analysis (DFA) on a signal to detect long-range correlations.
    
    Parameters:
    signal_data (array): Signal data
    scale_min (int): Minimum scale for DFA calculation
    scale_max (int, optional): Maximum scale for DFA calculation
    
    Returns:
    tuple: (scales, fluctuations, alpha) where alpha is the scaling exponent
    """
    from numpy import log, polyfit
    
    # Set default scale_max if not provided
    if scale_max is None:
        scale_max = len(signal_data) // 10
    
    # Ensure scale_max is not too large
    scale_max = min(scale_max, len(signal_data) // 4)
    
    # Generate the scales for analysis (log-spaced)
    scales = np.logspace(log(scale_min) / log(10), log(scale_max) / log(10), 15).astype(int)
    scales = np.unique(scales)  # Remove duplicates
    
    # Calculate the profile (cumulative sum) of the signal
    profile = np.cumsum(signal_data - np.mean(signal_data))
    
    # Calculate fluctuation for each scale
    fluct = np.zeros(len(scales))
    
    for i, scale in enumerate(scales):
        # Calculate the number of segments
        n_segments = len(profile) // scale
        
        if n_segments == 0:  # Skip if scale is too large
            continue
        
        # Initialize the array for storing fluctuations
        y_segments = np.zeros(n_segments)
        
        # Loop over segments
        for j in range(n_segments):
            segment = profile[j*scale:(j+1)*scale]
            # Fit a polynomial (degree 1 = linear) to the segment
            x = np.arange(scale)
            p = np.polyfit(x, segment, 1)
            # Calculate the fit
            fit = np.polyval(p, x)
            # Calculate the root-mean-square deviation from the fit
            y_segments[j] = np.sqrt(np.mean((segment - fit) ** 2))
        
        # Calculate the mean fluctuation over all segments
        fluct[i] = np.mean(y_segments)
    
    # Filter out zeros in fluctuation array (in case some scales were skipped)
    valid_indices = fluct > 0
    scales_valid = scales[valid_indices]
    fluct_valid = fluct[valid_indices]
    
    # Calculate the scaling exponent (alpha) using linear regression on log-log plot
    polyfit_result = polyfit(np.log(scales_valid), np.log(fluct_valid), 1)
    alpha = polyfit_result[0]
    
    # Create a figure for visualization
    fig = go.Figure()
    
    # Add DFA data points
    fig.add_trace(go.Scatter(
        x=scales_valid,
        y=fluct_valid,
        mode='markers',
        name='DFA',
        marker=dict(size=8)
    ))
    
    # Add the fit line
    # Generate points for the fit line
    x_fit = np.logspace(np.log10(scales_valid.min()), np.log10(scales_valid.max()), 100)
    y_fit = np.exp(polyfit_result[1]) * x_fit**alpha
    
    fig.add_trace(go.Scatter(
        x=x_fit,
        y=y_fit,
        mode='lines',
        name=f'Fit: α = {alpha:.4f}',
        line=dict(color='red')
    ))
    
    # Set log scales
    fig.update_xaxes(type='log', title='Scale (log)')
    fig.update_yaxes(type='log', title='Fluctuation (log)')
    
    # Update layout
    fig.update_layout(
        title='Detrended Fluctuation Analysis (DFA)',
        width=700,
        height=500,
        showlegend=True
    )
    
    # Interpret alpha value
    interpretation = ""
    if alpha < 0.5:
        interpretation = "Anti-correlated signal (α < 0.5): negative correlation"
    elif 0.45 <= alpha <= 0.55:
        interpretation = "Uncorrelated signal (α ≈ 0.5): white noise"
    elif 0.9 <= alpha <= 1.1:
        interpretation = "1/f noise or pink noise (α ≈ 1): transition between white and Brownian noise"
    elif 1.45 <= alpha <= 1.55:
        interpretation = "Brownian noise (α ≈ 1.5): integrated white noise"
    else:
        interpretation = f"Fractional Brownian motion with Hurst exponent H = {alpha-1:.4f}"
    
    print(f"DFA Scaling Exponent (α): {alpha:.4f}")
    print(f"Interpretation: {interpretation}")
    
    fig.show()
    
    return scales_valid, fluct_valid, alpha

## Visualization Functions

### Heatmap Visualization Function

In [None]:
def plot_heatmap(freq_analysis, channel_name, max_freq=None):
    """
    Create a heatmap visualization of time-frequency data.
    
    Parameters:
    freq_analysis (dict): Output from frequency_domain_analysis
    channel_name (str): Name of the channel being visualized
    max_freq (float, optional): Maximum frequency to display in Hz
    """
    # Extract data for spectrogram
    f = freq_analysis['spec_freq']
    t = freq_analysis['spec_time']
    Sxx = freq_analysis['spec_values']
    
    # Apply frequency limit if specified
    if max_freq is not None:
        idx = np.where(f <= max_freq)[0]
        if len(idx) > 0:
            f = f[idx]
            Sxx = Sxx[idx, :]
    
    # Convert power to dB scale
    Sxx_db = 10 * np.log10(Sxx + 1e-10)
    
    # Create heatmap using plotly
    fig = go.Figure(data=go.Heatmap(
        z=Sxx_db,
        x=t,
        y=f,
        colorscale='Viridis',
        colorbar=dict(title='Power (dB)')
    ))
    
    fig.update_layout(
        title=f'Spectrogram Heatmap: {channel_name}',
        xaxis_title='Time (s)',
        yaxis_title='Frequency (Hz)',
        width=900,
        height=600
    )
    
    return fig

### Interactive Signal Plot Function

In [None]:
def plot_interactive_signal(time, signal_data, envelope=None, channel_name="Signal"):
    """
    Create an interactive plot of the signal and its envelope.
    
    Parameters:
    time (array): Time axis
    signal_data (array): Signal data
    envelope (array, optional): Signal envelope
    channel_name (str): Name of the channel being visualized
    
    Returns:
    plotly.graph_objects.Figure: Interactive figure
    """
    fig = go.Figure()
    
    # Add signal trace
    fig.add_trace(go.Scatter(
        x=time,
        y=signal_data,
        mode='lines',
        name='Signal',
        line=dict(color='blue', width=1)
    ))
    
    # Add envelope if provided
    if envelope is not None:
        fig.add_trace(go.Scatter(
            x=time,
            y=envelope,
            mode='lines',
            name='Envelope',
            line=dict(color='red', width=1.5)
        ))
        
        # Add negative envelope for visualization
        fig.add_trace(go.Scatter(
            x=time,
            y=-envelope,
            mode='lines',
            name='Neg. Envelope',
            line=dict(color='red', width=1.5)
        ))
    
    # Update layout
    fig.update_layout(
        title=f"{channel_name} - Time Domain Analysis",
        xaxis_title="Time (s)",
        yaxis_title="Amplitude",
        legend=dict(x=0.02, y=0.98),
        width=900,
        height=400,
        hovermode="closest"
    )
    
    # Add interactive features
    fig.update_layout(
        updatemenus=[
            dict(
                type="buttons",
                direction="right",
                buttons=[
                    dict(label="Reset",
                         method="relayout",
                         args=[{"xaxis.range": [time.min(), time.max()]}])
                ],
                pad={"r": 10, "t": 10},
                showactive=False,
                x=0.1,
                y=1.1,
                xanchor="right",
                yanchor="top"
            )
        ]
    )
    
    # Add range slider
    fig.update_layout(
        xaxis=dict(
            rangeslider=dict(visible=True),
            type="linear"
        )
    )
    
    return fig

### Comprehensive Analysis Function

In [None]:
def analyze_edf_channel(data_dict, channel_index, max_time=None, max_freq=100, apply_filter=False):
    """
    Perform comprehensive analysis on a single channel from an EDF file.
    
    Parameters:
    data_dict (dict): Data dictionary from import_edf_file function
    channel_index (int): Index of channel to analyze
    max_time (float, optional): Maximum time to display in seconds
    max_freq (float): Maximum frequency to display in Hz
    apply_filter (bool): Whether to apply a 49Hz notch filter
    """
    # Extract channel data
    channel_name = data_dict['channels'][channel_index]
    signal_data = data_dict['data'][channel_index]
    fs = data_dict['sampling_freq']
    time_axis = data_dict['time_axis']
    
    print(f"\nAnalyzing channel: {channel_name}")
    
    # Apply time limit if specified
    if max_time is not None and max_time < time_axis[-1]:
        idx = np.where(time_axis <= max_time)[0]
        signal_data = signal_data[idx]
        time_axis = time_axis[idx]
    
    # Apply frequency filter if specified
    if apply_filter:
        signal_data = apply_frequency_filter(signal_data, fs)
    
    # Perform amplitude envelope analysis
    _, envelope = amplitude_envelope_analysis(signal_data, fs)
    
    # Perform frequency domain analysis
    freq_analysis = frequency_domain_analysis(signal_data, fs)
    
    # Create interactive signal plot
    signal_fig = plot_interactive_signal(time_axis, signal_data, envelope, channel_name)
    signal_fig.show()
    
    # Create frequency domain plot
    freq_fig = go.Figure()
    freq_fig.add_trace(go.Scatter(
        x=freq_analysis['fft_freq'],
        y=freq_analysis['fft_magnitude'],
        mode='lines',
        name='FFT Magnitude'
    ))
    
    freq_fig.update_layout(
        title=f"{channel_name} - Frequency Spectrum",
        xaxis_title="Frequency (Hz)",
        yaxis_title="Magnitude",
        width=900,
        height=400,
        xaxis=dict(range=[0, max_freq])
    )
    freq_fig.show()
    
    # Create heatmap
    heatmap_fig = plot_heatmap(freq_analysis, channel_name, max_freq)
    heatmap_fig.show()
    
    # Display statistics
    print(f"\nStatistics for {channel_name}:")
    print(f"Mean: {np.mean(signal_data):.4f}")
    print(f"Std Dev: {np.std(signal_data):.4f}")
    print(f"Min: {np.min(signal_data):.4f}")
    print(f"Max: {np.max(signal_data):.4f}")
    print(f"Duration: {time_axis[-1]:.2f} seconds")

## Usage Example

To use this notebook for analyzing an EDF file, follow these steps:

In [None]:
# Define the missing frequency filter function
def apply_frequency_filter(signal_data, fs, notch_freq=49.0, quality_factor=30.0):
    """
    Apply a notch filter to remove specific frequency noise (e.g., 50/60Hz power line noise).
    
    Parameters:
    signal_data (array): Signal data
    fs (float): Sampling frequency in Hz
    notch_freq (float): Frequency to filter out in Hz
    quality_factor (float): Quality factor for the notch filter
    
    Returns:
    array: Filtered signal
    """
    # Design notch filter
    b, a = signal.iirnotch(notch_freq, quality_factor, fs)
    
    # Apply filter
    filtered_data = signal.filtfilt(b, a, signal_data)
    
    return filtered_data

# Example usage with an EDF file
# Replace with the path to your EDF file
file_path = "raw data/SC4001E0-PSG.edf"

# Uncomment and run the following lines when you have an EDF file

# # Import the EDF file
raw, data_dict = import_edf_file(file_path)
# 
# # Analyze first channel (index 0) with a 49Hz filter and 60 seconds of data
if data_dict is not None:
    analyze_edf_channel(data_dict, 0, max_time=60, max_freq=50, apply_filter=True)

# # To analyze another channel, change the index

Extracting EDF parameters from /home/yahia/notebooks/raw data/SC4001E0-PSG.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 7949999  =      0.000 ... 79499.990 secs...
Reading 0 ... 7949999  =      0.000 ... 79499.990 secs...


EDF File Analysis: raw data/SC4001E0-PSG.edf

Number of channels: 7
Channel names: ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'Resp oro-nasal', 'EMG submental', 'Temp rectal', 'Event marker']
Sampling frequency: 100.0 Hz
Duration: 79500.00 seconds (1325.00 minutes)
Total data points: 7950000

Header Information:
Patient ID: 
Recording Date: 1989-04-24 16:13:00
Equipment: 

Analyzing channel: EEG Fpz-Cz



Statistics for EEG Fpz-Cz:
Mean: -0.0000
Std Dev: 0.0000
Min: -0.0001
Max: 0.0002
Duration: 60.00 seconds


## Analyzing Multiple Channels

In [None]:
def analyze_multiple_channels(data_dict, channel_indices=None, max_time=60, max_freq=50):
    """
    Analyze multiple channels from the EDF file.
    
    Parameters:
    data_dict (dict): Data dictionary from import_edf_file function
    channel_indices (list, optional): List of channel indices to analyze. If None, analyze all.
    max_time (float): Maximum time to display in seconds
    max_freq (float): Maximum frequency to display in Hz
    """
    if channel_indices is None:
        channel_indices = range(data_dict['n_channels'])
        
    for idx in channel_indices:
        if idx < data_dict['n_channels']:
            analyze_edf_channel(data_dict, idx, max_time, max_freq)
            print("\n" + "-"*80 + "\n")
        else:
            print(f"Channel index {idx} is out of range.")


In [None]:

# Example usage:
analyze_multiple_channels(data_dict, [0, 1, 2], max_time=30)


Analyzing channel: EEG Fpz-Cz



Statistics for EEG Fpz-Cz:
Mean: 0.0000
Std Dev: 0.0000
Min: -0.0001
Max: 0.0002
Duration: 30.00 seconds

--------------------------------------------------------------------------------


Analyzing channel: EEG Pz-Oz



Statistics for EEG Pz-Oz:
Mean: -0.0000
Std Dev: 0.0000
Min: -0.0000
Max: 0.0000
Duration: 30.00 seconds

--------------------------------------------------------------------------------


Analyzing channel: EOG horizontal



Statistics for EOG horizontal:
Mean: -0.0000
Std Dev: 0.0001
Min: -0.0003
Max: 0.0003
Duration: 30.00 seconds

--------------------------------------------------------------------------------



## Conclusion

This notebook provides a comprehensive toolkit for analyzing EDF files with various signal processing techniques:

1. **Data Import and Description**: Extract and display metadata from EDF files
2. **Amplitude Envelope Analysis**: Extract signal envelopes using Hilbert transform
3. **Frequency-Domain Analysis**: Compute and analyze FFT, power spectral density, and STFT
4. **Visualization**: Create interactive plots and heatmaps for signal analysis

To use this notebook with your own data, simply provide the path to your EDF file and run the analysis functions.

## Temporal Dependency Analysis

This section provides tools for analyzing temporal dependencies in EEG signals using:

1. Autocorrelation Function (ACF)
2. Partial Autocorrelation Function (PACF)
3. Detrended Fluctuation Analysis (DFA)

In [None]:
# Make sure the file is loaded
if 'data_dict' not in globals() or data_dict is None:
    file_path = "by captain borat/raw/EEG_0_per_hour_2024-03-20 17_12_18.edf"
    raw, data_dict = import_edf_file(file_path)

# Extract first channel data and limit to first 30 seconds for analysis
channel_idx = 0
channel_name = data_dict['channels'][channel_idx]
fs = data_dict['sampling_freq']
signal_data = data_dict['data'][channel_idx]

# Use 60 seconds of data for autocorrelation analysis
seconds = 60
n_samples = int(seconds * fs)
signal_segment = signal_data[:n_samples]

# Perform autocorrelation analysis
print(f"\nPerforming autocorrelation analysis on {channel_name} (first {seconds} seconds)")
acf_results = autocorrelation_analysis(signal_segment, max_lags=int(fs*3), sample_rate=fs)  # 3 seconds of lags

# Perform detrended fluctuation analysis
print(f"\nPerforming DFA on {channel_name} (first {seconds} seconds)")
scales, fluct, alpha = detrended_fluctuation_analysis(signal_segment, scale_min=4, scale_max=int(fs*10)) # Up to 10 seconds scale


Performing autocorrelation analysis on EEG Fpz-Cz (first 60 seconds)
Dominant period from ACF: 215 lags (2.1500 seconds)
Dominant period from ACF: 215 lags (2.1500 seconds)



Performing DFA on EEG Fpz-Cz (first 60 seconds)
DFA Scaling Exponent (α): 1.1457
Interpretation: Fractional Brownian motion with Hurst exponent H = 0.1457
DFA Scaling Exponent (α): 1.1457
Interpretation: Fractional Brownian motion with Hurst exponent H = 0.1457


In [None]:
from scipy.signal import spectrogram
from mne.time_frequency import psd_array_multitaper

# Load the specific EDF file
file_path = "by captain borat/raw/EEG_0_per_hour_2024-03-20 17_12_18.edf"
raw, data_dict = import_edf_file(file_path)

if data_dict is not None:
    # Extract first channel data and limit to first 30 seconds
    fs = data_dict['sampling_freq']  # Should be 512 Hz as specified
    channel_idx = 0
    channel_name = data_dict['channels'][channel_idx]
    signal_data = data_dict['data'][channel_idx]
    
    # Limit to first 30 seconds
    n_samples = int(60 * fs)
    signal_data = signal_data[:n_samples]
    time_axis = data_dict['time_axis'][:n_samples]
    
    # Calculate multitaper spectrogram
    
    # Define parameters for multitaper spectrogram
    window_size = 2  # seconds
    window_samples = int(window_size * fs)
    step = 0.1  # seconds
    step_samples = int(step * fs)
    
    # Calculate multitaper PSD for overlapping windows
    times = np.arange(0, 30-window_size, step)
    
    # Don't pre-define freqs - get it from the first PSD calculation
    # Get first segment to determine frequency axis shape
    first_segment = signal_data[:window_samples]
    psd_first, freqs = psd_array_multitaper(first_segment, sfreq=fs, fmin=0, fmax=fs/2, 
                                   adaptive=True, normalization='full')
    
    # Now create the spec_mt array with the correct dimensions
    spec_mt = np.zeros((len(freqs), len(times)))
    
    # Start the loop (recomputing the first window)
    for i, t in enumerate(times):
        start_sample = int(t * fs)
        end_sample = start_sample + window_samples
        segment = signal_data[start_sample:end_sample]
        
        # Calculate multitaper PSD for this window
        psd, _ = psd_array_multitaper(segment, sfreq=fs, fmin=0, fmax=fs/2, 
                                     adaptive=True, normalization='full')
        spec_mt[:, i] = psd
    
    # Convert to dB scale
    spec_mt_db = 10 * np.log10(spec_mt + 1e-10)
    
    # Plot multitaper spectrogram
    fig = go.Figure(data=go.Heatmap(
        z=spec_mt_db,
        x=times + window_size/2,  # Center time points
        y=freqs,
        colorscale='Viridis',
        colorbar=dict(title='Power (dB)')
    ))
    
    fig.update_layout(
        title=f'Multitaper Spectrogram: {channel_name} (First 30 seconds)',
        xaxis_title='Time (s)',
        yaxis_title='Frequency (Hz)',
        width=900,
        height=600
    )
    
    fig.show()
    
    # Also plot the time series for reference
    time_fig = plot_interactive_signal(time_axis, signal_data, channel_name=channel_name)
    time_fig.update_layout(title=f"{channel_name} - First 30 seconds")
    time_fig.show()
else:
    print("Failed to load EDF file")

Extracting EDF parameters from /home/yahia/notebooks/by captain borat/raw/EEG_0_per_hour_2024-03-20 17_12_18.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 44153855  =      0.000 ... 86237.998 secs...
Reading 0 ... 44153855  =      0.000 ... 86237.998 secs...
EDF File Analysis: by captain borat/raw/EEG_0_per_hour_2024-03-20 17_12_18.edf

Number of channels: 1
Channel names: ['Temp']
Sampling frequency: 512.0 Hz
Duration: 86238.00 seconds (1437.30 minutes)
Total data points: 44153856

Header Information:
Patient ID: 
Recording Date: 2024-05-19 10:41:49
Equipment: 
    Using multitaper spectrum estimation with 7 DPSS windows
    Using multitaper spectrum estimation with 7 DPSS windows
EDF File Analysis: by captain borat/raw/EEG_0_per_hour_2024-03-20 17_12_18.edf

Number of channels: 1
Channel names: ['Temp']
Sampling frequency: 512.0 Hz
Duration: 862