Skip to content

Commit

Permalink
Implements the timeout feature to the etcd client.
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed May 22, 2022
1 parent 8da8946 commit e5f1167
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 3 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ And pass a `target_name_override` arguments to `WithSSL`,
etcd::Client *etcd = etcd::Client::WithSSL(
"https://127.0.0.1:2379,https://127.0.0.2:2479",
"example.rootca.cert", "example.cert", "example.key", "etcd");

```

For more discussion about this feature, see also [#87](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/87),
Expand Down Expand Up @@ -336,6 +335,26 @@ which can be used for fine-grained control the gRPC settings, e.g.,
For more motivation and discussion about the above design, please refer to [issue-103](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/103).
### gRPC timeout when waiting for responses
gRPC Timeout is long-standing missing pieces in the etcd-cpp-apiv3 library. The timeout has been
supported via a `set_grpc_timeout` interfaces on the client,
```cpp
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout)
```

Any `std::chrono::duration` value can be used to set the grpc timeout, e.g.,

```cpp
etcd.set_grpc_timeout(std::chrono::seconds(5));
```

Note that the timeout value is the "timeout" when waiting for responses upon the gRPC channel, i.e., `CompletionQueue::AsyncNext`.
It doesn't means the timeout between issuing a `.set()` method getting the `etcd::Response`, as in the async mode the such a time
duration is unpredictable and the gRPC timeout should be enough to avoid deadly waiting (e.g., waiting for a `lock()`).

### Reading a value

You can read a value with the `get()` method of the client instance. The only parameter is the
Expand Down
16 changes: 16 additions & 0 deletions etcd/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,22 @@ namespace etcd
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif

/**
* Set a timeout value for grpc operations.
*/
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
this->client->set_grpc_timeout(timeout);
}

/**
* Get the current timeout value for grpc operations.
*/
template <typename Rep = std::micro>
std::chrono::duration<Rep> get_grpc_timeout() const {
return this->client->get_grpc_timeout();
}

/**
* Obtain the underlying synchronous client.
*/
Expand Down
5 changes: 5 additions & 0 deletions etcd/Response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ namespace etcd
*/
int error_code() const;

/**
* Check whether the response contains a grpc TIMEOUT error.
*/
bool is_grpc_timeout() const;

/**
* Returns the string representation of the error code
*/
Expand Down
19 changes: 19 additions & 0 deletions etcd/SyncClient.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifndef __ETCD_SYNC_CLIENT_HPP__
#define __ETCD_SYNC_CLIENT_HPP__

#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <ratio>
#include <string>

#include "etcd/Response.hpp"
Expand Down Expand Up @@ -711,6 +713,22 @@ namespace etcd
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif

/**
* Set a timeout value for grpc operations.
*/
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
grpc_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(timeout);
}

/**
* Get the current timeout value for grpc operations.
*/
template <typename Rep = std::micro>
std::chrono::duration<Rep> get_grpc_timeout() const {
return std::chrono::duration_cast<std::chrono::duration<Rep>>(grpc_timeout);
}

private:
#if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> channel;
Expand All @@ -719,6 +737,7 @@ namespace etcd
#endif

mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator;
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();

struct EtcdServerStubs;
struct EtcdServerStubsDeleter {
Expand Down
10 changes: 10 additions & 0 deletions etcd/v3/Action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ using etcdserverpb::Lease;
using v3lockpb::Lock;
using v3electionpb::Election;

namespace etcd {
class Response;
}

namespace etcdv3
{
enum class AtomicityType
Expand All @@ -42,12 +46,16 @@ namespace etcdv3
std::string value;
std::string old_value;
std::string auth_token;
std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
KV::Stub* kv_stub;
Watch::Stub* watch_stub;
Lease::Stub* lease_stub;
Lock::Stub* lock_stub;
Election::Stub* election_stub;

bool has_grpc_timeout() const;
std::chrono::system_clock::time_point grpc_deadline() const;

void dump(std::ostream &os) const;
};

Expand All @@ -67,6 +75,8 @@ namespace etcdv3
private:
// Init things like auth token, etc.
void InitAction();

friend class etcd::Response;
};

namespace detail {
Expand Down
1 change: 1 addition & 0 deletions src/KeepAlive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,

etcdv3::ActionParameters params;
params.auth_token.assign(client.current_auth_token());
// n.b.: keepalive: no need for timeout
params.lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get();

Expand Down
5 changes: 5 additions & 0 deletions src/Response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ std::string const & etcd::Response::error_message() const
return _error_message;
}

bool etcd::Response::is_grpc_timeout() const
{
return _error_code == grpc::StatusCode::DEADLINE_EXCEEDED;
}

int64_t etcd::Response::index() const
{
return _index;
Expand Down
28 changes: 28 additions & 0 deletions src/SyncClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ std::shared_ptr<etcdv3::AsyncHeadAction> etcd::SyncClient::head_internal()
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncHeadAction>(std::move(params));
}
Expand All @@ -506,6 +507,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(std::st
params.key.assign(key);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
Expand Down Expand Up @@ -540,6 +542,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::set_internal(std::stri
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), false);
}
Expand Down Expand Up @@ -574,6 +577,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::add_internal(std::stri
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), true);
}
Expand All @@ -587,6 +591,7 @@ std::shared_ptr<etcdv3::AsyncPutAction> etcd::SyncClient::put_internal(std::stri
params.key.assign(key);
params.value.assign(value);
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncPutAction>(std::move(params));
}
Expand Down Expand Up @@ -621,6 +626,7 @@ std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal(std
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncUpdateAction>(std::move(params));
}
Expand Down Expand Up @@ -681,6 +687,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> etcd::SyncClient::modify_if_i
params.old_revision = old_index;
params.old_value = old_value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncCompareAndSwapAction>(std::move(params), atomicity_type);
}
Expand All @@ -695,6 +702,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rm_internal(std::st
params.key.assign(key);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
Expand All @@ -716,6 +724,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> etcd::SyncClient::rm_if_int
params.old_revision = old_index;
params.old_value = old_value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncCompareAndDeleteAction>(std::move(params), atomicity_type);
}
Expand All @@ -730,6 +739,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
params.key.assign(key);
params.withPrefix = recursive;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
Expand All @@ -750,6 +760,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
params.range_end.assign(range_end);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
Expand All @@ -770,6 +781,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
params.withPrefix = true;
params.limit = limit;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
Expand All @@ -791,6 +803,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
params.withPrefix = false;
params.limit = limit;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
Expand All @@ -811,6 +824,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
params.withPrefix = recursive;
params.revision = fromIndex;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.watch_stub = stubs->watchServiceStub.get();
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
}
Expand All @@ -837,6 +851,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
params.withPrefix = false;
params.revision = fromIndex;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.watch_stub = stubs->watchServiceStub.get();
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
}
Expand All @@ -850,6 +865,7 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
params.ttl = ttl;
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(std::move(params));
Expand All @@ -860,6 +876,7 @@ std::shared_ptr<etcd::KeepAlive> etcd::SyncClient::leasekeepalive(int ttl) {
etcdv3::ActionParameters params;
params.ttl = ttl;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();

// keep alive is synchronous in two folds:
Expand All @@ -881,6 +898,7 @@ std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> etcd::SyncClient::leaserevoke_in
etcdv3::ActionParameters params;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaseRevokeAction>(std::move(params));
}
Expand All @@ -894,6 +912,7 @@ std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> etcd::SyncClient::leasetimet
etcdv3::ActionParameters params;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaseTimeToLiveAction>(std::move(params));
}
Expand All @@ -916,6 +935,7 @@ etcd::Response etcd::SyncClient::lock_internal(std::string const &key, std::shar
params.key = key;
params.lease_id = keepalive->Lease();
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();

{
Expand Down Expand Up @@ -948,6 +968,7 @@ std::shared_ptr<etcdv3::AsyncLockAction> etcd::SyncClient::lock_with_lease_inter
params.key = key;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();
return std::make_shared<etcdv3::AsyncLockAction>(std::move(params));
}
Expand All @@ -960,6 +981,7 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(std
etcdv3::ActionParameters params;
params.key = lock_key;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();

// issue a "unlock" request
Expand Down Expand Up @@ -1002,6 +1024,7 @@ etcd::Response etcd::SyncClient::txn(etcdv3::Transaction const &txn) {
std::shared_ptr<etcdv3::AsyncTxnAction> etcd::SyncClient::txn_internal(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncTxnAction>(std::move(params), txn);
}
Expand All @@ -1018,6 +1041,7 @@ std::shared_ptr<etcdv3::AsyncCampaignAction> etcd::SyncClient::campaign_internal
params.lease_id = lease_id;
params.value = value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncCampaignAction>(std::move(params));
}
Expand All @@ -1038,6 +1062,7 @@ std::shared_ptr<etcdv3::AsyncProclaimAction> etcd::SyncClient::proclaim_internal
params.revision = revision;
params.value = value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncProclaimAction>(std::move(params));
}
Expand All @@ -1050,6 +1075,7 @@ std::shared_ptr<etcdv3::AsyncLeaderAction> etcd::SyncClient::leader_internal(std
etcdv3::ActionParameters params;
params.name = name;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaderAction>(std::move(params));
}
Expand All @@ -1059,6 +1085,7 @@ std::unique_ptr<etcd::SyncClient::Observer> etcd::SyncClient::observe(
etcdv3::ActionParameters params;
params.name.assign(name);
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
std::unique_ptr<etcd::SyncClient::Observer> observer(new Observer());
observer->action = std::make_shared<etcdv3::AsyncObserveAction>(std::move(params));
Expand All @@ -1078,6 +1105,7 @@ std::shared_ptr<etcdv3::AsyncResignAction> etcd::SyncClient::resign_internal(std
params.key = key;
params.revision = revision;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncResignAction>(std::move(params));
}
Expand Down
Loading

0 comments on commit e5f1167

Please sign in to comment.