In [28]:
import mne
import numpy as np
from pprint import pprint
from collections import Counter
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# Replace with the path to your .fif file you want to explore
fif_file = '../data/openfmri/test/sub-06/run_01.fif'

# Read the raw data with preloading
raw = mne.io.read_raw_fif(fif_file, preload=True)
print(raw)

# Previewing the data

#### Channel information

In [None]:
# number of channels
n_channels = raw.info['nchan']
print(f'Total number of channels: {n_channels}')

# channel types 
channel_types = raw.get_channel_types()
type_counts = Counter(channel_types)
print(f'Channel types: {type_counts}')

# channel names
channel_names = raw.ch_names
print(f'Channel names ({len(channel_names)}): {channel_names}')

In [None]:
print("Channel information:", raw.info['chs'])

In [None]:
print(raw.ch_names)

In [None]:
misc_channels = raw.copy().pick_channels(['MISC201', 'MISC202', 'MISC203', 'MISC204', 'MISC205', 'MISC206', 
                                          'MISC301', 'MISC302', 'MISC303', 'MISC304', 'MISC305', 'MISC306'])
print(misc_channels.info['chs'])

In [None]:
raw.plot(picks=['MISC201', 'MISC202', 'MISC203', 'MISC204', 'MISC205', 'MISC206', 'MISC301', 'MISC302', 'MISC303', 'MISC304', 'MISC305', 'MISC306'])

#### Sampling information
  

In [None]:
# number of samples
n_samples = raw.n_times
print(f'Number of samples (frames): {n_samples}')

# sampling frequency        
sfreq = raw.info['sfreq']
print(f'Sampling frequency: {sfreq} Hz')

# duration of recording 
duration = n_samples / sfreq
print(f'Duration of recording: {duration} seconds')

## Accessign data format

#### Data info
- Data shape
- Data type

In [None]:
data = raw.get_data()

# Data shape
print(f'Data shape (n_channels x n_times): {data.shape}')

# Data type
print(f'Data type: {data.dtype}')

#### Channel acess

In [None]:
# Channel Indices
eeg_indices = mne.pick_types(raw.info, meg=False, eeg=True)
meg_indices = mne.pick_types(raw.info, meg=True, eeg=False)
# print(f'EEG channel indices: {eeg_indices}')
print(f'Number of EEG channels: {len(eeg_indices)}')
print(f'Number of MEG channels: {len(meg_indices)}')

# EEG and MEG Data
eeg_data = data[eeg_indices, :]
meg_data = data[meg_indices, :]
print(f'EEG data shape: {eeg_data.shape}')
print(f'MEG data shape: {meg_data.shape}')

# Optional: Separate MEG magnetometers and gradiometers
mag_indices = mne.pick_types(raw.info, meg='mag')
grad_indices = mne.pick_types(raw.info, meg='grad')
print(f'\nNumber of MEG magnetometer channels: {len(mag_indices)}')
print(f'Number of gradiometer channels: {len(grad_indices)}')

In [None]:
# Get indices of MEG channels
meg_indices = mne.pick_types(raw.info, meg=True, eeg=False)
print(f'Number of MEG channels: {len(meg_indices)}')
print(f'MEG channel indices: {meg_indices}')

# Optional: Separate MEG magnetometers and gradiometers
mag_indices = mne.pick_types(raw.info, meg='mag')
grad_indices = mne.pick_types(raw.info, meg='grad')
print(f'Number of magnetometer channels: {len(mag_indices)}')
print(f'Number of gradiometer channels: {len(grad_indices)}')

print(f'Magnetometer indices: {mag_indices}')

# Get indices of EEG channels
eeg_indices = mne.pick_types(raw.info, meg=False, eeg=True)
print(f'Number of EEG channels: {len(eeg_indices)}')
print(f'EEG channel indices: {eeg_indices}')

In [11]:
import numpy as np


indices = np.arange(2,306,3)
assert np.array_equal(indices, mag_indices), "Not the same"

indices = np.arange(306, 380, 1)
assert np.array_equal(indices, eeg_indices), "Not the same"

