###**cluster_2way**

Run hierarchical models on iEEG timeseries data with one within-channel fixed effect (2 levels) and one between-channel fixed effect (2 levels), with the random effects of subject and channel nested in subject. Perform cluster-based corrections for multiple comparisons across timepoints, preserving the random effects structure. The outputs are F-statistics with uncorrected and cluster-corrected p-values.  

Copyright (c) 2025  
EL Johnson, PhD

###Import modules:

In [None]:
import numpy as np
import pandas as pd
from statsmodels.formula.api import mixedlm
import warnings
import pickle
from pathlib import Path
import gdown

# custom modules
from cluster_utils import load_ieeg_data, create_df, cluster_test

###Download sample data file:

Contains the following variables:  
- sid = subject ID
- ch = channel label
- hit_miss = hit (1) or miss (0) within-channel variable
- region = brain region (phrc or hc) between-channel variable
- data = timeseries data with one row per channel per level for the within-channel variable (338 rows x 139 timepoints)

In [None]:
# download data from Google Drive
output_path = 'lme_data_2way.mat'
fid = '1AYk6KAdVkZbDSJkTKwI4XJCmdEofD9rV'
url = f'https://drive.google.com/uc?id={fid}'
gdown.download(url, output_path, quiet = False)
data_path = Path(output_path)

# load data
data_dict = load_ieeg_data(data_path)

# create df
df = create_df(data_dict)

print(f'\nDataFrame: {df.shape[0]} rows × {df.shape[1]} columns')
print('\nFirst few rows:')
print(df.head())

###Define function to run the models:

In [None]:
def run_lme_2way(df, verbose = True):
  """
  Run LME models across all timepoints.

  Parameters
  ----------
  df : pd.DataFrame
    DataFrame from create_df()
  verbose : bool, optional
    If True, print progress for each timepoint. Default is True

  Returns
  -------
  dict
    Dictionary with F-statistics and p-values for each effect:
    - 'hit_miss': {'F': array, 'p': array}
    - 'region': {'F': array, 'p': array}
    - 'int': {'F': array, 'p': array}
  """
  warnings.filterwarnings('ignore')

  # initialize dictionary for storing model outputs
  n_time = df.shape[1] - 4  # exclude metadata columns

  lme = {
    'hit_miss': {
        'F': np.full(n_time, np.nan),
        'p': np.full(n_time, np.nan)
        },
    'region': {
        'F': np.full(n_time, np.nan),
        'p': np.full(n_time, np.nan)
        },
    'int': {
        'F': np.full(n_time, np.nan),
        'p': np.full(n_time, np.nan)
        }
  }

  # get timepoint column names
  time_cols = [col for col in df.columns if col.startswith('t')]

  # loop through timepoints
  for t in range(n_time):

    if verbose:
      print(f'Running model on datapoint {t+1}/{n_time}...')

    # create table for model
    tmp_dat = df[['sid', 'channel', 'hit_miss', 'region', time_cols[t]]].copy()
    tmp_dat.rename(columns = {time_cols[t]: 'data'}, inplace = True)

    # create sid:channel variable for nested random effect
    tmp_dat['sid_ch'] = tmp_dat['sid'].astype(str) + ':' + tmp_dat['channel'].astype(str)

    # run model: data ~ hit_miss * region + (1|sid) + (1|sid:channel)
    model = mixedlm(
        formula = 'data ~ hit_miss * region',
        data = tmp_dat,
        groups = tmp_dat['sid'],
        re_formula = '1',
        vc_formula = {'sid_ch': '0 + C(sid_ch)'}
        )
    result = model.fit(method = 'lbfgs', reml = True)

    # get ANOVA table with F-stats
    params = result.params
    cov = result.cov_params()

    # create contrast matrices for each effect (excluding intercept)
    hit_miss = np.zeros((1, len(params)))
    hit_miss[0, 1] = 1
    region = np.zeros((1, len(params)))
    region[0, 2] = 1
    interact = np.zeros((1, len(params)))
    interact[0, 3] = 1

    # run F-tests
    f_hit_miss = result.f_test(hit_miss)
    f_region = result.f_test(region)
    f_int = result.f_test(interact)

    # extract model outputs
    lme['hit_miss']['F'][t] = f_hit_miss.fvalue
    lme['hit_miss']['p'][t] = f_hit_miss.pvalue
    lme['region']['F'][t] = f_region.fvalue
    lme['region']['p'][t] = f_region.pvalue
    lme['int']['F'][t] = f_int.fvalue
    lme['int']['p'][t] = f_int.pvalue

  return lme

###Run model per timepoint on observed data:

In [None]:
lme = run_lme_2way(df)

###Define function to create null distributions for cluster testing:

