Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slightly better method for recording; TimeSeries only needs to set a … #2

Open
wants to merge 3 commits into
base: feature-markers
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions wizardhat/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def __init__(self, source_id=None, with_types=('',), dejitter=True,
**kwargs)

self._dejitter = dejitter
self._store_once = False
self._threads = {}
self._new_threads()
if autostart:
Expand All @@ -139,7 +140,9 @@ def __init__(self, source_id=None, with_types=('',), dejitter=True,
@classmethod
def record(cls, duration, **kwargs):
"""Collect data over a finite interval, then stop."""
return cls(window=duration, store_once=True, **kwargs)
instance = cls(window=duration, **kwargs)
instance._store_once = True
return instance

def start(self):
"""Start data streaming.
Expand Down Expand Up @@ -175,6 +178,8 @@ def _receive(self, name):
inlets = self._inlets
try:
while self._proceed:
if self._store_once and self.buffers[name].written.is_set():
break
samples, timestamps = inlets[name].pull_chunk(timeout=0.1)
#print(name, samples, timestamps)
if timestamps:
Expand All @@ -190,8 +195,9 @@ def _receive(self, name):
print("BGAPI streaming interrupted. Device disconnected?")

finally:
# write any remaining samples in `self.buffers` to file
self.buffers[name].write_to_file()
if not self._store_once:
# write any remaining samples in `self.buffers` to file
self.buffers[name].write_to_file()

def _new_threads(self, names=None):
# break loop in `stream` to cause thread to return
Expand Down
19 changes: 8 additions & 11 deletions wizardhat/buffers/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, metadata=None, filename=None, data_dir='./data',

# thread control
self._lock = threading.Lock()
self.written = threading.Event()
self.event_hook = utils.EventHook()

# file output preparations
Expand Down Expand Up @@ -202,7 +203,7 @@ class TimeSeries(Buffer):
"""

def __init__(self, ch_names, n_samples=2560, sfreq=None, record=True,
channel_fmt='f8', store_once=False, **kwargs):
channel_fmt='f8', **kwargs):
"""Create a new `TimeSeries` object.

Args:
Expand Down Expand Up @@ -237,8 +238,6 @@ def __init__(self, ch_names, n_samples=2560, sfreq=None, record=True,
raise ValueError("Number of formats must match number of channels")

self._record = record
self._write = True
self._store_once = store_once
# write remaining data to file on program exit (e.g. quit())
if record:
atexit.register(self.write_to_file)
Expand Down Expand Up @@ -317,21 +316,19 @@ def write_to_file(self, force=False):
for row in self._data[max(0, self._count):]:
line = ','.join(str(n) for n in row)
f.write(line + '\n')
self.written.set()
print(self.get_timestamps(last_n=1))
self._count = self.n_samples

def _split_append(self, new):
# write out each time array contains only unwritten samples
# however, last chunk added may push out some unwritten samples
# therefore split appends before and after write_to_file
cutoff = self._count
if self._write:
self._append(new[:cutoff])
if self._count == 0:
self.write_to_file()
if self._store_once:
self._write = False
else:
self._append(new[cutoff:])
self._append(new[:cutoff])
if self._count == 0:
self.write_to_file()
self._append(new[cutoff:])

def _append(self, new):
with self._lock:
Expand Down