### Configuration

In [1]:
import os
import numpy as np
import pandas as pd
import xarray as xr

import mne
import yasa
from scipy.stats import zscore
import plotnine as pn

from utils__helpers_macro import robust_zscore, welch_psd
import utils__config

In [2]:
os.chdir(utils__config.working_directory)
os.getcwd()

'Z:\\Layton\\Sleep_083023'

### Parameters

In [3]:
# fif_path = 'Data/S01_Feb02_micro_1024hz.fif'
# potato_path = 'Data/S01_Feb02_potatogram.csv'
# dict_path = 'Data/S01_dictionary.xlsx'
# psd_out_path = 'Results/S01_Feb02_micro_psd.svg'

# fif_path = 'Data/S05_Jul11_micro_1024hz.fif'
# potato_path = 'Data/S05_Jul11_potatogram.csv'
# dict_path = 'Data/S05_dictionary.xlsx'
# psd_out_path = 'Results/S05_Jul11_micro_psd.svg'

# fif_path = 'Data/S05_Jul12_micro_1024hz.fif'
# potato_path = 'Data/S05_Jul12_potatogram.csv'
# dict_path = 'Data/S05_dictionary.xlsx'
# psd_out_path = 'Results/S05_Jul12_micro_psd.svg'

fif_path = 'Data/S05_Jul13_micro_1024hz.fif'
potato_path = 'Data/S05_Jul13_potatogram.csv'
dict_path = 'Data/S05_dictionary.xlsx'
psd_out_path = 'Results/S05_Jul13_micro_psd.svg'

Please note that the number of samples must be whole-number divisible by (sampling_freq * epoch_length)

In [4]:
selected_regions = ['CLA', 'ACC', 'AMY']
sampling_freq = 1024 # Hz
epoch_length = 30 # seconds

### Epoch Data

In [5]:
# Load the micro data
raw = mne.io.read_raw_fif(fif_path, preload = True, verbose = False)

# Load the upsampled hypnogram
hypnogram = np.loadtxt(potato_path, delimiter = ',')

# Add hypnogram as a channel to the Raw object

# Hypnogram dictionary: 
# (-2) = Unassigned
# (-1) = Artifact
# (0) = Awake
# (1) = N1
# (2) = N2
# (3) = N3
# (4) = REM 

# Re-value sleep stages for incorporation into Epochs object
hypnogram[(hypnogram != 1) & (hypnogram != 2) & (hypnogram != 3)] = 0
hypnogram[(hypnogram == 1) | (hypnogram == 2) | (hypnogram == 3)] = 1

# Upsample to 1024 Hz from 256 Hz for compatibility
hypnogram = yasa.hypno_upsample_to_data(hypno = hypnogram,
                                        sf_hypno = 256,
                                        data = raw)

# Create raw object from the hypnogram
hypnogram = hypnogram[np.newaxis, :]

hypno_info = mne.create_info(ch_names = ['hypno'], 
                             sfreq = raw.info['sfreq'] / 4, 
                             ch_types = ['misc'])

hypno = mne.io.RawArray(data = hypnogram,
                        info = hypno_info,
                        first_samp = raw.first_samp)

raw.add_channels([hypno], force_update_info = True)



Creating RawArray with float64 data, n_channels=1, n_times=35686400
    Range : 0 ... 35686399 =      0.000 ... 139399.996 secs
Ready.


0,1
Measurement date,"July 12, 2023 01:35:18 GMT"
Experimenter,Unknown
Digitized points,0 points
Good channels,"64 sEEG, 1 misc"
Bad channels,
EOG channels,Not available
ECG channels,Not available
Sampling frequency,1024.00 Hz
Highpass,0.30 Hz
Lowpass,256.00 Hz


Create dummy stim data and an empty stim channel, then fill the channel with the data

In [6]:
# Record the first sample (which is not 0 since the Raw
# file was cropped from the original); you will need this
# to appropriately select the epoch sample number
start = raw.first_samp
step = sampling_freq * epoch_length
stop = raw.last_samp - step

epoch_stim = np.arange(start, stop, step)

# MNE Epochs expects a three column array where the second column
# is a dummy spacer with 0's and the third is an integer indicating
# the ID for the event. So we need to append these to our stim array.
dummy_row = np.zeros(len(epoch_stim))
event_row = np.ones(len(epoch_stim))

epoch_stim = np.vstack((epoch_stim, dummy_row, event_row)).transpose()

# Create a dummy numpy event array and MNE info object
# and use them to create an empty dummy Raw channel
events_info = mne.create_info(ch_names = ['epoch_stim'], 
                              sfreq = raw.info['sfreq'], 
                              ch_types = ['stim'])

empty_events = np.zeros((1, len(raw.times)))

events_channel = mne.io.RawArray(empty_events, events_info)

# Create an event dictionary
event_dictionary = {'epoch_start' : 1}

