Skip to content

Commit

Permalink
Refactor consumer callbacks to (n)ack *after* execution.
Browse files Browse the repository at this point in the history
The existing behaviour is basically the same as always having
no_ack=TRUE, in that messages are unequivocally acknowledged. However,
it is sometimes desirable to distinguish processing which fails from
that which succeeds, so callbacks which throw an error will now nack
messages instead -- without requeing them.

Since nack without requeue is only *semantically* (not operationally)
different than acknowledgment, existing workflows are unlikely to be
affected.

Documentation has been updated to reflect this behaviour.

Part of #2.
  • Loading branch information
atheriel committed Nov 10, 2020
1 parent a8eb676 commit 5bf76e3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 36 deletions.
26 changes: 24 additions & 2 deletions R/consume.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
#' @details
#'
#' Unless \code{no_ack} is \code{TRUE}, messages are acknowledged automatically
#' after the callback executes.
#' after the callback executes. If it fails, messages are nacked instead before
#' surfacing the underlying error to the caller.
#'
#' @examples
#' \dontrun{
Expand Down Expand Up @@ -79,8 +80,27 @@ amqp_consume <- function(conn, queue, fun, tag = "", no_ack = FALSE,
}
stopifnot(is.function(fun))
args <- amqp_table(...)
if (!no_ack) {
# Wrap fun to control error conditions and ensure messages are acknowledged.
wrapped <- function(msg, chan) {
should_ack <- TRUE
tryCatch({
fun(msg)
}, error = function(cond) {
should_ack <<- FALSE
amqp_nack_on_channel(conn, chan, msg$delivery_tag, requeue = FALSE)
stop(cond)
}, finally = {
if (should_ack) {
amqp_nack_on_channel(conn, chan, msg$delivery_tag)
}
})
}
} else {
wrapped <- fun
}
.Call(
R_amqp_create_consumer, conn$ptr, queue, tag, fun, new.env(), no_ack,
R_amqp_create_consumer, conn$ptr, queue, tag, wrapped, new.env(), no_ack,
exclusive, args$ptr
)
}
Expand Down Expand Up @@ -145,6 +165,8 @@ amqp_listen <- function(conn, timeout = 10L) {
#' \code{\link{amqp_cancel_consumer}} or by garbage collection when the original
#' connection object expires.
#'
#' Messages to background consumers are always acknowledged.
#'
#' @seealso \code{\link{amqp_consume}} to consume messages in the main thread.
#' @export
#' @import later
Expand Down
3 changes: 2 additions & 1 deletion man/amqp_consume.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions man/amqp_consume_later.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 10 additions & 33 deletions src/consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
tv.tv_usec = 0;
time_t start = time(NULL);

SEXP message, body;
SEXP R_fcall = PROTECT(allocList(2));
SET_TYPEOF(R_fcall, LANGSXP);
SEXP message, body, R_fcall;

int ack;
amqp_rpc_reply_t reply;
Expand Down Expand Up @@ -235,46 +233,25 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
&env.message.properties));
amqp_destroy_envelope(&env);

/* Acknowledge the message before we run the callback, just in case it
* fails. */

if (!elt->no_ack) {
ack = amqp_basic_ack(conn->conn, elt->chan.chan, env.delivery_tag, 0);

/* We want to handle errors here before running the callback, since it's
* possible the message will be redelivered and the callback may not be
* idempotent. */
switch (ack) {
case AMQP_STATUS_OK:
break;
case AMQP_STATUS_CONNECTION_CLOSED:
/* fallthrough */
case AMQP_STATUS_SOCKET_CLOSED:
/* fallthrough */
case AMQP_STATUS_SOCKET_ERROR:
/* fallthrough */
conn->is_connected = 0;
Rf_error("Disconnected from server.");
break;
default:
Rf_error("Failed to acknowledge message: %s.\n",
amqp_error_string2(ack));
break;
}
}

R_fcall = PROTECT(allocList(elt->no_ack ? 2 : 3));
SET_TYPEOF(R_fcall, LANGSXP);
SETCAR(R_fcall, elt->fun);
SETCADR(R_fcall, message);
if (!elt->no_ack) {
/* The channel is passed to the callback wrapper so that it can be used
* for acknowledgements. */
SEXP chan_ptr = PROTECT(R_MakeExternalPtr(&elt->chan, R_NilValue, R_NilValue));
SETCADDR(R_fcall, chan_ptr);
}
Rf_eval(R_fcall, elt->rho);

UNPROTECT(2);
UNPROTECT(elt->no_ack ? 3 : 4);
}

current_wait = time(NULL) - start;
R_CheckUserInterrupt(); // Escape hatch.
}

UNPROTECT(1);
return R_NilValue;
}

Expand Down

0 comments on commit 5bf76e3

Please sign in to comment.