EP-3509 insert DriverDataCube class in ImageCollection hierarchy
Preparation for a larger process graph processing refactor.
Preparation for Open-EO/openeo-python-client#100 (splitting the client and driver data cube hierarchies); see the sketch below.
soxofaan committed Oct 6, 2020
1 parent dd5cead commit aaefc59
Showing 6 changed files with 55 additions and 45 deletions.
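
The commit message points at the direction of travel: DriverDataCube is slotted between the client's ImageCollection and the backend-specific cube classes, so the driver can later drop its dependency on the client class entirely. A minimal sketch of the hierarchy this commit establishes (the end state is an assumption based on the linked client issue, not something this commit implements):

# Current state (this commit): the driver-side base class still inherits from the
# client-side ImageCollection, so existing backend code keeps working unchanged.
from openeo import ImageCollection


class DriverDataCube(ImageCollection):
    """Driver-side base class, added below in openeo_driver/datacube.py."""


class DummyDataCube(DriverDataCube):
    """Backend-specific cubes (like the dummy backend's, renamed in this commit) subclass it."""


# Hypothetical end state (after Open-EO/openeo-python-client#100): the client and driver
# hierarchies are decoupled and DriverDataCube no longer imports anything from the client package.
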
66 changes: 33 additions & 33 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -11,10 +11,10 @@
import numpy as np
import requests

from openeo import ImageCollection
from openeo.capabilities import ComparableVersion
from openeo.metadata import MetadataException
from openeo_driver.backend import get_backend_implementation, UserDefinedProcessMetadata
from openeo_driver.datacube import DriverDataCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.errors import ProcessParameterRequiredException, \
ProcessParameterInvalidException
@@ -127,12 +127,12 @@ def get_process_registry(api_version: ComparableVersion) -> ProcessRegistry:
backend_implementation = get_backend_implementation()


def evaluate(processGraph: dict, viewingParameters=None) -> ImageCollection:
def evaluate(processGraph: dict, viewingParameters=None) -> DriverDataCube:
"""
Converts the json representation of a (part of a) process graph into the corresponding Python ImageCollection.
Converts the json representation of a (part of a) process graph into the corresponding Python data cube object.
:param processGraph:
:param viewingParameters:
:return: an ImageCollection
:return: a DriverDataCube
"""
if viewingParameters is None:
viewingParameters = {}
@@ -265,7 +265,7 @@ def extract_deep(args: dict, *steps):


@process
def load_collection(args: Dict, viewingParameters) -> ImageCollection:
def load_collection(args: Dict, viewingParameters) -> DriverDataCube:
name = extract_arg(args, 'id')
if 'temporal_extent' in args and args['temporal_extent'] is not None:
extent = args['temporal_extent']
@@ -295,7 +295,7 @@ def load_collection(args: Dict, viewingParameters) -> ImageCollection:
.param(name='options', description="options specific to the file format", schema={"type": "object"})
.returns(description="the data as a data cube", schema={})
)
def load_disk_data(args: Dict, viewingParameters) -> object:
def load_disk_data(args: Dict, viewingParameters) -> DriverDataCube:
format = extract_arg(args, 'format')
glob_pattern = extract_arg(args, 'glob_pattern')
options = args.get('options', {})
@@ -304,7 +304,7 @@ def load_disk_data(args: Dict, viewingParameters) -> object:


@process_registry_100.add_function
def apply_neighborhood(args: dict, ctx: dict) -> ImageCollection:
def apply_neighborhood(args: dict, ctx: dict) -> DriverDataCube:
process = extract_deep(args, "process", "process_graph")
size = extract_arg(args, 'size')
overlap = extract_arg(args, 'overlap')
@@ -314,7 +314,7 @@ def apply_neighborhood(args: dict, ctx: dict) -> ImageCollection:
return data_cube.apply_neighborhood(process, size, overlap)

@process
def apply_dimension(args: Dict, ctx: dict) -> ImageCollection:
def apply_dimension(args: Dict, ctx: dict) -> DriverDataCube:
return _evaluate_sub_process_graph(args, 'process', parent_process='apply_dimension', version=ctx["version"])


@@ -327,7 +327,7 @@ def save_result(args: Dict, viewingParameters) -> SaveResult:
if isinstance(data, SaveResult):
data.set_format(format, data)
return data
if isinstance(data, ImageCollection):
if isinstance(data, DriverDataCube):
return ImageCollectionResult(data, format, {**viewingParameters, **options})
elif isinstance(data, DelayedVector):
geojsons = (mapping(geometry) for geometry in data.geometries)
@@ -340,7 +340,7 @@ def save_result(args: Dict, viewingParameters) -> SaveResult:


@process
def apply(args: dict, ctx: dict) -> ImageCollection:
def apply(args: dict, ctx: dict) -> DriverDataCube:
"""
Applies a unary process (a local operation) to each value of the specified or all dimensions in the data cube.
@@ -361,7 +361,7 @@ def apply(args: dict, ctx: dict) -> ImageCollection:


@process_registry_040.add_function
def reduce(args: dict, ctx: dict) -> ImageCollection:
def reduce(args: dict, ctx: dict) -> DriverDataCube:
"""
https://open-eo.github.io/openeo-api/v/0.4.0/processreference/#reduce
@@ -386,7 +386,7 @@ def reduce(args: dict, ctx: dict) -> ImageCollection:


@process_registry_100.add_function
def reduce_dimension(args: dict, ctx: dict) -> ImageCollection:
def reduce_dimension(args: dict, ctx: dict) -> DriverDataCube:
reduce_pg = extract_deep(args, "reducer", "process_graph")
dimension = extract_arg(args, 'dimension')
data_cube = extract_arg(args, 'data')
@@ -398,7 +398,7 @@ def reduce_dimension(args: dict, ctx: dict) -> ImageCollection:


@process
def add_dimension(args: dict, ctx: dict):
def add_dimension(args: dict, ctx: dict) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
return data_cube.add_dimension(
name=extract_arg(args, 'name'),
@@ -408,7 +408,7 @@ def add_dimension(args: dict, ctx: dict):


@process_registry_100.add_function
def rename_labels(args: dict, ctx: dict):
def rename_labels(args: dict, ctx: dict) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
return data_cube.rename_labels(
dimension=extract_arg(args, 'dimension'),
@@ -417,7 +417,7 @@ def rename_labels(args: dict, ctx: dict):
)


def _check_dimension(cube: ImageCollection, dim: str, process: str):
def _check_dimension(cube: DriverDataCube, dim: str, process: str):
"""
Helper to check/validate the requested and available dimensions of a cube.
@@ -454,7 +454,7 @@ def _check_dimension(cube: ImageCollection, dim: str, process: str):


@process
def aggregate_temporal(args: dict, ctx: dict) -> ImageCollection:
def aggregate_temporal(args: dict, ctx: dict) -> DriverDataCube:
"""
https://open-eo.github.io/openeo-api/v/0.4.0/processreference/#reduce
@@ -465,7 +465,7 @@ def aggregate_temporal(args: dict, ctx: dict) -> ImageCollection:
return _evaluate_sub_process_graph(args, 'reducer', parent_process='aggregate_temporal', version=ctx["version"])


def _evaluate_sub_process_graph(args: dict, name: str, parent_process: str, version: str):
def _evaluate_sub_process_graph(args: dict, name: str, parent_process: str, version: str) -> DriverDataCube:
"""
Helper function to unwrap and evaluate a sub-process_graph
@@ -481,21 +481,21 @@ def _evaluate_sub_process_graph(args: dict, name: str, parent_process: str, vers


@process_registry_040.add_function
def aggregate_polygon(args: dict, ctx: dict) -> ImageCollection:
def aggregate_polygon(args: dict, ctx: dict) -> DriverDataCube:
return _evaluate_sub_process_graph(args, 'reducer', parent_process='aggregate_polygon', version=ctx["version"])


@process_registry_100.add_function
def aggregate_spatial(args: dict, ctx: dict) -> ImageCollection:
def aggregate_spatial(args: dict, ctx: dict) -> DriverDataCube:
return _evaluate_sub_process_graph(args, 'reducer', parent_process='aggregate_spatial', version=ctx["version"])


@process_registry_040.add_function(name="mask")
def mask_04(args: dict, viewingParameters) -> ImageCollection:
def mask_04(args: dict, viewingParameters) -> DriverDataCube:
mask = extract_arg(args, 'mask')
replacement = args.get('replacement', None)
cube = extract_arg(args, 'data')
if isinstance(mask, ImageCollection):
if isinstance(mask, DriverDataCube):
image_collection = cube.mask(mask=mask, replacement=replacement)
else:
polygon = mask.geometries[0] if isinstance(mask, DelayedVector) else shape(mask)
@@ -507,15 +507,15 @@ def mask_04(args: dict, viewingParameters) -> ImageCollection:


@process_registry_100.add_function
def mask(args: dict, ctx: dict) -> ImageCollection:
def mask(args: dict, ctx: dict) -> DriverDataCube:
mask = extract_arg(args, 'mask')
replacement = args.get('replacement', None)
image_collection = extract_arg(args, 'data').mask(mask=mask, replacement=replacement)
return image_collection


@process_registry_100.add_function
def mask_polygon(args: dict, ctx: dict) -> ImageCollection:
def mask_polygon(args: dict, ctx: dict) -> DriverDataCube:
mask = extract_arg(args, 'mask')
replacement = args.get('replacement', None)
inside = args.get('inside', False)
@@ -528,21 +528,21 @@ def mask_polygon(args: dict, ctx: dict) -> ImageCollection:


@process
def filter_temporal(args: Dict, viewingParameters) -> ImageCollection:
def filter_temporal(args: Dict, viewingParameters) -> DriverDataCube:
# Note: the temporal range is already extracted in `apply_process` and applied in `GeoPySparkLayerCatalog.load_collection` through the viewingParameters
image_collection = extract_arg(args, 'data')
return image_collection


@process
def filter_bbox(args: Dict, viewingParameters) -> ImageCollection:
def filter_bbox(args: Dict, viewingParameters) -> DriverDataCube:
# Note: the bbox is already extracted in `apply_process` and applied in `GeoPySparkLayerCatalog.load_collection` through the viewingParameters
image_collection = extract_arg(args, 'data')
return image_collection


@process
def filter_bands(args: Dict, viewingParameters) -> ImageCollection:
def filter_bands(args: Dict, viewingParameters) -> DriverDataCube:
# Note: the bands are already extracted in `apply_process` and applied in `GeoPySparkLayerCatalog.load_collection` through the viewingParameters
image_collection = extract_arg(args, 'data')
return image_collection
@@ -564,7 +564,7 @@ def zonal_statistics(args: Dict, viewingParameters) -> Dict:


@process
def apply_kernel(args: Dict, viewingParameters) -> ImageCollection:
def apply_kernel(args: Dict, viewingParameters) -> DriverDataCube:
image_collection = extract_arg(args, 'data')
kernel = np.asarray(extract_arg(args, 'kernel'))
factor = args.get('factor', 1.0)
@@ -578,7 +578,7 @@ def apply_kernel(args: Dict, viewingParameters) -> ImageCollection:


@process
def ndvi(args: dict, viewingParameters: dict) -> ImageCollection:
def ndvi(args: dict, viewingParameters: dict) -> DriverDataCube:
image_collection = extract_arg(args, 'data')

version = ComparableVersion(viewingParameters["version"])
@@ -594,7 +594,7 @@ def ndvi(args: dict, viewingParameters: dict) -> ImageCollection:


@process
def resample_spatial(args: dict, viewingParameters: dict) -> ImageCollection:
def resample_spatial(args: dict, viewingParameters: dict) -> DriverDataCube:
image_collection = extract_arg(args, 'data')
resolution = args.get('resolution', 0)
projection = args.get('projection', None)
@@ -604,15 +604,15 @@ def resample_spatial(args: dict, viewingParameters: dict) -> ImageCollection:


@process
def resample_cube_spatial(args: dict, viewingParameters: dict) -> ImageCollection:
def resample_cube_spatial(args: dict, viewingParameters: dict) -> DriverDataCube:
image_collection = extract_arg(args, 'data')
target_image_collection = extract_arg(args, 'target')
method = args.get('method', 'near')
return image_collection.resample_cube_spatial(target=target_image_collection, method=method)


@process
def merge_cubes(args: dict, viewingParameters: dict) -> ImageCollection:
def merge_cubes(args: dict, viewingParameters: dict) -> DriverDataCube:
cube1 = extract_arg(args, 'cube1')
cube2 = extract_arg(args, 'cube2')
overlap_resolver = args.get('overlap_resolver')
@@ -670,7 +670,7 @@ def run_udf(args: dict, viewingParameters: dict):


@process
def linear_scale_range(args: dict, viewingParameters: dict) -> ImageCollection:
def linear_scale_range(args: dict, viewingParameters: dict) -> DriverDataCube:
image_collection = extract_arg(args, 'x')

inputMin = extract_arg(args, "inputMin")
@@ -701,7 +701,7 @@ def constant(args: dict, viewingParameters:dict):
"type": "object"
})
)
def histogram(_args, _viewingParameters) -> None:
def histogram(_args, _viewingParameters):
# currently only available as a reducer passed to e.g. aggregate_polygon
raise ProcessUnsupportedException('histogram')

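For context, the process implementations in this file all follow the same shape: pull the inputs out of the args dict with extract_arg, delegate to a method on the cube, and annotate both ends with DriverDataCube instead of ImageCollection. A hypothetical example in that style (the process name and body are illustrative only; a real process would also be registered via @process, which requires a matching process spec):

from openeo_driver.ProcessGraphDeserializer import extract_arg
from openeo_driver.datacube import DriverDataCube


def apply_example(args: dict, viewingParameters) -> DriverDataCube:
    """Hypothetical driver process: pull the cube out of the arguments and delegate to it."""
    cube: DriverDataCube = extract_arg(args, 'data')  # a missing 'data' argument surfaces as ProcessParameterRequiredException
    # A real implementation would call a DriverDataCube method here, e.g. cube.mask(...)
    # or cube.apply_kernel(...), exactly like the registered processes above do.
    return cube
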
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
@@ -1 +1 @@
__version__ = '0.3.1a1'
__version__ = '0.4.0a1'
6 changes: 3 additions & 3 deletions openeo_driver/backend.py
@@ -16,9 +16,9 @@
from pathlib import Path
from typing import List, Union, NamedTuple, Dict

from openeo import ImageCollection
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import rfc3339
from openeo_driver.datacube import DriverDataCube
from openeo_driver.errors import CollectionNotFoundException, ServiceUnsupportedException
from openeo_driver.utils import read_json

@@ -154,7 +154,7 @@ def get_collection_metadata(self, collection_id: str) -> dict:
"""
return self._get(collection_id=collection_id)

def load_collection(self, collection_id: str, viewing_parameters: dict) -> ImageCollection:
def load_collection(self, collection_id: str, viewing_parameters: dict) -> DriverDataCube:
raise NotImplementedError


@@ -379,7 +379,7 @@ def file_formats(self) -> dict:
"""
return {"input": {}, "output": {}}

def load_disk_data(self, format: str, glob_pattern: str, options: dict, viewing_parameters: dict) -> object:
def load_disk_data(self, format: str, glob_pattern: str, options: dict, viewing_parameters: dict) -> DriverDataCube:
# TODO: move this to catalog "microservice"
raise NotImplementedError

9 changes: 9 additions & 0 deletions openeo_driver/datacube.py
@@ -0,0 +1,9 @@
from openeo import ImageCollection
from openeo.metadata import CollectionMetadata


class DriverDataCube(ImageCollection):
"""Base class for "driver" side data cubes."""

def __init__(self, metadata: CollectionMetadata = None):
self.metadata = metadata if isinstance(metadata, CollectionMetadata) else CollectionMetadata(metadata or {})
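
The constructor is deliberately forgiving about its metadata argument, which is what lets the dummy backend below pass a CollectionMetadata instance directly. A small usage sketch, assuming only what this new module defines (the metadata dict is illustrative):

from openeo import ImageCollection
from openeo.metadata import CollectionMetadata
from openeo_driver.datacube import DriverDataCube

cube = DriverDataCube()                                       # no metadata: wrapped as an empty CollectionMetadata
cube = DriverDataCube(metadata={"id": "ExampleCollection"})   # plain dict: wrapped in a CollectionMetadata
cube = DriverDataCube(metadata=CollectionMetadata({"id": "ExampleCollection"}))  # instance: kept as-is

# Until the hierarchies are split, a DriverDataCube is still an ImageCollection as well:
assert isinstance(cube, DriverDataCube) and isinstance(cube, ImageCollection)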
13 changes: 7 additions & 6 deletions openeo_driver/dummy/dummy_backend.py
@@ -6,16 +6,17 @@
from typing import List, Dict, Union, Tuple
from unittest.mock import Mock

from openeo import ImageCollection
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.collection import GeometryCollection

from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata
from openeo_driver.backend import SecondaryServices, OpenEoBackendImplementation, CollectionCatalog, ServiceMetadata, \
BatchJobs, BatchJobMetadata, OidcProvider, UserDefinedProcesses, UserDefinedProcessMetadata
from openeo_driver.datacube import DriverDataCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.errors import JobNotFoundException, JobNotFinishedException, ProcessGraphNotFoundException
from openeo_driver.save_result import AggregatePolygonResult
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.collection import GeometryCollection

DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27)

@@ -100,7 +101,7 @@ def get_log_entries(self, service_id: str, user_id: str, offset: str) -> List[di
]


class DummyImageCollection(ImageCollection):
class DummyDataCube(DriverDataCube):
# TODO move all Mock stuff here?
pass

@@ -180,12 +181,12 @@ class DummyCatalog(CollectionCatalog):
def __init__(self):
super().__init__(all_metadata=self._COLLECTIONS)

def load_collection(self, collection_id: str, viewing_parameters: dict) -> ImageCollection:
def load_collection(self, collection_id: str, viewing_parameters: dict) -> DriverDataCube:
if collection_id in collections:
return collections[collection_id]

# TODO simplify all this mock/return_value stuff?
image_collection = DummyImageCollection(
image_collection = DummyDataCube(
metadata=CollectionMetadata(metadata=self.get_collection_metadata(collection_id))
)

4 changes: 2 additions & 2 deletions openeo_driver/views.py
@@ -15,12 +15,12 @@
from werkzeug.exceptions import HTTPException, NotFound
from werkzeug.middleware.proxy_fix import ProxyFix

from openeo import ImageCollection
from openeo.capabilities import ComparableVersion
from openeo.util import date_to_rfc3339, dict_no_none, deep_get, Rfc3339
from openeo_driver.ProcessGraphDeserializer import evaluate, get_process_registry
from openeo_driver.backend import ServiceMetadata, BatchJobMetadata, UserDefinedProcessMetadata, \
get_backend_implementation, ErrorSummary
from openeo_driver.datacube import DriverDataCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.errors import OpenEOApiException, ProcessGraphMissingException, ServiceNotFoundException, \
FilePathInvalidException, ProcessGraphNotFoundException, FeatureUnsupportedException
@@ -454,7 +454,7 @@ def execute():
})

# TODO unify all this output handling within SaveResult logic?
if isinstance(result, ImageCollection):
if isinstance(result, DriverDataCube):
format_options = post_data.get('output', {})
filename = result.download(None, bbox="", time="", **format_options)
return send_from_directory(os.path.dirname(filename), os.path.basename(filename))