Skip to content
Permalink
Browse files
Merge branch 'tross-DISPATCH-179-1'
Conflicts:
	include/qpid/dispatch/container.h
	include/qpid/dispatch/ctools.h
	include/qpid/dispatch/server.h
	python/qpid_dispatch_internal/management/agent.py
	src/connection_manager.c
	src/container.c
	src/router_config.c
	src/router_node.c
	src/router_pynode.c
	tests/system_tests_qdmanage.py
  • Loading branch information
ted-ross committed Mar 18, 2016
2 parents c6ccf1e + 620d779 commit 215b5864b778e9c1817bc05e62a7e00985fee286
Show file tree
Hide file tree
Showing 78 changed files with 10,753 additions and 2,973 deletions.
@@ -46,7 +46,7 @@ if (NOT PYTHONLIBS_FOUND)
message(FATAL_ERROR "Python Development Libraries are needed.")
endif (NOT PYTHONLIBS_FOUND)

set (SO_VERSION_MAJOR 1)
set (SO_VERSION_MAJOR 2)
set (SO_VERSION_MINOR 0)
set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}")

@@ -83,3 +83,10 @@ fixedAddress {
fanout: multiple
}

##
## Temporary
##
log {
module: ROUTER_CORE
enable: trace+
}
@@ -47,6 +47,7 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/router.h>
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/compose.h>
@@ -104,7 +104,8 @@ const char * const QD_CAPABILITY_ANONYMOUS_RELAY;

/** @name Link Terminus Capabilities */
/// @{
const char * const QD_CAPABILITY_ROUTER;
const char * const QD_CAPABILITY_ROUTER_CONTROL;
const char * const QD_CAPABILITY_ROUTER_DATA;
/// @}

/** @name Dynamic Node Properties */
@@ -123,6 +124,8 @@ const char * const QD_INTERNODE_LINK_NAME_2;
/** An AMQP error status code and string description */
typedef struct qd_amqp_error_t { int status; const char* description; } qd_amqp_error_t;
extern const qd_amqp_error_t QD_AMQP_OK;
extern const qd_amqp_error_t QD_AMQP_CREATED;
extern const qd_amqp_error_t QD_AMQP_NO_CONTENT;
extern const qd_amqp_error_t QD_AMQP_BAD_REQUEST;
extern const qd_amqp_error_t QD_AMQP_NOT_FOUND;
extern const qd_amqp_error_t QD_AMQP_NOT_IMPLEMENTED;
@@ -40,10 +40,16 @@ qd_bitmask_t *qd_bitmask(int initial);
void qd_bitmask_free(qd_bitmask_t *b);
void qd_bitmask_set_all(qd_bitmask_t *b);
void qd_bitmask_clear_all(qd_bitmask_t *b);
void qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum);
void qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum);
int qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum);
int qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum);
int qd_bitmask_value(qd_bitmask_t *b, int bitnum);
int qd_bitmask_first_set(qd_bitmask_t *b, int *bitnum);
int qd_bitmask_cardinality(const qd_bitmask_t *b);

int _qdbm_start(qd_bitmask_t *b);
void _qdbm_next(qd_bitmask_t *b, int *v);

#define QD_BITMASK_EACH(M,V,C) C=qd_bitmask_cardinality(M),V=_qdbm_start(M);V>=0 && C;_qdbm_next(M,&V),C--

///@}

@@ -47,6 +47,17 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
*/
void qd_connection_manager_free(qd_connection_manager_t *cm);

/**
* Free all the resources associated with a config listener
*/
void qd_config_listener_free(qd_config_listener_t *cl);


/**
* Free all the resources associated with a config connector
*/
void qd_config_connector_free(qd_config_connector_t *cl);


/**
* Start the configured Listeners and Connectors
@@ -74,7 +74,7 @@ typedef void (*qd_container_delivery_handler_t) (void *node_context, qd_link_
typedef int (*qd_container_link_handler_t) (void *node_context, qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, qd_detach_type_t dt);
typedef void (*qd_container_node_handler_t) (void *type_context, qd_node_t *node);
typedef void (*qd_container_conn_handler_t) (void *type_context, qd_connection_t *conn, void *context);
typedef int (*qd_container_conn_handler_t) (void *type_context, qd_connection_t *conn, void *context);

/**
* A set of Node handlers for deliveries, links and container events.
@@ -100,12 +100,8 @@ typedef struct {
/** Invoked when an attach for a new outgoing link is received. */
qd_container_link_handler_t outgoing_handler;

/**
* Invoked when an outgoing link is available for sending either deliveries
* or disposition changes. The handler must check the link's credit to
* determine whether (and how many) message deliveries may be sent.
*/
qd_container_link_handler_t writable_handler;
/** Invoked when an activated connection is available for writing. */
qd_container_conn_handler_t writable_handler;

/** Invoked when a link is detached. */
qd_container_link_detach_handler_t link_detach_handler;
@@ -56,6 +56,7 @@
#define DEQ_NEXT(i) DEQ_NEXT_N(,i)
#define DEQ_PREV_N(n,i) (i)->prev##n
#define DEQ_PREV(i) DEQ_PREV_N(,i)
#define DEQ_MOVE(d1,d2) do {d2 = d1; DEQ_INIT(d1);} while (0)
/**
*@pre ptr points to first element of deq
*@post ptr points to first element of deq that passes test, or 0. Test should involve ptr.
@@ -245,6 +245,15 @@ int qd_field_iterator_ncopy(qd_field_iterator_t *iter, unsigned char* buffer, in
*/
unsigned char *qd_field_iterator_copy(qd_field_iterator_t *iter);

/**
* Return a new iterator that is a duplicate of the original iterator, referring
* to the same base data. If the input iterator pointer is NULL, the duplicate
* will also be NULL (i.e. no new iterator will be created).
* @param iter Input iterator
* @return Pointer to a new, identical iterator referring to the same data.
*/
qd_field_iterator_t *qd_field_iterator_dup(const qd_field_iterator_t *iter);

/**
* Copy the iterator's view into buffer as a null terminated string,
* up to a maximum of n bytes. Cursor is advanced by the number of bytes
@@ -167,7 +167,7 @@ void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *tra
* method must not reference it after this call.
*
* @param msg Pointer to an outgoing message.
* @param to_field Pointer to a composed field representing the to overrid
* @param to_field Pointer to a composed field representing the to override
* address that will be used as the value for the QD_MA_TO map entry. If null,
* the message will not have a QA_MA_TO message annotation field. Ownership of
* this field is transferred to the message.
@@ -127,6 +127,14 @@ int32_t qd_parse_as_int(qd_parsed_field_t *field);
*/
int64_t qd_parse_as_long(qd_parsed_field_t *field);

/**
* Return the raw content as a boolean value.
*
* @param field The field pointer returned by qd_parse.
* @return The raw content of the field cast as a bool.
*/
bool qd_parse_as_bool(qd_parsed_field_t *field);

/**
* Return the number of sub-field in a compound field. If the field is
* a list or array, this is the number of items in the list/array. If
@@ -34,69 +34,17 @@

typedef struct qd_router_t qd_router_t;
typedef struct qd_address_t qd_address_t;
typedef uint8_t qd_address_semantics_t;
typedef struct qd_router_delivery_t qd_router_delivery_t;

/**
* @name Address fanout semantics
* @{
*/
#define QD_FANOUTMASK 0x03
#define QD_FANOUT_SINGLE 0x00 ///< Message will be delivered to a single consumer.
#define QD_FANOUT_MULTIPLE 0x01 ///< Message will be delivered to multiple consumers.
#define QD_FANOUT_GROUP 0x02 ///< Message will be delivered to one consumer per group.
#define QD_FANOUT(d) (d & QD_FANOUTMASK) ///< Get fanout bits.
///@}

/**
* @name Address bias semantics for SINGLE/GROUP fanout
* @{
*/

#define QD_BIASMASK 0x0c
#define QD_BIAS_NONE 0x00 ///< Apply no bias (also used for multiple fanout).
#define QD_BIAS_CLOSEST 0x04 ///< Message will be delivered to the closest (lowest cost) consumer.
#define QD_BIAS_SPREAD 0x08 ///< Messages will be spread arbitrarily across all consumers.
#define QD_BIAS_LATENCY 0x0c ///< Messages will be spread to minimize latency in light of each consumer's rate of consumption.
#define QD_BIAS(d) (d & QD_BIASMASK)
///@}


/**
* @name Address congestion semantics.
*
* This controls that the router will do with
* received messages that are destined for congested destinations.
* @{
*/
#define QD_CONGESTIONMASK 0x30
/** Drop/Release the message.*/
#define QD_CONGESTION_DROP 0x00
/**
* Stop issuing replacement credits to slow the producer. This puts a cap on
* the total number of messages addressed to this address from a particular
* producer that can be buffered in the router.
*/
#define QD_CONGESTION_BACKPRESSURE 0x10
/** Redirect messages to an alternate address. */
#define QD_CONGESTION_REDIRECT 0x20
#define QD_CONGESTION(d) (d & QD_CONGESTIONMASK)
/// @}

/** @name Other semantics
* @{
*/
#define QD_DROP_FOR_SLOW_CONSUMERS 0x40
#define QD_BYPASS_VALID_ORIGINS 0x80
///@}
typedef enum {
QD_TREATMENT_MULTICAST_FLOOD = 0,
QD_TREATMENT_MULTICAST_ONCE = 1,
QD_TREATMENT_ANYCAST_CLOSEST = 2,
QD_TREATMENT_ANYCAST_BALANCED = 3,
QD_TREATMENT_LINK_BALANCED = 4
} qd_address_treatment_t;

/**
* @name Sematics groups
* @{
*/
#define QD_SEMANTICS_ROUTER_CONTROL (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS)
#define QD_SEMANTICS_DEFAULT (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS)
///@}
#include <qpid/dispatch/router_core.h>

/** Message forwarding descriptor
*
@@ -140,23 +88,25 @@ typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int lin

const char *qd_router_id(const qd_dispatch_t *qd);

qdr_core_t *qd_router_core(qd_dispatch_t *qd);

/** Register an address in the router's hash table.
* @param qd Pointer to the dispatch instance.
* @param address String form of address
* @param on_message Optional callback to be called when a message is received
* for the address.
* @param context Context to be passed to the on_message handler.
* @param semantics Semantics for the address.
* @param treatment Treatment for the address.
* @param global True if the address is global.
* @param forwarder Optional custom forwarder to use when a message is received
* for the address. If null, a default forwarder based on the semantics will
* for the address. If null, a default forwarder based on the treatment will
* be used.
*/
qd_address_t *qd_router_register_address(qd_dispatch_t *qd,
const char *address,
qd_router_message_cb_t on_message,
void *context,
qd_address_semantics_t semantics,
qd_address_treatment_t treatment,
bool global,
qd_router_forwarder_t *forwarder);

@@ -184,7 +134,7 @@ void qd_router_build_node_list(qd_dispatch_t *qd, qd_composed_field_t *field);
const char* qd_address_logstr(qd_address_t* address);

/** Retrieve the proper forwarder for a given semantic */
qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t s);
qd_router_forwarder_t *qd_router_get_forwarder(qd_address_treatment_t t);

///@}

0 comments on commit 215b586

Please sign in to comment.