# Control repeat cross-correlation analysis (spectral envelope and HFB) after regressing out hand annotations

In [1]:
import numpy as np
import pympi
from scipy.stats import ttest_ind
from wilcoxon import wilcoxon
from utils import zscore, cross_correlate

In [8]:
# load elan annotations
D = pympi.Elan.Eaf('../data/hand_presence_annotation_fragments.eaf')
tiers = ['hands_speech', 'hands_nonspeech', 'speech', 'nonspeech']
assert all([i in D.get_tier_names() for i in tiers])

A = {}
for itier in tiers:
    temp = np.array(D.get_annotation_data_for_tier(itier)).astype(np.int32).T
    A[itier] = {}
    A[itier]['start'], A[itier]['end'], A[itier]['text'] = temp

# at 1000 Hz
for itier in tiers:
    s = np.argsort(A[itier]['start'])
    for k, v in A[itier].items():
        A[itier][k] = v[s]

down10 = lambda x: np.round(x/10.).astype(np.int32)
speech = down10(np.stack([A['speech']['start'], A['speech']['end']]).T)
nonspeech = down10(np.stack([A['nonspeech']['start'], A['nonspeech']['end']]).T)
hand_speech = down10(np.stack([A['hands_speech']['start'], A['hands_speech']['end']]).T)
hand_nonspeech = down10(np.stack([A['hands_nonspeech']['start'], A['hands_nonspeech']['end']]).T)
hand_speech_vec = A['hands_speech']['text'].T
hand_nonspeech_vec = A['hands_nonspeech']['text'].T

for i, ispe in enumerate(speech):
    if (ispe[1] - ispe[0]) > 400:
        ispe[1] = ispe[0] + 400
for i, inon in enumerate(nonspeech):
    if (inon[1] - inon[0]) > 400:
        inon[1] = inon[0] + 400

In [9]:
# 2 x 2: speech presence/absence and hand presence/absence
def binarize1(x):
    x[x>0] = 1
    return x

# 2 x 2: speech presence/absence and hand movement/hand absence
def binarize2(x):
    x[x==1] = 0
    x[x>0] = 1
    return x

# 2 x 3: speech presence/absence and hand movement/presence/absence
def split_three(x):
    x[x>2] = 2
    return x

In [15]:
# construct hand regressor 
def construct_hand_regressor(fragments, hand_time, hand_values):
    hand_reg = []
    for fragment in range(fragments.shape[0]):
        ind = (hand_time[:, 0]>= fragments[fragment, 0]) & (hand_time[:, 1]<= fragments[fragment, 1]) |\
            (hand_time[:, 0]< fragments[fragment, 1]) & (hand_time[:, 1] >= fragments[fragment, 1]) |\
            (hand_time[:, 1]> fragments[fragment, 0]) & (hand_time[:, 0] <= fragments[fragment, 0])

        #print(fragments[fragment])
        #print(hand_time[ind])
        #print(hand_values[ind])

        st, en = fragments[fragment]
        #print(hand_time[ind].shape[0])
        if hand_time[ind].shape[0] == 1:
            hand_reg.append(np.tile(hand_values[ind], [en-st]))
        elif hand_time[ind].shape[0] > 1:
            temp = np.zeros(400, dtype=np.int32)
            for i in range(hand_time[ind].shape[0]):
                st1, en1 = hand_time[ind][i] - st
                temp[st1 if st1 >=0 else 0 : en1 if en1 <= 400 else 400] = hand_values[ind][i]
            hand_reg.append(temp)
        #print(hand_reg[-1])
    return np.array(hand_reg)

In [11]:
# regress out hand presence from ecog
def regress_z(x, z):
    c = np.linalg.lstsq(np.c_[z, np.ones(z.shape[0])], x)[0]
    r = x - np.c_[z, np.ones(z.shape[0])].dot(c)
    return r.astype(np.float32)

In [12]:
subj = 's1'
x    = np.load('../data/' + subj + '_HD_100Hz_hfb.npy')
pm   = np.load('../results/ttest_ecog_speech_nonspeech_'+subj+'_pmask.npz')['pmask_bonf'] # pmask from the first ttest
grid = np.load('../data/' + subj + '_HD_grid.npy')
n    = x.shape[-1]

