In [None]:
%matplotlib inline

Import the packages we will need for the solution.

In [None]:
import os
import numpy as np
import mne
import matplotlib.pyplot as plt

Load the sample

In [None]:
sample_data_folder = mne.datasets.sample.data_path()
sample_data_raw_file = os.path.join(sample_data_folder, 'MEG', 'sample',
                                    'sample_audvis_filt-0-40_raw.fif')
raw = mne.io.read_raw_fif(sample_data_raw_file)

## Preprocessing

Note that the data we are loading is filtered from 0.1 to 40 Hz so we do not need to filter it (for now).


In [None]:
# set up and fit the ICA
ica = mne.preprocessing.ICA(n_components=20, random_state=97, max_iter=800)
ica.fit(raw)
ica.exclude = [1, 2]  # details on how we picked these are omitted here
ica.plot_properties(raw, picks=ica.exclude);

In [None]:
orig_raw = raw.copy()
raw.load_data()
ica.apply(raw)

# show some frontal channels to clearly illustrate the artifact removal
chs = ['MEG 0111', 'MEG 0121', 'MEG 0131', 'MEG 0211', 'MEG 0221', 'MEG 0231',
       'MEG 0311', 'MEG 0321', 'MEG 0331', 'MEG 1511', 'MEG 1521', 'MEG 1531',
       'EEG 001', 'EEG 002', 'EEG 003', 'EEG 004', 'EEG 005', 'EEG 006',
       'EEG 007', 'EEG 008']
chan_idxs = [raw.ch_names.index(ch) for ch in chs]


## Detecting experimental events

The sample dataset includes several :term:`"STIM" channels <stim channel>`
that recorded electrical
signals sent from the stimulus delivery computer (as brief DC shifts /
squarewave pulses). These pulses (often called "triggers") are used in this
dataset to mark experimental events: stimulus onset, stimulus type, and
participant response (button press). The individual STIM channels are
combined onto a single channel, in such a way that voltage
levels on that channel can be unambiguously decoded as a particular event
type. On older Neuromag systems (such as that used to record the sample data)
this summation channel was called ``STI 014``, so we can pass that channel
name to the :func:`mne.find_events` function to recover the timing and
identity of the stimulus events.



In [None]:
events = mne.find_events(raw, stim_channel='STI 014')
print(events[:5])  # show the first 5

In [None]:
event_dict = {'auditory/left': 1, 'auditory/right': 2, 'visual/left': 3,
              'visual/right': 4, 'smiley': 5, 'buttonpress': 32}

For paradigms that are not event-related (e.g., analysis of resting-state
data), you can extract regularly spaced (possibly overlapping) spans of data
by creating events using :func:`mne.make_fixed_length_events` and then
proceeding with epoching as described in the next section.



## Epoching continuous data

The :class:`~mne.io.Raw` object and the events array are the bare minimum
needed to create an :class:`~mne.Epochs` object, which we create with the
:class:`~mne.Epochs` class constructor. Here we'll also specify some data
quality constraints: we'll reject any epoch where peak-to-peak signal
amplitude is beyond reasonable limits for that channel type. This is done
with a *rejection dictionary*; you may include or omit thresholds for any of
the channel types present in your data. The values given here are reasonable
for this particular dataset, but may need to be adapted for different
hardware or recording conditions. For a more automated approach, consider
using the `autoreject package`_.



In [None]:
reject_criteria = dict(mag=4000e-15,     # 4000 fT
                       grad=4000e-13,    # 4000 fT/cm
                       eeg=150e-6,       # 150 µV
                       eog=250e-6)       # 250 µV

