# python code for results from Forster et al., 2024 (unpublished)

running title: "Prestimulus beta power encodes somatosensory stimulus expectations"

notebook loads functions for behavioral and EEG analysis and reproduces figure 1,2,3 and 4

figure 5 and 6 are based on Rscripts, which are used for regression and mediation analysis

results are published in Forster et al. (*in prep*)
___

    Author:  Carina Forster et al.
    Contact: forster@cbs.mpg.de
    Years:   2023

___

Make sure you are in the right environment: `expecon_3.9`

## Setup 

### Imports

In [None]:
# turn off warnings for a cleaner output
import warnings

warnings.filterwarnings("ignore")

In [None]:
%matplotlib inline

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# import packages
import random
from pathlib import Path

import matplotlib.pyplot as plt
import mne
import numpy as np
import pandas as pd
from scipy.stats import wilcoxon

# import functions for behavioral analysis
from expecon_ms.behav import figure1 as behav

# expecon_ms functions
from expecon_ms.configs import config, params, paths
from expecon_ms.eeg.preprocessing import ica

# Import functions from expecon_package for preproccesing eeg data
from expecon_ms.eeg.preprocessing import prepro as pp

# import functions for EEG analysis and visualization
from expecon_ms.eeg.sensor import evokeds as evo
from expecon_ms.eeg.sensor import tfr_contrasts as tfr

# import functions for source analysis
from expecon_ms.eeg.source import source_reco
from expecon_ms.eeg.source import plot_source_behav_correlation

### Set vars, paths, & constants

In [None]:
# Define the output for mne functions
mne.set_log_level("CRITICAL")

## Analyse 

### 1. Behavioral data analysis (Signal detection theory based)

In [None]:
# check the function arguments the docs
help(behav.plot_figure1_grid)

In [None]:
behav.plot_figure1_grid(expecon=1, exclude_high_fa=True)

### 2. Preprocessing EEG data

In [None]:
# function expects a raw object with .fif file ending
pp.prepro(
    study=2,
    trigger="stimulus",
    l_freq=1,
    h_freq=40,
    tmin=-1,
    tmax=1,
    resample_rate=250,
    sf=2500,
    detrend=1,
    ransac=1,
    autoreject=0,
)

# how many channels were interpolated?
pp.n_channels_interpolated(study=2, trigger="stimulus", l_freq=0.1)

# run ica on clean, epoched data
ica.run_ica(study=2, infomax=1, save_psd=1)
# correlate with EOG and ECG and mark bad componets for rejection

ica.label_ica_correlation(study=2)

# usa icalabel to mark components for rejection
# ica.label_iclabel(study=1)

#### ICA stats

In [None]:
# which study to run the analysis on
study = 2

In [None]:
# load the csv file that contains the number of components rejected
df_comp = pd.read_csv(
    Path("E:/expecon_ms/data/eeg/prepro_ica/clean_epochs_corr{study!s}/ica_components_stats_icacorr.csv")
)

# mean components rejected
print(f' on average {df_comp["0"].mean()} components were rejected')
print(f' the sdt of components rejected is {df_comp["0"].std()}')
print(f' the maximum of components rejected is {df_comp["0"].max()}')
print(f' the minimum of components rejected is {df_comp["0"].min()}')

### 3. Evoked potentials

In [None]:
# compare evokeds and plot contrasts
evokeds = evo.create_contrast(
    study=2, drop_bads=True, laplace=False, subtract_evoked=False, save_data_to_disk=False, save_drop_log=False
)

In [None]:
# plot evoked contrast and topography for the contrast
evo.plot_roi(study=2, data=evokeds, tmin=-0.1, tmax=0.3, tmin_base=-0.1, tmax_base=0)

### 4. Time-frequency analysis

In [None]:
# compute tfr representations for each condition
tfr.compute_tfr(
    study=2,
    cond="prev_resp",
    tmin=-0.4,
    tmax=0,
    fmax=35,
    fmin=3,
    laplace=False,
    induced=False,
    mirror=True,
    drop_bads=True,
)

#### stimulus probability contrast

