Skip to content

Commit

Permalink
[backport][iomgr][EventEngine] Improve server handling of file descri…
Browse files Browse the repository at this point in the history
…ptor exhaustion (#33667)

Backport of #33656
  • Loading branch information
drfloob committed Jul 12, 2023
1 parent 8a59ef0 commit d4b53c7
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,7 @@ grpc_cc_library(
"posix_event_engine_tcp_socket_utils",
"socket_mutator",
"status_helper",
"time",
"//:event_engine_base_hdrs",
"//:gpr",
],
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
// Caution!! The timer implementation cannot create any fds. See #20418.
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
Expand Down
30 changes: 30 additions & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep

#include <atomic>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

#include "absl/functional/any_invocable.h"
Expand All @@ -41,6 +44,7 @@
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/socket_mutator.h"

namespace grpc_event_engine {
Expand Down Expand Up @@ -136,6 +140,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
switch (errno) {
case EINTR:
continue;
case EMFILE:
// When the process runs out of fds, accept4() returns EMFILE. When
// this happens, the connection is left in the accept queue until
// either a read event triggers the on_read callback, or time has
// passed and the accept should be re-tried regardless. This callback
// is not cancelled, so a spurious wakeup may occur even when there's
// nothing to accept. This is not a performant code path, but if an fd
// limit has been reached, the system is likely in an unhappy state
// regardless.
GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
"File descriptor limit reached. Retrying.");
handle_->NotifyOnRead(notify_on_accept_);
// Do not schedule another timer if one is already armed.
if (retry_timer_armed_.exchange(true)) return;
// Hold a ref while the retry timer is waiting, to prevent listener
// destruction and the races that would ensue.
Ref();
std::ignore =
engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
retry_timer_armed_.store(false);
if (!handle_->IsHandleShutdown()) {
handle_->SetReadable();
}
Unref();
});
return;
case EAGAIN:
case ECONNABORTED:
handle_->NotifyOnRead(notify_on_accept_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class PosixEngineListenerImpl
ListenerSocketsContainer::ListenerSocket socket_;
EventHandle* handle_;
PosixEngineClosure* notify_on_accept_;
// Tracks the status of a backup timer to retry accept4 calls after file
// descriptor exhaustion.
std::atomic<bool> retry_timer_armed_{false};
};
class ListenerAsyncAcceptors : public ListenerSocketsContainer {
public:
Expand Down
46 changes: 34 additions & 12 deletions src/core/lib/iomgr/tcp_server_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <grpc/support/port_platform.h>

#include <utility>

#include <grpc/support/atm.h>

// FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
Expand Down Expand Up @@ -78,6 +80,8 @@
#include "src/core/lib/transport/error_utils.h"

static std::atomic<int64_t> num_dropped_connections{0};
// Delay before accept4() is retried after it failed with EMFILE (file
// descriptor exhaustion). on_read() adds this to Timestamp::Now() when arming
// a listener's retry timer.
static constexpr grpc_core::Duration kRetryAcceptWaitTime{
grpc_core::Duration::Seconds(1)};

using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
Expand Down Expand Up @@ -361,22 +365,38 @@ static void on_read(void* arg, grpc_error_handle err) {
if (fd < 0) {
if (errno == EINTR) {
continue;
} else if (errno == EAGAIN || errno == ECONNABORTED ||
errno == EWOULDBLOCK) {
}
// When the process runs out of fds, accept4() returns EMFILE. When this
// happens, the connection is left in the accept queue until either a
// read event triggers the on_read callback, or time has passed and the
// accept should be re-tried regardless. This callback is not cancelled,
// so a spurious wakeup may occur even when there's nothing to accept.
// This is not a performant code path, but if an fd limit has been
// reached, the system is likely in an unhappy state regardless.
if (errno == EMFILE) {
GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
"File descriptor limit reached. Retrying.");
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
grpc_timer_init(&sp->retry_timer,
grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
&sp->retry_closure);
return;
}
if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
return;
}
gpr_mu_lock(&sp->server->mu);
if (!sp->server->shutdown_listeners) {
gpr_log(GPR_ERROR, "Failed accept4: %s",
grpc_core::StrError(errno).c_str());
} else {
gpr_mu_lock(&sp->server->mu);
if (!sp->server->shutdown_listeners) {
gpr_log(GPR_ERROR, "Failed accept4: %s",
grpc_core::StrError(errno).c_str());
} else {
// if we have shutdown listeners, accept4 could fail, and we
// needn't notify users
}
gpr_mu_unlock(&sp->server->mu);
goto error;
// if we have shutdown listeners, accept4 could fail, and we
// needn't notify users
}
gpr_mu_unlock(&sp->server->mu);
goto error;
}

