Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix online monitor bug for only md stored #596

Merged
merged 4 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 21 additions & 10 deletions strax/storage/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@export
class MongoBackend(StorageBackend):
"""Mongo storage backend"""
def __init__(self, uri, database, col_name=None):
def __init__(self, uri: str, database: str, col_name: str):
"""
Backend for reading/writing data from Mongo
:param uri: Mongo url (with pw and username)
Expand Down Expand Up @@ -85,10 +85,10 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
def _saver(self, key, metadata, **kwargs):
"""See strax.Backend"""
# Use the key to make a collection otherwise, use the backend-key
col = self.db[self.col_name if self.col_name is not None else str(key)]
jmosbacher marked this conversation as resolved.
Show resolved Hide resolved
col = self.db[self.col_name]
return MongoSaver(key, metadata, col, **kwargs)

def _get_metadata(self, key):
def _get_metadata(self, key, **kwargs):
"""See strax.Backend"""
query = backend_key_to_query(key)

Expand Down Expand Up @@ -160,7 +160,7 @@ def _clean_first_key_from_registry(self):
class MongoFrontend(StorageFrontend):
"""MongoDB storage frontend"""

def __init__(self, uri, database, col_name=None, *args, **kwargs):
def __init__(self, uri, database, col_name, *args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No type hints?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me add some

"""
MongoFrontend for reading/writing data from Mongo
:param uri: Mongo url (with pw and username)
Expand All @@ -176,16 +176,24 @@ def __init__(self, uri, database, col_name=None, *args, **kwargs):
self.backends = [MongoBackend(uri, database, col_name=col_name)]
self.col_name = col_name

@property
def collection(self):
    """Entry of ``self.db`` stored under the name ``self.col_name``"""
    # NOTE(review): self.db is presumably a pymongo database - set outside this view
    database = self.db
    return database[self.col_name]

def _find(self, key, write, allow_incomplete, fuzzy_for,
fuzzy_for_options):
"""See strax.Frontend"""
if write:
return self.backends[0].__class__.__name__, str(key)
query = backend_key_to_query(str(key))
if self.db[self.col_name].count_documents(query):
self.log.debug(f"{key} is in cache.")
# Should have at least one non-metadata chunk, otherwise there
# is no data to load
query = {**backend_key_to_query(str(key)),
'provides_meta': False
}
if self.collection.count_documents(query):
self.log.debug(f"{key} is in database.")
Comment on lines -184 to +199
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, the same query would work if we instead check that count_documents() returns at least 2

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is true and actually what I did first (when I was trying to isolate the issue). Due to another bug (in my old version of the test) it did not seem to work. I could change it now, but I guess there is not much of a difference.

return self.backends[0].__class__.__name__, str(key)
self.log.debug(f"{key} is NOT in cache.")
self.log.debug(f"{key} is NOT in database.")
jmosbacher marked this conversation as resolved.
Show resolved Hide resolved
raise strax.DataNotAvailable


Expand Down Expand Up @@ -313,8 +321,11 @@ def backend_key_to_query(backend_key):
if len(split_key) != 3:
raise ValueError(f'backend_key ({backend_key}) has too many "-"s,'
f' don\'t use "-" within run_ids')
n, d, l = split_key
return {'number': int(n), 'data_type': d, 'lineage_hash': l}
number, data_type, lineage = split_key
return {'number': int(number),
'data_type': data_type,
'lineage_hash': lineage,
}


def remove_np(dictin):
Expand Down
154 changes: 130 additions & 24 deletions tests/test_mongo_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import strax
from strax.testutils import Records, Peaks
import os
import shutil
import tempfile
import pymongo
from warnings import warn
import logging
import time
_can_test = 'TEST_MONGO_URI' in os.environ


@unittest.skipIf(not _can_test, 'No test-database is configured')
class TestMongoFrontend(unittest.TestCase):
"""
Test the saving behavior of the context with the strax.MongoFrontend
Expand All @@ -20,15 +21,8 @@ class TestMongoFrontend(unittest.TestCase):
At the moment this is just an empty database but you can also use some free
ATLAS mongo server.
"""
run_test = True

def setUp(self):
# Just to make sure we are running some mongo server, see test-class docstring
if 'TEST_MONGO_URI' not in os.environ:
self.run_test = False
warn('Cannot connect to test-database')
return

