-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
copy data from frontend to frontend #351
Changes from 5 commits
ab4231d
5f58da6
d138ca4
c56262f
5adf425
d0b75d4
2ad58fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1323,13 +1323,10 @@ def is_stored(self, run_id, target, **kwargs): | |
# noinspection PyMethodFirstArgAssignment | ||
self = self.new_context(**kwargs) | ||
|
||
key = self.key_for(run_id, target) | ||
for sf in self.storage: | ||
try: | ||
sf.find(key, **self._find_options) | ||
if self._is_stored_in_sf(run_id, target, sf): | ||
return True | ||
except strax.DataNotAvailable: | ||
continue | ||
# None of the frontends has the data | ||
return False | ||
|
||
def _check_forbidden(self): | ||
|
@@ -1361,6 +1358,106 @@ def _apply_function(self, data, targets): | |
data = function(data, targets) | ||
return data | ||
|
||
def _copy_to_frontend(self, run_id, target, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps this should not be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the one hand I agree with you that this function should just be used by a very limited amount of people who know what they are doing and hence make the function not "visible". On the other hand I think it is very unlikely that anyone will use this function except for the experts. My/our concerns are very limited, and the underscore usually indicates that the function is a private member, I agree with your statement to remove it. |
||
target_frontend_id=None, rechunk=False): | ||
""" | ||
Copy data from one frontend to another | ||
:param run_id: run_id | ||
:param target: target datakind | ||
:param target_frontend_id: index of the frontend that the data should go to | ||
in context.storage. If no index is specified, try all. | ||
:param rechunk: allow re-chunking for saving | ||
""" | ||
if not self.is_stored(run_id, target): | ||
raise strax.DataNotAvailable(f'Cannot copy {run_id} {target} since it ' | ||
f'does not exist') | ||
if len(strax.to_str_tuple(target)) > 1: | ||
raise ValueError('_copy_to_frontend only works for ') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The message is missing something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whoop, thanks will add |
||
if target_frontend_id is None: | ||
target_sf = self.storage | ||
elif len(self.storage) > target_frontend_id: | ||
# only write to selected other frontend | ||
target_sf = [self.storage[target_frontend_id]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am wondering if it would not be a bit neater to add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would be the gain? Now it's just the index in st.storage which is a list. Using a number is quite simple. Could try to match the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, I think it is not really needed and I am fine with the PR as it is.
This was also my concern, but I am not sure if the "risk" justifies the extra work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, I'm not that sure if it makes sense. the repr is quite a long string and just referring to the class name will also not be sufficient. There is not that much real risk as we are in the end checking if the sf is susceptible at all anyways |
||
else: | ||
raise ValueError(f'Cannot select {target_frontend_id}-th frontend as ' | ||
f'we only have {len(self.storage)} frontends!') | ||
|
||
# Figure out which of the frontends has the data. Raise error when none | ||
source_sf = self._get_source_sf(run_id, target, raise_error=True) | ||
|
||
# Keep frontends that: | ||
# 1. already have the data; and | ||
# 2. take the data; and | ||
# 3. are readonly | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "not" |
||
target_sf = [t_sf for t_sf in target_sf if | ||
(not self._is_stored_in_sf(run_id, target, t_sf) and | ||
t_sf._we_take(target) and | ||
t_sf.readonly is False)] | ||
|
||
if not len(target_sf): | ||
raise ValueError('No frontend to copy to! Perhaps you already stored ' | ||
'it or none of the frontends is willing to take it?') | ||
|
||
# Get the info from the source backend (s_be) that we need to fill | ||
# the target backend (t_be) with | ||
data_key = self.key_for(run_id, target) | ||
# This should never fail, we just tried | ||
s_be_str, s_be_key = source_sf.find(data_key) | ||
s_be = source_sf._get_backend(s_be_str) | ||
md = s_be.get_metadata(s_be_key) | ||
|
||
for t_sf in target_sf: | ||
try: | ||
# Need to load a new loader each time since it's a generator | ||
# and will be exhausted otherwise. | ||
loader = s_be.loader(s_be_key) | ||
# Fill the target buffer | ||
t_be_str, t_be_key = t_sf.find(data_key, write=True) | ||
target_be = t_sf._get_backend(t_be_str) | ||
saver = target_be._saver(t_be_key, md) | ||
saver.save_from(loader, rechunk=rechunk) | ||
except NotImplementedError: | ||
# Target is not susceptible | ||
continue | ||
except strax.DataExistsError: | ||
raise strax.DataExistsError( | ||
f'Trying to write {data_key} to {t_sf} which already exists, ' | ||
'do you have two storage frontends writing to the same place?') | ||
|
||
def _is_stored_in_sf(self, run_id, target, | ||
storage_frontend: strax.StorageFrontend) -> bool: | ||
""" | ||
:param run_id, target: run_id, target | ||
:param storage_frontend: strax.StorageFrontend to check if it has the | ||
requested datakey for the run_id and target. | ||
:return: if the frontend has the key or not. | ||
""" | ||
key = self.key_for(run_id, target) | ||
try: | ||
storage_frontend.find(key, **self._find_options) | ||
return True | ||
except strax.DataNotAvailable: | ||
return False | ||
|
||
def _get_source_sf(self, run_id, target, raise_error=False): | ||
""" | ||
Get the source storage frontend for a given run_id and target | ||
:param run_id, target: run_id, target | ||
:param raise_error: Raise a ValueError if we cannot find one | ||
(e.g. we already checked the data is stored) | ||
:return: strax.StorageFrontend or None (when raise_error is | ||
False) | ||
""" | ||
source_sf = None | ||
for sf in self.storage: | ||
if self._is_stored_in_sf(run_id, target, sf): | ||
source_sf = sf | ||
# We only need a single source | ||
break | ||
if source_sf is None and raise_error: | ||
raise ValueError('This cannot happen, we just checked that this ' | ||
'run should be stored?!?') | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh my... thanks |
||
@classmethod | ||
def add_method(cls, f): | ||
"""Add f as a new Context method""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A possible more compact alternate could be
return any(map(partial(self._is_stored_in_sf, run_id, target), self.storage))
, but this boolean-optimizes (as it were).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about something like this but I decided against it. I wrote it like this as it returns as soon as a single one is true, you get the
True
flag. If one writes a SF that takes long to see if something is stored, you may safe some time.