Peak raw data info

In [None]:
def peek_fif_info(file_path):
    """
    Loads a .fif file and prints its information.
    
    Parameters:
        file_path (str): Path to the .fif file.
    """
    # Load the .fif file
    raw = mne.io.read_raw_fif(file_path, preload=False)
    
    # Print the dataset info
    print("\nDataset Information:")
    print(raw.info)
    
    # Print channel names
    print("\nChannel Names:")
    print(raw.info['ch_names'])
    
    # Print sampling frequency
    print("\nSampling Frequency:")
    print(f"{raw.info['sfreq']} Hz")
    
    # Print recording duration
    duration = raw.times[-1]  # Last time point
    print("\nRecording Duration:")
    print(f"{duration:.2f} seconds")
    
    # Extract raw data
    data = raw.get_data()  # Shape: (n_channels, n_times)

    # Calculate statistics
    min_val = data.min()
    max_val = data.max()
    mean_val = data.mean()
    std_val = data.std()

    print(f"Data range: {min_val:.2e} to {max_val:.2e}")
    print(f"Mean value: {mean_val:.2e}")
    print(f"Standard deviation: {std_val:.2e}")

# Path to your .fif file
file_path = "AllData/Figshare/Converted_FIF/H S1 EC.fif"

# Peek at the info
peek_fif_info(file_path)

Convert data into .fif files, set montage, truncate 5 mins

In [None]:
# Convert Figshare to .fif files

import os
import mne

def convert_ec_edf_to_fif_with_montage_and_scaling(input_folder, output_folder_name="Converted_FIF", montage_name="standard_1020"):
    """
    Converts all files ending with 'EC.edf' in the specified folder to FIF format,
    renames channels, drops extraneous channels, sets a standard montage, and scales data to microvolts.

    Parameters:
        input_folder (str): Path to the folder containing the input files.
        output_folder_name (str): Name of the subfolder where FIF files will be saved.
        montage_name (str): Name of the standard montage to assign for digitization (default: "standard_1020").
    """
    # Create output folder under the input folder
    output_folder = os.path.join(input_folder, output_folder_name)
    os.makedirs(output_folder, exist_ok=True)

    # Process all files in the input folder
    for file_name in os.listdir(input_folder):
        if file_name.endswith("EO.edf"):  # Only process files ending with 'EC.edf'
            edf_path = os.path.join(input_folder, file_name)
            fif_name = file_name.replace(".edf", ".fif")  # Replace .edf with .fif
            fif_path = os.path.join(output_folder, fif_name)

            print(f"Processing file: {file_name} -> {fif_name}")

            # Load the EDF file
            raw = mne.io.read_raw_edf(edf_path, preload=True)

            # Step 1: Rename channels to match standard names (remove suffixes like '-LE' or prefixes like 'EEG')
            new_ch_names = [ch.replace('-LE', '').replace('EEG ', '') for ch in raw.ch_names]
            raw.rename_channels({old: new for old, new in zip(raw.ch_names, new_ch_names)})

            # Step 2: Assign channel types to EEG if necessary
            if 'eeg' not in raw.get_channel_types():
                raw.set_channel_types({ch: 'eeg' for ch in raw.ch_names})

            # Step 3: Load the standard montage
            montage_obj = mne.channels.make_standard_montage(montage_name)
            expected_channel_names = montage_obj.ch_names

            # Step 4: Identify and drop channels not part of the standard montage
            channels_to_drop = [ch for ch in raw.ch_names if ch not in expected_channel_names]
            if channels_to_drop:
                raw.drop_channels(channels_to_drop)
                print(f"Dropped channels not in the {montage_name} system: {channels_to_drop}")
            else:
                print(f"No channels need to be dropped. All channels match the {montage_name} system.")

            # Step 5: Set the montage to the raw data
            raw.set_montage(montage_obj, match_case=False, on_missing='ignore')
            print(f"Assigned {montage_name} montage to {file_name}")

            # Step 6: Scale data from volts to microvolts (1 V = 1e6 µV)
            raw.apply_function(lambda x: x * 1e6, picks="eeg")
            print("Data scaled from volts to microvolts.")

            # Save as FIF
            raw.save(fif_path, overwrite=True)
            print(f"Saved FIF file: {fif_path}")

    print(f"All files ending with 'EO.edf' have been converted and saved in {output_folder}")



