Skip to content

Commit

Permalink
[ruby] improve the way completion queue pluck operations handle signa…
Browse files Browse the repository at this point in the history
…ls and process shutdown (grpc#36903)

Fixes the CBF of `src/ruby/end2end/killed_client_thread_test.rb` (failure mode is a hang of the child process that receives the SIGTERM) that has been happening since grpc#36724

So far grpc-ruby CQ pluck operations have so far used a 20ms-interval busy poll to check interrupts in case we've received a signal, handle process shutdown, etc. This means ongoing RPCs will not terminate their CQ operations if we need to terminate the process (the loop simply exits without waiting for the CQ op to finish), causing a leak. Those RPCs can leave refs over their corresponding channels preventing [this](https://github.com/grpc/grpc/blob/8564f72e8e0334c25c480e0aec1df75bdc1fce14/src/ruby/ext/grpc/rb_channel.c#L653) from terminating (the channels don't reach state SHUTDOWN after being destroyed).

Fix is to unblock CQ pluck operations by cancelling calls, and thus allowing the CQ pluck to actually complete its operation. For server listening CQ operations, we unblock them by shutting down the server.

A side win here is to remove the [20ms-interval busy poll](https://github.com/grpc/grpc/blob/8564f72e8e0334c25c480e0aec1df75bdc1fce14/src/ruby/ext/grpc/rb_completion_queue.c#L44) on CQ operations, which was only needed to handle shutdown.

Closes grpc#36903

COPYBARA_INTEGRATE_REVIEW=grpc#36903 from apolcyn:fix_ruby_interrupt bed1ee2
PiperOrigin-RevId: 643046465
  • Loading branch information
apolcyn committed Jun 13, 2024
1 parent 74e5a21 commit 1d6a880
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 56 deletions.
9 changes: 8 additions & 1 deletion src/ruby/ext/grpc/rb_call.c
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,12 @@ struct call_run_batch_args {
run_batch_stack* st;
};

static void cancel_call_unblock_func(void* arg) {
gpr_log(GPR_INFO, "GRPC_RUBY: cancel_call_unblock_func");
grpc_call* call = (grpc_call*)arg;
grpc_call_cancel(call, NULL);
}

static VALUE grpc_rb_call_run_batch_try(VALUE value_args) {
grpc_rb_fork_unsafe_begin();
struct call_run_batch_args* args = (struct call_run_batch_args*)value_args;
Expand All @@ -830,7 +836,8 @@ static VALUE grpc_rb_call_run_batch_try(VALUE value_args) {
grpc_call_error_detail_of(err), err);
}
ev = rb_completion_queue_pluck(args->call->queue, tag,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
gpr_inf_future(GPR_CLOCK_REALTIME),
cancel_call_unblock_func, args->call->wrapped);
if (!ev.success) {
rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
}
Expand Down
47 changes: 15 additions & 32 deletions src/ruby/ext/grpc/rb_completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,15 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void* tag;
volatile int interrupted;
void (*unblock_func)(void*);
void* unblock_func_arg;
} next_call_stack;

/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void* grpc_rb_completion_queue_pluck_no_gil(void* param) {
next_call_stack* const next_call = (next_call_stack*)param;
gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
gpr_timespec deadline;
do {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
next_call->event = grpc_completion_queue_pluck(
next_call->cq, next_call->tag, deadline, NULL);
if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
gpr_time_cmp(deadline, next_call->timeout) > 0) {
break;
}
} while (!next_call->interrupted);
next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
next_call->timeout, NULL);
return NULL;
}

Expand All @@ -65,37 +57,28 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue* cq) {
grpc_completion_queue_destroy(cq);
}

