## Imports

In [1]:
import pyxdf 
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from lmfit.models import Model
from os import listdir, getcwd
from os.path import isfile, join
from scipy import stats

## Function Definitions

Predicted pupil dilation, $d(Y)$, caused by luminance $Y$, is computed with the following equation: $𝑑(𝑌) = 𝑎 · 𝑒^{−𝑏·𝑌} + c$

In [39]:
def pupil_func(x, a, b, c):
    return a * np.exp(-b * x) + c

## Load Data

Load the data from the xdf file for a single participant.

In [40]:
streams, header = pyxdf.load_xdf('./Path_Data/ID_112.xdf')

stream_df = {}
for stream in streams:
    stream_name = stream['info']['name'][0]
    stream_channels = {channel['label'][0]: i for i, channel in enumerate(stream['info']['desc'][0]['channels'][0]['channel'])}
    stream_data = stream['time_series']
    data_dict = {key: np.array(stream_data)[:, index] for key, index in stream_channels.items()}
    data_dict['time'] = np.round(np.array(stream['time_stamps']), decimals=4)
    stream_df[stream_name] = pd.DataFrame(data_dict).drop_duplicates(subset=['time']).reset_index(drop=True)


In [41]:
accom_time = pd.to_timedelta(0.5, unit='s')

pupil = stream_df['GazeStream'].loc[(stream_df['GazeStream']['LeftEyeIsBlinking'] == 0) & (stream_df['GazeStream']['RightEyeIsBlinking'] == 0) & (stream_df['GazeStream']['LeftPupilDiameter'] > 0) & (stream_df['GazeStream']['RightPupilDiameter'] > 0), ['time', 'MethodID', 'ModelID', 'LeftPupilDiameter', 'RightPupilDiameter']]
pupil['time'] = pd.to_timedelta(pupil['time'], unit='s')

lum = stream_df['LuminanceStream'].loc[:, ['time', 'MethodID', 'ModelID', 'Luminance']]
lum['time'] = pd.to_timedelta(lum['time'], unit='s')

# Intersection of time stamps
pupil_lum_time_intersection = np.intersect1d(pupil['time'], lum['time'])

# Filter pupil and luminance data by intersection
pupil = pupil[pupil['time'].isin(pupil_lum_time_intersection)].reset_index(drop=True)
lum = lum[lum['time'].isin(pupil_lum_time_intersection)].reset_index(drop=True)

# Combined DataFrame for pupil and luminance
pupil_lum_df = pd.DataFrame({
    'time': pd.to_timedelta(lum['time'], unit='s'),
    'luminance': lum['Luminance'],
    'pupilDiameter': 0.5 * (pupil['LeftPupilDiameter'] + pupil['RightPupilDiameter']),
    'methodID': pupil['MethodID'],
    'modelID': pupil['ModelID']
}).resample('0.1s', on='time').mean()

pupil_lum_df['time'] = pupil_lum_df.index

In [42]:
calibration_events = stream_df['ExperimentStream'].loc[(stream_df['ExperimentStream']['EventType'] == 'CalibrationColorChange') | (stream_df['ExperimentStream']['SceneEvent'] == 'Calibration') | (stream_df['ExperimentStream']['SceneEvent'] == 'CalibrationComplete'), ['time','SceneEvent', 'EventType']]
calibration_events['time'] = pd.to_timedelta(calibration_events['time'], unit='s')
c_start_times = calibration_events[:8]['time']
c_end_times = calibration_events[1:]['time']
c_start_times.reset_index(drop=True, inplace=True)
c_end_times.reset_index(drop=True, inplace=True)

calib_data = {}
for i in range(8):
    calib_data[i] = pupil_lum_df.loc[(pupil_lum_df['time'] >= c_start_times[i]) & (pupil_lum_df['time'] <= c_end_times[i]), ['time','luminance', 'pupilDiameter']]
    calib_data[i]['time'] -= calib_data[i]['time'].iloc[0]
    calib_data[i] = calib_data[i].loc[(calib_data[i]['time'] >= accom_time), ['luminance', 'pupilDiameter']]

calibration_data = pd.concat(calib_data).groupby(level=0).mean().sort_values(by=['luminance']).reset_index(drop=True)

In [43]:
x_data = calibration_data['luminance']
y_data = calibration_data['pupilDiameter']

exp_mod = Model(pupil_func)
params = exp_mod.make_params(a=1, b=4, c=0)

result = exp_mod.fit(y_data, params, x=x_data)
a = result.params['a'].value
b = result.params['b'].value
c = result.params['c'].value

# plt.plot(x_data, y_data, 'o')
# plt.plot(x_data, result.init_fit, '--', label='initial fit')
# plt.plot(x_data, result.best_fit, '-', label='best fit')
# plt.legend()
# plt.show()

In [46]:
navigation_events = stream_df['ExperimentStream'].loc[(stream_df['ExperimentStream']['SceneEvent'] == 'NavigationComplete') | (stream_df['ExperimentStream']['SceneEvent'] == 'Navigation_Trial'), ['time','SceneEvent', 'EventType', 'ModelID', 'MethodID']]
navigation_events['time'] = pd.to_timedelta(navigation_events['time'], unit='s')
nav_start_times = navigation_events.loc[navigation_events['SceneEvent'] == 'Navigation_Trial', 'time']
nav_end_times = navigation_events.loc[navigation_events['SceneEvent'] == 'NavigationComplete', 'time']

