Skip to content

Commit

Permalink
Repurpose existing ack/nack internals to support explicit channels.
Browse files Browse the repository at this point in the history
This will allow them to actually be used.
  • Loading branch information
atheriel committed Nov 10, 2020
1 parent 20c28fd commit a8eb676
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 20 deletions.
12 changes: 8 additions & 4 deletions R/basic.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,29 @@ as.data.frame.amqp_message <- function(x, row.names = NULL, optional = FALSE,
#' handle correctly by "nack"-ing them.
#'
#' @param conn An object returned by \code{\link{amqp_connect}}.
#' @param chan An external pointer representing a channel object.
#' @param delivery_tag The message's numeric identifier.
#' @param multiple When \code{TRUE}, (n)ack messages up-to-and-including this
#' \code{delivery_tag}. By default, we only (n)ack a single message.
#'
#' @noRd
amqp_ack <- function(conn, delivery_tag, multiple = FALSE) {
amqp_ack_on_channel <- function(conn, chan, delivery_tag, multiple = FALSE) {
if (!inherits(conn, "amqp_connection")) {
stop("`conn` is not an amqp_connection object")
}
invisible(.Call(R_amqp_ack, conn$ptr, delivery_tag, multiple))
invisible(.Call(R_amqp_ack_on_channel, conn$ptr, chan, delivery_tag, multiple))
}

#' @param requeue When \code{TRUE}, ask the server to requeue the message.
#' Otherwise, messages are discarded or dead-lettered.
#'
#' @noRd
amqp_nack <- function(conn, delivery_tag, multiple = FALSE, requeue = FALSE) {
amqp_nack_on_channel <- function(conn, chan, delivery_tag, multiple = FALSE,
requeue = FALSE) {
if (!inherits(conn, "amqp_connection")) {
stop("`conn` is not an amqp_connection object")
}
invisible(.Call(R_amqp_ack, conn$ptr, delivery_tag, multiple))
invisible(.Call(
R_amqp_nack_on_channel, conn$ptr, chan, delivery_tag, multiple, requeue
))
}
40 changes: 28 additions & 12 deletions src/basic.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,58 @@ SEXP R_amqp_get(SEXP ptr, SEXP queue, SEXP no_ack)
return out;
}

SEXP R_amqp_ack(SEXP ptr, SEXP delivery_tag, SEXP multiple)
SEXP R_amqp_ack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag,
SEXP multiple)
{
connection *conn = (connection *) R_ExternalPtrAddr(ptr);
char errbuff[200];
if (ensure_valid_channel(conn, &conn->chan, errbuff, 200) < 0) {
Rf_error("Failed to find an open channel. %s", errbuff);
return R_NilValue;
channel *chan = (channel *) R_ExternalPtrAddr(chan_ptr);
if (!conn || !chan) {
Rf_error("Failed to acknowledge message(s). Invalid connection or channel object.");
}
if (!conn->is_connected) {
chan->is_open = 0;
Rf_error("Failed to acknowledge message(s). Not connected to a server.");
}
if (!chan->is_open) {
Rf_error("Failed to acknowledge message(s). Channel is closed.");
}
int delivery_tag_ = asInteger(delivery_tag);
int multiple_ = asLogical(multiple);

int result = amqp_basic_ack(conn->conn, conn->chan.chan, delivery_tag_,
int result = amqp_basic_ack(conn->conn, chan->chan, delivery_tag_,
multiple_);
if (result != AMQP_STATUS_OK) {
char errbuff[200];
render_amqp_library_error(result, conn, &conn->chan, errbuff, 200);
Rf_error("Failed to acknowledge message(s). %s", errbuff);
}

return R_NilValue;
}

SEXP R_amqp_nack(SEXP ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue)
SEXP R_amqp_nack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag,
SEXP multiple, SEXP requeue)
{
connection *conn = (connection *) R_ExternalPtrAddr(ptr);
char errbuff[200];
if (ensure_valid_channel(conn, &conn->chan, errbuff, 200) < 0) {
Rf_error("Failed to find an open channel. %s", errbuff);
return R_NilValue;
channel *chan = (channel *) R_ExternalPtrAddr(chan_ptr);
if (!conn || !chan) {
Rf_error("Failed to nack message(s). Invalid connection or channel object.");
}
if (!conn->is_connected) {
chan->is_open = 0;
Rf_error("Failed to nack message(s). Not connected to a server.");
}
if (!chan->is_open) {
Rf_error("Failed to nack message(s). Channel is closed.");
}
int delivery_tag_ = asInteger(delivery_tag);
int multiple_ = asLogical(multiple);
int requeue_ = asLogical(requeue);

int result = amqp_basic_nack(conn->conn, conn->chan.chan, delivery_tag_,
int result = amqp_basic_nack(conn->conn, chan->chan, delivery_tag_,
multiple_, requeue_);
if (result != AMQP_STATUS_OK) {
char errbuff[200];
render_amqp_library_error(result, conn, &conn->chan, errbuff, 200);
Rf_error("Failed to nack message(s). %s", errbuff);
}
Expand Down
4 changes: 2 additions & 2 deletions src/longears.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ static const R_CallMethodDef longears_entries[] = {
{"R_amqp_unbind_exchange", (DL_FUNC) &R_amqp_unbind_exchange, 5},
{"R_amqp_publish", (DL_FUNC) &R_amqp_publish, 7},
{"R_amqp_get", (DL_FUNC) &R_amqp_get, 3},
{"R_amqp_ack", (DL_FUNC) &R_amqp_ack, 3},
{"R_amqp_nack", (DL_FUNC) &R_amqp_nack, 4},
{"R_amqp_ack_on_channel", (DL_FUNC) &R_amqp_ack_on_channel, 4},
{"R_amqp_nack_on_channel", (DL_FUNC) &R_amqp_nack_on_channel, 5},
{"R_amqp_create_consumer", (DL_FUNC) &R_amqp_create_consumer, 8},
{"R_amqp_listen", (DL_FUNC) &R_amqp_listen, 2},
{"R_amqp_consume_later", (DL_FUNC) &R_amqp_consume_later, 8},
Expand Down
4 changes: 2 additions & 2 deletions src/longears.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ SEXP R_amqp_unbind_exchange(SEXP ptr, SEXP dest, SEXP source, SEXP routing_key,

SEXP R_amqp_publish(SEXP ptr, SEXP routing_key, SEXP body, SEXP exchange, SEXP context_type, SEXP mandatory, SEXP immediate);
SEXP R_amqp_get(SEXP ptr, SEXP queue, SEXP no_ack);
SEXP R_amqp_ack(SEXP ptr, SEXP delivery_tag, SEXP multiple);
SEXP R_amqp_nack(SEXP ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue);
SEXP R_amqp_ack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag, SEXP multiple);
SEXP R_amqp_nack_on_channel(SEXP ptr, SEXP chan_ptr, SEXP delivery_tag, SEXP multiple, SEXP requeue);

SEXP R_amqp_create_consumer(SEXP ptr, SEXP queue, SEXP tag, SEXP fun, SEXP rho, SEXP no_ack, SEXP exclusive, SEXP args);
SEXP R_amqp_listen(SEXP ptr, SEXP timeout);
Expand Down

0 comments on commit a8eb676

Please sign in to comment.