Skip to content

Commit

Permalink
Find the frontends which stored the targets (#802)
Browse files Browse the repository at this point in the history
* Find the frontends which stored the targets

* Turn to use _get_source_sf
  • Loading branch information
dachengx committed Feb 7, 2024
1 parent 0f36687 commit 2aa293a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
27 changes: 21 additions & 6 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2085,7 +2085,7 @@ def copy_to_frontend(
target_sf = [self.storage[target_frontend_id]]

# Figure out which of the frontends has the data. Raise error when none
source_sf = self._get_source_sf(run_id, target, should_exist=True)
source_sf = self.get_source_sf(run_id, target, should_exist=True)[0]

# Keep frontends that:
# 1. don't already have the data; and
Expand Down Expand Up @@ -2275,22 +2275,37 @@ def _is_stored_in_sf(self, run_id, target, storage_frontend: strax.StorageFronte
except strax.DataNotAvailable:
return False

def _get_source_sf(self, run_id, target, should_exist=False):
"""Get the source storage frontend for a given run_id and target.
def get_source_sf(self, run_id, target, should_exist=False):
"""Get the source storage frontends for a given run_id and target.
:param run_id, target: run_id, target
:param should_exist: Raise a ValueError if we cannot find one (e.g. we already checked the
data is stored)
:return: strax.StorageFrontend or None (when raise_error is False)
:return: list of strax.StorageFrontend (when should_exist is False)
"""
if isinstance(target, (tuple, list)):
if len(target) == 0:
raise ValueError("Cannot find stored frontend for empty target!")
frontends_list = [
self.get_source_sf(
run_id,
t,
should_exist=should_exist,
)
for t in target
]
return list(set.intersection(*map(set, frontends_list)))

frontends = []
for sf in self._sorted_storage:
if self._is_stored_in_sf(run_id, target, sf):
return sf
if should_exist:
frontends.append(sf)
if should_exist and not frontends:
raise ValueError(
"This cannot happen, we just checked that this run should be stored?!?"
)
return frontends

def get_save_when(self, target: str) -> ty.Union[strax.SaveWhen, int]:
"""For a given plugin, get the save when attribute either being a dict or a number."""
Expand Down
2 changes: 2 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ def test_copy_to_frontend():

# Add the second frontend
context.storage += [strax.DataDirectory(temp_dir_2)]
# Test get_source_sf
context.get_source_sf(run_id, ["records", "records"])
context.copy_to_frontend(run_id, "records", target_compressor="lz4")

# Make sure both frontends have the same data.
Expand Down

0 comments on commit 2aa293a

Please sign in to comment.