Skip to content

Commit

Permalink
Merge d88fe84 into 9cce3b0
Browse files Browse the repository at this point in the history
  • Loading branch information
mpu-creare committed Oct 22, 2020
2 parents 9cce3b0 + d88fe84 commit fe3ca74
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 150 deletions.
11 changes: 5 additions & 6 deletions podpac/core/algorithm/algorithm.py
Expand Up @@ -14,7 +14,7 @@
# Internal dependencies
from podpac.core.coordinates import Coordinates, union
from podpac.core.units import UnitsDataArray
from podpac.core.node import Node, NodeException, node_eval, COMMON_NODE_DOC
from podpac.core.node import Node, NodeException, COMMON_NODE_DOC
from podpac.core.utils import common_doc, NodeTrait
from podpac.core.settings import settings
from podpac.core.managers.multi_threading import thread_manager
Expand Down Expand Up @@ -73,8 +73,7 @@ def algorithm(self, inputs):
raise NotImplementedError

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evalutes this nodes using the supplied coordinates.
Parameters
Expand All @@ -83,7 +82,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand All @@ -105,7 +104,7 @@ def eval(self, coordinates, output=None, selector=None):
if settings["MULTITHREADING"] and n_threads > 1:
# Create a function for each thread to execute asynchronously
def f(node):
return node.eval(coordinates)
return node.eval(coordinates, _selector=_selector)

# Create pool of size n_threads, note, this may be created from a sub-thread (i.e. not the main thread)
pool = thread_manager.get_thread_pool(processes=n_threads)
Expand All @@ -126,7 +125,7 @@ def f(node):
else:
# Evaluate nodes in serial
for key, node in self.inputs.items():
inputs[key] = node.eval(coordinates)
inputs[key] = node.eval(coordinates, output=output, _selector=_selector)
self._multi_threaded = False

# accumulate output coordinates
Expand Down
6 changes: 3 additions & 3 deletions podpac/core/algorithm/coord_select.py
Expand Up @@ -46,7 +46,7 @@ def _default_coordinates_source(self):
return self.source

@common_doc(COMMON_DOC)
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand All @@ -55,7 +55,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand All @@ -79,7 +79,7 @@ def eval(self, coordinates, output=None, selector=None):
raise ValueError("Modified coordinates do not intersect with source data (dim '%s')" % dim)

outputs = {}
outputs["source"] = self.source.eval(self._modified_coordinates, output=output)
outputs["source"] = self.source.eval(self._modified_coordinates, output=output, _selector=_selector)

if self.substitute_eval_coords:
dims = outputs["source"].dims
Expand Down
2 changes: 1 addition & 1 deletion podpac/core/algorithm/reprojection.py
Expand Up @@ -62,7 +62,7 @@ def _coordinates(self):
raise TypeError("The coordinates attribute is of the wrong type.")

def _source_eval(self, coordinates, selector, output=None):
return self.source.eval(self._coordinates, output, selector)
return self.source.eval(self._coordinates, output=output, _selector=selector)

@property
def base_ref(self):
Expand Down
9 changes: 4 additions & 5 deletions podpac/core/algorithm/signal.py
Expand Up @@ -16,7 +16,7 @@
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import UnaryAlgorithm
from podpac.core.utils import common_doc, ArrayTrait, NodeTrait
from podpac.core.node import COMMON_NODE_DOC, node_eval
from podpac.core.node import COMMON_NODE_DOC