In [None]:
channel_info = pd.DataFrame({
    'Name': raw.ch_names,
    'Type': raw.get_channel_types(),
    'Unit': [raw._orig_units.get(ch, 'NA') for ch in raw.ch_names],
    'Sampling Frequency': [sfreq] * n_channels
})
print(channel_info.head())

# Plotting

#### Plotting EEG electrodes in 2D

In [None]:
%matplotlib inline

import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    raw.plot_sensors(kind='topomap', ch_type='eeg')

#### Plotting EEG electrodes in 3D

In [None]:
%matplotlib inline

import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    raw.plot_sensors(kind='3d', ch_type='eeg', show_names=True)

#### Plotting MEG electrodes in 2D

In [None]:
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    raw.plot_sensors(kind='topomap', ch_type='mag', show_names=True)  # Only MEG sensors MAG
    raw.plot_sensors(kind='topomap', ch_type='grad', show_names=True)  # Only MEG sensors


In [None]:
# Plotting magnetometers in 3D
%matplotlib inline
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    raw.plot_sensors(kind='3d', ch_type='mag', show_names=True)

# Plotting gradiometers in 3D
%matplotlib inline
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    raw.plot_sensors(kind='3d', ch_type='grad', show_names=True)

#### Plotting both EEG & MEG electrodes together & Calculating closest neighbours

- Plotting all electrodes

In [None]:
# Full code
import plotly.graph_objects as go
import numpy as np
import mne

# Get sensor positions
pos = raw._get_channel_positions() * 1000 # Convert to mm

# Create separate traces for EEG and MEG sensors
eeg_idx = mne.pick_types(raw.info, eeg=True, meg=False)
meg_idx = mne.pick_types(raw.info, meg=True, eeg=False)

# Create the 3D scatter plot
fig = go.Figure()

# Add EEG sensors
eeg_pos = pos[eeg_idx]
fig.add_trace(go.Scatter3d(
    x=eeg_pos[:, 0], y=eeg_pos[:, 1], z=eeg_pos[:, 2],
    mode='markers+text',
    marker=dict(size=8, color='blue'),
    text=[raw.ch_names[i] for i in eeg_idx],
    name='EEG'
))

# Add MEG sensors
meg_pos = pos[meg_idx]
fig.add_trace(go.Scatter3d(
    x=meg_pos[:, 0], y=meg_pos[:, 1], z=meg_pos[:, 2],
    mode='markers+text',
    marker=dict(size=8, color='red'),
    text=[raw.ch_names[i] for i in meg_idx],
    name='MEG'
))

# Find closest MEG electrode for each EEG electrode and plot line
closest_distances = []
for i, eeg_electrode in enumerate(eeg_pos):
    # Calculate distances to all MEG electrodes
    distances = np.sqrt(np.sum((meg_pos - eeg_electrode)**2, axis=1))
    
    # Find the closest MEG electrode
    closest_meg_idx = np.argmin(distances)
    closest_meg_electrode = meg_pos[closest_meg_idx]
    min_distance = distances[closest_meg_idx]
    closest_distances.append(min_distance)
    
    # Create line coordinates
    x_line = [eeg_electrode[0], closest_meg_electrode[0]]
    y_line = [eeg_electrode[1], closest_meg_electrode[1]]
    z_line = [eeg_electrode[2], closest_meg_electrode[2]]
    
    # Add line trace
    fig.add_trace(go.Scatter3d(
        x=x_line, y=y_line, z=z_line,
        mode='lines',
        line=dict(color='green', width=2),  # Made lines slightly thicker
        opacity=0.5,  # Made lines more visible
        showlegend=False,
        hoverinfo='text',
        text=f'Distance: {min_distance:.2f} mm'
    ))

# Update layout
fig.update_layout(
    title='EEG and MEG Sensor Positions with Shortest Distance Lines',
    scene=dict(
        xaxis_title='X (mm)',
        yaxis_title='Y (mm)',
        zaxis_title='Z (mm)',
        aspectmode='data'     

    ),
    width=800,
    height=800
)

