Skip to content

Commit

Permalink
Merge 5af6afe into 5adaaf0
Browse files Browse the repository at this point in the history
  • Loading branch information
jmilloy committed Apr 3, 2020
2 parents 5adaaf0 + 5af6afe commit 400b725
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 198 deletions.
1 change: 0 additions & 1 deletion doc/source/api.rst
Expand Up @@ -177,7 +177,6 @@ Stitch multiple data sources together
:toctree: api/
:template: class.rst

podpac.compositor.Compositor
podpac.compositor.OrderedCompositor


Expand Down
2 changes: 1 addition & 1 deletion podpac/compositor.py
Expand Up @@ -4,4 +4,4 @@

# REMINDER: update api docs (doc/source/user/api.rst) to reflect changes to this file

from podpac.core.compositor import Compositor, OrderedCompositor
from podpac.core.compositor import OrderedCompositor
4 changes: 2 additions & 2 deletions podpac/core/algorithm/test/test_coord_select.py
Expand Up @@ -112,7 +112,7 @@ def test_year_substitution_orig_coords(self):

def test_year_substitution_missing_coords(self):
source = Array(
data=[[1, 2, 3], [4, 5, 6]],
source=[[1, 2, 3], [4, 5, 6]],
native_coordinates=podpac.Coordinates(
[podpac.crange("2018-01-01", "2018-01-02", "1,D"), podpac.clinspace(45, 66, 3)], dims=["time", "lat"]
),
Expand All @@ -124,7 +124,7 @@ def test_year_substitution_missing_coords(self):

def test_year_substitution_missing_coords_orig_coords(self):
source = Array(
data=[[1, 2, 3], [4, 5, 6]],
source=[[1, 2, 3], [4, 5, 6]],
native_coordinates=podpac.Coordinates(
[podpac.crange("2018-01-01", "2018-01-02", "1,D"), podpac.clinspace(45, 66, 3)], dims=["time", "lat"]
),
Expand Down
161 changes: 72 additions & 89 deletions podpac/core/compositor.py
Expand Up @@ -12,7 +12,7 @@

# Internal imports
from podpac.core.settings import settings
from podpac.core.coordinates import Coordinates, merge_dims
from podpac.core.coordinates import Coordinates, merge_dims, union
from podpac.core.utils import common_doc, NodeTrait, trait_is_defined
from podpac.core.units import UnitsDataArray
from podpac.core.node import COMMON_NODE_DOC, node_eval, Node
Expand All @@ -24,29 +24,21 @@


@common_doc(COMMON_COMPOSITOR_DOC)
class Compositor(Node):
"""Compositor
class BaseCompositor(Node):
"""A base class for compositor nodes.
Attributes
----------
sources : :class:`np.ndarray`
An array of sources. This is a numpy array as opposed to a list so that boolean indexing may be used to
subselect the nodes that will be evaluated.
shared_coordinates : :class:`podpac.Coordinates`.
Coordinates that are shared amongst all of the composited sources. Optional.
sources : list
Source nodes.
source_coordinates : :class:`podpac.Coordinates`
Coordinates that make each source unique. Must be the same size as ``sources`` and single-dimensional. Optional.
interpolation : str, dict, optional
{interpolation}
is_source_coordinates_complete : Bool
Default is False. The source_coordinates do not have to completely describe the source. For example, the source
coordinates could include the year-month-day of the source, but the actual source also has hour-minute-second
information. In that case, source_coordinates is incomplete. This flag is used to automatically construct
native_coordinates.
Notes
-----
Developers of new Compositor nodes need to implement the `composite` method.
Developers of compositor subclasses need to implement the `composite` method.
Multithreading::
* When MULTITHREADING is False, the compositor stops evaluating sources once the output is completely filled.
Expand All @@ -59,18 +51,8 @@ class Compositor(Node):

sources = tl.List(trait=NodeTrait()).tag(attr=True)
interpolation = InterpolationTrait(allow_none=True, default_value=None).tag(attr=True)
shared_coordinates = tl.Instance(Coordinates, allow_none=True, default_value=None).tag(attr=True)
source_coordinates = tl.Instance(Coordinates, allow_none=True, default_value=None).tag(attr=True)

is_source_coordinates_complete = tl.Bool(
False,
help=(
"This allows some optimizations but assumes that the sources have "
"native_coordinates=source_coordinate + shared_coordinate "
"IN THAT ORDER"
),
)

@tl.validate("sources")
def _validate_sources(self, d):
sources = d["value"]
Expand All @@ -87,15 +69,17 @@ def _validate_sources(self, d):

@tl.validate("source_coordinates")
def _validate_source_coordinates(self, d):
if d["value"] is not None:
if d["value"].ndim != 1:
raise ValueError("Invalid source_coordinates, invalid ndim (%d != 1)" % d["value"].ndim)
if d["value"] is None:
return None

if d["value"].size != len(self.sources):
raise ValueError(
"Invalid source_coordinates, source and source_coordinates size mismatch (%d != %d)"
% (d["value"].size, len(self.sources))
)
if d["value"].ndim != 1:
raise ValueError("Invalid source_coordinates, invalid ndim (%d != 1)" % d["value"].ndim)

if d["value"].size != len(self.sources):
raise ValueError(
"Invalid source_coordinates, source and source_coordinates size mismatch (%d != %d)"
% (d["value"].size, len(self.sources))
)

return d["value"]

Expand Down Expand Up @@ -123,10 +107,7 @@ def _default_outputs(self):
return outputs

def select_sources(self, coordinates):
"""Downselect compositor sources based on requested coordinates.
This is used during the :meth:`eval` process as an optimization
when :attr:`source_coordinates` are not pre-defined.
"""Select and prepare sources based on requested coordinates.
Parameters
----------
Expand All @@ -135,41 +116,52 @@ def select_sources(self, coordinates):
Returns
-------
:class:`np.ndarray`
Array of downselected sources
sources : :class:`np.ndarray`
Array of sources
Notes
-----
* If :attr:`source_coordinates` is defined, only sources that intersect the requested coordinates are selected.
* Sets sources :attr:`interpolation`.
"""

# if source coordinates are defined, use intersect
if self.source_coordinates is not None:
# intersecting sources only
# select intersecting sources, if possible
if self.source_coordinates is None:
sources = np.array(self.sources)
else:
try:
_, I = self.source_coordinates.intersect(coordinates, outer=True, return_indices=True)

except: # Likely non-monotonic coordinates
except:
# Likely non-monotonic coordinates
_, I = self.source_coordinates.intersect(coordinates, outer=False, return_indices=True)
i = I[0]
src_subset = np.array(self.sources)[i]
sources = np.array(self.sources)[i]

# no downselection possible - get all sources compositor
else:
src_subset = np.array(self.sources)
# set the interpolation properties for sources
if self.interpolation is not None:
for s in sources:
if s.trait_is_defined("interpolation"):
s.set_trait("interpolation", self.interpolation)

return src_subset
return sources

def composite(self, coordinates, outputs, result=None):
"""Implements the rules for compositing multiple sources together.
def composite(self, coordinates, data_arrays, result=None):
"""Implements the rules for compositing multiple sources together. Must be implemented by child classes.
Parameters
----------
outputs : list
A list of outputs that need to be composited together
coordinates : :class:`podpac.Coordinates`
{requested_coordinates}
data_arrays : list
Evaluated data from the sources.
result : UnitsDataArray, optional
An optional pre-filled array may be supplied, otherwise the output will be allocated.
Raises
------
NotImplementedError
Returns
-------
{eval_return}
"""

raise NotImplementedError()

def iteroutputs(self, coordinates):
Expand All @@ -185,34 +177,16 @@ def iteroutputs(self, coordinates):
:class:`podpac.core.units.UnitsDataArray`
Output from source node eval method
"""
# downselect sources based on coordinates
src_subset = self.select_sources(coordinates)

if len(src_subset) == 0:
# get sources, potentially downselected
sources = self.select_sources(coordinates)

if len(sources) == 0:
yield self.create_output_array(coordinates)
return

# Set the interpolation properties for sources
if self.interpolation is not None:
for s in src_subset.ravel():
if self.trait_is_defined("interpolation"):
s.set_trait("interpolation", self.interpolation)

# Optimization: if coordinates complete and source coords is 1D,
# set native_coordinates unless they are set already
# WARNING: this assumes
# native_coords = source_coords + shared_coordinates
# NOT native_coords = shared_coords + source_coords
if self.is_source_coordinates_complete and self.source_coordinates.ndim == 1:
coords_subset = list(self.source_coordinates.intersect(coordinates, outer=True).coords.values())[0]
coords_dim = list(self.source_coordinates.dims)[0]
crs = self.source_coordinates.crs
for s, c in zip(src_subset, coords_subset):
nc = merge_dims([Coordinates(np.atleast_1d(c), dims=[coords_dim], crs=crs), self.shared_coordinates])
s.set_native_coordinates(nc)

if settings["MULTITHREADING"]:
n_threads = thread_manager.request_n_threads(len(src_subset))
n_threads = thread_manager.request_n_threads(len(sources))
if n_threads == 1:
thread_manager.release_n_threads(n_threads)
else:
Expand All @@ -222,7 +196,7 @@ def iteroutputs(self, coordinates):
# evaluate nodes in parallel using thread pool
self._multi_threaded = True
pool = thread_manager.get_thread_pool(processes=n_threads)
outputs = pool.map(lambda src: src.eval(coordinates), src_subset)
outputs = pool.map(lambda src: src.eval(coordinates), sources)
pool.close()
thread_manager.release_n_threads(n_threads)
for output in outputs:
Expand All @@ -231,7 +205,7 @@ def iteroutputs(self, coordinates):
else:
# evaluate nodes serially
self._multi_threaded = False
for src in src_subset:
for src in sources:
yield src.eval(coordinates)

@node_eval
Expand All @@ -252,7 +226,6 @@ def eval(self, coordinates, output=None):
"""

self._requested_coordinates = coordinates

outputs = self.iteroutputs(coordinates)
output = self.composite(coordinates, outputs, output)
return output
Expand All @@ -264,10 +237,10 @@ def find_coordinates(self):
Returns
-------
coords_list : list
list of available coordinates (Coordinate objects)
available coordinates from all of the sources.
"""

raise NotImplementedError("TODO")
return [coords for source in self.sources for coords in source.find_coordinates()]

@property
def _repr_keys(self):
Expand All @@ -278,21 +251,31 @@ def _repr_keys(self):
return keys


class OrderedCompositor(Compositor):
"""Compositor that combines sources based on their order in self.sources. Once a request contains no
nans, the result is returned.
class OrderedCompositor(BaseCompositor):
"""Compositor that combines sources based on their order in self.sources.
The requested data is interpolated by the sources before being composited.
Attributes
----------
sources : list
Source nodes, in order of preference. Later sources are only used where earlier sources do not provide data.
source_coordinates : :class:`podpac.Coordinates`
Coordinates that make each source unique. Must be the same size as ``sources`` and single-dimensional. Optional.
interpolation : str, dict, optional
{interpolation}
"""

@common_doc(COMMON_COMPOSITOR_DOC)
def composite(self, coordinates, data_arrays, result=None):
"""Composites data_arrays in order that they appear.
"""Composites data_arrays in the order that they appear. Once a request contains no nans, the result is returned.
Parameters
----------
coordinates : :class:`podpac.Coordinates`
{requested_coordinates}
data_arrays : generator
Generator that gives UnitDataArray's with the source values.
Evaluated source data, in the same order as the sources.
result : podpac.UnitsDataArray, optional
{eval_output}
Expand Down
2 changes: 2 additions & 0 deletions podpac/core/data/datasource.py
Expand Up @@ -500,6 +500,8 @@ def set_native_coordinates(self, coordinates, force=False):
---------
coordinates : :class:`podpac.Coordinates`
Coordinates to set. Usually these are coordinates that are shared across compositor sources.
NOTE: This is currently only used by SMAPCompositor. It should potentially be moved to the SMAPSource.
"""

if not self.trait_is_defined("_native_coordinates"):
Expand Down

0 comments on commit 400b725

Please sign in to comment.