In [None]:
# load the tfr data for each condition for probability conds.
tfr_a_cond, tfr_b_cond = tfr.load_tfr_conds(
    studies=[1, 2],
    cond="probability",
    cond_a_name="high_-0.7_0",
    cond_b_name="low_-0.7_0",
    cond_a_names=["high_prevhit_-0.7_0", "high_prevmiss_-0.7_0", "high_prevcr_-0.7_0"],
    cond_b_names=["low_prevhit_-0.7_0", "low_prevmiss_-0.7_0", "low_prevcr_-0.7_0"],
)

#### Qualitative checks for TFR (no stats yet)

In [None]:
# plot grand average per condition (no differences, Gabriel Curios comments, numbtouch symposium)

# study 1
high_study1 = np.array(tfr_a_cond[0])
low_study1 = np.array(tfr_b_cond[0])

# study 2
high_study2 = np.array(tfr_a_cond[1])
low_study2 = np.array(tfr_b_cond[1])

# study 1: prevhits
prevhit_highstudy1 = high_study1[:, 0]
prevhit_lowstudy1 = low_study1[:, 0]

# grand average over participants
# study 1
prevhit_highstudy1gra = mne.grand_average(list(prevhit_highstudy1))
prevhit_lowstudy1gra = mne.grand_average(list(prevhit_lowstudy1))

high_study2gra = mne.grand_average(list(high_study2))
low_study2gra = mne.grand_average(list(low_study2))

# plot grand average
# study 1
diff = mne.combine_evoked([prevhit_highstudy1gra, prevhit_lowstudy1gra], weights=[1, -1])
diff.copy().crop(-0.4, 0).apply_baseline((-0.4, 0), mode="zscore").plot(picks=["CP4"])

diff = mne.combine_evoked([high_study2gra, low_study2gra], weights=[1, -1])
diff.copy().crop(-0.4, 0).apply_baseline((-0.4, 0), mode="zscore").plot(picks=["CP4"])

In [None]:
# get the indices of frequencies higher than 7 and smaller than 13
freqs = tfr_a_cond[1][0].freqs
freqs = np.array(freqs)
# extract the indices of alpha frequencies (7-13 Hz)
alpha = np.where((freqs > 7) & (freqs < 13))
# extract the indices of beta frequencies (13-30 Hz)
beta = np.where((freqs > 14) & (freqs < 26))
# find index of channel CP4
idx = tfr_a_cond[1][0].ch_names.index("CP4")

# extract the data for alpha and beta frequencies and channel CP4 for each participant
# study 1
prevhit_highstudy1_alpha = np.array([h_.crop(-0.4, 0).data[idx, alpha] for h_ in np.array(tfr_a_cond[0])[:, 1]])
# now average over alpha frequencies
prevhit_highstudy1_alpha = np.mean(np.squeeze(prevhit_highstudy1_alpha), axis=1)

prevhit_lowstudy1_alpha = np.array([l_.crop(-0.4, 0).data[idx, alpha] for l_ in np.array(tfr_b_cond[0])[:, 1]])
# now average over alpha frequencies
prevhit_lowstudy1_alpha = np.mean(np.squeeze(prevhit_lowstudy1_alpha), axis=1)

# study 2
high_study2_alpha = np.array([h_.crop(-0.4, 0).data[idx, alpha] for h_ in np.array(tfr_a_cond[1])])
# now average over alpha frequencies
high_study2_alpha = np.mean(np.squeeze(high_study2_alpha), axis=1)

low_study2_alpha = np.array([l_.crop(-0.4, 0).data[idx, alpha] for l_ in np.array(tfr_b_cond[1])])
# now average over alpha frequencies
low_study2_alpha = np.mean(np.squeeze(low_study2_alpha), axis=1)

In [None]:
# now extract beta power and plot
# study 1
prevhit_highstudy1_beta = np.array([h_.crop(-0.4, 0).data[idx, beta] for h_ in np.array(tfr_a_cond[0])[:, 0]])
prevhit_highstudy1_beta = np.mean(np.squeeze(prevhit_highstudy1_beta), axis=1)

prevhit_lowstudy1_beta = np.array([l_.crop(-0.4, 0).data[idx, beta] for l_ in np.array(tfr_b_cond[0])[:, 0]])

