# Save epoch setting

In [95]:
import os
import numpy as np
import scipy.io as sio

In [106]:
# who is running this notebook?

env = 'sungho_macbookpro'
# env = 'jisub'

# Issue a warning about the environment variable
print(
    f"Environment is set to '{env}'",
)


Environment is set to 'sungho_macbookpro'


In [116]:
# Set type
reject_trials = True  # True is reject minus trial else keep it original

# Set epoch period
Ts = 1000  # Sampling rate
epoch_setting = np.array([-300, 300]) / 1000  # in seconds
eset = np.abs(np.round(epoch_setting * Ts)).astype(int) # windows before and after the saccade

# Load data list

if env == 'jisub':
    path = r'C:\Users\Jisub\Desktop\Saccadic\left\data'
elif env == 'sungho_macbookpro':
    path = '/Users/shhong/Dropbox (Personal)/Data/Peng/left'

file_list = sorted(os.listdir(path))
file_list = sorted([f for f in file_list if f.endswith('.mat') and f.startswith('e2')])
file_list = sorted([f for f in file_list if 'e2012-11-22-16-58-54-078.CH6.final' not in f])
# del file_list[5]


In [117]:
to_flatlist = lambda lst: [
    (
        int(x[0])
        if isinstance(x, np.ndarray) and x.size == 1
        else int(x) if isinstance(x, np.integer) else x
    )
    for x in lst
]

total_epoch = []

for fname in file_list:
    data = sio.loadmat(os.path.join(path, fname), squeeze_me=True)
    print(fname)

    Saccade = data["Saccade"]
    T0 = data["T1"]
    T1 = T0.toarray()
    TargetX = data["TargetX"]
    TargetY = data["TargetY"]

    s_amp = [s["amplitude"] for s in Saccade]
    s_start = [s["start"] for s in Saccade]
    s_dur = [s["duration"] for s in Saccade]
    Time = T1.shape[1]

    # Find epoch time point by stimulus
    sti_epoch = []
    for x, y in zip(TargetX, TargetY):
        temp = np.abs(x) + np.abs(y)
        idx = np.argmax(temp > 1)
        sti_epoch.append(idx)

    # Find epoch time point by saccade, trial, amplitude, start, c_label, duration
    final_epoch = []
    for t_num in range(len(s_amp)):
        s_ampI = np.array(s_amp[t_num])
        temp_amp = np.zeros(np.size(s_ampI), dtype=bool)
        star_idx = s_start[t_num] < 500  # before onset
        s_ampI[star_idx] = 0

        # no saccade
        if s_ampI.size == 0:
            temp_amp = np.array([])
        else:
            # valid primary saccade
            if np.max(s_ampI) > 7:
                I = np.argmax(s_ampI)
                temp_amp[I] = True

        # mark 0 if no valid primary saccade
        if np.sum(temp_amp) == 0:
            c_label = 0
        else:
            c_label = 1

        if len(temp_amp) < 2:
            if temp_amp.size > 0:
                temp_start = s_start[t_num]
                temp_dur = s_dur[t_num]
            else:
                temp_start = np.array([])
                temp_dur = np.array([])
        else:
            temp_start = s_start[t_num][temp_amp]
            temp_dur = s_dur[t_num][temp_amp]

        final_epoch.append([t_num, temp_amp, temp_start, c_label, temp_dur])

    if len(sti_epoch) != len(final_epoch):
        print(
            f"{fname} has {len(sti_epoch)} sti_epoch and {len(final_epoch)} final_epoch....",
            f"Skipping {fname}",
        )
        continue

    final_epochtime = []
    for temp_epoch in final_epoch:

        stif_epoch = sti_epoch[t_num]

        # Skip trials with c_label == 0 (no valid primary saccade)
        if temp_epoch[3] == 0:
            continue

        temp_times = temp_epoch[2]
        temp_dur = temp_epoch[4]

        final_ep = []
        final_ep2 = np.array([stif_epoch - eset[0], stif_epoch + eset[1]])
        if np.size(temp_times) < 2:  # if there is only one primary saccade
            temp_priod = [
                temp_times - eset[0],
                temp_times + eset[1],
                temp_epoch[0],
                temp_dur,
            ]
            final_ep.append(temp_priod)
        else:  # if there are multiple primary saccades
            for len_epo in range(len(temp_times)):
                temp_priod = [
                    temp_times[len_epo] - eset[0],
                    temp_times[len_epo] + eset[1],
                    temp_epoch[0],
                    temp_dur,
                ]
                final_ep.append(temp_priod)

        final_epochtime.append(
            np.append(to_flatlist(final_ep[0]), to_flatlist(final_ep2))
        )

    final_epochtime = np.array(final_epochtime, dtype=object).astype(int)

    if reject_trials:
        valid = (final_epochtime[:, 0] > 0) & (final_epochtime[:, 1] <= Time)
        final_epochtime2 = final_epochtime[valid]
    else:
        final_epochtime2 = final_epochtime

    total_epoch.append(
        [
            final_epoch,
            final_epochtime2,
            len(final_epochtime2),
            len(final_epochtime) - len(final_epochtime2),
            fname,
            sti_epoch,
        ]
    )

total_epoch = np.array(total_epoch, dtype=object)

e2012-06-15-14-50-24-590.CH7.final-needflip.mat
e2012-06-15-15-37-05-311.CH7.final-needflip.mat
e2012-11-08-16-44-54-465.CH6.final.mat
e2012-11-20-16-09-57-633.CH6.final.mat
e2012-11-20-17-46-42-156.CH6.final.mat
e2012-11-28-16-38-36-062.CH6.final.mat
e2012-11-29-17-44-35-815.CH6.final.mat
e2012-12-13-16-48-37-742.CH7.final.mat
e2012-12-20-15-20-21-316.CH7.final.mat
e2013-01-07-17-58-21-600.CH6.final.mat
e2013-01-08-17-52-34-883.CH6.final.mat
e2013-01-09-15-58-13-835.CH6.final.mat
e2013-01-09-16-24-11-837.CH7.final.mat
e2013-01-11-14-49-40-680.CH7.final.mat
e2013-01-11-15-00-49-402.CH7.final.mat
e2013-01-16-15-23-54-780.CH7.final.mat
e2013-01-17-14-37-17-495.CH7.final.mat
e2013-01-23-16-56-22-357.CH6.final.mat
e2013-03-19-16-02-58-357-.CH6.final.mat
e2013-07-17-15-59-13-217.CH6.final-needflip.mat
e2013-07-17-16-09-24-411.CH6.final-needflip.mat
e2013-07-23-16-20-32-578.CH6.final-needflip.mat
e2013-07-24-14-04-05-561.CH6.final-needflip.mat
e2013-07-24-14-44-30-105.CH6.final-needflip.mat


In [118]:
# Save the result
# os.chdir(r'C:\Users\Jisub\Desktop\Saccadic\python_code')
os.chdir(os.curdir)
np.save('epoch_setting.npy',total_epoch)