Skip to content

Commit

Permalink
feat: initial support for Extended Operations
Browse files Browse the repository at this point in the history
Certain APIs with Long-Running Operations deviate from the semantics
in https://google.aip.dev/151 and instead define custom operation
messages, aka Extended Operations.

This change adds a PollingFuture subclass designed to be used with
Extended Operations. It is analogous and broadly similar to
google.api_core.operation.Operation and subclasses
google.api_core.future.polling.PollingFuture.

The full description of Extended Operation semantics is beyond the
scope of this change.
  • Loading branch information
software-dov committed Feb 25, 2022
1 parent 4422cce commit 97da2d9
Show file tree
Hide file tree
Showing 3 changed files with 387 additions and 17 deletions.
176 changes: 176 additions & 0 deletions google/api_core/extended_operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Futures for extended long-running operations returned from Google Cloud APIs.
These futures can be used to synchronously wait for the result of a
lon-running operations using :meth:`ExtendedOperation.result`:
.. code-black:: python
extended_operation = my_api_client.long_running_method()
result =
"""

import threading

from google.api_core import exceptions
from google.api_core.future import polling


class ExtendedOperation(polling.PollingFuture):
"""An ExtendedOperation future for interacting with a Google API Long-Running Operation.
Args:
extended_operation (proto.Message): The initial operation.
refresh (Callable[[], type(extended_operation)]): A callable that returns
the latest state of the operation.
cancel (Callable[[], None]): A callable that tries to cancel the operation.
retry: Optional(google.api_core.retry.Retry): The retry configuration used
when polling. This can be used to control how often :meth:`done`
is polled. Regardless of the retry's ``deadline``, it will be
overridden by the ``timeout`` argument to :meth:`result`.
Note: Most long-running API methods use google.api_core.operation.Operation
This class is a wrapper for a subset of methods that use alternative
Long-Running Operation (LRO) semantics.
"""

def __init__(
self, extended_operation, refresh, cancel, retry=polling.DEFAULT_RETRY
):
super().__init__(retry=retry)
# Note: there is not a concrete type the extended operation must be.
# It MUST have fields that correspond to the following, POSSIBLY WITH DIFFERENT NAMES:
# * name: str
# * status: Union[str, bool, enum.Enum]
# * error_code: int
# * error_message: str
self._extended_operation = extended_operation
self._refresh = refresh
self._cancel = cancel
# Note: the extended operation does not give a good way to indicate cancellation.
# We make do with manually tracking cancellation and checking for doneness.
self._cancelled = False
self._completion_lock = threading.Lock()
# Invoke in case the operation came back already complete.
self._handle_refreshed_operation()

# Note: the following four properties MUST be overridden in a subclass
# if, and only if, the fields in the corresponding extended operation message
# have different names.
#
# E.g. we have an extended operation class that looks like
#
# class MyOperation(proto.Message):
# moniker = proto.Field(proto.STRING, number=1)
# status_msg = proto.Field(proto.STRING, number=2)
# optional http_error_code = proto.Field(proto.INT32, number=3)
# optional http_error_msg = proto.Field(proto.STRING, number=4)
#
# the ExtendedOperation subclass would provide property overrrides that map
# to these (poorly named) fields.
@property
def name(self):
return self._extended_operation.name

@property
def status(self):
return self._extended_operation.status

@property
def error_code(self):
return self._extended_operation.error_code

@property
def error_message(self):
return self._extended_operation.error_message

def done(self, retry=polling.DEFAULT_RETRY):
self._refresh_and_update(retry)
return self._extended_operation.done

def cancel(self):
if self.done():
return False

self._cancel()
self._cancelled = True
return True

def cancelled(self):
# TODO(dovs): there is not currently a good way to determine whether the
# operation has been cancelled.
# The best we can do is manually keep track of cancellation
# and check for doneness.
if not self._cancelled:
return False

self._refresh_and_update()
return self._extended_operation.done

def _refresh_and_update(self, retry=polling.DEFAULT_RETRY):
if not self._extended_operation.done:
self._extended_operation = self._refresh(retry=retry)
self._handle_refreshed_operation()

def _handle_refreshed_operation(self):
with self._completion_lock:
if not self._extended_operation.done:
return

if self.error_code and self.error_message:
# TODO(dovs): handle this better.
exception = exceptions.from_grpc_status(
status_code=self.error_code,
message=self.error_message,
response=self._extended_operation,
)
self.set_exception(exception)
elif self.error_code or self.error_message:
exception = exceptions.GoogleAPICallError(
f"Unexpected error {self.error_code}: {self.error_message}"
)
self.set_exception(exception)
else:
# Extended operations have no payload.
self.set_result(None)

@classmethod
def make(cls, refresh, cancel, extended_operation, **kwargs):
# Note: it is the caller's responsibility to set up refresh and cancel
# with their correct request argument.
# The reason for this is that the services that use Extended Operations
# have rpcs that look something like the following:
# // service.proto
# service MyLongService {
# rpc StartLongTask(StartLongTaskRequest) returns (ExtendedOperation) {
# option (google.cloud.operation_service) = "CustomOperationService";
# }
# }
#
# service CustomOperationService {
# rpc Get(GetOperationRequest) returns (ExtendedOperation) {
# option (google.cloud.operation_polling_method) = true;
# }
# }
#
# Any info needed for the poll, e.g. a name, path params, etc.
# is held in the request, which the initial client method is in a much
# better position to make made because the caller made the initial request.
#
# TL;DR: the caller sets up closures for refresh and cancel that carry
# the properly configured requests.
return cls(extended_operation, refresh, cancel, **kwargs)
46 changes: 29 additions & 17 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def default(session, install_grpc=True):
)

# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov")
session.install("dataclasses", "mock", "pytest", "pytest-cov", "pytest-xdist")
if install_grpc:
session.install("-e", ".[grpc]", "-c", constraints_path)
else:
Expand All @@ -102,28 +102,36 @@ def default(session, install_grpc=True):
"python",
"-m",
"py.test",
"--quiet",
"--cov=google.api_core",
"--cov=tests.unit",
"--cov-append",
"--cov-config=.coveragerc",
"--cov-report=",
"--cov-fail-under=0",
os.path.join("tests", "unit"),
*(
# Helpful for running a single test or testfile.
session.posargs
or [
"--quiet",
"--cov=google.api_core",
"--cov=tests.unit",
"--cov-append",
"--cov-config=.coveragerc",
"--cov-report=",
"--cov-fail-under=0",
# Running individual tests with parallelism enabled is usually not helpful.
"-n=auto",
os.path.join("tests", "unit"),
]
),
]
pytest_args.extend(session.posargs)

# Inject AsyncIO content and proto-plus, if version >= 3.6.
# proto-plus is needed for a field mask test in test_protobuf_helpers.py
if _greater_or_equal_than_36(session.python):
session.install("asyncmock", "pytest-asyncio", "proto-plus")

pytest_args.append("--cov=tests.asyncio")
pytest_args.append(os.path.join("tests", "asyncio"))
session.run(*pytest_args)
else:
# Run py.test against the unit tests.
session.run(*pytest_args)
# Having positional arguments means the user wants to run specific tests.
# Best not to add additional tests to that list.
if not session.posargs:
pytest_args.append("--cov=tests.asyncio")
pytest_args.append(os.path.join("tests", "asyncio"))

session.run(*pytest_args)


@nox.session(python=["3.6", "3.7", "3.8", "3.9", "3.10"])
Expand Down Expand Up @@ -171,7 +179,11 @@ def mypy(session):
"""Run type-checking."""
session.install(".[grpc, grpcgcp]", "mypy")
session.install(
"types-setuptools", "types-requests", "types-protobuf", "types-mock"
"types-setuptools",
"types-requests",
"types-protobuf",
"types-mock",
"types-dataclasses",
)
session.run("mypy", "google", "tests")

Expand Down

0 comments on commit 97da2d9

Please sign in to comment.