Numpy caching of data in online monitor storage #698

Merged
merged 7 commits on Aug 10, 2022
Changes from 4 commits
38 changes: 18 additions & 20 deletions strax/storage/mongo.py
@@ -54,7 +54,7 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
# the key is not in the registry (will fail below if also not
# there on rebuild).
if registry_key not in self.chunks_registry.keys():
self._build_chunk_registry(backend_key)
self._build_chunk_registry(backend_key, dtype)

# Unpack info about this chunk from the query. Return empty if
# not available. Use a *string* in the registry to lookup the
@@ -69,18 +69,7 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
raise ValueError(
f'Metadata claims chunk{chunk_i} exists but it is unknown to '
f'the chunks_registry')

chunk_doc = doc.get('data', None)
if chunk_doc is None:
raise ValueError(f'Doc for chunk_{chunk_i} in wrong format:\n{doc}')

# Convert JSON to numpy
chunk_len = len(chunk_doc)
result = np.zeros(chunk_len, dtype=dtype)
for i in range(chunk_len):
for key in np.dtype(dtype).names:
result[i][key] = chunk_doc[i][key]
return result
return doc

def _saver(self, key, metadata, **kwargs):
"""See strax.Backend"""
@@ -103,7 +92,7 @@ def _get_metadata(self, key, **kwargs):
return doc['metadata']
raise strax.DataNotAvailable

def _build_chunk_registry(self, backend_key):
def _build_chunk_registry(self, backend_key, dtype):
"""
Build chunk info in a single registry using only one query to
the database. This is much faster as one does not have to do
@@ -125,22 +114,31 @@ def _build_chunk_registry(self, backend_key):
chunk_key = doc.get('chunk_i', None)
if chunk_key is None:
# Should not happen because of the projection in find
# but let's double check:
# but let's double-check:
raise ValueError(
f'Projection failed, got doc with no "chunk_i":\n{doc}')
# Update our registry with this chunks info. Use chunk_i as
# chunk_key. Make it a *string* to avoid potential key-error
# issues or json-encoding headaches.
self.chunks_registry[backend_key + str(chunk_key)] = doc.copy()
# Convert JSON to numpy
chunk_len = len(doc.get('data', []))
result = np.zeros(chunk_len, dtype=dtype)
for i in range(chunk_len):
for key in np.dtype(dtype).names:
result[i][key] = doc['data'][i][key]
self.chunks_registry[backend_key + str(chunk_key)] = result
del doc

# Some bookkeeping to make sure we don't buffer too much in this
# backend. We still need to return at least one hence the 'and'.
# See: https://github.com/AxFoundation/strax/issues/346
if backend_key not in self._buffered_backend_keys:
self._buffered_backend_keys.append(backend_key)
while ((getsizeof(self.chunks_registry) / 1e6 > self._buff_mb
and len(self._buffered_backend_keys) > 1)
or len(self._buffered_backend_keys) > self._buff_nruns):
while (
(len(self._buffered_backend_keys) > 1 and
sum(ch.nbytes for ch in self.chunks_registry.values()) / 1e6 > self._buff_mb)
or len(self._buffered_backend_keys) > self._buff_nruns
):
self._clean_first_key_from_registry()

def _clean_first_key_from_registry(self):
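One subtlety in the size bookkeeping above: the old check used `getsizeof(self.chunks_registry)`, but `sys.getsizeof` on a dict only measures the dict object itself, not the documents or arrays it holds, so the buffer could grow well past `_buff_mb` unnoticed. Summing `nbytes` of the cached numpy arrays measures the actual payload. A quick illustration (exact `getsizeof` numbers vary by Python version):

```python
import sys
import numpy as np

registry = {'somekey0': np.zeros(1_000_000, dtype=np.float64)}  # ~8 MB of array data

print(sys.getsizeof(registry))                   # a few hundred bytes: dict overhead only
print(sum(a.nbytes for a in registry.values()))  # 8000000: the actual cached payload
```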
@@ -324,7 +322,7 @@ def _close(self):
update = {f'metadata.{k}': v
for k, v in self.md.items()
if k in ('writing_ended', 'exception')}
# Also update all of the chunk-documents with the run_start_time
# Also update all the chunk-documents with the run_start_time
self.col.update_one({'_id': self.id_md}, {'$set': update})


30 changes: 30 additions & 0 deletions tests/test_mongo_frontend.py
@@ -226,6 +226,36 @@ def _make_mongo_target(self):
self.st.make(self.test_run_id, self.mongo_target)
assert self.is_stored_in_mongo

def test_clear_cache(self, make_n_runs=10):
"""
Test that the caches in the mongo backend get cleared appropriately
:param make_n_runs: The number of runs to make
:return:
"""
# We need to know how large one run is, so we can check that the cache is
# cleared once more data is stored here than the content of a single run
one_run = self.st.get_array(self.test_run_id, self.mongo_target)
assert self.is_stored_in_mongo

# Allow the frontend to cache a lot of runs, but only a little data
mongo_backend = self.mongo_sf.backends[0]
mongo_backend._buff_mb = one_run.nbytes/1e6
mongo_backend._buff_nruns = make_n_runs

self.st.make(list(str(i) for i in range(make_n_runs)), self.mongo_target)

# We should end up with at most two runs buffered (assuming all runs are the same number of bytes)
assert len(mongo_backend._buffered_backend_keys) <= 2

# Now allow the frontend to cache a lot of data, but only a single run
mongo_backend._buff_mb = 10 * one_run.nbytes / 1e6
mongo_backend._buff_nruns = 1

self.st.make(list(str(i) for i in range(make_n_runs)), self.mongo_target)

# We should have exactly one run cached
assert len(mongo_backend._buffered_backend_keys) == 1

@staticmethod
def _return_file_info(file: dict,
save_properties=('number',
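For reference, the eviction behaviour this test exercises can be sketched in isolation. The sketch below mimics the registry bookkeeping with toy numpy arrays; names such as `add_run`, `registry`, and `buffered_keys` are illustrative only and not part of the strax API:

```python
import numpy as np

buff_mb = 0.005      # size budget in MB, smaller than one toy "run" below
buff_nruns = 10      # maximum number of buffered runs
registry = {}        # backend_key -> cached numpy array
buffered_keys = []   # backend keys in insertion order

def add_run(key, arr):
    registry[key] = arr
    if key not in buffered_keys:
        buffered_keys.append(key)
    # Evict the oldest key while over the MB budget (always keeping at least
    # one key) or while over the run-count budget.
    while ((len(buffered_keys) > 1
            and sum(a.nbytes for a in registry.values()) / 1e6 > buff_mb)
           or len(buffered_keys) > buff_nruns):
        registry.pop(buffered_keys.pop(0), None)

for i in range(10):
    add_run(f'run_{i}', np.zeros(1000, dtype=np.float64))  # 0.008 MB per run

# With a budget below one run's size, older runs are evicted as new ones
# arrive, matching the `<= 2` bound asserted in the test above.
assert len(buffered_keys) <= 2
```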