prevhit_lowstudy1_beta = np.mean(np.squeeze(prevhit_lowstudy1_beta), axis=1)

# study 2
high_study2_beta = np.array([h_.crop(-0.4, 0).data[idx, beta] for h_ in np.array(tfr_a_cond[1])])
high_study2_beta = np.mean(np.squeeze(high_study2_beta), axis=1)

low_study2_beta = np.array([l_.crop(-0.4, 0).data[idx, beta] for l_ in np.array(tfr_b_cond[1])])
low_study2_beta = np.mean(np.squeeze(low_study2_beta), axis=1)

In [None]:
# now zscore the data over participants
# study 1
prevhit_highstudy1_alpha = (prevhit_highstudy1_alpha - np.mean(prevhit_highstudy1_alpha)) / np.std(
    prevhit_highstudy1_alpha
)
prevhit_lowstudy1_alpha = (prevhit_lowstudy1_alpha - np.mean(prevhit_lowstudy1_alpha)) / np.std(
    prevhit_lowstudy1_alpha
)

prevhit_highstudy1_beta = (prevhit_highstudy1_beta - np.mean(prevhit_highstudy1_beta)) / np.std(
    prevhit_highstudy1_beta
)
prevhit_lowstudy1_beta = (prevhit_lowstudy1_beta - np.mean(prevhit_lowstudy1_beta)) / np.std(prevhit_lowstudy1_beta)

# study 2
high_study2_alpha = (high_study2_alpha - np.mean(high_study2_alpha)) / np.std(high_study2_alpha)
low_study2_alpha = (low_study2_alpha - np.mean(low_study2_alpha)) / np.std(low_study2_alpha)

high_study2_beta = (high_study2_beta - np.mean(high_study2_beta)) / np.std(high_study2_beta)
low_study2_beta = (low_study2_beta - np.mean(low_study2_beta)) / np.std(low_study2_beta)

In [None]:
# Assuming you have the following variables from your previous code
time = tfr_a_cond[1][0].crop(-0.4, 0).times
uncorrected_alpha_level = params.alpha  # 0.05

# Number of comparisons (number of time points)
num_comparisons = len(time)

# Bonferroni-corrected significance level
alpha_level = uncorrected_alpha_level / num_comparisons

# Create a figure with two rows and two columns
fig, axes = plt.subplots(2, 2, figsize=(10, 8))

# Study 1 - Alpha power
data_high_alpha = np.mean(prevhit_highstudy1_alpha, axis=0)
data_low_alpha = np.mean(prevhit_lowstudy1_alpha, axis=0)
axes[0, 0].plot(time, data_high_alpha, label="high_probability", color="#ca0020ff")
axes[0, 0].plot(time, data_low_alpha, label="low_probability", color="#0571b0ff")
axes[0, 0].legend()
axes[0, 0].set_xlabel("time (ms)")
axes[0, 0].set_ylabel("alpha power (z-scored)")
axes[0, 0].set_title("Study 1")
axes[0, 0].axvline(x=0, color="red", linestyle="--", label="Stimulation Onset")

# Perform Wilcoxon signed-rank test for Study 1 - High vs Low Alpha
_, p_value_alpha_study1 = wilcoxon(prevhit_highstudy1_alpha.flatten(), prevhit_lowstudy1_alpha.flatten())

# Add significance marker if p-value is less than alpha
if p_value_alpha_study1 < alpha_level:
    axes[0, 0].text(0.5, 0.9, "*", color="red", fontsize=12, ha="center", va="center", transform=axes[0, 0].transAxes)

# Study 1 - Beta power
data_high_beta = np.mean(prevhit_highstudy1_beta, axis=0)
data_low_beta = np.mean(prevhit_lowstudy1_beta, axis=0)
axes[1, 0].plot(time, data_high_beta, label="high_probability", color="#ca0020ff")
axes[1, 0].plot(time, data_low_beta, label="low_probability", color="#0571b0ff")
axes[1, 0].legend()
axes[1, 0].set_xlabel("time (ms)")
axes[1, 0].set_ylabel("beta power (z-scored)")
axes[1, 0].axvline(x=0, color="red", linestyle="--", label="Stimulation Onset")