if (sp->server->memory_quota->IsMemoryPressureHigh()) {
Expand Down Expand Up @@ -569,6 +589,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
sp->port_index = listener->port_index;
sp->fd_index = listener->fd_index + count - i;
GPR_ASSERT(sp->emfd);
grpc_tcp_server_listener_initialize_retry_timer(sp);
while (listener->server->tail->next != nullptr) {
listener->server->tail = listener->server->tail->next;
}
Expand Down Expand Up @@ -817,6 +838,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_timer_cancel(&sp->retry_timer);
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/core/lib/iomgr/tcp_server_utils_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/memory_quota.h"

// one listening port
Expand All @@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener {
// identified while iterating through 'next'.
struct grpc_tcp_listener* sibling;
int is_sibling;
// If an accept4() call fails because the process is out of file descriptors
// (EMFILE), a timer is started to drain the accept queue in case no further
// connection attempts reach the gRPC server.
grpc_closure retry_closure;
grpc_timer retry_timer;
gpr_atm retry_timer_armed;
} grpc_tcp_listener;

// the overall server
Expand Down Expand Up @@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
// Return true if the platform supports ifaddrs
bool grpc_tcp_server_have_ifaddrs(void);

// Initialize (but don't start) the timer and callback to retry accept4() on a
// listening socket after file descriptors have been exhausted. This must be
// called when creating a new listener.
void grpc_tcp_server_listener_initialize_retry_timer(
grpc_tcp_listener* listener);

#endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
21 changes: 21 additions & 0 deletions src/core/lib/iomgr/tcp_server_utils_posix_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <grpc/support/port_platform.h>

#include <grpc/support/atm.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
Expand Down Expand Up @@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) {
return s_max_accept_queue_size;
}

// Retry-timer callback for a listening socket.
//
// `arg` is the grpc_tcp_listener this closure was bound to. A non-OK `err`
// means the timer did not fire normally (e.g. it was cancelled), in which case
// the listener is left untouched. Otherwise the armed flag is cleared and,
// unless the listener's fd has been shut down, the fd is marked readable.
static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
  if (!err.ok()) {
    // Cancelled: nothing to retry.
    return;
  }
  auto* sp = static_cast<grpc_tcp_listener*>(arg);
  // Clear the flag before waking the fd so a later EMFILE can arm a new timer.
  gpr_atm_no_barrier_store(&sp->retry_timer_armed, false);
  if (grpc_fd_is_shutdown(sp->emfd)) {
    return;
  }
  grpc_fd_set_readable(sp->emfd);
}

// Sets up (without starting) the accept4() retry machinery on `listener`:
// binds the retry closure to the listener, leaves the timer unset, and marks
// the timer as not armed.
void grpc_tcp_server_listener_initialize_retry_timer(
    grpc_tcp_listener* listener) {
  // The closure carries the listener itself as its callback argument.
  GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
                    grpc_schedule_on_exec_ctx);
  grpc_timer_init_unset(&listener->retry_timer);
  // No retry is pending on a freshly created listener.
  gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
}

static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
const grpc_resolved_address* addr,
unsigned port_index,
Expand Down Expand Up @@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name.c_str(), true);
grpc_tcp_server_listener_initialize_retry_timer(sp);

// Check and set fd as preallocated
if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
Expand Down

0 comments on commit d4b53c7

Please sign in to comment.