Skip to content

Commit

Permalink
Issue #385 Improve support for running UDF directly on vector cube
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 10, 2023
1 parent c590db2 commit 2d63430
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 143 deletions.
179 changes: 173 additions & 6 deletions openeo/rest/_datacube.py
@@ -1,14 +1,23 @@
import json
import logging
import pathlib
import re
import sys
import typing
from pathlib import Path
from typing import Optional, Union, Tuple
import uuid
import warnings
from pathlib import Path
from typing import Union, Tuple, Optional

import requests

import openeo.processes
from openeo.internal.compat import nullcontext
from openeo.internal.graph_building import PGNode, _FromNodeMixin
from openeo.internal.jupyter import render_component
from openeo.internal.warnings import UserDeprecationWarning
from openeo.rest import OpenEoClientException
from openeo.util import dict_no_none

if typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime).
Expand Down Expand Up @@ -130,8 +139,166 @@ def _build_pgnode(self, process_id: str, arguments: dict, namespace: Optional[st
def _repr_html_(self):
    """
    Build the HTML representation of this object by rendering its
    flat process graph with the "model-builder" component.
    """
    process = {"process_graph": self.flat_graph()}
    # Each key is listed exactly once; a fresh hex id is generated per rendering.
    parameters = {
        "id": uuid.uuid4().hex,
        "explicit-zoom": True,
        "height": "400px",
    }
    return render_component("model-builder", data=process, parameters=parameters)


class UDF:
    """
    Helper class to load UDF code (e.g. from file) and embed them as "callback" or child process in a process graph.

    Usage example:

    .. code-block:: python

        udf = UDF.from_file("my-udf-code.py")
        cube = cube.apply(process=udf)

    .. versionchanged:: 0.13.0
        Added auto-detection of ``runtime``.
        Specifying the ``data`` argument is not necessary anymore, and actually deprecated.
        Added :py:meth:`from_file` to simplify loading UDF code from a file.
        See :ref:`old_udf_api` for more background about the changes.
    """

    __slots__ = ["code", "_runtime", "version", "context", "_source"]

    def __init__(
        self,
        code: str,
        runtime: Optional[str] = None,
        data=None,  # TODO #181 remove `data` argument
        version: Optional[str] = None,
        context: Optional[dict] = None,
        _source=None,
    ):
        """
        Construct a UDF object from given code string and other arguments related to the ``run_udf`` process.

        :param code: UDF source code string (Python, R, ...)
        :param runtime: optional UDF runtime identifier, will be autodetected from source code if omitted.
        :param data: unused leftover from old API. Don't use this argument, it will be removed in a future release.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :param _source: (for internal use) source identifier
        """
        # TODO: automatically dedent code (when literal string) ?
        self.code = code
        self._runtime = runtime
        self.version = version
        self.context = context
        self._source = _source
        if data is not None:
            # TODO #181 remove `data` argument
            warnings.warn(
                f"The `data` argument of `{self.__class__.__name__}` is deprecated, unused and will be removed in a future release.",
                category=UserDeprecationWarning,
                stacklevel=2,
            )

    def get_runtime(self, connection: "openeo.Connection") -> str:
        """
        Get the UDF runtime identifier.

        Returns the runtime given at construction time if there is one,
        and otherwise falls back on :py:meth:`_guess_runtime`.

        :param connection: connection used to list the available UDF runtimes when guessing
        :return: UDF runtime identifier
        """
        return self._runtime or self._guess_runtime(connection=connection)

    @classmethod
    def from_file(
        cls,
        path: Union[str, pathlib.Path],
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> "UDF":
        """
        Load a UDF from a local file.

        .. seealso::
            :py:meth:`from_url` for loading from a URL.

        :param path: path to the local file with UDF source code
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        path = pathlib.Path(path)
        code = path.read_text(encoding="utf-8")
        # Keep the path as `_source` so the runtime can be guessed from the file suffix later.
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=path
        )

    @classmethod
    def from_url(
        cls,
        url: str,
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> "UDF":
        """
        Load a UDF from a URL.

        .. seealso::
            :py:meth:`from_file` for loading from a local file.

        :param url: URL path to load the UDF source code from
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        resp = requests.get(url)
        # Fail on HTTP error statuses instead of using an error page as UDF code.
        resp.raise_for_status()
        code = resp.text
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=url
        )

    def _guess_runtime(self, connection: "openeo.Connection") -> str:
        """
        Guess UDF runtime from UDF source (path) or source code.

        :param connection: connection used to list the available UDF runtimes
        :return: runtime identifier, spelled as listed by the connection
        :raises OpenEoClientException: if no language could be detected,
            or the detected language matches none of the listed runtimes.
        """
        # First, guess UDF language
        language = None
        if isinstance(self._source, pathlib.Path):
            language = self._guess_runtime_from_suffix(self._source.suffix)
        elif isinstance(self._source, str):
            url_match = re.match(
                r"https?://.*?(?P<suffix>\.\w+)([&#].*)?$", self._source
            )
            if url_match:
                language = self._guess_runtime_from_suffix(url_match.group("suffix"))
        if not language:
            # Guess language from UDF code
            if re.search(r"^def [\w0-9_]+\(", self.code, flags=re.MULTILINE):
                language = "Python"
            # TODO: detection heuristics for R and other languages?
        if not language:
            raise OpenEoClientException("Failed to detect language of UDF code.")
        # Find runtime for language (case-insensitive match on the listed runtime names)
        runtimes = {k.lower(): k for k in connection.list_udf_runtimes().keys()}
        language_key = language.lower()
        if language_key in runtimes:
            return runtimes[language_key]
        raise OpenEoClientException(
            f"Failed to match UDF language {language!r} with a runtime ({runtimes})"
        )

    def _guess_runtime_from_suffix(self, suffix: str) -> Optional[str]:
        """
        Map a file suffix (including the leading dot, case-insensitive) to a UDF language name.

        :param suffix: file suffix, e.g. ``".py"``
        :return: language name, or ``None`` when the suffix is not recognized
        """
        return {
            ".py": "Python",
            ".r": "R",
        }.get(suffix.lower())

    def get_run_udf_callback(
        self, connection: "openeo.Connection", data_parameter: str = "data"
    ) -> "PGNode":
        """
        For internal use: construct `run_udf` node to be used as callback in `apply`, `reduce_dimension`, ...

        :param connection: connection used to resolve the runtime when it was not given explicitly
        :param data_parameter: name of the parent process parameter to pass as ``data`` argument
        :return: ``run_udf`` process graph node
        """
        # `dict_no_none` is relied on to leave out unset (None) `version`/`context` values.
        arguments = dict_no_none(
            data={"from_parameter": data_parameter},
            udf=self.code,
            runtime=self.get_runtime(connection=connection),
            version=self.version,
            context=self.context,
        )
        return PGNode(process_id="run_udf", arguments=arguments)
137 changes: 2 additions & 135 deletions openeo/rest/datacube.py
Expand Up @@ -10,14 +10,12 @@
import datetime
import logging
import pathlib
import re
import typing
import warnings
from builtins import staticmethod
from typing import List, Dict, Union, Tuple, Optional, Any

import numpy as np
import requests
import shapely.geometry
import shapely.geometry.base
from shapely.geometry import Polygon, MultiPolygon, mapping
Expand All @@ -32,8 +30,8 @@
from openeo.metadata import CollectionMetadata, Band, BandDimension, TemporalDimension, SpatialDimension
from openeo.processes import ProcessBuilder
from openeo.rest import BandMathException, OperatorException, OpenEoClientException
from openeo.rest._datacube import _ProcessGraphAbstraction, THIS
from openeo.rest.job import BatchJob, RESTJob
from openeo.rest._datacube import _ProcessGraphAbstraction, THIS, UDF
from openeo.rest.job import BatchJob
from openeo.rest.mlmodel import MlModel
from openeo.rest.service import Service
from openeo.rest.udp import RESTUserDefinedProcess
Expand All @@ -50,137 +48,6 @@
log = logging.getLogger(__name__)


class UDF:
    """
    Helper class to load UDF code (e.g. from file) and embed them as "callback" or child process in a process graph.

    Usage example:

    .. code-block:: python

        udf = UDF.from_file("my-udf-code.py")
        cube = cube.apply(process=udf)

    .. versionchanged:: 0.13.0
        Added auto-detection of ``runtime``.
        Specifying the ``data`` argument is not necessary anymore, and actually deprecated.
        Added :py:meth:`from_file` to simplify loading UDF code from a file.
        See :ref:`old_udf_api` for more background about the changes.
    """

    __slots__ = ["code", "runtime", "version", "context", "_source"]

    def __init__(
        self, code: str, runtime: Optional[str] = None, data=None,
        version: Optional[str] = None, context: Optional[dict] = None, _source=None,
    ):
        """
        Construct a UDF object from given code string and other arguments related to the ``run_udf`` process.

        :param code: UDF source code string (Python, R, ...)
        :param runtime: optional UDF runtime identifier, will be autodetected from source code if omitted.
        :param data: unused leftover from old API. Don't use this argument, it will be removed in a future release.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :param _source: (for internal use) source identifier
        """
        # TODO: automatically dedent code (when literal string) ?
        self.code = code
        self.runtime = runtime
        self.version = version
        self.context = context
        self._source = _source
        if data is not None:
            # TODO #181 remove `data` argument
            warnings.warn(
                f"The `data` argument of `{self.__class__.__name__}` is deprecated, unused and will be removed in a future release.",
                category=UserDeprecationWarning, stacklevel=2
            )

    @classmethod
    def from_file(
        cls, path: Union[str, pathlib.Path], runtime: Optional[str] = None, version: Optional[str] = None,
        context: Optional[dict] = None
    ) -> "UDF":
        """
        Load a UDF from a local file.

        .. seealso::
            :py:meth:`from_url` for loading from a URL.

        :param path: path to the local file with UDF source code
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        path = pathlib.Path(path)
        code = path.read_text(encoding="utf-8")
        # The path is stored as `_source`; its suffix is used for runtime guessing.
        return cls(code=code, runtime=runtime, version=version, context=context, _source=path)

    @classmethod
    def from_url(
        cls, url: str, runtime: Optional[str] = None, version: Optional[str] = None,
        context: Optional[dict] = None
    ) -> "UDF":
        """
        Load a UDF from a URL.

        .. seealso::
            :py:meth:`from_file` for loading from a local file.

        :param url: URL path to load the UDF source code from
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        resp = requests.get(url)
        # Raise on HTTP error statuses rather than treating the error body as UDF code.
        resp.raise_for_status()
        code = resp.text
        return cls(code=code, runtime=runtime, version=version, context=context, _source=url)

    def _guess_runtime(self, connection: "openeo.Connection") -> str:
        """
        Guess UDF runtime from UDF source (path) or source code.

        :param connection: connection used to list the available UDF runtimes
        :return: runtime identifier, spelled as listed by the connection
        :raises OpenEoClientException: if no language could be detected,
            or the detected language matches none of the listed runtimes.
        """
        # First, guess UDF language
        language = None
        if isinstance(self._source, pathlib.Path):
            language = self._guess_runtime_from_suffix(self._source.suffix)
        elif isinstance(self._source, str):
            url_match = re.match(r"https?://.*?(?P<suffix>\.\w+)([&#].*)?$", self._source)
            if url_match:
                language = self._guess_runtime_from_suffix(url_match.group("suffix"))
        if not language:
            # Guess language from UDF code
            if re.search(r"^def [\w0-9_]+\(", self.code, flags=re.MULTILINE):
                language = "Python"
            # TODO: detection heuristics for R and other languages?
        if not language:
            raise OpenEoClientException("Failed to detect language of UDF code.")
        # Find runtime for language (lower-cased name -> name as listed by the connection)
        runtimes = {k.lower(): k for k in connection.list_udf_runtimes().keys()}
        wanted = language.lower()
        if wanted in runtimes:
            return runtimes[wanted]
        raise OpenEoClientException(f"Failed to match UDF language {language!r} with a runtime ({runtimes})")

    def _guess_runtime_from_suffix(self, suffix: str) -> Optional[str]:
        """
        Map a file suffix (including the leading dot, case-insensitive) to a UDF language name.

        :param suffix: file suffix, e.g. ``".py"``
        :return: language name, or ``None`` when the suffix is not recognized
        """
        return {
            ".py": "Python",
            ".r": "R",
        }.get(suffix.lower())

    def get_run_udf_callback(self, connection: "openeo.Connection", data_parameter: str = "data") -> "PGNode":
        """
        For internal use: construct `run_udf` node to be used as callback in `apply`, `reduce_dimension`, ...

        :param connection: connection used to guess the runtime when ``runtime`` is not set
        :param data_parameter: name of the parent process parameter to pass as ``data`` argument
        :return: ``run_udf`` process graph node
        """
        # `dict_no_none` is relied on to leave out unset (None) `version`/`context` values.
        arguments = dict_no_none(
            data={"from_parameter": data_parameter},
            udf=self.code,
            runtime=self.runtime or self._guess_runtime(connection=connection),
            version=self.version,
            context=self.context,
        )
        return PGNode(process_id="run_udf", arguments=arguments)


class DataCube(_ProcessGraphAbstraction):
Expand Down
19 changes: 17 additions & 2 deletions openeo/rest/vectorcube.py
Expand Up @@ -6,7 +6,7 @@
from openeo.internal.graph_building import PGNode
from openeo.internal.warnings import legacy_alias
from openeo.metadata import CollectionMetadata
from openeo.rest._datacube import _ProcessGraphAbstraction
from openeo.rest._datacube import _ProcessGraphAbstraction, UDF
from openeo.rest.job import BatchJob
from openeo.util import dict_no_none

Expand Down Expand Up @@ -48,11 +48,26 @@ def process(

@openeo_process
def run_udf(
self, udf: str, runtime: str, version: Optional[str] = None, context: Optional[dict] = None
self,
udf: Union[str, UDF],
runtime: Optional[str] = None,
version: Optional[str] = None,
context: Optional[dict] = None,
) -> "VectorCube":
"""
.. versionadded:: 0.10.0
"""
if isinstance(udf, UDF):
if not version:
version = udf.version
if not context:
context = udf.context
if not runtime:
runtime = udf.get_runtime(connection=self.connection)
udf = udf.code
else:
if not runtime:
raise ValueError("Argument `runtime` must be specified")
return self.process(
process_id="run_udf",
data=self, udf=udf, runtime=runtime,
Expand Down

0 comments on commit 2d63430

Please sign in to comment.