From 09f4e9ffc1a75b026ca608d27a7a94b4e647fa2c Mon Sep 17 00:00:00 2001 From: Andre Gaschler Date: Thu, 9 Aug 2018 14:58:06 +0200 Subject: [PATCH 1/2] Unary client supports timeout ISSUE=#28 --- CMakeLists.txt | 1 + async_grpc/BUILD.bazel | 1 + async_grpc/client.h | 37 ++++++++++++------------ async_grpc/client_test.cc | 61 +++++++++++++++++++++++++++++++++++++++ async_grpc/server_test.cc | 5 ++-- 5 files changed, 85 insertions(+), 20 deletions(-) create mode 100644 async_grpc/client_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 3afde7b..ae173aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,6 +62,7 @@ set(ALL_LIBRARY_SRCS async_grpc/token_file_credentials.cc) set(ALL_TESTS + async_grpc/client_test.cc async_grpc/server_test.cc async_grpc/type_traits_test.cc) diff --git a/async_grpc/BUILD.bazel b/async_grpc/BUILD.bazel index ce7ca81..a54630d 100644 --- a/async_grpc/BUILD.bazel +++ b/async_grpc/BUILD.bazel @@ -71,6 +71,7 @@ cc_library( [cc_test( name = src.replace("/", "_").replace(".cc", ""), + size = "small", srcs = [src], deps = [ ":async_grpc", diff --git a/async_grpc/client.h b/async_grpc/client.h index ddeece8..8b385b3 100644 --- a/async_grpc/client.h +++ b/async_grpc/client.h @@ -17,6 +17,7 @@ #ifndef CPP_GRPC_CLIENT_H #define CPP_GRPC_CLIENT_H +#include "async_grpc/common/optional.h" #include "async_grpc/retry.h" #include "async_grpc/rpc_handler_interface.h" #include "async_grpc/rpc_service_method_traits.h" @@ -47,30 +48,37 @@ class Client { using ResponseType = typename RpcServiceMethod::ResponseType; public: - Client(std::shared_ptr<::grpc::Channel> channel, RetryStrategy retry_strategy) + Client(std::shared_ptr<::grpc::Channel> channel) : channel_(channel), client_context_(common::make_unique<::grpc::ClientContext>()), rpc_method_name_(RpcServiceMethod::MethodName()), rpc_method_(rpc_method_name_.c_str(), RpcServiceMethod::StreamType, - channel_), - retry_strategy_(retry_strategy) {} + channel_) {} - Client(std::shared_ptr<::grpc::Channel> channel) + // 'timeout' is used for every 'Write' separately, but multiple retries count + // towards a single timeout. + Client(std::shared_ptr<::grpc::Channel> channel, common::Duration timeout, + RetryStrategy retry_strategy = nullptr) : channel_(channel), client_context_(common::make_unique<::grpc::ClientContext>()), rpc_method_name_(RpcServiceMethod::MethodName()), rpc_method_(rpc_method_name_.c_str(), RpcServiceMethod::StreamType, - channel_) {} + channel_), + timeout_(timeout), + retry_strategy_(retry_strategy) {} bool Write(const RequestType& request, ::grpc::Status* status = nullptr) { ::grpc::Status internal_status; + if (timeout_.has_value()) { + deadline_ = std::chrono::system_clock::now() + timeout_.value(); + } + Reset(); bool result = RetryWithStrategy(retry_strategy_, [this, &request, &internal_status] { WriteImpl(request, &internal_status); return internal_status; }, [this] { Reset(); }); - if (status != nullptr) { *status = internal_status; } @@ -82,6 +90,9 @@ class Client { private: void Reset() { client_context_ = common::make_unique<::grpc::ClientContext>(); + if (deadline_.has_value()) { + client_context_->set_deadline(deadline_.value()); + } } bool WriteImpl(const RequestType& request, ::grpc::Status* status) { @@ -101,6 +112,8 @@ class Client { std::unique_ptr<::grpc::ClientContext> client_context_; const std::string rpc_method_name_; const ::grpc::internal::RpcMethod rpc_method_; + common::optional timeout_; + common::optional deadline_; ResponseType response_; RetryStrategy retry_strategy_; @@ -143,10 +156,6 @@ class Client(); - } - bool WriteImpl(const RequestType& request, ::grpc::Status* status) { InstantiateClientWriterIfNeeded(); return client_writer_->Write(request); @@ -204,10 +213,6 @@ class Client(); - } - bool WriteImpl(const RequestType& request, ::grpc::Status* status) { InstantiateClientReader(request); return true; @@ -267,10 +272,6 @@ class Client(); - } - bool WriteImpl(const RequestType& request, ::grpc::Status* status) { InstantiateClientReaderWriterIfNeeded(); return client_reader_writer_->Write(request); diff --git a/async_grpc/client_test.cc b/async_grpc/client_test.cc new file mode 100644 index 0000000..a75946b --- /dev/null +++ b/async_grpc/client_test.cc @@ -0,0 +1,61 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "async_grpc/client.h" + +#include "async_grpc/proto/math_service.pb.h" +#include "async_grpc/retry.h" +#include "glog/logging.h" +#include "grpc++/grpc++.h" +#include "gtest/gtest.h" + +namespace async_grpc { +namespace { + +struct GetEchoMethod { + static constexpr const char* MethodName() { + return "/async_grpc.proto.Math/GetEcho"; + } + using IncomingType = proto::GetEchoRequest; + using OutgoingType = proto::GetEchoResponse; +}; + +const char* kWrongAddress = "wrong-domain-does-not-exist:50051"; + +TEST(ClientTest, TimesOut) { + auto client_channel = ::grpc::CreateChannel( + kWrongAddress, ::grpc::InsecureChannelCredentials()); + Client client(client_channel, common::FromSeconds(0.1)); + proto::GetEchoRequest request; + grpc::Status status; + EXPECT_FALSE(client.Write(request, &status)); + EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED); +} + +TEST(ClientTest, TimesOutWithRetries) { + auto client_channel = ::grpc::CreateChannel( + kWrongAddress, ::grpc::InsecureChannelCredentials()); + Client client( + client_channel, common::FromSeconds(0.5), + CreateLimitedBackoffStrategy(common::FromSeconds(0.1), 1, 3)); + proto::GetEchoRequest request; + grpc::Status status; + EXPECT_FALSE(client.Write(request, &status)); + EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED); +} + +} // namespace +} // namespace async_grpc diff --git a/async_grpc/server_test.cc b/async_grpc/server_test.cc index 72defbf..3047299 100644 --- a/async_grpc/server_test.cc +++ b/async_grpc/server_test.cc @@ -285,8 +285,9 @@ TEST_F(ServerTest, RetryWithUnrecoverableError) { server_->Start(); Client client( - client_channel_, CreateUnlimitedConstantDelayStrategy( - common::FromSeconds(1), {::grpc::INTERNAL})); + client_channel_, common::FromSeconds(5), + CreateUnlimitedConstantDelayStrategy(common::FromSeconds(1), + {::grpc::INTERNAL})); proto::GetSquareRequest request; request.set_input(-11); EXPECT_FALSE(client.Write(request)); From 07c054fe784872ada87b1ebec761c629ede1d561 Mon Sep 17 00:00:00 2001 From: Andre Gaschler Date: Thu, 9 Aug 2018 16:04:43 +0200 Subject: [PATCH 2/2] address comment --- async_grpc/client.h | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/async_grpc/client.h b/async_grpc/client.h index 8b385b3..c4ba45b 100644 --- a/async_grpc/client.h +++ b/async_grpc/client.h @@ -69,16 +69,18 @@ class Client { bool Write(const RequestType& request, ::grpc::Status* status = nullptr) { ::grpc::Status internal_status; + common::optional deadline; if (timeout_.has_value()) { - deadline_ = std::chrono::system_clock::now() + timeout_.value(); + deadline = std::chrono::system_clock::now() + timeout_.value(); } - Reset(); - bool result = RetryWithStrategy(retry_strategy_, - [this, &request, &internal_status] { - WriteImpl(request, &internal_status); - return internal_status; - }, - [this] { Reset(); }); + client_context_ = ResetContext(deadline); + bool result = RetryWithStrategy( + retry_strategy_, + [this, &request, &internal_status] { + WriteImpl(request, &internal_status); + return internal_status; + }, + [this, deadline] { client_context_ = ResetContext(deadline); }); if (status != nullptr) { *status = internal_status; } @@ -88,11 +90,13 @@ class Client { const ResponseType& response() { return response_; } private: - void Reset() { - client_context_ = common::make_unique<::grpc::ClientContext>(); - if (deadline_.has_value()) { - client_context_->set_deadline(deadline_.value()); + static std::unique_ptr<::grpc::ClientContext> ResetContext( + common::optional deadline) { + auto context = common::make_unique<::grpc::ClientContext>(); + if (deadline.has_value()) { + context->set_deadline(deadline.value()); } + return context; } bool WriteImpl(const RequestType& request, ::grpc::Status* status) { @@ -113,7 +117,6 @@ class Client { const std::string rpc_method_name_; const ::grpc::internal::RpcMethod rpc_method_; common::optional timeout_; - common::optional deadline_; ResponseType response_; RetryStrategy retry_strategy_;