Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python Cancellation Example #19465

Merged
merged 32 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
32944fd
Lay out bones of example
gnossen Jun 24, 2019
335e655
Free up server thread upon cancellation
gnossen Jun 24, 2019
7dccc07
Start writing README
gnossen Jun 24, 2019
b31431a
Switch over to a generator
gnossen Jun 24, 2019
244279c
Add client CLI
gnossen Jun 25, 2019
b6a5e94
Respond to ctrl+c on client side
gnossen Jun 25, 2019
c9e83db
Implement streaming on the client side
gnossen Jun 25, 2019
4ee154d
Elaborate on unary cancellation
gnossen Jun 25, 2019
82aa406
Elaborate on cancelling streaming RPCs
gnossen Jun 25, 2019
cdae8ca
Add intro about algorithm
gnossen Jun 25, 2019
b9cc2c2
Explain how we take care of servicer threads
gnossen Jun 25, 2019
4c852bf
Cancel RPCs after a hash limit has been reached
gnossen Jun 25, 2019
b122997
Clean up logging
gnossen Jun 25, 2019
8f1bfda
Yapf
gnossen Jun 25, 2019
dc8dba8
Add docstrings
gnossen Jun 25, 2019
93d6344
Add todo
gnossen Jun 25, 2019
786a3ac
Add test
gnossen Jun 25, 2019
edbddf2
Typos
gnossen Jun 25, 2019
81f4203
Pylint
gnossen Jun 25, 2019
fed1c62
Make compatible with Python 3
gnossen Jun 25, 2019
2bf4d50
Factor out simpler generator
gnossen Jun 26, 2019
42b2fe1
Simplify search implementation
gnossen Jun 26, 2019
7fa7f93
Pull search algorithm out into another module
gnossen Jun 26, 2019
4100084
Use six for compatibility in client
gnossen Jun 26, 2019
7486026
Annotate the proto file
gnossen Jun 26, 2019
25f3439
Make whole package testonly
gnossen Jun 26, 2019
1db141a
Change section title
gnossen Jun 26, 2019
915e97b
Fix main thread starvation issues
gnossen Jun 26, 2019
61c2f0c
Merge remote-tracking branch 'origin/master' into cancellation_example
gnossen Jul 4, 2019
ba39c92
Adopt reviewer's comments
gnossen Jul 8, 2019
79e78d1
Pylint
gnossen Jul 8, 2019
805afe6
Copyright typo
gnossen Jul 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/python/cancellation/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# gRPC Bazel BUILD file.
#
# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("//bazel:python_rules.bzl", "py_proto_library")

package(default_testonly = 1)

proto_library(
name = "hash_name_proto",
srcs = ["hash_name.proto"],
)

py_proto_library(
name = "hash_name_proto_pb2",
deps = [":hash_name_proto"],
gnossen marked this conversation as resolved.
Show resolved Hide resolved
well_known_protos = False,
)

py_binary(
name = "client",
srcs = ["client.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":hash_name_proto_pb2",
requirement("six"),
],
srcs_version = "PY2AND3",
)

py_library(
name = "search",
srcs = ["search.py"],
srcs_version = "PY2AND3",
deps = [
":hash_name_proto_pb2",
],
)

py_binary(
name = "server",
srcs = ["server.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":hash_name_proto_pb2",
":search",
] + select({
"//conditions:default": [requirement("futures")],
"//:python3": [],
}),
srcs_version = "PY2AND3",
)

py_test(
name = "test/_cancellation_example_test",
srcs = ["test/_cancellation_example_test.py"],
data = [
":client",
":server"
],
size = "small",
)
127 changes: 127 additions & 0 deletions examples/python/cancellation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
### Cancellation

In the example, we implement a silly algorithm. We search for bytestrings whose
hashes are similar to a given search string. For example, say we're looking for
the string "doctor". Our algorithm may return `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or
`RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute force algorithm, so the server
performing the search must be conscious of the resources it allows to each client
and each client must be conscientious of the resources it demands of the server.

In particular, we ensure that client processes cancel the stream explicitly
before terminating and we ensure that server processes cancel RPCs that have gone on longer
than a certain number of iterations.
gnossen marked this conversation as resolved.
Show resolved Hide resolved

#### Cancellation on the Client Side

A client may cancel an RPC for several reasons. Perhaps the data it requested
has been made irrelevant. Perhaps you, as the client, want to be a good citizen
of the server and are conserving compute resources.

##### Cancelling a Server-Side Unary RPC from the Client

The default RPC methods on a stub will simply return the result of an RPC.

```python
>>> stub = hash_name_pb2_grpc.HashFinderStub(channel)
>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name))
<hash_name_pb2.HashNameResponse object at 0x7fe2eb8ce2d0>
```

But you may use the `future()` method to receive an instance of `grpc.Future`.
This interface allows you to wait on a response with a timeout, add a callback
to be executed when the RPC completes, or to cancel the RPC before it has
completed.

In the example, we use this interface to cancel our in-progress RPC when the
user interrupts the process with ctrl-c.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signal mechanism is not a necessary step for cancellation. I think a simple future.cancel() is capable of convey the idea of cancelling an unary request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. If you have a better way to motivate why you would cancel on the client side, I'm more than happy to give it a shot.


```python
stub = hash_name_pb2_grpc.HashFinderStub(channel)
future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name))
def cancel_request(unused_signum, unused_frame):
future.cancel()
sys.exit(0)
signal.signal(signal.SIGINT, cancel_request)

result = future.result()
print(result)
```

We also call `sys.exit(0)` to terminate the process. If we do not do this, then
`future.result()` with throw an `RpcError`. Alternatively, you may catch this
exception.