static void unblock_func(void* param) {
static void outer_unblock_func(void* param) {
next_call_stack* const next_call = (next_call_stack*)param;
next_call->interrupted = 1;
if (next_call->unblock_func == NULL) return;
next_call->unblock_func(next_call->unblock_func_arg);
}

/* Does the same thing as grpc_completion_queue_pluck, while properly releasing
the GVL and handling interrupts */
grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag,
gpr_timespec deadline, void* reserved) {
gpr_timespec deadline,
void (*unblock_func)(void* param),
void* unblock_func_arg) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
next_call.cq = queue;
next_call.timeout = deadline;
next_call.tag = tag;
next_call.event.type = GRPC_QUEUE_TIMEOUT;
(void)reserved;
/* Loop until we finish a pluck without an interruption. The internal
pluck function runs either until it is interrupted or it gets an
event, or time runs out.
The basic reason we need this relatively complicated construction is that
we need to re-acquire the GVL when an interrupt comes in, so that the ruby
interpreter can do what it needs to do with the interrupt. But we also need
to get back to plucking when the interrupt has been handled. */
do {
next_call.interrupted = 0;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void*)&next_call, unblock_func,
(void*)&next_call);
/* If an interrupt prevented pluck from returning useful information, then
any plucks that did complete must have timed out */
} while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT);
next_call.unblock_func = unblock_func;
next_call.unblock_func_arg = unblock_func_arg;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void*)&next_call, outer_unblock_func,
(void*)&next_call);
return next_call.event;
}
8 changes: 7 additions & 1 deletion src/ruby/ext/grpc/rb_completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue* cq);
* Makes the implementation of CompletionQueue#pluck available in other files
*
* This avoids having code that holds the GIL repeated at multiple sites.
*
* unblock_func is invoked with the provided argument to unblock the CQ
* operation in the event of process termination (e.g. a signal), but
* unblock_func may be NULL in which case it's unused.
*/
grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag,
gpr_timespec deadline, void* reserved);
gpr_timespec deadline,
void (*unblock_func)(void* param),
void* unblock_func_arg);

#endif /* GRPC_RB_COMPLETION_QUEUE_H_ */
61 changes: 39 additions & 22 deletions src/ruby/ext/grpc/rb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,28 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server* wrapped;
grpc_completion_queue* queue;
int shutdown_and_notify_done;
int destroy_done;
} grpc_rb_server;

static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
gpr_timespec deadline) {
static void grpc_rb_server_shutdown_and_notify_internal(grpc_rb_server* server,
gpr_timespec deadline) {
grpc_event ev;
void* tag = &ev;
if (!server->shutdown_and_notify_done) {
server->shutdown_and_notify_done = 1;
if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
ev = rb_completion_queue_pluck(
server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
if (ev.type != GRPC_OP_COMPLETE) {
gpr_log(GPR_INFO,
"GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
ev.type);
}
if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
// Following pluck calls will release the GIL and block but cannot
// be interrupted. They should terminate quickly enough though b/c
// we will cancel all server calls after the deadline.
ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
ev = rb_completion_queue_pluck(
server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL, NULL);
}
if (ev.type != GRPC_OP_COMPLETE) {
gpr_log(GPR_INFO,
"GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
ev.type);
}
}
}
Expand Down Expand Up @@ -100,7 +99,7 @@ static void grpc_rb_server_free_internal(void* p) {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(2, GPR_TIMESPAN));

grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
grpc_rb_server_shutdown_and_notify_internal(svr, deadline);
grpc_rb_server_maybe_destroy(svr);

xfree(p);
Expand Down Expand Up @@ -132,7 +131,6 @@ static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
wrapper->destroy_done = 0;
wrapper->shutdown_and_notify_done = 0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}

Expand Down Expand Up @@ -192,6 +190,24 @@ struct server_request_call_args {
request_call_stack st;
};

static void shutdown_server_unblock_func(void* arg) {
grpc_rb_server* server = (grpc_rb_server*)arg;
gpr_log(GPR_INFO, "GRPC_RUBY: shutdown_server_unblock_func");
GPR_ASSERT(server->wrapped != NULL);
grpc_event event;
void* tag = &event;
grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
grpc_server_cancel_all_calls(server->wrapped);
// Following call is blocking, but should finish quickly since we've
// cancelled all calls.
event = grpc_completion_queue_pluck(server->queue, tag,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
gpr_log(GPR_INFO,
"GRPC_RUBY: shutdown_server_unblock_func pluck event.type: %d "
"event.success: %d",
event.type, event.success);
}

static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
grpc_rb_fork_unsafe_begin();
struct server_request_call_args* args =
Expand All @@ -215,7 +231,8 @@ static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
}

grpc_event ev = rb_completion_queue_pluck(
args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
shutdown_server_unblock_func, args->server);
if (!ev.success) {
rb_raise(grpc_rb_eCallError, "request_call completion failed");
}
Expand Down Expand Up @@ -288,7 +305,7 @@ static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}

grpc_rb_server_maybe_shutdown_and_notify(s, deadline);
grpc_rb_server_shutdown_and_notify_internal(s, deadline);

return Qnil;
}
Expand Down

0 comments on commit 1d6a880

Please sign in to comment.