In [None]:
def create_null_dist_2way(df, lme, nperm = 1000):
  """
  Create null distributions for cluster testing via permutation with region effects.

  Lower nperm for prototyping (multiple of 10). HPC recommended for implementation.

  Parameters
  ----------
  df : pd.DataFrame
    DataFrame from create_df() containing EEG data with columns:
    'sid', 'channel', 'hit_miss', 'region', and timepoint columns starting with 't'
  lme : dict
    Dictionary from run_lme_models() with 'hit_miss', 'region', and 'int'
    subdictionaries, each containing 'F' arrays
  nperm : int, optional
    Number of permutations. Default is 1000

  Returns
  -------
  tuple of (array, array, array, array, array, array)
    Returns six arrays:
    - hit_miss_main: observed hit_miss F-statistics
    - region_main: observed region F-statistics
    - int_effect: observed interaction F-statistics
    - hit_miss_main_null: null distribution for hit_miss (n_timepoints, nperm)
    - region_main_null: null distribution for region (n_timepoints, nperm)
    - int_null: null distribution for interaction (n_timepoints, nperm)
  """

  # get model stats from observed data
  hit_miss_main = lme['hit_miss']['F']
  region_main = lme['region']['F']
  int_effect = lme['int']['F']

  # create channel-nested-in-subject IDs
  sid_ch = df['sid'].astype(str) + ':' + df['channel'].astype(str)
  uid = sid_ch.unique()  # unique IDs

  # get region labels
  regions = df['region'].unique()

  # initialize null distributions
  hit_miss_main_null = np.zeros((len(hit_miss_main), nperm))
  region_main_null = np.zeros((len(region_main), nperm))
  int_null = np.zeros((len(int_effect), nperm))

  # loop through permutations
  for p in range(nperm):
    print(f'Shuffling the data for permutation {p+1}/{nperm}...')

    # initialize shuffled labels
    hit_miss_shuff = np.zeros(len(df), dtype = int)
    region_shuff = np.empty(len(df), dtype = object)

    # calculate region proportions
    n_region1 = np.sum(df['region'] == regions[0]) / len(df['hit_miss'].unique())
    n_region2 = np.sum(df['region'] == regions[1]) / len(df['hit_miss'].unique())

    # loop through unique IDs
    for u in range(len(uid)):
      u_idx = np.where(sid_ch == uid[u])[0]

      # randomly shuffle within-channel variable labels
      if np.random.rand() > 0.5:  # coin flip
        hit_miss_shuff[u_idx[0]] = 1  # set 1st row of pair to condition 1 (hit)
      else:
        hit_miss_shuff[u_idx[1]] = 1  # set 2nd row of pair to condition 1 (hit)

      # randomly shuffle between-channel variable labels
      if np.random.rand() > (n_region1 / (n_region1 + n_region2)):
        region_shuff[u_idx[0]] = regions[0]  # set both rows to region 1
        region_shuff[u_idx[1]] = regions[0]
        n_region2 = n_region2 - 1
      else:
        region_shuff[u_idx[0]] = regions[1]  # set both rows to region 2
        region_shuff[u_idx[1]] = regions[1]
        n_region1 = n_region1 - 1

    # create shuffled df
    df_shuff = df.copy()
    df_shuff['hit_miss'] = hit_miss_shuff.astype(bool)  # convert to boolean
    df_shuff['region'] = region_shuff

    # run model per timepoint with shuffled labels
    lme_null = run_lme_2way(df_shuff, verbose=False)

    # extract model outputs
    hit_miss_main_null[:, p] = lme_null['hit_miss']['F']
    region_main_null[:, p] = lme_null['region']['F']
    int_null[:, p] = lme_null['int']['F']

  return hit_miss_main, region_main, int_effect, hit_miss_main_null, region_main_null, int_null

###Create null distributions for cluster testing:

In [None]:
hit_miss_main, region_main, int_effect, hit_miss_main_null, region_main_null, int_null = \
  create_null_dist_2way(df, lme, nperm = 10)  # lowered nperm for demo

###Run cluster tests:

In [None]:
# main effect of within-channel variable (hit/miss)
_, p, _ = cluster_test(hit_miss_main, hit_miss_main_null, tail = 1)
lme['hit_miss']['p_clust'] = p.flatten()

# main effect of between-channel variable (region)
_, p, _ = cluster_test(region_main, region_main_null, tail = 1)
lme['region']['p_clust'] = p.flatten()

# interaction (hit/miss*region)
_, p, _ = cluster_test(int_effect, int_null, tail = 1)
lme['int']['p_clust'] = p.flatten()

###Save:

In [None]:
with open('lme_clust_2way.pkl', 'wb') as f:
  pickle.dump(lme, f)

To load from Colab Files panel:  
```
with open('lme_clust_2way.pkl', 'rb') as f:  
  lme = pickle.load(f)
```

To download from Colab:  
```
from google.colab import files  
files.download('lme_clust_2way.pkl')
```