nav_start_times.reset_index(drop=True, inplace=True)
nav_end_times.reset_index(drop=True, inplace=True)

#check time difference between each start time
if len(nav_start_times) > 8:
    nav_diff = nav_start_times.diff().dt.total_seconds()
    nav_start_times = nav_start_times.loc[(nav_diff.isnull()) | (nav_diff > 3)]

nav_start_times.reset_index(drop=True, inplace=True)
nav_end_times.reset_index(drop=True, inplace=True)

nav_data = {}
for i in range(8):
    nav_data[i] = pupil_lum_df.loc[(pupil_lum_df['time']>=nav_start_times.loc[i]) & (pupil_lum_df['time']<=nav_end_times.loc[i]), ['time', 'methodID', 'modelID', 'luminance', 'pupilDiameter']]
    nav_data[i].set_index('time', inplace=True, drop=False)

navigation_data = pd.concat(nav_data, names=['trial'])
navigation_data = navigation_data.groupby(level=0).resample('0.5s', on='time', ).mean()
navigation_data['plr'] = pupil_func(navigation_data['luminance'], a, b, c)
navigation_data['tepr'] = navigation_data['pupilDiameter'] - navigation_data['plr']


0   3 days 22:44:03.972200
1   3 days 22:45:36.378700
2   3 days 22:46:54.391600
3   3 days 22:48:26.961600
4   3 days 22:50:58.102500
5   3 days 22:52:49.317400
6   3 days 22:54:06.476600
7   3 days 22:55:32.295600
Name: time, dtype: timedelta64[ns]
0         NaN
1     92.4065
2     78.0129
3     92.5700
4    151.1409
5      1.7708
6    109.4441
7     77.1592
8     85.8190
Name: time, dtype: float64


In [None]:
# navigation_events = dfs['ExperimentStream'].loc[(dfs['ExperimentStream']['SceneEvent'] == 'NavigationComplete') | (dfs['ExperimentStream']['SceneEvent'] == 'Navigation_Trial'), ['time','SceneEvent', 'EventType', 'ModelID', 'MethodID']]
# navigation_events.reset_index(drop=True, inplace=True)
# navigation_events['time'] = pd.to_timedelta(navigation_events['time'], unit='s')
# start_times = navigation_events.loc[navigation_events['SceneEvent'] == 'Navigation_Trial', 'time']
# start_times.reset_index(drop=True, inplace=True)
# end_times = navigation_events.loc[navigation_events['SceneEvent'] == 'NavigationComplete', 'time']
# end_times.reset_index(drop=True, inplace=True)

# nav_data = {}
# for i in range(8):
#     nav_data[i] = pup_lum_df.loc[(pup_lum_df['time']>=start_times.loc[i]) & (pup_lum_df['time']<=end_times.loc[i]), ['time', 'methodID', 'modelID', 'luminance', 'pupilDiameter']]
#     nav_data[i].set_index('time', inplace=True, drop=False)

# navigation_data = pd.concat(nav_data, names=['trial'])
# navigation_data = navigation_data.groupby(level=0).resample('0.5s', on='time', ).mean()
# navigation_data['plr'] = pupil_func(navigation_data['luminance'], a, b, c)
# navigation_data['tepr'] = navigation_data['pupilDiameter'] - navigation_data['plr']

# navigation_avg = navigation_data.groupby(level=0).mean()
# navigation_avg.drop(columns=['luminance'], inplace=True)

# models = navigation_avg.reset_index(drop=True)
# model_avg = models.groupby(['modelID']).mean()
# model_avg.drop(columns=['methodID'], inplace=True)

# methods = navigation_avg.reset_index(drop=True)
# method_avg = methods.groupby(['methodID']).mean()
# method_avg.drop(columns=['modelID'], inplace=True)


In [None]:
# idx = 1

# for i in range(8):
#     nav_data[i] = nav_data[i].resample('0.5s', origin='start').mean()

# lum_nav = nav_data[idx]['Luminance']
# pupil_nav = nav_data[idx]['pupilDiameter']

# plr = pd.DataFrame(pupil_func(lum_nav, a, b, c))
# plr['time'] = plr.index
# plr.rename(columns={"Luminance": "pupilDiameter"}, inplace=True)
# plr.reset_index(drop=True, inplace=True)

# predicted_plr = pd.DataFrame(columns=['pupilDiameter'])
# for t in range(len(plr)):
#     if t==0:
#         predicted_plr.loc[t, "pupilDiameter"] = plr.loc[t,'pupilDiameter']
#     else:
#         predicted_plr.loc[t, "pupilDiameter"] = plr.loc[t-1,'pupilDiameter']
# predicted_plr['time'] = plr['time']
# predicted_plr.set_index('time', inplace=True)

# plot_dif = pupil_nav - predicted_plr['pupilDiameter']

# x_data = pupil_nav.index

# plt.plot(x_data, predicted_plr['pupilDiameter'], '--', label='predicted')
# plt.plot(x_data, pupil_nav, '--', label='actual')
# plt.plot(x_data, plot_dif, '-', label='difference')
# plt.legend()
