Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/qpid/dispatch/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ extern const char * const QD_AMQP_COND_PRECONDITION_FAILED;
extern const char * const QD_AMQP_COND_RESOURCE_DELETED;
extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
extern const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED;

extern const char * const QD_AMQP_COND_CONNECTION_FORCED;
/// @};
Expand Down
2 changes: 2 additions & 0 deletions include/qpid/dispatch/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ void qd_session_free(qd_session_t *qd_ssn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);

void qd_connection_log_policy_denial(qd_link_t *link, const char *text);


// handy macros to get around PROTON-2184: pn_session_set_context aborts if
// context==0 (can remove this once qdrouter requires >= proton 0.31.x)
Expand Down
6 changes: 6 additions & 0 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
*/
uint8_t qd_message_get_priority(qd_message_t *msg);

/**
* True if message is larger that maxMessageSize
* @param msg A pointer to the message
* @return
*/
bool qd_message_oversize(const qd_message_t *msg);

///@}

Expand Down
2 changes: 2 additions & 0 deletions include/qpid/dispatch/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ bool qd_connection_strip_annotations_in(const qd_connection_t *c);

void qd_connection_wake(qd_connection_t *ctx);

int qd_connection_max_message_size(const qd_connection_t *c);

/**
* @}
*/
Expand Down
23 changes: 21 additions & 2 deletions python/qpid_dispatch/management/qdrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,13 @@
"required": false,
"create": true
},
"maxMessageSize": {
"type": "integer",
"default": 0,
"description": "The maximum size in bytes of AMQP message transfers allowed for this router. This limit is applied only to transfers over user connections and is not applied to interrouter or edge router connections. This limit may be overridden by vhost or by vhost user group settings. A value of zero disables this limit.",
"required": false,
"create": true
},
"enableVhostPolicy": {
"type": "boolean",
"default": false,
Expand Down Expand Up @@ -1905,10 +1912,11 @@
"graph": true,
"description": "The sum of all vhost sender and receiver denials."
},
"maxMessageSizeDenied": {"type": "integer", "graph": true},
"totalDenials": {
"type": "integer",
"graph": true,
"description": "The total number of connection and link denials."
"description": "The total number of connection, link, and transfer denials."
}
}
},
Expand All @@ -1933,6 +1941,11 @@
"create": true,
"update": true
},
"maxMessageSize": {
"type": "integer",
"description": "Optional maximum size in bytes of AMQP message transfers allowed for connections to this vhost. This limit overrides the policy maxMessageSize value and may be overridden by vhost user group settings. A value of zero disables this limit.",
"required": false
},
"maxConnectionsPerUser": {
"type": "integer",
"default": 65535,
Expand Down Expand Up @@ -1993,6 +2006,11 @@
"required": false,
"create": false
},
"maxMessageSize": {
"type": "integer",
"description": "Optional maximum size in bytes of AMQP message transfers allowed for connections created by users in this group. This limit overrides the policy and vhost maxMessageSize values. A value of zero disables this limit.",
"required": false
},
"maxFrameSize": {
"type": "integer",
"description": "The largest frame, in bytes, that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object (AMQP Open, max-frame-size).",
Expand Down Expand Up @@ -2130,7 +2148,8 @@

"sessionDenied": {"type": "integer", "graph": true},
"senderDenied": {"type": "integer", "graph": true},
"receiverDenied": {"type": "integer", "graph": true}
"receiverDenied": {"type": "integer", "graph": true},
"maxMessageSizeDenied": {"type": "integer", "graph": true}
}
},

Expand Down
2 changes: 2 additions & 0 deletions python/qpid_dispatch_internal/management/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ def configure(attributes):
policyDir = config.by_type('policy')[0]['policyDir']
policyDefaultVhost = config.by_type('policy')[0]['defaultVhost']
useHostnamePatterns = config.by_type('policy')[0]['enableVhostNamePatterns']
maxMessageSize = config.by_type('policy')[0]['maxMessageSize']
for a in config.by_type("policy"):
configure(a)
agent.policy.set_default_vhost(policyDefaultVhost)
agent.policy.set_use_hostname_patterns(useHostnamePatterns)
agent.policy.set_max_message_size(maxMessageSize)

# Remaining configuration
for t in "sslProfile", "authServicePlugin", "listener", "connector", \
Expand Down
37 changes: 33 additions & 4 deletions python/qpid_dispatch_internal/policy/policy_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class PolicyCompiler(object):
"""
Validate incoming configuration for legal schema.
- Warn about section options that go unused.
- Disallow negative max connection numbers.
- Disallow negative max connection/message size numbers.
- Check that connectionOrigins resolve to IP hosts.
- Enforce internal consistency,
"""
Expand All @@ -131,6 +131,7 @@ class PolicyCompiler(object):
PolicyKeys.KW_IGNORED_TYPE,
PolicyKeys.KW_VHOST_NAME,
PolicyKeys.KW_MAXCONN,
PolicyKeys.KW_MAX_MESSAGE_SIZE,
PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER,
PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT,
Expand Down Expand Up @@ -232,7 +233,7 @@ def compile_connection_group(self, vhostname, groupname, val, list_out, warnings

def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warnings, errors):
"""
Compile a schema from processed json format to local internal format.
Compile a vhostUserGroupSettings schema from processed json format to local internal format.
@param[in] name vhost name
@param[in] policy_in user config settings
@param[out] policy_out validated Internal format
Expand All @@ -248,7 +249,7 @@ def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warn
policy_out[PolicyKeys.KW_REMOTE_HOSTS] = ''
# DISPATCH-1277 - KW_MAX_FRAME_SIZE must be defaulted to 16384 not 2147483647
policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 16384
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = 0
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647
policy_out[PolicyKeys.KW_MAX_SESSIONS] = 65536
policy_out[PolicyKeys.KW_MAX_SENDERS] = 2147483647
Expand Down Expand Up @@ -411,7 +412,7 @@ def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warn

def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
"""
Compile a schema from processed json format to local internal format.
Compile a vhost schema from processed json format to local internal format.
@param[in] name vhost name
@param[in] policy_in raw policy to be validated
@param[out] policy_out validated Internal format
Expand All @@ -429,6 +430,7 @@ def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535
policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False
policy_out[PolicyKeys.KW_GROUPS] = {}
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None

