diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 5c3de51382..bbb7859abd 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -201,6 +201,7 @@ extern const char * const QD_AMQP_COND_PRECONDITION_FAILED; extern const char * const QD_AMQP_COND_RESOURCE_DELETED; extern const char * const QD_AMQP_COND_ILLEGAL_STATE; extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL; +extern const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED; extern const char * const QD_AMQP_COND_CONNECTION_FORCED; /// @}; diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h index 99eb3bdd1d..054c6d5b4a 100644 --- a/include/qpid/dispatch/container.h +++ b/include/qpid/dispatch/container.h @@ -238,6 +238,8 @@ void qd_session_free(qd_session_t *qd_ssn); bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn); qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn); +void qd_connection_log_policy_denial(qd_link_t *link, const char *text); + // handy macros to get around PROTON-2184: pn_session_set_context aborts if // context==0 (can remove this once qdrouter requires >= proton 0.31.x) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 9ed2bc248a..afc39c18b0 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -432,6 +432,12 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted); */ uint8_t qd_message_get_priority(qd_message_t *msg); +/** + * True if message is larger that maxMessageSize + * @param msg A pointer to the message + * @return + */ +bool qd_message_oversize(const qd_message_t *msg); ///@} diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 043baa567f..abe5a9a314 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -604,6 +604,8 @@ bool qd_connection_strip_annotations_in(const qd_connection_t *c); void qd_connection_wake(qd_connection_t *ctx); +int qd_connection_max_message_size(const qd_connection_t *c); + /** * @} */ diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 9254b7d587..ec678d09d9 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1869,6 +1869,13 @@ "required": false, "create": true }, + "maxMessageSize": { + "type": "integer", + "default": 0, + "description": "The maximum size in bytes of AMQP message transfers allowed for this router. This limit is applied only to transfers over user connections and is not applied to interrouter or edge router connections. This limit may be overridden by vhost or by vhost user group settings. A value of zero disables this limit.", + "required": false, + "create": true + }, "enableVhostPolicy": { "type": "boolean", "default": false, @@ -1905,10 +1912,11 @@ "graph": true, "description": "The sum of all vhost sender and receiver denials." }, + "maxMessageSizeDenied": {"type": "integer", "graph": true}, "totalDenials": { "type": "integer", "graph": true, - "description": "The total number of connection and link denials." + "description": "The total number of connection, link, and transfer denials." } } }, @@ -1933,6 +1941,11 @@ "create": true, "update": true }, + "maxMessageSize": { + "type": "integer", + "description": "Optional maximum size in bytes of AMQP message transfers allowed for connections to this vhost. This limit overrides the policy maxMessageSize value and may be overridden by vhost user group settings. A value of zero disables this limit.", + "required": false + }, "maxConnectionsPerUser": { "type": "integer", "default": 65535, @@ -1993,6 +2006,11 @@ "required": false, "create": false }, + "maxMessageSize": { + "type": "integer", + "description": "Optional maximum size in bytes of AMQP message transfers allowed for connections created by users in this group. This limit overrides the policy and vhost maxMessageSize values. A value of zero disables this limit.", + "required": false + }, "maxFrameSize": { "type": "integer", "description": "The largest frame, in bytes, that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object (AMQP Open, max-frame-size).", @@ -2130,7 +2148,8 @@ "sessionDenied": {"type": "integer", "graph": true}, "senderDenied": {"type": "integer", "graph": true}, - "receiverDenied": {"type": "integer", "graph": true} + "receiverDenied": {"type": "integer", "graph": true}, + "maxMessageSizeDenied": {"type": "integer", "graph": true} } }, diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py index 8cf1940ba5..bd1595ffe2 100644 --- a/python/qpid_dispatch_internal/management/config.py +++ b/python/qpid_dispatch_internal/management/config.py @@ -201,10 +201,12 @@ def configure(attributes): policyDir = config.by_type('policy')[0]['policyDir'] policyDefaultVhost = config.by_type('policy')[0]['defaultVhost'] useHostnamePatterns = config.by_type('policy')[0]['enableVhostNamePatterns'] + maxMessageSize = config.by_type('policy')[0]['maxMessageSize'] for a in config.by_type("policy"): configure(a) agent.policy.set_default_vhost(policyDefaultVhost) agent.policy.set_use_hostname_patterns(useHostnamePatterns) + agent.policy.set_max_message_size(maxMessageSize) # Remaining configuration for t in "sslProfile", "authServicePlugin", "listener", "connector", \ diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index 37733f1b83..cfe34009fc 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -120,7 +120,7 @@ class PolicyCompiler(object): """ Validate incoming configuration for legal schema. - Warn about section options that go unused. - - Disallow negative max connection numbers. + - Disallow negative max connection/message size numbers. - Check that connectionOrigins resolve to IP hosts. - Enforce internal consistency, """ @@ -131,6 +131,7 @@ class PolicyCompiler(object): PolicyKeys.KW_IGNORED_TYPE, PolicyKeys.KW_VHOST_NAME, PolicyKeys.KW_MAXCONN, + PolicyKeys.KW_MAX_MESSAGE_SIZE, PolicyKeys.KW_MAXCONNPERHOST, PolicyKeys.KW_MAXCONNPERUSER, PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT, @@ -232,7 +233,7 @@ def compile_connection_group(self, vhostname, groupname, val, list_out, warnings def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warnings, errors): """ - Compile a schema from processed json format to local internal format. + Compile a vhostUserGroupSettings schema from processed json format to local internal format. @param[in] name vhost name @param[in] policy_in user config settings @param[out] policy_out validated Internal format @@ -248,7 +249,7 @@ def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warn policy_out[PolicyKeys.KW_REMOTE_HOSTS] = '' # DISPATCH-1277 - KW_MAX_FRAME_SIZE must be defaulted to 16384 not 2147483647 policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 16384 - policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = 0 + policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647 policy_out[PolicyKeys.KW_MAX_SESSIONS] = 65536 policy_out[PolicyKeys.KW_MAX_SENDERS] = 2147483647 @@ -411,7 +412,7 @@ def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warn def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors): """ - Compile a schema from processed json format to local internal format. + Compile a vhost schema from processed json format to local internal format. @param[in] name vhost name @param[in] policy_in raw policy to be validated @param[out] policy_out validated Internal format @@ -429,6 +430,7 @@ def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors): policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535 policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False policy_out[PolicyKeys.KW_GROUPS] = {} + policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None # validate the options for key, val in dict_iteritems(policy_in): @@ -445,6 +447,14 @@ def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors): errors.append(msg) return False policy_out[key] = val + elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE + ]: + if not self.validateNumber(val, 0, 0, cerror): + msg = ("Policy vhost '%s' option '%s' has error '%s'." % + (name, key, cerror[0])) + errors.append(msg) + return False + policy_out[key] = val elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]: if not type(val) is bool: errors.append("Policy vhost '%s' option '%s' must be of type 'bool' but is '%s'" % @@ -609,6 +619,9 @@ def __init__(self, manager): # When true policy ruleset definitions are propagated to C code self.use_hostname_patterns = False + # _max_message_size + # holds global value from policy config object + self._max_message_size = 0 # # Service interfaces # @@ -830,6 +843,13 @@ def lookup_settings(self, vhost_in, groupname, upolicy): upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname]) + maxsize = upolicy.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None) + if maxsize is None: + maxsize = ruleset.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None) + if maxsize is None: + maxsize = self._max_message_size + upolicy[PolicyKeys.KW_MAX_MESSAGE_SIZE] = maxsize + upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats() return True except Exception as e: @@ -851,6 +871,15 @@ def close_connection(self, conn_id): self._manager.log_trace( "Policy internal error closing connection id %s. %s" % (conn_id, str(e))) + def set_max_message_size(self, size): + """ + record max message size from policy config object + :param size: + :return:ls + + """ + self._max_message_size = size + # # def test_load_config(self): diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py index fe5e4e8f28..f7f35fb457 100644 --- a/python/qpid_dispatch_internal/policy/policy_manager.py +++ b/python/qpid_dispatch_internal/policy/policy_manager.py @@ -161,6 +161,14 @@ def close_connection(self, conn_id): @return: none """ self._policy_local.close_connection(conn_id) + + def set_max_message_size(self, size): + """ + Policy has set global maxMessageSize. + :param size: + :return: none + """ + self._policy_local.set_max_message_size(size) # # # diff --git a/src/amqp.c b/src/amqp.c index de89637383..ff11ee67e9 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -86,6 +86,7 @@ const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted"; const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state"; const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = "amqp:frame-size-too-small"; const char * const QD_AMQP_COND_CONNECTION_FORCED = "amqp:connection:forced"; +const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED = "amqp:connection:message-size-exceeded"; const char * const QD_AMQP_PORT_STR = "5672"; const char * const QD_AMQPS_PORT_STR = "5671"; diff --git a/src/container.c b/src/container.c index d08800bfa1..a728de735c 100644 --- a/src/container.c +++ b/src/container.c @@ -159,7 +159,7 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) } -static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) +static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, int max_size) { qd_node_t *node = container->default_node; @@ -191,6 +191,9 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) link->node = node; link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link); + if (max_size) { + pn_link_set_max_message_size(pn_link, (uint64_t)max_size); + } pn_link_set_context(pn_link, link); node->ntype->incoming_handler(node->context, link); } @@ -652,7 +655,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } qd_conn->n_senders++; } - setup_incoming_link(container, pn_link); + setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn)); } } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE) handle_link_open(container, pn_link); diff --git a/src/message.c b/src/message.c index a3420a1862..58c92e55b4 100644 --- a/src/message.c +++ b/src/message.c @@ -29,6 +29,7 @@ #include "compose_private.h" #include "connection_manager_private.h" #include "aprintf.h" +#include "policy.h" #include #include #include @@ -941,7 +942,6 @@ qd_message_t *qd_message() msg->content->lock = sys_mutex(); sys_atomic_init(&msg->content->ref_count, 1); msg->content->parse_depth = QD_DEPTH_NONE; - return (qd_message_t*) msg; } @@ -1244,14 +1244,13 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent) /** * Receive and discard large messages for which there is no destination. * Don't waste resources by putting the message into internal buffers. - * Don't fiddle with locking as no sender is competing with reception. + * Message locking is not required since the message content buffers are untouched. */ qd_message_t *discard_receive(pn_delivery_t *delivery, pn_link_t *link, qd_message_t *msg_in) { qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in; - while (1) { #define DISCARD_BUFFER_SIZE (128 * 1024) char dummy[DISCARD_BUFFER_SIZE]; @@ -1261,13 +1260,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, // have read all available pn_link incoming bytes break; } else if (rc == PN_EOS || rc < 0) { - // end of message or error. Call the message complete - msg->content->receive_complete = true; + // End of message or error: finalize message_receive handling msg->content->aborted = pn_delivery_aborted(delivery); qd_nullify_safe_ptr(&msg->content->input_link_sp); - pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); + if (msg->content->oversize) { + msg->content->aborted = true; + } + msg->content->receive_complete = true; break; } else { // rc was > 0. bytes were read and discarded. @@ -1308,13 +1309,15 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc); pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF); pn_record_set(record, PN_DELIVERY_CTX, (void*) msg); + msg->content->max_message_size = qd_connection_max_message_size(qdc); } // // The discard flag indicates we should keep reading the input stream // but not process the message for delivery. + // Oversize messages are also discarded. // - if (msg->content->discard) { + if (msg->content->discard || msg->content->oversize) { return discard_receive(delivery, link, (qd_message_t *)msg); } @@ -1416,10 +1419,23 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) recv_error = true; } else if (rc > 0) { // - // We have received a positive number of bytes for the message. Advance - // the cursor in the buffer. + // We have received a positive number of bytes for the message. + // Advance the cursor in the buffer. // qd_buffer_insert(content->pending, rc); + + // Handle maxMessageSize violations + if (content->max_message_size) { + content->bytes_received += rc; + if (content->bytes_received > content->max_message_size) + { + qd_connection_t *conn = qd_link_connection(qdl); + qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded"); + qd_policy_count_max_size_event(link, conn); + content->oversize = true; + return discard_receive(delivery, link, (qd_message_t*)msg); + } + } } else { // // We received zero bytes, and no PN_EOS. This means that we've received @@ -2231,3 +2247,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted) qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; msg_pvt->content->aborted = aborted; } + +bool qd_message_oversize(const qd_message_t *msg) +{ + qd_message_content_t * mc = MSG_CONTENT(msg); + return mc->oversize; +} diff --git a/src/message_private.h b/src/message_private.h index 7bc66dc5f0..ba13538c23 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -109,6 +109,8 @@ typedef struct { qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; + int max_message_size; // configured max; 0 if no max to enforce + int bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size uint32_t fanout; // The number of receivers for this message, including in-process subscribers. qd_link_t_sp input_link_sp; // message received on this link @@ -120,6 +122,7 @@ typedef struct { bool disable_q2_holdoff; // Disable the Q2 flow control bool priority_parsed; bool priority_present; + bool oversize; // policy oversize handling in effect uint8_t priority; // The priority of this message } qd_message_content_t; diff --git a/src/policy.c b/src/policy.c index 896540c1df..4a45bedbcc 100644 --- a/src/policy.c +++ b/src/policy.c @@ -46,6 +46,7 @@ static uint64_t n_connections = 0; static uint64_t n_denied = 0; static uint64_t n_processed = 0; static uint64_t n_links_denied = 0; +static uint64_t n_maxsize_messages_denied = 0; static uint64_t n_total_denials = 0; // @@ -83,6 +84,9 @@ static PyObject * module = 0; ALLOC_DEFINE(qd_policy_settings_t); +// Policy log module used outside of policy proper +qd_log_source_t* policy_log_source = 0; + // // Policy configuration/statistics management interface // @@ -116,6 +120,7 @@ qd_policy_t *qd_policy(qd_dispatch_t *qd) policy->tree_lock = sys_mutex(); policy->hostname_tree = qd_parse_tree_new(QD_PARSE_TREE_ADDRESS); stats_lock = sys_mutex(); + policy_log_source = policy->log_source; qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized"); return policy; @@ -206,7 +211,8 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts; if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) && !qd_entity_set_long(entity, "senderDenied", dc->senderDenied) && - !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) + !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) && + !qd_entity_set_long(entity, "maxMessageSizeDenied", dc->maxSizeMessagesDenied) ) return QD_ERROR_NONE; return qd_error_code(); @@ -218,13 +224,14 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) **/ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { // Return global stats - uint64_t np, nd, nc, nl, nt; + uint64_t np, nd, nc, nl, nm, nt; sys_mutex_lock(stats_lock); { np = n_processed; nd = n_denied; nc = n_connections; nl = n_links_denied; + nm = n_maxsize_messages_denied; nt = n_total_denials; } sys_mutex_unlock(stats_lock); @@ -232,6 +239,7 @@ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { !qd_entity_set_long(entity, "connectionsDenied", nd) && !qd_entity_set_long(entity, "connectionsCurrent", nc) && !qd_entity_set_long(entity, "linksDenied", nl) && + !qd_entity_set_long(entity, "maxMessageSizeDenied", nm) && !qd_entity_set_long(entity, "totalDenials", nt) ) return QD_ERROR_NONE; @@ -303,19 +311,21 @@ void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn) qd_python_unlock(lock_state); } const char *hostname = qd_connection_name(conn); - int ssnDenied = 0; - int sndDenied = 0; - int rcvDenied = 0; + uint64_t ssnDenied = 0; + uint64_t sndDenied = 0; + uint64_t rcvDenied = 0; + uint64_t sizDenied = 0; if (conn->policy_settings && conn->policy_settings->denialCounts) { ssnDenied = conn->policy_settings->denialCounts->sessionDenied; sndDenied = conn->policy_settings->denialCounts->senderDenied; rcvDenied = conn->policy_settings->denialCounts->receiverDenied; - } - qd_log(policy->log_source, QD_LOG_DEBUG, + sizDenied = conn->policy_settings->denialCounts->maxSizeMessagesDenied; + qd_log(policy->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d, " - "sessions_denied=%d, senders_denied=%d, receivers_denied=%d. nConnections= %d.", + "sessions_denied=%ld, senders_denied=%ld, receivers_denied=%ld. nConnections= %ld.", conn->connection_id, hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, - ssnDenied, sndDenied, rcvDenied, n_connections); + ssnDenied, sndDenied, rcvDenied, sizDenied, n_connections); + } } @@ -502,6 +512,7 @@ bool qd_policy_open_fetch_settings( settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); + settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); } @@ -666,6 +677,20 @@ void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_ } +// +// +void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn) +{ + sys_mutex_lock(stats_lock); + n_maxsize_messages_denied++; + n_total_denials++; + sys_mutex_unlock(stats_lock); + // TODO: denialCounts is shared among connections and should be protected also + if (qd_conn->policy_settings && qd_conn->policy_settings->denialCounts) { + qd_conn->policy_settings->denialCounts->maxSizeMessagesDenied++; + } +} + /** * Given a char return true if it is a parse_tree token separater */ @@ -1474,3 +1499,8 @@ char * qd_policy_compile_allowed_csv(char * csv) free(dup); return result; } + + +qd_log_source_t* qd_policy_log_source() { + return policy_log_source; +} diff --git a/src/policy.h b/src/policy.h index a076eb3da5..44e87943d3 100644 --- a/src/policy.h +++ b/src/policy.h @@ -39,6 +39,7 @@ struct qd_policy_denial_counts_s { uint64_t sessionDenied; uint64_t senderDenied; uint64_t receiverDenied; + uint64_t maxSizeMessagesDenied; }; typedef struct qd_policy_t qd_policy_t; @@ -49,6 +50,7 @@ struct qd_policy__settings_s { int maxSessions; int maxSenders; int maxReceivers; + int maxMessageSize; bool allowDynamicSource; bool allowAnonymousSender; bool allowUserIdProxy; @@ -231,6 +233,7 @@ char * qd_policy_host_pattern_lookup(qd_policy_t *policy, const char *hostPatter * @return the ruleset string to be used in policy settings. */ char * qd_policy_compile_allowed_csv(char * csv); + /** * Approve sending of message on anonymous link based on connection's policy. * @@ -238,4 +241,17 @@ char * qd_policy_compile_allowed_csv(char * csv); * @param[in] qd_conn dispatch connection with policy settings */ bool qd_policy_approve_message_target(qd_iterator_t *address, qd_connection_t *qd_conn); + +/** + * Increment counters for a link when policy maxMessageSize limit is exceeded. + * + * @param[in] pn_link proton link being with delivery/transfer being rejected + * @param[in] qd_conn dispatch connection with policy settings and counts + **/ +void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn); + +/** + * Return POLICY log_source to log policy + */ +qd_log_source_t* qd_policy_log_source(); #endif diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 715d8d0379..9e0400ed73 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -50,6 +50,12 @@ qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery) } +bool qdr_delivery_oversize(const qdr_delivery_t *delivery) +{ + return delivery && delivery->msg && qd_message_oversize(delivery->msg); +} + + bool qdr_delivery_send_complete(const qdr_delivery_t *delivery) { if (!delivery) diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index 9a7edfbaa2..f97308ba7c 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -76,6 +76,7 @@ ALLOC_DECLARE(qdr_delivery_t); bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery); bool qdr_delivery_send_complete(const qdr_delivery_t *delivery); +bool qdr_delivery_oversize(const qdr_delivery_t *delivery); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(const qdr_delivery_t *delivery); diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index 1e137aed4b..e3f95554a0 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -209,7 +209,6 @@ void *router_core_thread(void *arg) qdr_post_general_work_CT(core, work); } } - qd_log(core->log, QD_LOG_INFO, "Router Core thread exited"); return 0; } diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index f885eeffb4..c93cfb3a85 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -185,7 +185,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) DEQ_REMOVE_HEAD(link->undelivered); dlv->link_work = 0; - if (settled) { + if (settled || qdr_delivery_oversize(dlv)) { dlv->where = QDR_DELIVERY_NOWHERE; qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list"); } else { diff --git a/src/router_node.c b/src/router_node.c index a735280fce..4aca57467d 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -27,6 +27,7 @@ #include "entity_cache.h" #include "router_private.h" #include "delivery.h" +#include "policy.h" #include #include #include @@ -287,8 +288,9 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa const qd_server_config_t *cf = qd_connection_config(conn); if (!cf) return; char buf[qd_message_repr_len()]; - const char *msg_str = qd_message_aborted(msg) ? - "aborted message" : qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); + const char *msg_str = qd_message_oversize(msg) ? "oversize message" : + qd_message_aborted(msg) ? "aborted message" : + qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); if (msg_str) { const char *src = pn_terminus_get_address(pn_link_source(pn_link)); const char *tgt = pn_terminus_get_address(pn_link_target(pn_link)); @@ -335,49 +337,66 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) qd_message_t *msg = qd_message_receive(pnd); bool receive_complete = qd_message_receive_complete(msg); - if (receive_complete) { - log_link_message(conn, pn_link, msg); + if (!qd_message_oversize(msg)) { + // message not rejected as oversize + if (receive_complete) { + log_link_message(conn, pn_link, msg); - // - // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). - // - pn_link_advance(pn_link); - next_delivery = pn_link_current(pn_link) != 0; + // + // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). + // + pn_link_advance(pn_link); + next_delivery = pn_link_current(pn_link) != 0; - uint64_t local_disp = qdr_delivery_disposition(delivery); - if (local_disp != 0) { - pn_delivery_update(pnd, local_disp); + uint64_t local_disp = qdr_delivery_disposition(delivery); + if (local_disp != 0) { + pn_delivery_update(pnd, local_disp); + } } - } - if (qd_message_is_discard(msg)) { - // - // Message has been marked for discard, no further processing necessary - // + if (qd_message_is_discard(msg)) { + // + // Message has been marked for discard, no further processing necessary + // + if (receive_complete) { + // If this discarded delivery has already been settled by proton, + // set the presettled flag on the delivery to true if it is not already true. + // Since the entire message has already been received, we directly call the + // function to set the pre-settled flag since we cannot go thru the core-thread + // to do this since the delivery has been discarded. + // Discarded streaming deliveries are not put thru the core thread via the continue action. + if (pn_delivery_settled(pnd)) + qdr_delivery_set_presettled(delivery); + + + // note: expected that the code that set discard has handled + // setting disposition and updating flow! + pn_delivery_settle(pnd); + if (delivery) { + // if delivery already exists then the core thread discarded this + // delivery, it will eventually free the qdr_delivery_t and its + // associated message - do not free it here. + qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); + } else { + qd_message_free(msg); + } + } + return next_delivery; + } + } else { + // message is oversize if (receive_complete) { - // If this discarded delivery has already been settled by proton, - // set the presettled flag on the delivery to true if it is not already true. - // Since the entire message has already been received, we directly call the - // function to set the pre-settled flag since we cannot go thru the core-thread - // to do this since the delivery has been discarded. - // Discarded streaming deliveries are not put thru the core thread via the continue action. - if (pn_delivery_settled(pnd)) - qdr_delivery_set_presettled(delivery); - - - // note: expected that the code that set discard has handled - // setting disposition and updating flow! + // reject and settle the incoming delivery + pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); - if (delivery) { - // if delivery already exists then the core thread discarded this - // delivery, it will eventually free the qdr_delivery_t and its - // associated message - do not free it here. - qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); - } else { - qd_message_free(msg); - } + // close the connection + pn_connection_t * pn_conn = qd_connection_pn(conn); + pn_condition_t * cond = pn_connection_condition(pn_conn); + (void) pn_condition_set_name( cond, QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED); + pn_connection_close(pn_conn); } - return next_delivery; + return false; + // oversize messages are not processed any further } // @@ -1913,3 +1932,21 @@ void qd_link_restart_rx(qd_link_t *in_link) qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr); } } + + +// Issue a warning POLICY log message with connection and link identities +// prepended to the policy denial text string. +void qd_connection_log_policy_denial(qd_link_t *link, const char *text) +{ + qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); + uint64_t l_id = 0; + uint64_t c_id = 0; + if (rlink) { + l_id = rlink->identity; + if (rlink->conn) { + c_id = rlink->conn->identity; + } + } + qd_log(qd_policy_log_source(), QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] %s", + c_id, l_id, text); +} diff --git a/src/server.c b/src/server.c index 6d6d289196..20d19807af 100644 --- a/src/server.c +++ b/src/server.c @@ -1663,3 +1663,7 @@ sys_mutex_t *qd_server_get_activation_lock(qd_server_t * server) { return server->conn_activation_lock; } + +int qd_connection_max_message_size(const qd_connection_t *c) { + return (c && c->policy_settings) ? c->policy_settings->maxMessageSize : 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e34db23f67..041ee26f98 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -99,6 +99,8 @@ foreach(py_test_module system_tests_default_distribution system_tests_deprecated system_tests_policy + system_tests_policy_oversize_basic + system_tests_policy_oversize_compound system_tests_protocol_family system_tests_protocol_settings system_tests_qdmanage diff --git a/tests/system_tests_policy_oversize_basic.py b/tests/system_tests_policy_oversize_basic.py new file mode 100644 index 0000000000..30ad1aab07 --- /dev/null +++ b/tests/system_tests_policy_oversize_basic.py @@ -0,0 +1,780 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import unittest as unittest +import os, json, re, signal +import sys +import time + +from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, QdManager, Logger +from subprocess import PIPE, STDOUT +from proton import ConnectionException, Timeout, Url, symbol, Message +from proton.handlers import MessagingHandler +from proton.reactor import Container, ReceiverOption +from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse +from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled +from qpid_dispatch_internal.compat import dict_iteritems +from test_broker import FakeBroker + +W_THREADS = 2 + +# DISPATCH-975 Detect that an oversize message is blocked. +# These tests check basic blocking where the the sender is blocked by +# the ingress routers. It does not check compound blocking where +# oversize is allowed or denied by an ingress edge router but also +# denied by the uplink interior router. + +class OversizeMessageTransferTest(MessagingHandler): + """ + This test connects a sender and a receiver. Then it tries to send _count_ + number of messages of the given size through the router or router network. + + With expect_block=True the ingress router should detect the sender's oversize + message and close the sender connection. The receiver may receive + aborted message indications but that is not guaranteed. If any aborted + messages are received then the count must be at most one. + The test is a success when the sender receives a connection error with + oversize indication and the receiver has not received too many aborts. + + With expect_block=False sender messages should be received normally. + The test is a success when n_accepted == count. + """ + def __init__(self, sender_host, receiver_host, test_address, + message_size=100000, count=10, expect_block=True, print_to_console=False): + super(OversizeMessageTransferTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.test_address = test_address + self.msg_size = message_size + self.count = count + self.expect_block = expect_block + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.sender = None + self.receiver = None + self.proxy = None + + self.n_sent = 0 + self.n_rcvd = 0 + self.n_accepted = 0 + self.n_rejected = 0 + self.n_aborted = 0 + self.n_connection_error = 0 + + self.logger = Logger(title=("OversizeMessageTransferTest - %s" % (self.test_address)), print_to_console=print_to_console) + self.log_unhandled = not self.expect_block + + def timeout(self): + self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_rejected=%d n_aborted=%d" % \ + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted) + self.logger.log("self.timeout " + self.error) + self._shut_down_test() + + def on_start(self, event): + self.logger.log("on_start") + self.timer = event.reactor.schedule(10, Timeout(self)) + self.logger.log("on_start: opening receiver connection to %s" % (self.receiver_host.addresses[0])) + self.receiver_conn = event.container.connect(self.receiver_host.addresses[0]) + self.logger.log("on_start: opening sender connection to %s" % (self.sender_host.addresses[0])) + self.sender_conn = event.container.connect(self.sender_host.addresses[0]) + self.logger.log("on_start: Creating receiver") + self.receiver = event.container.create_receiver(self.receiver_conn, self.test_address) + self.logger.log("on_start: Creating sender") + self.sender = event.container.create_sender(self.sender_conn, self.test_address) + self.logger.log("on_start: done") + + def send(self): + while self.sender.credit > 0 and self.n_sent < self.count: + # construct message in indentifiable chunks + body_msg = "" + padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[self.n_sent % 30] + while len(body_msg) < self.msg_size: + chunk = "[%s:%d:%d" % (self.test_address, self.n_sent, len(body_msg)) + padlen = 50 - len(chunk) + chunk += padchar * padlen + body_msg += chunk + if len(body_msg) > self.msg_size: + body_msg = body_msg[:self.msg_size] + self.logger.log("send. address:%s message:%d of %s length=%d" % + (self.test_address, self.n_sent, self.count, self.msg_size)) + m = Message(body=body_msg) + self.sender.send(m) + self.n_sent += 1 + + def on_sendable(self, event): + if event.sender == self.sender: + self.logger.log("on_sendable") + self.send() + + def on_message(self, event): + if self.expect_block: + # All messages should violate maxMessageSize. + # Receiving any is an error. + self.error = "Received a message. Expected to receive no messages." + self.logger.log(self.error) + self._shut_down_test() + else: + self.n_rcvd += 1 + self.accept(event.delivery) + self._check_done() + + def on_connection_error(self, event): + if event.connection == self.sender_conn: + if event.connection.remote_condition.name == "amqp:connection:message-size-exceeded": + self.n_connection_error += 1 + self.sender_conn.close() + self.sender_conn = None + else: + # sender closed but for wrong reason + self.error = "sender close error: Expected amqp:connection:message-size-exceeded but received %s" % \ + event.connection.remote_condition.name + self.logger.log(self.error) + else: + # connection error but not for sender + self.error = "unexpected connection close error: wrong connection closed. condition= %s" % \ + event.connection.remote_condition.name + self.logger.log(self.error) + self._check_done() + + def _shut_down_test(self): + if self.timer: + self.timer.cancel() + self.timer = None + if self.sender: + self.sender.close() + self.sender = None + if self.receiver: + self.receiver.close() + self.receiver = None + if self.sender_conn: + self.sender_conn.close() + self.sender_conn = None + if self.receiver_conn: + self.receiver_conn.close() + self.receiver_conn = None + + def _check_done(self): + current = ("check_done: sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d" % + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, self.n_connection_error)) + self.logger.log(current) + if self.error is not None: + self.logger.log("TEST FAIL") + self._shut_down_test() + else: + done = (self.n_connection_error == 1) \ + if self.expect_block else \ + (self.n_sent == self.count and self.n_rcvd == self.count) + + if done: + self.logger.log("TEST DONE!!!") + # self.log_unhandled = True # verbose debugging + self._shut_down_test() + + def on_rejected(self, event): + self.n_rejected += 1 + if self.expect_block: + self.logger.log("on_rejected: entry") + self._check_done() + else: + self.error = "Unexpected on_reject" + self.logger.log(self.error) + self._check_done() + + def on_aborted(self, event): + self.logger.log("on_aborted") + self.n_aborted += 1 + self._check_done() + + def on_error(self, event): + self.error = "Container error" + self.logger.log(self.error) + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_link_error(self, event): + self.error = event.link.remote_condition.name + self.logger.log("on_link_error: %s" % (self.error)) + # + # qpid-proton master @ 6abb4ce + # At this point the container is wedged and closing the connections does + # not get the container to exit. + # Instead, raise an exception that bypasses normal container exit. + # This class then returns something for the main test to evaluate. + # + raise Exception(self.error) + + def on_unhandled(self, method, *args): + if self.log_unhandled: + self.logger.log("on_unhandled: method: %s, args: %s" % (method, args)) + + def run(self): + try: + Container(self).run() + except Exception as e: + self.error = "Container run exception: %s" % (e) + self.logger.log(self.error) + self.logger.dump() + + + +# For the next test case define max sizes for each router. +# These are the configured maxMessageSize values +EA1_MAX_SIZE = 50000 +INTA_MAX_SIZE = 100000 +INTB_MAX_SIZE = 150000 +EB1_MAX_SIZE = 200000 + +# Interior routers enforce max size directly. +# Edge routers are also checked by the attached interior router. + +# Block tests that use edge routers that send messages to the network must +# account for the fact that the attached interior router will apply +# another max size. These tests do not check against EB1 max for the +# sender if the receiver is on EA1, INTA, or INTB since INTB's max +# would kick an and cause a false positive. + +# Tests that check for allowing near-max sizes use the minimum of +# the edge router's max and the attached interior router's max. + +# The bytes-over and bytes-under max that should trigger allow or deny. +# Messages with content this much over should be blocked while +# messages with content this much under should be allowed. +# * client overhead is typically 16 bytes or so +# * interrouter overhead is much larger with annotations +OVER_UNDER = 200 + + +class MaxMessageSizeBlockOversize(TestCase): + """ + verify that maxMessageSize blocks oversize messages + """ + @classmethod + def setUpClass(cls): + """Start the router""" + super(MaxMessageSizeBlockOversize, cls).setUpClass() + + def router(name, mode, max_size, extra): + config = [ + ('router', {'mode': mode, + 'id': name, + 'allowUnsettledMulticast': 'yes', + 'workerThreads': W_THREADS}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true', 'maxMessageSize': max_size, 'defaultVhost': '$default'}), + ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true', + 'groups': [( + '$default', { + 'users': '*', + 'maxConnections': 100, + 'remoteHosts': '*', + 'sources': '*', + 'targets': '*', + 'allowAnonymousSender': 'true', + 'allowWaypointLinks': 'true', + 'allowDynamicSource': 'true' + } + )]} + ) + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + return cls.routers[-1] + + # configuration: + # two edge routers connected via 2 interior routers with max sizes + # + # +-------+ +---------+ +---------+ +-------+ + # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 | + # | 50,000| | 100,000 | | 150,000 | |200,000| + # +-------+ +---------+ +---------+ +-------+ + # + # Note: + # * Messages whose senders connect to INT.A or INT.B are subject to max message size + # defined for the ingress router only. + # * Message whose senders connect to EA1 or EA2 are subject to max message size + # defined for the ingress router. If the message is forwarded through the + # connected interior router then the message is subject to another max message size + # defined by the interior router. + + cls.routers = [] + + interrouter_port = cls.tester.get_port() + cls.INTA_edge_port = cls.tester.get_port() + cls.INTB_edge_port = cls.tester.get_port() + + router('INT.A', 'interior', INTA_MAX_SIZE, + [('listener', {'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})]) + cls.INT_A = cls.routers[0] + cls.INT_A.listener = cls.INT_A.addresses[0] + + router('INT.B', 'interior', INTB_MAX_SIZE, + [('connector', {'name': 'connectorToA', + 'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', + 'port': cls.INTB_edge_port})]) + cls.INT_B = cls.routers[1] + cls.INT_B.listener = cls.INT_B.addresses[0] + + router('EA1', 'edge', EA1_MAX_SIZE, + [('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()}), + ('connector', {'name': 'uplink', 'role': 'edge', + 'port': cls.INTA_edge_port})]) + cls.EA1 = cls.routers[2] + cls.EA1.listener = cls.EA1.addresses[0] + + router('EB1', 'edge', EB1_MAX_SIZE, + [('connector', {'name': 'uplink', + 'role': 'edge', + 'port': cls.INTB_edge_port, + 'maxFrameSize': 1024}), + ('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()})]) + cls.EB1 = cls.routers[3] + cls.EB1.listener = cls.EB1.addresses[0] + + cls.INT_A.wait_router_connected('INT.B') + cls.INT_B.wait_router_connected('INT.A') + cls.EA1.wait_connectors() + cls.EB1.wait_connectors() + + def test_40_block_oversize_INTA_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_A, + "e40", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_40 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_41_block_oversize_INTA_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_B, + "e41", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_41 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_42_block_oversize_INTA_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EA1, + "e42", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_42 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_43_block_oversize_INTA_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EB1, + "e43", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_43 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_44_block_oversize_INTB_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_A, + "e44", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_44 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_45_block_oversize_INTB_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_B, + "e45", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_45 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_46_block_oversize_INTB_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EA1, + "e46", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_46 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_47_block_oversize_INTB_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EB1, + "e47", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_47 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_48_block_oversize_EA1_INTA(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to INT.A may be blocked by EA1 limit and also by INT.A limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_A, + "e48", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_48 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_49_block_oversize_EA1_INTB(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to INT.B may be blocked by EA1 limit and also by INT.A limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_B, + "e49", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_49 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4a_block_oversize_EA1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EA1, + "e4a", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4a test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4b_block_oversize_EA1_EB1(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to EB1 may be blocked by EA1 limit and also by INT.A limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EB1, + "e4b", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4b test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4c_block_oversize_EB1_INTA(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to INT.A may be blocked by EB1 limit and also by INT.B limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_A, + "e4c", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4c test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4d_block_oversize_EB1_INTB(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to INT.B may be blocked by EB1 limit and also by INT.B limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e4d", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4d test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4e_block_oversize_EB1_EA1(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to EA1 may be blocked by EB1 limit and also by INT.B limit. That condition is a separate test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EA1, + "e4e", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4e test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4f_block_oversize_EB1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EB1, + "e4f", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4f test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + # + # tests under maxMessageSize should not block + # + def test_50_allow_undersize_INTA_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_A, + "e50", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_50 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_51_allow_undersize_INTA_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_B, + "e51", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_51 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_52_allow_undersize_INTA_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EA1, + "e52", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_52 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_53_allow_undersize_INTA_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EB1, + "e53", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_53 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_54_allow_undersize_INTB_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_A, + "e54", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_54 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_55_allow_undersize_INTB_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_B, + "e55", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_55 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_56_allow_undersize_INTB_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EA1, + "e56", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_56 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_57_allow_undersize_INTB_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EB1, + "e57", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_57 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_58_allow_undersize_EA1_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_A, + "e58", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_58 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_59_allow_undersize_EA1_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_B, + "e59", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_59 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5a_allow_undersize_EA1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EA1, + "e5a", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5a test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5b_allow_undersize_EA1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EB1, + "e5b", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5b test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_5c_allow_undersize_EB1_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_A, + "e5c", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5c test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5d_allow_undersize_EB1_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e5d", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5d test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5e_allow_undersize_EB1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EA1, + "e5e", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5e test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5f_allow_undersize_EB1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EB1, + "e5f", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5f test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + +if __name__ == '__main__': + unittest.main(main_module()) diff --git a/tests/system_tests_policy_oversize_compound.py b/tests/system_tests_policy_oversize_compound.py new file mode 100644 index 0000000000..92c28a9dca --- /dev/null +++ b/tests/system_tests_policy_oversize_compound.py @@ -0,0 +1,452 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import unittest as unittest +import os, json, re, signal +import sys +import time + +from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, QdManager, Logger +from subprocess import PIPE, STDOUT +from proton import ConnectionException, Timeout, Url, symbol, Message +from proton.handlers import MessagingHandler +from proton.reactor import Container, ReceiverOption +from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse +from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled +from qpid_dispatch_internal.compat import dict_iteritems +from test_broker import FakeBroker + +W_THREADS = 2 + +# DISPATCH-975 Detect that an oversize message is blocked. +# These tests check compound blocking where the the sender is blocked by +# the ingress edge routers and/or by the uplink interior router. + +class OversizeMessageTransferTest(MessagingHandler): + """ + This test connects a sender and a receiver. Then it tries to send _count_ + number of messages of the given size through the router or router network. + + The ingress router should allow the sender's oversize message. + The message is blocked by the uplink router by rejecting the message + and closing the connection between the interior and edge routers. + The receiver may receive + aborted message indications but that is not guaranteed. If any aborted + messages are received then the count must be at most one. + The test is a success when, when, when, who knows? HACK + """ + def __init__(self, sender_host, receiver_host, test_address, + message_size=100000, count=10, print_to_console=False): + super(OversizeMessageTransferTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.test_address = test_address + self.msg_size = message_size + self.count = count + self.expect_block = True + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.sender = None + self.receiver = None + self.proxy = None + + self.n_sent = 0 + self.n_rcvd = 0 + self.n_accepted = 0 + self.n_rejected = 0 + self.n_aborted = 0 + self.n_connection_error = 0 + + self.logger = Logger(title=("OversizeMessageTransferTest - %s" % (self.test_address)), print_to_console=print_to_console) + self.log_unhandled = False # verbose diagnostics of proton callbacks + + def timeout(self): + self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_rejected=%d n_aborted=%d" % \ + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted) + self.logger.log("self.timeout " + self.error) + self._shut_down_test() + + def on_start(self, event): + self.logger.log("on_start") + self.timer = event.reactor.schedule(10, Timeout(self)) + self.logger.log("on_start: opening receiver connection to %s" % (self.receiver_host.addresses[0])) + self.receiver_conn = event.container.connect(self.receiver_host.addresses[0]) + self.logger.log("on_start: opening sender connection to %s" % (self.sender_host.addresses[0])) + self.sender_conn = event.container.connect(self.sender_host.addresses[0]) + self.logger.log("on_start: Creating receiver") + self.receiver = event.container.create_receiver(self.receiver_conn, self.test_address) + self.logger.log("on_start: Creating sender") + self.sender = event.container.create_sender(self.sender_conn, self.test_address) + self.logger.log("on_start: done") + + def send(self): + while self.sender.credit > 0 and self.n_sent < self.count: + # construct message in indentifiable chunks + body_msg = "" + padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[self.n_sent % 30] + while len(body_msg) < self.msg_size: + chunk = "[%s:%d:%d" % (self.test_address, self.n_sent, len(body_msg)) + padlen = 50 - len(chunk) + chunk += padchar * padlen + body_msg += chunk + if len(body_msg) > self.msg_size: + body_msg = body_msg[:self.msg_size] + self.logger.log("send. address:%s message:%d of %s length=%d" % + (self.test_address, self.n_sent, self.count, self.msg_size)) + m = Message(body=body_msg) + self.sender.send(m) + self.n_sent += 1 + + def on_sendable(self, event): + if event.sender == self.sender: + self.logger.log("on_sendable") + self.send() + + def on_message(self, event): + if self.expect_block: + # All messages should violate maxMessageSize. + # Receiving any is an error. + self.error = "Received a message. Expected to receive no messages." + self.logger.log(self.error) + self._shut_down_test() + else: + self.n_rcvd += 1 + self.accept(event.delivery) + self._check_done() + + def on_connection_error(self, event): + if event.connection == self.sender_conn: + if event.connection.remote_condition.name == "amqp:connection:message-size-exceeded": + self.n_connection_error += 1 + self.sender_conn.close() + self.sender_conn = None + else: + # sender closed but for wrong reason + self.error = "sender close error: Expected amqp:connection:message-size-exceeded but received %s" % \ + event.connection.remote_condition.name + self.logger.log(self.error) + else: + # connection error but not for sender + self.error = "unexpected connection close error: wrong connection closed. condition= %s" % \ + event.connection.remote_condition.name + self.logger.log(self.error) + self._check_done() + + def _shut_down_test(self): + if self.timer: + self.timer.cancel() + self.timer = None + if self.sender: + self.sender.close() + self.sender = None + if self.receiver: + self.receiver.close() + self.receiver = None + if self.sender_conn: + self.sender_conn.close() + self.sender_conn = None + if self.receiver_conn: + self.receiver_conn.close() + self.receiver_conn = None + + def _check_done(self): + current = ("check_done: sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d" % + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, self.n_connection_error)) + self.logger.log(current) + if self.error is not None: + self.logger.log("TEST FAIL") + self._shut_down_test() + else: + done = self.n_rejected == 1 + + if done: + self.logger.log("TEST DONE!!!") + # self.log_unhandled = True # verbose debugging + self._shut_down_test() + + def on_rejected(self, event): + self.n_rejected += 1 + if self.expect_block: + self.logger.log("on_rejected: entry") + self._check_done() + else: + self.error = "Unexpected on_reject" + self.logger.log(self.error) + self._check_done() + + def on_aborted(self, event): + self.logger.log("on_aborted") + self.n_aborted += 1 + self._check_done() + + def on_error(self, event): + self.error = "Container error" + self.logger.log(self.error) + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_link_error(self, event): + self.error = event.link.remote_condition.name + self.logger.log("on_link_error: %s" % (self.error)) + # + # qpid-proton master @ 6abb4ce + # At this point the container is wedged and closing the connections does + # not get the container to exit. + # Instead, raise an exception that bypasses normal container exit. + # This class then returns something for the main test to evaluate. + # + raise Exception(self.error) + + def on_unhandled(self, method, *args): + if self.log_unhandled: + self.logger.log("on_unhandled: method: %s, args: %s" % (method, args)) + + def run(self): + try: + Container(self).run() + except Exception as e: + self.error = "Container run exception: %s" % (e) + self.logger.log(self.error) + self.logger.dump() + + + +# For the next test case define max sizes for each router. +# These are the configured maxMessageSize values +EA1_MAX_SIZE = 50000 +INTA_MAX_SIZE = 100000 +INTB_MAX_SIZE = 150000 +EB1_MAX_SIZE = 200000 + +# Interior routers enforce max size directly. +# Edge routers are also checked by the attached interior router. + +# Block tests that use edge routers that send messages to the network must +# account for the fact that the attached interior router will apply +# another max size. These tests do not check against EB1 max for the +# sender if the receiver is on EA1, INTA, or INTB since INTB's max +# would kick an and cause a false positive. + +# Tests that check for allowing near-max sizes use the minimum of +# the edge router's max and the attached interior router's max. + +# The bytes-over and bytes-under max that should trigger allow or deny. +# Messages with content this much over should be blocked while +# messages with content this much under should be allowed. +# * client overhead is typically 16 bytes or so +# * interrouter overhead is much larger with annotations +OVER_UNDER = 200 + + +class MaxMessageSizeBlockOversize(TestCase): + """ + verify that maxMessageSize blocks oversize messages + """ + @classmethod + def setUpClass(cls): + """Start the router""" + super(MaxMessageSizeBlockOversize, cls).setUpClass() + + def router(name, mode, max_size, extra): + config = [ + ('router', {'mode': mode, + 'id': name, + 'allowUnsettledMulticast': 'yes', + 'workerThreads': W_THREADS}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true', 'maxMessageSize': max_size, 'defaultVhost': '$default'}), + ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true', + 'groups': [( + '$default', { + 'users': '*', + 'maxConnections': 100, + 'remoteHosts': '*', + 'sources': '*', + 'targets': '*', + 'allowAnonymousSender': 'true', + 'allowWaypointLinks': 'true', + 'allowDynamicSource': 'true' + } + )]} + ) + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + return cls.routers[-1] + + # configuration: + # two edge routers connected via 2 interior routers with max sizes + # + # +-------+ +---------+ +---------+ +-------+ + # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 | + # | 50,000| | 100,000 | | 150,000 | |200,000| + # +-------+ +---------+ +---------+ +-------+ + # + # Note: + # * Messages whose senders connect to INT.A or INT.B are subject to max message size + # defined for the ingress router only. + # * Message whose senders connect to EA1 or EA2 are subject to max message size + # defined for the ingress router. If the message is forwarded through the + # connected interior router then the message is subject to another max message size + # defined by the interior router. + + cls.routers = [] + + interrouter_port = cls.tester.get_port() + cls.INTA_edge_port = cls.tester.get_port() + cls.INTB_edge_port = cls.tester.get_port() + + router('INT.A', 'interior', INTA_MAX_SIZE, + [('listener', {'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})]) + cls.INT_A = cls.routers[0] + cls.INT_A.listener = cls.INT_A.addresses[0] + + router('INT.B', 'interior', INTB_MAX_SIZE, + [('connector', {'name': 'connectorToA', + 'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', + 'port': cls.INTB_edge_port})]) + cls.INT_B = cls.routers[1] + cls.INT_B.listener = cls.INT_B.addresses[0] + + router('EA1', 'edge', EA1_MAX_SIZE, + [('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()}), + ('connector', {'name': 'uplink', 'role': 'edge', + 'port': cls.INTA_edge_port})]) + cls.EA1 = cls.routers[2] + cls.EA1.listener = cls.EA1.addresses[0] + + router('EB1', 'edge', EB1_MAX_SIZE, + [('connector', {'name': 'uplink', + 'role': 'edge', + 'port': cls.INTB_edge_port, + 'maxFrameSize': 1024}), + ('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()})]) + cls.EB1 = cls.routers[3] + cls.EB1.listener = cls.EB1.addresses[0] + + cls.INT_A.wait_router_connected('INT.B') + cls.INT_B.wait_router_connected('INT.A') + cls.EA1.wait_connectors() + cls.EB1.wait_connectors() + + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + + cmd.split(' ') + + ['--bus', + address or self.address(), + '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception("%s\n%s" % (e, out)) + return out + + def sense_n_closed_lines(self, routername): + with open("../setUpClass/%s.log" % routername, 'r') as router_log: + log_lines = router_log.read().split("\n") + closed_lines = [s for s in log_lines if "amqp:connection:message-size-exceeded" in s] + return len(closed_lines) + + # verify that a message can go through an edge and get blocked by interior + # This test has a sender on EB1 and a receiver on the attached interior INT_B + def test_60_block_oversize_EB1_INTB_at_INTB(self): + closed_before = self.sense_n_closed_lines("EB1") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e60", + message_size=EB1_MAX_SIZE - OVER_UNDER, + print_to_console=False) + test.run() + if test.error is not None: + test.logger.log("test_60 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + # Verify that interrouter link was shut down + closed_after = self.sense_n_closed_lines("EB1") + if (not closed_after == (closed_before + 1)): + print("FAIL: N closed events in log file did not increment by 1. Before: %d, After: %d" % (closed_before, closed_after)) + self.assertTrue(closed_after == (closed_before + 1), "Expected to receive close with condition: message size exceeded") + + # verify that a message can go through an edge and get blocked by interior + def test_61_block_oversize_EB1_EA1_at_INTB(self): + closed_before = self.sense_n_closed_lines("EB1") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EA1, + "e61", + message_size=EB1_MAX_SIZE - OVER_UNDER, + print_to_console=False) + test.run() + if test.error is not None: + test.logger.log("test_61 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + # Verify that interrouter link was shut down + closed_after = self.sense_n_closed_lines("EB1") + if (not closed_after == (closed_before + 1)): + print("FAIL: N closed events in log file did not increment by 1. Before: %d, After: %d" % (closed_before, closed_after)) + self.assertTrue(closed_after == (closed_before + 1), "Expected to receive close with condition: message size exceeded") + + # see what happens when a message must be blocked by edge and also by interior + def xtest_70_block_oversize_EB1_INTB_at_both(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e70", + message_size=EB1_MAX_SIZE + OVER_UNDER, + print_to_console=False) + test.run() + if test.error is not None: + test.logger.log("test_70 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + # Verify that interrouter link was shut down + with open('../setUpClass/EB1.log', 'r') as router_log: + log_lines = router_log.read().split("\n") + close_lines = [s for s in log_lines if "amqp:connection:message-size-exceeded" in s] + self.assertTrue(len(close_lines) == 1, "Expected to receive close with condition: message size exceeded") + + + +if __name__ == '__main__': + unittest.main(main_module())