# -*- coding: utf-8 -*-
"""File parsing functions."""
import ast
from collections import OrderedDict
import csv
import json

import numpy as np


def read_tab_raw(fname, return_params=False):
"""Read .tab file from expyfun output without segmenting into trials.
Parameters
----------
fname : str
Input filename.
return_params : bool
If True, return the JSON-parsed comment header.
Returns
-------
data : list of tuple
The data with each line from the tab file being a tuple in a list.
Each tuple is of the form (``timestamp``, ``key``, ``value``).
params : dict
The JSON-parsed comment header. Only returned if
``return_params=True``.
See Also
--------
read_tab
"""
with open(fname, 'r') as f:
csvr = csv.reader(f, delimiter='\t')
lines = [c for c in csvr]
# first two lines are headers
assert len(lines[0]) == 1 and lines[0][0].startswith('# ')
if return_params:
line = lines[0][0][2:]
try:
params = json.loads(
line, object_pairs_hook=OrderedDict)
except json.decoder.JSONDecodeError: # old format
params = json.loads(
line.replace("'", '"'), object_pairs_hook=OrderedDict)
else:
params = None
assert lines[1] == ['timestamp', 'event', 'value']
lines = lines[2:]
times = [float(line[0]) for line in lines]
keys = [line[1] for line in lines]
vals = [line[2] for line in lines]
data = list(zip(times, keys, vals))
return (data, params) if return_params else data
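
# Minimal usage sketch for ``read_tab_raw`` ('output.tab' is a hypothetical
# file name written by an expyfun experiment):
#
#     data, params = read_tab_raw('output.tab', return_params=True)
#     t, key, value = data[0]  # each entry is a (timestamp, key, value) tuple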


def read_tab(fname, group_start='trial_id', group_end='trial_ok',
             return_params=False, allow_last_missing=False):
    """Read .tab file from expyfun output and segment into trials.

    Parameters
    ----------
    fname : str
        Input filename.
    group_start : str
        Key to use to start a trial/row.
    group_end : str | None
        Key to use to end a trial/row. If None, the next ``group_start``
        will end the current group.
    return_params : bool
        If True, return the JSON-parsed comment header.
    allow_last_missing : bool
        If True, allow the last "trial_ok" data line to be missing.
        This should only be needed for old/legacy expyfun files.

    Returns
    -------
    data : list of dict
        The data, with a dict for each trial. Each value in the dict
        is a list of tuples ``(value, time)``, one for each occurrence
        of that key.
    params : dict
        The JSON-parsed comment header. Only returned if
        ``return_params=True``.

    See Also
    --------
    read_tab_raw
    """
# load everything into memory for ease of use
out = read_tab_raw(fname, return_params=return_params)
lines = out[0] if return_params else out
# determine the event fields
header = list(set([line[1] for line in lines]))
header.sort()
if group_start not in header:
raise ValueError('group_start "{0}" not in header: {1}'
''.format(group_start, header))
if group_end == group_start:
raise ValueError('group_start cannot equal group_end, use '
'group_end=None')
header = [header.pop(header.index(group_start))] + header
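    # find the index of each group-start line; matching end indices (b2s)
    # come either from the next start (group_end=None) or from group_end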
b1s = np.where([line[1] == group_start for line in lines])[0]
if group_end is None:
b2s = np.concatenate((b1s[1:], [len(lines)]))
else: # group_end is not None
if group_end not in header:
raise ValueError('group_end "{0}" not in header ({1})'
''.format(group_end, header))
header.append(header.pop(header.index(group_end)))
b2s = np.where([line[1] == group_end for line in lines])[0]
if len(b1s) == len(b2s) + 1 and allow_last_missing:
# old expyfun would sometimes not write the last trial_ok :(
b2s = np.concatenate([b2s, [len(lines)]])
lines.append((lines[-1][0] + 0.1, group_end, 'None'))
if len(b1s) != len(b2s) or not np.all(b1s < b2s):
raise RuntimeError('bad bounds in {0}:\n{1}\n{2}'
.format(fname, b1s, b2s))
data = []
for b1, b2 in zip(b1s, b2s):
assert lines[b1][1] == group_start # prevent stupidity
if group_end is not None:
b2 = b2 + 1 # include the end
assert lines[b2 - 1][1] == group_end
d = dict()
these_times = [float(line[0]) for line in lines[b1:b2]]
these_keys = [line[1] for line in lines[b1:b2]]
these_vals = [line[2] for line in lines[b1:b2]]
for ki, key in enumerate(header):
idx = np.where(key == np.array(these_keys))[0]
d[key] = [(these_vals[ii], these_times[ii]) for ii in idx]
data.append(d)
return (data, out[1]) if return_params else data
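
# Minimal usage sketch ('output.tab' is a hypothetical file name); each
# trial dict maps an event key to a list of (value, time) tuples:
#
#     trials = read_tab('output.tab')
#     trial_id, onset = trials[0]['trial_id'][0]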


def reconstruct_tracker(fname):
    """Reconstruct TrackerUD, TrackerBinom, TrackerMHW objects from .tab files.

    Parameters
    ----------
    fname : str
        Input filename.

    Returns
    -------
    tr : list of TrackerUD or TrackerBinom or TrackerMHW
        The tracker objects, with all responses fed in, such that they are
        in their stopped state (as long as the trackers were allowed to
        stop during the generation of the file). If only one tracker is
        found in the file, it will still be stored in a list and will be
        accessible as ``tr[0]``.
    """
from ..stimuli import TrackerUD, TrackerBinom, TrackerMHW
# read in raw data
raw = read_tab_raw(fname)
# find tracker_identify and make list of IDs
tracker_idx = np.where([r[1] == 'tracker_identify' for r in raw])[0]
if len(tracker_idx) == 0:
raise ValueError('There are no Trackers in this file.')
tr = []
used_dict_idx = [] # they can have repeat names!
used_stop_idx = []
for ii in tracker_idx:
tracker_id = ast.literal_eval(raw[ii][2])['tracker_id']
tracker_type = ast.literal_eval(raw[ii][2])['tracker_type']
# find tracker_ID_init lines and get dict
init_str = 'tracker_' + str(tracker_id) + '_init'
tracker_dict_idx = np.where([r[1] == init_str for r in raw])[0]
tracker_dict_idx = np.setdiff1d(tracker_dict_idx, used_dict_idx)
tracker_dict_idx = tracker_dict_idx[0]
used_dict_idx.append(tracker_dict_idx)
tracker_dict = json.loads(raw[tracker_dict_idx][2])
td = dict(TrackerUD=TrackerUD, TrackerBinom=TrackerBinom,
TrackerMHW=TrackerMHW)
tr.append(td[tracker_type](**tracker_dict))
tr[-1]._tracker_id = tracker_id # make sure tracker has original ID
stop_str = 'tracker_' + str(tracker_id) + '_stop'
tracker_stop_idx = np.where([r[1] == stop_str for r in raw])[0]
tracker_stop_idx = np.setdiff1d(tracker_stop_idx, used_stop_idx)
if len(tracker_stop_idx) == 0:
raise ValueError('Tracker {} has not stopped. All Trackers '
'must be stopped.'.format(tracker_id))
tracker_stop_idx = tracker_stop_idx[0]
used_stop_idx.append(tracker_stop_idx)
responses = json.loads(raw[tracker_stop_idx][2])['responses']
# feed in responses from tracker_ID_stop
for r in responses:
tr[-1].respond(r)
return tr
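
# Minimal usage sketch ('output.tab' is a hypothetical file name); even a
# file containing a single tracker yields a one-element list:
#
#     trackers = reconstruct_tracker('output.tab')
#     tr0 = trackers[0]  # stopped, since all logged responses were fed in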


def reconstruct_dealer(fname):
    """Reconstruct TrackerDealer objects from .tab files.

    The ``reconstruct_tracker`` function will be called to retrieve the
    trackers.

    Parameters
    ----------
    fname : str
        Input filename.

    Returns
    -------
    dealer : list of TrackerDealer
        The TrackerDealer objects, with all responses fed in, such that
        they are in their stopped state. If only one dealer is found in
        the file, it will still be stored in a list and will be accessible
        as ``dealer[0]``.
    """
from ..stimuli import TrackerDealer
raw = read_tab_raw(fname)
# find info on dealer
dealer_idx = np.where([r[1] == 'dealer_identify' for r in raw])[0]
if len(dealer_idx) == 0:
raise ValueError('There are no TrackerDealers in this file.')
dealer = []
for ii in dealer_idx:
dealer_id = ast.literal_eval(raw[ii][2])['dealer_id']
dealer_init_str = 'dealer_' + str(dealer_id) + '_init'
dealer_dict_idx = np.where([r[1] == dealer_init_str
for r in raw])[0][0]
dealer_dict = ast.literal_eval(raw[dealer_dict_idx][2])
dealer_trackers = dealer_dict['trackers']
# match up tracker objects to id
trackers = reconstruct_tracker(fname)
tr_objects = []
for t in dealer_trackers:
idx = np.where([t == t_id._tracker_id for t_id in trackers])[0][0]
tr_objects.append(trackers[idx])
# make the dealer object
max_lag = dealer_dict['max_lag']
pace_rule = dealer_dict['pace_rule']
dealer.append(TrackerDealer(None, tr_objects, max_lag, pace_rule))
# force input responses/log data
dealer_stop_str = 'dealer_' + str(dealer_id) + '_stop'
dealer_stop_idx = np.where([r[1] == dealer_stop_str for r in raw])[0]
if len(dealer_stop_idx) == 0:
raise ValueError('TrackerDealer {} has not stopped. All dealers '
'must be stopped.'.format(dealer_id))
dealer_stop_log = json.loads(raw[dealer_stop_idx[0]][2])
shape = tuple(dealer_dict['shape'])
log_response_history = dealer_stop_log['response_history']
log_x_history = dealer_stop_log['x_history']
log_tracker_history = dealer_stop_log['tracker_history']
dealer[-1]._shape = shape
dealer[-1]._trackers.shape = shape
dealer[-1]._response_history = log_response_history
dealer[-1]._x_history = log_x_history
dealer[-1]._tracker_history = log_tracker_history
dealer[-1]._stopped = True
return dealer
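
# Minimal usage sketch ('output.tab' is a hypothetical file name):
#
#     dealers = reconstruct_dealer('output.tab')
#     dealer = dealers[0]  # returned stopped, with its trackers re-attached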