From 0a2d47165574809e20f16f47acd1e63b6ce97430 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 25 Oct 2018 16:02:56 -0400 Subject: [PATCH] DISPATCH-1154: Synchronize edge link routes to interior --- src/CMakeLists.txt | 2 + src/router_core/agent_conn_link_route.c | 22 +- .../modules/edge_router/edge_mgmt.c | 317 ++++++++++++ .../modules/edge_router/edge_mgmt.h | 101 ++++ .../modules/edge_router/link_route_proxy.c | 458 ++++++++++++++++++ .../modules/edge_router/link_route_proxy.h | 33 ++ src/router_core/modules/edge_router/module.c | 9 + src/router_core/router_core.c | 2 + tests/system_tests_edge_router.py | 208 ++++++++ tests/system_tests_link_routes.py | 71 +-- tests/test_broker.py | 38 +- 11 files changed, 1212 insertions(+), 49 deletions(-) create mode 100644 src/router_core/modules/edge_router/edge_mgmt.c create mode 100644 src/router_core/modules/edge_router/edge_mgmt.h create mode 100644 src/router_core/modules/edge_router/link_route_proxy.c create mode 100644 src/router_core/modules/edge_router/link_route_proxy.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b8eb36f9ee..bcaa66a72c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -104,6 +104,8 @@ set(qpid_dispatch_SOURCES router_core/modules/edge_router/module.c router_core/modules/edge_router/addr_proxy.c router_core/modules/edge_router/connection_manager.c + router_core/modules/edge_router/link_route_proxy.c + router_core/modules/edge_router/edge_mgmt.c router_core/modules/test_hooks/core_test_hooks.c router_node.c router_pynode.c diff --git a/src/router_core/agent_conn_link_route.c b/src/router_core/agent_conn_link_route.c index 686bd07573..c09378087b 100644 --- a/src/router_core/agent_conn_link_route.c +++ b/src/router_core/agent_conn_link_route.c @@ -131,17 +131,19 @@ static qdr_link_route_t *_find_link_route_CT(qdr_connection_t *conn, // if both id and name provided, prefer id // if (identity) { - qd_parsed_field_t *identity_field = qd_parse(identity); - uint64_t id = qd_parse_as_ulong(identity_field); - if (qd_parse_ok(identity_field)) { - lr = DEQ_HEAD(conn->conn_link_routes); - while (lr) { - if (id == lr->identity) - break; - lr = DEQ_NEXT(lr); - } + char buf[64]; + uint64_t id = 0; + assert(qd_iterator_length(identity) < sizeof(buf)); + qd_iterator_strncpy(identity, buf, sizeof(buf)); + if (sscanf(buf, "%"SCNu64, &id) != 1) { + return NULL; + } + lr = DEQ_HEAD(conn->conn_link_routes); + while (lr) { + if (id == lr->identity) + break; + lr = DEQ_NEXT(lr); } - qd_parse_free(identity_field); } else if (name) { lr = DEQ_HEAD(conn->conn_link_routes); while (lr) { diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c new file mode 100644 index 0000000000..f9d713a565 --- /dev/null +++ b/src/router_core/modules/edge_router/edge_mgmt.c @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "edge_mgmt.h" +#include "core_client_api.h" +#include "link_route_proxy.h" + +#include +#include + +/* + * an API that lets the core issue management requests + */ + +static qdrc_client_t *_client; // req/resp client +static qdrc_event_subscription_t *_event_handle; // edge conn up/down + + +static void _mgmt_on_state_cb_CT(qdr_core_t *, qdrc_client_t *, uintptr_t, bool); +static void _mgmt_on_flow_cb_CT(qdr_core_t *, qdrc_client_t *, uintptr_t, int, bool); + + +// +// edge uplink connection event handler +// +static void _conn_event_CT(void *context, qdrc_event_t event_type, qdr_connection_t *conn) +{ + qdr_core_t *core = (qdr_core_t *) context; + + switch (event_type) { + case QDRC_EVENT_CONN_EDGE_ESTABLISHED: + // create a messaging client to the interior router's $management + // + qd_log(core->log, QD_LOG_TRACE, + "starting edge mgmt client (id=%"PRIu64")", conn->identity); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, "$management"); + _client = qdrc_client_CT(core, + conn, + target, + 100, // credit + 0, // user_context + _mgmt_on_state_cb_CT, + _mgmt_on_flow_cb_CT); + if (!_client) { + qd_log(core->log, QD_LOG_ERROR, + "Failed to start edge management client"); + } + break; + + case QDRC_EVENT_CONN_EDGE_LOST: + // clean up messaging client + // + qd_log(core->log, QD_LOG_TRACE, + "stopping edge mgmt client (id=%"PRIu64")", conn->identity); + qdrc_client_free_CT(_client); + _client = NULL; + break; + } +} + + +// Per anagement request context +// +typedef struct qcm_edge_mgmt_request_t qcm_edge_mgmt_request_t; +struct qcm_edge_mgmt_request_t { + void *req_context; + qcm_edge_mgmt_reply_CT_t reply_callback; + qcm_edge_mgmt_error_CT_t error_callback; +}; +ALLOC_DEFINE(qcm_edge_mgmt_request_t); + + +// utility to parse out status code from management response message +static int _extract_mgmt_status(qdr_core_t *core, + qd_iterator_t *app_properties, + int32_t *statusCode, + char **statusDescription) +{ + int rc = 0; + *statusDescription = NULL; + *statusCode = 500; + + qd_parsed_field_t *properties = qd_parse(app_properties); + if (!properties || !qd_parse_ok(properties) || !qd_parse_is_map(properties)) { + qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - invalid properties field"); + rc = EINVAL; + goto exit; + } + qd_parsed_field_t *status_fld = qd_parse_value_by_key(properties, "statusCode"); + if (!status_fld) { + qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - statusCode field missing"); + rc = EINVAL; + goto exit; + } + *statusCode = qd_parse_as_int(status_fld); + if (!qd_parse_ok(status_fld)) { + qd_log(core->log, QD_LOG_ERROR, "bad edge management reply msg - statusCode field invalid"); + rc = EINVAL; + goto exit; + } + qd_parsed_field_t *desc_fld = qd_parse_value_by_key(properties, "statusDescription"); + if (desc_fld) { // it's optional, so no error if unset + qd_iterator_t *tmp = qd_parse_raw(desc_fld); + *statusDescription = (char *)qd_iterator_copy(tmp); + } + +exit: + if (properties) + qd_parse_free(properties); + return rc; +} + + +// mgmt client link state changed +static void _mgmt_on_state_cb_CT(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + bool active) +{ + qd_log(core->log, QD_LOG_TRACE, + "edge mgmt client state change: uc=%"PRIuPTR" %s", + user_context, + (active) ? "active" : "down"); + + if (!active) { + // stop the syncing of link routes by setting credit=0 + qcm_edge_link_route_proxy_flow_CT(core, 0, true); + } +} + + +// mgmt client credit granted by interior router +static void _mgmt_on_flow_cb_CT(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + int available_credit, + bool drain) +{ + qd_log(core->log, QD_LOG_TRACE, + "edge mgmt client flow: uc=%"PRIuPTR" c=%d d=%s", + user_context, available_credit, + (drain) ? "T" : "F"); + + qcm_edge_link_route_proxy_flow_CT(core, + available_credit, + drain); +} + + +// terminal disposition set on sent request +static void _mgmt_on_ack_cb_CT(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + uint64_t disposition) +{ + qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context; + + qd_log(core->log, QD_LOG_TRACE, + "edge mgmt request update: rc=%"PRIuPTR" d=0x%"PRIx64, + req->req_context, disposition); + + if (disposition != PN_ACCEPTED) { + // failure - no reply will be sent to cleanup + if (req->error_callback) { + req->error_callback(core, req->req_context, "Request not accepted"); + req->error_callback = NULL; // avoid recalling on mgmt done + } + } +} + + +// reply message received +static uint64_t _mgmt_on_reply_cb_CT(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + qd_iterator_t *app_properties, + qd_iterator_t *body) +{ + int32_t statusCode = 0; + char *statusDescription = 0; + uint64_t disposition = PN_ACCEPTED; + + qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context; + + if (_extract_mgmt_status(core, app_properties, &statusCode, &statusDescription)) { + // error - bad response + statusCode = 500; + } + qd_iterator_free(app_properties); + + qd_log(core->log, QD_LOG_TRACE, + "Edge management request reply:" + " rc=%"PRIuPTR" status=%"PRId32": %s", + req->req_context, statusCode, + (statusDescription) ? statusDescription : ""); + + if (req->reply_callback) + disposition = req->reply_callback(core, + req->req_context, + statusCode, + statusDescription, + body); + free(statusDescription); + return disposition; +} + + +// request completed or aborted due to error +static void _mgmt_on_done_cb_CT(qdr_core_t *core, + qdrc_client_t *client, + uintptr_t user_context, + uintptr_t request_context, + const char *error) +{ + qcm_edge_mgmt_request_t *req = (qcm_edge_mgmt_request_t *)request_context; + qd_log(core->log, QD_LOG_TRACE, + "edge mgmt request done: uc=%"PRIuPTR" rc=%"PRIuPTR" %s", + user_context, request_context, error ? error : ""); + + if (error && req->error_callback) + req->error_callback(core, req->req_context, error); + + free_qcm_edge_mgmt_request_t(req); +} + + +// send management request - takes ownership of body +int qcm_edge_mgmt_request_CT(qdr_core_t *core, + void *request_context, + const char *operation, + const char *entity_type, + const char *identity, + const char *name, + qd_composed_field_t *body, + qcm_edge_mgmt_reply_CT_t reply_cb, + qcm_edge_mgmt_error_CT_t error_cb) +{ + + qd_log(core->log, QD_LOG_TRACE, + "New Edge management request: rc=%"PRIuPTR" %s type=%s id=%s", + request_context, operation, entity_type, + (identity) ? identity : ""); + + qcm_edge_mgmt_request_t *req = new_qcm_edge_mgmt_request_t(); + ZERO(req); + req->req_context = request_context; + req->reply_callback = reply_cb; + req->error_callback = error_cb; + + // create a message containing a management request + + qd_composed_field_t *ap_fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); + qd_compose_start_map(ap_fld); + qd_compose_insert_string(ap_fld, "operation"); + qd_compose_insert_string(ap_fld, operation); + qd_compose_insert_string(ap_fld, "type"); + qd_compose_insert_string(ap_fld, entity_type); + if (identity) { + qd_compose_insert_string(ap_fld, "identity"); + qd_compose_insert_string(ap_fld, identity); + } + // note: if there is a name specified qdrouterd expects to find it in the + // *properties* header, which doesn't jive with the current mgmt spec + // (WD12) + if (name) { + qd_compose_insert_string(ap_fld, "name"); + qd_compose_insert_string(ap_fld, name); + } + qd_compose_end_map(ap_fld); + + return qdrc_client_request_CT(_client, + (uintptr_t)req, // request context + ap_fld, + body, + _mgmt_on_reply_cb_CT, + _mgmt_on_ack_cb_CT, + _mgmt_on_done_cb_CT); +} + + +void qcm_edge_mgmt_init_CT(qdr_core_t *core) +{ + _event_handle = qdrc_event_subscribe_CT(core, + (QDRC_EVENT_CONN_EDGE_ESTABLISHED + | QDRC_EVENT_CONN_EDGE_LOST), + _conn_event_CT, + NULL, // link event + NULL, // addr event + core); // context +} + + +void qcm_edge_mgmt_final_CT(qdr_core_t *core) +{ + qdrc_event_unsubscribe_CT(core, _event_handle); + qdrc_client_free_CT(_client); + _client = NULL; +} diff --git a/src/router_core/modules/edge_router/edge_mgmt.h b/src/router_core/modules/edge_router/edge_mgmt.h new file mode 100644 index 0000000000..06ee0069b4 --- /dev/null +++ b/src/router_core/modules/edge_router/edge_mgmt.h @@ -0,0 +1,101 @@ +#ifndef router_core_edge_mgmt_h +#define router_core_edge_mgmt_h 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "router_core_private.h" + +// +// Edge Router Management API +// +// API to interior router management agent. Note that all callbacks are run +// under the Core thread. +// + + +/** + * invoked when the reply arrives from the interior router + * + * @param core + * @param request_context - user supplied context passed to the qcm_edge_mgmt_request_CT call + * @param statusCode - as defined by the AMQP management spec + * @param statusDescription - as defined by the AMQP management spec, may be NULL + * @param body - body from reply message. May be NULL. Ownership is transferred + * to this callback - it must be freed by the application. + * @return the final disposition to return to the interior router + */ +typedef uint64_t (*qcm_edge_mgmt_reply_CT_t)(qdr_core_t *core, + void *request_context, + int32_t statusCode, + const char *statusDescription, + qd_iterator_t *body); + + +/** + * Should the request message fail to be sent this callback will be invoked + * instead the reply callback. It can be used to clean up application state on + * failure. + * + * @param core + * @param request_context - user supplied context passed to the qcm_edge_mgmt_request_CT call + * @param error - a description of the error that occurred. + * + */ +typedef void (*qcm_edge_mgmt_error_CT_t)(qdr_core_t *core, + void *request_context, + const char *error); + +/** + * Send management request to the interior router. + * + * Creates a management request message and sends it to the interior router's + * $management target. When the reply is received it is dispatched via the + * reply_cb callback. If a messaging failure occurs the error_cb callback is + * invoked (and the reply_cb is not invoked) + * + * @param core + * @param request_context - user supplied context that is passed to the callbacks + * @param operation - one of "CREATE", "DELETE" + * @param entity_type - identifies the type of mgmt entity to operate on, + * (e.g. "org.apache.qpid.dispatch.router.config.address") + * @param identity - for DELETE only: identifier for entity to be deleted + * @param name - (optional) the entity's name + * @param body - message body content. Ownership is transferred - the caller + * must not reference this on return. + * @param reply_cb - Callback for reply message. + * @param error_cb - Callback if error occurs + * @return zero on success, else error. On success a callback is + * guaranteed to be invoked. + */ +int qcm_edge_mgmt_request_CT(qdr_core_t *core, + void *request_context, + const char *operation, + const char *entity_type, + const char *identity, + const char *name, + qd_composed_field_t *body, + qcm_edge_mgmt_reply_CT_t reply_cb, + qcm_edge_mgmt_error_CT_t error_cb); + + +// module setup/teardown +void qcm_edge_mgmt_init_CT(qdr_core_t *core); +void qcm_edge_mgmt_final_CT(qdr_core_t *core); + +#endif // router_core_edge_mgmt_h diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c new file mode 100644 index 0000000000..5a761f1cfd --- /dev/null +++ b/src/router_core/modules/edge_router/link_route_proxy.c @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "link_route_proxy.h" +#include "agent_conn_link_route.h" +#include "edge_mgmt.h" + +#include +#include + +// Track the state of the link route configuration proxy on +// the uplinked interior router +// + +typedef enum { + QDR_LINK_ROUTE_PROXY_NEW = 0, // needs to create proxy + QDR_LINK_ROUTE_PROXY_CREATING, // create request sent + QDR_LINK_ROUTE_PROXY_CREATED, // create request completed ok + QDR_LINK_ROUTE_PROXY_CANCELLED, // deleted while waiting for create request reply + QDR_LINK_ROUTE_PROXY_DELETED, // needs to delete proxy + QDR_LINK_ROUTE_PROXY_DELETING, // delete request sent +} link_route_proxy_state_t; + +typedef struct link_route_proxy_t link_route_proxy_t; +struct link_route_proxy_t { + DEQ_LINKS(link_route_proxy_t); + char *proxy_name; + char *proxy_id; + char *address; + link_route_proxy_state_t proxy_state; + qd_direction_t direction; +}; +ALLOC_DEFINE(link_route_proxy_t); +DEQ_DECLARE(link_route_proxy_t, link_route_proxy_list_t); + + +static link_route_proxy_list_t _link_route_proxies; +static qdrc_event_subscription_t *_event_handle; +static int _available_credit; + + +static uint64_t _on_create_reply_CT(qdr_core_t *, void *, int32_t, const char *, qd_iterator_t *); +static uint64_t _on_delete_reply_CT(qdr_core_t *, void *, int32_t, const char *, qd_iterator_t *); +static void _on_create_error_CT(qdr_core_t *, void *, const char *); +static void _on_delete_error_CT(qdr_core_t *, void *, const char *); + + +static void _free_link_route_proxy(link_route_proxy_t *lrp) +{ + if (!lrp) + return; + free(lrp->proxy_name); + free(lrp->proxy_id); + free(lrp->address); + free_link_route_proxy_t(lrp); +} + + +// clean up the entire proxy list +static void _free_all_link_route_proxies(void) +{ + link_route_proxy_t *lrp = DEQ_HEAD(_link_route_proxies); + while (lrp) { + DEQ_REMOVE_HEAD(_link_route_proxies); + _free_link_route_proxy(lrp); + lrp = DEQ_HEAD(_link_route_proxies); + } +} + + +// generate the body for a management CREATE message for a Connection Scoped +// Link Route +static qd_composed_field_t *_create_body(link_route_proxy_t *lrp) +{ + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qd_compose_start_map(body); + + qd_compose_insert_string(body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_TYPE]); + qd_compose_insert_string(body, CONN_LINK_ROUTE_TYPE); + + qd_compose_insert_string(body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_PATTERN]); + qd_compose_insert_string(body, lrp->address); + + qd_compose_insert_string(body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_DIRECTION]); + qd_compose_insert_string(body, lrp->direction == QD_INCOMING ? "in" : "out"); + + qd_compose_insert_string(body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_NAME]); + qd_compose_insert_string(body, lrp->proxy_name); + + qd_compose_end_map(body); + return body; +} + + +// check for any link route configuration entities that need to be +// synchronized with the peer interior router +// +static void _sync_interior_proxies(qdr_core_t *core) +{ + link_route_proxy_t *lrp = DEQ_HEAD(_link_route_proxies); + while (lrp && _available_credit > 0) { + + if (lrp->proxy_state == QDR_LINK_ROUTE_PROXY_NEW) { + + qd_log(core->log, QD_LOG_TRACE, + "Creating proxy link route for address=%s named=%s", + lrp->address, lrp->proxy_name); + + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_CREATING; + qcm_edge_mgmt_request_CT(core, + lrp, // context + "CREATE", + CONN_LINK_ROUTE_TYPE, + 0, + lrp->proxy_name, + _create_body(lrp), + _on_create_reply_CT, + _on_create_error_CT); + _available_credit -= 1; + + } else if (lrp->proxy_state == QDR_LINK_ROUTE_PROXY_DELETED) { + + qd_log(core->log, QD_LOG_TRACE, + "Deleting proxy link route address=%s proxy-id=%s name=%s", + lrp->address, lrp->proxy_id, lrp->proxy_name); + + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_DELETING; + + // empty body for delete + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qd_compose_start_map(body); + qd_compose_end_map(body); + + qcm_edge_mgmt_request_CT(core, + lrp, // context + "DELETE", + CONN_LINK_ROUTE_TYPE, + lrp->proxy_id, + lrp->proxy_name, + body, + _on_delete_reply_CT, + _on_delete_error_CT); + _available_credit -= 1; + } + lrp = DEQ_NEXT(lrp); + } +} + + +// handle the response to our create request message +static uint64_t _on_create_reply_CT(qdr_core_t *core, + void *request_context, + int32_t statusCode, + const char *statusDescription, + qd_iterator_t *body) +{ + link_route_proxy_t *lrp = (link_route_proxy_t *)request_context; + uint64_t disposition = PN_ACCEPTED; + + if (statusCode == 201) { // Created + qd_parsed_field_t *parsed_body = qd_parse(body); + qd_parsed_field_t *proxy_id = qd_parse_value_by_key(parsed_body, + "identity"); + if (!proxy_id) { + // really should not happen (a bug) + qd_log(core->log, QD_LOG_ERROR, + "Link route proxy CREATE failed: invalid response message," + " address=%s proxy name=%s", + lrp->address, lrp->proxy_name); + DEQ_REMOVE(_link_route_proxies, lrp); + _free_link_route_proxy(lrp); + disposition = PN_REJECTED; + } else { + lrp->proxy_id = (char *)qd_iterator_copy(qd_parse_raw(proxy_id)); + qd_log(core->log, QD_LOG_TRACE, + "link route proxy CREATE successful, address=%s peer-id=%s proxy name=%s)", + lrp->address, lrp->proxy_id, lrp->proxy_name); + switch (lrp->proxy_state) { + case QDR_LINK_ROUTE_PROXY_CREATING: + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_CREATED; + break; + case QDR_LINK_ROUTE_PROXY_CANCELLED: + // the address was removed while waiting for the create + // to complete. Now forward the delete along to interior + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_DELETED; + _sync_interior_proxies(core); + break; + default: + assert(false); + } + } + qd_parse_free(parsed_body); + } else { + // crap. This is unexpected. Perhaps a duplication? + // only way to be sure is to now query using the proxy + // name, which makes things complicated... + qd_log(core->log, QD_LOG_ERROR, + "link route proxy CREATE failed with error: (%"PRId32") %s," + " address=%s proxy_name=%s)", + statusCode, + statusDescription ? statusDescription : "unknown", + lrp->address, lrp->proxy_name); + // @TODO(kgiusti) - reset the connection + DEQ_REMOVE(_link_route_proxies, lrp); + _free_link_route_proxy(lrp); + } + + qd_iterator_free(body); + return disposition; +} + + +// create request failed to reach interior (or was rejected, released, etc) +static void _on_create_error_CT(qdr_core_t *core, + void *request_context, + const char *error) +{ + link_route_proxy_t *lrp = (link_route_proxy_t *)request_context; + + // likely the link detached or conn coming down - preserve + // the proxy and try again later + qd_log(core->log, QD_LOG_DEBUG, + "link route proxy CREATE failed: %s, address=%s name=%s", + error ? error : "unknown", + lrp->address, lrp->proxy_name); + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_NEW; +} + + +// handle the response to our delete request message +static uint64_t _on_delete_reply_CT(qdr_core_t *core, + void *request_context, + int32_t statusCode, + const char *statusDescription, + qd_iterator_t *body) +{ + link_route_proxy_t *lrp = (link_route_proxy_t *)request_context; + + qd_iterator_free(body); // body is ignored + + switch (statusCode) { + case 204: + case 404: + // consider No Content or Not Found as success + qd_log(core->log, QD_LOG_TRACE, + "link route proxy DELETE successful," + " address=%s proxy_id=%s proxy_name=%s (code=%d)", + lrp->address, lrp->proxy_id, lrp->proxy_name, + statusCode); + break; + default: + // oh crap, this is unexpected and is probably a bug + qd_log(core->log, QD_LOG_ERROR, + "link route proxy DELETE failed with error: (%"PRId32") %s," + " address=%s proxy id=%s proxy name=%s)", + statusCode, + statusDescription ? statusDescription : "unknown", + lrp->address, lrp->proxy_id, lrp->proxy_name); + } + DEQ_REMOVE(_link_route_proxies, lrp); + _free_link_route_proxy(lrp); + return PN_ACCEPTED; +} + + +// delete request failed to reach interior (or was rejected, released, etc) +static void _on_delete_error_CT(qdr_core_t *core, + void *request_context, + const char *error) +{ + link_route_proxy_t *lrp = (link_route_proxy_t *)request_context; + + // likely the link detached or conn coming down - preserve + // the proxy and try again later + qd_log(core->log, QD_LOG_DEBUG, + "link route proxy DELETE failed: %s, address=%s name=%s", + error ? error : "unknown", + lrp->address, lrp->proxy_name); + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_DELETED; +} + + +// called when a new link route is configured. Create a proxy +// for the link route on the interior router +// +static void _link_route_added_CT(qdr_core_t *core, qdr_address_t *addr) +{ + const char *address = (const char *)qd_hash_key_by_handle(addr->hash_handle); + qd_log(core->log, QD_LOG_TRACE, + "edge creating proxy link route for '%s'", address); + + link_route_proxy_t *lrp = new_link_route_proxy_t(); + ZERO(lrp); + + if (QDR_IS_LINK_ROUTE_PREFIX(address[0])) { + // connection scoped link routes only support patterns (since prefix is + // a type of pattern). Prefix address strings do not have the trailing + // '#' in the address as it is inferred by the type. So convert the + // prefix address to an address pattern + char *buf = malloc(strlen(address) + 2); // skip prefix, add /# + strcpy(buf, &address[1]); + strcat(buf, "/#"); + lrp->address = buf; + } else { // already in pattern form + lrp->address = strdup(&address[1]); // skip prefix + } + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_NEW; + lrp->direction = QDR_LINK_ROUTE_DIR(address[0]); + + // construct a name for the proxy link route in the format of + // /proxyLinkRoute/
+ lrp->proxy_name = malloc(strlen(core->router_id) + + 16 + + strlen(address) + 1); // 16 == len("/proxyLinkRoute/") + sprintf(lrp->proxy_name, "%s/proxyLinkRoute/%s", core->router_id, address); + + DEQ_INSERT_TAIL(_link_route_proxies, lrp); +} + + +// called by route control when an existing link route config entity is deleted +static void _link_route_deleted_CT(qdr_core_t *core, qdr_address_t *addr) +{ + const char *address = (const char *)qd_hash_key_by_handle(addr->hash_handle); + qd_log(core->log, QD_LOG_TRACE, + "edge deleting proxy link route for '%s'", address); + + qd_direction_t dir = QDR_LINK_ROUTE_DIR(address[0]); + link_route_proxy_t *lrp = DEQ_HEAD(_link_route_proxies); + DEQ_FIND(lrp, dir == lrp->direction && strcmp(lrp->address, &address[1]) == 0); + if (lrp) { + switch (lrp->proxy_state) { + case QDR_LINK_ROUTE_PROXY_NEW: + // never created - no need to send a delete request + DEQ_REMOVE(_link_route_proxies, lrp); + _free_link_route_proxy(lrp); + break; + case QDR_LINK_ROUTE_PROXY_CREATED: + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_DELETED; + break; + case QDR_LINK_ROUTE_PROXY_CREATING: + // uh oh - deleted before our outstanding create completed + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_CANCELLED; + break; + default: + assert(false); + } + } +} + +static void _on_conn_event(void *context, + qdrc_event_t event_type, + qdr_connection_t *conn) +{ + // we only receive edge loss events + assert(event_type == QDRC_EVENT_CONN_EDGE_LOST); + + // the interior should purge all of the proxies since they are connection + // scoped. Reset the proxy state to NEW or remove the proxy if deleted + link_route_proxy_t *lrp = DEQ_HEAD(_link_route_proxies); + while (lrp) { + link_route_proxy_t *next = DEQ_NEXT(lrp); + switch (lrp->proxy_state) { + case QDR_LINK_ROUTE_PROXY_CREATING: + case QDR_LINK_ROUTE_PROXY_CREATED: + lrp->proxy_state = QDR_LINK_ROUTE_PROXY_NEW; + free(lrp->proxy_id); + lrp->proxy_id = NULL; + break; + case QDR_LINK_ROUTE_PROXY_CANCELLED: + case QDR_LINK_ROUTE_PROXY_DELETED: + case QDR_LINK_ROUTE_PROXY_DELETING: + DEQ_REMOVE(_link_route_proxies, lrp); + _free_link_route_proxy(lrp); + break; + default: + break; + } + lrp = next; + } +} + +static void _on_addr_event(void *context, + qdrc_event_t event_type, + qdr_address_t *addr) +{ + qdr_core_t *core = (qdr_core_t *)context; + const char *address = (const char *)qd_hash_key_by_handle(addr->hash_handle); + + if (!QDR_IS_LINK_ROUTE(*address)) + return; + + switch (event_type) { + case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST: + _link_route_added_CT(core, addr); + break; + case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST: + _link_route_deleted_CT(core, addr); + break; + } + _sync_interior_proxies(core); +} + +// +// Public API +// + + +// called by the edge mgmt API when credit has been granted: +void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain) +{ + _available_credit = available_credit; + _sync_interior_proxies(core); + if (drain) { + _available_credit = 0; + } +} + +// called by the edge router module init method: +void qcm_edge_link_route_init_CT(qdr_core_t *core) +{ + // need to know when the connection to the interior + // fails so we can flush state + _event_handle = qdrc_event_subscribe_CT(core, + (QDRC_EVENT_CONN_EDGE_LOST + | QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST + | QDRC_EVENT_ADDR_BECAME_LOCAL_DEST), + _on_conn_event, + NULL, + _on_addr_event, + core); +} + + +// called by the edge router module final method: +void qcm_edge_link_route_final_CT(qdr_core_t *core) +{ + qdrc_event_unsubscribe_CT(core, _event_handle); + _free_all_link_route_proxies(); +} + + diff --git a/src/router_core/modules/edge_router/link_route_proxy.h b/src/router_core/modules/edge_router/link_route_proxy.h new file mode 100644 index 0000000000..3f8b0eec82 --- /dev/null +++ b/src/router_core/modules/edge_router/link_route_proxy.h @@ -0,0 +1,33 @@ +#ifndef router_core_edge_link_routes_h +#define router_core_edge_link_routes_h 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// Manages link route configuration proxies created on the edge's interior +// router +// + +#include "router_core_private.h" + +void qcm_edge_link_route_init_CT(qdr_core_t *core); +void qcm_edge_link_route_final_CT(qdr_core_t *core); +void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain); + +#endif diff --git a/src/router_core/modules/edge_router/module.c b/src/router_core/modules/edge_router/module.c index d115d2b216..9c73a78faf 100644 --- a/src/router_core/modules/edge_router/module.c +++ b/src/router_core/modules/edge_router/module.c @@ -20,8 +20,12 @@ #include "module.h" #include "connection_manager.h" #include "addr_proxy.h" +#include "edge_mgmt.h" +#include "link_route_proxy.h" + typedef struct { + qdr_core_t *core; qcm_edge_conn_mgr_t *conn_mgr; qcm_edge_addr_proxy_t *addr_proxy; // TODO - Add pointers to other edge-router state here @@ -32,8 +36,11 @@ static void qcm_edge_router_init_CT(qdr_core_t *core, void **module_context) { if (core->router_mode == QD_ROUTER_MODE_EDGE) { qcm_edge_t *edge = NEW(qcm_edge_t); + edge->core = core; edge->conn_mgr = qcm_edge_conn_mgr(core); edge->addr_proxy = qcm_edge_addr_proxy(core); + qcm_edge_mgmt_init_CT(core); + qcm_edge_link_route_init_CT(core); // TODO - Add initialization of other edge-router functions here *module_context = edge; } else @@ -48,6 +55,8 @@ static void qcm_edge_router_final_CT(void *module_context) if (edge) { qcm_edge_conn_mgr_final(edge->conn_mgr); qcm_edge_addr_proxy_final(edge->addr_proxy); + qcm_edge_mgmt_final_CT(edge->core); + qcm_edge_link_route_final_CT(edge->core); // TODO - Add finalization of other edge-router functions here free(edge); } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 119a5a1ae1..187a19167a 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -464,6 +464,7 @@ void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_co if (DEQ_SIZE(addr->conns) == 1) { const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); qdr_post_mobile_added_CT(core, key); + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr); } } @@ -474,6 +475,7 @@ void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ if (DEQ_IS_EMPTY(addr->conns)) { const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); qdr_post_mobile_removed_CT(core, key); + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); } } diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index d7d16ae30b..fa706b4e07 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -17,12 +17,24 @@ # under the License. # +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from time import sleep + import unittest2 as unittest from proton import Message, Timeout from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_tests_link_routes import ConnLinkRouteService +from test_broker import FakeService from proton.handlers import MessagingHandler from proton.reactor import Container, DynamicNodeProperties + class RouterTest(TestCase): inter_router_port = None @@ -170,6 +182,202 @@ def test_14_mobile_address_edge_to_edge_two_interior(self): self.assertEqual(None, test.error) +class LinkRouteProxyTest(TestCase): + """ + Test edge router's ability to proxy configured and connection-scoped link + routes into the interior + """ + + @classmethod + def setUpClass(cls): + """Start a router""" + super(LinkRouteProxyTest, cls).setUpClass() + + def router(name, mode, extra): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'role': 'normal', 'port': cls.tester.get_port()}) + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + return cls.routers[-1] + + # configuration: + # two edge routers connected via 2 interior routers. + # + # +-------+ +---------+ +---------+ +-------+ + # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 | + # +-------+ +---------+ +---------+ +-------+ + + cls.routers = [] + + router('INT.A', 'interior', + [('listener', {'role': 'inter-router', 'port': cls.tester.get_port()})]) + cls.INT_A = cls.routers[0] + + router('INT.B', 'interior', + [('connector', {'name': 'connectorToA', 'role': 'inter-router', + 'port': cls.INT_A.ports[1]})]) + cls.INT_B = cls.routers[1] + + router('EA1', 'edge', + [('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()}), + ('connector', {'name': 'uplink', 'role': 'edge', + 'port': cls.INT_A.ports[0]}), + ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'in'}), + ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'out'})]) + cls.EA1 = cls.routers[2] + + router('EB1', 'edge', + [('connector', {'name': 'uplink', 'role': 'edge', + 'port': cls.INT_B.ports[0]})]) + cls.EB1 = cls.routers[3] + + cls.INT_A.wait_router_connected('INT.B') + cls.INT_B.wait_router_connected('INT.A') + cls.EA1.wait_connectors() + cls.EB1.wait_connectors() + + cls.CFG_LINK_ROUTE_TYPE = 'org.apache.qpid.dispatch.router.config.linkRoute' + cls.CONN_LINK_ROUTE_TYPE = 'org.apache.qpid.dispatch.router.connection.linkRoute' + cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector' + + def _get_address(self, router, address): + a_type = 'org.apache.qpid.dispatch.router.address' + addrs = router.management.query(a_type).get_dicts() + return list(filter(lambda a: a['name'].find(address) != -1, + addrs)) + + def _test_traffic(self, sender, receiver, address, count=5): + tr = AsyncTestReceiver(receiver, address) + ts = AsyncTestSender(sender, address, count) + ts.wait() # wait until all sent + for i in range(count): + tr.queue.get(timeout=TIMEOUT) + tr.stop() + + def test_link_route_proxy_configured(self): + """ + Activate the configured link routes via a FakeService, verify proxies + created by passing traffic from/to and interior router + """ + + fs = FakeService(self.EA1.addresses[1]) + self.INT_B.wait_address("CfgLinkRoute1") + self._test_traffic(self.INT_B.addresses[0], + self.INT_B.addresses[0], + "CfgLinkRoute1/hi", + count=5) + fs.join() + self.assertEqual(5, fs.in_count) + self.assertEqual(5, fs.out_count) + + def test_conn_link_route_proxy(self): + """ + Test connection scoped link routes + """ + fs = ConnLinkRouteService(self.EA1.addresses[1], + container_id="FakeService", + config = [("ConnLinkRoute1", + {"pattern": "Conn/*/One", + "direction": "out"}), + ("ConnLinkRoute2", + {"pattern": "Conn/*/One", + "direction": "in"})]) + self.assertEqual(2, len(fs.values)) + + self.INT_B.wait_address("Conn/*/One") + self.assertEqual(2, len(self._get_address(self.INT_A, "Conn/*/One"))) + + self._test_traffic(self.INT_B.addresses[0], + self.INT_A.addresses[0], + "Conn/BLAB/One", + count=5) + fs.join() + self.assertEqual(5, fs.in_count) + self.assertEqual(5, fs.out_count) + + # the link route service connection is closed, verify delete + self.assertEqual(0, len(self._get_address(self.INT_A, "Conn/*/One"))) + + def test_interior_conn_lost(self): + """ + What happens when the interior connection bounces? + """ + config = Qdrouterd.Config([('router', {'mode': 'edge', + 'id': 'Edge1'}), + ('listener', {'role': 'normal', + 'port': self.tester.get_port()}), + ('listener', {'name': 'rc', + 'role': 'route-container', + 'port': self.tester.get_port()}), + ('linkRoute', {'pattern': 'Edge1/*', + 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', {'pattern': 'Edge1/*', + 'containerId': 'FakeBroker', + 'direction': 'out'})]) + er = self.tester.qdrouterd('Edge1', config, wait=True) + + # activate the link routes before the connection exists + fs = FakeService(er.addresses[1]) + er.wait_address("Edge1/*") + + + # create the connection to interior + er_mgmt = er.management + ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, + name='toA', + attributes={'role': 'edge', + 'port': self.INT_A.ports[0]}) + self.INT_B.wait_address("Edge1/*") + + # delete it, and verify the routes are removed + ctor.delete() + while self._get_address(self.INT_B, "Edge1/*"): + sleep(0.5) + + # now recreate and verify routes re-appear + ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, + name='toA', + attributes={'role': 'edge', + 'port': self.INT_A.ports[0]}) + self.INT_B.wait_address("Edge1/*") + er.teardown() + + + def test_thrashing_link_routes(self): + """ + Rapidly add and delete link routes at the edge + """ + + # activate the pre-configured link routes + ea1_mgmt = self.EA1.management + fs = FakeService(self.EA1.addresses[1]) + self.INT_B.wait_address("CfgLinkRoute1") + + for i in range(10): + lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, + name="TestLRout%d" % i, + attributes={'pattern': 'Test/*/%d/#' % i, + 'containerId': 'FakeBroker', + 'direction': 'out'}) + lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, + name="TestLRin%d" % i, + attributes={'pattern': 'Test/*/%d/#' % i, + 'containerId': 'FakeBroker', + 'direction': 'in'}) + # verify that they are correctly propagated (once) + if i == 9: + self.INT_B.wait_address("Test/*/9/#") + lr1.delete() + lr2.delete() + + class Timeout(object): def __init__(self, parent): self.parent = parent diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index cfef365d82..dd1631da78 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -1700,31 +1700,23 @@ def test_config_file_bad(self): def test_mgmt(self): # test create, delete, and query mgmt_conn = BlockingConnection(self.QDR_A.addresses[0]) - mgmt_recv = mgmt_conn.create_receiver(address=None, dynamic=True, credit=250) - mgmt_sender = mgmt_conn.create_sender(address="$management") - mgmt_proxy = MgmtMsgProxy(mgmt_recv.link.remote_source.address) + mgmt_proxy = ConnLinkRouteMgmtProxy(mgmt_conn) for i in range(10): - msg = mgmt_proxy.create_conn_link_route("lr1-%d" % i, + rsp = mgmt_proxy.create_conn_link_route("lr1-%d" % i, {'pattern': "*/hi/there/%d" % i, 'direction': 'out' if i % 2 else 'in'}) - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) self.assertEqual(201, rsp.status_code) # test query - msg = mgmt_proxy.query_conn_link_routes() - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.query_conn_link_routes() self.assertEqual(200, rsp.status_code) self.assertEqual(10, len(rsp.results)) entities = rsp.results # test read - msg = mgmt_proxy.read_conn_link_route('lr1-5') - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.read_conn_link_route('lr1-5') self.assertEqual(200, rsp.status_code) self.assertEqual("lr1-5", rsp.attrs['name']) self.assertEqual("*/hi/there/5", rsp.attrs['pattern']) @@ -1738,35 +1730,25 @@ def test_mgmt(self): {'pattern': ''}, {'pattern': 7}] for a in attrs: - msg = mgmt_proxy.create_conn_link_route("iamnoone", a) - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.create_conn_link_route("iamnoone", a) self.assertEqual(400, rsp.status_code) # bad read - msg = mgmt_proxy.read_conn_link_route('iamnoone') - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.read_conn_link_route('iamnoone') self.assertEqual(404, rsp.status_code) # bad delete - msg = mgmt_proxy.delete_conn_link_route('iamnoone') - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.delete_conn_link_route('iamnoone') self.assertEqual(404, rsp.status_code) # delete all for r in entities: self.assertEqual(200, r.status_code) - msg = mgmt_proxy.delete_conn_link_route(r.attrs['name']) - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.delete_conn_link_route(r.attrs['name']) self.assertEqual(204, rsp.status_code) # query - should be none left - msg = mgmt_proxy.query_conn_link_routes() - mgmt_sender.send(msg) - rsp = mgmt_proxy.response(mgmt_recv.receive()) + rsp = mgmt_proxy.query_conn_link_routes() self.assertEqual(200, rsp.status_code) self.assertEqual(0, len(rsp.results)) @@ -1839,7 +1821,7 @@ def test_send_receive(self): self.assertEqual("SENDING TO flea.B", r.queue.get(timeout=TIMEOUT).body) r.stop() - self.assertEqual(COUNT, fs.incoming) + self.assertEqual(COUNT, fs.in_count) # send from B to A r = AsyncTestReceiver(self.QDR_A.addresses[0], @@ -1855,7 +1837,7 @@ def test_send_receive(self): self.assertEqual("SENDING TO flea.A", r.queue.get(timeout=TIMEOUT).body) r.stop() - self.assertEqual(2 * COUNT, fs.incoming) + self.assertEqual(2 * COUNT, fs.in_count) # once the fake service closes its conn the link routes # are removed so the link route addresses must be gone @@ -1876,7 +1858,6 @@ def __init__(self, url, container_id, config, timeout=TIMEOUT): self.mgmt_proxy = None self.mgmt_sender = None self.mgmt_receiver = None - self.incoming = 0 self._config = config self._config_index = 0 self._config_done = Event() @@ -1884,6 +1865,7 @@ def __init__(self, url, container_id, config, timeout=TIMEOUT): self._config_values = [] self._cleaning_up = False self._delete_done = Event() + self._delete_count = 0 self._event_injector = EventInjector() self._delete_event = ApplicationEvent("delete_config") super(ConnLinkRouteService, self).__init__(url, container_id) @@ -1943,6 +1925,7 @@ def on_sendable(self, event): elif self._config_values: cv = self._config_values.pop() msg = self.mgmt_proxy.delete_conn_link_route(cv['name']) + self._delete_count += 1 else: super(ConnLinkRouteService, self).on_sendable(event) @@ -1956,7 +1939,8 @@ def on_message(self, event): self._config_done.set() elif response.status_code == 204: # deleted - if not self._config_values: + self._delete_count -= 1 + if (not self._config_values) and self._delete_count == 0: self._delete_done.set() else: # error @@ -1965,7 +1949,6 @@ def on_message(self, event): self._config_done.set() self._delete_done.set() else: - self.incoming += 1 super(ConnLinkRouteService, self).on_message(event) def on_delete_config(self, event): @@ -1979,9 +1962,33 @@ def on_delete_config(self, event): cv = self._config_values.pop() msg = self.mgmt_proxy.delete_conn_link_route(cv["name"]) self.mgmt_sender.send(msg) + self._delete_count += 1 except IndexError: pass +class ConnLinkRouteMgmtProxy(object): + """ + Manage connection scoped link routes over a given connection. + While the connection remains open the connection scoped links will remain + configured and active + """ + def __init__(self, bconn, credit=250): + self._receiver = bconn.create_receiver(address=None, dynamic=True, credit=credit) + self._sender = bconn.create_sender(address="$management") + self._proxy = MgmtMsgProxy(self._receiver.link.remote_source.address) + + def __getattr__(self, key): + # wrap accesses to the management message functions so we can send and + # receive the messages using the blocking links + f = getattr(self._proxy, key) + if not callable(f): + return f + def _func(*args, **kwargs): + self._sender.send(f(*args, **kwargs)) + return self._proxy.response(self._receiver.receive()) + return _func + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/test_broker.py b/tests/test_broker.py index 6894878b86..0b9bdfad45 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -40,6 +40,9 @@ class FakeBroker(MessagingHandler): + """ + A fake broker-like service that listens for client connections + """ class _Queue(object): def __init__(self, dynamic=False): self.dynamic = dynamic @@ -56,31 +59,39 @@ def unsubscribe(self, consumer): def publish(self, message): self.queue.append(message) - self.dispatch() + return self.dispatch() def dispatch(self, consumer=None): if consumer: c = [consumer] else: c = self.consumers - while self._deliver_to(c): pass + count = 0 + while True: + rc = self._deliver_to(c) + count += rc + if rc == 0: + break; + return count def _deliver_to(self, consumers): try: - result = False + result = 0 for c in consumers: if c.credit: c.send(self.queue.popleft()) - result = True + result += 1 return result except IndexError: # no more messages - return False + return 0 def __init__(self, url, container_id=None): super(FakeBroker, self).__init__() self.url = url self.queues = {} self.acceptor = None + self.in_count = 0 + self.out_count = 0 self._connections = [] self._error = None self._container = Container(self) @@ -168,7 +179,20 @@ def remove_stale_consumers(self, connection): l = l.next(Endpoint.REMOTE_ACTIVE) def on_sendable(self, event): - self._queue(event.link.source.address).dispatch(event.link) + self.out_count += self._queue(event.link.source.address).dispatch(event.link) def on_message(self, event): - self._queue(event.link.target.address).publish(event.message) + self.in_count += 1 + self.out_count += self._queue(event.link.target.address).publish(event.message) + + +class FakeService(FakeBroker): + """ + Like a broker, but proactively connects to the message bus + Useful for testing link routes + """ + def __init__(self, url, container_id=None): + super(FakeService, self).__init__(url, container_id) + + def on_start(self, event): + event.container.connect(url=self.url)