Skip to content

Commit

Permalink
Merge pull request #20753 from gnossen/unary_stream
Browse files Browse the repository at this point in the history
Add experimental option to run unary-stream RPCs on a single Python thread.
  • Loading branch information
gnossen committed Oct 23, 2019
2 parents 19649c6 + 0c6f8db commit 018580f
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 110 deletions.
367 changes: 267 additions & 100 deletions src/python/grpcio/grpc/_channel.py

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@ cdef _close(Channel channel, grpc_status_code code, object details,
else:
while state.integrated_call_states:
state.condition.wait()
while state.segregated_call_states:
state.condition.wait()
while state.connectivity_due:
state.condition.wait()

Expand Down
11 changes: 11 additions & 0 deletions src/python/grpcio/grpc/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,14 @@
These APIs are subject to be removed during any minor version release.
"""


class ChannelOptions(object):
"""Indicates a channel option unique to gRPC Python.
This enumeration is part of an EXPERIMENTAL API.
Attributes:
SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread.
"""
SingleThreadedUnaryStream = "SingleThreadedUnaryStream"
30 changes: 30 additions & 0 deletions src/python/grpcio_tests/tests/stress/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@com_github_grpc_grpc//bazel:python_rules.bzl", "py_proto_library", "py_grpc_library")

proto_library(
name = "unary_stream_benchmark_proto",
srcs = ["unary_stream_benchmark.proto"],
deps = [],
)

py_proto_library(
name = "unary_stream_benchmark_py_pb2",
deps = [":unary_stream_benchmark_proto"],
)

py_grpc_library(
name = "unary_stream_benchmark_py_pb2_grpc",
srcs = [":unary_stream_benchmark_proto"],
deps = [":unary_stream_benchmark_py_pb2"],
)

py_binary(
name = "unary_stream_benchmark",
srcs_version = "PY3",
python_version = "PY3",
srcs = ["unary_stream_benchmark.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":unary_stream_benchmark_py_pb2",
":unary_stream_benchmark_py_pb2_grpc",
]
)
27 changes: 27 additions & 0 deletions src/python/grpcio_tests/tests/stress/unary_stream_benchmark.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";

message BenchmarkRequest {
int32 message_size = 1;
int32 response_count = 2;
}

message BenchmarkResponse {
bytes response = 1;
}

service UnaryStreamBenchmarkService {
rpc Benchmark(BenchmarkRequest) returns (stream BenchmarkResponse);
}
104 changes: 104 additions & 0 deletions src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import threading
import grpc
import grpc.experimental
import subprocess
import sys
import time
import contextlib

_PORT = 5741
_MESSAGE_SIZE = 4
_RESPONSE_COUNT = 32 * 1024

_SERVER_CODE = """
import datetime
import threading
import grpc
from concurrent import futures
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc
class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):
def Benchmark(self, request, context):
payload = b'\\x00\\x01' * int(request.message_size / 2)
for _ in range(request.response_count):
yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:%d')
unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(Handler(), server)
server.start()
server.wait_for_termination()
""" % _PORT

try:
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc

_GRPC_CHANNEL_OPTIONS = [
('grpc.max_metadata_size', 16 * 1024 * 1024),
('grpc.max_receive_message_length', 64 * 1024 * 1024),
(grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
]

@contextlib.contextmanager
def _running_server():
server_process = subprocess.Popen(
[sys.executable, '-c', _SERVER_CODE],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
yield
finally:
server_process.terminate()
server_process.wait()
sys.stdout.write("stdout: {}".format(server_process.stdout.read()))
sys.stdout.flush()
sys.stdout.write("stderr: {}".format(server_process.stderr.read()))
sys.stdout.flush()

def profile(message_size, response_count):
request = unary_stream_benchmark_pb2.BenchmarkRequest(
message_size=message_size, response_count=response_count)
with grpc.insecure_channel(
'[::]:{}'.format(_PORT),
options=_GRPC_CHANNEL_OPTIONS) as channel:
stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
channel)
start = datetime.datetime.now()
call = stub.Benchmark(request, wait_for_ready=True)
for message in call:
pass
end = datetime.datetime.now()
return end - start

def main():
with _running_server():
for i in range(1000):
latency = profile(_MESSAGE_SIZE, 1024)
sys.stdout.write("{}\n".format(latency.total_seconds()))
sys.stdout.flush()

if __name__ == '__main__':
main()

except ImportError:
# NOTE(rbellevi): The test runner should not load this module.
pass
1 change: 1 addition & 0 deletions src/python/grpcio_tests/tests/unit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ GRPCIO_TESTS_UNIT = [
"_invocation_defects_test.py",
"_local_credentials_test.py",
"_logging_test.py",
"_metadata_flags_test.py",
"_metadata_code_details_test.py",
"_metadata_test.py",
# TODO: Issue 16336
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ def testSuccessfulUnaryStream(self):

response_iterator_call = self._unary_stream(
_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
received_initial_metadata = response_iterator_call.initial_metadata()
list(response_iterator_call)
received_initial_metadata = response_iterator_call.initial_metadata()

self.assertTrue(
test_common.metadata_transmitted(
Expand Down Expand Up @@ -349,11 +349,14 @@ def testAbortedUnaryStream(self):

response_iterator_call = self._unary_stream(
_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
received_initial_metadata = \
response_iterator_call.initial_metadata()
# NOTE: In the single-threaded case, we cannot grab the initial_metadata
# without running the RPC first (or concurrently, in another
# thread).
with self.assertRaises(grpc.RpcError):
self.assertEqual(len(list(response_iterator_call)), 0)

received_initial_metadata = \
response_iterator_call.initial_metadata()
self.assertTrue(
test_common.metadata_transmitted(
_CLIENT_METADATA,
Expand Down Expand Up @@ -454,9 +457,9 @@ def testCustomCodeUnaryStream(self):

response_iterator_call = self._unary_stream(
_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
received_initial_metadata = response_iterator_call.initial_metadata()
with self.assertRaises(grpc.RpcError):
list(response_iterator_call)
received_initial_metadata = response_iterator_call.initial_metadata()

self.assertTrue(
test_common.metadata_transmitted(
Expand Down Expand Up @@ -547,9 +550,9 @@ def testCustomCodeExceptionUnaryStream(self):

response_iterator_call = self._unary_stream(
_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
received_initial_metadata = response_iterator_call.initial_metadata()
with self.assertRaises(grpc.RpcError):
list(response_iterator_call)
received_initial_metadata = response_iterator_call.initial_metadata()

self.assertTrue(
test_common.metadata_transmitted(
Expand Down
6 changes: 3 additions & 3 deletions src/python/grpcio_tests/tests/unit/_metadata_flags_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def service(self, handler_call_details):


def get_free_loopback_tcp_port():
tcp = socket.socket(socket.AF_INET6)
tcp = socket.socket(socket.AF_INET)
tcp.bind(('', 0))
address_tuple = tcp.getsockname()
return tcp, "[::1]:%s" % (address_tuple[1])
return tcp, "localhost:%s" % (address_tuple[1])


def create_dummy_channel():
Expand Down Expand Up @@ -183,7 +183,7 @@ def check_connection_does_failfast(self, fn, channel, wait_for_ready=None):
fn(channel, wait_for_ready)
self.fail("The Call should fail")
except BaseException as e: # pylint: disable=broad-except
self.assertIn('StatusCode.UNAVAILABLE', str(e))
self.assertIs(grpc.StatusCode.UNAVAILABLE, e.code())

def test_call_wait_for_ready_default(self):
for perform_call in _ALL_CALL_CASES:
Expand Down
3 changes: 3 additions & 0 deletions src/python/grpcio_tests/tests/unit/_metadata_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ def testUnaryUnary(self):
def testUnaryStream(self):
multi_callable = self._channel.unary_stream(_UNARY_STREAM)
call = multi_callable(_REQUEST, metadata=_INVOCATION_METADATA)
# TODO(https://github.com/grpc/grpc/issues/20762): Make the call to
# `next()` unnecessary.
next(call)
self.assertTrue(
test_common.metadata_transmitted(_EXPECTED_INITIAL_METADATA,
call.initial_metadata()))
Expand Down

0 comments on commit 018580f

Please sign in to comment.