# Perform Wilcoxon signed-rank test for Study 1 - High vs Low Beta
_, p_value_beta_study1 = wilcoxon(prevhit_highstudy1_beta.flatten(), prevhit_lowstudy1_beta.flatten())

# Add significance marker if p-value is less than alpha
if p_value_beta_study1 < alpha_level:
    axes[1, 0].text(0.5, 0.9, "*", color="red", fontsize=12, ha="center", va="center", transform=axes[1, 0].transAxes)

# Study 2 - Alpha power
data_high_alpha_study2 = np.mean(high_study2_alpha, axis=0)
data_low_alpha_study2 = np.mean(low_study2_alpha, axis=0)
axes[0, 1].plot(time, data_high_alpha_study2, label="high_probability", color="#ca0020ff")
axes[0, 1].plot(time, data_low_alpha_study2, label="low_probability", color="#0571b0ff")
axes[0, 1].legend()
axes[0, 1].set_xlabel("time (ms)")
axes[0, 1].set_ylabel("alpha power (z-scored)")
axes[0, 1].set_title("Study 2")
axes[0, 1].axvline(x=0, color="red", linestyle="--", label="Stimulation Onset")

# Perform Wilcoxon signed-rank test for Study 2 - High vs Low Alpha
_, p_value_alpha_study2 = wilcoxon(high_study2_alpha.flatten(), low_study2_alpha.flatten())

# Add significance marker if p-value is less than alpha
if p_value_alpha_study2 < alpha_level:
    axes[0, 1].text(0.5, 0.9, "*", color="red", fontsize=12, ha="center", va="center", transform=axes[0, 1].transAxes)

# Study 2 - Beta power
data_high_beta_study2 = np.mean(high_study2_beta, axis=0)
data_low_beta_study2 = np.mean(low_study2_beta, axis=0)
axes[1, 1].plot(time, data_high_beta_study2, label="high_probability", color="#ca0020ff")
axes[1, 1].plot(time, data_low_beta_study2, label="low_probability", color="#0571b0ff")
axes[1, 1].legend()
axes[1, 1].set_xlabel("time (ms)")
axes[1, 1].set_ylabel("beta power (z-scored)")
# Add red line at stimulation onset
axes[1, 1].axvline(x=0, color="red", linestyle="--", label="Stimulation Onset")

# Perform Wilcoxon signed-rank test for Study 2 - High vs Low Beta
_, p_value_beta_study2 = wilcoxon(high_study2_beta.flatten(), low_study2_beta.flatten())

# Add significance marker if p-value is less than alpha
if p_value_beta_study2 < alpha_level:
    axes[1, 1].text(0.5, 0.9, "*", color="red", fontsize=12, ha="center", va="center", transform=axes[1, 1].transAxes)

# Adjust layout to prevent overlap
plt.tight_layout()

# Save the figure as SVG
plt.savefig(f"{save_dir_fig4_suppl}/suppl_fig_4.svg", format="svg", dpi=300)

# Save the figure as PNG
plt.savefig(f"{save_dir_fig4_suppl}/suppl_fig_4.png", format="png", dpi=300)

# Show the figure
plt.show()

In [None]:
study = 1  # expecon 2, single trial cues

# pick 10 random participants
random_ids = random.sample(range(0, len(tfr_a_cond[1])), 5)

# create figure with 3 rows and 5 columns
fig, axs = plt.subplots(3, 5, figsize=(15, 10))

# now fill the figure with the plots
for i, sid in enumerate(random_ids):
    # plot tfr for each condition
    tfr_a_cond[study][sid].copy().crop(-0.4, 0).plot(picks=["CP4"], axes=axs[0, i], show=False)
    tfr_b_cond[study][sid].crop(-0.4, 0).plot(picks=["CP4"], axes=axs[1, i], show=False)

    diff = tfr_a_cond[study][sid].copy().crop(-0.4, 0) - tfr_b_cond[study][sid].crop(-0.4, 0)
    diff.plot(picks=["CP4"], axes=axs[2, i], show=False)
    # get rid of y label for every plot expcept the first one on the left
    axs[0, i].set_ylabel("")
    axs[1, i].set_ylabel("")
    axs[2, i].set_ylabel("")
    # also remove x axis for each row except the last row
    axs[0, i].set_xlabel("")
    axs[1, i].set_xlabel("")
    axs[2, i].set_xlabel("")
    # set title for each plot
    axs[0, i].set_title(f"ID {sid}")