In [16]:
# t-test HFB speech vs nonspeech
hand_speech_vec = split_three(A['hands_speech']['text'].T.copy())
hand_nonspeech_vec = split_three(A['hands_nonspeech']['text'].T.copy())
hand_speech_reg = construct_hand_regressor(speech, hand_speech, hand_speech_vec)
hand_non_reg = construct_hand_regressor(nonspeech, hand_nonspeech, hand_nonspeech_vec)

# regress out hand from HFB
d_spe, d_non = [], []
for spe_frag, non_frag in zip(speech, nonspeech):
        d_spe.append(x[range(spe_frag[0], spe_frag[1])])
        d_non.append(x[range(non_frag[0], non_frag[1])])

d_spe, d_non = map(np.array, [d_spe, d_non])

d_all_c = np.concatenate([d_spe, d_non]).reshape((-1, n))
hand_all_c = np.concatenate([hand_speech_reg, hand_non_reg]).flatten()

d_all_rs = regress_z(d_all_c, hand_all_c)

d_all_rs = d_all_rs.reshape((-1, 400, n))
d_all_rs_m = np.mean(d_all_rs, 1)

# t-test to compare mean HFB in speech and non-speech fragments after regressing hand
ts_reg, ps_reg = np.zeros(d_all_rs_m.shape[-1]), np.zeros(d_all_rs_m.shape[-1])
for i in range(d_all_rs_m.shape[-1]):
    ts_reg[i], ps_reg[i] = ttest_ind(d_all_rs_m[:115, i], d_all_rs_m[115:, i])
print(ts_reg[np.where(ps_reg<.05/n)[0]])

# original T-test to compare mean HFB in speech and non-speech fragments
d_spe, d_non = [], []
for spe_frag, non_frag in zip(speech, nonspeech):
        d_spe.append(np.mean(x[range(spe_frag[0], spe_frag[1])], 0))
        d_non.append(np.mean(x[range(non_frag[0], non_frag[1])], 0))

d_spe, d_non = map(np.array, [d_spe, d_non])

ts, ps = np.zeros(x.shape[-1]), np.zeros(x.shape[-1])
for i in range(x.shape[-1]):
    ts[i], ps[i] = ttest_ind(d_spe[:, i], d_non[:, i])
print(ts[np.where(ps<.05/n)[0]])

# Wilcoxon for testing difference in t-statistics (speech vs nonspeech in HFB) with regressing hand vs not regressing hand
wilx = wilcoxon(zscore(ts[pm])-zscore(ts_reg[pm]), alternative='two-sided', zero_method='zsplit')
print(wilx.w_statistic, wilx.z_statistic, wilx.pvalue)

  This is separate from the ipykernel package so we can avoid doing imports until


[16.42408618 11.80228885 -5.00481672 26.58881036 12.77547079 26.32934601
 14.39202491 14.13282602  3.74210763 17.52181339 10.05418485  3.81229241
  7.89557077 14.37351416  6.14536781  4.55402917  6.82191685  3.96497827]
[16.34582962 11.72382423 -4.85197143 26.62437594 12.78006145 26.43590348
 14.33138275 14.14851255  3.78837978 17.55815366 10.06085852  3.78531154
  7.90169985 14.42951345  6.18705523  4.59146867  6.85102048  4.0257282 ]
96.0 -0.3359940268259508 0.7368753708069717


In [18]:
# t-test correlation to audio envelope in speech vs nonspeech 

a = np.load('../data/audio_envelope_100Hz.npy')
hand_speech_vec = binarize1(A['hands_speech']['text'].T.copy())
hand_nonspeech_vec = binarize1(A['hands_nonspeech']['text'].T.copy())
hand_speech_reg = construct_hand_regressor(speech, hand_speech, hand_speech_vec)
hand_non_reg = construct_hand_regressor(nonspeech, hand_nonspeech, hand_nonspeech_vec)

