diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index aebe36e230..1f850fb1b0 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -476,6 +476,15 @@ typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, q typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn); +/** + * Post event process handler + * Invoke only after all proton events have been popped from the collector. + * + * @param conn The connection for which all proton events have been popped. + */ +typedef void (*qd_pn_event_complete_cb_t)(qd_connection_t *conn); + + /** * Set the connection event handler callback. * @@ -487,7 +496,11 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex * @param pn_event_handler The handler for proton events. * @param handler_context Context data to associate with the handler. */ -void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context); +void qd_server_set_conn_handler(qd_dispatch_t *qd, + qd_conn_handler_cb_t conn_handler, + qd_pn_event_handler_cb_t pn_event_handler, + qd_pn_event_complete_cb_t pn_event_complete_handler, + void *handler_context); /** diff --git a/src/container.c b/src/container.c index 5103a6612d..e9ac53f0e4 100644 --- a/src/container.c +++ b/src/container.c @@ -83,6 +83,8 @@ struct qd_container_t { qdc_node_type_list_t node_type_list; }; +ALLOC_DEFINE(qd_pn_free_link_session_t); + static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) { qd_node_t *node = container->default_node; @@ -312,6 +314,80 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd return event_count; } +/** + * Returns true if the free_link already exists in free_link_list, false otherwise + */ +static bool link_exists(qd_pn_free_link_session_list_t **free_list, pn_link_t *free_link) +{ + qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list); + while(free_item) { + if (free_item->pn_link == free_link) + return true; + free_item = DEQ_NEXT(free_item); + } + return false; +} + +/** + * Returns true if the free_session already exists in free_session_list, false otherwise + */ +static bool session_exists(qd_pn_free_link_session_list_t **free_list, pn_session_t *free_session) +{ + qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list); + while(free_item) { + if (free_item->pn_session == free_session) + return true; + free_item = DEQ_NEXT(free_item); + } + return false; +} + +static void add_session_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_session_t *ssn) +{ + if (!session_exists(&free_link_session_list, ssn)) { + qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); + DEQ_ITEM_INIT(to_free); + to_free->pn_session = ssn; + to_free->pn_link = 0; + DEQ_INSERT_TAIL(*free_link_session_list, to_free); + } +} + + +static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_link_t *pn_link) +{ + if (!link_exists(&free_link_session_list, pn_link)) { + qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); + DEQ_ITEM_INIT(to_free); + to_free->pn_link = pn_link; + to_free->pn_session = 0; + DEQ_INSERT_TAIL(*free_link_session_list, to_free); + } + +} + +void pn_event_complete_handler(qd_connection_t *qd_conn) +{ + qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list); + qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list); + while(to_free_link) { + if (to_free_link->pn_link) { + pn_link_free(to_free_link->pn_link); + to_free_link->pn_link = 0; + } + to_free_link = DEQ_NEXT(to_free_link); + } + + while(to_free_session) { + if (to_free_session->pn_session) { + pn_session_free(to_free_session->pn_session); + to_free_session->pn_session = 0; + } + DEQ_REMOVE_HEAD(qd_conn->free_link_session_list); + free_qd_pn_free_link_session_t(to_free_session); + to_free_session = DEQ_HEAD(qd_conn->free_link_session_list); + } +} int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn) { @@ -360,7 +436,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even case PN_SESSION_LOCAL_CLOSE : ssn = pn_event_session(event); if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_session_free(ssn); + add_session_to_free_list(&qd_conn->free_link_session_list,ssn); } break; case PN_SESSION_REMOTE_CLOSE : @@ -400,7 +476,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even pn_session_close(ssn); } else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_session_free(ssn); + add_session_to_free_list(&qd_conn->free_link_session_list,ssn); } } break; @@ -464,7 +540,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { if (qd_link->close_sess_with_link && sess) pn_session_close(sess); - pn_link_free(pn_link); + add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); } } } @@ -474,7 +550,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_link_free(pn_link); + add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); } break; @@ -548,7 +624,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd) DEQ_INIT(container->nodes); DEQ_INIT(container->node_type_list); - qd_server_set_conn_handler(qd, handler, pn_event_handler, container); + qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container); qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized"); return container; diff --git a/src/server.c b/src/server.c index 557c0b481f..066f544ed8 100644 --- a/src/server.c +++ b/src/server.c @@ -366,6 +366,7 @@ qd_connection_t *qd_connection_allocate() DEQ_ITEM_INIT(ctx); DEQ_INIT(ctx->deferred_calls); ctx->deferred_call_lock = sys_mutex(); + DEQ_INIT(ctx->free_link_session_list); return ctx; } @@ -852,6 +853,11 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr) event = ctx->event_stall ? 0 : pn_collector_peek(collector); } + + // + // Free up any links and sessions that need to be freed since all the events have been popped from the collector. + // + qd_server->pn_event_complete_handler(qd_conn); events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn); } } while (events > 0); @@ -1415,11 +1421,13 @@ void qd_server_free(qd_server_t *qd_server) void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t handler, qd_pn_event_handler_cb_t pn_event_handler, + qd_pn_event_complete_cb_t pn_event_complete_handler, void *handler_context) { - qd->server->conn_handler = handler; - qd->server->pn_event_handler = pn_event_handler; - qd->server->conn_handler_context = handler_context; + qd->server->conn_handler = handler; + qd->server->pn_event_handler = pn_event_handler; + qd->server->pn_event_complete_handler = pn_event_complete_handler; + qd->server->conn_handler_context = handler_context; } diff --git a/src/server_private.h b/src/server_private.h index eb23fa1abc..8c4d89e2f4 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -85,6 +85,14 @@ typedef struct qd_deferred_call_t { DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t); +typedef struct qd_pn_free_link_session_t { + DEQ_LINKS(struct qd_pn_free_link_session_t); + pn_session_t *pn_session; + pn_link_t *pn_link; +} qd_pn_free_link_session_t; + +DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t); + /** * Connection objects wrap Proton connection objects. */ @@ -118,6 +126,7 @@ struct qd_connection_t { bool event_stall; bool policy_counted; char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. + qd_pn_free_link_session_list_t free_link_session_list; }; DEQ_DECLARE(qd_connection_t, qd_connection_list_t); @@ -160,6 +169,7 @@ struct qd_server_t { qd_thread_start_cb_t start_handler; qd_conn_handler_cb_t conn_handler; qd_pn_event_handler_cb_t pn_event_handler; + qd_pn_event_complete_cb_t pn_event_complete_handler; qd_user_fd_handler_cb_t ufd_handler; void *start_context; void *conn_handler_context; @@ -191,5 +201,7 @@ ALLOC_DECLARE(qd_deferred_call_t); ALLOC_DECLARE(qd_connector_t); ALLOC_DECLARE(qd_connection_t); ALLOC_DECLARE(qd_user_fd_t); +ALLOC_DECLARE(qd_pn_free_link_session_t); + #endif