EP-3509 subclass GeopysparkDataCube from DriverDataCube
preparation for Open-EO/openeo-python-client#100 (splitting client and driver data cube hierarchies)
soxofaan committed Oct 6, 2020
1 parent 9161916 commit 68f22fd
Showing 4 changed files with 37 additions and 36 deletions.
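The heart of the change is the base-class swap in openeogeotrellis/geopysparkdatacube.py; the rest of the diff follows from it. A minimal before/after sketch, illustrative only and not part of the diff:

# before this commit: the driver-side cube inherited from the client-side class
from openeo.imagecollection import ImageCollection

class GeopysparkDataCube(ImageCollection):
    ...

# after this commit: the driver-side cube inherits from the driver-side base class
from openeo_driver.datacube import DriverDataCube

class GeopysparkDataCube(DriverDataCube):
    ...
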
12 changes: 6 additions & 6 deletions openeogeotrellis/deploy/batch_job.py
@@ -4,18 +4,18 @@
import shutil
import stat
import sys
import uuid
from pathlib import Path
from typing import Dict, List
from openeo.util import Rfc3339
import uuid

from openeo import ImageCollection
from pyspark import SparkContext

from openeo.util import Rfc3339
from openeo.util import TimingLogger, ensure_dir
from openeo_driver import ProcessGraphDeserializer
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.save_result import ImageCollectionResult, JSONResult, MultipleFilesResult
from pyspark import SparkContext

from openeogeotrellis.geopysparkdatacube import GeopysparkDataCube
from openeogeotrellis.deploy import load_custom_processes
from openeogeotrellis.utils import kerberos, describe_path, log_memory

@@ -157,7 +157,7 @@ def run_job(job_specification, output_file, metadata_file, api_version):
from shapely.geometry import mapping
geojsons = (mapping(geometry) for geometry in result.geometries)
result = JSONResult(geojsons)
if isinstance(result, ImageCollection):
if isinstance(result, GeopysparkDataCube):
format_options = job_specification.get('output', {})
result.download(output_file, bbox="", time="", **format_options)
_add_permissions(output_file, stat.S_IWGRP)
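Why the isinstance target changes: run_job decides how to persist a result based on its type, so the check has to match the driver-side class now that results are no longer ImageCollection instances. A condensed, illustrative sketch; only the GeopysparkDataCube branch appears in the hunk above, the else branch is hypothetical:

if isinstance(result, GeopysparkDataCube):
    # raster cube result: write it to the requested output format
    format_options = job_specification.get('output', {})
    result.download(output_file, bbox="", time="", **format_options)
    _add_permissions(output_file, stat.S_IWGRP)
else:
    # non-cube results (e.g. a JSONResult wrapping GeoJSON geometries) are handled separately
    ...
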
57 changes: 29 additions & 28 deletions openeogeotrellis/geopysparkdatacube.py
@@ -22,9 +22,9 @@
from shapely.geometry import Point, Polygon, MultiPolygon, GeometryCollection

import openeo.metadata
from openeo.imagecollection import ImageCollection
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata, Band
from openeo_driver.datacube import DriverDataCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.errors import FeatureUnsupportedException, OpenEOApiException, InternalException
from openeo_driver.save_result import AggregatePolygonResult
@@ -44,7 +44,7 @@
_log = logging.getLogger(__name__)


class GeopysparkDataCube(ImageCollection):
class GeopysparkDataCube(DriverDataCube):

# TODO: no longer dependent on ServiceRegistry so it can be removed
def __init__(self, pyramid: Pyramid, service_registry: AbstractServiceRegistry, metadata: CollectionMetadata = None):
@@ -60,7 +60,7 @@ def _get_jvm(self) -> JVMView:
def _is_spatial(self):
return self.pyramid.levels[self.pyramid.max_zoom].layer_type == gps.LayerType.SPATIAL

def apply_to_levels(self, func):
def apply_to_levels(self, func) -> 'GeopysparkDataCube':
"""
Applies a function to each level of the pyramid. The argument provided to the function is of type TiledRasterLayer
@@ -92,23 +92,25 @@ def _apply_to_levels_geotrellis_rdd(self, func):
pyramid = Pyramid({k:self._create_tilelayer(func( l.srdd.rdd(),k ),l.layer_type,k) for k,l in self.pyramid.levels.items()})
return GeopysparkDataCube(pyramid, self._service_registry, metadata=self.metadata)

def band_filter(self, bands) -> 'ImageCollection':
def band_filter(self, bands) -> 'GeopysparkDataCube':
return self.apply_to_levels(lambda rdd: rdd.bands(bands))

def _data_source_type(self):
return self.metadata.get("_vito", "data_source", "type", default="Accumulo")

def date_range_filter(self, start_date: Union[str, datetime, date],end_date: Union[str, datetime, date]) -> 'ImageCollection':
def date_range_filter(
self, start_date: Union[str, datetime, date], end_date: Union[str, datetime, date]
) -> 'GeopysparkDataCube':
return self.apply_to_levels(lambda rdd: rdd.filter_by_times([pd.to_datetime(start_date),pd.to_datetime(end_date)]))

def filter_bbox(self, west, east, north, south, crs=None, base=None, height=None) -> 'ImageCollection':
def filter_bbox(self, west, east, north, south, crs=None, base=None, height=None) -> 'GeopysparkDataCube':
# Note: the bbox is already extracted in `apply_process` and applied in `GeoPySparkLayerCatalog.load_collection` through the viewingParameters
return self

def rename_dimension(self, source:str, target:str):
def rename_dimension(self, source: str, target: str) -> 'GeopysparkDataCube':
return GeopysparkDataCube(self.pyramid, self._service_registry, self.metadata.rename_dimension(source, target))

def apply(self, process: str, arguments: dict={}) -> 'ImageCollection':
def apply(self, process: str, arguments: dict = {}) -> 'GeopysparkDataCube':
from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor, GeoPySparkBackendImplementation
if isinstance(process, dict):
apply_callback = GeoPySparkBackendImplementation.accept_process_graph(process)
@@ -132,7 +134,7 @@ def apply(self, process: str, arguments: dict={}) -> 'ImageCollection':
applyProcess = gps.get_spark_context()._jvm.org.openeo.geotrellis.OpenEOProcesses().applyProcess
return self._apply_to_levels_geotrellis_rdd(lambda rdd, k: applyProcess(rdd, process))

def reduce(self, reducer: str, dimension: str) -> 'ImageCollection':
def reduce(self, reducer: str, dimension: str) -> 'GeopysparkDataCube':
# TODO: rename this to reduce_temporal (because it only supports temporal reduce)?
from .numpy_aggregators import var_composite, std_composite, min_composite, max_composite, sum_composite

@@ -180,14 +182,14 @@ def add_dimension(self, name: str, label: str, type: str = None):
metadata=self.metadata.add_dimension(name=name, label=label, type=type)
)

def rename_labels(self, dimension: str, target: list, source: list=None) -> 'ImageCollection':
def rename_labels(self, dimension: str, target: list, source: list=None) -> 'GeopysparkDataCube':
""" Renames the labels of the specified dimension in the data cube from source to target.
:param dimension: Dimension name
:param target: The new names for the labels.
:param source: The names of the labels as they are currently in the data cube.
:return: An ImageCollection instance
:return: A GeopysparkDataCube instance
"""
return GeopysparkDataCube(
pyramid=self.pyramid, service_registry=self._service_registry,
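A usage sketch for rename_labels as described by the docstring above; the dimension name and band labels are assumptions for illustration:

# illustrative: rename two band labels on an existing GeopysparkDataCube `cube`
renamed = cube.rename_labels("bands", target=["red", "nir"], source=["B04", "B08"])
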
@@ -226,8 +228,7 @@ def _tile_to_datacube(cls, bands_numpy: np.ndarray, extent: SpatialExtent,
the_array = xr.DataArray(bands_numpy, coords=coords,dims=dims,name="openEODataChunk")
return DataCube(the_array)


def apply_tiles_spatiotemporal(self,function,context={}) -> ImageCollection:
def apply_tiles_spatiotemporal(self, function, context={}) -> 'GeopysparkDataCube':
"""
Apply a function to a group of tiles with the same spatial key.
:param function:
@@ -283,9 +284,9 @@ def rdd_function(openeo_metadata: CollectionMetadata, rdd):
from functools import partial
return self.apply_to_levels(partial(rdd_function, self.metadata))



def reduce_dimension(self, dimension: str, reducer:Union[ProcessGraphVisitor,Dict],binary=False, context=None) -> 'ImageCollection':
def reduce_dimension(
self, dimension: str, reducer: Union[ProcessGraphVisitor, Dict], binary=False, context=None
) -> 'GeopysparkDataCube':
from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor,GeoPySparkBackendImplementation
if isinstance(reducer,dict):
reducer = GeoPySparkBackendImplementation.accept_process_graph(reducer)
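A hedged sketch of calling reduce_dimension with the reducer given as a dict; the process-graph shape follows the standard openEO format and the dimension name "t" is an assumption, since what accept_process_graph expects is not shown in this diff:

# illustrative: reduce the temporal dimension with a `mean` sub-process graph
reducer = {
    "mean1": {
        "process_id": "mean",
        "arguments": {"data": {"from_parameter": "data"}},
        "result": True,
    }
}
reduced = cube.reduce_dimension(dimension="t", reducer=reducer)
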
@@ -314,8 +315,7 @@ def reduce_dimension(self, dimension: str, reducer:Union[ProcessGraphVisitor,Dic
result_collection = result_collection.apply_to_levels(lambda rdd: rdd.to_spatial_layer() if rdd.layer_type != gps.LayerType.SPATIAL else rdd)
return result_collection


def apply_tiles(self, function,context={}) -> 'ImageCollection':
def apply_tiles(self, function, context={}) -> 'GeopysparkDataCube':
"""Apply a function to the given set of bands in this image collection."""
#TODO apply .bands(bands)

@@ -358,7 +358,9 @@ def aggregate_time(self, temporal_window, aggregationfunction) -> Series :
#reduce
pass

def aggregate_temporal(self, intervals: List, labels: List, reducer, dimension: str = None) -> 'ImageCollection':
def aggregate_temporal(
self, intervals: List, labels: List, reducer, dimension: str = None
) -> 'GeopysparkDataCube':
""" Computes a temporal aggregation based on an array of date and/or time intervals.
Calendar hierarchies such as year, month, week etc. must be transformed into specific intervals by the clients. For each interval, all data along the dimension will be passed through the reducer. The computed values will be projected to the labels, so the number of labels and the number of intervals need to be equal.
@@ -370,7 +372,7 @@ def aggregate_temporal(self, intervals: List, labels: List, reducer, dimension:
:param reducer: A reducer to be applied on all values along the specified dimension. The reducer must be a callable process (or a set processes) that accepts an array and computes a single return value of the same type as the input values, for example median.
:param dimension: The temporal dimension for aggregation. All data along the dimension will be passed through the specified reducer. If the dimension is not set, the data cube is expected to only have one temporal dimension.
:return: An ImageCollection containing a result for each time window
:return: A data cube containing a result for each time window
"""
intervals_iso = list(map(lambda d:pd.to_datetime(d).strftime('%Y-%m-%dT%H:%M:%SZ'),intervals))
labels_iso = list(map(lambda l:pd.to_datetime(l).strftime('%Y-%m-%dT%H:%M:%SZ'), labels))
@@ -380,7 +382,7 @@
reducer = self._normalize_temporal_reducer(dimension, reducer)
return mapped_keys.apply_to_levels(lambda rdd: rdd.aggregate_by_cell(reducer))
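A usage sketch of aggregate_temporal following the docstring above: one label per interval and a reducer applied to all values in each interval. The interval pairs follow the openEO process definition and the "mean" reducer name is an assumption:

# illustrative: monthly aggregates over two calendar months
aggregated = cube.aggregate_temporal(
    intervals=[["2020-01-01", "2020-02-01"], ["2020-02-01", "2020-03-01"]],
    labels=["2020-01-01", "2020-02-01"],
    reducer="mean",  # assumed to be accepted by _normalize_temporal_reducer
    dimension="t",
)
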

def _aggregate_over_time_numpy(self, reducer: Callable[[Iterable[Tile]], Tile]) -> 'ImageCollection':
def _aggregate_over_time_numpy(self, reducer: Callable[[Iterable[Tile]], Tile]) -> 'GeopysparkDataCube':
"""
Aggregate over time.
:param reducer: a function that reduces n Tiles to a single Tile
@@ -517,7 +519,7 @@ def apply_kernel(self, kernel: np.ndarray, factor=1, border = 0, replace_invalid
lambda rdd, level: pysc._jvm.org.openeo.geotrellis.OpenEOProcesses().apply_kernel_spatial(rdd,geotrellis_tile))
return result_collection

def apply_neighborhood(self, process:Dict, size:List,overlap:List) -> 'ImageCollection':
def apply_neighborhood(self, process: Dict, size: List, overlap: List) -> 'GeopysparkDataCube':

spatial_dims = self.metadata.spatial_dimensions
if len(spatial_dims) != 2:
@@ -591,14 +593,13 @@ def apply_neighborhood(self, process:Dict, size:List,overlap:List) -> 'ImageColl

return result_collection


def resample_cube_spatial(self, target:'ImageCollection', method:str='near')-> 'ImageCollection':
def resample_cube_spatial(self, target: 'GeopysparkDataCube', method: str = 'near') -> 'GeopysparkDataCube':
"""
Resamples the spatial dimensions (x,y) of this data cube to a target data cube and return the results as a new data cube.
https://processes.openeo.org/#resample_cube_spatial
:param target: An ImageCollection that specifies the target
:param target: A data cube that specifies the target
:param method: The resampling method.
:return: A raster data cube with values warped onto the new projection.
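A minimal usage sketch of resample_cube_spatial; both cubes are assumed to exist and 'near' is the default method per the signature above:

# illustrative: warp `cube` onto the spatial grid of `target_cube`
resampled = cube.resample_cube_spatial(target=target_cube, method='near')
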
@@ -695,13 +696,13 @@ def _get_resample_method(self, method):
}.get(method, gps.ResampleMethod.NEAREST_NEIGHBOR)
return resample_method

def linear_scale_range(self, input_min, input_max, output_min, output_max) -> 'ImageCollection':
def linear_scale_range(self, input_min, input_max, output_min, output_max) -> 'GeopysparkDataCube':
""" Color stretching
:param input_min: Minimum input value
:param input_max: Maximum input value
:param output_min: Minimum output value
:param output_max: Maximum output value
:return An ImageCollection instance
:return A data cube
"""
rescaled = self.apply_to_levels(lambda layer: layer.normalize(output_min, output_max, input_min, input_max))
output_range = output_max - output_min
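A usage sketch of linear_scale_range, for example stretching raw values into an 8-bit range for visualisation; the input range is an assumption:

# illustrative: rescale values from [0, 4000] to [0, 255]
stretched = cube.linear_scale_range(input_min=0, input_max=4000, output_min=0, output_max=255)
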
@@ -1422,7 +1423,7 @@ def _ndvi_collection(self, red_index: int, nir_index: int) -> 'GeopysparkDataCub

return self.reduce_bands(visitor.accept_process_graph(reduce_graph))

def apply_atmospheric_correction(self) -> 'ImageCollection':
def apply_atmospheric_correction(self) -> 'GeopysparkDataCube':
# TODO: looking up the bandids is just coincidentally matching the lookuptable order
# in the future the lookuptables have to be converted and it should contain the band mappings by name, not by int id
bandIds=self.metadata.band_names
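Most of the remaining edits in this file are mechanical: return-type annotations flip from 'ImageCollection' to 'GeopysparkDataCube' while the method bodies keep delegating per pyramid level through apply_to_levels. A condensed sketch of that pattern, assembled from the hunks above:

import pandas as pd

from openeo_driver.datacube import DriverDataCube


class GeopysparkDataCube(DriverDataCube):

    def apply_to_levels(self, func) -> 'GeopysparkDataCube':
        # apply `func` to every TiledRasterLayer in the pyramid and wrap the result in a new cube
        ...

    def band_filter(self, bands) -> 'GeopysparkDataCube':
        return self.apply_to_levels(lambda rdd: rdd.bands(bands))

    def date_range_filter(self, start_date, end_date) -> 'GeopysparkDataCube':
        return self.apply_to_levels(
            lambda rdd: rdd.filter_by_times([pd.to_datetime(start_date), pd.to_datetime(end_date)])
        )
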
2 changes: 1 addition & 1 deletion requirements.txt
@@ -12,7 +12,7 @@ pyproj<2.2.0
geopandas==0.6.2
numpy==1.17.0
openeo>=0.4.3a1.*
openeo_driver>=0.3.1a1.*
openeo_driver>=0.4.0a1.*
openeo_udf>=1.0.0rc2
py4j
python_dateutil
2 changes: 1 addition & 1 deletion setup.py
@@ -38,7 +38,7 @@
'flask',
'pyspark>=2.3.1,<2.4.0',
'openeo>=0.4.3a1.*',
'openeo_driver>=0.3.1a1.*',
'openeo_driver>=0.4.0a1.*',
'openeo_udf>=1.0.0rc2',
'matplotlib>=2.0.0,<3.0.0',
'colortools>=0.1.2',
