Skip to content

Commit

Permalink
[backport][iomgr][EventEngine] Improve server handling of file descri…
Browse files Browse the repository at this point in the history
…ptor exhaustion (#33672)

Backport of #33656
  • Loading branch information
drfloob committed Jul 13, 2023
1 parent aff3066 commit 1e86ca5
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1909,6 +1909,7 @@ grpc_cc_library(
"posix_event_engine_tcp_socket_utils",
"socket_mutator",
"status_helper",
"time",
"//:event_engine_base_hdrs",
"//:gpr",
],
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
// Caution!! The timer implementation cannot create any fds. See #20418.
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
Expand Down
29 changes: 29 additions & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep

#include <atomic>
#include <string>
#include <tuple>
#include <utility>

#include "absl/functional/any_invocable.h"
Expand All @@ -41,6 +43,7 @@
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/socket_mutator.h"

namespace grpc_event_engine {
Expand Down Expand Up @@ -133,6 +136,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
switch (errno) {
case EINTR:
continue;
case EMFILE:
// When the process runs out of fds, accept4() returns EMFILE. When
// this happens, the connection is left in the accept queue until
// either a read event triggers the on_read callback, or time has
// passed and the accept should be re-tried regardless. This callback
// is not cancelled, so a spurious wakeup may occur even when there's
// nothing to accept. This is not a performant code path, but if an fd
// limit has been reached, the system is likely in an unhappy state
// regardless.
GRPC_LOG_EVERY_N_SEC(1, "%s",
"File descriptor limit reached. Retrying.");
handle_->NotifyOnRead(notify_on_accept_);
// Do not schedule another timer if one is already armed.
if (retry_timer_armed_.exchange(true)) return;
// Hold a ref while the retry timer is waiting, to prevent listener
// destruction and the races that would ensue.
Ref();

This comment has been minimized.

Copy link
@xtofl

xtofl Oct 2, 2023

Isn't there a way to move the ref to the closure of the lambda at line 157, so it becomes clear that Unref is called even if the job is e.g. canceled?

This comment has been minimized.

Copy link
@drfloob

drfloob Oct 2, 2023

Author Member

Not really. The AsyncConnectionAcceptor implemented its own ref counting mechanism here, void Ref(). The normal ref-counting schemes across most of the rest of the codebase return objects whose lifetimes own the count ... those play nicely with move-ing ownership into the lambda.

std::ignore =
engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
retry_timer_armed_.store(false);
if (!handle_->IsHandleShutdown()) {
handle_->SetReadable();
}
Unref();
});
return;
case EAGAIN:
case ECONNABORTED:
handle_->NotifyOnRead(notify_on_accept_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class PosixEngineListenerImpl
ListenerSocketsContainer::ListenerSocket socket_;
EventHandle* handle_;
PosixEngineClosure* notify_on_accept_;
// Tracks the status of a backup timer to retry accept4 calls after file
// descriptor exhaustion.
std::atomic<bool> retry_timer_armed_{false};
};
class ListenerAsyncAcceptors : public ListenerSocketsContainer {
public:
Expand Down
53 changes: 39 additions & 14 deletions src/core/lib/iomgr/tcp_server_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
//
//

#include <grpc/support/port_platform.h>

#include <utility>

#include <grpc/support/atm.h>

// FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
Expand All @@ -45,6 +49,7 @@

#include <grpc/byte_buffer.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
Expand Down Expand Up @@ -74,6 +79,8 @@
#include "src/core/lib/transport/error_utils.h"

static std::atomic<int64_t> num_dropped_connections{0};
static constexpr grpc_core::Duration kRetryAcceptWaitTime{
grpc_core::Duration::Seconds(1)};

using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
Expand Down Expand Up @@ -350,22 +357,38 @@ static void on_read(void* arg, grpc_error_handle err) {
if (fd < 0) {
if (errno == EINTR) {
continue;
} else if (errno == EAGAIN || errno == ECONNABORTED ||
errno == EWOULDBLOCK) {
}
// When the process runs out of fds, accept4() returns EMFILE. When this
// happens, the connection is left in the accept queue until either a
// read event triggers the on_read callback, or time has passed and the
// accept should be re-tried regardless. This callback is not cancelled,
// so a spurious wakeup may occur even when there's nothing to accept.
// This is not a performant code path, but if an fd limit has been
// reached, the system is likely in an unhappy state regardless.
if (errno == EMFILE) {
GRPC_LOG_EVERY_N_SEC(1, "%s",
"File descriptor limit reached. Retrying.");
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
grpc_timer_init(&sp->retry_timer,
grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
&sp->retry_closure);
return;
}
if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
return;
}
gpr_mu_lock(&sp->server->mu);
if (!sp->server->shutdown_listeners) {
gpr_log(GPR_ERROR, "Failed accept4: %s",
grpc_core::StrError(errno).c_str());
} else {
gpr_mu_lock(&sp->server->mu);
if (!sp->server->shutdown_listeners) {
gpr_log(GPR_ERROR, "Failed accept4: %s",
grpc_core::StrError(errno).c_str());
} else {
// if we have shutdown listeners, accept4 could fail, and we
// needn't notify users
}
gpr_mu_unlock(&sp->server->mu);
goto error;
// if we have shutdown listeners, accept4 could fail, and we
// needn't notify users
}
gpr_mu_unlock(&sp->server->mu);
goto error;
}

if (sp->server->memory_quota->IsMemoryPressureHigh()) {
Expand Down Expand Up @@ -558,6 +581,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
sp->port_index = listener->port_index;
sp->fd_index = listener->fd_index + count - i;
GPR_ASSERT(sp->emfd);
grpc_tcp_server_listener_initialize_retry_timer(sp);
while (listener->server->tail->next != nullptr) {
listener->server->tail = listener->server->tail->next;
}
Expand Down Expand Up @@ -791,6 +815,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_timer_cancel(&sp->retry_timer);
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/core/lib/iomgr/tcp_server_utils_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/memory_quota.h"

// one listening port
Expand All @@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener {
// identified while iterating through 'next'.
struct grpc_tcp_listener* sibling;
int is_sibling;
// If accept4() fails with EMFILE (file descriptor exhaustion), a timer is
// started to drain the accept queue in case no further connection attempts
// reach the gRPC server.
grpc_closure retry_closure;
grpc_timer retry_timer;
gpr_atm retry_timer_armed;
} grpc_tcp_listener;

// the overall server
Expand Down Expand Up @@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
// Return true if the platform supports ifaddrs
bool grpc_tcp_server_have_ifaddrs(void);

// Initialize (but don't start) the timer and callback to retry accept4() on a
// listening socket after file descriptors have been exhausted. This must be
// called when creating a new listener.
void grpc_tcp_server_listener_initialize_retry_timer(
grpc_tcp_listener* listener);

#endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
21 changes: 21 additions & 0 deletions src/core/lib/iomgr/tcp_server_utils_posix_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <grpc/support/port_platform.h>

#include <grpc/support/atm.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
Expand Down Expand Up @@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) {
return s_max_accept_queue_size;
}

// Closure run when a listener's accept-retry timer completes. `arg` is the
// grpc_tcp_listener that owns the timer; `err` is the timer's completion
// status.
static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
  // A non-OK status means the timer was cancelled: touch nothing.
  if (err.ok()) {
    auto* sp = static_cast<grpc_tcp_listener*>(arg);
    // Clear the armed flag so the retry timer can be scheduled again.
    gpr_atm_no_barrier_store(&sp->retry_timer_armed, false);
    // Flag the listening fd as readable unless it has already been shut down.
    // NOTE(review): presumably this re-triggers the pending read closure so
    // accept4() is retried -- confirm against the fd implementation.
    const bool fd_shut_down = grpc_fd_is_shutdown(sp->emfd);
    if (!fd_shut_down) {
      grpc_fd_set_readable(sp->emfd);
    }
  }
}

// Sets up the accept-retry state of `listener` without starting the timer:
// binds the retry closure to listener_retry_timer_cb (with the listener itself
// as its argument), puts the timer into the unset state, and clears the
// "armed" flag.
void grpc_tcp_server_listener_initialize_retry_timer(
    grpc_tcp_listener* listener) {
  GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
                    grpc_schedule_on_exec_ctx);
  grpc_timer_init_unset(&listener->retry_timer);
  gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
}

static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
const grpc_resolved_address* addr,
unsigned port_index,
Expand Down Expand Up @@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name.c_str(), true);
grpc_tcp_server_listener_initialize_retry_timer(sp);

// Check and set fd as preallocated
if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
Expand Down

0 comments on commit 1e86ca5

Please sign in to comment.