Creating RawArray with float64 data, n_channels=1, n_times=35686400
    Range : 0 ... 35686399 =      0.000 ... 34849.999 secs
Ready.


Epoch data using dummy stim data in the new channel

In [7]:
# Update the empty stim channel with the formatted epoch start times
raw.add_channels([events_channel], force_update_info = True)
raw.add_events(epoch_stim, 'epoch_stim')

# Find events and create epochs
events = mne.find_events(raw, 
                         stim_channel = 'epoch_stim', 
                         shortest_event = sampling_freq * epoch_length,
                         initial_event = True)

epochs = mne.Epochs(raw, 
                    preload = True, 
                    events = events, 
                    event_id = event_dictionary, 
                    baseline = None,
                    verbose = True,
                    tmin = 0, 
                    tmax = epoch_length)

# Drop the event channel before exporting data
epochs = epochs.drop_channels(['epoch_stim'])

1161 events found
Event IDs: [1]
Not setting metadata
1161 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 1161 events and 30721 original time points ...
0 bad epochs dropped


Select epochs with more than 50% N2/3 sleep and save their sample number start times

In [8]:
# Get epoched hypnogram and get SWS% per epoch
hypochs = epochs.get_data(picks = ['hypno']).squeeze()
hypochs = hypochs.mean(axis = 1).transpose()
hypochs = pd.DataFrame(hypochs, columns = ['hypno_score'])

# Keep epochs with more than 50% of SWS
nopochs = pd.Series(hypochs[hypochs['hypno_score'] <= 0.50].index)
hypochs = pd.Series(hypochs[hypochs['hypno_score'] > 0.50].index)

# Remove hypno channel and get data
epochs = epochs.drop_channels(['hypno'])

# N2/3 sleep
nosleep = epochs[nopochs.tolist()]
nosleep = nosleep.get_data(units = 'uV').transpose(1, 0, 2).reshape(len(epochs.ch_names), -1)

# Non-N2/3 sleep
sleep = epochs[hypochs.tolist()]
sleep = sleep.get_data(units = 'uV').transpose(1, 0, 2).reshape(len(epochs.ch_names), -1)

### Welch PSD

In [9]:
# Welch PSD
tfr_sleep = welch_psd(data = sleep, chan_names = epochs.ch_names, sampling_freq = sampling_freq, freq_min = 0.3, freq_max = 200, n_jobs = 4)
tfr_nosleep = welch_psd(data = nosleep, chan_names = epochs.ch_names, sampling_freq = sampling_freq, freq_min = 0.3, freq_max = 200, n_jobs = 4)

# Average PSD across channels
#tfr_sleep = tfr_sleep.groupby('frequency').mean('log_power').reset_index()
#tfr_nosleep = tfr_nosleep.groupby('frequency').mean('log_power').reset_index()

# Add sleep stage column
tfr_sleep['stage'] = 'NREM'
tfr_nosleep['stage'] = 'W/REM'

# Merge sleep/nosleep tfr's
tfr = pd.concat([tfr_sleep, tfr_nosleep])

Effective window size : 0.250 (s)


[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   2 out of   4 | elapsed:   11.2s remaining:   11.2s


Effective window size : 0.250 (s)


[Parallel(n_jobs=4)]: Done   4 out of   4 | elapsed:   14.1s finished
[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   2 out of   4 | elapsed:   11.6s remaining:   11.6s
[Parallel(n_jobs=4)]: Done   4 out of   4 | elapsed:   14.7s finished


In [10]:
ch_dict = pd.read_excel(dict_path)

# Extract the channel numbers and create a new column 'number'
tfr['number'] = tfr['channel'].str.extract('(\d+)').astype(int)

# Merge with channel dictionary
tfr = pd.merge(tfr, ch_dict, on='number', how='left')

# Select only relevant micro regions
tfr = tfr[tfr['region'].isin(selected_regions)]

In [11]:
# Define a color mapping for regions
# region_colors = {
#     'CLA': '#E28DB8',
#     'ACC': '#A67A77',
#     'AMY': '#7BA387'
# }

# # Map the region colors to the channels
# tfr['facet_color'] = tfr['region'].map(region_colors)

In [12]:
# Static plot with Plotnine
p = (pn.ggplot(tfr)
 + pn.aes(x='frequency', y='log_power', color='stage')
 + pn.scale_fill_identity()  # Use actual color values in 'fill' column
 + pn.geom_line()
 + pn.facet_wrap('~ channel + region')
 + pn.scale_x_continuous(expand=(0, 0), limits=(1, 200))
 + pn.labs(x='Frequency (Hz)', y='Log Power (dB)')
 + pn.theme_classic()
 + pn.theme(figure_size=(20, 15), panel_border=pn.element_rect(color='black', size=1))
)

# Save the plot
p.save(filename=psd_out_path, dpi=300)

