From f8eedac1fc59823d4c478663f12b53b71c2d682a Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Fri, 10 Jun 2022 10:56:34 -0700 Subject: [PATCH] Changing grpc_tcp_client_vtable to include TCP cancel connect method (#29968) * Changing grpc_tcp_client_vtable to include TCP cancel connect method * fix unused parameter error * update * fix sanity checks --- src/core/lib/iomgr/tcp_client.cc | 19 +++++++----- src/core/lib/iomgr/tcp_client.h | 30 ++++++++++++------- src/core/lib/iomgr/tcp_client_cfstream.cc | 20 ++++++++----- src/core/lib/iomgr/tcp_client_posix.cc | 18 ++++++----- src/core/lib/iomgr/tcp_client_windows.cc | 18 ++++++----- test/core/end2end/fuzzers/api_fuzzer.cc | 18 +++++++---- test/cpp/end2end/connection_delay_injector.cc | 23 +++++++++----- 7 files changed, 94 insertions(+), 52 deletions(-) diff --git a/src/core/lib/iomgr/tcp_client.cc b/src/core/lib/iomgr/tcp_client.cc index f252f207ebef4..aba610cee7064 100644 --- a/src/core/lib/iomgr/tcp_client.cc +++ b/src/core/lib/iomgr/tcp_client.cc @@ -22,13 +22,18 @@ grpc_tcp_client_vtable* grpc_tcp_client_impl; -void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { - grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties, - channel_args, addr, deadline); +int64_t grpc_tcp_client_connect(grpc_closure* on_connect, + grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) { + return grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties, + channel_args, addr, deadline); +} + +bool grpc_tcp_client_cancel_connect(int64_t connection_handle) { + return grpc_tcp_client_impl->cancel_connect(connection_handle); } void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl) { diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 1dfb8d2574067..59a363c89d7af 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -30,23 +30,31 @@ #include "src/core/lib/resource_quota/memory_quota.h" typedef struct grpc_tcp_client_vtable { - void (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); + int64_t (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); + bool (*cancel_connect)(int64_t connection_handle); } grpc_tcp_client_vtable; /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). interested_parties points to a set of pollsets that would be interested - in this connection being established (in order to continue their work) */ -void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); + in this connection being established (in order to continue their work). It + returns a handle to the connect operation which can be used to cancel the + connection attempt. */ +int64_t grpc_tcp_client_connect(grpc_closure* on_connect, + grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); + +// Returns true if a connect attempt corresponding to the provided handle +// is successfully cancelled. Otherwise it returns false. +bool grpc_tcp_client_cancel_connect(int64_t connection_handle); void grpc_tcp_client_global_init(); diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index 8e2190bedf4c4..de42f01245e4e 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -149,17 +149,17 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, *port = grpc_sockaddr_get_port(addr); } -static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* resolved_addr, - grpc_core::Timestamp deadline) { +static int64_t CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, + grpc_core::Timestamp deadline) { auto addr_uri = grpc_sockaddr_to_uri(resolved_addr); if (!addr_uri.ok()) { grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(addr_uri.status().ToString()); grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); - return; + return 0; } CFStreamConnect* connect = new CFStreamConnect(); @@ -198,8 +198,14 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, CFWriteStreamOpen(write_stream); grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); gpr_mu_unlock(&connect->mu); + return 0; } -grpc_tcp_client_vtable grpc_cfstream_client_vtable = {CFStreamClientConnect}; +static bool CFStreamClientCancelConnect(int64_t /*connection_handle*/) { + return false; +} + +grpc_tcp_client_vtable grpc_cfstream_client_vtable = { + CFStreamClientConnect, CFStreamClientCancelConnect}; #endif /* GRPC_CFSTREAM_CLIENT */ diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 82b50e26ec65c..389c2734aeb19 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -328,11 +328,11 @@ void grpc_tcp_client_create_from_prepared_fd( gpr_mu_unlock(&ac->mu); } -static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { +static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) { grpc_resolved_address mapped_addr; int fd = -1; grpc_error_handle error; @@ -340,12 +340,16 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr, &fd)) != GRPC_ERROR_NONE) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); - return; + return 0; } grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd, channel_args, &mapped_addr, deadline, ep); + return 0; } -grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect}; +static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; } + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect, + tcp_cancel_connect}; #endif diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 0ca044ea021be..f419c075ccd47 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -121,11 +121,11 @@ static void on_connect(void* acp, grpc_error_handle error) { /* Tries to issue one async connection, then schedules both an IOCP notification request for the connection, and one timeout alert. */ -static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { +static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; @@ -216,7 +216,7 @@ static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm); grpc_socket_notify_on_write(socket, &ac->on_connect); gpr_mu_unlock(&ac->mu); - return; + return 0; failure: GPR_ASSERT(!GRPC_ERROR_IS_NONE(error)); @@ -232,8 +232,12 @@ static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, closesocket(sock); } grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error); + return 0; } -grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect}; +static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; } + +grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect, + tcp_cancel_connect}; #endif /* GRPC_WINSOCK_SOCKET */ diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index c72e9727499b5..3dad73e0184be 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -258,15 +258,21 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx)); } -static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* /*interested_parties*/, - const grpc_channel_args* /*channel_args*/, - const grpc_resolved_address* /*addr*/, - grpc_core::Timestamp deadline) { +static int64_t my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* /*interested_parties*/, + const grpc_channel_args* /*channel_args*/, + const grpc_resolved_address* /*addr*/, + grpc_core::Timestamp deadline) { sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC)); + return 0; } -grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect}; +static bool my_tcp_cancel_connect(int64_t /*connection_handle*/) { + return false; +} + +grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect, + my_tcp_cancel_connect}; //////////////////////////////////////////////////////////////////////////////// // test driver diff --git a/test/cpp/end2end/connection_delay_injector.cc b/test/cpp/end2end/connection_delay_injector.cc index d3bd3d146d81f..c93c4603456d4 100644 --- a/test/cpp/end2end/connection_delay_injector.cc +++ b/test/cpp/end2end/connection_delay_injector.cc @@ -38,22 +38,31 @@ grpc_tcp_client_vtable* g_original_vtable = nullptr; grpc_core::Mutex* g_mu = nullptr; ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr; -void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { +int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) { grpc_core::MutexLock lock(g_mu); if (g_injector == nullptr) { g_original_vtable->connect(closure, ep, interested_parties, channel_args, addr, deadline); - return; + return 0; } g_injector->HandleConnection(closure, ep, interested_parties, channel_args, addr, deadline); + return 0; } -grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay}; +// TODO(vigneshbabu): This method should check whether the connect attempt has +// actually been started, and if so, it should call +// g_original_vtable->cancel_connect(). If the attempt has not actually been +// started, it should mark the connect request as cancelled, so that when the +// request is resumed, it will not actually proceed. +bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; } + +grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay, + TcpConnectCancel}; } // namespace