self.test_run_id = '0'
self.all_targets = ('peaks', 'records')
self.mongo_target = 'peaks'
Expand All @@ -38,6 +32,7 @@ def setUp(self):
self.collection_name = 'temp-test-collection-mongosf'
client = pymongo.MongoClient(uri)
self.database = client[db_name]
self.collection.drop()
assert self.collection_name not in self.database.list_collection_names()

self.mongo_sf = strax.MongoFrontend(uri=uri,
Expand All @@ -50,30 +45,33 @@ def setUp(self):
storage=[self.mongo_sf],
use_per_run_defaults=True,
)
self.log = logging.getLogger(self.__class__.__name__)
assert not self.is_all_targets_stored

def tearDown(self):
if not self.run_test:
return
self.database[self.collection_name].drop()
self.collection.drop()

@property
def collection(self):
    """Entry of ``self.database`` stored under ``self.collection_name``"""
    database = self.database
    return database[self.collection_name]

@property
def is_all_targets_stored(self) -> bool:
    """
    Whether ``self.st`` reports every target in ``self.all_targets`` as
    stored for ``self.test_run_id``

    This should always be False as one of the targets (records) is not
    stored in mongo.
    """
    # A generator (instead of a list) lets all() stop at the first target
    # that is not stored, so no needless lookups are made.
    return all(self.st.is_stored(self.test_run_id, target)
               for target in self.all_targets)

def stored_in_context(self, context):
    """
    Ask ``context`` if the mongo target of the test run is stored in
    ``self.mongo_sf``

    :param context: object providing ``_is_stored_in_sf``
    :return: whatever ``context._is_stored_in_sf`` returns
    """
    is_stored_in_sf = context._is_stored_in_sf
    return is_stored_in_sf(self.test_run_id,
                           self.mongo_target,
                           self.mongo_sf)

@property
def is_stored_in_mongo(self) -> bool:
return self.st._is_stored_in_sf(self.test_run_id, self.mongo_target, self.mongo_sf)
return self.stored_in_context(self.st)

@property
def is_data_in_collection(self):
return self.database[self.collection_name].find_one() is not None
return self.collection.find_one() is not None

def test_write_and_load(self):
if not self.run_test:
return

