Skip to content

Commit

Permalink
Fixes the noisy logs when meets invalid addresses.
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Dec 19, 2023
1 parent 84343ca commit aa2ca6e
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 41 deletions.
5 changes: 4 additions & 1 deletion etcd/Watcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,11 @@ class Watcher {
* Note that you shouldn't use the watcher itself inside the `Wait()` callback
* as the callback will be invoked in a separate **detached** thread where the
* watcher may have been destroyed.
*
* @return true if the callback has been set successfully (no existing
* callback).
*/
void Wait(std::function<void(bool)> callback);
bool Wait(std::function<void(bool)> callback);

/**
* Stop the watching action.
Expand Down
8 changes: 6 additions & 2 deletions src/KeepAlive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,12 @@ void etcd::KeepAlive::refresh() {
if (!continue_next.load()) {
return;
}
std::cerr << "Warn: awaked from condition_variable" +
" but continue_next is not set, maybe due to clock drift." << std::endl;
#ifndef NDEBUG
std::cerr
<< "[warn] awaked from condition_variable but continue_next is "
"not set, maybe due to clock drift."
<< std::endl;
#endif
}
}

Expand Down
82 changes: 54 additions & 28 deletions src/SyncClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <grpc++/grpc++.h>
#include <grpc++/security/credentials.h>
#include <grpc++/support/status_code_enum.h>
#include <grpc/grpc.h> // for grpc_lame_client_channel_create()

#include "proto/rpc.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
Expand All @@ -49,6 +50,15 @@
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"

namespace grpc {
// forward declaration for compatibility with older grpc versions
std::shared_ptr<Channel> CreateChannelInternal(
const std::string& host, grpc_channel* c_channel,
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
} // namespace grpc

namespace etcd {
namespace detail {

Expand All @@ -70,7 +80,7 @@ static void string_split(std::vector<std::string>& dests,
}

static std::string string_join(std::vector<std::string> const& srcs,
std::string const sep) {
std::string const& sep) {
std::stringstream ss;
if (!srcs.empty()) {
ss << srcs[0];
Expand All @@ -91,7 +101,9 @@ static bool dns_resolve(std::string const& target,
std::vector<std::string> target_parts;
string_split(target_parts, target, ":");
if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] invalid URL: " << target << std::endl;
#endif
return false;
}

Expand All @@ -104,7 +116,10 @@ static bool dns_resolve(std::string const& target,
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
// Tell the user that we could not find a usable Winsock DLL.
std::cerr << "WSAStartup failed with error: %d" << err << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] WSAStartup failed with error: %d" << err
<< std::endl;
#endif
return false;
}
}
Expand All @@ -113,8 +128,10 @@ static bool dns_resolve(std::string const& target,
int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints,
&addrs);
if (r != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target
#ifndef NDEBUG
std::cerr << "[warn] getaddrinfo() failed for endpoint " << target
<< " with error: " << r << std::endl;
#endif
return false;
}

Expand Down Expand Up @@ -175,10 +192,12 @@ static std::string read_from_file(std::string const& filename) {
file.close();
return ss.str();
} else {
std::cerr << "[ERROR] failed to load given file '" << filename << "', "
#ifndef NDEBUG
std::cerr << "[error] failed to load given file '" << filename << "', "
<< strerror(errno) << std::endl;
#endif
return std::string{};
}
return std::string{};
}

static grpc::SslCredentialsOptions make_ssl_credentials(
Expand All @@ -195,6 +214,25 @@ std::unique_ptr<T> make_unique_ptr(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

static std::shared_ptr<grpc::Channel> create_grpc_channel(
const std::string& address,
const std::shared_ptr<grpc::ChannelCredentials> creds,
const grpc::ChannelArguments& grpc_args) {
const std::string addresses =
etcd::detail::strip_and_resolve_addresses(address);
if (addresses.empty() || addresses == "ipv4:///") {
// bypass grpc initialization to avoid noisy logs from grpc
return grpc::CreateChannelInternal(
"",
grpc_lame_client_channel_create(addresses.c_str(), GRPC_STATUS_INTERNAL,
"the target uri is not valid"),
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
} else {
return grpc::CreateCustomChannel(addresses, creds, grpc_args);
}
}

} // namespace detail
} // namespace etcd

Expand Down Expand Up @@ -271,15 +309,13 @@ void etcd::SyncClient::EtcdServerStubsDeleter::operator()(
etcd::SyncClient::SyncClient(std::string const& address,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());

// create stubs
Expand All @@ -294,14 +330,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
etcd::SyncClient::SyncClient(std::string const& address,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());

// create stubs
Expand Down Expand Up @@ -329,15 +363,13 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);

// auth
this->token_authenticator.reset(new TokenAuthenticator(
Expand All @@ -358,14 +390,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);

// auth
this->token_authenticator.reset(new TokenAuthenticator(
Expand Down Expand Up @@ -403,8 +433,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
Expand All @@ -415,7 +443,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());

// setup stubs
Expand All @@ -433,8 +461,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
Expand All @@ -444,7 +470,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());

// setup stubs
Expand Down Expand Up @@ -1005,15 +1031,15 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
} else {
#if !defined(NDEBUG)
std::cerr << "Keepalive for lease not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] keepalive for lease not found" << std::endl;
#endif
}
lock_lease_id = p_leases->second;
this->leases_for_locks.erase(p_leases);
} else {
#if !defined(NDEBUG)
std::cerr << "Lease for lock not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] lease for lock not found" << std::endl;
#endif
}
if (lock_lease_id != 0) {
Expand Down
13 changes: 9 additions & 4 deletions src/Watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,18 @@ bool etcd::Watcher::Wait() {
return stubs->call->Cancelled();
}

void etcd::Watcher::Wait(std::function<void(bool)> callback) {
bool etcd::Watcher::Wait(std::function<void(bool)> callback) {
if (wait_callback == nullptr) {
wait_callback = callback;
return true;
} else {
std::cerr << "Failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#ifndef NDEBUG
std::cerr
<< "[warn] failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#endif
return false;
}
}

Expand Down
23 changes: 17 additions & 6 deletions src/v3/AsyncGRPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,12 @@ void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) {
}

// skip
std::cerr << "Not implemented error: unable to parse nested transaction "
#ifndef NDEBUG
std::cerr << "[debug] not implemented error: unable to parse nested "
"transaction "
"response"
<< std::endl;
#endif
}
}
if (!values.empty()) {
Expand Down Expand Up @@ -629,17 +632,22 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
got_tag == (void*) etcdv3::KEEPALIVE_DONE) {
// ok
} else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#ifndef NDEBUG
std::cerr
<< "[debug] failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#endif
}

stream->Finish(&status, (void*) KEEPALIVE_FINISH);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) KEEPALIVE_FINISH) {
// ok
} else {
std::cerr << "Failed to finish a lease keep-alive connection: "
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a lease keep-alive connection: "
<< status.error_message() << ", "
<< context.debug_error_string() << std::endl;
#endif
}

// cancel on-the-fly calls
Expand Down Expand Up @@ -820,8 +828,10 @@ void etcdv3::AsyncObserveAction::CancelObserve() {
break;
case CompletionQueue::NextStatus::GOT_EVENT:
if (!ok || got_tag != (void*) ELECTION_OBSERVE_FINISH) {
std::cerr << "Failed to finish a election observing connection"
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a election observing connection"
<< std::endl;
#endif
}
}

Expand Down Expand Up @@ -1165,7 +1175,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
#ifndef NDEBUG
std::cerr << "[warn] watcher does't exit normally" << std::endl;
#endif
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
Expand Down Expand Up @@ -1219,7 +1231,6 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
<< std::endl;
}

std::cout << "issue a watch cancel" << std::endl;
// cancel the watcher after receiving the good response
this->CancelWatch();

Expand Down

0 comments on commit aa2ca6e

Please sign in to comment.