Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/router_core/router_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ALLOC_DEFINE(qdr_delivery_ref_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
ALLOC_DEFINE(qdr_delivery_cleanup_t);
ALLOC_DEFINE(qdr_general_work_t);
ALLOC_DEFINE(qdr_link_work_t);
ALLOC_DEFINE(qdr_connection_ref_t);
Expand Down
34 changes: 25 additions & 9 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,20 @@ struct qdr_action_t {
ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);

//
//
//
typedef struct qdr_delivery_cleanup_t qdr_delivery_cleanup_t;

struct qdr_delivery_cleanup_t {
DEQ_LINKS(qdr_delivery_cleanup_t);
qd_message_t *msg;
qd_iterator_t *iter;
};

ALLOC_DECLARE(qdr_delivery_cleanup_t);
DEQ_DECLARE(qdr_delivery_cleanup_t, qdr_delivery_cleanup_list_t);

//
// General Work
//
Expand All @@ -194,15 +208,16 @@ typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t

struct qdr_general_work_t {
DEQ_LINKS(qdr_general_work_t);
qdr_general_work_handler_t handler;
qdr_field_t *field;
int maskbit;
int inter_router_cost;
qdr_receive_t on_message;
void *on_message_context;
qd_message_t *msg;
uint64_t in_conn_id;
int treatment;
qdr_general_work_handler_t handler;
qdr_field_t *field;
int maskbit;
int inter_router_cost;
qd_message_t *msg;
qdr_receive_t on_message;
void *on_message_context;
uint64_t in_conn_id;
int treatment;
qdr_delivery_cleanup_list_t delivery_cleanup_list;
};

ALLOC_DECLARE(qdr_general_work_t);
Expand Down Expand Up @@ -819,6 +834,7 @@ struct qdr_core_t {
qdr_exchange_list_t exchanges;
qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1];

qdr_delivery_cleanup_list_t delivery_cleanup_list; ///< List of delivery cleanup items to be processed in an IO thread

// Overall delivery counters
uint64_t presettled_deliveries;
Expand Down
26 changes: 25 additions & 1 deletion src/router_core/router_core_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ static void qdr_activate_connections_CT(qdr_core_t *core)
}


static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
{
qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(work->delivery_cleanup_list);

while (cleanup) {
DEQ_REMOVE_HEAD(work->delivery_cleanup_list);
if (cleanup->msg)
qd_message_free(cleanup->msg);
if (cleanup->iter)
qd_iterator_free(cleanup->iter);
free_qdr_delivery_cleanup_t(cleanup);
cleanup = DEQ_HEAD(work->delivery_cleanup_list);
}
}


void qdr_modules_init(qdr_core_t *core)
{
//
Expand All @@ -84,7 +100,6 @@ void qdr_modules_init(qdr_core_t *core)

module = DEQ_NEXT(module);
}

}


Expand Down Expand Up @@ -154,6 +169,15 @@ void *router_core_thread(void *arg)
// Activate all connections that were flagged for activation during the above processing
//
qdr_activate_connections_CT(core);

//
// Schedule the cleanup of deliveries freed during this core-thread pass
//
if (DEQ_SIZE(core->delivery_cleanup_list) > 0) {
qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
DEQ_MOVE(core->delivery_cleanup_list, work->delivery_cleanup_list);
qdr_post_general_work_CT(core, work);
}
}

qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
Expand Down
20 changes: 7 additions & 13 deletions src/router_core/transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,25 +550,19 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
}


static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
{
if (work->msg)
qd_message_free(work->msg);
if (work->on_message_context)
qd_iterator_free((qd_iterator_t *)work->on_message_context);
}


static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery)
{
assert(sys_atomic_get(&delivery->ref_count) == 0);
qdr_link_t *link = delivery->link;

if (delivery->msg || delivery->to_addr) {
qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
work->msg = delivery->msg;
work->on_message_context = delivery->to_addr;
qdr_post_general_work_CT(core, work);
qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t();

DEQ_ITEM_INIT(cleanup);
cleanup->msg = delivery->msg;
cleanup->iter = delivery->to_addr;

DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup);
}

if (delivery->tracking_addr) {
Expand Down