##### Cancelling a Server-Side Streaming RPC from the Client

Cancelling a Server-side streaming RPC is even simpler from the perspective of
the gRPC API. The default stub method is already an instance of `grpc.Future`,
so the methods outlined above still apply. It is also a generator, so we may
iterate over it to yield the results of our RPC.

```python
stub = hash_name_pb2_grpc.HashFinderStub(channel)
result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name))
def cancel_request(unused_signum, unused_frame):
result_generator.cancel()
sys.exit(0)
signal.signal(signal.SIGINT, cancel_request)
for result in result_generator:
print(result)
```

We also call `sys.exit(0)` here to terminate the process. Alternatively, you may
catch the `RpcError` raised by the for loop upon cancellation.


#### Cancellation on the Server Side

A server is reponsible for cancellation in two ways. It must respond in some way
when a client initiates a cancellation, otherwise long-running computations
could continue indefinitely.

It may also decide to cancel the RPC for its own reasons. In our example, the
server can be configured to cancel an RPC after a certain number of hashes has
been computed in order to conserve compute resources.

##### Responding to Cancellations from a Servicer Thread

It's important to remember that a gRPC Python server is backed by a thread pool
with a fixed size. When an RPC is cancelled, the library does *not* terminate
your servicer thread. It is your responsibility as the application author to
ensure that your servicer thread terminates soon after the RPC has been
cancelled.

In this example, we use the `ServicerContext.add_callback` method to set a
`threading.Event` object when the RPC is terminated. We pass this `Event` object
down through our hashing algorithm and ensure to check that the RPC is still
ongoing before each iteration.

```python
stop_event = threading.Event()
def on_rpc_done():
# Regain servicer thread.
stop_event.set()
context.add_callback(on_rpc_done)
secret = _find_secret(stop_event)
```

##### Initiating a Cancellation on the Server Side

Initiating a cancellation from the server side is simpler. Just call
`ServicerContext.cancel()`.

In our example, we ensure that no single client is monopolizing the server by
cancelling after a configurable number of hashes have been checked.

```python
try:
for candidate in secret_generator:
yield candidate
except ResourceLimitExceededError:
print("Cancelling RPC due to exhausted resources.")
context.cancel()
```

In this type of situation, you may also consider returning a more specific error
using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package.
104 changes: 104 additions & 0 deletions examples/python/cancellation/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2019 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of cancelling requests in gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import logging
import signal
import sys

import grpc

from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc

_DESCRIPTION = "A client for finding hashes similar to names."
_LOGGER = logging.getLogger(__name__)


def run_unary_client(server_target, name, ideal_distance):
with grpc.insecure_channel(server_target) as channel:
stub = hash_name_pb2_grpc.HashFinderStub(channel)
future = stub.Find.future(
hash_name_pb2.HashNameRequest(
desired_name=name, ideal_hamming_distance=ideal_distance),
wait_for_ready=True)

def cancel_request(unused_signum, unused_frame):
future.cancel()
sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)
result = future.result()
print(result)


def run_streaming_client(server_target, name, ideal_distance,
interesting_distance):
with grpc.insecure_channel(server_target) as channel:
stub = hash_name_pb2_grpc.HashFinderStub(channel)
result_generator = stub.FindRange(
hash_name_pb2.HashNameRequest(
desired_name=name,
ideal_hamming_distance=ideal_distance,
interesting_hamming_distance=interesting_distance),
wait_for_ready=True)

def cancel_request(unused_signum, unused_frame):
result_generator.cancel()
sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)
for result in result_generator:
print(result)


def main():
parser = argparse.ArgumentParser(description=_DESCRIPTION)
parser.add_argument("name", type=str, help='The desired name.')
parser.add_argument(
"--ideal-distance",
default=0,
nargs='?',
type=int,
help="The desired Hamming distance.")
parser.add_argument(
'--server',
default='localhost:50051',
type=str,
nargs='?',
help='The host-port pair at which to reach the server.')
parser.add_argument(
'--show-inferior',
default=None,
type=int,
nargs='?',
help='Also show candidates with a Hamming distance less than this value.'
)

args = parser.parse_args()
if args.show_inferior is not None:
run_streaming_client(args.server, args.name, args.ideal_distance,
args.show_inferior)
else:
run_unary_client(args.server, args.name, args.ideal_distance)


if __name__ == "__main__":
logging.basicConfig()
main()
56 changes: 56 additions & 0 deletions examples/python/cancellation/hash_name.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019 the gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package hash_name;

// A request for a single secret whose hash is similar to a desired name.
message HashNameRequest {
// The string that is desired in the secret's hash.
string desired_name = 1;

// The ideal Hamming distance betwen desired_name and the secret that will
// be searched for.
int32 ideal_hamming_distance = 2;

// A Hamming distance greater than the ideal Hamming distance. Search results
// with a Hamming distance less than this value but greater than the ideal
// distance will be returned back to the client but will not terminate the
// search.
int32 interesting_hamming_distance = 3;
}

message HashNameResponse {
// The search result.
string secret = 1;

// The hash of the search result. A substring of this is of
// ideal_hamming_distance Hamming distance or less from desired_name.
string hashed_name = 2;

// The Hamming distance between hashed_name and desired_name.
int32 hamming_distance = 3;
}

service HashFinder {

// Search for a single string whose hash is similar to the specified
// desired_name. interesting_hamming_distance is ignored.
rpc Find (HashNameRequest) returns (HashNameResponse) {}

// Search for a string whose hash is similar to the specified desired_name,
// but also stream back less-than-ideal candidates.
rpc FindRange (HashNameRequest) returns (stream HashNameResponse) {}
}
gnossen marked this conversation as resolved.
Show resolved Hide resolved
Loading