Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion include/qpid/dispatch/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, q
typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn);


/**
* Post event process handler
* Invoke only after all proton events have been popped from the collector.
*
* @param conn The connection for which all proton events have been popped.
*/
typedef void (*qd_pn_event_complete_cb_t)(qd_connection_t *conn);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the callback's need to include the context argument (even it you don't use it in your handler).



/**
* Set the connection event handler callback.
*
Expand All @@ -487,7 +496,11 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex
* @param pn_event_handler The handler for proton events.
* @param handler_context Context data to associate with the handler.
*/
void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context);
void qd_server_set_conn_handler(qd_dispatch_t *qd,
qd_conn_handler_cb_t conn_handler,
qd_pn_event_handler_cb_t pn_event_handler,
qd_pn_event_complete_cb_t pn_event_complete_handler,
void *handler_context);


/**
Expand Down
86 changes: 81 additions & 5 deletions src/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ struct qd_container_t {
qdc_node_type_list_t node_type_list;
};

ALLOC_DEFINE(qd_pn_free_link_session_t);

static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
{
qd_node_t *node = container->default_node;
Expand Down Expand Up @@ -312,6 +314,80 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd
return event_count;
}

/**
* Returns true if the free_link already exists in free_link_list, false otherwise
*/
static bool link_exists(qd_pn_free_link_session_list_t **free_list, pn_link_t *free_link)
{
qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list);
while(free_item) {
if (free_item->pn_link == free_link)
return true;
free_item = DEQ_NEXT(free_item);
}
return false;
}

/**
* Returns true if the free_session already exists in free_session_list, false otherwise
*/
static bool session_exists(qd_pn_free_link_session_list_t **free_list, pn_session_t *free_session)
{
qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list);
while(free_item) {
if (free_item->pn_session == free_session)
return true;
free_item = DEQ_NEXT(free_item);
}
return false;
}

static void add_session_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_session_t *ssn)
{
if (!session_exists(&free_link_session_list, ssn)) {
qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
DEQ_ITEM_INIT(to_free);
to_free->pn_session = ssn;
to_free->pn_link = 0;
DEQ_INSERT_TAIL(*free_link_session_list, to_free);
}
}


static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_link_t *pn_link)
{
if (!link_exists(&free_link_session_list, pn_link)) {
qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
DEQ_ITEM_INIT(to_free);
to_free->pn_link = pn_link;
to_free->pn_session = 0;
DEQ_INSERT_TAIL(*free_link_session_list, to_free);
}

}

void pn_event_complete_handler(qd_connection_t *qd_conn)
{
qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list);
qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
while(to_free_link) {
if (to_free_link->pn_link) {
pn_link_free(to_free_link->pn_link);
to_free_link->pn_link = 0;
}
to_free_link = DEQ_NEXT(to_free_link);
}

while(to_free_session) {
if (to_free_session->pn_session) {
pn_session_free(to_free_session->pn_session);
to_free_session->pn_session = 0;
}
DEQ_REMOVE_HEAD(qd_conn->free_link_session_list);
free_qd_pn_free_link_session_t(to_free_session);
to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
}
}

int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
{
Expand Down Expand Up @@ -360,7 +436,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
pn_session_free(ssn);
add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
}
break;
case PN_SESSION_REMOTE_CLOSE :
Expand Down Expand Up @@ -400,7 +476,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
pn_session_close(ssn);
}
else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
pn_session_free(ssn);
add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
}
}
break;
Expand Down Expand Up @@ -464,7 +540,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
if (qd_link->close_sess_with_link && sess)
pn_session_close(sess);
pn_link_free(pn_link);
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
}
}
Expand All @@ -474,7 +550,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
pn_link_free(pn_link);
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
break;

Expand Down Expand Up @@ -548,7 +624,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd)
DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);

qd_server_set_conn_handler(qd, handler, pn_event_handler, container);
qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container);

qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
return container;
Expand Down
14 changes: 11 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ qd_connection_t *qd_connection_allocate()
DEQ_ITEM_INIT(ctx);
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
DEQ_INIT(ctx->free_link_session_list);
return ctx;
}

Expand Down Expand Up @@ -852,6 +853,11 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)

event = ctx->event_stall ? 0 : pn_collector_peek(collector);
}

//
// Free up any links and sessions that need to be freed since all the events have been popped from the collector.
//
qd_server->pn_event_complete_handler(qd_conn);
events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
}
} while (events > 0);
Expand Down Expand Up @@ -1415,11 +1421,13 @@ void qd_server_free(qd_server_t *qd_server)
void qd_server_set_conn_handler(qd_dispatch_t *qd,
qd_conn_handler_cb_t handler,
qd_pn_event_handler_cb_t pn_event_handler,
qd_pn_event_complete_cb_t pn_event_complete_handler,
void *handler_context)
{
qd->server->conn_handler = handler;
qd->server->pn_event_handler = pn_event_handler;
qd->server->conn_handler_context = handler_context;
qd->server->conn_handler = handler;
qd->server->pn_event_handler = pn_event_handler;
qd->server->pn_event_complete_handler = pn_event_complete_handler;
qd->server->conn_handler_context = handler_context;
}


Expand Down
12 changes: 12 additions & 0 deletions src/server_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ typedef struct qd_deferred_call_t {

DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);

typedef struct qd_pn_free_link_session_t {
DEQ_LINKS(struct qd_pn_free_link_session_t);
pn_session_t *pn_session;
pn_link_t *pn_link;
} qd_pn_free_link_session_t;

DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);

/**
* Connection objects wrap Proton connection objects.
*/
Expand Down Expand Up @@ -118,6 +126,7 @@ struct qd_connection_t {
bool event_stall;
bool policy_counted;
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
qd_pn_free_link_session_list_t free_link_session_list;
};

DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
Expand Down Expand Up @@ -160,6 +169,7 @@ struct qd_server_t {
qd_thread_start_cb_t start_handler;
qd_conn_handler_cb_t conn_handler;
qd_pn_event_handler_cb_t pn_event_handler;
qd_pn_event_complete_cb_t pn_event_complete_handler;
qd_user_fd_handler_cb_t ufd_handler;
void *start_context;
void *conn_handler_context;
Expand Down Expand Up @@ -191,5 +201,7 @@ ALLOC_DECLARE(qd_deferred_call_t);
ALLOC_DECLARE(qd_connector_t);
ALLOC_DECLARE(qd_connection_t);
ALLOC_DECLARE(qd_user_fd_t);
ALLOC_DECLARE(qd_pn_free_link_session_t);


#endif