From 729ed4cadfe6b13599da8f33fa666c515f7807cd Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Mon, 22 Oct 2018 12:21:59 -0400 Subject: [PATCH] DISPATCH-1150: Router core messaging client --- include/qpid/dispatch/message.h | 1 + src/CMakeLists.txt | 1 + src/message.c | 21 +- src/router_core/core_client_api.c | 685 ++++++++++++++++++ src/router_core/core_client_api.h | 165 +++++ .../modules/test_hooks/core_test_hooks.c | 225 +++++- tests/CMakeLists.txt | 1 + tests/system_tests_core_client.py | 220 ++++++ 8 files changed, 1300 insertions(+), 19 deletions(-) create mode 100644 src/router_core/core_client_api.c create mode 100644 src/router_core/core_client_api.h create mode 100644 tests/system_tests_core_client.py diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 8af8c9b6f7..48408ab271 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -267,6 +267,7 @@ ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers); void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content); void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2); +void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3); /** Put string representation of a message suitable for logging in buffer. * @return buffer diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index da47242c75..3159db983d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -88,6 +88,7 @@ set(qpid_dispatch_SOURCES router_core/connections.c router_core/core_events.c router_core/core_link_endpoint.c + router_core/core_client_api.c router_core/error.c router_core/exchange_bindings.c router_core/forwarder.c diff --git a/src/message.c b/src/message.c index 66d9955891..83216720c7 100644 --- a/src/message.c +++ b/src/message.c @@ -1968,13 +1968,22 @@ void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_com content->buffers = *field1_buffers; DEQ_INIT(*field1_buffers); + DEQ_APPEND(content->buffers, (*field2_buffers)); +} - qd_buffer_t *buf = DEQ_HEAD(*field2_buffers); - while (buf) { - DEQ_REMOVE_HEAD(*field2_buffers); - DEQ_INSERT_TAIL(content->buffers, buf); - buf = DEQ_HEAD(*field2_buffers); - } + +void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3) +{ + qd_message_content_t *content = MSG_CONTENT(msg); + content->receive_complete = true; + qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); + qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); + qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3); + + content->buffers = *field1_buffers; + DEQ_INIT(*field1_buffers); + DEQ_APPEND(content->buffers, (*field2_buffers)); + DEQ_APPEND(content->buffers, (*field3_buffers)); } diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c new file mode 100644 index 0000000000..b5e8bbc336 --- /dev/null +++ b/src/router_core/core_client_api.c @@ -0,0 +1,685 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "core_client_api.h" +#include "core_link_endpoint.h" + +#include +#include +#include +#include + +#define CORRELATION_ID_LEN 32 +const char *CORRELATION_ID_FMT = "client-%016"PRIxPTR"%08"PRIx32; + +typedef struct qdrc_client_request_t qdrc_client_request_t; +struct qdrc_client_request_t { + DEQ_LINKS_N(SEND_Q, qdrc_client_request_t); + DEQ_LINKS_N(UNSETTLED, qdrc_client_request_t); + DEQ_LINKS_N(REPLY, qdrc_client_request_t); + + qdrc_client_t *client; + uintptr_t req_context; + + char correlation_id[CORRELATION_ID_LEN]; + qd_iterator_t *correlation_key; + qd_hash_handle_t *hash_handle; + qdr_delivery_t *delivery; + + qd_composed_field_t *app_properties; + qd_composed_field_t *body; + + bool on_send_queue; // to be sent + bool on_unsettled_list; // awaiting disposition + bool on_reply_list; // awaiting reply message + + qdrc_client_on_reply_CT_t on_reply_cb; + qdrc_client_on_ack_CT_t on_ack_cb; + qdrc_client_request_done_CT_t done_cb; +}; +DEQ_DECLARE(qdrc_client_request_t, qdrc_client_request_list_t); +ALLOC_DEFINE(qdrc_client_request_t); + + +struct qdrc_client_t { + qdr_core_t *core; + qd_hash_t *correlations; + + qdrc_endpoint_t *sender; // for outgoing management request messages + qdrc_endpoint_t *receiver; // for incoming management reply messages + bool sender_up; + bool receiver_up; + bool active; + char *reply_to; + + qdrc_client_request_list_t send_queue; + qdrc_client_request_list_t unsettled_list; + qdrc_client_request_list_t reply_list; // for expected reply + + uint32_t next_cid; // correlation id generation + int rx_credit_max; + int rx_credit; + int tx_credit; + + uintptr_t user_context; + qdrc_client_on_state_CT_t on_state_cb; + qdrc_client_on_flow_CT_t on_flow_cb; + +}; +ALLOC_DEFINE(qdrc_client_t); + + +static void _send_request_CT(qdrc_client_t *client, + qdrc_client_request_t *req); +static void _flush_send_queue_CT(qdrc_client_t *client); + +static void _state_updated_CT(qdrc_client_t *client); + +static void _sender_second_attach_CT(void *client_context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target); +static void _receiver_second_attach_CT(void *client_context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target); +static void _sender_flow_CT(void *client_context, + int available_credit, + bool drain); +static void _sender_update_CT(void *client_context, + qdr_delivery_t *delivery, + bool settled, + uint64_t disposition); +static void _receiver_transfer_CT(void *client_context, + qdr_delivery_t *delivery, + qd_message_t *message); +static void _sender_detached_CT(void *client_context, + qdr_error_t *error); +static void _receiver_detached_CT(void *client_context, + qdr_error_t *error); +static void _sender_cleanup_CT(void *client_context); +static void _receiver_cleanup_CT(void *client_context); + +static void _free_request_CT(qdrc_client_t *client, + qdrc_client_request_t *req, + const char *error); +static qd_message_t *_create_message_CT(qdrc_client_t *client, + qdrc_client_request_t *req); + + +static qdrc_endpoint_desc_t sender_endpoint = { + .label = "core client - sender", + .on_second_attach = _sender_second_attach_CT, + .on_flow = _sender_flow_CT, + .on_update = _sender_update_CT, + .on_first_detach = _sender_detached_CT, + .on_cleanup = _sender_cleanup_CT +}; + +static qdrc_endpoint_desc_t receiver_endpoint = { + .label = "core client - receiver", + .on_second_attach = _receiver_second_attach_CT, + .on_transfer = _receiver_transfer_CT, + .on_first_detach = _receiver_detached_CT, + .on_cleanup = _receiver_cleanup_CT +}; + +qdrc_client_t *qdrc_client_CT(qdr_core_t *core, + qdr_connection_t *conn, + qdr_terminus_t *target, + int credit_window, + uintptr_t user_context, + qdrc_client_on_state_CT_t on_state_cb, + qdrc_client_on_flow_CT_t on_flow_cb) +{ + qdrc_client_t *client = new_qdrc_client_t(); + if (!client) + return NULL; + + ZERO(client); + client->core = core; + client->correlations = qd_hash(6, 4, 0); + client->next_cid = rand(); + client->rx_credit_max = credit_window; + client->user_context = user_context; + client->on_state_cb = on_state_cb; + client->on_flow_cb = on_flow_cb; + + // create links + client->sender = qdrc_endpoint_create_link_CT(core, + conn, + QD_OUTGOING, + NULL, // source terminus + target, + &sender_endpoint, + client); + // create receiver link for replies from interior management + qdr_terminus_t *source = qdr_terminus(0); + source->dynamic = true; + client->receiver = qdrc_endpoint_create_link_CT(core, + conn, + QD_INCOMING, + source, + NULL, // target terminus + &receiver_endpoint, + client); + qd_log(core->log, QD_LOG_TRACE, + "New core client created c=%p", client); + return client; +} + + +void qdrc_client_free_CT(qdrc_client_t *client) +{ + if (!client) + return; + + if (client->sender) { + client->sender = NULL; + } + + if (client->receiver) { + client->receiver = NULL; + } + + qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); + while (req) { + _free_request_CT(client, req, NULL); // removes from send_queue + req = DEQ_HEAD(client->send_queue); + } + + req = DEQ_HEAD(client->unsettled_list); + while (req) { + _free_request_CT(client, req, NULL); // removes from unsettled_list + req = DEQ_HEAD(client->unsettled_list); + } + + req = DEQ_HEAD(client->reply_list); + while (req) { + _free_request_CT(client, req, NULL); // removes from reply_list + req = DEQ_HEAD(client->reply_list); + } + + qd_hash_free(client->correlations); + free(client->reply_to); + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client freed c=%p", client); + + free_qdrc_client_t(client); +} + + +// send a message +int qdrc_client_request_CT(qdrc_client_t *client, + uintptr_t request_context, + qd_composed_field_t *app_properties, + qd_composed_field_t *body, + qdrc_client_on_reply_CT_t on_reply_cb, + qdrc_client_on_ack_CT_t on_ack_cb, + qdrc_client_request_done_CT_t done_cb) +{ + qd_log(client->core->log, QD_LOG_TRACE, + "New core client request created c=%p, rc=%"PRIuPTR")", + client, request_context); + + qdrc_client_request_t *req = new_qdrc_client_request_t(); + ZERO(req); + req->client = client; + req->req_context = request_context; + req->app_properties = app_properties; + req->body = body; + req->on_reply_cb = on_reply_cb; + req->on_ack_cb = on_ack_cb; + req->done_cb = done_cb; + + _send_request_CT(client, req); + return 0; +} + + +// attempt to send a new request message +static void _send_request_CT(qdrc_client_t *client, + qdrc_client_request_t *req) +{ + DEQ_INSERT_TAIL_N(SEND_Q, client->send_queue, req); + req->on_send_queue = true; + _flush_send_queue_CT(client); +} + + +// send any pending messages on the send_queue +static void _flush_send_queue_CT(qdrc_client_t *client) +{ + qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); + + while (req && client->tx_credit > 0) { + bool presettled = (req->on_ack_cb == NULL); + + if (req->on_reply_cb && !client->reply_to) { + // cannot send until receiver comes up + break; + } + + qd_message_t *msg = _create_message_CT(client, req); + req->delivery = qdrc_endpoint_delivery_CT(client->core, + client->sender, + msg); + qdrc_endpoint_send_CT(client->core, + client->sender, + req->delivery, + presettled); + DEQ_REMOVE_HEAD_N(SEND_Q, client->send_queue); + req->on_send_queue = false; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client request sent c=%p, rc=%"PRIuPTR" dlv=%p cid=%s)", + client, req->req_context, req->delivery, + *req->correlation_id ? req->correlation_id : ""); + + if (!presettled && req->on_ack_cb) { + DEQ_INSERT_TAIL_N(UNSETTLED, client->unsettled_list, req); + req->on_unsettled_list = true; + } + if (req->on_reply_cb) { + DEQ_INSERT_TAIL_N(REPLY, client->reply_list, req); + req->on_reply_list = true; + } + if (!req->on_reply_list && !req->on_unsettled_list) { + // "Fire and forget" no need to keep the request any longer + _free_request_CT(client, req, NULL); + } + client->tx_credit -= 1; + req = DEQ_HEAD(client->send_queue); + } +} + + +static void _free_request_CT(qdrc_client_t *client, + qdrc_client_request_t *req, + const char *error) +{ + if (req->on_send_queue) + DEQ_REMOVE_N(SEND_Q, client->send_queue, req); + if (req->on_unsettled_list) + DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req); + if (req->on_reply_list) + DEQ_REMOVE_N(REPLY, client->reply_list, req); + + if (req->hash_handle) { + qd_hash_remove_by_handle(client->correlations, req->hash_handle); + qd_hash_handle_free(req->hash_handle); + } + + if (req->correlation_key) { + qd_iterator_free(req->correlation_key); + } + + if (req->body) { + qd_compose_free(req->body); + } + + if (req->app_properties) { + qd_compose_free(req->app_properties); + } + + // notify user that the request has completed + if (req->done_cb) { + req->done_cb(client->core, + client, + client->user_context, + req->req_context, + error); + } + + qd_log(client->core->log, QD_LOG_TRACE, + "Freeing core client request c=%p, rc=%"PRIuPTR")", + client, req->req_context); + + free_qdrc_client_request_t(req); +} + + +// issue state change callbacks if necessary +static void _state_updated_CT(qdrc_client_t *client) +{ + if (client->on_state_cb) { + bool new_state = (client->sender_up && client->receiver_up); + if (new_state != client->active) { + client->active = new_state; + client->on_state_cb(client->core, client, client->user_context, new_state); + if (client->active && client->tx_credit > 0) + client->on_flow_cb(client->core, + client, + client->user_context, + client->tx_credit, + false); + } + } +} + + +static void _sender_second_attach_CT(void *context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target) +{ + qdrc_client_t *client = (qdrc_client_t *)context; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client sender 2nd attach c=%p", client); + + if (!client->sender_up) { + client->sender_up = true; + _state_updated_CT(client); + } + qdr_terminus_free(remote_source); + qdr_terminus_free(remote_target); +} + + +static void _receiver_second_attach_CT(void *context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target) +{ + qdrc_client_t *client = (qdrc_client_t *)context; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client receiver 2nd attach c=%p", client); + + if (!client->receiver_up) { + client->receiver_up = true; + client->reply_to = qdr_field_copy(remote_source->address); + client->rx_credit = client->rx_credit_max; + qdrc_endpoint_flow_CT(client->core, client->receiver, client->rx_credit, false); + _state_updated_CT(client); + } + qdr_terminus_free(remote_source); + qdr_terminus_free(remote_target); +} + + +static void _sender_flow_CT(void *context, + int available_credit, + bool drain) +{ + qdrc_client_t *client = (qdrc_client_t *)context; + qdr_core_t *core = client->core; + + qd_log(core->log, QD_LOG_TRACE, + "Core client sender flow granted c=%p credit=%d d=%s", + client, available_credit, (drain) ? "T" : "F"); + client->tx_credit = available_credit; + if (available_credit > 0) { + _flush_send_queue_CT(client); + } + + if (client->active && client->on_flow_cb) + client->on_flow_cb(core, + client, + client->user_context, + client->tx_credit, + drain); + if (drain) { + client->tx_credit = 0; + } +} + + +// disposition update on sent request +static void _sender_update_CT(void *context, + qdr_delivery_t *delivery, + bool settled, + uint64_t disposition) +{ + qdrc_client_t *client = (qdrc_client_t *)context; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client sender update c=%p dlv=%p d=%"PRIx64" %s", + client, delivery, disposition, + settled ? "settled" : "unsettled"); + + if (disposition) { + // should be on unsettled list + qdrc_client_request_t *req = DEQ_HEAD(client->unsettled_list); + DEQ_FIND_N(UNSETTLED, req, (req->delivery == delivery)); + if (req) { + assert(req->on_ack_cb); + req->on_ack_cb(client->core, + client, + client->user_context, + req->req_context, + disposition); + // remove from unsettled list + DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req); + req->on_unsettled_list = false; + + if (!req->on_reply_list || disposition != PN_ACCEPTED) { + // no reply is coming, release the request + _free_request_CT(client, req, NULL); + } + } else { + // may have received reply so this is not an error + qd_log(client->core->log, QD_LOG_DEBUG, + "Core client could not find request for disposition update" + " client=%p delivery=%p", + client, delivery); + } + } +} + + +static void _receiver_transfer_CT(void *client_context, + qdr_delivery_t *delivery, + qd_message_t *message) +{ + qdrc_client_t *client = (qdrc_client_t *)client_context; + qdr_core_t *core = client->core; + bool complete = qd_message_receive_complete(message); + + qd_log(core->log, QD_LOG_TRACE, + "Core client received msg c=%p complete=%s", + client, complete ? "T" : "F"); + + if (complete) { + uint64_t disposition = PN_ACCEPTED; + + // lookup the corresponding request using the correlation-id + + qd_iterator_t *cid_iter = qd_message_field_iterator(message, + QD_FIELD_CORRELATION_ID); + if (cid_iter) { + qdrc_client_request_t *req = NULL; + qd_hash_retrieve(client->correlations, cid_iter, (void **)&req); + qd_iterator_free(cid_iter); + if (req) { + + qd_log(core->log, QD_LOG_TRACE, + "Core client received msg c=%p rc=%"PRIuPTR" cid=%s", + client, req->req_context, req->correlation_id); + + qd_hash_remove_by_handle(client->correlations, req->hash_handle); + qd_hash_handle_free(req->hash_handle); + req->hash_handle = 0; + + assert(req->on_reply_list); + DEQ_REMOVE_N(REPLY, client->reply_list, req); + req->on_reply_list = false; + + qd_iterator_t *app_props = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES); + qd_iterator_t *body = qd_message_field_iterator(message, QD_FIELD_BODY); + + assert(req->on_reply_cb); + disposition = req->on_reply_cb(core, + client, + client->user_context, + req->req_context, + app_props, + body); + // should we keep req if still waiting for disposition update + // on sent message? I say "no"... + _free_request_CT(client, req, NULL); + } else { + // request may be old... + qd_log(core->log, QD_LOG_WARNING, + "Core client reply message dropped: no matching correlation-id"); + disposition = PN_ACCEPTED; + } + } else { + qd_log(core->log, QD_LOG_ERROR, "Invalid core client reply message: no correlation-id"); + disposition = PN_REJECTED; + } + + qdrc_endpoint_settle_CT(core, delivery, disposition); + + if (--client->rx_credit < (client->rx_credit_max / 2)) { + int prev = client->rx_credit; + client->rx_credit += client->rx_credit_max - client->rx_credit; + + qd_log(core->log, QD_LOG_TRACE, + "Client issuing flow on rx link: c=%p old value=%d credit=%d (max=%d)", + client, prev, client->rx_credit, client->rx_credit_max); + + qdrc_endpoint_flow_CT(core, + client->receiver, + client->rx_credit, + false); + } + } +} + + +static void _sender_detached_CT(void *client_context, + qdr_error_t *error) +{ + qdrc_client_t *client = (qdrc_client_t *)client_context; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client sender detached c=%p", client); + + if (client->sender_up) { + client->sender_up = false; + client->tx_credit = 0; + + // abort all pending and unsettled requests + // + qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); + while (req) { + _free_request_CT(client, req, "link detached"); // removes from send_queue + req = DEQ_HEAD(client->send_queue); + } + req = DEQ_HEAD(client->unsettled_list); + while (req) { + _free_request_CT(client, req, "link detached"); // removes from unsettled_list + req = DEQ_HEAD(client->unsettled_list); + } + + _state_updated_CT(client); + } + client->sender = NULL; +} + + +static void _receiver_detached_CT(void *client_context, + qdr_error_t *error) +{ + qdrc_client_t *client = (qdrc_client_t *)client_context; + + qd_log(client->core->log, QD_LOG_TRACE, + "Core client receiver detached c=%p", client); + + if (client->receiver_up) { + client->receiver_up = false; + free(client->reply_to); + client->reply_to = 0; + + // abort all waiting requests + // + qdrc_client_request_t *req = DEQ_HEAD(client->reply_list); + while (req) { + _free_request_CT(client, req, "link detached"); // removes from reply list + req = DEQ_HEAD(client->reply_list); + } + + _state_updated_CT(client); + } + client->receiver = NULL; +} + + +static void _sender_cleanup_CT(void *client_context) +{ + _sender_detached_CT(client_context, NULL); +} + + +static void _receiver_cleanup_CT(void *client_context) +{ + _receiver_detached_CT(client_context, NULL); +} + + +static qd_message_t *_create_message_CT(qdrc_client_t *client, + qdrc_client_request_t *req) +{ + // build necessary message headers, etc: + qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(fld); + qd_compose_insert_bool(fld, 0); // durable + qd_compose_end_list(fld); + + if (req->on_reply_cb) { + // generate unique correlation-id + snprintf(req->correlation_id, + CORRELATION_ID_LEN, CORRELATION_ID_FMT, + (uint64_t)time(NULL), client->next_cid++); + req->correlation_key = qd_iterator_string(req->correlation_id, + ITER_VIEW_ALL); + qd_hash_insert(client->correlations, + req->correlation_key, + req, + &req->hash_handle); + + fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld); + qd_compose_start_list(fld); + qd_compose_insert_null(fld); // message-id + qd_compose_insert_null(fld); // user-id + qd_compose_insert_null(fld); // to + qd_compose_insert_null(fld); // subject + assert(client->reply_to); + qd_compose_insert_string(fld, client->reply_to); + qd_compose_insert_string(fld, req->correlation_id); + qd_compose_end_list(fld); + } + + qd_message_t *message = qd_message(); + if (req->app_properties && req->body) { + qd_message_compose_4(message, fld, req->app_properties, req->body); + } else if (req->body) { + qd_message_compose_3(message, fld, req->body); + } else if (req->app_properties) { + qd_message_compose_3(message, fld, req->app_properties); + } else { + qd_message_compose_2(message, fld); + } + qd_compose_free(fld); + qd_compose_free(req->body); + req->body = 0; + qd_compose_free(req->app_properties); + req->app_properties = 0; + + return message; +} diff --git a/src/router_core/core_client_api.h b/src/router_core/core_client_api.h new file mode 100644 index 0000000000..1cf5b20f61 --- /dev/null +++ b/src/router_core/core_client_api.h @@ -0,0 +1,165 @@ +#ifndef qd_router_core_client_api_h +#define qd_router_core_client_api_h 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// A simple request/response client messaging API using core terminated links +// + +#include "router_core_private.h" + + +typedef struct qdrc_client_t qdrc_client_t; + + +// +// client application callbacks +// + + +/** + * Client links have changed state + * + * @param core + * @param client - core client returned by qdrc_client_CT() + * @param user_context - as passed to qdrc_client_CT() + * @param active - true if both links are opened, else false + */ +typedef void (*qdrc_client_on_state_CT_t)(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + bool active); + + +/** + * Credit has become available for sending messages + * + * @param core + * @param client - core client returned by qdrc_client_CT() + * @param user_context - as passed to qdrc_client_CT() + * @param available_credit - current credit allocation + * @param drain - if true client must consume all credit + */ +typedef void (*qdrc_client_on_flow_CT_t)(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + int available_credit, + bool drain); + +/** + * Final disposition received for sent message + * + * @param core + * @param client - core client returned by qdrc_client_CT() + * @param user_context - as passed to qdrc_client_CT() + * @param request_context - as passed to qdrc_client_request_CT() + * @param disposition - for the associated sent message + */ +typedef void (*qdrc_client_on_ack_CT_t)(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + uint64_t disposition); + +/** + * A reply message has arrived for a given request + * + * @param core + * @param client - core client returned by qdrc_client_CT() + * @param user_context - as passed to qdrc_client_CT() + * @param app_properties - application properties header from reply. + * Ownership is handed off to this callback - user must free the + * iterator when done. + * @param body - message body. Ownership is handed off to this callback + * - user must free the iterator when done. + * @return final disposition for the received reply message + */ +typedef uint64_t (*qdrc_client_on_reply_CT_t)(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + qd_iterator_t *app_properties, + qd_iterator_t *body); + + +typedef void (*qdrc_client_request_done_CT_t)(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + const char *error); + + +/** + * Create a request/response client + * + * @param core + * @param conn - connection over which links will be created. + * @param target - for messages sent by this client. + * @param credit_window - for receiver link (credit loop) + * @param user_context - context that will be passed to callbacks + * @param on_state_cb - callback when link state changes + * @param on_flow_cb - callback when sender credit is updated. + * @return a new core client + */ +qdrc_client_t *qdrc_client_CT(qdr_core_t *core, + qdr_connection_t *conn, + qdr_terminus_t *target, + int credit_window, + uintptr_t user_context, + qdrc_client_on_state_CT_t on_state_cb, + qdrc_client_on_flow_CT_t on_flow_cb); + + +/** + * Free a request/response client + * + * @param client - as returned by qdrc_client_CT() + */ +void qdrc_client_free_CT(qdrc_client_t *client); + + +/** + * Send a request message + * + * @param client - as returned by qdrc_client_CT() + * @param request_context - context for this request that will be passed to + * callbacks + + * @param app_properties - the application properties for the sent message. + * Ownership is transferred to this call - the caller must not reference the + * composed field on return. + + * @param body - the message body for the sent message. Ownership is transferred + * to this call - the caller must not reference the composed field on return. + + * @param on_reply_cb - (optional) invoked when reply message arrives + * @param on_ack_cb - (optional) invoked when sent message disposition is set + * @param done_cb - (optional) called once request is done (for cleanup) + * @return zero on success. + */ +int qdrc_client_request_CT(qdrc_client_t *client, + uintptr_t request_context, + qd_composed_field_t *app_properties, + qd_composed_field_t *body, + qdrc_client_on_reply_CT_t on_reply_cb, + qdrc_client_on_ack_CT_t on_ack_cb, + qdrc_client_request_done_CT_t done_cb); + +#endif // #define qd_router_core_client_api_h 1 diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c index d1731174fa..19f31afd2d 100644 --- a/src/router_core/modules/test_hooks/core_test_hooks.c +++ b/src/router_core/modules/test_hooks/core_test_hooks.c @@ -17,10 +17,13 @@ * under the License. */ +#undef NDEBUG // test code - uses asserts + #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/message.h" #include "qpid/dispatch/compose.h" #include "core_link_endpoint.h" +#include "core_client_api.h" #include "module.h" #include #include @@ -36,6 +39,7 @@ typedef enum { typedef struct test_module_t test_module_t; typedef struct test_node_t test_node_t; +typedef struct test_client_t test_client_t; typedef struct test_endpoint_t { DEQ_LINKS(struct test_endpoint_t); @@ -61,13 +65,14 @@ struct test_node_t { }; struct test_module_t { - qdr_core_t *core; - test_node_t *echo_node; - test_node_t *deny_node; - test_node_t *sink_node; - test_node_t *source_node; - test_node_t *source_ps_node; - test_node_t *discard_node; + qdr_core_t *core; + test_node_t *echo_node; + test_node_t *deny_node; + test_node_t *sink_node; + test_node_t *source_node; + test_node_t *source_ps_node; + test_node_t *discard_node; + test_client_t *test_client; }; @@ -383,7 +388,7 @@ static qdrc_endpoint_desc_t descriptor = {"Core Test Hooks", on_first_attach, on on_transfer, on_first_detach, on_second_detach, on_cleanup}; -static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core) +static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core, test_module_t *module) { char *echo_address = "org.apache.qpid.dispatch.router/test/echo"; char *deny_address = "org.apache.qpid.dispatch.router/test/deny"; @@ -392,9 +397,6 @@ static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core) char *source_ps_address = "org.apache.qpid.dispatch.router/test/source_ps"; char *discard_address = "org.apache.qpid.dispatch.router/test/discard"; - test_module_t *module = NEW(test_module_t); - - module->core = core; module->echo_node = NEW(test_node_t); module->deny_node = NEW(test_node_t); module->sink_node = NEW(test_node_t); @@ -465,6 +467,195 @@ static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module) } +// +// Tests for in-core messaging client API +// +// Note well: this test client is used by the system_tests_core_client.py unit +// tests. Any changes here may require updates to those tests. +// + +struct test_client_t { + test_module_t *module; + qdrc_event_subscription_t *conn_events; + qdr_connection_t *conn; + qdrc_client_t *core_client; + int credit; + int counter; +}; + +static uint64_t _client_on_reply_cb(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + qd_iterator_t *app_properties, + qd_iterator_t *body) +{ + qd_log(core->log, QD_LOG_TRACE, + "client test reply received rc=%"PRIxPTR, request_context); + + qd_iterator_free(app_properties); + qd_iterator_free(body); + + return PN_ACCEPTED; +} + +static void _client_on_ack_cb(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + uint64_t disposition) +{ + test_client_t *tc = (test_client_t *)user_context; + qd_log(core->log, QD_LOG_TRACE, + "client test request ack rc=%"PRIxPTR" d=%"PRIu64, + request_context, disposition); + assert(request_context < tc->counter); +} +static void _client_on_done_cb(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + const char *error) +{ + qd_log(core->log, QD_LOG_TRACE, + "client test request done rc=%"PRIxPTR" error=%s", + request_context, + (error) ? error : "None"); +} + +static void _do_send(test_client_t *tc) +{ + int rc = 0; + while (tc->credit > 0) { + + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + qd_compose_start_map(props); + qd_compose_insert_string(props, "action"); + qd_compose_insert_string(props, "echo"); + qd_compose_insert_string(props, "counter"); + qd_compose_insert_int(props, tc->counter); + qd_compose_end_map(props); + qd_compose_insert_string(body, "HI THERE"); + + qdrc_client_request_CT(tc->core_client, + tc->counter, // request context + props, + body, + _client_on_reply_cb, + _client_on_ack_cb, + _client_on_done_cb); + assert(rc == 0); + ++tc->counter; + --tc->credit; + qd_log(tc->module->core->log, QD_LOG_TRACE, + "client test message sent id=%d c=%d", tc->counter + 1, tc->credit); + } +} + +static void _client_on_state_cb(qdr_core_t *core, qdrc_client_t *core_client, + uintptr_t user_context, bool active) +{ + test_client_t *tc = (test_client_t *)user_context; + qd_log(tc->module->core->log, QD_LOG_TRACE, + "client test on state active=%c", active ? 'T' : 'F'); +} + +static void _client_on_flow_cb(qdr_core_t *core, qdrc_client_t *core_client, + uintptr_t user_context, int available_credit, + bool drain) +{ + test_client_t *tc = (test_client_t *)user_context; + + if (!tc->core_client) + return; + + qd_log(tc->module->core->log, QD_LOG_TRACE, + "client test on flow c=%d d=%c", available_credit, drain ? 'T' : 'F'); + tc->credit = available_credit; + _do_send(tc); + if (drain) + tc->credit = 0; +} + +static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *conn) +{ + test_client_t *tc = (test_client_t *)context; + + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test on conn event"); + + switch (type) { + case QDRC_EVENT_CONN_OPENED: + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test conn open"); + if (tc->conn) // already have a conn, ignore + return; + // look for the special test container id + const char *cid = ((conn->connection_info) + ? conn->connection_info->container + : NULL); + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test container-id=%s", cid); + + if (cid && strcmp(cid, "org.apache.qpid.dispatch.test_core_client") == 0) { + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test connection opened"); + qdr_terminus_t *target = qdr_terminus(NULL); + qdr_terminus_set_address(target, "test_client_address"); + tc->conn = conn; + tc->core_client = qdrc_client_CT(tc->module->core, + tc->conn, + target, + 10, // credit window + (uintptr_t)tc, // user context + _client_on_state_cb, + _client_on_flow_cb); + assert(tc->core_client); + } + break; + case QDRC_EVENT_CONN_CLOSED: + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test conn closed"); + if (tc->conn == conn) { + tc->conn = NULL; + tc->credit = 0; + tc->counter = 0; + qdrc_client_free_CT(tc->core_client); + tc->core_client = NULL; + qd_log(tc->module->core->log, QD_LOG_TRACE, "client test connection closed"); + } + break; + } +} + + +static void qdrc_test_client_api_setup(test_module_t *test_module) +{ + test_client_t *tc = NEW(test_client_t); + ZERO(tc); + + tc->module = test_module; + test_module->test_client = tc; + tc->conn_events = qdrc_event_subscribe_CT(test_module->core, + (QDRC_EVENT_CONN_OPENED | QDRC_EVENT_CONN_CLOSED), + _on_conn_event, + NULL, NULL, tc); + + qd_log(test_module->core->log, QD_LOG_TRACE, "client test registered %p", tc->conn_events); +} + + +static void qdrc_test_client_api_finalize(test_module_t *test_module) +{ + test_client_t *tc = test_module->test_client; + if (tc) { + if (tc->core_client) + qdrc_client_free_CT(tc->core_client); + if (tc->conn_events) + qdrc_event_unsubscribe_CT(test_module->core, tc->conn_events); + free(tc); + test_module->test_client = NULL; + } +} + + static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context) { // @@ -475,14 +666,22 @@ static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context) return; } - *module_context = qdrc_test_hooks_core_endpoint_setup(core); + test_module_t *test_module = NEW(test_module_t); + ZERO(test_module); + test_module->core = core; + qdrc_test_hooks_core_endpoint_setup(core, test_module); + qdrc_test_client_api_setup(test_module); + *module_context = test_module; } static void qdrc_test_hooks_final_CT(void *module_context) { - if (!!module_context) + if (!!module_context) { qdrc_test_hooks_core_endpoint_finalize(module_context); + qdrc_test_client_api_finalize(module_context); + free(module_context); + } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b5b123c79f..fe99dec9b0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -122,6 +122,7 @@ foreach(py_test_module ${SYSTEM_TESTS_HTTP} ${CONSOLE_TEST} system_tests_priority + system_tests_core_client ) add_test(${py_test_module} ${TEST_WRAP} -x ${PY_UNIT2_STRING} -v ${py_test_module}) diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py new file mode 100644 index 0000000000..3898aa1342 --- /dev/null +++ b/tests/system_tests_core_client.py @@ -0,0 +1,220 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from system_test import TestCase +from system_test import Qdrouterd +from system_test import TIMEOUT + +from proton import Message +from proton import Endpoint +from proton.handlers import MessagingHandler +from proton.reactor import Container + +# test the request/response core client messaging API These tests rely on +# enabling the router test hooks, which instantiates a test client (see +# modules/test_hooks/core_test_hooks) + +# see core_test_hooks.c +CONTAINER_ID = "org.apache.qpid.dispatch.test_core_client" +TARGET_ADDR = "test_core_client_address" + +class CoreClientAPITest(TestCase): + @classmethod + def setUpClass(cls): + super(CoreClientAPITest, cls).setUpClass() + + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR.A'}), + ('listener', {'port': cls.tester.get_port()}), + ]) + + cls.router = cls.tester.qdrouterd("A", config, cl_args=["-T"]) + + def test_send_receive(self): + ts = TestService(self.router.addresses[0], credit=10) + ts.run() + self.assertTrue(ts.error is None) + self.assertEqual(10, ts.in_count) + self.assertEqual(10, ts.out_count) + + def test_credit_starve(self): + ts = TestCreditStarve(self.router.addresses[0]) + ts.run() + self.assertTrue(ts.error is None) + self.assertTrue(ts.starved) + self.assertEqual(10, ts.in_count) + + def test_unexpected_conn_close(self): + ts = TestEarlyClose(self.router.addresses[0]) + ts.run() + self.assertTrue(ts.error is None) + self.assertTrue(ts.in_count >= 1) + + + def test_bad_format(self): + ts = TestNoCorrelationId(self.router.addresses[0]) + ts.run() + self.assertTrue(ts.error is None) + self.assertTrue(ts.rejected) + + def test_old_cid(self): + ts = TestOldCorrelationId(self.router.addresses[0]) + ts.run() + self.assertTrue(ts.error is None) + self.assertTrue(ts.accepted) + + +class TestService(MessagingHandler): + # a service that the core client can communicate with + class Timeout(object): + def __init__(self, service): + self.service = service + + def on_timer_task(self, event): + self.service.timeout() + + def __init__(self, address, container_id=CONTAINER_ID, credit=1): + super(TestService, self).__init__(prefetch=0) + self._container = Container(self) + self._container.container_id = CONTAINER_ID + self._conn = None + self.address = address + self.timer = None + self.error = None + self.reply_link = None + self.incoming_link = None + self.credit = credit + self.in_count = 0 + self.out_count = 0 + + def fail(self, error): + self.error = error + if self._conn: + self._conn.close() + + def timeout(self): + self.fail("Timeout expired") + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, self.Timeout(self)) + + self._conn = event.container.connect(self.address) + + def on_link_opening(self, event): + link = event.link + if link.state & Endpoint.LOCAL_UNINIT: + link.source.copy(link.remote_source) + link.target.copy(link.remote_target) + if event.sender: + if not link.remote_source.dynamic: + self.fail("expected dynamic source terminus") + return + link.source.dynamic = False + link.source.address = "a/reply/address" + self.reply_link = link + else: + link.flow(self.credit) + self.incoming_link = link + + def create_reply(self, message): + return Message(body=message.body, + correlation_id=message.correlation_id) + + # echo back to sender + def on_message(self, event): + self.in_count += 1 + cid = event.message.correlation_id + self.reply_link.send(self.create_reply(event.message)) + + # stop when all sent messages have settled + def on_settled(self, event): + self.out_count += 1 + self.credit -= 1 + if self.credit == 0: + self._conn.close() + + def on_connection_closed(self, event): + self._conn = None + + def run(self): + self._container.timeout = 1.0 + self._container.start() + while self._container.process(): + if self._conn is None and self._container.quiesced: + break; + self._container.stop() + self._container.process() + + +# wait until all credit is exhausted, then re-flow more credit +class TestCreditStarve(TestService): + def __init__(self, address): + super(TestCreditStarve, self).__init__(address, credit=5) + self.starved = False + + def on_settled(self, event): + self.credit -= 1 + if self.credit == 0: + if not self.starved: + self.starved = True + self.credit = 5 + self.incoming_link.drain(self.credit) + else: + self._conn.close() + + +# grant 10, but don't respond and close early +class TestEarlyClose(TestService): + def __init__(self, address): + super(TestEarlyClose, self).__init__(address, credit=10) + + def on_message(self, event): + self.in_count += 1 + if self.in_count == 1: + self._conn.close() + + +class TestNoCorrelationId(TestService): + def __init__(self, address): + super(TestNoCorrelationId, self).__init__(address, credit=1) + self.rejected = False + + def create_reply(self, message): + return Message(body=dict()) + + def on_rejected(self, event): + self.rejected = True + + +class TestOldCorrelationId(TestService): + def __init__(self, address): + super(TestOldCorrelationId, self).__init__(address, credit=1) + self.accepted = False + + def create_reply(self, message): + return Message(body=dict(), + correlation_id="not going to match") + + def on_accepted(self, event): + self.accepted = True