Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StorageFrontend remoteness attribute and test #617

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 26 additions & 10 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,17 +679,29 @@ def _find_options(self):
fuzzy_for_options=self.context_config['fuzzy_for_options'],
allow_incomplete=self.context_config['allow_incomplete'])

@property
def _sorted_storage(self) -> ty.List[strax.StorageFrontend]:
    """
    Simple ordering of the storage frontends on the fly when e.g.
    looking for data. This allows us to use the simple self.storage
    as a simple list without asking users to keep any particular
    order in mind. Return the fastest first and try loading from it.

    :return: a *new* list of the frontends in ``self.storage``,
        ordered from fastest (lowest ``storage_type``) to slowest.
    """
    # Use sorted() to build a new list rather than list.sort(), which
    # would reorder self.storage in place. The original order of
    # self.storage must be preserved because callers such as
    # copy_to_frontend address frontends by their index in it.
    return sorted(self.storage, key=lambda sf: sf.storage_type)

def _get_partial_loader_for(self, key, time_range=None, chunk_number=None):
"""
Get partial loaders to allow loading data later
:param key: strax.DataKey
:param time_range: 2-length arraylike of (start, exclusive end) of row
numbers to get. Default is None, which means get the entire run.
numbers to get. Default is None, which means get the entire run.
:param chunk_number: number of the chunk for data specified by
strax.DataKey. This chunk is loaded exclusively.
strax.DataKey. This chunk is loaded exclusively.
:return: partial object
"""
for sb_i, sf in enumerate(self.storage):
for _, sf in enumerate(self._sorted_storage):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for _, sf in enumerate(self._sorted_storage):
for sf in self._sorted_storage:

double jeopardy!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are absolutely right!

try:
# Partial is clunky... but allows specifying executor later
# Since it doesn't run until later, we must do a find now
Expand Down Expand Up @@ -879,7 +891,7 @@ def concat_loader(*args, **kwargs):

key = strax.DataKey(run_id, d_to_save, target_plugin.lineage)

for sf in self.storage:
for sf in self._sorted_storage:
if sf.readonly:
continue
if loading_this_data:
Expand Down Expand Up @@ -1469,10 +1481,10 @@ def get_meta(self, run_id, target) -> dict:
:param target: data type to get
"""
key = self.key_for(run_id, target)
for sf in self.storage:
for sf in self._sorted_storage:
try:
return sf.get_metadata(key, **self._find_options)
except strax.DataNotAvailable as e:
except strax.DataNotAvailable:
self.log.debug(f"Frontend {sf} does not have {key}")
raise strax.DataNotAvailable(f"Can't load metadata, "
f"data for {key} not available")
Expand All @@ -1487,7 +1499,7 @@ def run_metadata(self, run_id, projection=None) -> dict:
:param projection: Selection of fields to get, following MongoDB
syntax. May not be supported by frontend.
"""
for sf in self.storage:
for sf in self._sorted_storage:
if not sf.provide_run_metadata:
continue
try:
Expand Down Expand Up @@ -1523,7 +1535,8 @@ def run_defaults(self, run_id):
return defs

def is_stored(self, run_id, target, **kwargs):
"""Return whether data type target has been saved for run_id
"""
Return whether data type target has been saved for run_id
through any of the registered storage frontends.

Note that even if False is returned, the data type may still be made
Expand All @@ -1540,7 +1553,7 @@ def is_stored(self, run_id, target, **kwargs):
# noinspection PyMethodFirstArgAssignment
self = self.new_context(**kwargs)

for sf in self.storage:
for sf in self._sorted_storage:
if self._is_stored_in_sf(run_id, target, sf):
return True
# None of the frontends has the data
Expand Down Expand Up @@ -1592,13 +1605,16 @@ def copy_to_frontend(self,
rechunk: bool = False):
"""
Copy data from one frontend to another

:param run_id: run_id
:param target: target datakind
:param target_frontend_id: index of the frontend that the data should go to
in context.storage. If no index is specified, try all.
:param target_compressor: if specified, recompress with this compressor.
:param rechunk: allow re-chunking for saving
"""

# NB! We don't want to use self._sorted_storage here since the order matters!
if not self.is_stored(run_id, target):
raise strax.DataNotAvailable(f'Cannot copy {run_id} {target} since it '
f'does not exist')
Expand Down Expand Up @@ -1781,7 +1797,7 @@ def _get_source_sf(self, run_id, target, should_exist=False):
:return: strax.StorageFrontend or None (when raise_error is
False)
"""
for sf in self.storage:
for sf in self._sorted_storage:
if self._is_stored_in_sf(run_id, target, sf):
return sf
if should_exist:
Expand Down
20 changes: 19 additions & 1 deletion strax/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import time
import typing
import warnings

from enum import IntEnum
import numpy as np

import strax
Expand Down Expand Up @@ -84,6 +84,23 @@ class RunMetadataNotAvailable(Exception):
pass


@export
class StorageType(IntEnum):
    """
    Class attribute of how far/close data is when fetched from a given
    storage frontend. This is used to prioritize which frontend will be
    asked first for data (prevents loading data from slow frontends when
    fast frontends might also have the data).

    Members are an IntEnum, so they compare and sort as plain integers:
    a *lower* value means a faster frontend and is tried first when
    the context sorts its frontends by ``storage_type``.
    """
    # Feel free to add, could even be floats if needed
    MEMORY = 0  # Sits in cache, super fast to load
    LOCAL = 1  # Available on hard-disk, only limited by IO
    ONLINE = 2  # Limited by network bandwidth
    COMPRESSED = 3  # Data is compressed and needs (slow) decompression before loading
    REMOTE = 4  # Data needs to be fetched from another host, and downloaded
    TAPE = 10  # Nothing is as slow as tape, except pigeon post


@export
class StorageFrontend:
"""Interface to something that knows data-locations and run-level metadata.
Expand All @@ -93,6 +110,7 @@ class StorageFrontend:
can_define_runs = False
provide_run_metadata = False
provide_superruns = False
storage_type = StorageType.LOCAL