# Example usage
input_folder = "AllData/Figshare"  # Replace with the actual path to the Figshare folder
convert_ec_edf_to_fif_with_montage_and_scaling(input_folder)


Data preprocessing, trying to remove noise and leave real brain activities

In [None]:
import os
import mne
import numpy as np

In [None]:
def load_fif_data(file_path):
    raw_fif = mne.io.read_raw_fif(file_path, preload=True)
    return raw_fif

In [None]:
def downsampling(raw_fif):
    target_sampling_rate = 250
    
    raw_downsampled = raw_fif.copy()  # Work on a copy to preserve the original object
    raw_downsampled.resample(sfreq=target_sampling_rate, npad="auto")
    return raw_downsampled

In [None]:
def highpass_lowpass_filter(raw_downsampled, l_freq=1.0, h_freq=45.0):
    
    raw_filtered = raw_downsampled.filter(
        l_freq=l_freq, h_freq=h_freq, fir_design='firwin', phase='zero'
    )
    
    return raw_filtered

In [None]:
def rereference_to_average(raw_filtered):
    
    raw_rereferenced = raw_filtered.set_eeg_reference(ref_channels='average', projection=False)
    
    return raw_rereferenced

In [None]:
def auto_reject_bad_channel_and_interpolate(raw, flat_threshold=0.5, noise_threshold=500, verbose=True):
    """
    Detects bad EEG channels based on flatness and excessive noise, interpolates them, and logs statistics.

    Parameters:
        raw (mne.io.Raw): The EEG Raw object.
        flat_threshold (float): Minimum variance to flag a flat channel.
        noise_threshold (float): Maximum variance to flag a noisy channel.
        verbose (bool): If True, prints detailed information about bad channels.

    Returns:
        raw_cleaned (mne.io.Raw): The Raw object with bad channels interpolated.
        stats (dict): Statistics about detected bad channels.
    """
    # Get the raw data
    data = raw.get_data()  # Shape: (n_channels, n_times)

    # Detect flat channels (variance below threshold)
    variances = np.var(data, axis=1)
    flat_channels = [raw.ch_names[i] for i, var in enumerate(variances) if var < flat_threshold]

    # Detect noisy channels (variance above threshold)
    noisy_channels = [raw.ch_names[i] for i, var in enumerate(variances) if var > noise_threshold]

    # Combine flat and noisy channels
    bad_channels = list(set(flat_channels + noisy_channels))
    raw.info['bads'] = bad_channels

    # Log statistics
    stats = {
        "total_channels": len(raw.ch_names),
        "bad_channels": len(bad_channels),
        "bad_channel_names": bad_channels,
        "flat_channels": flat_channels,
        "noisy_channels": noisy_channels,
    }

    if verbose:
        print(f"Total channels: {stats['total_channels']}")
        print(f"Number of bad channels detected: {stats['bad_channels']}")
        print(f"Bad channel names: {stats['bad_channel_names']}")
        print(f"Flat channels: {flat_channels}")
        print(f"Noisy channels: {noisy_channels}")

    # Interpolate bad channels
    raw_cleaned = raw.copy().interpolate_bads(reset_bads=True, mode='accurate')
    if verbose:
        print("Bad channels have been interpolated.")

    return raw_cleaned, stats

