From 5bf76e33f9f04a5813650481a8cc2e1cefee0695 Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 10 Nov 2020 21:39:51 +0000 Subject: [PATCH] Refactor consumer callbacks to (n)ack *after* execution. The existing behaviour is basically the same as always having no_ack=TRUE, in that messages are unequivocally acknowledged. However, it is sometimes desirable to distinguish processing which fails from that which succeeds, so callbacks which throw an error will now nack messages instead -- without requeing them. Since nack without requeue is only *semantically* (not operationally) different than acknowledgment, existing workflows are unlikely to be affected. Documentation has been updated to reflect this behaviour. Part of #2. --- R/consume.R | 26 +++++++++++++++++++++-- man/amqp_consume.Rd | 3 ++- man/amqp_consume_later.Rd | 2 ++ src/consume.c | 43 +++++++++------------------------------ 4 files changed, 38 insertions(+), 36 deletions(-) diff --git a/R/consume.R b/R/consume.R index 2b7a94c..27de368 100644 --- a/R/consume.R +++ b/R/consume.R @@ -46,7 +46,8 @@ #' @details #' #' Unless \code{no_ack} is \code{TRUE}, messages are acknowledged automatically -#' after the callback executes. +#' after the callback executes. If it fails, messages are nacked instead before +#' surfacing the underlying error to the caller. #' #' @examples #' \dontrun{ @@ -79,8 +80,27 @@ amqp_consume <- function(conn, queue, fun, tag = "", no_ack = FALSE, } stopifnot(is.function(fun)) args <- amqp_table(...) + if (!no_ack) { + # Wrap fun to control error conditions and ensure messages are acknowledged. + wrapped <- function(msg, chan) { + should_ack <- TRUE + tryCatch({ + fun(msg) + }, error = function(cond) { + should_ack <<- FALSE + amqp_nack_on_channel(conn, chan, msg$delivery_tag, requeue = FALSE) + stop(cond) + }, finally = { + if (should_ack) { + amqp_nack_on_channel(conn, chan, msg$delivery_tag) + } + }) + } + } else { + wrapped <- fun + } .Call( - R_amqp_create_consumer, conn$ptr, queue, tag, fun, new.env(), no_ack, + R_amqp_create_consumer, conn$ptr, queue, tag, wrapped, new.env(), no_ack, exclusive, args$ptr ) } @@ -145,6 +165,8 @@ amqp_listen <- function(conn, timeout = 10L) { #' \code{\link{amqp_cancel_consumer}} or by garbage collection when the original #' connection object expires. #' +#' Messages to background consumers are always acknowledged. +#' #' @seealso \code{\link{amqp_consume}} to consume messages in the main thread. #' @export #' @import later diff --git a/man/amqp_consume.Rd b/man/amqp_consume.Rd index 1b84cea..979347e 100644 --- a/man/amqp_consume.Rd +++ b/man/amqp_consume.Rd @@ -68,7 +68,8 @@ incomplete list is as follows: } Unless \code{no_ack} is \code{TRUE}, messages are acknowledged automatically -after the callback executes. +after the callback executes. If it fails, messages are nacked instead before +surfacing the underlying error to the caller. } \examples{ \dontrun{ diff --git a/man/amqp_consume_later.Rd b/man/amqp_consume_later.Rd index 21b8301..9533843 100644 --- a/man/amqp_consume_later.Rd +++ b/man/amqp_consume_later.Rd @@ -52,6 +52,8 @@ This may change in future versions. At present, consumers can only be cancelled by using \code{\link{amqp_cancel_consumer}} or by garbage collection when the original connection object expires. + +Messages to background consumers are always acknowledged. } \seealso{ \code{\link{amqp_consume}} to consume messages in the main thread. diff --git a/src/consume.c b/src/consume.c index 47c9ec3..2e2724d 100644 --- a/src/consume.c +++ b/src/consume.c @@ -139,9 +139,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout) tv.tv_usec = 0; time_t start = time(NULL); - SEXP message, body; - SEXP R_fcall = PROTECT(allocList(2)); - SET_TYPEOF(R_fcall, LANGSXP); + SEXP message, body, R_fcall; int ack; amqp_rpc_reply_t reply; @@ -235,46 +233,25 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout) &env.message.properties)); amqp_destroy_envelope(&env); - /* Acknowledge the message before we run the callback, just in case it - * fails. */ - - if (!elt->no_ack) { - ack = amqp_basic_ack(conn->conn, elt->chan.chan, env.delivery_tag, 0); - - /* We want to handle errors here before running the callback, since it's - * possible the message will be redelivered and the callback may not be - * idempotent. */ - switch (ack) { - case AMQP_STATUS_OK: - break; - case AMQP_STATUS_CONNECTION_CLOSED: - /* fallthrough */ - case AMQP_STATUS_SOCKET_CLOSED: - /* fallthrough */ - case AMQP_STATUS_SOCKET_ERROR: - /* fallthrough */ - conn->is_connected = 0; - Rf_error("Disconnected from server."); - break; - default: - Rf_error("Failed to acknowledge message: %s.\n", - amqp_error_string2(ack)); - break; - } - } - + R_fcall = PROTECT(allocList(elt->no_ack ? 2 : 3)); + SET_TYPEOF(R_fcall, LANGSXP); SETCAR(R_fcall, elt->fun); SETCADR(R_fcall, message); + if (!elt->no_ack) { + /* The channel is passed to the callback wrapper so that it can be used + * for acknowledgements. */ + SEXP chan_ptr = PROTECT(R_MakeExternalPtr(&elt->chan, R_NilValue, R_NilValue)); + SETCADDR(R_fcall, chan_ptr); + } Rf_eval(R_fcall, elt->rho); - UNPROTECT(2); + UNPROTECT(elt->no_ack ? 3 : 4); } current_wait = time(NULL) - start; R_CheckUserInterrupt(); // Escape hatch. } - UNPROTECT(1); return R_NilValue; }