From 4d39a9217a92be243c8f8bdb7d62505da39b3d23 Mon Sep 17 00:00:00 2001 From: "Matt L. Laporte" Date: Wed, 7 Nov 2018 16:16:25 -0500 Subject: [PATCH] Slightly better method for recording; TimeSeries only needs to set a flag for when it has written to file, and record will see the flag and stop the respective stream. There is still an issue with this, since split_append will add the trailing samples from the chunk after writing to file; the correct duration will be in the file but may be sllightly offset in memory. Not sure how to fix this yet. --- wizardhat/acquire.py | 12 +++++++++--- wizardhat/buffers/buffers.py | 19 ++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/wizardhat/acquire.py b/wizardhat/acquire.py index 06d8508..eaa1b28 100644 --- a/wizardhat/acquire.py +++ b/wizardhat/acquire.py @@ -131,6 +131,7 @@ def __init__(self, source_id=None, with_types=('',), dejitter=True, **kwargs) self._dejitter = dejitter + self._store_once = False self._threads = {} self._new_threads() if autostart: @@ -139,7 +140,9 @@ def __init__(self, source_id=None, with_types=('',), dejitter=True, @classmethod def record(cls, duration, **kwargs): """Collect data over a finite interval, then stop.""" - return cls(window=duration, store_once=True, **kwargs) + instance = cls(window=duration, **kwargs) + instance._store_once = True + return instance def start(self): """Start data streaming. @@ -175,6 +178,8 @@ def _receive(self, name): inlets = self._inlets try: while self._proceed: + if self._store_once and self.buffers[name].written.is_set(): + break samples, timestamps = inlets[name].pull_chunk(timeout=0.1) #print(name, samples, timestamps) if timestamps: @@ -190,8 +195,9 @@ def _receive(self, name): print("BGAPI streaming interrupted. Device disconnected?") finally: - # write any remaining samples in `self.buffers` to file - self.buffers[name].write_to_file() + if not self._store_once: + # write any remaining samples in `self.buffers` to file + self.buffers[name].write_to_file() def _new_threads(self, names=None): # break loop in `stream` to cause thread to return diff --git a/wizardhat/buffers/buffers.py b/wizardhat/buffers/buffers.py index 2d8e26c..d162a95 100644 --- a/wizardhat/buffers/buffers.py +++ b/wizardhat/buffers/buffers.py @@ -78,6 +78,7 @@ def __init__(self, metadata=None, filename=None, data_dir='./data', # thread control self._lock = threading.Lock() + self.written = threading.Event() self.event_hook = utils.EventHook() # file output preparations @@ -202,7 +203,7 @@ class TimeSeries(Buffer): """ def __init__(self, ch_names, n_samples=2560, sfreq=None, record=True, - channel_fmt='f8', store_once=False, **kwargs): + channel_fmt='f8', **kwargs): """Create a new `TimeSeries` object. Args: @@ -237,8 +238,6 @@ def __init__(self, ch_names, n_samples=2560, sfreq=None, record=True, raise ValueError("Number of formats must match number of channels") self._record = record - self._write = True - self._store_once = store_once # write remaining data to file on program exit (e.g. quit()) if record: atexit.register(self.write_to_file) @@ -317,6 +316,8 @@ def write_to_file(self, force=False): for row in self._data[max(0, self._count):]: line = ','.join(str(n) for n in row) f.write(line + '\n') + self.written.set() + print(self.get_timestamps(last_n=1)) self._count = self.n_samples def _split_append(self, new): @@ -324,14 +325,10 @@ def _split_append(self, new): # however, last chunk added may push out some unwritten samples # therefore split appends before and after write_to_file cutoff = self._count - if self._write: - self._append(new[:cutoff]) - if self._count == 0: - self.write_to_file() - if self._store_once: - self._write = False - else: - self._append(new[cutoff:]) + self._append(new[:cutoff]) + if self._count == 0: + self.write_to_file() + self._append(new[cutoff:]) def _append(self, new): with self._lock: