From 5f4e4a6f2a0dc326bf51823036ab8e6bad9df063 Mon Sep 17 00:00:00 2001 From: Chuck Rolke Date: Mon, 3 Oct 2016 13:48:52 -0400 Subject: [PATCH 1/5] DISPATCH-451: Allow configurable maxSessions and maxSessionWindow --- include/qpid/dispatch/server.h | 16 ++++++++++++ python/qpid_dispatch/management/qdrouter.json | 26 +++++++++++++++++++ src/connection_manager.c | 10 ++++++- src/container.c | 4 ++- src/policy.c | 8 ++++-- src/server.c | 2 ++ 6 files changed, 62 insertions(+), 4 deletions(-) diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 9862895c7e..6e6f5032be 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -417,6 +417,22 @@ typedef struct qd_server_config_t { */ uint32_t max_frame_size; + /** + * The max_sessions value is the number of sessions allowed on the Connection. + * This value minus one is the Open performative channel-max setting. + */ + uint32_t max_sessions; + + /** + * The max_session_window value is the maximum number of outstanding octets that are + * allowed to be in flight on a session. This value is used to calculate the number of + * outstanding transfers that are allowed by the formula: + * incoming_window = max_session_window / max_frame_size + * If max_session_window=1000000 and max_frame_size=32768 then 30 transfers may + * be outstanding before session flow control begins. + */ + uint32_t max_session_window; + /** * The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the * connection will be automatically closed. diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index a3200689c4..5b2c347528 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -612,6 +612,19 @@ "description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", "create": true }, + "maxSessions": { + "type": "integer", + "default": 32768, + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "create": true + }, + "maxSessionWindow": { + "type": "integer", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "default": 1000000, + "required": false, + "create": true + }, "idleTimeoutSeconds": { "type": "integer", "default": 16, @@ -726,6 +739,19 @@ "description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", "create": true }, + "maxSessions": { + "type": "integer", + "default": 32768, + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "create": true + }, + "maxSessionWindow": { + "type": "integer", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "default": 1000000, + "required": false, + "create": true + }, "idleTimeoutSeconds": { "type": "integer", "default": 16, diff --git a/src/connection_manager.c b/src/connection_manager.c index a779fd6fb5..96b85f1977 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -183,6 +183,8 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK(); config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK(); config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); + config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); + config->max_session_window = qd_entity_get_long(entity, "maxSessionWindow"); CHECK(); config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK(); config->sasl_username = qd_entity_opt_string(entity, "saslUsername", 0); CHECK(); config->sasl_password = qd_entity_opt_string(entity, "saslPassword", 0); CHECK(); @@ -192,11 +194,17 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf set_config_host(config, entity); // - // Handle the defaults for link capacity. + // Handle the defaults for various settings // if (config->link_capacity == 0) config->link_capacity = 250; + if (config->max_sessions == 0) + config->max_sessions = 32768; + + if (config->max_session_window == 0) + config->max_session_window = 1000000; + // // For now we are hardwiring this attribute to true. If there's an outcry from the // user community, we can revisit this later. diff --git a/src/container.c b/src/container.c index 5080258dab..f9cc64bbe1 100644 --- a/src/container.c +++ b/src/container.c @@ -763,9 +763,11 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node) qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name) { qd_link_t *link = new_qd_link_t(); + const qd_server_config_t * cf = qd_connection_config(conn); + assert(cf); link->pn_sess = pn_session(qd_connection_pn(conn)); - pn_session_set_incoming_capacity(link->pn_sess, 1000000); + pn_session_set_incoming_capacity(link->pn_sess, cf->max_session_window); if (dir == QD_OUTGOING) link->pn_link = pn_sender(link->pn_sess, name); diff --git a/src/policy.c b/src/policy.c index a347c12d35..a90a796438 100644 --- a/src/policy.c +++ b/src/policy.c @@ -433,11 +433,15 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) // void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) { + size_t capacity; if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) { - pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow); + capacity = qd_conn->policy_settings->maxSessionWindow; } else { - pn_session_set_incoming_capacity(ssn, 1000000); + const qd_server_config_t * cf = qd_connection_config(qd_conn); + assert(cf); + capacity = cf->max_session_window; } + pn_session_set_incoming_capacity(ssn, capacity); } // diff --git a/src/server.c b/src/server.c index 47cbac2768..a55b0c2681 100644 --- a/src/server.c +++ b/src/server.c @@ -680,6 +680,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) // pn_transport_set_server(tport); pn_transport_set_max_frame(tport, config->max_frame_size); + pn_transport_set_channel_max(tport, config->max_sessions - 1); pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // @@ -1247,6 +1248,7 @@ static void cxtr_try_open(void *context) // Configure the transport // pn_transport_set_max_frame(tport, config->max_frame_size); + pn_transport_set_channel_max(tport, config->max_sessions - 1); pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // From c248a34f3d2da46bb7e7e67ccbd04e2340b69f15 Mon Sep 17 00:00:00 2001 From: Chuck Rolke Date: Mon, 3 Oct 2016 14:37:40 -0400 Subject: [PATCH 2/5] DISPATCH-451: Describe policy setting priority over listener/connector setting --- python/qpid_dispatch/management/qdrouter.json | 17 ++++++++--------- .../qdrouter.policyRuleset.settings.txt | 12 ++++++------ 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 5b2c347528..81131406a0 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -609,18 +609,18 @@ "maxFrameSize": { "type": "integer", "default": 16384, - "description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", + "description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value.", "create": true }, "maxSessions": { "type": "integer", "default": 32768, - "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value.", "create": true }, "maxSessionWindow": { "type": "integer", - "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value.", "default": 1000000, "required": false, "create": true @@ -713,14 +713,13 @@ "required": false, "create": true, "description": "For the 'inter-router' role only. This value assigns a cost metric to the inter-router connection. The default (and minimum) value is one. Higher values represent higher costs. The cost is used to influence the routing algorithm as it attempts to use the path with the lowest total cost from ingress to egress." - }, - + }, "sslProfile": { "type": "string", "required": false, "description": "Name of the sslProfile.", "create": true - }, + }, "saslMechanisms": { "type": "string", "required": false, @@ -736,18 +735,18 @@ "maxFrameSize": { "type": "integer", "default": 65536, - "description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", + "description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value.", "create": true }, "maxSessions": { "type": "integer", "default": 32768, - "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value.", "create": true }, "maxSessionWindow": { "type": "integer", - "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value.", "default": 1000000, "required": false, "create": true diff --git a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt index 798e146627..db85f477a0 100644 --- a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt +++ b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt @@ -41,29 +41,29 @@ Until the schema is extended specify embedded maps this document describes the v }, "maxFrameSize": { "type": "integer", - "description": "Largest frame that may be sent on this connection. Zero implies system default. (AMQP Open, max-frame-size)", + "description": "Largest frame that may be sent on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Open, max-frame-size)", "default": 65536, "required": false, "create": true }, "maxMessageSize": { "type": "integer", - "description": "Largest message size supported by links created on this connection. Zero implies system default. (AMQP Attach, max-message-size)", + "description": "Largest message size supported by links created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Attach, max-message-size)", "default": 0, "required": false, "create": true }, "maxSessionWindow": { "type": "integer", - "description": "Largest incoming and outgoing window for sessions created on this connection. Zero implies system default. (AMQP Begin, incoming-window, outgoing-window)", - "default": 2147483647, + "description": "Largest incoming window in octets for sessions created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Begin, incoming-window)", + "default": 1000000, "required": false, "create": true }, "maxSessions": { "type": "integer", - "description": "Maximum number of sessions that may be created on this connection. Zero implies system default. (AMQP Open, channel-max)", - "default": 2147483647, + "description": "Maximum number of sessions that may be created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Open, channel-max)", + "default": 32768, "required": false, "create": true }, From f1dbfbf6dd73b4068440f235be38906944c35b59 Mon Sep 17 00:00:00 2001 From: Chuck Rolke Date: Fri, 7 Oct 2016 17:23:31 -0400 Subject: [PATCH 3/5] DISPATCH-451: Incorporate review comments; add self tests --- include/qpid/dispatch/amqp.h | 7 + include/qpid/dispatch/server.h | 13 +- python/qpid_dispatch/management/qdrouter.json | 22 +- .../qdrouter.policyRuleset.settings.txt | 4 +- src/connection_manager.c | 32 +- src/container.c | 3 +- src/policy.c | 3 +- tests/CMakeLists.txt | 1 + tests/system_tests_protocol_settings.py | 316 ++++++++++++++++++ 9 files changed, 372 insertions(+), 29 deletions(-) create mode 100644 tests/system_tests_protocol_settings.py diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 3f9c7780b8..bda13006bb 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -29,6 +29,13 @@ /// @{ +/** + * AMQP Constants + */ +typedef enum { + QD_AMQP_MIN_MAX_FRAME_SIZE = 512 +} qd_amqp_constants_t; + /** * AMQP Performative Tags */ diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 6e6f5032be..de20385636 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -419,19 +419,16 @@ typedef struct qd_server_config_t { /** * The max_sessions value is the number of sessions allowed on the Connection. - * This value minus one is the Open performative channel-max setting. */ uint32_t max_sessions; /** - * The max_session_window value is the maximum number of outstanding octets that are - * allowed to be in flight on a session. This value is used to calculate the number of - * outstanding transfers that are allowed by the formula: - * incoming_window = max_session_window / max_frame_size - * If max_session_window=1000000 and max_frame_size=32768 then 30 transfers may - * be outstanding before session flow control begins. + * The incoming capacity value is calculated to be (sessionMaxFrames * maxFrameSize). + * In a round about way the calculation forces the AMQP Begin/incoming-capacity value + * to equal the specified sessionMaxFrames value measured in units of transfer frames. + * This calculation is done to satisfy proton pn_session_set_incoming_capacity(). */ - uint32_t max_session_window; + uint32_t incoming_capacity; /** * The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 81131406a0..c57a3c1690 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -609,19 +609,19 @@ "maxFrameSize": { "type": "integer", "default": 16384, - "description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value.", + "description": "The maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value. Defaults to 16384.", "create": true }, "maxSessions": { "type": "integer", "default": 32768, - "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value.", + "description": "The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value. Defaults to 32768.", "create": true }, - "maxSessionWindow": { + "maxSessionFrames": { "type": "integer", - "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value.", - "default": 1000000, + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. Defaults to 100.", + "default": 100, "required": false, "create": true }, @@ -734,20 +734,20 @@ }, "maxFrameSize": { "type": "integer", - "default": 65536, - "description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value.", + "default": 16384, + "description": "The maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings will not overwrite this value. Defaults to 16384.", "create": true }, "maxSessions": { "type": "integer", "default": 32768, - "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value.", + "description": "The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings will not overwrite this value. Defaults to 32768.", "create": true }, - "maxSessionWindow": { + "maxSessionFrames": { "type": "integer", - "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value.", - "default": 1000000, + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. Defaults to 100.", + "default": 100, "required": false, "create": true }, diff --git a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt index db85f477a0..fe2a6f2317 100644 --- a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt +++ b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt @@ -42,7 +42,7 @@ Until the schema is extended specify embedded maps this document describes the v "maxFrameSize": { "type": "integer", "description": "Largest frame that may be sent on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Open, max-frame-size)", - "default": 65536, + "default": 16384, "required": false, "create": true }, @@ -56,7 +56,7 @@ Until the schema is extended specify embedded maps this document describes the v "maxSessionWindow": { "type": "integer", "description": "Largest incoming window in octets for sessions created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Begin, incoming-window)", - "default": 1000000, + "default": 1638400, "required": false, "create": true }, diff --git a/src/connection_manager.c b/src/connection_manager.c index 96b85f1977..2c9d5498f6 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -184,7 +184,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK(); config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); - config->max_session_window = qd_entity_get_long(entity, "maxSessionWindow"); CHECK(); + uint64_t ssn_frames = qd_entity_get_long(entity, "maxSessionFrames"); CHECK(); config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK(); config->sasl_username = qd_entity_opt_string(entity, "saslUsername", 0); CHECK(); config->sasl_password = qd_entity_opt_string(entity, "saslPassword", 0); CHECK(); @@ -199,11 +199,35 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf if (config->link_capacity == 0) config->link_capacity = 250; - if (config->max_sessions == 0) + if (config->max_sessions == 0 || config->max_sessions > 32768) + // Proton disallows > 32768 config->max_sessions = 32768; - if (config->max_session_window == 0) - config->max_session_window = 1000000; + if (config->max_frame_size < QD_AMQP_MIN_MAX_FRAME_SIZE) + // Silently promote the minimum max-frame-size + // Proton will do this but the number is needed for the + // incoming capacity calculation. + config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; + + // + // Given session frame count and max frame size compute session incoming_capacity + // Limit total capacity to 2^31-1. + // + uint64_t mfs = (uint64_t)config->max_frame_size; + uint64_t trial_ic = ssn_frames * mfs; + uint64_t limit = (1ll << 31) - 1; + if (trial_ic < limit) { + // Silently promote incoming capacity of zero to one + config->incoming_capacity = + (trial_ic < QD_AMQP_MIN_MAX_FRAME_SIZE ? QD_AMQP_MIN_MAX_FRAME_SIZE : trial_ic); + } else { + config->incoming_capacity = limit; + uint64_t computed_ssn_frames = limit / mfs; + qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, + "Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', " + "requested maxSessionFrames truncated from %llu to %llu", + config->name, config->host, config->port, ssn_frames, computed_ssn_frames); + } // // For now we are hardwiring this attribute to true. If there's an outcry from the diff --git a/src/container.c b/src/container.c index f9cc64bbe1..e3f181a8b9 100644 --- a/src/container.c +++ b/src/container.c @@ -764,10 +764,9 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c { qd_link_t *link = new_qd_link_t(); const qd_server_config_t * cf = qd_connection_config(conn); - assert(cf); link->pn_sess = pn_session(qd_connection_pn(conn)); - pn_session_set_incoming_capacity(link->pn_sess, cf->max_session_window); + pn_session_set_incoming_capacity(link->pn_sess, cf->incoming_capacity); if (dir == QD_OUTGOING) link->pn_link = pn_sender(link->pn_sess, name); diff --git a/src/policy.c b/src/policy.c index a90a796438..ad6bb1f928 100644 --- a/src/policy.c +++ b/src/policy.c @@ -438,8 +438,7 @@ void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_con capacity = qd_conn->policy_settings->maxSessionWindow; } else { const qd_server_config_t * cf = qd_connection_config(qd_conn); - assert(cf); - capacity = cf->max_session_window; + capacity = cf->incoming_capacity; } pn_session_set_incoming_capacity(ssn, capacity); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb04e82e26..6bc0e748fa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -78,6 +78,7 @@ foreach(py_test_module system_tests_one_router system_tests_policy system_tests_protocol_family + system_tests_protocol_settings system_tests_qdmanage system_tests_qdstat system_tests_sasl_plain diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py new file mode 100644 index 0000000000..79f2f035df --- /dev/null +++ b/tests/system_tests_protocol_settings.py @@ -0,0 +1,316 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED +from system_test import TestCase, Qdrouterd, main_module +from proton.handlers import MessagingHandler +from proton.reactor import Container, AtMostOnce, AtLeastOnce +from proton.utils import BlockingConnection, SyncRequestResponse +from qpid_dispatch.management.client import Node +from proton import ConnectionException + +class MaxFrameMaxSessionFramesTest(TestCase): + """System tests setting proton negotiated size max-frame-size and incoming-window""" + @classmethod + def setUpClass(cls): + '''Start a router''' + super(MaxFrameMaxSessionFramesTest, cls).setUpClass() + name = "MaxFrameMaxSessionFrames" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'maxSessionFrames': '10'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_frame_max_session_frames__max_sessions_default(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxFrameMaxSessionFrames.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # max-frame is from the config + self.assertTrue(' max-frame-size=2048,' in open_lines[0]) + # channel-max is default + self.assertTrue(" channel-max=32767" in open_lines[0]) + begin_lines = [s for s in log_lines if "-> @begin" in s] + # incoming-window is from the config + self.assertTrue(" incoming-window=10," in begin_lines[0] ) + + +class MaxSessionsTest(TestCase): + """System tests setting proton channel-max""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxSessionsTest, cls).setUpClass() + name = "MaxSessions" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '10'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_sessions(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxSessions.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # channel-max is 10 + self.assertTrue(" channel-max=9" in open_lines[0]) + + +class MaxSessionsZeroTest(TestCase): + """System tests setting proton channel-max""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxSessionsZeroTest, cls).setUpClass() + name = "MaxSessionsZero" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '0'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_sessions_zero(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxSessionsZero.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # channel-max is 0. Should get proton default 32767 + self.assertTrue(" channel-max=32767" in open_lines[0]) + + +class MaxSessionsLargeTest(TestCase): + """System tests setting proton channel-max""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxSessionsLargeTest, cls).setUpClass() + name = "MaxSessionsLarge" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '500000'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_sessions_large(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxSessionsLarge.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # channel-max is 0. Should get proton default 32767 + self.assertTrue(" channel-max=32767" in open_lines[0]) + + +class MaxFrameSmallTest(TestCase): + """System tests setting proton max-frame-size""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxFrameSmallTest, cls).setUpClass() + name = "MaxFrameSmall" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '2'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_frame_small(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxFrameSmall.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # if frame size <= 512 proton set min of 512 + self.assertTrue(" max-frame-size=512" in open_lines[0]) + + +class MaxFrameDefaultTest(TestCase): + """System tests setting proton max-frame-size""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxFrameDefaultTest, cls).setUpClass() + name = "MaxFrameDefault" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port()}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_frame_default(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxFrameDefault.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # if frame size not set then a default is used + self.assertTrue(" max-frame-size=16384" in open_lines[0]) + + +class MaxSessionFramesDefaultTest(TestCase): + """System tests setting proton max-frame-size""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(MaxSessionFramesDefaultTest, cls).setUpClass() + name = "MaxSessionFramesDefault" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port()}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_session_frames_default(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxSessionFramesDefault.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # if frame size not set then a default is used + self.assertTrue(" max-frame-size=16384" in open_lines[0]) + begin_lines = [s for s in log_lines if "-> @begin" in s] + # incoming-window is from the config + self.assertTrue(" incoming-window=100," in begin_lines[0]) + + +class MaxFrameMaxSessionFramesTooBigTest(TestCase): + """ + System tests setting proton negotiated size max-frame-size and incoming-window + when the product of the two is > 2^31-1. There must be a warning and the incoming + window will be reduced to 2^31-1 / max-frame-size + """ + @classmethod + def setUpClass(cls): + '''Start a router''' + super(MaxFrameMaxSessionFramesTooBigTest, cls).setUpClass() + name = "MaxFrameMaxSessionFramesTooBig" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '1000000', 'maxSessionFrames': '5000000'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_frame_max_session_too_big(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxFrameMaxSessionFramesTooBig.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # max-frame is from the config + self.assertTrue(' max-frame-size=1000000,' in open_lines[0]) + begin_lines = [s for s in log_lines if "-> @begin" in s] + # incoming-window is truncated + self.assertTrue(" incoming-window=2147," in begin_lines[0]) + warning_lines = [s for s in log_lines if "(warning)" in s] + self.assertTrue(len(warning_lines) == 1) + self.assertTrue("requested maxSessionFrames truncated from 5000000 to 2147" in warning_lines[0]) + + +class MaxFrameMaxSessionFramesZeroTest(TestCase): + """ + System tests setting proton negotiated size max-frame-size and incoming-window + when they are both zero. Frame size is bumped up to the minimum and capacity is + bumped up to have an incoming window of 1 + """ + @classmethod + def setUpClass(cls): + '''Start a router''' + super(MaxFrameMaxSessionFramesZeroTest, cls).setUpClass() + name = "MaxFrameMaxSessionFramesZero" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '0', 'maxSessionFrames': '0'}), + ]) + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_max_frame_max_session_zero(self): + # Set up a connection to get the Open and a receiver to get a Begin frame in the log + bc = BlockingConnection(self.router.addresses[0]) + bc.create_receiver("xxx") + bc.close() + + with open('../setUpClass/MaxFrameMaxSessionFramesZero.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "-> @open" in s] + # max-frame gets set to protocol min + self.assertTrue(' max-frame-size=512,' in open_lines[0]) + begin_lines = [s for s in log_lines if "-> @begin" in s] + # incoming-window is promoted to 1 + self.assertTrue(" incoming-window=1," in begin_lines[0]) + + +if __name__ == '__main__': + unittest.main(main_module()) From d5e02b8d7084343fe47873cba4e8bfba66dd1b37 Mon Sep 17 00:00:00 2001 From: Chuck Rolke Date: Mon, 10 Oct 2016 11:13:44 -0400 Subject: [PATCH 4/5] DISPATCH-451: Test connector config settings; Fix setting descriptions The logic behind the config settings for listeners and connectors is the same. Add tests to verify that connector settings are propagated over the wire. Clarify how maxSessionFrames may be adjusted when the product of maxFrameSize and maxSessionFrames exceeds 2^31-1. --- python/qpid_dispatch/management/qdrouter.json | 4 +- tests/system_tests_protocol_settings.py | 109 ++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index c57a3c1690..a748dbfe24 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -620,7 +620,7 @@ }, "maxSessionFrames": { "type": "integer", - "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. Defaults to 100.", + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 100.", "default": 100, "required": false, "create": true @@ -746,7 +746,7 @@ }, "maxSessionFrames": { "type": "integer", - "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. Defaults to 100.", + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 100.", "default": 100, "required": false, "create": true diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py index 79f2f035df..233b9f3a9d 100644 --- a/tests/system_tests_protocol_settings.py +++ b/tests/system_tests_protocol_settings.py @@ -312,5 +312,114 @@ def test_max_frame_max_session_zero(self): self.assertTrue(" incoming-window=1," in begin_lines[0]) +class ConnectorSettingsDefaultTest(TestCase): + """ + The internal logic for protocol settings in listener and connector + is common code. This test makes sure that defaults in the connector + config make it to the wire. + """ + inter_router_port = None + + @staticmethod + def ssl_config(client_server, connection): + return [] # Over-ridden by RouterTestSsl + + @classmethod + def setUpClass(cls): + """Start two routers""" + super(ConnectorSettingsDefaultTest, cls).setUpClass() + + def router(name, client_server, connection): + config = cls.ssl_config(client_server, connection) + [ + ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}), + + ('listener', {'port': cls.tester.get_port()}), + connection + ] + + config = Qdrouterd.Config(config) + + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + + router('A', 'server', + ('listener', {'role': 'inter-router', 'port': inter_router_port})) + router('B', 'client', + ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, + 'verifyHostName': 'no'})) + + cls.routers[0].wait_router_connected('QDR.B') + cls.routers[1].wait_router_connected('QDR.A') + + def test_connector_default(self): + with open('../setUpClass/A.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "<- @open" in s] + # defaults + self.assertTrue(' max-frame-size=16384,' in open_lines[0]) + self.assertTrue(' channel-max=32767,' in open_lines[0]) + begin_lines = [s for s in log_lines if "<- @begin" in s] + # defaults + self.assertTrue(" incoming-window=100," in begin_lines[0]) + + +class ConnectorSettingsNondefaultTest(TestCase): + """ + The internal logic for protocol settings in listener and connector + is common code. This test makes sure that settings in the connector + config make it to the wire. The listener tests test the setting logic. + """ + inter_router_port = None + + @staticmethod + def ssl_config(client_server, connection): + return [] # Over-ridden by RouterTestSsl + + @classmethod + def setUpClass(cls): + """Start two routers""" + super(ConnectorSettingsNondefaultTest, cls).setUpClass() + + def router(name, client_server, connection): + config = cls.ssl_config(client_server, connection) + [ + ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}), + + ('listener', {'port': cls.tester.get_port()}), + connection + ] + + config = Qdrouterd.Config(config) + + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + + router('A', 'server', + ('listener', {'role': 'inter-router', 'port': inter_router_port})) + router('B', 'client', + ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, + 'maxFrameSize': '2048', 'maxSessionFrames': '10', 'maxSessions': '20', + 'verifyHostName': 'no'})) + + cls.routers[0].wait_router_connected('QDR.B') + cls.routers[1].wait_router_connected('QDR.A') + + def test_connector_default(self): + with open('../setUpClass/A.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + open_lines = [s for s in log_lines if "<- @open" in s] + # nondefaults + self.assertTrue(' max-frame-size=2048,' in open_lines[0]) + self.assertTrue(' channel-max=19,' in open_lines[0]) + begin_lines = [s for s in log_lines if "<- @begin" in s] + # nondefaults + self.assertTrue(" incoming-window=10," in begin_lines[0]) + + if __name__ == '__main__': unittest.main(main_module()) From 2bab3f33e2fb7893fe40f3fc850cc0144a45708e Mon Sep 17 00:00:00 2001 From: Chuck Rolke Date: Mon, 10 Oct 2016 16:08:08 -0400 Subject: [PATCH 5/5] DISPATCH-451: Clarify policy vhost setting descriptions --- .../management/qdrouter.policyRuleset.settings.txt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt index fe2a6f2317..da389ff794 100644 --- a/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt +++ b/python/qpid_dispatch/management/qdrouter.policyRuleset.settings.txt @@ -41,42 +41,42 @@ Until the schema is extended specify embedded maps this document describes the v }, "maxFrameSize": { "type": "integer", - "description": "Largest frame that may be sent on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Open, max-frame-size)", + "description": "Largest frame that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Open, max-frame-size)", "default": 16384, "required": false, "create": true }, "maxMessageSize": { "type": "integer", - "description": "Largest message size supported by links created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Attach, max-message-size)", + "description": "[NOT IMPLEMENTED] Largest message size supported by links created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Attach, max-message-size)", "default": 0, "required": false, "create": true }, "maxSessionWindow": { "type": "integer", - "description": "Largest incoming window in octets for sessions created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Begin, incoming-window)", + "description": "Largest incoming window in octets for sessions created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Begin, incoming-window)", "default": 1638400, "required": false, "create": true }, "maxSessions": { "type": "integer", - "description": "Maximum number of sessions that may be created on this connection. Zero implies system default. Policy setting overwrites values specified for a listener or connector. (AMQP Open, channel-max)", + "description": "Maximum number of sessions that may be created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Open, channel-max)", "default": 32768, "required": false, "create": true }, "maxSenders": { "type": "integer", - "description": "Maximum number of sending links that may be created on this connection. Zero implies system default.", + "description": "Maximum number of sending links that may be created on this connection. Zero disables all sender links.", "default": 2147483647, "required": false, "create": true }, "maxReceivers": { "type": "integer", - "description": "Maximum number of receiving links that may be created on this connection. Zero implies system default.", + "description": "Maximum number of receiving links that may be created on this connection. Zero disables all receiver links.", "default": 2147483647, "required": false, "create": true