limit buffer for mongo backend #349

Merged (1 commit, Nov 13, 2020)
32 changes: 32 additions & 0 deletions strax/storage/mongo.py
@@ -15,8 +15,14 @@
from datetime import datetime
from pytz import utc as py_utc
from warnings import warn
from sys import getsizeof
export, __all__ = strax.exporter()

# Some data is stored in the buffer. Delete from it when either of these
# values is exceeded.
DEFAULT_MONGO_BACKEND_BUFFER_MB = 200
DEFAULT_MONGO_BACKEND_BUFFER_NRUNS = 5


@export
class MongoBackend(StorageBackend):
@@ -32,7 +38,12 @@ def __init__(self, uri, database, col_name=None):
self.client = MongoClient(uri)
self.db = self.client[database]
self.col_name = col_name

# Attributes for the chunks-buffer
self.chunks_registry = {}
self._buffered_backend_keys = []
self._buff_mb = DEFAULT_MONGO_BACKEND_BUFFER_MB
self._buff_nruns = DEFAULT_MONGO_BACKEND_BUFFER_NRUNS

def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
"""See strax.Backend"""
@@ -115,6 +126,27 @@ def _build_chunk_registry(self, backend_key):
# issues or json-encoding headaches.
self.chunks_registry[backend_key + str(chunk_key)] = doc.copy()

# Some bookkeeping to make sure we don't buffer too much in this
# backend. We still need to return at least one run, hence the 'and'.
# See: https://github.com/AxFoundation/strax/issues/346
if backend_key not in self._buffered_backend_keys:
self._buffered_backend_keys.append(backend_key)
while ((getsizeof(self.chunks_registry) / 1e6 > self._buff_mb
and len(self._buffered_backend_keys) > 1)
or len(self._buffered_backend_keys) > self._buff_nruns):
self._clean_first_key_from_registry()

def _clean_first_key_from_registry(self):
"""
Remove the first item in self._buffered_backend_keys and all the
associated keys in self.chunks_registry to limit RAM usage
"""
# only clean the first entry from the list
to_clean = self._buffered_backend_keys[0]
for registry_key in list(self.chunks_registry.keys()):
if to_clean in registry_key:
del self.chunks_registry[registry_key]
Contributor


Python has complained at me when I modified a dictionary while iterating over it. List-casting the keys probably helps with this, but I got around it by storing the keys to delete in a second list.

Member Author


You really have a good memory for errors, I have to give you that. This is true.

However, if you look carefully, we are making a copy of the keys when converting them to a list. So we are not iterating over the dictionary's keys themselves but over the list of strings extracted from it.
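For illustration, a minimal standalone snippet (not part of the PR) showing the difference being discussed: deleting entries while iterating over the dict view itself raises a RuntimeError, while iterating over a list-cast snapshot of the keys, as the new code does, is safe.

registry = {'run0_chunk0': 1, 'run0_chunk1': 2, 'run1_chunk0': 3}

# Deleting while iterating over the dict view itself fails:
try:
    for key in registry.keys():
        if key.startswith('run0'):
            del registry[key]
except RuntimeError as error:
    print(error)  # "dictionary changed size during iteration"

# Deleting while iterating over a list-cast snapshot of the keys is fine,
# which is what list(self.chunks_registry.keys()) provides in this PR:
registry = {'run0_chunk0': 1, 'run0_chunk1': 2, 'run1_chunk0': 3}
for key in list(registry.keys()):
    if key.startswith('run0'):
        del registry[key]
print(registry)  # {'run1_chunk0': 3}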

del self._buffered_backend_keys[0]

@export
class MongoFrontend(StorageFrontend):
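For reference, a minimal standalone sketch of the eviction strategy the new code implements: oldest-first removal of buffered runs whenever the registry grows past a size cap or a run-count cap. The names, caps, and register_run helper below are illustrative stand-ins, not the strax API.

from sys import getsizeof

BUFFER_MB = 200      # illustrative caps mirroring the new defaults
BUFFER_NRUNS = 5

chunks_registry = {}
buffered_backend_keys = []

def register_run(backend_key, chunk_docs):
    # Toy version of the bookkeeping added to _build_chunk_registry.
    for i, doc in enumerate(chunk_docs):
        chunks_registry[backend_key + str(i)] = doc
    if backend_key not in buffered_backend_keys:
        buffered_backend_keys.append(backend_key)
    # Evict the oldest buffered run while the registry exceeds the size cap
    # (keeping at least one run) or the run-count cap.
    while ((getsizeof(chunks_registry) / 1e6 > BUFFER_MB
            and len(buffered_backend_keys) > 1)
           or len(buffered_backend_keys) > BUFFER_NRUNS):
        oldest = buffered_backend_keys.pop(0)
        for key in list(chunks_registry.keys()):
            if oldest in key:
                del chunks_registry[key]

# Registering a sixth run evicts the first one:
for run in range(6):
    register_run(f'run{run}_', [{'chunk': c} for c in range(3)])
print(buffered_backend_keys)  # ['run1_', 'run2_', 'run3_', 'run4_', 'run5_']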