Skip to content

Commit

Permalink
Numpy caching of data in online monitor storage (#698)
Browse files Browse the repository at this point in the history
* test numpy caching of data in online monitor frontend

* add test for clearing the cache

* check size of the buffers

* tweaks and typos

* faster list comprehension

* Update mongo.py

* Update mongo.py

Co-authored-by: Joran Angevaare <j.angevaare@nikhef.nl>
  • Loading branch information
JoranAngevaare and Joran Angevaare committed Aug 10, 2022
1 parent 1562542 commit ef39e3f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
36 changes: 16 additions & 20 deletions strax/storage/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
# the key is not in the registry (will fail below if also not
# there on rebuild).
if registry_key not in self.chunks_registry.keys():
self._build_chunk_registry(backend_key)
self._build_chunk_registry(backend_key, dtype)

# Unpack info about this chunk from the query. Return empty if
# not available. Use a *string* in the registry to lookup the
Expand All @@ -69,18 +69,7 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
raise ValueError(
f'Metadata claims chunk{chunk_i} exists but it is unknown to '
f'the chunks_registry')

chunk_doc = doc.get('data', None)
if chunk_doc is None:
raise ValueError(f'Doc for chunk_{chunk_i} in wrong format:\n{doc}')

# Convert JSON to numpy
chunk_len = len(chunk_doc)
result = np.zeros(chunk_len, dtype=dtype)
for i in range(chunk_len):
for key in np.dtype(dtype).names:
result[i][key] = chunk_doc[i][key]
return result
return doc

def _saver(self, key, metadata, **kwargs):
"""See strax.Backend"""
Expand All @@ -103,7 +92,7 @@ def _get_metadata(self, key, **kwargs):
return doc['metadata']
raise strax.DataNotAvailable

def _build_chunk_registry(self, backend_key):
def _build_chunk_registry(self, backend_key, dtype):
"""
Build chunk info in a single registry using only one query to
the database. This is much faster as one does not have to do
Expand All @@ -125,22 +114,29 @@ def _build_chunk_registry(self, backend_key):
chunk_key = doc.get('chunk_i', None)
if chunk_key is None:
# Should not happen because of the projection in find
# but let's double check:
# but let's double-check:
raise ValueError(
f'Projection failed, got doc with no "chunk_i":\n{doc}')
# Update our registry with this chunks info. Use chunk_i as
# chunk_key. Make it a *string* to avoid potential key-error
# issues or json-encoding headaches.
self.chunks_registry[backend_key + str(chunk_key)] = doc.copy()
chunk_len = len(doc.get('data', []))
result = np.zeros(chunk_len, dtype=dtype)
for key in np.dtype(dtype).names:
result[key] = [dd[key] for dd in doc['data']]
self.chunks_registry[backend_key + str(chunk_key)] = result
del doc

# Some bookkeeping to make sure we don't buffer too much in this
# backend. We still need to return at least one hence the 'and'.
# See: https://github.com/AxFoundation/strax/issues/346
if backend_key not in self._buffered_backend_keys:
self._buffered_backend_keys.append(backend_key)
while ((getsizeof(self.chunks_registry) / 1e6 > self._buff_mb
and len(self._buffered_backend_keys) > 1)
or len(self._buffered_backend_keys) > self._buff_nruns):
while (
(len(self._buffered_backend_keys) > 1 and
sum(ch.nbytes for ch in self.chunks_registry.values()) / 1e6 > self._buff_mb)
or len(self._buffered_backend_keys) > self._buff_nruns
):
self._clean_first_key_from_registry()

def _clean_first_key_from_registry(self):
Expand Down Expand Up @@ -324,7 +320,7 @@ def _close(self):
update = {f'metadata.{k}': v
for k, v in self.md.items()
if k in ('writing_ended', 'exception')}
# Also update all of the chunk-documents with the run_start_time
# Also update all the chunk-documents with the run_start_time
self.col.update_one({'_id': self.id_md}, {'$set': update})


Expand Down
30 changes: 30 additions & 0 deletions tests/test_mongo_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,36 @@ def _make_mongo_target(self):
self.st.make(self.test_run_id, self.mongo_target)
assert self.is_stored_in_mongo

def test_clear_cache(self, make_n_runs=10):
"""
Test that the caches in the mongo backend get cleared appropriately
:param make_n_runs: The number of runs to make
:return:
"""
# We need to know how large one run is. We'll make sure that we clear the cache if more
# data gets stored here than the content of one run
one_run = self.st.get_array(self.test_run_id, self.mongo_target)
assert self.is_stored_in_mongo

# Allow the frontend to cache a lof of runs, but only little data
mongo_backend = self.mongo_sf.backends[0]
mongo_backend._buff_mb = one_run.nbytes/1e6
mongo_backend._buff_nruns = make_n_runs

self.st.make(list(str(i) for i in range(make_n_runs)), self.mongo_target)

# We should have at most 1 run (assuming all are the same number of bytes)
assert len(mongo_backend._buffered_backend_keys) <= 2

# Allow the frontend to cache a lof of runs, but only little data
mongo_backend._buff_mb = 10 * one_run.nbytes / 1e6
mongo_backend._buff_nruns = 1

self.st.make(list(str(i) for i in range(make_n_runs)), self.mongo_target)

# We should have exactly one run cached
assert len(mongo_backend._buffered_backend_keys) == 1

@staticmethod
def _return_file_info(file: dict,
save_properties=('number',
Expand Down

0 comments on commit ef39e3f

Please sign in to comment.