In [None]:
def perform_ica_and_reject(raw, amplitude_threshold=100, n_components=None, verbose=True):
    """
    Performs ICA decomposition, rejects high-amplitude components, and provides rejection statistics,
    including the percentage of variance explained by rejected components.

    Parameters:
        raw (mne.io.Raw): The preprocessed Raw EEG/MEG object.
        amplitude_threshold (float): The peak-to-peak amplitude threshold to identify artifact components.
        n_components (int or None): Number of ICA components to compute (default: None, auto-detect).
        verbose (bool): If True, prints detailed information about the rejection process.

    Returns:
        raw_cleaned (mne.io.Raw): The Raw object after ICA and artifact rejection.
        stats (dict): Statistics about the rejected ICA components, including variance explained.
    """
    # Step 1: Perform ICA decomposition
    ica = mne.preprocessing.ICA(n_components=n_components, random_state=97, max_iter=800)
    ica.fit(raw)

    # Step 2: Calculate the peak-to-peak amplitude of each ICA component
    ica_sources = ica.get_sources(raw).get_data()  # Shape: (n_components, n_times)
    ptp_amplitudes = np.ptp(ica_sources, axis=1)  # Peak-to-peak amplitude for each component

    # Step 3: Identify high-amplitude components
    high_amplitude_indices = [i for i, amp in enumerate(ptp_amplitudes) if amp > amplitude_threshold]

    # Step 4: Calculate variance explained by each component
    total_variance = np.var(raw.get_data())  # Total variance of the original data
    component_variances = []
    for i in range(len(ica_sources)):
        # Reconstruct the signal for this single component
        reconstructed_signal = np.outer(ica.mixing_matrix_[:, i], ica_sources[i])
        component_variances.append(np.var(reconstructed_signal))

    # Variance explained by rejected components
    rejected_variance = sum([component_variances[i] for i in high_amplitude_indices])
    percent_rejected_variance = (rejected_variance / total_variance) * 100

    # Step 5: Exclude high-amplitude components
    ica.exclude = high_amplitude_indices

    # Step 6: Apply ICA to remove artifact components
    raw_cleaned = ica.apply(raw)

    if verbose:
        print(f"High-amplitude components rejected: {high_amplitude_indices}")
        print(f"Percentage of variance explained by rejected components: {percent_rejected_variance:.2f}%")

    # Prepare statistics
    stats = {
        "n_components": len(ica_sources),
        "rejected_components": len(high_amplitude_indices),
        "rejected_indices": high_amplitude_indices,
        "variance_explained": percent_rejected_variance,
    }

    return raw_cleaned, stats

In [None]:
def save(raw_clean, file_path_to):
    raw_clean.save(file_path_to, overwrite=True)
    print(f'Cleaned EEG data saved to {file_path_to}')

In [None]:
import os
import json