We'll also pass the event dictionary as the ``event_id`` parameter (so we can
work with easy-to-pool event labels instead of the integer event IDs), and
specify ``tmin`` and ``tmax`` (the time relative to each event at which to
start and end each epoch). As mentioned above, by default
:class:`~mne.io.Raw` and :class:`~mne.Epochs` data aren't loaded into memory
(they're accessed from disk only when needed), but here we'll force loading
into memory using the ``preload=True`` parameter so that we can see the
results of the rejection criteria being applied:



In [None]:
epochs = mne.Epochs(raw, events, event_id=event_dict, tmin=-0.2, tmax=0.5,
                    reject=reject_criteria, preload=True)

Since we are only interested in the EEG channels, we will select them. 
__Note__ the command works "in-place", i.e. it changes the object in memory.

In [None]:
epochs.pick_types(meg=False, eeg=True)

Next we'll pool across left/right stimulus presentations so we can compare
auditory versus visual responses. To avoid biasing our signals to the
left or right, we'll use :meth:`~mne.Epochs.equalize_event_counts` first to
randomly sample epochs from each condition to match the number of epochs
present in the condition with the fewest good epochs.



In [None]:
conds_we_care_about = ['auditory/left', 'auditory/right',
                       'visual/left', 'visual/right']
epochs.equalize_event_counts(conds_we_care_about)  # this operates in-place
aud_epochs = epochs['auditory']
vis_epochs = epochs['visual']

Like :class:`~mne.io.Raw` objects, :class:`~mne.Epochs` objects also have a
number of built-in plotting methods. One is :meth:`~mne.Epochs.plot_image`,
which shows each epoch as one row of an image map, with color representing
signal magnitude; the average evoked response and the sensor location are
shown below the image:



# Evoked plots

In [None]:
aud_left_evoked = aud_epochs['left'].average()
aud_right_evoked = aud_epochs['right'].average()

In [None]:
aud_left_evoked.plot_joint();
aud_right_evoked.plot_joint();

Now we make a "difference" object be combining the _left_ and _right_ stimuli 

In [None]:
diff_evoked = mne.combine_evoked([aud_left_evoked, -aud_right_evoked], weights='equal')

In [None]:
diff_evoked.plot_joint()

## Is the peak changed by a lower low pass setting, e.g. 30 Hz?

To test this we first need to find the peak for both and then compare it.
We can use the "get_peak" method for that. Remember that help and default values can be found with "?", e.g.: 

```aud_left_evoked.get_peak?```


In [None]:
aud_left_evoked.get_peak(return_amplitude=True)

In [None]:
aud_right_evoked.get_peak(return_amplitude=True)

Now to compare to the different low pass we can filter the epochs. 

__NOTE__ This can potentially lead to massive artifacts and should __NOT__ be done. But since it is just a quick excerise we will still do it.

In [None]:
epochs_30 = epochs.copy()  # make a copy to not overwrite the origianls 

In [None]:
epochs_30.filter(0, 30)

And just to show it, we can chain a lot of the commands together. It is better to be explicit with your own code.

In [None]:
epochs_30['auditory/left'].average().get_peak(return_amplitude=True)

In [None]:
epochs_30['auditory/right'].average().get_peak(return_amplitude=True)

The are excatly identical to the orignals, so it does not appear that the low pass changes the peak time. However the amplitute is not the same, __why__?

# Global field power

In [None]:
aud_left_data = aud_epochs['left'].get_data()
aud_right_data = aud_epochs['right'].get_data()

We have defined GFP as the standard deviation of the evoked signal over time, so if we take the ```np.std``` of the evoked data we get the GFP.

In [None]:
left_gfp = aud_left_evoked._data.std(axis=0)
right_gfp = aud_right_evoked._data.std(axis=0)

We will need to the times of the samples to make a useful plot, we can get those from any of the object with ```.times```, e.g. ```aud_left_evoked.times```

In [None]:
times = aud_left_evoked.times

Now to plot it

In [None]:
plt.plot(times, left_gfp, 'r', label='Left');  # the 'r' makes the line red
plt.plot(times, right_gfp, 'b', label='right');  # label is what goes in the legend
plt.grid();
plt.legend();

# Permutation test

In [None]:
from mne.stats import spatio_temporal_cluster_test
from mne.channels import find_ch_adjacency
from mpl_toolkits.axes_grid1 import make_axes_locatable

The permutation test functions in MNE-python does not work on epochs or evoked directly so we have to extract the data we want to use first. We will call the data tensor for __X__

In [None]:
X = [aud_epochs[condition].get_data() for condition in ['left', 'right']]  # as 3D matrix
X = [np.transpose(x, (0, 2, 1)) for x in X]  # transpose for clustering

In order to let the algorithm know which sensors are neighbors we first generate an adjacency matrix.

In [None]:
adjacency, ch_names = find_ch_adjacency(epochs.info, ch_type='eeg')

print(type(adjacency))  # it's a sparse matrix!

plt.imshow(adjacency.toarray(), cmap='binary', origin='lower',
           interpolation='nearest')
plt.xlabel('{} EEG channels'.format(len(ch_names)))
plt.ylabel('{} EEG channels '.format(len(ch_names)))
plt.grid();
plt.title('Between-sensor adjacency')

In [None]:
# set cluster threshold
# set family-wise p-value
p_accept = 0.01

cluster_stats = spatio_temporal_cluster_test(X,
                                             n_permutations=5000,
                                             threshold=None,
                                             tail=0,
                                             n_jobs=1,
                                             buffer_size=None,
                                             adjacency=adjacency)

In [None]:
T_obs, clusters, p_values, h_zero = cluster_stats
good_cluster_inds = np.where(p_values < p_accept)[0]

Here we are going to visualize the results. There is a lot going on in the code so do not worry about it.

In [None]:
# configure variables for visualization
colors = {"left": "crimson", "right": 'steelblue'}

# organize data for plotting
evokeds = {cond: epochs[cond].average() for cond in ['left', 'right']}

# loop over clusters
for i_clu, clu_idx in enumerate(good_cluster_inds):
    # unpack cluster information, get unique indices
    time_inds, space_inds = np.squeeze(clusters[clu_idx])
    ch_inds = np.unique(space_inds)
    time_inds = np.unique(time_inds)

    # get topography for F stat
    f_map = T_obs[time_inds, ...].mean(axis=0)

    # get signals at the sensors contributing to the cluster
    sig_times = epochs.times[time_inds]

    # create spatial mask
    mask = np.zeros((f_map.shape[0], 1), dtype=bool)
    mask[ch_inds, :] = True

    # initialize figure
    fig, ax_topo = plt.subplots(1, 1, figsize=(10, 3))

    # plot average test statistic and mark significant sensors
    f_evoked = mne.EvokedArray(f_map[:, np.newaxis], epochs.info, tmin=0)
    f_evoked.plot_topomap(times=0, mask=mask, axes=ax_topo, cmap='Reds',
                          vmin=np.min, vmax=np.max, show=False,
                          colorbar=False, mask_params=dict(markersize=10))
    image = ax_topo.images[0]

    # create additional axes (for ERF and colorbar)
    divider = make_axes_locatable(ax_topo)

    # add axes for colorbar
    ax_colorbar = divider.append_axes('right', size='5%', pad=0.05)
    plt.colorbar(image, cax=ax_colorbar)
    ax_topo.set_xlabel(
        'Averaged F-map ({:0.3f} - {:0.3f} s)'.format(*sig_times[[0, -1]]))

    # add new axis for time courses and plot time courses
    ax_signals = divider.append_axes('right', size='300%', pad=1.2)
    title = 'Cluster #{0}, {1} sensor'.format(i_clu + 1, len(ch_inds))
    if len(ch_inds) > 1:
        title += "s (mean)"
    mne.viz.plot_compare_evokeds(evokeds, title=title, picks=ch_inds, axes=ax_signals,
                         colors=colors, show=False,
                         split_legend=True, truncate_yaxis='auto')

    # plot temporal cluster extent
    ymin, ymax = ax_signals.get_ylim()
    ax_signals.fill_betweenx((ymin, ymax), sig_times[0], sig_times[-1],
                             color='orange', alpha=0.3)

    # clean up viz
    mne.viz.tight_layout(fig=fig)
    fig.subplots_adjust(bottom=.05)
    plt.show()