COMMON_DOC = COMMON_NODE_DOC.copy()
Expand Down Expand Up @@ -100,8 +100,7 @@ def _first_init(self, kernel=None, kernel_dims=None, kernel_type=None, kernel_nd
return super(Convolution, self)._first_init(**kwargs)

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand All @@ -110,7 +109,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand Down Expand Up @@ -162,7 +161,7 @@ def eval(self, coordinates, output=None, selector=None):
self._expanded_coordinates = expanded_coordinates

# evaluate source using expanded coordinates, convolve, and then slice out original coordinates
source = self.source.eval(expanded_coordinates)
source = self.source.eval(expanded_coordinates, _selector=_selector)

# Check dimensions
if any([d not in kernel_dims for d in source.dims if d != "output"]):
Expand Down
31 changes: 14 additions & 17 deletions podpac/core/algorithm/stats.py
Expand Up @@ -21,7 +21,7 @@
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import UnaryAlgorithm, Algorithm
from podpac.core.utils import common_doc, NodeTrait
from podpac.core.node import COMMON_NODE_DOC, node_eval
from podpac.core.node import COMMON_NODE_DOC

COMMON_DOC = COMMON_NODE_DOC.copy()

Expand Down Expand Up @@ -135,7 +135,7 @@ def _reshape(self, x):
a = x.data.reshape(-1, *x.shape[n:])
return a

def iteroutputs(self, coordinates):
def iteroutputs(self, coordinates, _selector):
"""Generator for the chunks of the output
Yields
Expand All @@ -145,11 +145,10 @@ def iteroutputs(self, coordinates):
"""
chunk_shape = self._get_chunk_shape(coordinates)
for chunk in coordinates.iterchunks(chunk_shape):
yield self.source.eval(chunk)
yield self.source.eval(chunk, _selector=_selector)

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand All @@ -158,7 +157,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand All @@ -179,13 +178,13 @@ def eval(self, coordinates, output=None, selector=None):

if self.chunk_size and self.chunk_size < reduce(mul, coordinates.shape, 1):
try:
result = self.reduce_chunked(self.iteroutputs(coordinates), output)
result = self.reduce_chunked(self.iteroutputs(coordinates, _selector), output)
except NotImplementedError:
warnings.warn("No reduce_chunked method defined, using one-step reduce")
source_output = self.source.eval(coordinates)
source_output = self.source.eval(coordinates, _selector=_selector)
result = self.reduce(source_output)
else:
source_output = self.source.eval(coordinates)
source_output = self.source.eval(coordinates, _selector=_selector)
result = self.reduce(source_output)

if output.shape == ():
Expand Down Expand Up @@ -276,7 +275,7 @@ def _get_chunk_shape(self, coords):

return [d[dim] for dim in coords.dims]

def iteroutputs(self, coordinates):
def iteroutputs(self, coordinates, selector):
"""Generator for the chunks of the output
Yields
Expand All @@ -287,7 +286,7 @@ def iteroutputs(self, coordinates):

chunk_shape = self._get_chunk_shape(coordinates)
for chunk, slices in coordinates.iterchunks(chunk_shape, return_slices=True):
yield self.source.eval(chunk), slices
yield self.source.eval(chunk, _selector=selector), slices

def reduce_chunked(self, xs, output):
"""
Expand Down Expand Up @@ -866,8 +865,7 @@ def _default_coordinates_source(self):
return self.source

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand Down Expand Up @@ -963,8 +961,7 @@ def _default_coordinates_source(self):
return self.source

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand All @@ -973,7 +970,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand All @@ -986,7 +983,7 @@ def eval(self, coordinates, output=None, selector=None):
If source it not time-depended (required by this node).
"""

source_output = self.source.eval(coordinates)
source_output = self.source.eval(coordinates, _selector=_selector)

# group
grouped = source_output.resample(time=self.resample)
Expand Down
15 changes: 7 additions & 8 deletions podpac/core/compositor/compositor.py
Expand Up @@ -15,7 +15,7 @@
from podpac.core.coordinates import Coordinates, Coordinates1d, StackedCoordinates
from podpac.core.coordinates.utils import Dimension
from podpac.core.utils import common_doc, NodeTrait
from podpac.core.node import COMMON_NODE_DOC, node_eval, Node
from podpac.core.node import COMMON_NODE_DOC, Node
from podpac.core.data.datasource import COMMON_DATA_DOC
from podpac.core.interpolation.interpolation import InterpolationTrait
from podpac.core.managers.multi_threading import thread_manager
Expand Down Expand Up @@ -176,7 +176,7 @@ def composite(self, coordinates, data_arrays, result=None):

raise NotImplementedError()

def iteroutputs(self, coordinates):
def iteroutputs(self, coordinates, _selector=None):
"""Summary
Parameters
Expand Down Expand Up @@ -211,7 +211,7 @@ def iteroutputs(self, coordinates):
# evaluate nodes in parallel using thread pool
self._multi_threaded = True
pool = thread_manager.get_thread_pool(processes=n_threads)
outputs = pool.map(lambda src: src.eval(coordinates), sources)
outputs = pool.map(lambda src: src.eval(coordinates, _selector=_selector), sources)
pool.close()
thread_manager.release_n_threads(n_threads)
for output in outputs:
Expand All @@ -221,11 +221,10 @@ def iteroutputs(self, coordinates):
# evaluate nodes serially
self._multi_threaded = False
for src in sources:
yield src.eval(coordinates)
yield src.eval(coordinates, _selector=_selector)

@node_eval
@common_doc(COMMON_COMPOSITOR_DOC)
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
Expand All @@ -234,7 +233,7 @@ def eval(self, coordinates, output=None, selector=None):
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand All @@ -255,7 +254,7 @@ def eval(self, coordinates, output=None, selector=None):
coordinates = coordinates.drop(extra)

self._evaluated_coordinates = coordinates
outputs = self.iteroutputs(coordinates)
outputs = self.iteroutputs(coordinates, _selector)
output = self.composite(coordinates, outputs, output)
return output

Expand Down
10 changes: 4 additions & 6 deletions podpac/core/data/datasource.py
Expand Up @@ -23,7 +23,6 @@
from podpac.core.node import Node, NodeException
from podpac.core.utils import common_doc
from podpac.core.node import COMMON_NODE_DOC
from podpac.core.node import node_eval
from podpac.core.interpolation.interpolation import Interpolate, InterpolationTrait

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -277,8 +276,7 @@ def _get_data(self, rc, rci):
# ------------------------------------------------------------------------------------------------------------------

@common_doc(COMMON_DATA_DOC)
@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this node using the supplied coordinates.
The coordinates are mapped to the requested coordinates, interpolated if necessary, and set to
Expand All @@ -297,7 +295,7 @@ def eval(self, coordinates, output=None, selector=None):
Extra dimensions in the requested coordinates are dropped.
output : :class:`podpac.UnitsDataArray`, optional
{eval_output}
selector: callable(coordinates, request_coordinates)
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
Expand Down Expand Up @@ -364,8 +362,8 @@ def eval(self, coordinates, output=None, selector=None):
return output

# Use the selector
if selector is not None:
(rsc, rsci) = selector(rsc, rsci, coordinates)
if _selector is not None:
(rsc, rsci) = _selector(rsc, rsci, coordinates)

# Check the coordinate_index_type
if self.coordinate_index_type == "slice": # Most restrictive
Expand Down
16 changes: 7 additions & 9 deletions podpac/core/interpolation/interpolation.py
Expand Up @@ -10,7 +10,7 @@
import numpy as np

from podpac.core.settings import settings
from podpac.core.node import Node, node_eval
from podpac.core.node import Node
from podpac.core.utils import NodeTrait, common_doc
from podpac.core.units import UnitsDataArray
from podpac.core.coordinates import merge_dims, Coordinates
Expand All @@ -26,12 +26,11 @@ def interpolation_decorator():
class InterpolationMixin(tl.HasTraits):
interpolation = InterpolationTrait().tag(attr=True)

@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
node = Interpolate(interpolation=self.interpolation)
node._set_interpolation()
node._source_xr = super().eval(coordinates, selector=node._interpolation.select_coordinates)
return node.eval(coordinates, output)
node._source_xr = super()._eval(coordinates, _selector=node._interpolation.select_coordinates)
return node.eval(coordinates, output=output)


class Interpolate(Node):
Expand Down Expand Up @@ -162,8 +161,7 @@ def _set_interpolation(self):
else:
self._interpolation = InterpolationManager(self.interpolation)

@node_eval
def eval(self, coordinates, output=None, selector=None):
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this node using the supplied coordinates.
The coordinates are mapped to the requested coordinates, interpolated if necessary, and set to
Expand All @@ -182,7 +180,7 @@ def eval(self, coordinates, output=None, selector=None):
Extra dimensions in the requested coordinates are dropped.
output : :class:`podpac.UnitsDataArray`, optional
{eval_output}
selector :
_selector :
{eval_selector}
Returns
Expand Down Expand Up @@ -243,7 +241,7 @@ def _source_eval(self, coordinates, selector, output=None):
if isinstance(self._source_xr, UnitsDataArray):
return self._source_xr
else:
return self.source.eval(coordinates, output, selector)
return self.source.eval(coordinates, output=output, _selector=selector)

def find_coordinates(self):
"""
Expand Down
3 changes: 2 additions & 1 deletion podpac/core/managers/multi_process.py
Expand Up @@ -45,7 +45,8 @@ class Process(Node):
def outputs(self):
return self.source.outputs

def eval(self, coordinates, output=None, selector=None):
def eval(self, coordinates, **kwargs):
output = kwargs.get("output")
definition = self.source.json
coords = coordinates.json

Expand Down

0 comments on commit fe3ca74

Please sign in to comment.