# Save the plot as an HTML file
fig.write_html("sensor_positions_closest_distances.html")
print("Plot saved as 'sensor_positions_closest_distances.html'. You can open it in your web browser.")

# Print statistics about the closest distances
print(f"\nClosest Distance Statistics (in mm):")
print(f"Minimum distance: {min(closest_distances):.2f}")
print(f"Maximum distance: {max(closest_distances):.2f}")
print(f"Average distance: {np.mean(closest_distances):.2f}")
print(f"Standard deviation: {np.std(closest_distances):.2f}")

- Plotting only the closely connected EEG and MEG electrode pairs (those that are in the bottom 25% of distance interquartile range)

In [None]:
import plotly.graph_objects as go
import numpy as np
import mne

# Get sensor positions and convert from meters to millimeters
pos = raw._get_channel_positions() * 1000  # Convert positions to mm

# Create separate indices for EEG and MEG sensors
eeg_idx = mne.pick_types(raw.info, eeg=True, meg=False)
meg_idx = mne.pick_types(raw.info, meg=True, eeg=False)

# EEG sensor positions and names
eeg_pos_full = pos[eeg_idx]
eeg_names_full = [raw.ch_names[i] for i in eeg_idx]

# MEG sensor positions and names
meg_pos_full = pos[meg_idx]
meg_names_full = [raw.ch_names[i] for i in meg_idx]

# Variables to store distances and indices
closest_distances = []
closest_meg_indices = []

# Track used MEG electrodes
used_meg_electrodes = set()

# Compute closest MEG electrode for each EEG electrode
for i, eeg_electrode in enumerate(eeg_pos_full):
    distances = np.linalg.norm(meg_pos_full - eeg_electrode, axis=1)
    sorted_indices = np.argsort(distances)
    
    # Find the closest unused MEG electrode
    for closest_meg_idx in sorted_indices:
        if closest_meg_idx not in used_meg_electrodes:
            min_distance = distances[closest_meg_idx]
            used_meg_electrodes.add(closest_meg_idx)
            closest_distances.append(min_distance)
            closest_meg_indices.append(closest_meg_idx)
            break

# Convert lists to numpy arrays for easier indexing
closest_distances = np.array(closest_distances)
closest_meg_indices = np.array(closest_meg_indices)

# Calculate quartiles
q1 = np.percentile(closest_distances, 25)
q3 = np.percentile(closest_distances, 75)
iqr = q3 - q1

# Find indices of EEG electrodes in the bottom 25%
bottom25_indices = np.where(closest_distances <= q1)[0]

# Filter EEG electrodes to only those in bottom 25%
eeg_pos = eeg_pos_full[bottom25_indices]
eeg_names = [eeg_names_full[i] for i in bottom25_indices]

# Get corresponding MEG indices and positions
closest_meg_indices_bottom25 = closest_meg_indices[bottom25_indices]
used_meg_indices = np.unique(closest_meg_indices_bottom25)
used_meg_pos = meg_pos_full[used_meg_indices]
used_meg_names = [meg_names_full[i] for i in used_meg_indices]

# Create the 3D scatter plot
fig = go.Figure()

# Add EEG sensors (Bottom 25%) to the plot
fig.add_trace(go.Scatter3d(
    x=eeg_pos[:, 0],
    y=eeg_pos[:, 1],
    z=eeg_pos[:, 2],
    mode='markers+text',
    marker=dict(size=5, color='blue'),
    text=eeg_names,
    name='EEG (Bottom 25%)'
))

# Add MEG sensors corresponding to the Bottom 25% EEG electrodes
fig.add_trace(go.Scatter3d(
    x=used_meg_pos[:, 0],
    y=used_meg_pos[:, 1],
    z=used_meg_pos[:, 2],
    mode='markers+text',
    marker=dict(size=5, color='red'),
    text=used_meg_names,
    name='MEG (Corresponding)'
))

