Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack only successful messages and expose ways to control nacks for users that want them #10

Merged
merged 6 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export(amqp_delete_queue)
export(amqp_disconnect)
export(amqp_get)
export(amqp_listen)
export(amqp_nack)
export(amqp_properties)
export(amqp_publish)
export(amqp_reconnect)
Expand Down
23 changes: 23 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
# longears 0.2.4.9000

- Consumers created with `amqp_consume()` will now acknowledge messages only
*after* the callback has run. Moreover, when a callback fails the message in
question will be nacked instead. The new `requeue_on_error` parameter controls
whether messages should be redelivered when callbacks error; by default it is
`FALSE`, in line with current behaviour. For complete control over when and
how messages are nacked, one can use the new `amqp_nack()` function, which
works like `stop()` inside a message handler. The following is an example of
complex acknowledgement behaviour for a consumer, which might be used in
combination with a [dead letter exchange](https://www.rabbitmq.com/dlx.html):

```r
consumer <- amqp_consume(conn, queue, function(msg) {
if (msg$redelivered) {
message("can't handle this either, dead-letter it")
amqp_nack(requeue = FALSE)
} else {
stop("can't handle this, maybe someone else can")
}
}, requeue_on_error = TRUE)
```

# longears 0.2.4

- Fixes handling of connection failures in `amqp_listen()`. Previously if you
Expand Down
12 changes: 8 additions & 4 deletions R/basic.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,29 @@ as.data.frame.amqp_message <- function(x, row.names = NULL, optional = FALSE,
#' handle correctly by "nack"-ing them.
#'
#' @param conn An object returned by \code{\link{amqp_connect}}.
#' @param chan An external pointer representing a channel object.
#' @param delivery_tag The message's numeric identifier.
#' @param multiple When \code{TRUE}, (n)ack messages up-to-and-including this
#' \code{delivery_tag}. By default, we only (n)ack a single message.
#'
#' @noRd
amqp_ack <- function(conn, delivery_tag, multiple = FALSE) {
amqp_ack_on_channel <- function(conn, chan, delivery_tag, multiple = FALSE) {
if (!inherits(conn, "amqp_connection")) {
stop("`conn` is not an amqp_connection object")
}
invisible(.Call(R_amqp_ack, conn$ptr, delivery_tag, multiple))
invisible(.Call(R_amqp_ack_on_channel, conn$ptr, chan, delivery_tag, multiple))
}

#' @param requeue When \code{TRUE}, ask the server to requeue the message.
#' Otherwise, messages are discarded or dead-lettered.
#'
#' @noRd
amqp_nack <- function(conn, delivery_tag, multiple = FALSE, requeue = FALSE) {
amqp_nack_on_channel <- function(conn, chan, delivery_tag, multiple = FALSE,
requeue = FALSE) {
if (!inherits(conn, "amqp_connection")) {
stop("`conn` is not an amqp_connection object")
}
invisible(.Call(R_amqp_ack, conn$ptr, delivery_tag, multiple))
invisible(.Call(
R_amqp_nack_on_channel, conn$ptr, chan, delivery_tag, multiple, requeue
))
}
54 changes: 51 additions & 3 deletions R/consume.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
#' server will generate one automatically.
#' @param exclusive When \code{TRUE}, request that this consumer has exclusive
#' access to the queue.
#' @param requeue_on_error When \code{TRUE}, errors in \code{fun} will cause the
#' message in question to be redelivered on the queue by the server. This is
#' advisable \emph{only} when you expect that another consumer will be able to
#' handle the same message without issue.
#' @param ... Additional arguments, used to declare broker-specific AMQP
#' extensions. See \strong{Details}.
#'
Expand Down Expand Up @@ -46,7 +50,10 @@
#' @details
#'
#' Unless \code{no_ack} is \code{TRUE}, messages are acknowledged automatically
#' after the callback executes.
#' after the callback executes. If it fails, messages are nacked instead before
#' surfacing the underlying error to the caller. \code{amqp_nack()} can be used
#' instead to manually signal that a message should be nacked and control the
#' redelivery behaviour.
#'
#' @examples
#' \dontrun{
Expand All @@ -73,14 +80,41 @@
#'
#' @export
amqp_consume <- function(conn, queue, fun, tag = "", no_ack = FALSE,
exclusive = FALSE, ...) {
exclusive = FALSE, requeue_on_error = FALSE, ...) {
if (!inherits(conn, "amqp_connection")) {
stop("`conn` is not an amqp_connection object")
}
stopifnot(is.function(fun))
stopifnot(is.logical(requeue_on_error))
args <- amqp_table(...)
if (!no_ack) {
# Wrap fun to control error conditions and ensure messages are acknowledged.
wrapped <- function(msg, chan) {
should_ack <- TRUE
tryCatch({
fun(msg)
}, amqp_nack = function(cond) {
should_ack <<- FALSE
amqp_nack_on_channel(
conn, chan, msg$delivery_tag, requeue = cond$requeue
)
}, error = function(cond) {
should_ack <<- FALSE
amqp_nack_on_channel(
conn, chan, msg$delivery_tag, requeue = requeue_on_error
)
stop(cond)
}, finally = {
if (should_ack) {
amqp_nack_on_channel(conn, chan, msg$delivery_tag)
}
})
}
} else {
wrapped <- fun
}
.Call(
R_amqp_create_consumer, conn$ptr, queue, tag, fun, new.env(), no_ack,
R_amqp_create_consumer, conn$ptr, queue, tag, wrapped, new.env(), no_ack,
exclusive, args$ptr
)
}
Expand Down Expand Up @@ -110,6 +144,18 @@ amqp_listen <- function(conn, timeout = 10L) {
invisible(.Call(R_amqp_listen, conn$ptr, timeout))
}

#' @param requeue When \code{TRUE}, redeliver the message on the queue.
#'
#' @rdname amqp_consume
#' @export
amqp_nack <- function(requeue = FALSE) {
cond <- structure(
list(requeue = requeue),
class = c("amqp_nack", "condition")
)
signalCondition(cond)
}

#' Consume Messages from a Queue, Later
#'
#' @description
Expand Down Expand Up @@ -145,6 +191,8 @@ amqp_listen <- function(conn, timeout = 10L) {
#' \code{\link{amqp_cancel_consumer}} or by garbage collection when the original
#' connection object expires.
#'
#' Messages to background consumers are always acknowledged.
#'
#' @seealso \code{\link{amqp_consume}} to consume messages in the main thread.
#' @export
#' @import later
Expand Down
17 changes: 15 additions & 2 deletions man/amqp_consume.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions man/amqp_consume_later.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 28 additions & 12 deletions src/basic.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,58 @@ SEXP R_amqp_get(SEXP ptr, SEXP queue, SEXP no_ack)
return out;
}

SEXP R_amqp_ack(SEXP ptr, SEXP delivery_tag, SEXP multiple)
SEXP R_amqp_ack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag,
SEXP multiple)
{
connection *conn = (connection *) R_ExternalPtrAddr(ptr);
char errbuff[200];
if (ensure_valid_channel(conn, &conn->chan, errbuff, 200) < 0) {
Rf_error("Failed to find an open channel. %s", errbuff);
return R_NilValue;
channel *chan = (channel *) R_ExternalPtrAddr(chan_ptr);
if (!conn || !chan) {
Rf_error("Failed to acknowledge message(s). Invalid connection or channel object.");
}
if (!conn->is_connected) {
chan->is_open = 0;
Rf_error("Failed to acknowledge message(s). Not connected to a server.");
}
if (!chan->is_open) {
Rf_error("Failed to acknowledge message(s). Channel is closed.");
}
int delivery_tag_ = asInteger(delivery_tag);
int multiple_ = asLogical(multiple);

int result = amqp_basic_ack(conn->conn, conn->chan.chan, delivery_tag_,
int result = amqp_basic_ack(conn->conn, chan->chan, delivery_tag_,
multiple_);
if (result != AMQP_STATUS_OK) {
char errbuff[200];
render_amqp_library_error(result, conn, &conn->chan, errbuff, 200);
Rf_error("Failed to acknowledge message(s). %s", errbuff);
}

return R_NilValue;
}

SEXP R_amqp_nack(SEXP ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue)
SEXP R_amqp_nack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag,
SEXP multiple, SEXP requeue)
{
connection *conn = (connection *) R_ExternalPtrAddr(ptr);
char errbuff[200];
if (ensure_valid_channel(conn, &conn->chan, errbuff, 200) < 0) {
Rf_error("Failed to find an open channel. %s", errbuff);
return R_NilValue;
channel *chan = (channel *) R_ExternalPtrAddr(chan_ptr);
if (!conn || !chan) {
Rf_error("Failed to nack message(s). Invalid connection or channel object.");
}
if (!conn->is_connected) {
chan->is_open = 0;
Rf_error("Failed to nack message(s). Not connected to a server.");
}
if (!chan->is_open) {
Rf_error("Failed to nack message(s). Channel is closed.");
}
int delivery_tag_ = asInteger(delivery_tag);
int multiple_ = asLogical(multiple);
int requeue_ = asLogical(requeue);

int result = amqp_basic_nack(conn->conn, conn->chan.chan, delivery_tag_,
int result = amqp_basic_nack(conn->conn, chan->chan, delivery_tag_,
multiple_, requeue_);
if (result != AMQP_STATUS_OK) {
char errbuff[200];
render_amqp_library_error(result, conn, &conn->chan, errbuff, 200);
Rf_error("Failed to nack message(s). %s", errbuff);
}
Expand Down
43 changes: 10 additions & 33 deletions src/consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
tv.tv_usec = 0;
time_t start = time(NULL);

SEXP message, body;
SEXP R_fcall = PROTECT(allocList(2));
SET_TYPEOF(R_fcall, LANGSXP);
SEXP message, body, R_fcall;

int ack;
amqp_rpc_reply_t reply;
Expand Down Expand Up @@ -235,46 +233,25 @@ SEXP R_amqp_listen(SEXP ptr, SEXP timeout)
&env.message.properties));
amqp_destroy_envelope(&env);

/* Acknowledge the message before we run the callback, just in case it
* fails. */

if (!elt->no_ack) {
ack = amqp_basic_ack(conn->conn, elt->chan.chan, env.delivery_tag, 0);

/* We want to handle errors here before running the callback, since it's
* possible the message will be redelivered and the callback may not be
* idempotent. */
switch (ack) {
case AMQP_STATUS_OK:
break;
case AMQP_STATUS_CONNECTION_CLOSED:
/* fallthrough */
case AMQP_STATUS_SOCKET_CLOSED:
/* fallthrough */
case AMQP_STATUS_SOCKET_ERROR:
/* fallthrough */
conn->is_connected = 0;
Rf_error("Disconnected from server.");
break;
default:
Rf_error("Failed to acknowledge message: %s.\n",
amqp_error_string2(ack));
break;
}
}

R_fcall = PROTECT(allocList(elt->no_ack ? 2 : 3));
SET_TYPEOF(R_fcall, LANGSXP);
SETCAR(R_fcall, elt->fun);
SETCADR(R_fcall, message);
if (!elt->no_ack) {
/* The channel is passed to the callback wrapper so that it can be used
* for acknowledgements. */
SEXP chan_ptr = PROTECT(R_MakeExternalPtr(&elt->chan, R_NilValue, R_NilValue));
SETCADDR(R_fcall, chan_ptr);
}
Rf_eval(R_fcall, elt->rho);

UNPROTECT(2);
UNPROTECT(elt->no_ack ? 3 : 4);
}

current_wait = time(NULL) - start;
R_CheckUserInterrupt(); // Escape hatch.
}

UNPROTECT(1);
return R_NilValue;
}

Expand Down
4 changes: 2 additions & 2 deletions src/longears.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ static const R_CallMethodDef longears_entries[] = {
{"R_amqp_unbind_exchange", (DL_FUNC) &R_amqp_unbind_exchange, 5},
{"R_amqp_publish", (DL_FUNC) &R_amqp_publish, 7},
{"R_amqp_get", (DL_FUNC) &R_amqp_get, 3},
{"R_amqp_ack", (DL_FUNC) &R_amqp_ack, 3},
{"R_amqp_nack", (DL_FUNC) &R_amqp_nack, 4},
{"R_amqp_ack_on_channel", (DL_FUNC) &R_amqp_ack_on_channel, 4},
{"R_amqp_nack_on_channel", (DL_FUNC) &R_amqp_nack_on_channel, 5},
{"R_amqp_create_consumer", (DL_FUNC) &R_amqp_create_consumer, 8},
{"R_amqp_listen", (DL_FUNC) &R_amqp_listen, 2},
{"R_amqp_consume_later", (DL_FUNC) &R_amqp_consume_later, 8},
Expand Down
4 changes: 2 additions & 2 deletions src/longears.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ SEXP R_amqp_unbind_exchange(SEXP ptr, SEXP dest, SEXP source, SEXP routing_key,

SEXP R_amqp_publish(SEXP ptr, SEXP routing_key, SEXP body, SEXP exchange, SEXP context_type, SEXP mandatory, SEXP immediate);
SEXP R_amqp_get(SEXP ptr, SEXP queue, SEXP no_ack);
SEXP R_amqp_ack(SEXP ptr, SEXP delivery_tag, SEXP multiple);
SEXP R_amqp_nack(SEXP ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue);
SEXP R_amqp_ack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag, SEXP multiple);
SEXP R_amqp_nack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue);

SEXP R_amqp_create_consumer(SEXP ptr, SEXP queue, SEXP tag, SEXP fun, SEXP rho, SEXP no_ack, SEXP exclusive, SEXP args);
SEXP R_amqp_listen(SEXP ptr, SEXP timeout);
Expand Down
Loading