def __init__(self,
readonly=False,
Expand Down
3 changes: 3 additions & 0 deletions strax/storage/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def _clean_first_key_from_registry(self):
class MongoFrontend(StorageFrontend):
"""MongoDB storage frontend"""

# Not as fast as local storage, especially due to many chunks
storage_type = strax.StorageType.ONLINE

def __init__(self,
uri: str,
database: str,
Expand Down
2 changes: 2 additions & 0 deletions strax/storage/zipfiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ZipDirectory(strax.StorageFrontend):
instead these zip files are made by zipping stuff from FileSytemBackend.
"""

storage_typ = strax.StorageType.COMPRESSED

def __init__(self, path='.', *args, readonly=True, **kwargs):
if not readonly:
raise NotImplementedError("Zipfiles are currently read-only")
Expand Down
199 changes: 194 additions & 5 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import unittest
from unittest import TestCase, skipIf
import strax
from strax.testutils import Records, Peaks
from strax.testutils import Records
import os
import shutil
import tempfile
import numpy as np
import typing as ty
from immutabledict import immutabledict


class TestPerRunDefaults(unittest.TestCase):
"""Test the saving behavior of the context"""
@skipIf(True, '')
class TestPerRunDefaults(TestCase):
"""
Test the saving behavior of the context
"""

def setUp(self):
self.path = os.path.join(tempfile.gettempdir(), 'strax_data')
self.st = strax.Context(use_per_run_defaults=True,
register=[Records],)
register=[Records], )
self.target = 'records'

def tearDown(self):
Expand All @@ -30,3 +37,185 @@ def test_complain_run_id(self):
run_id = 'run-0'
with self.assertRaises(ValueError):
self.st.make(run_id, self.target)


class VerboseDataDir(strax.DataDirectory):
    """
    DataDirectory frontend that can echo every ``find`` request to
    stdout, so tests can see which frontend was asked for which key.
    """

    # Flip to True (per instance or per subclass) to enable the echoing.
    _verbose = False

    def _print(self, m):
        # Stay silent unless verbosity was switched on.
        if not self._verbose:
            return
        print(m)

    def find(self, key, *args, **kwargs):
        # Same prefix for both the success and the failure report.
        prefix = f'{self.path} was asked for {key} ->'
        try:
            found = super().find(key, *args, **kwargs)
        except Exception as e:
            # Report the failure, then let the exception propagate.
            self._print(f'{prefix} raises {type(e)}')
            raise e
        self._print(f'{prefix} returns {found}')
        return found


class TestStorageType(TestCase):
    """
    Test that slow frontends are asked last to provide data.

    A bit of a clunky test to test a simple thing. It may be a bit
    hidden but this is the goal:

    - Check that we are always asking for data to the *fastest* frontend first

    We do this by creating three frontends, of varying speed. To be able
    to be sure which frontend is returning what, we put a different
    amount of data in each of them, so we can be sure the right frontend
    is returning us that data (we do a lot of for loops where I don't
    otherwise know how we can cleanly extract which frontend is returning
    what).
    """

    # Data type and run used by every test in this class.
    target = 'records'
    run_id = '1'

    # Shared, immutable context options so each test builds an
    # equivalent strax.Context.
    context_kwargs = immutabledict(use_per_run_defaults=True,
                                   allow_rechunk=False,
                                   register=[Records],
                                   )

    @classmethod
    def setUpClass(cls) -> None:
        """Get a temp directory available of all the tests"""
        cls.path = os.path.join(tempfile.gettempdir(), 'strax_data')

    def tearDown(self):
        """After each test, delete the temporary directory"""
        if os.path.exists(self.path):
            print(f'rm {self.path}')
            shutil.rmtree(self.path)

    def _sub_dir(self, subdir: str) -> str:
        # Each frontend gets its own sub-directory under the temp path.
        return os.path.join(self.path, subdir)

    def get_st_and_fill_frontends(self) -> ty.Tuple[strax.Context, dict]:
        """
        Get options that allow us to do the check as in the docstring
        of this class

        :return: tuple of (context with all three frontends registered,
            the dict describing each frontend's name/remoteness/n_chunks)
        """
        # Three frontends, with three different names, remoteness levels
        # and number of chunks stored in them
        frontend_setup = {
            'name': ['far', 'close', 'intermediate'],
            'remoteness': [strax.StorageType.TAPE,
                           strax.StorageType.LOCAL,
                           strax.StorageType.REMOTE,
                           ],
            'n_chunks': [1, 2, 3],
        }

        frontends = []
        for name, remoteness, n_chunks in zip(*list(frontend_setup.values())):
            sf = VerboseDataDir(self._sub_dir(name))
            sf.storage_type = remoteness
            # A throwaway context with only this frontend, so the data
            # made below ends up in exactly one place.
            temp_st = strax.Context(storage=[sf],
                                    config=dict(n_chunks=n_chunks),
                                    **self.context_kwargs,
                                    )
            # For each frontend, make data (that won't be equal size!)
            recs = temp_st.get_array(self.run_id, self.target)
            n_recs = len(recs)
            del recs
            # NOTE(review): stray ')' at the end of this message
            print(f'{sf} ({name}) made {n_recs})')
            # Freeze the frontend so later tests can only read from it.
            sf.readonly = True
            sf._verbose = True
            frontends += [sf]

        return (strax.Context(storage=frontends, **self.context_kwargs),
                frontend_setup)

    def test_close_goes_first_md(self):
        """
        Let's see that if we get the meta-data, it's from the one with
        the lowest remoteness. We can check this by comparing the number
        of chunks.
        """
        st, frontend_setup = self.get_st_and_fill_frontends()
        result = st.get_metadata(self.run_id, self.target)
        n_chunks = len(result['chunks'])
        # Index of the frontend with the lowest remoteness value.
        closest = np.argmin(frontend_setup['remoteness'])
        n_fastest = frontend_setup['n_chunks'][closest]

        self.assertEqual(n_chunks,
                         frontend_setup['n_chunks'][closest],
                         f'Should have returned {n_fastest} from'
                         f' {st.storage[closest]}, got {n_chunks}'
                         )

    def test_close_goes_first_on_loading(self):
        """
        Check that loading data comes from the fastest frontend by
        comparing the data length
        """
        st, frontend_setup = self.get_st_and_fill_frontends()
        closest = np.argmin(frontend_setup['remoteness'])
        len_from_main_st = len(st.get_array(self.run_id, self.target))

        # Load from each frontend in isolation; only the fastest one
        # should match what the combined context returned.
        for sf_i, sf in enumerate(st.storage):
            st_compare = st.new_context()
            st_compare.storage = [sf]
            len_from_compare = len(st_compare.get_array(self.run_id,
                                                        self.target))
            if sf_i == closest:
                self.assertEqual(len_from_compare, len_from_main_st)
            else:
                self.assertNotEqual(len_from_compare, len_from_main_st)

    def test_float_remoteness_allowed(self):
        """
        It can happen that the pre-defined remoteness identifiers in
        strax.StorageType are not sufficient, e.g. you have 10 similar
        but not quite the same performing frontends.

        You can set `sf.storage_type` as a float to fix this issue such
        that you can order an infinite amount of frontends (the int enum
        is only for readability).
        """

        # Two different classes with only slightly different storage_types
        class FrontentSlow(VerboseDataDir):
            storage_type = 10
            _verbose = True

        class FrontentSlightlySlower(VerboseDataDir):
            # Fractionally slower than FrontentSlow, so it must be
            # asked *after* it.
            storage_type = 10.001
            raise_when_run = False
            _verbose = True

            def find(self, *args, **kwargs):
                # When armed, any lookup on this frontend blows up —
                # proving it was (wrongly) consulted.
                print(self.raise_when_run)
                if self.raise_when_run:
                    raise strax.testutils.SomeCrash
                return super().find(*args, **kwargs)

        storage_slow = FrontentSlow(self._sub_dir('slow'))
        storage_slightly_slow = FrontentSlightlySlower(self._sub_dir('slightlyslow'))
        storages = [storage_slow, storage_slightly_slow]
        st = strax.Context(storage=storages, **self.context_kwargs)

        # Make the data, it should be in both frontends
        st.make(self.run_id, self.target)
        self.assertTrue(st.is_stored(self.run_id, self.target))
        for sf in storages:
            self.assertTrue(st._is_stored_in_sf(self.run_id, self.target, sf), str(sf))

        # Now set the charge, if the slightly slower frontend is asked
        # for data, it will raise an error
        storage_slightly_slow.raise_when_run = True
        st.set_context_config({'forbid_creation_of': '*'})
        # No error raises because we get the storage_slow's data
        st.get_array(self.run_id, self.target)
        # just to be sure, we would have gotten an error if it would
        # have gotten data from storage_slightly_slow
        with self.assertRaises(strax.testutils.SomeCrash):
            st.storage = [storage_slightly_slow]
            print(st.storage)
            st.is_stored(self.run_id, self.target)