def main():
    """
    Main function to loop over .fif files, apply preprocessing, save processed files,
    and generate logs for rejected subjects, channels, and ICA components.
    """
    # Define input and output folders
    input_folder = "AllData/Figshare/Converted_FIF_EO"  # Replace with actual path to Converted_FIF
    output_folder_name = "Preprocessed_Figshare_EO"
    output_folder = os.path.join(os.path.dirname(input_folder), output_folder_name)
    os.makedirs(output_folder, exist_ok=True)

    # Define log file paths
    rejected_subjects_log = os.path.join(output_folder, "rejected_subjects.txt")
    rejected_channels_log = os.path.join(output_folder, "rejected_channels.txt")
    rejected_ica_log = os.path.join(output_folder, "rejected_components.txt")
    stats_log_file = os.path.join(output_folder, "subject_stats.json")

    # Initialize logs
    rejected_subjects = []
    rejected_channels = {}
    rejected_ica = {}
    all_stats = {}

    # Loop through all .fif files in the input folder
    for file_name in os.listdir(input_folder):
        if file_name.endswith(".fif"):  # Only process .fif files
            input_file_path = os.path.join(input_folder, file_name)
            output_file_path = os.path.join(output_folder, file_name)

            print(f"Processing file: {file_name}")

            # Load the .fif data
            raw_fif = load_fif_data(input_file_path)

            # Downsample the data to 250 Hz
            raw_downsampled = downsampling(raw_fif)

            # Apply bandpass filter (1–45 Hz)
            raw_filtered = highpass_lowpass_filter(raw_downsampled, l_freq=1.0, h_freq=45.0)

            # Automatically reject bad channels and interpolate
            raw_good_channels, bad_channel_stats = auto_reject_bad_channel_and_interpolate(raw_filtered)

            # Check the percentage of rejected channels
            total_channels = bad_channel_stats["total_channels"]
            bad_channels = bad_channel_stats["bad_channels"]
            rejection_percentage = (bad_channels / total_channels) * 100

            rejected_channels[file_name] = bad_channel_stats["bad_channel_names"]
            
            if rejection_percentage > 20:
                print(f"Subject {file_name} rejected due to {rejection_percentage:.2f}% bad channels.")
                rejected_subjects.append(file_name)
                all_stats[file_name] = {"status": "rejected", "bad_channel_stats": bad_channel_stats}
                continue  # Skip further processing for this file

            # Re-reference to average
            raw_rereferenced = rereference_to_average(raw_good_channels)

            # Perform ICA and reject high-amplitude components
            raw_cleaned, ica_stats = perform_ica_and_reject(raw_rereferenced)

            # Log rejected ICA components
            rejected_ica[file_name] = {
                "rejected_components": ica_stats["rejected_components"],
                "variance_explained": ica_stats["variance_explained"]
            }

            # Save the cleaned data
            save(raw_cleaned, output_file_path)

            # Log stats for processed subjects
            all_stats[file_name] = {
                "status": "processed",
                "bad_channel_stats": bad_channel_stats,
                "ica_stats": ica_stats,
            }

            print(f"Processed file saved to: {output_file_path}")

    # Write rejected subjects to a log file
    with open(rejected_subjects_log, "w") as f:
        for subject in rejected_subjects:
            f.write(f"{subject}\n")

    # Write rejected channels to a log file
    with open(rejected_channels_log, "w") as f:
        for subject, channels in rejected_channels.items():
            f.write(f"{subject}: {channels}\n")

    # Write rejected ICA components to a log file
    with open(rejected_ica_log, "w") as f:
        for subject, ica_info in rejected_ica.items():
            f.write(f"{subject}: {ica_info}\n")

    # Write all stats to the JSON file
    with open(stats_log_file, "w") as f:
        json.dump(all_stats, f, indent=4)

    print(f"All files have been processed and saved in {output_folder}")
    print(f"Rejected subjects logged in {rejected_subjects_log}")
    print(f"Rejected channels logged in {rejected_channels_log}")
    print(f"Rejected ICA components logged in {rejected_ica_log}")
    print(f"All subject stats saved in {stats_log_file}")

# Call the main function
if __name__ == "__main__":
    main()

Visualize rejection log

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.stats.multitest import multipletests
from scipy.stats import ttest_ind

# Define the folder path where the logs are stored
folder_path = "AllData/Figshare/Processed_FIF"

# File paths for the logs
rejected_subjects_file = os.path.join(folder_path, "rejected_subjects.txt")
rejected_channels_file = os.path.join(folder_path, "rejected_channels.txt")
rejected_components_file = os.path.join(folder_path, "rejected_components.txt")

# Read the logs
with open(rejected_subjects_file, 'r') as f:
    rejected_subjects = f.readlines()

with open(rejected_channels_file, 'r') as f:
    rejected_channels = [line.strip() for line in f.readlines()]

with open(rejected_components_file, 'r') as f:
    rejected_components = [line.strip() for line in f.readlines()]

# Initialize data storage for groups
data = {"H": {"subjects": 0, "channels": [], "independent components": [], "variance_explained": []},
        "MDD": {"subjects": 0, "channels": [], "independent components": [], "variance_explained": []}}

# Process rejected subjects
for subject in rejected_subjects:
    group = "H" if subject.startswith("H") else "MDD"
    data[group]["subjects"] += 1

