-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #19465 from gnossen/cancellation_example
Add Python Cancellation Example
- Loading branch information
Showing
7 changed files
with
731 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
# gRPC Bazel BUILD file. | ||
# | ||
# Copyright 2019 The gRPC authors. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
load("@grpc_python_dependencies//:requirements.bzl", "requirement") | ||
load("//bazel:python_rules.bzl", "py_proto_library") | ||
|
||
package(default_testonly = 1) | ||
|
||
proto_library( | ||
name = "hash_name_proto", | ||
srcs = ["hash_name.proto"], | ||
) | ||
|
||
py_proto_library( | ||
name = "hash_name_proto_pb2", | ||
deps = [":hash_name_proto"], | ||
well_known_protos = False, | ||
) | ||
|
||
py_binary( | ||
name = "client", | ||
srcs = ["client.py"], | ||
deps = [ | ||
"//src/python/grpcio/grpc:grpcio", | ||
":hash_name_proto_pb2", | ||
requirement("six"), | ||
], | ||
srcs_version = "PY2AND3", | ||
) | ||
|
||
py_library( | ||
name = "search", | ||
srcs = ["search.py"], | ||
srcs_version = "PY2AND3", | ||
deps = [ | ||
":hash_name_proto_pb2", | ||
], | ||
) | ||
|
||
py_binary( | ||
name = "server", | ||
srcs = ["server.py"], | ||
deps = [ | ||
"//src/python/grpcio/grpc:grpcio", | ||
":hash_name_proto_pb2", | ||
":search", | ||
] + select({ | ||
"//conditions:default": [requirement("futures")], | ||
"//:python3": [], | ||
}), | ||
srcs_version = "PY2AND3", | ||
) | ||
|
||
py_test( | ||
name = "test/_cancellation_example_test", | ||
srcs = ["test/_cancellation_example_test.py"], | ||
data = [ | ||
":client", | ||
":server" | ||
], | ||
size = "small", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
### Cancellation | ||
|
||
In the example, we implement a silly algorithm. We search for bytestrings whose | ||
hashes are similar to a given search string. For example, say we're looking for | ||
the string "doctor". Our algorithm may return `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or | ||
`RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute force algorithm, so the server | ||
performing the search must be conscious of the resources it allows to each client | ||
and each client must be conscientious of the resources it demands of the server. | ||
|
||
In particular, we ensure that client processes cancel the stream explicitly | ||
before terminating and we ensure that server processes cancel RPCs that have gone on longer | ||
than a certain number of iterations. | ||
|
||
#### Cancellation on the Client Side | ||
|
||
A client may cancel an RPC for several reasons. Perhaps the data it requested | ||
has been made irrelevant. Perhaps you, as the client, want to be a good citizen | ||
of the server and are conserving compute resources. | ||
|
||
##### Cancelling a Server-Side Unary RPC from the Client | ||
|
||
The default RPC methods on a stub will simply return the result of an RPC. | ||
|
||
```python | ||
>>> stub = hash_name_pb2_grpc.HashFinderStub(channel) | ||
>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name)) | ||
<hash_name_pb2.HashNameResponse object at 0x7fe2eb8ce2d0> | ||
``` | ||
|
||
But you may use the `future()` method to receive an instance of `grpc.Future`. | ||
This interface allows you to wait on a response with a timeout, add a callback | ||
to be executed when the RPC completes, or to cancel the RPC before it has | ||
completed. | ||
|
||
In the example, we use this interface to cancel our in-progress RPC when the | ||
user interrupts the process with ctrl-c. | ||
|
||
```python | ||
stub = hash_name_pb2_grpc.HashFinderStub(channel) | ||
future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name)) | ||
def cancel_request(unused_signum, unused_frame): | ||
future.cancel() | ||
sys.exit(0) | ||
signal.signal(signal.SIGINT, cancel_request) | ||
|
||
result = future.result() | ||
print(result) | ||
``` | ||
|
||
We also call `sys.exit(0)` to terminate the process. If we do not do this, then | ||
`future.result()` with throw an `RpcError`. Alternatively, you may catch this | ||
exception. | ||
|
||
|
||
##### Cancelling a Server-Side Streaming RPC from the Client | ||
|
||
Cancelling a Server-side streaming RPC is even simpler from the perspective of | ||
the gRPC API. The default stub method is already an instance of `grpc.Future`, | ||
so the methods outlined above still apply. It is also a generator, so we may | ||
iterate over it to yield the results of our RPC. | ||
|
||
```python | ||
stub = hash_name_pb2_grpc.HashFinderStub(channel) | ||
result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name)) | ||
def cancel_request(unused_signum, unused_frame): | ||
result_generator.cancel() | ||
sys.exit(0) | ||
signal.signal(signal.SIGINT, cancel_request) | ||
for result in result_generator: | ||
print(result) | ||
``` | ||
|
||
We also call `sys.exit(0)` here to terminate the process. Alternatively, you may | ||
catch the `RpcError` raised by the for loop upon cancellation. | ||
|
||
|
||
#### Cancellation on the Server Side | ||
|
||
A server is reponsible for cancellation in two ways. It must respond in some way | ||
when a client initiates a cancellation, otherwise long-running computations | ||
could continue indefinitely. | ||
|
||
It may also decide to cancel the RPC for its own reasons. In our example, the | ||
server can be configured to cancel an RPC after a certain number of hashes has | ||
been computed in order to conserve compute resources. | ||
|
||
##### Responding to Cancellations from a Servicer Thread | ||
|
||
It's important to remember that a gRPC Python server is backed by a thread pool | ||
with a fixed size. When an RPC is cancelled, the library does *not* terminate | ||
your servicer thread. It is your responsibility as the application author to | ||
ensure that your servicer thread terminates soon after the RPC has been | ||
cancelled. | ||
|
||
In this example, we use the `ServicerContext.add_callback` method to set a | ||
`threading.Event` object when the RPC is terminated. We pass this `Event` object | ||
down through our hashing algorithm and ensure to check that the RPC is still | ||
ongoing before each iteration. | ||
|
||
```python | ||
stop_event = threading.Event() | ||
def on_rpc_done(): | ||
# Regain servicer thread. | ||
stop_event.set() | ||
context.add_callback(on_rpc_done) | ||
secret = _find_secret(stop_event) | ||
``` | ||
|
||
##### Initiating a Cancellation on the Server Side | ||
|
||
Initiating a cancellation from the server side is simpler. Just call | ||
`ServicerContext.cancel()`. | ||
|
||
In our example, we ensure that no single client is monopolizing the server by | ||
cancelling after a configurable number of hashes have been checked. | ||
|
||
```python | ||
try: | ||
for candidate in secret_generator: | ||
yield candidate | ||
except ResourceLimitExceededError: | ||
print("Cancelling RPC due to exhausted resources.") | ||
context.cancel() | ||
``` | ||
|
||
In this type of situation, you may also consider returning a more specific error | ||
using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# Copyright 2019 the gRPC authors. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""An example of cancelling requests in gRPC.""" | ||
|
||
from __future__ import absolute_import | ||
from __future__ import division | ||
from __future__ import print_function | ||
|
||
import argparse | ||
import logging | ||
import signal | ||
import sys | ||
|
||
import grpc | ||
|
||
from examples.python.cancellation import hash_name_pb2 | ||
from examples.python.cancellation import hash_name_pb2_grpc | ||
|
||
_DESCRIPTION = "A client for finding hashes similar to names." | ||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def run_unary_client(server_target, name, ideal_distance): | ||
with grpc.insecure_channel(server_target) as channel: | ||
stub = hash_name_pb2_grpc.HashFinderStub(channel) | ||
future = stub.Find.future( | ||
hash_name_pb2.HashNameRequest( | ||
desired_name=name, ideal_hamming_distance=ideal_distance), | ||
wait_for_ready=True) | ||
|
||
def cancel_request(unused_signum, unused_frame): | ||
future.cancel() | ||
sys.exit(0) | ||
|
||
signal.signal(signal.SIGINT, cancel_request) | ||
result = future.result() | ||
print(result) | ||
|
||
|
||
def run_streaming_client(server_target, name, ideal_distance, | ||
interesting_distance): | ||
with grpc.insecure_channel(server_target) as channel: | ||
stub = hash_name_pb2_grpc.HashFinderStub(channel) | ||
result_generator = stub.FindRange( | ||
hash_name_pb2.HashNameRequest( | ||
desired_name=name, | ||
ideal_hamming_distance=ideal_distance, | ||
interesting_hamming_distance=interesting_distance), | ||
wait_for_ready=True) | ||
|
||
def cancel_request(unused_signum, unused_frame): | ||
result_generator.cancel() | ||
sys.exit(0) | ||
|
||
signal.signal(signal.SIGINT, cancel_request) | ||
for result in result_generator: | ||
print(result) | ||
|
||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description=_DESCRIPTION) | ||
parser.add_argument("name", type=str, help='The desired name.') | ||
parser.add_argument( | ||
"--ideal-distance", | ||
default=0, | ||
nargs='?', | ||
type=int, | ||
help="The desired Hamming distance.") | ||
parser.add_argument( | ||
'--server', | ||
default='localhost:50051', | ||
type=str, | ||
nargs='?', | ||
help='The host-port pair at which to reach the server.') | ||
parser.add_argument( | ||
'--show-inferior', | ||
default=None, | ||
type=int, | ||
nargs='?', | ||
help='Also show candidates with a Hamming distance less than this value.' | ||
) | ||
|
||
args = parser.parse_args() | ||
if args.show_inferior is not None: | ||
run_streaming_client(args.server, args.name, args.ideal_distance, | ||
args.show_inferior) | ||
else: | ||
run_unary_client(args.server, args.name, args.ideal_distance) | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig() | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// Copyright 2019 the gRPC authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
syntax = "proto3"; | ||
|
||
package hash_name; | ||
|
||
// A request for a single secret whose hash is similar to a desired name. | ||
message HashNameRequest { | ||
// The string that is desired in the secret's hash. | ||
string desired_name = 1; | ||
|
||
// The ideal Hamming distance betwen desired_name and the secret that will | ||
// be searched for. | ||
int32 ideal_hamming_distance = 2; | ||
|
||
// A Hamming distance greater than the ideal Hamming distance. Search results | ||
// with a Hamming distance less than this value but greater than the ideal | ||
// distance will be returned back to the client but will not terminate the | ||
// search. | ||
int32 interesting_hamming_distance = 3; | ||
} | ||
|
||
message HashNameResponse { | ||
// The search result. | ||
string secret = 1; | ||
|
||
// The hash of the search result. A substring of this is of | ||
// ideal_hamming_distance Hamming distance or less from desired_name. | ||
string hashed_name = 2; | ||
|
||
// The Hamming distance between hashed_name and desired_name. | ||
int32 hamming_distance = 3; | ||
} | ||
|
||
service HashFinder { | ||
|
||
// Search for a single string whose hash is similar to the specified | ||
// desired_name. interesting_hamming_distance is ignored. | ||
rpc Find (HashNameRequest) returns (HashNameResponse) {} | ||
|
||
// Search for a string whose hash is similar to the specified desired_name, | ||
// but also stream back less-than-ideal candidates. | ||
rpc FindRange (HashNameRequest) returns (stream HashNameResponse) {} | ||
} |
Oops, something went wrong.