From 01461e4ebb4b40fe3862a528e5b77493128ad315 Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Thu, 15 Dec 2016 10:29:44 -0500 Subject: [PATCH] DISPATCH-596 - Preserve the error from the rejected state in relayed disposition --- include/qpid/dispatch/router_core.h | 5 +++- src/router_core/error.c | 17 +++++++++++ src/router_core/forwarder.c | 1 + src/router_core/router_core_private.h | 2 ++ src/router_core/transfer.c | 13 ++++++++- src/router_node.c | 24 ++++++++++++++-- tests/system_tests_one_router.py | 41 ++++++++++++++++++++++++++- 7 files changed, 98 insertions(+), 5 deletions(-) diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index cb2d319368..d29d55c7ba 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -348,6 +348,8 @@ qdr_error_t *qdr_error(const char *name, const char *description); void qdr_error_free(qdr_error_t *error); void qdr_error_copy(qdr_error_t *from, pn_condition_t *to); char *qdr_error_description(qdr_error_t *err); +char *qdr_error_name(qdr_error_t *err); +pn_data_t *qdr_error_info(qdr_error_t *err); /** ****************************************************************************** @@ -553,7 +555,7 @@ void qdr_connection_handlers(qdr_core_t *core, ****************************************************************************** */ void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp, - bool settled, bool ref_given); + bool settled, qdr_error_t *error, bool ref_given); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(qdr_delivery_t *delivery); @@ -561,6 +563,7 @@ void qdr_delivery_incref(qdr_delivery_t *delivery); void qdr_delivery_decref(qdr_delivery_t *delivery); void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length); qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery); +qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery); /** ****************************************************************************** diff --git a/src/router_core/error.c b/src/router_core/error.c index 56e90796f2..525189434d 100644 --- a/src/router_core/error.c +++ b/src/router_core/error.c @@ -108,3 +108,20 @@ char *qdr_error_description(qdr_error_t *err) return text; } +char *qdr_error_name(qdr_error_t *err) +{ + if (!err || !err->name || !err->name->iterator) + return 0; + int length = qd_iterator_length(err->name->iterator); + char *text = (char*) malloc(length + 1); + qd_iterator_ncopy(err->name->iterator, (unsigned char*) text, length); + text[length] = '\0'; + return text; +} + + +pn_data_t *qdr_error_info(qdr_error_t *err) +{ + return err->info; +} + diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 7ece2f92a2..f47dc694a3 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -111,6 +111,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in dlv->presettled = dlv->settled; *tag = core->next_tag++; dlv->tag_length = 8; + dlv->error = 0; // // Create peer linkage only if the delivery is not settled diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 5962a17113..d10c96d0ad 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -117,6 +117,7 @@ struct qdr_action_t { qdr_delivery_t *delivery; uint64_t disposition; bool settled; + qdr_error_t *error; } delivery; // @@ -213,6 +214,7 @@ struct qdr_delivery_t { qd_iterator_t *to_addr; qd_iterator_t *origin; uint64_t disposition; + qdr_error_t *error; bool settled; bool presettled; qdr_delivery_where_t where; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 2595a70f43..62c1a20ea7 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -52,6 +52,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato dlv->settled = settled; dlv->presettled = settled; dlv->link_exclusion = link_exclusion; + dlv->error = 0; action->args.connection.delivery = dlv; qdr_action_enqueue(link->core, action); @@ -75,6 +76,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, dlv->settled = settled; dlv->presettled = settled; dlv->link_exclusion = link_exclusion; + dlv->error = 0; action->args.connection.delivery = dlv; qdr_action_enqueue(link->core, action); @@ -97,6 +99,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * dlv->msg = msg; dlv->settled = settled; dlv->presettled = settled; + dlv->error = 0; action->args.connection.delivery = dlv; action->args.connection.tag_length = tag_length; @@ -221,12 +224,13 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, - bool settled, bool ref_given) + bool settled, qdr_error_t *error, bool ref_given) { qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); action->args.delivery.delivery = delivery; action->args.delivery.disposition = disposition; action->args.delivery.settled = settled; + action->args.delivery.error = error; // // The delivery's ref_count must be incremented to protect its travels into the @@ -307,6 +311,11 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery) return delivery->msg; } +qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery) +{ + return delivery->error; +} + //================================================================================== // In-Thread Functions @@ -643,6 +652,7 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool bool dlv_moved = false; uint64_t disp = action->args.delivery.disposition; bool settled = action->args.delivery.settled; + qdr_error_t *error = action->args.delivery.error; // // Logic: @@ -659,6 +669,7 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool dlv->disposition = disp; if (peer) { peer->disposition = disp; + peer->error = error; push = true; } } diff --git a/src/router_node.c b/src/router_node.c index abf80f4c98..c122e0fa55 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -409,6 +409,10 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery { qd_router_t *router = (qd_router_t*) context; qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd); + pn_disposition_t *disp = pn_delivery_remote(pnd); + pn_condition_t *cond = pn_disposition_condition(disp); + qdr_error_t *error = qdr_error_from_pn(cond); + bool give_reference = false; // @@ -436,7 +440,9 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery // Update the disposition of the delivery // qdr_delivery_update_disposition(router->router_core, delivery, - pn_delivery_remote_state(pnd), pn_delivery_settled(pnd), + pn_delivery_remote_state(pnd), + pn_delivery_settled(pnd), + error, give_reference); // @@ -913,7 +919,7 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d if (!settled && remote_snd_settled) // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver - qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, false); + qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, false); if (settled || remote_snd_settled) pn_delivery_settle(pdlv); @@ -929,6 +935,17 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di if (!pnd) return; + qdr_error_t *error = qdr_delivery_error(dlv); + + if (error) { + pn_condition_t *condition = pn_disposition_condition(pn_delivery_local(pnd)); + const char *name = (const char *)qdr_error_name(error); + const char *description = (const char *)qdr_error_description(error); + pn_condition_set_name(condition, name); + pn_condition_set_description(condition, description); + pn_data_copy(pn_condition_info(condition), qdr_error_info(error)); + } + // // If the disposition has changed, update the proton delivery. // @@ -947,6 +964,9 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di pn_delivery_settle(pnd); qdr_delivery_decref(dlv); } + + if (error) + qdr_error_free(error); } diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index d993549b30..0ac4f72a6e 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -18,7 +18,7 @@ # import unittest -from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED +from proton import Condition, Message, Delivery, PENDING, ACCEPTED, REJECTED from system_test import TestCase, Qdrouterd, main_module from proton.handlers import MessagingHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce @@ -1115,6 +1115,11 @@ def test_21_presettled_overflow(self): test.run() self.assertEqual(None, test.error) + def test_reject_disposition(self): + test = RejectDispositionTest(self.address) + test.run() + self.assertTrue(test.received_error) + def test_connection_properties(self): connection = BlockingConnection(self.router.addresses[0], timeout=60, @@ -1530,5 +1535,39 @@ def run(self): Container(self).run() +class RejectDispositionTest(MessagingHandler): + def __init__(self, address): + super(RejectDispositionTest, self).__init__(auto_accept=False) + self.address = address + self.sent = False + self.received_error = False + self.dest = "rejectDispositionTest" + self.error_description = 'you were out of luck this time!' + self.error_name = u'amqp:internal-error' + + + def on_start(self, event): + conn = event.container.connect(self.address) + event.container.create_sender(conn, self.dest) + event.container.create_receiver(conn, self.dest) + + def on_sendable(self, event): + if not self.sent: + event.sender.send(Message(body=u"Hello World!")) + self.sent = True + + def on_rejected(self, event): + if event.delivery.remote.condition.description == self.error_description \ + and event.delivery.remote.condition.name == self.error_name: + self.received_error = True + event.connection.close() + + def on_message(self, event): + event.delivery.local.condition = Condition(self.error_name, self.error_description) + self.reject(event.delivery) + + def run(self): + Container(self).run() + if __name__ == '__main__': unittest.main(main_module())