# regress out hand from HFB
d_spe, d_non = [], []
for spe_frag, non_frag in zip(speech, nonspeech):
        d_spe.append(x[range(spe_frag[0], spe_frag[1])])
        d_non.append(x[range(non_frag[0], non_frag[1])])

d_spe, d_non = map(np.array, [d_spe, d_non])

d_all_c = np.concatenate([d_spe, d_non]).reshape((-1, n))
hand_all_c = np.concatenate([hand_speech_reg, hand_non_reg]).flatten()

d_all_rs = regress_z(d_all_c, hand_all_c)

d_all_rs = d_all_rs.reshape((-1, 400, n))
d_spe_rs = d_all_rs[:115]
d_non_rs = d_all_rs[115:]

# correlation to audio envelope: after regressing hand
r_spe, r_non = [], []
for ifrag, (spe_frag, non_frag) in enumerate(zip(speech, nonspeech)):
    r_spe.append([])
    r_non.append([])
    for i in range(x.shape[-1]):
        r_spe[-1].append(cross_correlate(d_spe_rs[ifrag, :, i], a[range(spe_frag[0], spe_frag[1])], type='spearman'))
        r_non[-1].append(cross_correlate(d_non_rs[ifrag, :, i], a[range(non_frag[0], non_frag[1])], type='spearman'))

r_spe, r_non = np.array(r_spe), np.array(r_non)

# take max within [-100:500] ms per fragment
r_spe_m_reg = np.max(r_spe[:,:,390:-349], 2)
r_non_m_reg = np.max(r_non[:,:,390:-349], 2)

# fisher transform
r_spe_mf = np.arctanh(r_spe_m_reg)
r_non_mf = np.arctanh(r_non_m_reg)

# t-test on spe vs non after regressing hand
ts_reg, ps_reg = [], []
for i, j in zip(r_spe_mf.T, r_non_mf.T):
    t, p = ttest_ind(i, j)
    ts_reg.append(t)
    ps_reg.append(p)
ts_reg, ps_reg = np.array(ts_reg), np.array(ps_reg)

# original correlation
r_spe, r_non = [], []
for spe_frag, non_frag in zip(speech, nonspeech):
    r_spe.append([])
    r_non.append([])
    for i in range(x.shape[-1]):
        r_spe[-1].append(cross_correlate(x[spe_frag[0]:spe_frag[1], i], a[spe_frag[0]:spe_frag[1]], type='spearman'))
        r_non[-1].append(cross_correlate(x[non_frag[0]:non_frag[1], i], a[non_frag[0]:non_frag[1]], type='spearman'))

r_spe, r_non = np.array(r_spe), np.array(r_non)

r_spe_m = np.max(r_spe[:,:,390:-349], 2)
r_non_m = np.max(r_non[:,:,390:-349], 2)

# fisher transform
r_spe_mf = np.arctanh(r_spe_m)
r_non_mf = np.arctanh(r_non_m)

# t-test on original correlations
ts, ps = [], []
for i, j in zip(r_spe_mf.T, r_non_mf.T):
    t, p = ttest_ind(i, j)
    ts.append(t)
    ps.append(p)
ts, ps = np.array(ts), np.array(ps)
ts_masked = ts.copy()

# select only significant electrodes from the original analysis
ts_masked[np.setdiff1d(range(n), pm)] = 0
thresh_01 = 1e-2/pm.shape[0]
ts_masked[ps >= thresh_01] = 0
ts_reg_masked = ts_reg.copy()
ts_reg_masked[np.setdiff1d(range(n), pm)] = 0
ts_reg_masked[ps >= thresh_01] = 0

# Wilcoxon for testing difference in t-statistics (correlation to audio envelope in speech vs nonspeech) with regressing hand vs not regressing hand
wilx = wilcoxon(zscore(ts_masked[ts_masked>0])-zscore(ts_reg_masked[ts_masked>0]), alternative='two-sided', zero_method='zsplit')
print(wilx.w_statistic, wilx.z_statistic, wilx.pvalue)

  This is separate from the ipykernel package so we can avoid doing imports until


22.0 -0.05923488777590923 0.9527650219907529