In [None]:
study = 0  # mini block design, study 1
conds = [0, 1, 2]  # prev hit, prev miss, prev cr

# pick 10 random participants
random_ids = random.sample(range(0, len(tfr_a_cond[0])), 5)

for c in conds:
    # create figure with 3 rows and 5 columns
    fig, axs = plt.subplots(3, 5, figsize=(15, 10))
    # now fill the figure with the plots
    for i, sid in enumerate(random_ids):
        # plot tfr for each condition
        tfr_a_cond[study][sid][c].copy().crop(-0.4, 0).plot(picks=["CP4"], axes=axs[0, i], show=False)
        tfr_b_cond[study][sid][c].crop(-0.4, 0).plot(picks=["CP4"], axes=axs[1, i], show=False)

        diff = tfr_a_cond[study][sid][c].copy().crop(-0.4, 0) - tfr_b_cond[study][sid][c].crop(-0.4, 0)
        diff.plot(picks=["CP4"], axes=axs[2, i], show=False)
        # get rid of y label for every plot expcept the first one on the left
        axs[0, i].set_ylabel("")
        axs[1, i].set_ylabel("")
        axs[2, i].set_ylabel("")
        # also remove x axis for each row except the last row
        axs[0, i].set_xlabel("")
        axs[1, i].set_xlabel("")
        axs[2, i].set_xlabel("")
        # set title for each plot
        axs[0, i].set_title(f"ID {sid}")

In [None]:
# run-cluster-based permutation tests for the conditions contrast
# and plot sign. cluster
tfr.plot_tfr_cluster_test_output(
    cond="probability",
    tfr_a_cond=tfr_a_cond,
    tfr_b_cond=tfr_b_cond,
    threed_test=False,
    cond_a_name="high",
    cond_b_name="low",
    channel_names=["CP4"],
)

previous response contrast

In [None]:
# load the tfr data for each condition for prev_resp conds.
tfr_a_cond, tfr_b_cond = tfr.load_tfr_conds(
    studies=[1, 2],
    cond="prev_resp",
    cond_a_name="prevyesresp_highprob_stim_-0.7_0",
    cond_b_name="prevnoresp_highprob_stim_-0.7_0",
    cond_a_names=["prevyesresp_samecue_lowprob_-0.7_0", "prevyesresp_samecue_highprob_-0.7_0"],
    cond_b_names=["prevnoresp_samecue_lowprob_-0.7_0", "prevnoresp_samecue_highprob_-0.7_0"],
)

In [None]:
# run cluster based permutation tests for the conditions contrasts
# and plot sign. cluster
tfr.plot_tfr_cluster_test_output(
    cond="prev_resp",
    tfr_a_cond=tfr_a_cond,
    tfr_b_cond=tfr_b_cond,
    threed_test=False,
    cond_a_name="prevyesresp",
    cond_b_name="prevnoresp",
    channel_names=["CP4"],
)

### 5. Source reconstruction

In [None]:
# run source reconstruction for each condition
source_reco.run_source_reco(
    study=1,
    cond="prev_resp",
    mirror=False,
    dics=True,
    fmin=15,
    fmax=25,
    tmin=-0.7,
    tmax=-0.1,
    drop_bads=True,
    plot_alignment=False,
)

In [None]:
# plot source contrast (grand average over all participants)
# opens plots in separate windows
source_reco.plot_grand_average_source_contrast(
    study=1, cond="probability", method="beamformer", save_plots=False
)

In [None]:
# run source localization for each epoch based on filter from contrast

In [None]:
source_reco.run_source_reco_per_trial(study=2, fmin=15, fmax=25, tmin=-0.7, tmax=-0.1, drop_bads=True)

In [None]:
# Figure 6 correlation between source power and behavioral outcomes

In [None]:
plot_source_behav_correlation.plot_correlation()