# Process rejected channels
for line in rejected_channels:
    if line:
        parts = line.split(": ")
        group = "H" if parts[0].startswith("H") else "MDD"
        channels_list = eval(parts[1]) if len(parts) > 1 and parts[1] else []
        data[group]["channels"].append(len(channels_list))

# Process rejected independent components
for line in rejected_components:
    if line:
        parts = line.split(": ", 1)
        group = "H" if parts[0].startswith("H") else "MDD"
        stats_dict = eval(parts[1]) if len(parts) > 1 else {}
        data[group]["independent components"].append(stats_dict.get('rejected_components', 0))
        data[group]["variance_explained"].append(stats_dict.get('variance_explained', 0))

# Calculate means and standard errors
def calc_stats(data_list):
    return np.mean(data_list), np.std(data_list) / np.sqrt(len(data_list))

stats = {group: {
    "subjects": data[group]["subjects"],
    "channels": calc_stats(data[group]["channels"]),
    "independent components": calc_stats(data[group]["independent components"]),
    "variance_explained": calc_stats(data[group]["variance_explained"]),
} for group in data.keys()}

# Perform statistical tests
p_channels = ttest_ind(data["H"]["channels"], data["MDD"]["channels"], equal_var=False).pvalue
p_components = ttest_ind(data["H"]["independent components"], data["MDD"]["independent components"], equal_var=False).pvalue
p_variance = ttest_ind(data["H"]["variance_explained"], data["MDD"]["variance_explained"], equal_var=False).pvalue

# Apply Bonferroni correction
p_values = [p_channels, p_components, p_variance]
bonferroni_corrected = multipletests(p_values, method='bonferroni')[1]

# Print corrected p-values
print("Bonferroni Corrected p-values:")
print(f"Channels: {bonferroni_corrected[0]:.3f}")
print(f"Components: {bonferroni_corrected[1]:.3f}")
print(f"Variance Explained: {bonferroni_corrected[2]:.3f}")

# Plot the results
fig, axes = plt.subplots(1, 4, figsize=(22, 6))

# Rejected Subjects
axes[0].bar(["H", "MDD"], [stats["H"]["subjects"], stats["MDD"]["subjects"]])
axes[0].set_title("Rejected Subjects")
axes[0].set_ylabel("Count")

# Rejected Channels
axes[1].bar(["H", "MDD"], [stats["H"]["channels"][0], stats["MDD"]["channels"][0]],
            yerr=[stats["H"]["channels"][1], stats["MDD"]["channels"][1]], capsize=5)
axes[1].set_title("Rejected Channels")
axes[1].set_ylabel("Mean Count")
axes[1].text(0.5, max(stats["H"]["channels"][0], stats["MDD"]["channels"][0]),
             f"p={bonferroni_corrected[0]:.3f}", ha='center')

# Rejected Components
axes[2].bar(["H", "MDD"], [stats["H"]["independent components"][0], stats["MDD"]["independent components"][0]],
            yerr=[stats["H"]["independent components"][1], stats["MDD"]["independent components"][1]], capsize=5)
axes[2].set_title("Rejected Independent Components")
axes[2].set_ylabel("Mean Count")
axes[2].text(0.5, max(stats["H"]["independent components"][0], stats["MDD"]["independent components"][0]),
             f"p={bonferroni_corrected[1]:.3f}", ha='center')

# Variance Explained by Rejected Components
axes[3].bar(["H", "MDD"], [stats["H"]["variance_explained"][0], stats["MDD"]["variance_explained"][0]],
            yerr=[stats["H"]["variance_explained"][1], stats["MDD"]["variance_explained"][1]], capsize=5)
axes[3].set_title("Variance Explained by Rejected Independent Components")
axes[3].set_ylabel("Mean Variance (%)")
axes[3].text(0.5, max(stats["H"]["variance_explained"][0], stats["MDD"]["variance_explained"][0]),
             f"p={bonferroni_corrected[2]:.3f}", ha='center')

plt.tight_layout()
plt.show()