# Add lines between EEG electrodes and their closest MEG electrodes
for i, eeg_electrode in enumerate(eeg_pos):
    closest_meg_idx = closest_meg_indices_bottom25[i]
    closest_meg_electrode = meg_pos_full[closest_meg_idx]
    min_distance = np.linalg.norm(closest_meg_electrode - eeg_electrode)
    
    x_line = [eeg_electrode[0], closest_meg_electrode[0]]
    y_line = [eeg_electrode[1], closest_meg_electrode[1]]
    z_line = [eeg_electrode[2], closest_meg_electrode[2]]
    
    # Add line trace
    fig.add_trace(go.Scatter3d(
        x=x_line,
        y=y_line,
        z=z_line,
        mode='lines',
        line=dict(color='green', width=2),
        opacity=0.8,
        showlegend=False,
        hoverinfo='text',
        text=f'Distance: {min_distance:.2f} mm'
    ))

# Update plot layout
fig.update_layout(
    title='EEG and MEG Sensor Positions (Bottom 25% Distances)',
    scene=dict(
        xaxis_title='X (mm)',
        yaxis_title='Y (mm)',
        zaxis_title='Z (mm)',
        aspectmode='data'  # Ensures equal scaling
    ),
    width=800,
    height=800
)

# Save the plot as an HTML file
fig.write_html("sensor_positions_bottom25.html")
print("Plot saved as 'sensor_positions_bottom25.html'. You can open it in your web browser.")

# Print distance statistics for bottom 25%
bottom25_distances = closest_distances[bottom25_indices]
print(f"\nBottom 25% Closest Distance Statistics (in mm):")
print(f"Minimum distance: {bottom25_distances.min():.2f}")
print(f"Maximum distance: {bottom25_distances.max():.2f}")
print(f"Average distance: {bottom25_distances.mean():.2f}")
print(f"Standard deviation: {bottom25_distances.std():.2f}")
print(f"Number of EEG electrodes in Bottom 25%: {len(bottom25_indices)}")
print(f"Number of MEG electrodes used: {len(used_meg_indices)}")

#### Plotting EEG spectogramm

In [None]:
# Plot EEG data
eeg_indices = mne.pick_types(raw.info, meg=False, eeg=True)
raw.plot(picks=eeg_indices, duration=5, n_channels=30, title='EEG Data')
plt.show()

#### Plotting MEG spectogramm

In [None]:
# Pick MEG channels
meg_indices = mne.pick_types(raw.info, meg=True, eeg=False)

# Plot MEG data
raw.plot(picks=meg_indices, duration=5, n_channels=30, title='MEG Data')
plt.show()


In [None]:

# Channel Indices
eeg_indices = mne.pick_types(raw.info, meg=False, eeg=True)
meg_indices = mne.pick_types(raw.info, meg=True, eeg=False)
print(f'Number of EEG channels: {len(eeg_indices)}')
print(f'Number of MEG channels: {len(meg_indices)}')

# EEG and MEG Data
eeg_data = data[eeg_indices, :]
meg_data = data[meg_indices, :]
print(f'EEG data shape: {eeg_data.shape}')
print(f'MEG data shape: {meg_data.shape}')

# Units Conversion
eeg_data_uv = eeg_data * 1e6  # Convert EEG data to µV
meg_data_ft = meg_data * 1e15  # Convert MEG data to fT (magnetometers)

# Power Spectral Density for EEG
fig, ax = plt.subplots(figsize=(10, 6))
raw.plot_psd(picks=eeg_indices, fmax=100, average=True, spatial_colors=False, ax=ax)
ax.set_title('EEG PSD')
plt.show()

# Power Spectral Density for MEG
# Separate plots for magnetometers and gradiometers
mag_indices = mne.pick_types(raw.info, meg='mag')
grad_indices = mne.pick_types(raw.info, meg='grad')

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))
raw.plot_psd(picks=mag_indices, fmax=100, average=True, spatial_colors=False, ax=ax1)
ax1.set_title('MEG Magnetometers PSD')

