Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for availability check, parallelsourceplugin #194

Merged
merged 5 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def search_field(self, pattern):
cache.update(self._get_plugins((d,), run_id='0'))
p = cache[d]

for field_name in p.dtype.names:
for field_name in p.dtype_for(d).fields:
if fnmatch.fnmatch(field_name, pattern):
print(f"{field_name} is part of {d} "
f"(provided by {p.__class__.__name__})")
Expand Down Expand Up @@ -838,7 +838,7 @@ def list_available(self, target, **kwargs):
found = set()
for sf in self.storage:
remaining = keys - found
is_found = sf.find_several(remaining, **self._find_options)
is_found = sf.find_several(list(remaining), **self._find_options)
found |= set([k for i, k in enumerate(remaining)
if is_found[i]])
return list(sorted([x.run_id for x in found]))
Expand Down
7 changes: 6 additions & 1 deletion strax/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ class ParallelSourcePlugin(Plugin):
parallel = 'process'

@classmethod
def inline_plugins(cls, components, start_from):
def inline_plugins(cls, components, start_from, log):
plugins = components.plugins.copy()

sub_plugins = {start_from: plugins[start_from]}
Expand All @@ -501,6 +501,7 @@ def inline_plugins(cls, components, start_from):

if len(set(list(sub_plugins.values()))) == 1:
# Just one plugin to inline: no use
log.debug("Just one plugin to inline: skipping")
return components

# Which data types should we output? Three cases follow.
Expand Down Expand Up @@ -549,6 +550,10 @@ def inline_plugins(cls, components, start_from):
p.dtype = p.sub_plugins[list(outputs_to_send)[0]].dtype
for d in p.provides:
plugins[d] = p
p.deps = {d: plugins[d] for d in p.depends_on}

log.debug(f"Inlined plugins: {p.sub_plugins}."
f"Inlined savers: {p.sub_savers}")

return strax.ProcessorComponents(
plugins, components.loaders, savers, components.targets)
Expand Down
7 changes: 6 additions & 1 deletion strax/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from npshmex import ProcessPoolExecutor as SHMExecutor
except ImportError:
# This import is allowed to fail: npshmex is only required when allow_shm=True
SHMExecutor = None
pass

import numpy as np
Expand Down Expand Up @@ -64,6 +65,10 @@ def __init__(self,
if (allow_multiprocess and len(mp_plugins)):
_proc_ex = ProcessPoolExecutor
if allow_shm:
if SHMExecutor is None:
raise RuntimeError(
"You must install npshmex to enable shm"
" transfer of numpy arrays.")
_proc_ex = SHMExecutor
self.process_executor = _proc_ex(max_workers=max_workers)

Expand All @@ -73,7 +78,7 @@ def __init__(self,
int(np.argmin([len(p.depends_on)
for p in mp_plugins.values()]))]
components = strax.ParallelSourcePlugin.inline_plugins(
components, start_from)
components, start_from, log=self.log)
self.components = components
self.log.debug("Altered components for multiprocessing: "
+ str(components))
Expand Down
15 changes: 0 additions & 15 deletions strax/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,6 @@ def _can_overwrite(self, key: DataKey):
and 'exception' not in metadata)
return False

def list_available(self, key: DataKey,
allow_incomplete, fuzzy_for, fuzzy_for_options):
"""Return list of run_ids for which available data matches key.
The run_id field of key is ignored."""
if not self._we_take(key.data_type):
return []
return self._list_available(
key, allow_incomplete, fuzzy_for, fuzzy_for_options)

def find_several(self, keys, **kwargs):
"""Return list with backend keys or False
for several data keys.
Expand Down Expand Up @@ -310,12 +301,6 @@ def _scan_runs(self, store_fields):
"""
yield from tuple()

def _list_available(self, key: DataKey,
allow_incomplete, fuzzy_for, fuzzy_for_options):
"""Return list of available runs whose data matches key.
The run_id field of key is ignored."""
raise NotImplementedError

def _find(self, key: DataKey,
write, allow_incomplete, fuzzy_for, fuzzy_for_options):
"""Return backend key (e.g. for filename) for data identified by key,
Expand Down
17 changes: 0 additions & 17 deletions strax/storage/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,6 @@ def _scan_runs(self, store_fields):
found.add(run_id)
yield dict(name=run_id)

def _list_available(self, key: strax.DataKey,
allow_incomplete, fuzzy_for, fuzzy_for_options):
if allow_incomplete:
raise NotImplementedError(
"allow_incomplete not yet supported with list_available "
"for DataDirectory")

found_runs = []
for fn in self._subfolders():
run_id = self._folder_matches(
fn, key, fuzzy_for, fuzzy_for_options,
ignore_name=True)
if run_id:
found_runs.append(run_id)

return found_runs

def _find(self, key, write,
allow_incomplete, fuzzy_for, fuzzy_for_options):
dirname = osp.join(self.path, str(key))
Expand Down