Skip to content

Commit

Permalink
Merge branch 'crolke-DISPATCH-451'
Browse files Browse the repository at this point in the history
This closes #106
  • Loading branch information
Chuck Rolke committed Oct 10, 2016
2 parents 1d34fac + 2bab3f3 commit acbae29
Show file tree
Hide file tree
Showing 10 changed files with 528 additions and 19 deletions.
7 changes: 7 additions & 0 deletions include/qpid/dispatch/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
/// @{


/**
* AMQP Constants
*/
typedef enum {
QD_AMQP_MIN_MAX_FRAME_SIZE = 512
} qd_amqp_constants_t;

/**
* AMQP Performative Tags
*/
Expand Down
13 changes: 13 additions & 0 deletions include/qpid/dispatch/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,19 @@ typedef struct qd_server_config_t {
*/
uint32_t max_frame_size;

/**
* The max_sessions value is the number of sessions allowed on the Connection.
*/
uint32_t max_sessions;

/**
* The incoming capacity value is calculated to be (sessionMaxFrames * maxFrameSize).
* In a round about way the calculation forces the AMQP Begin/incoming-capacity value
* to equal the specified sessionMaxFrames value measured in units of transfer frames.
* This calculation is done to satisfy proton pn_session_set_incoming_capacity().
*/
uint32_t incoming_capacity;

/**
* The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the
* connection will be automatically closed.
Expand Down
37 changes: 31 additions & 6 deletions python/qpid_dispatch/management/qdrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,20 @@
"maxFrameSize": {
"type": "integer",
"default": 16384,
"description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.",
"description": "The maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings, if specified, will overwrite this value. Defaults to 16384.",
"create": true
},
"maxSessions": {
"type": "integer",
"default": 32768,
"description": "The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings, if specified, will overwrite this value. Defaults to 32768.",
"create": true
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 100.",
"default": 100,
"required": false,
"create": true
},
"idleTimeoutSeconds": {
Expand Down Expand Up @@ -700,14 +713,13 @@
"required": false,
"create": true,
"description": "For the 'inter-router' role only. This value assigns a cost metric to the inter-router connection. The default (and minimum) value is one. Higher values represent higher costs. The cost is used to influence the routing algorithm as it attempts to use the path with the lowest total cost from ingress to egress."
},

},
"sslProfile": {
"type": "string",
"required": false,
"description": "Name of the sslProfile.",
"create": true
},
},
"saslMechanisms": {
"type": "string",
"required": false,
Expand All @@ -722,8 +734,21 @@
},
"maxFrameSize": {
"type": "integer",
"default": 65536,
"description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.",
"default": 16384,
"description": "The maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity. Policy settings will not overwrite this value. Defaults to 16384.",
"create": true
},
"maxSessions": {
"type": "integer",
"default": 32768,
"description": "The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions. Policy settings will not overwrite this value. Defaults to 32768.",
"create": true
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 100.",
"default": 100,
"required": false,
"create": true
},
"idleTimeoutSeconds": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,42 @@ Until the schema is extended specify embedded maps this document describes the v
},
"maxFrameSize": {
"type": "integer",
"description": "Largest frame that may be sent on this connection. Zero implies system default. (AMQP Open, max-frame-size)",
"default": 65536,
"description": "Largest frame that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Open, max-frame-size)",
"default": 16384,
"required": false,
"create": true
},
"maxMessageSize": {
"type": "integer",
"description": "Largest message size supported by links created on this connection. Zero implies system default. (AMQP Attach, max-message-size)",
"description": "[NOT IMPLEMENTED] Largest message size supported by links created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Attach, max-message-size)",
"default": 0,
"required": false,
"create": true
},
"maxSessionWindow": {
"type": "integer",
"description": "Largest incoming and outgoing window for sessions created on this connection. Zero implies system default. (AMQP Begin, incoming-window, outgoing-window)",
"default": 2147483647,
"description": "Largest incoming window in octets for sessions created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Begin, incoming-window)",
"default": 1638400,
"required": false,
"create": true
},
"maxSessions": {
"type": "integer",
"description": "Maximum number of sessions that may be created on this connection. Zero implies system default. (AMQP Open, channel-max)",
"default": 2147483647,
"description": "Maximum number of sessions that may be created on this connection. Non-zero policy values overwrite values specified for a listener object. (AMQP Open, channel-max)",
"default": 32768,
"required": false,
"create": true
},
"maxSenders": {
"type": "integer",
"description": "Maximum number of sending links that may be created on this connection. Zero implies system default.",
"description": "Maximum number of sending links that may be created on this connection. Zero disables all sender links.",
"default": 2147483647,
"required": false,
"create": true
},
"maxReceivers": {
"type": "integer",
"description": "Maximum number of receiving links that may be created on this connection. Zero implies system default.",
"description": "Maximum number of receiving links that may be created on this connection. Zero disables all receiver links.",
"default": 2147483647,
"required": false,
"create": true
Expand Down
34 changes: 33 additions & 1 deletion src/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK();
config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK();
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK();
config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK();
uint64_t ssn_frames = qd_entity_get_long(entity, "maxSessionFrames"); CHECK();
config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK();
config->sasl_username = qd_entity_opt_string(entity, "saslUsername", 0); CHECK();
config->sasl_password = qd_entity_opt_string(entity, "saslPassword", 0); CHECK();
Expand All @@ -192,11 +194,41 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
set_config_host(config, entity);

//
// Handle the defaults for link capacity.
// Handle the defaults for various settings
//
if (config->link_capacity == 0)
config->link_capacity = 250;

if (config->max_sessions == 0 || config->max_sessions > 32768)
// Proton disallows > 32768
config->max_sessions = 32768;

if (config->max_frame_size < QD_AMQP_MIN_MAX_FRAME_SIZE)
// Silently promote the minimum max-frame-size
// Proton will do this but the number is needed for the
// incoming capacity calculation.
config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE;

//
// Given session frame count and max frame size compute session incoming_capacity
// Limit total capacity to 2^31-1.
//
uint64_t mfs = (uint64_t)config->max_frame_size;
uint64_t trial_ic = ssn_frames * mfs;
uint64_t limit = (1ll << 31) - 1;
if (trial_ic < limit) {
// Silently promote incoming capacity of zero to one
config->incoming_capacity =
(trial_ic < QD_AMQP_MIN_MAX_FRAME_SIZE ? QD_AMQP_MIN_MAX_FRAME_SIZE : trial_ic);
} else {
config->incoming_capacity = limit;
uint64_t computed_ssn_frames = limit / mfs;
qd_log(qd->connection_manager->log_source, QD_LOG_WARNING,
"Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', "
"requested maxSessionFrames truncated from %llu to %llu",
config->name, config->host, config->port, ssn_frames, computed_ssn_frames);
}

//
// For now we are hardwiring this attribute to true. If there's an outcry from the
// user community, we can revisit this later.
Expand Down
3 changes: 2 additions & 1 deletion src/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,10 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node)
qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name)
{
qd_link_t *link = new_qd_link_t();
const qd_server_config_t * cf = qd_connection_config(conn);

link->pn_sess = pn_session(qd_connection_pn(conn));
pn_session_set_incoming_capacity(link->pn_sess, 1000000);
pn_session_set_incoming_capacity(link->pn_sess, cf->incoming_capacity);

if (dir == QD_OUTGOING)
link->pn_link = pn_sender(link->pn_sess, name);
Expand Down
7 changes: 5 additions & 2 deletions src/policy.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,14 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
//
void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn)
{
size_t capacity;
if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) {
pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow);
capacity = qd_conn->policy_settings->maxSessionWindow;
} else {
pn_session_set_incoming_capacity(ssn, 1000000);
const qd_server_config_t * cf = qd_connection_config(qd_conn);
capacity = cf->incoming_capacity;
}
pn_session_set_incoming_capacity(ssn, capacity);
}

//
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
//
pn_transport_set_server(tport);
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_channel_max(tport, config->max_sessions - 1);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);

//
Expand Down Expand Up @@ -1247,6 +1248,7 @@ static void cxtr_try_open(void *context)
// Configure the transport
//
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_channel_max(tport, config->max_sessions - 1);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);

//
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ foreach(py_test_module
system_tests_one_router
system_tests_policy
system_tests_protocol_family
system_tests_protocol_settings
system_tests_qdmanage
system_tests_qdstat
system_tests_sasl_plain
Expand Down

0 comments on commit acbae29

Please sign in to comment.