# Shouldn't be any traces of data
assert not self.is_data_in_collection
assert not self.is_stored_in_mongo
Expand All @@ -100,8 +98,6 @@ def test_write_and_change_lineage(self):
Lineage changes should result in data not being available
and therefore the data should not be returned.
"""
if not self.run_test:
return
self._make_mongo_target()
assert self.is_stored_in_mongo

Expand All @@ -117,8 +113,6 @@ def test_clean_cache(self):
We keep a small cache in the backend of the last loaded data for
offloading the database, test that it works
"""
if not self.run_test:
return
self._make_mongo_target()
assert self.is_stored_in_mongo
mongo_backend = self.mongo_sf.backends[0]
Expand All @@ -135,16 +129,128 @@ def test_interrupt_iterator(self):
When we interrupt during the writing of data, make sure
we are not able to data that is only half computed
"""
if not self.run_test:
return
assert not self.is_stored_in_mongo
self.st.config['n_chunks'] = 2 # Make sure that after one iteration we haven't finished
for chunk in self.st.get_iter(self.test_run_id, self.mongo_target):
print(chunk)
break
assert not self.is_stored_in_mongo

def test_allow_incomplete(self):
    """
    Check what counts as stored while the target is still being written

    After pulling a single chunk the test requires that the default
    context does not yet see the target as stored, while a context with
    ``allow_incomplete`` set does. Once the iterator is exhausted both
    contexts must see it.
    """
    lenient_st = self.st.new_context()
    lenient_options = {'allow_incomplete': True,
                       'forbid_creation_of': '*',
                       }
    lenient_st.set_context_config(lenient_options)
    assert not self.is_stored_in_mongo
    self.st.config['n_chunks'] = 3

    self.log.info(f'Starting with empty db {self.chunk_summary}')
    # Keep a handle on the iterator so chunks can be pulled one at a time
    chunk_iter = self.st.get_iter(self.test_run_id, self.mongo_target)

    self.log.info(f'Got iterator, still no data?: {self.chunk_summary}')
    # Nothing has been pulled from the iterator yet
    if self.is_stored_in_mongo:
        raise RuntimeError(f'Target should not be stored'
                           f'\n{self.chunk_summary}')

    # Pull exactly one chunk
    next(chunk_iter)
    time.sleep(0.5)  # In case the database is not reflecting changes quick enough
    self.log.info(f'Got first chunk, still no data?: {self.chunk_summary}')
    if self.is_stored_in_mongo:
        raise RuntimeError(f'After 1 chunk target should not be stored'
                           f'\n{self.chunk_summary}')
    partial_data_found = self.stored_in_context(lenient_st)
    if not partial_data_found:
        raise RuntimeError(f'We did not find the one chunk that should be '
                           f'allowed to be loaded\n {self.chunk_summary}')

    # Drain whatever chunks are left
    for _ in chunk_iter:
        pass

    stored_in_st = self.is_stored_in_mongo
    stored_in_incomplete_st = self.stored_in_context(lenient_st)
    if not (stored_in_st and stored_in_incomplete_st):
        raise RuntimeError(f'Source finished and should be stored in st '
                           f'({stored_in_st}) and st_incomplete ('
                           f'{stored_in_incomplete_st}) '
                           f'\n{self.chunk_summary}')

def test_allow_incomplete_during_md_creation(self):
    """
    Test that a target with only its metadata ("md") registered is not
    reported as stored, not even when incomplete data is allowed

    See #596 for more info

    Unlike "test_allow_incomplete", no chunks are written at all here:
    only a saver is requested from the frontend, after which the test
    expects exactly one entry in the collection.
    """
    lenient_st = self.st.new_context()
    lenient_options = {'allow_incomplete': True,
                       'forbid_creation_of': '*',
                       }
    lenient_st.set_context_config(lenient_options)
    assert not self.is_stored_in_mongo
    self.st.config['n_chunks'] = 3

    # Mimic metadata creation:
    # github.com/AxFoundation/strax/blob/a9ec08003a9193113c65910602d8b1b0ed4eb4e6/strax/context.py#L903 # noqa
    run_id, target = self.test_run_id, self.mongo_target
    key = self.st.key_for(run_id, target)
    target_plugin = self.st.get_single_plugin(run_id, target)
    metadata = target_plugin.metadata(run_id, target)
    timeout = self.st.context_config['saver_timeout']
    # The returned saver is deliberately not used: no chunk may be written
    self.mongo_sf.saver(key=key, metadata=metadata, saver_timeout=timeout)
    print(self.chunk_summary)
    assert len(self.chunk_summary) == 1, f'Only md should be written {self.chunk_summary}'

    # Now check that both frontends understand there is no data (even when
    # allow_incomplete is set)
    stored_in_st = self.is_stored_in_mongo
    stored_in_incomplete_st = self.stored_in_context(lenient_st)
    if any((stored_in_st, stored_in_incomplete_st)):
        raise RuntimeError(f'Only metadata written and should NOT stored in st '
                           f'({stored_in_st}) and st_incomplete ('
                           f'{stored_in_incomplete_st}) '
                           f'\n{self.chunk_summary}')
Comment on lines +214 to +222
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the check that failed before and is now fixed


def _make_mongo_target(self):
    """
    Let the context make the mongo target for the test run

    Asserts that the target is not stored beforehand and is stored
    afterwards.
    """
    assert not self.is_stored_in_mongo
    run_id, target = self.test_run_id, self.mongo_target
    self.st.make(run_id, target)
    assert self.is_stored_in_mongo

@staticmethod
def _return_file_info(file: dict,
                      save_properties=('number',
                                       'data_type',
                                       'lineage_hash',
                                       'provides_meta',
                                       'chunk_i',
                                       'chunks',
                                       )
                      ) -> dict:
    """
    Reduce a document to a fixed selection of its fields

    :param file: dict to take the values from
    :param save_properties: keys to copy, keys that are absent from
        ``file`` end up with the value None
    :return: dict with one entry for each of ``save_properties``
    """
    selection = {}
    for field in save_properties:
        selection[field] = file.get(field)
    return selection

@property
def chunk_summary(self):
    """List with the ``_return_file_info`` of everything ``self.collection.find()`` yields"""
    documents = self.collection.find()
    summarize = self._return_file_info
    return [summarize(document) for document in documents]


if __name__ == '__main__':
    # Run the tests of this module when it is executed as a script.
    #
    # Looping over ``test.__dict__`` of a TestMongoFrontend instance never
    # ran anything: an instance __dict__ only holds instance attributes,
    # the test_* methods live on the class. Leave collecting and running
    # to unittest, which also respects the skip decorator on the class and
    # calls tearDown even when a test fails.
    program = unittest.main(exit=False)
    print('Done bye bye')
    # exit=False keeps unittest from exiting before the print above, so
    # report failures through the exit status ourselves.
    raise SystemExit(not program.result.wasSuccessful())