# validate the options
for key, val in dict_iteritems(policy_in):
Expand All @@ -445,6 +447,14 @@ def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE
]:
if not self.validateNumber(val, 0, 0, cerror):
msg = ("Policy vhost '%s' option '%s' has error '%s'." %
(name, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
if not type(val) is bool:
errors.append("Policy vhost '%s' option '%s' must be of type 'bool' but is '%s'" %
Expand Down Expand Up @@ -609,6 +619,9 @@ def __init__(self, manager):
# When true policy ruleset definitions are propagated to C code
self.use_hostname_patterns = False

# _max_message_size
# holds global value from policy config object
self._max_message_size = 0
#
# Service interfaces
#
Expand Down Expand Up @@ -830,6 +843,13 @@ def lookup_settings(self, vhost_in, groupname, upolicy):

upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname])

maxsize = upolicy.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
if maxsize is None:
maxsize = ruleset.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
if maxsize is None:
maxsize = self._max_message_size
upolicy[PolicyKeys.KW_MAX_MESSAGE_SIZE] = maxsize

upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats()
return True
except Exception as e:
Expand All @@ -851,6 +871,15 @@ def close_connection(self, conn_id):
self._manager.log_trace(
"Policy internal error closing connection id %s. %s" % (conn_id, str(e)))

def set_max_message_size(self, size):
"""
record max message size from policy config object
:param size:
:return:ls

"""
self._max_message_size = size

#
#
def test_load_config(self):
Expand Down
8 changes: 8 additions & 0 deletions python/qpid_dispatch_internal/policy/policy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ def close_connection(self, conn_id):
@return: none
"""
self._policy_local.close_connection(conn_id)

def set_max_message_size(self, size):
"""
Policy has set global maxMessageSize.
:param size:
:return: none
"""
self._policy_local.set_max_message_size(size)
#
#
#
Expand Down
1 change: 1 addition & 0 deletions src/amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted";
const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state";
const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = "amqp:frame-size-too-small";
const char * const QD_AMQP_COND_CONNECTION_FORCED = "amqp:connection:forced";
const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED = "amqp:connection:message-size-exceeded";

const char * const QD_AMQP_PORT_STR = "5672";
const char * const QD_AMQPS_PORT_STR = "5671";
Expand Down
7 changes: 5 additions & 2 deletions src/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
}


static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link)
static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, int max_size)
{
qd_node_t *node = container->default_node;

Expand Down Expand Up @@ -191,6 +191,9 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link)
link->node = node;
link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);

if (max_size) {
pn_link_set_max_message_size(pn_link, (uint64_t)max_size);
}
pn_link_set_context(pn_link, link);
node->ntype->incoming_handler(node->context, link);
}
Expand Down Expand Up @@ -652,7 +655,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_conn->n_senders++;
}
setup_incoming_link(container, pn_link);
setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn));
}
} else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
handle_link_open(container, pn_link);
Expand Down
40 changes: 31 additions & 9 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "compose_private.h"
#include "connection_manager_private.h"
#include "aprintf.h"
#include "policy.h"
#include <string.h>
#include <ctype.h>
#include <stdio.h>
Expand Down Expand Up @@ -941,7 +942,6 @@ qd_message_t *qd_message()
msg->content->lock = sys_mutex();
sys_atomic_init(&msg->content->ref_count, 1);
msg->content->parse_depth = QD_DEPTH_NONE;

return (qd_message_t*) msg;
}

Expand Down Expand Up @@ -1244,14 +1244,13 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
/**
* Receive and discard large messages for which there is no destination.
* Don't waste resources by putting the message into internal buffers.
* Don't fiddle with locking as no sender is competing with reception.
* Message locking is not required since the message content buffers are untouched.
*/
qd_message_t *discard_receive(pn_delivery_t *delivery,
pn_link_t *link,
qd_message_t *msg_in)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in;

while (1) {
#define DISCARD_BUFFER_SIZE (128 * 1024)
char dummy[DISCARD_BUFFER_SIZE];
Expand All @@ -1261,13 +1260,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
// have read all available pn_link incoming bytes
break;
} else if (rc == PN_EOS || rc < 0) {
// end of message or error. Call the message complete
msg->content->receive_complete = true;
// End of message or error: finalize message_receive handling
msg->content->aborted = pn_delivery_aborted(delivery);
qd_nullify_safe_ptr(&msg->content->input_link_sp);

pn_record_t *record = pn_delivery_attachments(delivery);
pn_record_set(record, PN_DELIVERY_CTX, 0);
if (msg->content->oversize) {
msg->content->aborted = true;
}
msg->content->receive_complete = true;
break;
} else {
// rc was > 0. bytes were read and discarded.
Expand Down Expand Up @@ -1308,13 +1309,15 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
msg->content->max_message_size = qd_connection_max_message_size(qdc);
}

//
// The discard flag indicates we should keep reading the input stream
// but not process the message for delivery.
// Oversize messages are also discarded.
//
if (msg->content->discard) {
if (msg->content->discard || msg->content->oversize) {
return discard_receive(delivery, link, (qd_message_t *)msg);
}

Expand Down Expand Up @@ -1416,10 +1419,23 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
recv_error = true;
} else if (rc > 0) {
//
// We have received a positive number of bytes for the message. Advance
// the cursor in the buffer.
// We have received a positive number of bytes for the message.
// Advance the cursor in the buffer.
//
qd_buffer_insert(content->pending, rc);

// Handle maxMessageSize violations
if (content->max_message_size) {
content->bytes_received += rc;
if (content->bytes_received > content->max_message_size)
{
qd_connection_t *conn = qd_link_connection(qdl);
qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded");
qd_policy_count_max_size_event(link, conn);
content->oversize = true;
return discard_receive(delivery, link, (qd_message_t*)msg);
}
}
} else {
//
// We received zero bytes, and no PN_EOS. This means that we've received
Expand Down Expand Up @@ -2231,3 +2247,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted)
qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
msg_pvt->content->aborted = aborted;
}

bool qd_message_oversize(const qd_message_t *msg)
{
qd_message_content_t * mc = MSG_CONTENT(msg);
return mc->oversize;
}
3 changes: 3 additions & 0 deletions src/message_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ typedef struct {
qd_parsed_field_t *ma_pf_to_override;
qd_parsed_field_t *ma_pf_trace;
int ma_int_phase;
int max_message_size; // configured max; 0 if no max to enforce
int bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size
uint32_t fanout; // The number of receivers for this message, including in-process subscribers.
qd_link_t_sp input_link_sp; // message received on this link

Expand All @@ -120,6 +122,7 @@ typedef struct {
bool disable_q2_holdoff; // Disable the Q2 flow control
bool priority_parsed;
bool priority_present;
bool oversize; // policy oversize handling in effect
uint8_t priority; // The priority of this message
} qd_message_content_t;

Expand Down
Loading