From df9f69c1279ad22a6b5f30565d850028337e231a Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Wed, 13 Apr 2016 09:20:37 -0400 Subject: [PATCH] DISPATCH-263 - Added a record with key PN_DELIVERY_CTX to hold the message. Dont accept the disposition until the full message has been received --- src/message.c | 13 ++++++++++--- src/message_private.h | 2 ++ src/router_node.c | 13 +++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/message.c b/src/message.c index 11e993cc5b..ed248b32b8 100644 --- a/src/message.c +++ b/src/message.c @@ -694,7 +694,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) pn_link_t *link = pn_delivery_link(delivery); ssize_t rc; qd_buffer_t *buf; - qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_delivery_get_context(delivery); + + pn_record_t *record = pn_delivery_attachments(delivery); + qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX); // // If there is no message associated with the delivery, this is the first time @@ -703,7 +705,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // if (!msg) { msg = (qd_message_pvt_t*) qd_message(); - pn_delivery_set_context(delivery, (void*) msg); + pn_record_set(record, PN_DELIVERY_CTX, (void*) msg); } // @@ -727,16 +729,21 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // If we receive PN_EOS, we have come to the end of the message. // if (rc == PN_EOS) { + // + // Clear the value in the record with key PN_DELIVERY_CTX + // + pn_record_set(record, PN_DELIVERY_CTX, 0); + // // If the last buffer in the list is empty, remove it and free it. This // will only happen if the size of the message content is an exact multiple // of the buffer size. // + if (qd_buffer_size(buf) == 0) { DEQ_REMOVE_TAIL(msg->content->buffers); qd_buffer_free(buf); } - pn_delivery_set_context(delivery, 0); char repr[qd_message_repr_len()]; qd_log(log_source, QD_LOG_TRACE, "Received %s on link %s", diff --git a/src/message_private.h b/src/message_private.h index 8ede2c7820..139946d116 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -104,6 +104,8 @@ ALLOC_DECLARE(qd_message_content_t); /** Initialize logging */ void qd_message_initialize(); +PN_HANDLE(PN_DELIVERY_CTX) + ///@} #endif diff --git a/src/router_node.c b/src/router_node.c index ed1b51c2e6..d36fc84f99 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -26,6 +26,7 @@ #include "dispatch_private.h" #include "entity_cache.h" #include "router_private.h" +#include "message_private.h" const char *QD_ROUTER_NODE_TYPE = "router.node"; const char *QD_ROUTER_ADDRESS_TYPE = "router.address"; @@ -370,6 +371,18 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery if (!delivery) return; + pn_record_t *record = pn_delivery_attachments(pnd); + + // + // On the delivery, we set the message (qd_message_pvt_t) as a record with key PN_DELIVERY_CTX. When the + // complete delivery is received (rc == PN_EOS), we set the value on the PN_DELIVERY_CTX record to zero. + // If the PN_DELIVERY_CTX record is non-zero, it means that the message is still in-flight (complete message + // not received yet) in which case we will ignore the disposition. + // + if (pn_record_get(record, PN_DELIVERY_CTX) != 0) { + return; + } + // // If the delivery is settled, remove the linkage between the PN and QDR deliveries. //