raw.plot_psd(picks=grad_indices, fmax=100, average=True, spatial_colors=False, ax=ax2)
ax2.set_title('MEG Gradiometers PSD')
plt.tight_layout()
plt.show()


# Events and Annotations
events = mne.find_events(raw, stim_channel='STI101')
print(f'Found {len(events)} events.')
print(events)
print('Annotations:')
print(raw.annotations)

# Measurement Info
print('Measurement Info:')
pprint(raw.info)

# Inter-subject data exploration

In [22]:
import os

def checkDataIntegrity(datasetPath):
    ### iterate through mode folders (train, test, val)
    for modeFolder in os.listdir(datasetPath):
        if ".zip" in modeFolder or modeFolder[0] == ".":
            continue
        assert os.path.isdir(os.path.join(datasetPath,modeFolder)), f"Dataset folder contains unexpected file: {modeFolder}"
        assert modeFolder == "train" or modeFolder == "val" or modeFolder == "test", f"Dataset folder contains unexpected folder: {modeFolder} (expected train, test and val)"
        ### iterate through subject folders in dataset folder
        for subjectFolder in os.listdir(os.path.join(datasetPath, modeFolder)):
            assert os.path.isdir(os.path.join(datasetPath,modeFolder,subjectFolder)), f"Dataset folder contains unexpected file: {modeFolder}/{subjectFolder}"
            assert len(os.listdir(os.path.join(datasetPath,modeFolder,subjectFolder)))>0, f"Subject folder {modeFolder}/{subjectFolder} unexpectedly empty"
            ### iterate through run files in subject folders
            for file in os.listdir(os.path.join(datasetPath,modeFolder,subjectFolder)):
                assert ".txt" in file or ".fif" in file, f"Unexpected file {file} in folder {modeFolder}/{subjectFolder}"


datasetPath = "../data/openfmri/"
checkDataIntegrity(datasetPath)

In [None]:
modes = ['train','test','val']
subject_dict = {mode: {} for mode in modes}

for mode in modes:
    for subjectFolder in os.listdir(os.path.join(datasetPath, mode)):
        subject_dict[mode][subjectFolder] = []
        for file in os.listdir(os.path.join(datasetPath, mode, subjectFolder)):
            if ".fif" in file:
                subject_dict[mode][subjectFolder].append(file)

train_subject_dict = subject_dict['train']
val_subject_dict = subject_dict['val']
test_subject_dict = subject_dict['test']

print(subject_dict)

In [None]:
import mne
import matplotlib.pyplot as plt

# Initialize dictionaries to store the number of frames and sampling rates per participant and per run
frames_count = {mode: {} for mode in modes}
sampling_rates = {mode: {} for mode in modes}

# Iterate over each mode and participant to calculate the number of frames and track sampling rates
for mode, participants in subject_dict.items():
    for participant, runs in participants.items():
        frames_count[mode][participant] = []
        sampling_rates[mode][participant] = []
        for run in runs:
            fif_file = os.path.join(datasetPath, mode, participant, run)
            raw = mne.io.read_raw_fif(fif_file, preload=True)
            frames_count[mode][participant].append(raw.n_times)
            sampling_rates[mode][participant].append(raw.info['sfreq'])


In [None]:
# Flatten the sampling_rates dictionary
flat_sampling_rates = [rate for mode in sampling_rates.values() for participant in mode.values() for rate in participant]

# Assert that all sampling rates are the same
assert all(rate == flat_sampling_rates[0] for rate in flat_sampling_rates), "Not all sampling rates are the same"
print(f"All sampling rates are consistent at {flat_sampling_rates[0]:.0f}Hz.")

In [None]:

# Plot the number of frames per participant
for mode, participants in frames_count.items():
    plt.figure(figsize=(10, 5))
    for participant, frames in participants.items():
        plt.plot(frames, label=participant)
    plt.title(f'Number of Frames per Run for {mode.capitalize()} Mode')
    plt.xlabel('Run Index')
    plt.ylabel('Number of Frames')
    plt.legend(loc='upper right')
    plt.show()
