From ee054ce69d21a33663152b90b876ad63cc2e839e Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Fri, 4 Nov 2016 13:28:30 +0100 Subject: [PATCH] DISPATCH-476: Add support for groupId property for matching link routes and auto links --- include/qpid/dispatch/amqp.h | 1 + include/qpid/dispatch/router_core.h | 1 + python/qpid_dispatch/management/qdrouter.json | 12 +++ src/amqp.c | 1 + src/router_config.c | 16 ++++ src/router_core/agent_config_auto_link.c | 22 ++++- src/router_core/agent_config_auto_link.h | 2 +- src/router_core/agent_config_link_route.c | 22 ++++- src/router_core/agent_config_link_route.h | 2 +- src/router_core/connections.c | 25 ++++-- src/router_core/route_control.c | 88 ++++++++++++++----- src/router_core/route_control.h | 10 ++- src/router_core/router_core.c | 6 ++ src/router_core/router_core_private.h | 10 ++- src/router_node.c | 26 ++++++ tests/system_tests_autolinks.py | 37 +++++--- 16 files changed, 228 insertions(+), 53 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index bda13006bb..fc7d21dbfd 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -128,6 +128,7 @@ extern const char * const QD_CONNECTION_PROPERTY_PRODUCT_KEY; extern const char * const QD_CONNECTION_PROPERTY_PRODUCT_VALUE; extern const char * const QD_CONNECTION_PROPERTY_VERSION_KEY; extern const char * const QD_CONNECTION_PROPERTY_COST_KEY; +extern const char * const QD_CONNECTION_PROPERTY_GROUP_KEY; /// @} /** @name AMQP error codes. */ diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 942148140a..80233483ae 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -166,6 +166,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, uint64_t management_id, const char *label, const char *remote_container_id, + pn_bytes_t group_id, bool strip_annotations_in, bool strip_annotations_out, int link_capacity); diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 2bd21acd77..78c945b8e9 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -902,6 +902,12 @@ "create": true, "required": false }, + "groupId": { + "type": "string", + "description": "GroupID for the target container", + "create": true, + "required": false + }, "connection": { "type": "string", "description": "The name from a connector or listener", @@ -958,6 +964,12 @@ "create": true, "required": false }, + "groupId": { + "type": "string", + "description": "GroupID for the target container", + "create": true, + "required": false + }, "connection": { "type": "string", "description": "The name from a connector or listener", diff --git a/src/amqp.c b/src/amqp.c index 95688000fe..f9cd7b6cce 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -36,6 +36,7 @@ const char * const QD_CONNECTION_PROPERTY_PRODUCT_KEY = "product"; const char * const QD_CONNECTION_PROPERTY_PRODUCT_VALUE = "qpid-dispatch-router"; const char * const QD_CONNECTION_PROPERTY_VERSION_KEY = "version"; const char * const QD_CONNECTION_PROPERTY_COST_KEY = "qd.inter-router-cost"; +const char * const QD_CONNECTION_PROPERTY_GROUP_KEY = "qd.route-container-group"; const qd_amqp_error_t QD_AMQP_OK = { 200, "OK" }; const qd_amqp_error_t QD_AMQP_CREATED = { 201, "Created" }; diff --git a/src/router_config.c b/src/router_config.c index 6fbf3375d7..4e0a37468d 100644 --- a/src/router_config.c +++ b/src/router_config.c @@ -306,6 +306,7 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti char *name = 0; char *prefix = 0; char *container = 0; + char *group = 0; char *c_name = 0; char *distrib = 0; char *dir = 0; @@ -314,6 +315,7 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti name = qd_entity_opt_string(entity, "name", 0); QD_ERROR_BREAK(); prefix = qd_entity_get_string(entity, "prefix"); QD_ERROR_BREAK(); container = qd_entity_opt_string(entity, "containerId", 0); QD_ERROR_BREAK(); + group = qd_entity_opt_string(entity, "groupId", 0); QD_ERROR_BREAK(); c_name = qd_entity_opt_string(entity, "connection", 0); QD_ERROR_BREAK(); distrib = qd_entity_opt_string(entity, "distribution", 0); QD_ERROR_BREAK(); dir = qd_entity_opt_string(entity, "dir", 0); QD_ERROR_BREAK(); @@ -339,6 +341,11 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti qd_compose_insert_string(body, container); } + if (group) { + qd_compose_insert_string(body, "groupId"); + qd_compose_insert_string(body, group); + } + if (c_name) { qd_compose_insert_string(body, "connection"); qd_compose_insert_string(body, c_name); @@ -386,6 +393,7 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti free(name); free(prefix); free(container); + free(group); free(c_name); free(distrib); free(dir); @@ -400,6 +408,7 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit char *addr = 0; char *dir = 0; char *container = 0; + char *group = 0; char *c_name = 0; char *ext_addr = 0; @@ -408,6 +417,7 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit addr = qd_entity_get_string(entity, "addr"); QD_ERROR_BREAK(); dir = qd_entity_get_string(entity, "dir"); QD_ERROR_BREAK(); container = qd_entity_opt_string(entity, "containerId", 0); QD_ERROR_BREAK(); + group = qd_entity_opt_string(entity, "groupId", 0); QD_ERROR_BREAK(); c_name = qd_entity_opt_string(entity, "connection", 0); QD_ERROR_BREAK(); ext_addr = qd_entity_opt_string(entity, "externalAddr", 0); QD_ERROR_BREAK(); long phase = qd_entity_opt_long(entity, "phase", -1); QD_ERROR_BREAK(); @@ -443,6 +453,11 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit qd_compose_insert_string(body, container); } + if (group) { + qd_compose_insert_string(body, "groupId"); + qd_compose_insert_string(body, group); + } + if (c_name) { qd_compose_insert_string(body, "connection"); qd_compose_insert_string(body, c_name); @@ -486,6 +501,7 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit free(addr); free(dir); free(container); + free(group); free(c_name); free(ext_addr); diff --git a/src/router_core/agent_config_auto_link.c b/src/router_core/agent_config_auto_link.c index 7c72f82d25..8d0f14a3b4 100644 --- a/src/router_core/agent_config_auto_link.c +++ b/src/router_core/agent_config_auto_link.c @@ -35,6 +35,7 @@ #define QDR_CONFIG_AUTO_LINK_LINK_REF 9 #define QDR_CONFIG_AUTO_LINK_OPER_STATUS 10 #define QDR_CONFIG_AUTO_LINK_LAST_ERROR 11 +#define QDR_CONFIG_AUTO_LINK_GROUP_ID 12 const char *qdr_config_auto_link_columns[] = {"name", @@ -49,6 +50,7 @@ const char *qdr_config_auto_link_columns[] = "linkRef", "operStatus", "lastError", + "groupId", 0}; const char *CONFIG_AUTOLINK_TYPE = "org.apache.qpid.dispatch.router.config.autoLink"; @@ -98,6 +100,7 @@ static void qdr_config_auto_link_insert_column_CT(qdr_auto_link_t *al, int col, case QDR_CONFIG_AUTO_LINK_CONNECTION: case QDR_CONFIG_AUTO_LINK_CONTAINER_ID: + case QDR_CONFIG_AUTO_LINK_GROUP_ID: if (al->conn_id) { key = (const char*) qd_hash_key_by_handle(al->conn_id->hash_handle); if (key && key[0] == 'L' && col == QDR_CONFIG_AUTO_LINK_CONNECTION) { @@ -108,6 +111,10 @@ static void qdr_config_auto_link_insert_column_CT(qdr_auto_link_t *al, int col, qd_compose_insert_string(body, &key[1]); break; } + if (key && key[0] == 'G' && col == QDR_CONFIG_AUTO_LINK_GROUP_ID) { + qd_compose_insert_string(body, &key[1]); + break; + } } qd_compose_insert_null(body); break; @@ -369,6 +376,7 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_PHASE]); qd_parsed_field_t *connection_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_CONNECTION]); qd_parsed_field_t *container_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_CONTAINER_ID]); + qd_parsed_field_t *group_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_GROUP_ID]); qd_parsed_field_t *external_addr = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_EXT_ADDR]); // @@ -409,10 +417,18 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, // // The request is good. Create the entity. // - bool is_container = !!container_field; - qd_parsed_field_t *in_use_conn = is_container ? container_field : connection_field; + int matcher = QDR_CONN_ID_MATCHER_CONN_LABEL; + qd_parsed_field_t *in_use_conn = connection_field; + + if (!!container_field) { + matcher = QDR_CONN_ID_MATCHER_CONTAINER_ID; + in_use_conn = container_field; + } else if (!!group_field) { + matcher = QDR_CONN_ID_MATCHER_GROUP_ID; + in_use_conn = group_field; + } - al = qdr_route_add_auto_link_CT(core, name, addr_field, dir, phase, in_use_conn, is_container, external_addr); + al = qdr_route_add_auto_link_CT(core, name, addr_field, dir, phase, in_use_conn, matcher, external_addr); // // Compose the result map for the response. diff --git a/src/router_core/agent_config_auto_link.h b/src/router_core/agent_config_auto_link.h index 9ee33c974f..657f611e92 100644 --- a/src/router_core/agent_config_auto_link.h +++ b/src/router_core/agent_config_auto_link.h @@ -32,7 +32,7 @@ void qdra_config_auto_link_get_CT(qdr_core_t *core, qd_field_iterator_t *identity, qdr_query_t *query, const char *qdr_config_auto_link_columns[]); -#define QDR_CONFIG_AUTO_LINK_COLUMN_COUNT 12 +#define QDR_CONFIG_AUTO_LINK_COLUMN_COUNT 13 const char *qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_config_link_route.c b/src/router_core/agent_config_link_route.c index d7da761903..4099997db8 100644 --- a/src/router_core/agent_config_link_route.c +++ b/src/router_core/agent_config_link_route.c @@ -32,6 +32,7 @@ #define QDR_CONFIG_LINK_ROUTE_CONTAINER_ID 6 #define QDR_CONFIG_LINK_ROUTE_DIR 7 #define QDR_CONFIG_LINK_ROUTE_OPER_STATUS 8 +#define QDR_CONFIG_LINK_ROUTE_GROUP_ID 9 const char *qdr_config_link_route_columns[] = {"name", @@ -43,6 +44,7 @@ const char *qdr_config_link_route_columns[] = "containerId", "dir", "operStatus", + "groupId", 0}; const char *CONFIG_LINKROUTE_TYPE = "org.apache.qpid.dispatch.router.config.linkRoute"; @@ -100,6 +102,7 @@ static void qdr_config_link_route_insert_column_CT(qdr_link_route_t *lr, int col case QDR_CONFIG_LINK_ROUTE_CONNECTION: case QDR_CONFIG_LINK_ROUTE_CONTAINER_ID: + case QDR_CONFIG_LINK_ROUTE_GROUP_ID: if (lr->conn_id) { key = (const char*) qd_hash_key_by_handle(lr->conn_id->hash_handle); if (key && key[0] == 'L' && col == QDR_CONFIG_LINK_ROUTE_CONNECTION) { @@ -110,6 +113,10 @@ static void qdr_config_link_route_insert_column_CT(qdr_link_route_t *lr, int col qd_compose_insert_string(body, &key[1]); break; } + if (key && key[0] == 'G' && col == QDR_CONFIG_LINK_ROUTE_GROUP_ID) { + qd_compose_insert_string(body, &key[1]); + break; + } } qd_compose_insert_null(body); break; @@ -375,6 +382,7 @@ void qdra_config_link_route_create_CT(qdr_core_t *core, qd_parsed_field_t *distrib_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DISTRIBUTION]); qd_parsed_field_t *connection_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONNECTION]); qd_parsed_field_t *container_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONTAINER_ID]); + qd_parsed_field_t *group_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_GROUP_ID]); qd_parsed_field_t *dir_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DIR]); // @@ -408,10 +416,18 @@ void qdra_config_link_route_create_CT(qdr_core_t *core, // // The request is good. Create the entity. // - bool is_container = !!container_field; - qd_parsed_field_t *in_use_conn = is_container ? container_field : connection_field; + int matcher = QDR_CONN_ID_MATCHER_CONN_LABEL; + qd_parsed_field_t *in_use_conn = connection_field; + + if (!!container_field) { + matcher = QDR_CONN_ID_MATCHER_CONTAINER_ID; + in_use_conn = container_field; + } else if (!!group_field) { + matcher = QDR_CONN_ID_MATCHER_GROUP_ID; + in_use_conn = group_field; + } - lr = qdr_route_add_link_route_CT(core, name, prefix_field, in_use_conn, is_container, trt, dir); + lr = qdr_route_add_link_route_CT(core, name, prefix_field, in_use_conn, matcher, trt, dir); // // Compose the result map for the response. diff --git a/src/router_core/agent_config_link_route.h b/src/router_core/agent_config_link_route.h index 46c9e15f35..4673c77cd3 100644 --- a/src/router_core/agent_config_link_route.h +++ b/src/router_core/agent_config_link_route.h @@ -33,7 +33,7 @@ void qdra_config_link_route_get_CT(qdr_core_t *core, qdr_query_t *query, const char *qdr_config_link_route_columns[]); -#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 9 +#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 10 const char *qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT + 1]; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 4dabeebf38..23c27f0ce1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -62,6 +62,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, uint64_t management_id, const char *label, const char *remote_container_id, + pn_bytes_t group_id, bool strip_annotations_in, bool strip_annotations_out, int link_capacity) @@ -87,6 +88,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, action->args.connection.conn = conn; action->args.connection.connection_label = qdr_field(label); action->args.connection.container_id = qdr_field(remote_container_id); + action->args.connection.group_id = qdr_field_with_length(group_id.start, group_id.size); qdr_action_enqueue(core, action); return conn; @@ -895,6 +897,7 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo // qdr_field_free(action->args.connection.connection_label); qdr_field_free(action->args.connection.container_id); + qdr_field_free(action->args.connection.group_id); return; } @@ -910,6 +913,7 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo conn->role = QDR_ROLE_NORMAL; qdr_field_free(action->args.connection.connection_label); qdr_field_free(action->args.connection.container_id); + qdr_field_free(action->args.connection.group_id); return; } @@ -932,18 +936,29 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo // // - // If there's a connection label, use it as the identifier. Otherwise, use the remote - // container id. + // The connection identifier is matched in the following order: + // 1. Remote group id property + // 2. Connection label + // 3. Remote container id // - qdr_field_t *cid = action->args.connection.connection_label ? - action->args.connection.connection_label : action->args.connection.container_id; + qdr_field_t *cid = action->args.connection.group_id; + int matcher = QDR_CONN_ID_MATCHER_GROUP_ID; + if (!cid) { + cid = action->args.connection.connection_label; + matcher = QDR_CONN_ID_MATCHER_CONN_LABEL; + } + if (!cid) { + cid = action->args.connection.container_id; + matcher = QDR_CONN_ID_MATCHER_CONTAINER_ID; + } if (cid) - qdr_route_connection_opened_CT(core, conn, cid, action->args.connection.connection_label == 0); + qdr_route_connection_opened_CT(core, conn, cid, matcher); } } qdr_field_free(action->args.connection.connection_label); qdr_field_free(action->args.connection.container_id); + qdr_field_free(action->args.connection.group_id); } diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 57120b864b..69d7622e5c 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -26,11 +26,23 @@ ALLOC_DEFINE(qdr_auto_link_t); ALLOC_DEFINE(qdr_conn_identifier_t); +const char qdr_conn_id_match_prefix[] = + {'C', + 'L', + 'G', + 0}; + +const char *qdr_conn_id_match_id[] = + {"container", + "name", + "group", + 0}; + static qdr_conn_identifier_t *qdr_route_declare_id_CT(qdr_core_t *core, qd_field_iterator_t *conn_id, - bool is_container) + int matcher) { - char prefix = is_container ? 'C' : 'L'; + char prefix = qdr_conn_id_match_prefix[matcher]; qdr_conn_identifier_t *cid = 0; qd_address_iterator_reset_view(conn_id, ITER_VIEW_ADDRESS_HASH); @@ -53,7 +65,7 @@ static void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identi // If this connection identifier has no open connection and no referencing routes, // it can safely be deleted and removed from the hash index. // - if (cid->open_connection == 0 && DEQ_IS_EMPTY(cid->link_route_refs) && DEQ_IS_EMPTY(cid->auto_link_refs)) { + if (DEQ_IS_EMPTY(cid->connection_refs) && DEQ_IS_EMPTY(cid->link_route_refs) && DEQ_IS_EMPTY(cid->auto_link_refs)) { qd_hash_remove_by_handle(core->conn_id_hash, cid->hash_handle); free_qdr_conn_identifier_t(cid); } @@ -69,10 +81,15 @@ static void qdr_route_log_CT(qdr_core_t *core, const char *text, const char *nam if (!name) snprintf(id_string, 64, "%"PRId64, id); - qd_log(core->log, QD_LOG_INFO, "%s '%s' on %s %s", - text, log_name, key[0] == 'L' ? "connection" : "container", &key[1]); -} + const char * key_name = "connection"; + if (key[0] == 'C') { + key_name = "container"; + } else if (key[0] == 'G') { + key_name = "group"; + } + qd_log(core->log, QD_LOG_INFO, "%s '%s' on %s %s", text, log_name, key_name, &key[1]); +} static void qdr_link_route_activate_CT(qdr_core_t *core, qdr_link_route_t *lr, qdr_connection_t *conn) { @@ -167,7 +184,7 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_field_iterator_t *name, qd_parsed_field_t *prefix_field, qd_parsed_field_t *conn_id, - bool is_container, + int matcher, qd_address_treatment_t treatment, qd_direction_t dir) { @@ -202,10 +219,13 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, // Find or create a connection identifier structure for this link route // if (conn_id) { - lr->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(conn_id), is_container); + lr->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(conn_id), matcher); DEQ_INSERT_TAIL_N(REF, lr->conn_id->link_route_refs, lr); - if (lr->conn_id->open_connection) - qdr_link_route_activate_CT(core, lr, lr->conn_id->open_connection); + qdr_connection_ref_t * cref = DEQ_HEAD(lr->conn_id->connection_refs); + while (cref) { + qdr_link_route_activate_CT(core, lr, cref->conn); + cref = DEQ_NEXT(cref); + } } // @@ -225,8 +245,11 @@ void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr) // qdr_conn_identifier_t *cid = lr->conn_id; if (cid) { - if (!!cid->open_connection) - qdr_link_route_deactivate_CT(core, lr, cid->open_connection); + qdr_connection_ref_t * cref = DEQ_HEAD(cid->connection_refs); + while (cref) { + qdr_link_route_deactivate_CT(core, lr, cref->conn); + cref = DEQ_NEXT(cref); + } DEQ_REMOVE_N(REF, cid->link_route_refs, lr); qdr_route_check_id_for_deletion_CT(core, cid); } @@ -254,7 +277,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_direction_t dir, int phase, qd_parsed_field_t *conn_id, - bool is_container, + int matcher, qd_parsed_field_t *external_addr) { qdr_auto_link_t *al = new_qdr_auto_link_t(); @@ -289,11 +312,15 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, // // Find or create a connection identifier structure for this auto_link // + qd_log(core->log, QD_LOG_INFO, "Attempting to add link to connection id"); if (conn_id) { - al->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(conn_id), is_container); + al->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(conn_id), matcher); DEQ_INSERT_TAIL_N(REF, al->conn_id->auto_link_refs, al); - if (al->conn_id->open_connection) - qdr_auto_link_activate_CT(core, al, al->conn_id->open_connection); + qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs); + while (cref) { + qdr_auto_link_activate_CT(core, al, cref->conn); + cref = DEQ_NEXT(cref); + } } // @@ -313,8 +340,11 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) // qdr_conn_identifier_t *cid = al->conn_id; if (cid) { - if (!!cid->open_connection) - qdr_auto_link_deactivate_CT(core, al, cid->open_connection); + qdr_connection_ref_t * cref = DEQ_HEAD(cid->connection_refs); + while (cref) { + qdr_auto_link_deactivate_CT(core, al, cref->conn); + cref = DEQ_NEXT(cref); + } DEQ_REMOVE_N(REF, cid->auto_link_refs, al); qdr_route_check_id_for_deletion_CT(core, cid); } @@ -340,15 +370,14 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_field_t *field, - bool is_container) + int matcher) { if (conn->role != QDR_ROLE_ROUTE_CONTAINER || !field) return; - qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, field->iterator, is_container); + qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, field->iterator, matcher); - assert(!cid->open_connection); - cid->open_connection = conn; + qdr_add_connection_ref(&cid->connection_refs, conn); conn->conn_id = cid; // @@ -360,11 +389,13 @@ void qdr_route_connection_opened_CT(qdr_core_t *core, lr = DEQ_NEXT_N(REF, lr); } + qd_log(core->log, QD_LOG_INFO, "Connection opened, matching on field: %s", qdr_conn_id_match_id[matcher]); // // Activate all auto-links associated with this remote container. // qdr_auto_link_t *al = DEQ_HEAD(cid->auto_link_refs); while (al) { + qd_log(core->log, QD_LOG_INFO, "Attempting to open auto link"); qdr_auto_link_activate_CT(core, al, conn); al = DEQ_NEXT_N(REF, al); } @@ -396,7 +427,18 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) al = DEQ_NEXT_N(REF, al); } - cid->open_connection = 0; + // + // Remove our own entry in the connection list + // + qdr_connection_ref_t * cref = DEQ_HEAD(cid->connection_refs); + while (cref) { + if (cref->conn == conn) { + DEQ_REMOVE(cid->connection_refs, cref); + break; + } + cref = DEQ_NEXT(cref); + } + conn->conn_id = 0; qdr_route_check_id_for_deletion_CT(core, cid); diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h index 8111566717..750932a060 100644 --- a/src/router_core/route_control.h +++ b/src/router_core/route_control.h @@ -21,11 +21,15 @@ #include "router_core_private.h" +#define QDR_CONN_ID_MATCHER_CONN_LABEL 0 +#define QDR_CONN_ID_MATCHER_CONTAINER_ID 1 +#define QDR_CONN_ID_MATCHER_GROUP_ID 2 + qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_field_iterator_t *name, qd_parsed_field_t *prefix_field, qd_parsed_field_t *conn_id, - bool is_container, + int matcher, qd_address_treatment_t treatment, qd_direction_t dir); @@ -37,7 +41,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_direction_t dir, int phase, qd_parsed_field_t *conn_id, - bool is_container, + int matcher, qd_parsed_field_t *external_addr); void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link); @@ -45,7 +49,7 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link); void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_field_t *field, - bool is_container); + int matcher); void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 07b7035400..ec1c498547 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -113,9 +113,15 @@ void qdr_core_free(qdr_core_t *core) ALLOC_DECLARE(qdr_field_t); ALLOC_DEFINE(qdr_field_t); + qdr_field_t *qdr_field(const char *text) { size_t length = text ? strlen(text) : 0; + return qdr_field_with_length(text, length); +} + +qdr_field_t *qdr_field_with_length(const char *text, size_t length) +{ size_t ilength = length; if (length == 0) diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index f8e702fefe..aef0622187 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -61,6 +61,7 @@ typedef struct { } qdr_field_t; qdr_field_t *qdr_field(const char *string); +qdr_field_t *qdr_field_with_length(const char *string, size_t length); qdr_field_t *qdr_field_from_iter(qd_field_iterator_t *iter); qd_field_iterator_t *qdr_field_iterator(qdr_field_t *field); void qdr_field_free(qdr_field_t *field); @@ -96,6 +97,7 @@ struct qdr_action_t { qdr_connection_t *conn; qdr_field_t *connection_label; qdr_field_t *container_id; + qdr_field_t *group_id; qdr_link_t *link; qdr_delivery_t *delivery; qd_message_t *msg; @@ -505,10 +507,10 @@ DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t); struct qdr_conn_identifier_t { - qd_hash_handle_t *hash_handle; - qdr_connection_t *open_connection; - qdr_link_route_list_t link_route_refs; - qdr_auto_link_list_t auto_link_refs; + qd_hash_handle_t *hash_handle; + qdr_connection_ref_list_t connection_refs; + qdr_link_route_list_t link_route_refs; + qdr_auto_link_list_t auto_link_refs; }; ALLOC_DECLARE(qdr_conn_identifier_t); diff --git a/src/router_node.c b/src/router_node.c index 09449891c9..c2e0bf4556 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -557,6 +557,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool bool strip_annotations_out = false; int link_capacity = 1; const char *name = 0; + pn_bytes_t group_id = {0, 0}; uint64_t connection_id = qd_connection_connection_id(conn); pn_connection_t *pn_conn = qd_connection_pn(conn); @@ -593,10 +594,35 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool // if (remote_cost > cost) cost = remote_cost; + } else if (role == QDR_ROLE_ROUTE_CONTAINER) { + // Check remote properties for a group id + // + pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0; + if (props) { + pn_data_rewind(props); + pn_data_next(props); + if (props && pn_data_type(props) == PN_MAP) { + pn_data_enter(props); + while (pn_data_next(props)) { + if (pn_data_type(props) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(props); + if (sym.size == strlen(QD_CONNECTION_PROPERTY_GROUP_KEY) && + strcmp(sym.start, QD_CONNECTION_PROPERTY_GROUP_KEY) == 0) { + pn_data_next(props); + if (pn_data_type(props) == PN_STRING) { + group_id = pn_data_get_string(props); + } + break; + } + } + } + } + } } qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name, pn_connection_remote_container(pn_conn), + group_id, strip_annotations_in, strip_annotations_out, link_capacity); qd_connection_set_context(conn, qdrc); diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py index 928cf5c10c..298bb4b021 100644 --- a/tests/system_tests_autolinks.py +++ b/tests/system_tests_autolinks.py @@ -18,7 +18,7 @@ # import unittest -from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED +from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED, symbol from system_test import TestCase, Qdrouterd, main_module from proton.handlers import MessagingHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce @@ -67,6 +67,12 @@ def setUpClass(cls): # ('autoLink', {'addr': 'node.2', 'externalAddr': 'ext.2', 'containerId': 'container.4', 'dir': 'in'}), ('autoLink', {'addr': 'node.2', 'externalAddr': 'ext.2', 'containerId': 'container.4', 'dir': 'out'}), + + # + # Create a pair of auto-links with a group id + # + ('autoLink', {'addr': 'node.3', 'groupId': 'group.1', 'dir': 'in'}), + ('autoLink', {'addr': 'node.3', 'groupId': 'group.1', 'dir': 'out'}), ]) cls.router = cls.tester.qdrouterd(name, config) @@ -165,6 +171,16 @@ def test_09_autolink_receiver_with_ext_addr(self): test.run() self.assertEqual(None, test.error) + def test_10_autolink_group_id(self): + """ + Create an autolink with a group id. Ensure that multiple connections will get attached + links. + """ + properties = {symbol(u'qd.route-container-group'): u'group.1'} + test = AutolinkAttachTest('container.5', self.route_address, 'node.3', properties) + test.run() + self.assertEqual(None, test.error) + class Timeout(object): def __init__(self, parent): @@ -175,14 +191,15 @@ def on_timer_task(self, event): class AutolinkAttachTest(MessagingHandler): - def __init__(self, cid, address, node_addr): + def __init__(self, cid, address, node_addr, connect_properties={}): super(AutolinkAttachTest, self).__init__(prefetch=0) - self.cid = cid - self.address = address - self.node_addr = node_addr - self.error = None - self.sender = None - self.receiver = None + self.cid = cid + self.connect_properties = connect_properties + self.address = address + self.node_addr = node_addr + self.error = None + self.sender = None + self.receiver = None self.n_rx_attach = 0 self.n_tx_attach = 0 @@ -193,11 +210,11 @@ def timeout(self): def on_start(self, event): self.timer = event.reactor.schedule(5, Timeout(self)) - self.conn = event.container.connect(self.address) + self.conn = event.container.connect(self.address, properties=self.connect_properties) def on_connection_closed(self, event): if self.n_tx_attach == 1: - self.conn = event.container.connect(self.address) + self.conn = event.container.connect(self.address, properties=self.connect_properties) def on_link_opened(self, event): if event.sender: