Skip to content

Commit

Permalink
Changing grpc_tcp_client_vtable to include TCP cancel connect method (#…
Browse files Browse the repository at this point in the history
…29968)

* Changing grpc_tcp_client_vtable to include TCP cancel connect method

* fix unused parameter error

* update

* fix sanity checks
  • Loading branch information
Vignesh2208 committed Jun 10, 2022
1 parent a0da0ab commit f8eedac
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 52 deletions.
19 changes: 12 additions & 7 deletions src/core/lib/iomgr/tcp_client.cc
Expand Up @@ -22,13 +22,18 @@

grpc_tcp_client_vtable* grpc_tcp_client_impl;

void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties,
channel_args, addr, deadline);
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
return grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties,
channel_args, addr, deadline);
}

bool grpc_tcp_client_cancel_connect(int64_t connection_handle) {
return grpc_tcp_client_impl->cancel_connect(connection_handle);
}

void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl) {
Expand Down
30 changes: 19 additions & 11 deletions src/core/lib/iomgr/tcp_client.h
Expand Up @@ -30,23 +30,31 @@
#include "src/core/lib/resource_quota/memory_quota.h"

typedef struct grpc_tcp_client_vtable {
void (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
int64_t (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
bool (*cancel_connect)(int64_t connection_handle);
} grpc_tcp_client_vtable;

/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
interested_parties points to a set of pollsets that would be interested
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
in this connection being established (in order to continue their work). It
returns a handle to the connect operation which can be used to cancel the
connection attempt. */
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);

// Returns true if a connect attempt corresponding to the provided handle
// is successfully cancelled. Otherwise it returns false.
bool grpc_tcp_client_cancel_connect(int64_t connection_handle);

void grpc_tcp_client_global_init();

Expand Down
20 changes: 13 additions & 7 deletions src/core/lib/iomgr/tcp_client_cfstream.cc
Expand Up @@ -149,17 +149,17 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr,
*port = grpc_sockaddr_get_port(addr);
}

static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* resolved_addr,
grpc_core::Timestamp deadline) {
static int64_t CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* resolved_addr,
grpc_core::Timestamp deadline) {
auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
if (!addr_uri.ok()) {
grpc_error_handle error =
GRPC_ERROR_CREATE_FROM_CPP_STRING(addr_uri.status().ToString());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
return 0;
}

CFStreamConnect* connect = new CFStreamConnect();
Expand Down Expand Up @@ -198,8 +198,14 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
CFWriteStreamOpen(write_stream);
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
gpr_mu_unlock(&connect->mu);
return 0;
}

grpc_tcp_client_vtable grpc_cfstream_client_vtable = {CFStreamClientConnect};
static bool CFStreamClientCancelConnect(int64_t /*connection_handle*/) {
return false;
}

grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
CFStreamClientConnect, CFStreamClientCancelConnect};

#endif /* GRPC_CFSTREAM_CLIENT */
18 changes: 11 additions & 7 deletions src/core/lib/iomgr/tcp_client_posix.cc
Expand Up @@ -328,24 +328,28 @@ void grpc_tcp_client_create_from_prepared_fd(
gpr_mu_unlock(&ac->mu);
}

static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_resolved_address mapped_addr;
int fd = -1;
grpc_error_handle error;
*ep = nullptr;
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fd)) != GRPC_ERROR_NONE) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
return 0;
}
grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
channel_args, &mapped_addr, deadline,
ep);
return 0;
}

grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; }

grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect,
tcp_cancel_connect};
#endif
18 changes: 11 additions & 7 deletions src/core/lib/iomgr/tcp_client_windows.cc
Expand Up @@ -121,11 +121,11 @@ static void on_connect(void* acp, grpc_error_handle error) {

/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
int status;
Expand Down Expand Up @@ -216,7 +216,7 @@ static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
grpc_socket_notify_on_write(socket, &ac->on_connect);
gpr_mu_unlock(&ac->mu);
return;
return 0;

failure:
GPR_ASSERT(!GRPC_ERROR_IS_NONE(error));
Expand All @@ -232,8 +232,12 @@ static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
closesocket(sock);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
return 0;
}

grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect};
static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; }

grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect,
tcp_cancel_connect};

#endif /* GRPC_WINSOCK_SOCKET */
18 changes: 12 additions & 6 deletions test/core/end2end/fuzzers/api_fuzzer.cc
Expand Up @@ -258,15 +258,21 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
}

static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_channel_args* /*channel_args*/,
const grpc_resolved_address* /*addr*/,
grpc_core::Timestamp deadline) {
static int64_t my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_channel_args* /*channel_args*/,
const grpc_resolved_address* /*addr*/,
grpc_core::Timestamp deadline) {
sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
return 0;
}

grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect};
static bool my_tcp_cancel_connect(int64_t /*connection_handle*/) {
return false;
}

grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect,
my_tcp_cancel_connect};

////////////////////////////////////////////////////////////////////////////////
// test driver
Expand Down
23 changes: 16 additions & 7 deletions test/cpp/end2end/connection_delay_injector.cc
Expand Up @@ -38,22 +38,31 @@ grpc_tcp_client_vtable* g_original_vtable = nullptr;
grpc_core::Mutex* g_mu = nullptr;
ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr;

void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_core::MutexLock lock(g_mu);
if (g_injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
return;
return 0;
}
g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline);
return 0;
}

grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
// TODO(vigneshbabu): This method should check whether the connect attempt has
// actually been started, and if so, it should call
// g_original_vtable->cancel_connect(). If the attempt has not actually been
// started, it should mark the connect request as cancelled, so that when the
// request is resumed, it will not actually proceed.
bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; }

grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay,
TcpConnectCancel};

} // namespace

Expand Down

0 comments on commit f8eedac

Please sign in to comment.