From 960053db22ea62633d4b09c6d0c7f0248256736e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2017 10:14:47 -0500 Subject: [PATCH 1/4] DISPATCH-390: Centralize use of qdpn_ symbols. qdpn_ functions are part of the old driver. Remove references from src/router_node.c, src/policy.c. Only remaining references are in driver, server and http related files. --- include/qpid/dispatch/router_core.h | 10 +++++----- src/policy.c | 12 ++++++------ src/router_core/connections.c | 8 ++++---- src/router_node.c | 4 ++-- src/server_private.h | 8 ++++++++ 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 2d4c9639c4..d71149189d 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -684,7 +684,7 @@ void qdr_manage_update(qdr_core_t *core, void *context, qd_router_entity_type_t * b) if more is false or count is exceeded, call qdr_query_free, close the outer list, close the map */ -qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, +qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attribute_names, qd_composed_field_t *body); void qdr_query_add_attribute_names(qdr_query_t *query); void qdr_query_get_first(qdr_query_t *query, int offset); @@ -699,10 +699,10 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted, bool opened, char *sasl_mechanisms, qd_direction_t dir, - char *host, - char *ssl_proto, - char *ssl_cipher, - char *user, + const char *host, + const char *ssl_proto, + const char *ssl_cipher, + const char *user, const char *container, pn_data_t *connection_properties, int ssl_ssf, diff --git a/src/policy.c b/src/policy.c index d03257d851..cea02167e1 100644 --- a/src/policy.c +++ b/src/policy.c @@ -239,7 +239,7 @@ void qd_policy_socket_close(void *context, const qd_connection_t *conn) } qd_python_unlock(lock_state); } - const char *hostname = qdpn_connector_name(conn->pn_cxtr); + const char *hostname = qd_connection_name(conn); qd_log(policy->log_source, QD_LOG_DEBUG, "Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d. nConnections= %d.", hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, n_connections); } @@ -402,7 +402,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) pn_connection_t *conn = qd_connection_pn(qd_conn); qd_dispatch_t *qd = qd_conn->server->qd; qd_policy_t *policy = qd->policy; - const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); + const char *hostip = qd_connection_hostip(qd_conn); const char *vhost = pn_connection_remote_hostname(conn); if (result) { qd_log(policy->log_source, @@ -574,7 +574,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con // bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { - const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); + const char *hostip = qd_connection_hostip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); if (qd_conn->policy_settings->maxSenders) { @@ -625,7 +625,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { - const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); + const char *hostip = qd_connection_hostip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); if (qd_conn->policy_settings->maxReceivers) { @@ -695,10 +695,10 @@ void qd_policy_amqp_open(void *context, bool discard) if (policy->enableVhostPolicy) { // Open connection or not based on policy. pn_transport_t *pn_trans = pn_connection_transport(conn); - const char *hostip = qdpn_connector_hostip(qd_conn->pn_cxtr); + const char *hostip = qd_connection_hostip(qd_conn); const char *pcrh = pn_connection_remote_hostname(conn); const char *vhost = (pcrh ? pcrh : ""); - const char *conn_name = qdpn_connector_name(qd_conn->pn_cxtr); + const char *conn_name = qd_connection_name(qd_conn); #define SETTINGS_NAME_SIZE 256 char settings_name[SETTINGS_NAME_SIZE]; uint32_t conn_id = qd_conn->connection_id; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index ad2194f3f0..2d88a7b340 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -124,10 +124,10 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted, bool opened, char *sasl_mechanisms, qd_direction_t dir, - char *host, - char *ssl_proto, - char *ssl_cipher, - char *user, + const char *host, + const char *ssl_proto, + const char *ssl_cipher, + const char *user, const char *container, pn_data_t *connection_properties, int ssl_ssf, diff --git a/src/router_node.c b/src/router_node.c index 114873ddee..fa466a68b4 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -619,7 +619,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool if (sasl) mech = pn_sasl_get_mech(sasl); - char *host = 0; + const char *host = 0; const qd_server_config_t *config; if (conn->connector) { @@ -629,7 +629,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool host = &host_local[0]; } else - host = (char *)qdpn_connector_name(conn->pn_cxtr); + host = qd_connection_name(conn); qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant, diff --git a/src/server_private.h b/src/server_private.h index 2508bb2aa7..28ce2ba63d 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -130,6 +130,14 @@ struct qd_connection_t { qd_pn_free_link_session_list_t free_link_session_list; }; +static inline const char* qd_connection_name(const qd_connection_t *c) { + return qdpn_connector_name(c->pn_cxtr); +} + +static inline const char* qd_connection_hostip(const qd_connection_t *c) { + return qdpn_connector_hostip(c->pn_cxtr); +} + DEQ_DECLARE(qd_connection_t, qd_connection_list_t); From 1ea0219bcbc5c2761c60fc94e7e387927cfb0418 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2017 12:48:42 -0500 Subject: [PATCH 2/4] DISPATCH-390: Cleanup unused code in server.c Removed unused code, mark file-local functions static with no qd_ prefix. --- include/qpid/dispatch.h | 1 - include/qpid/dispatch/user_fd.h | 128 ---------------------------- src/server.c | 104 +++-------------------- src/server_private.h | 13 --- tests/CMakeLists.txt | 1 - tests/run_unit_tests.c | 2 - tests/server_test.c | 146 -------------------------------- 7 files changed, 10 insertions(+), 385 deletions(-) delete mode 100644 include/qpid/dispatch/user_fd.h delete mode 100755 tests/server_test.c diff --git a/include/qpid/dispatch.h b/include/qpid/dispatch.h index 156f831824..cab1beac96 100644 --- a/include/qpid/dispatch.h +++ b/include/qpid/dispatch.h @@ -53,7 +53,6 @@ #include #include #include -#include #include #include #include diff --git a/include/qpid/dispatch/user_fd.h b/include/qpid/dispatch/user_fd.h deleted file mode 100644 index 56d9124a67..0000000000 --- a/include/qpid/dispatch/user_fd.h +++ /dev/null @@ -1,128 +0,0 @@ -#ifndef __dispatch_user_fd_h__ -#define __dispatch_user_fd_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include - -/**@file - * Server User-File-Descriptor functions. - * - * @defgroup user_fd user_fd - * - * Server User-File-Descriptor Functions - * @{ - */ - -typedef struct qd_user_fd_t qd_user_fd_t; - - -/** - * User_fd Handler - * - * Callback invoked when a user-managed file descriptor is available for reading or writing or there - * was an error on the file descriptor. - * - * @param context The handler context supplied in the qd_user_fd call. - * @param ufd The user_fd handle for the processable fd. - */ -typedef void (*qd_user_fd_handler_cb_t)(void* context, qd_user_fd_t *ufd); - - -/** - * Set the user-fd handler callback for the server. This handler is optional, but must be supplied - * if the qd_server is used to manage the activation of user file descriptors. - */ -void qd_server_set_user_fd_handler(qd_dispatch_t *qd, qd_user_fd_handler_cb_t ufd_handler); - - -/** - * Create a tracker for a user-managed file descriptor. - * - * A user-fd is appropriate for use when the application opens and manages file descriptors - * for purposes other than AMQP communication. Registering a user fd with the dispatch server - * controls processing of the FD alongside the FDs used for messaging. - * - * @param qd Pointer to the dispatch instance. - * @param fd The open file descriptor being managed by the application. - * @param context User context passed back in the connection handler. - * @return A pointer to the new user_fd. - */ -qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context); - - -/** - * Free the resources for a user-managed FD tracker. - * - * @param ufd Structure pointer returned by qd_user_fd. - */ -void qd_user_fd_free(qd_user_fd_t *ufd); - - -/** - * Activate a user-fd for read. - * - * Use this activation when the application has capacity to receive data from the user-fd. This will - * cause the callback set in qd_server_set_user_fd_handler to later be invoked when the - * file descriptor has data to read. - * - * @param ufd Structure pointer returned by qd_user_fd. - */ -void qd_user_fd_activate_read(qd_user_fd_t *ufd); - - -/** - * Activate a user-fd for write. - * - * Use this activation when the application has data to write via the user-fd. This will - * cause the callback set in qd_server_set_user_fd_handler to later be invoked when the - * file descriptor is writable. - * - * @param ufd Structure pointer returned by qd_user_fd. - */ -void qd_user_fd_activate_write(qd_user_fd_t *ufd); - - -/** - * Check readable status of a user-fd - * - * Note: It is possible that readable status is spurious (i.e. this function returns true - * but the file-descriptor is not readable and will block if not set to O_NONBLOCK). - * Code accordingly. - * - * @param ufd Structure pointer returned by qd_user_fd. - * @return true iff the user file descriptor is readable. - */ -bool qd_user_fd_is_readable(qd_user_fd_t *ufd); - - -/** - * Check writable status of a user-fd - * - * @param ufd Structure pointer returned by qd_user_fd. - * @return true iff the user file descriptor is writable. - */ -bool qd_user_fd_is_writeable(qd_user_fd_t *ufd); - -/** - * @} - */ - -#endif diff --git a/src/server.c b/src/server.c index 4a937cd95e..a86500afd4 100644 --- a/src/server.c +++ b/src/server.c @@ -47,7 +47,6 @@ ALLOC_DEFINE(qd_listener_t); ALLOC_DEFINE(qd_connector_t); ALLOC_DEFINE(qd_deferred_call_t); ALLOC_DEFINE(qd_connection_t); -ALLOC_DEFINE(qd_user_fd_t); const char *QD_CONNECTION_TYPE = "connection"; const char *MECH_EXTERNAL = "EXTERNAL"; @@ -109,12 +108,10 @@ static void free_qd_connection(qd_connection_t *ctx) free_qd_connection_t(ctx); } -qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl); - /** * This function is set as the pn_transport->tracer and is invoked when proton tries to write the log message to pn_transport->tracer */ -static void qd_transport_tracer(pn_transport_t *transport, const char *message) +static void transport_tracer(pn_transport_t *transport, const char *message) { qd_connection_t *ctx = (qd_connection_t*) pn_transport_get_context(transport); if (ctx) @@ -143,7 +140,7 @@ qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *displayname * Returns a char pointer to a user id which is constructed from components specified in the config->ssl_uid_format. * Parses through each component and builds a semi-colon delimited string which is returned as the user id. */ -static const char *qd_transport_get_user(qd_connection_t *conn, pn_transport_t *tport) +static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tport) { const qd_server_config_t *config = conn->connector ? conn->connector->config : conn->listener->config; @@ -346,7 +343,7 @@ static const char *qd_transport_get_user(qd_connection_t *conn, pn_transport_t * * Allocate a new qd_connection * with DEQ items initialized, call lock allocated, and all other fields cleared. */ -qd_connection_t *qd_connection_allocate() +static qd_connection_t *connection_allocate() { qd_connection_t *ctx = new_qd_connection_t(); ZERO(ctx); @@ -367,7 +364,7 @@ void qd_connection_set_user(qd_connection_t *conn) conn->user_id = pn_transport_get_user(tport); // We want to set the user name only if it is not already set and the selected sasl mechanism is EXTERNAL if (mech && strcmp(mech, MECH_EXTERNAL) == 0) { - const char *user_id = qd_transport_get_user(conn, tport); + const char *user_id = transport_get_user(conn, tport); if (user_id) conn->user_id = user_id; } @@ -504,7 +501,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) char logbuf[qd_log_max_len()]; - ctx = qd_connection_allocate(); + ctx = connection_allocate(); ctx->server = qd_server; ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; ctx->pn_cxtr = cxtr; @@ -549,13 +546,13 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // - // Proton pushes out its trace to qd_transport_tracer() which in turn writes a trace message to the qdrouter log + // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport // pn_transport_set_context(tport, ctx); if (qd_log_enabled(qd_server->log_source, QD_LOG_TRACE)) { pn_transport_trace(tport, PN_TRACE_FRM); - pn_transport_set_tracer(tport, qd_transport_tracer); + pn_transport_set_tracer(tport, transport_tracer); } if (li->http) { @@ -661,15 +658,6 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr) if (ctx->closed) return 0; - // - // If this is a user connection, bypass the AMQP processing and invoke the - // UserFD handler instead. - // - if (ctx->ufd) { - qd_server->ufd_handler(ctx->ufd->context, ctx->ufd); - return 1; - } - do { passes++; @@ -1054,7 +1042,7 @@ static void cxtr_try_open(void *context) if (ct->state != CXTR_STATE_CONNECTING) return; - qd_connection_t *ctx = qd_connection_allocate(); + qd_connection_t *ctx = connection_allocate(); ctx->server = ct->server; ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; ctx->pn_conn = pn_connection(); @@ -1127,13 +1115,13 @@ static void cxtr_try_open(void *context) pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // - // Proton pushes out its trace to qd_transport_tracer() which in turn writes a trace message to the qdrouter log + // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log // // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport pn_transport_set_context(tport, ctx); if (qd_log_enabled(ct->server->log_source, QD_LOG_TRACE)) { pn_transport_trace(tport, PN_TRACE_FRM); - pn_transport_set_tracer(tport, qd_transport_tracer); + pn_transport_set_tracer(tport, transport_tracer); } // @@ -1244,7 +1232,6 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe qd_server->conn_handler = 0; qd_server->pn_event_handler = 0; qd_server->signal_handler = 0; - qd_server->ufd_handler = 0; qd_server->start_context = 0; qd_server->signal_context = 0; qd_server->lock = sys_mutex(); @@ -1319,12 +1306,6 @@ void qd_server_set_start_handler(qd_dispatch_t *qd, qd_thread_start_cb_t handler } -void qd_server_set_user_fd_handler(qd_dispatch_t *qd, qd_user_fd_handler_cb_t ufd_handler) -{ - qd->server->ufd_handler = ufd_handler; -} - - static void qd_server_announce(qd_server_t* qd_server) { qd_log(qd_server->log_source, QD_LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count); @@ -1653,71 +1634,6 @@ void qd_server_connector_free(qd_connector_t* ct) free_qd_connector_t(ct); } - -qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context) -{ - qd_server_t *qd_server = qd->server; - qd_user_fd_t *ufd = new_qd_user_fd_t(); - - if (!ufd) - return 0; - - qd_connection_t *ctx = qd_connection_allocate(); - ctx->server = qd_server; - ctx->owner_thread = CONTEXT_NO_OWNER; - ctx->ufd = ufd; - - // Copy the role from the connector config - if (ctx->connector && ctx->connector->config) { - int role_length = strlen(ctx->connector->config->role) + 1; - ctx->role = (char*) malloc(role_length); - strcpy(ctx->role, ctx->connector->config->role); - } - - ufd->context = context; - ufd->server = qd_server; - ufd->fd = fd; - ufd->pn_conn = qdpn_connector_fd(qd_server->driver, fd, (void*) ctx); - qdpn_driver_wakeup(qd_server->driver); - - return ufd; -} - - -void qd_user_fd_free(qd_user_fd_t *ufd) -{ - if (!ufd) return; - qdpn_connector_close(ufd->pn_conn); - free_qd_user_fd_t(ufd); -} - - -void qd_user_fd_activate_read(qd_user_fd_t *ufd) -{ - qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_READABLE); - qdpn_driver_wakeup(ufd->server->driver); -} - - -void qd_user_fd_activate_write(qd_user_fd_t *ufd) -{ - qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE); - qdpn_driver_wakeup(ufd->server->driver); -} - - -bool qd_user_fd_is_readable(qd_user_fd_t *ufd) -{ - return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_READABLE); -} - - -bool qd_user_fd_is_writeable(qd_user_fd_t *ufd) -{ - return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE); -} - - void qd_server_timer_pending_LH(qd_timer_t *timer) { DEQ_INSERT_TAIL(timer->server->pending_timers, timer); diff --git a/src/server_private.h b/src/server_private.h index 28ce2ba63d..3bec06385c 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -21,7 +21,6 @@ #include #include -#include #include "alloc.h" #include #include @@ -113,7 +112,6 @@ struct qd_connection_t { void *context; // Copy of context from listener or connector void *user_context; void *link_context; // Context shared by this connection's links - qd_user_fd_t *ufd; uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky. const char *user_id; // A unique identifier for the user on the connection. This is currently populated from the client ssl cert. See ssl_uid_format in server.h for more info bool free_user_id; @@ -140,15 +138,6 @@ static inline const char* qd_connection_hostip(const qd_connection_t *c) { DEQ_DECLARE(qd_connection_t, qd_connection_list_t); - -struct qd_user_fd_t { - qd_server_t *server; - void *context; - int fd; - qdpn_connector_t *pn_conn; -}; - - typedef struct qd_thread_t { qd_server_t *qd_server; int thread_id; @@ -179,7 +168,6 @@ struct qd_server_t { qd_conn_handler_cb_t conn_handler; qd_pn_event_handler_cb_t pn_event_handler; qd_pn_event_complete_cb_t pn_event_complete_handler; - qd_user_fd_handler_cb_t ufd_handler; void *start_context; void *conn_handler_context; sys_cond_t *cond; @@ -209,7 +197,6 @@ ALLOC_DECLARE(qd_listener_t); ALLOC_DECLARE(qd_deferred_call_t); ALLOC_DECLARE(qd_connector_t); ALLOC_DECLARE(qd_connection_t); -ALLOC_DECLARE(qd_user_fd_t); ALLOC_DECLARE(qd_pn_free_link_session_t); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a1d2520d66..dc72e85d2f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -29,7 +29,6 @@ set(unit_test_SOURCES compose_test.c policy_test.c run_unit_tests.c - server_test.c timer_test.c tool_test.c ) diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c index 173fc38694..203ee47677 100644 --- a/tests/run_unit_tests.c +++ b/tests/run_unit_tests.c @@ -26,7 +26,6 @@ int tool_tests(void); int timer_tests(void); int alloc_tests(void); -int server_tests(qd_dispatch_t *qd); int compose_tests(void); int policy_tests(void); @@ -53,7 +52,6 @@ int main(int argc, char** argv) return 1; } result += timer_tests(); - result += server_tests(qd); result += tool_tests(); result += compose_tests(); #if USE_MEMORY_POOL diff --git a/tests/server_test.c b/tests/server_test.c deleted file mode 100755 index 621f039fa8..0000000000 --- a/tests/server_test.c +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include "test_case.h" -#include - -#define THREAD_COUNT 4 -#define OCTET_COUNT 100 - -static qd_dispatch_t *qd; -static sys_mutex_t *test_lock; - -static int call_count; -static char stored_error[512]; - -static int write_count; -static int read_count; -static int fd[2]; -static qd_user_fd_t *ufd_write; -static qd_user_fd_t *ufd_read; - - -static void ufd_handler(void *context, qd_user_fd_t *ufd) -{ - long dir = (long) context; - char buffer; - ssize_t len; - static int in_read = 0; - static int in_write = 0; - - if (dir == 0) { // READ - in_read++; - assert(in_read == 1); - len = read(fd[0], &buffer, 1); - if (len < 0) { - sprintf(stored_error, "Error while reading"); - qd_server_stop(qd); - } else if (len == 1) { - read_count++; - if (read_count == OCTET_COUNT) - qd_server_stop(qd); - } - qd_user_fd_activate_read(ufd_read); - in_read--; - } else { // WRITE - in_write++; - assert(in_write == 1); - if (!qd_user_fd_is_writeable(ufd_write)) { - sprintf(stored_error, "Expected Writable"); - qd_server_stop(qd); - } else { - if (write(fd[1], "X", 1) < 0) abort(); - - write_count++; - if (write_count < OCTET_COUNT) - qd_user_fd_activate_write(ufd_write); - } - in_write--; - } -} - - -static void fd_test_start(void *context, int unused) -{ - if (++call_count == THREAD_COUNT) { - qd_user_fd_activate_read(ufd_read); - } -} - - -static char* test_user_fd(void *context) -{ - int res; - - call_count = 0; - qd_server_set_start_handler(qd, fd_test_start, 0); - qd_server_set_user_fd_handler(qd, ufd_handler); - - stored_error[0] = 0x0; - - res = pipe(fd); // Don't use pipe2 because it's not available on RHEL5 - if (res != 0) return "Error creating pipe2"; - - for (int i = 0; i < 2; i++) { - int flags = fcntl(fd[i], F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(fd[i], F_SETFL, flags) < 0) { - perror("fcntl"); - return "Failed to set socket to non-blocking"; - } - } - - ufd_write = qd_user_fd(qd, fd[1], (void*) 1); - ufd_read = qd_user_fd(qd, fd[0], (void*) 0); - - qd_server_run(qd); - close(fd[0]); - close(fd[1]); - - qd_user_fd_free(ufd_read); - qd_user_fd_free(ufd_write); - - if (stored_error[0]) return stored_error; - if (write_count - OCTET_COUNT > 2) sprintf(stored_error, "Excessively high Write Count: %d", write_count); - if (read_count != OCTET_COUNT) sprintf(stored_error, "Incorrect Read Count: %d", read_count);; - - if (stored_error[0]) return stored_error; - return 0; -} - - -int server_tests(qd_dispatch_t *_qd) -{ - int result = 0; - test_lock = sys_mutex(); - - qd = _qd; - - TEST_CASE(test_user_fd, 0); - - sys_mutex_free(test_lock); - return result; -} - From f82944b3019685d283b785bc9df78308fa3bdc8b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2017 13:51:43 -0500 Subject: [PATCH 3/4] DISPATCH-390: Move declarations from server_private.h to server.c Move unnecessarily public declarations from server_private.h to server.c --- src/policy.c | 18 ++++++------ src/server.c | 64 ++++++++++++++++++++++++++++++++++++++++++ src/server_private.h | 66 ++------------------------------------------ 3 files changed, 76 insertions(+), 72 deletions(-) diff --git a/src/policy.c b/src/policy.c index cea02167e1..182fc617df 100644 --- a/src/policy.c +++ b/src/policy.c @@ -400,7 +400,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) } } pn_connection_t *conn = qd_connection_pn(qd_conn); - qd_dispatch_t *qd = qd_conn->server->qd; + qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server); qd_policy_t *policy = qd->policy; const char *hostip = qd_connection_hostip(qd_conn); const char *vhost = pn_connection_remote_hostname(conn); @@ -580,7 +580,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ if (qd_conn->policy_settings->maxSenders) { if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) { // Max sender limit specified and violated. - qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_INFO, + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, "DENY AMQP Attach sender for user '%s', rhost '%s', vhost '%s' based on maxSenders limit", qd_conn->user_id, hostip, vhost); _qd_policy_deny_amqp_sender_link(pn_link, qd_conn); @@ -598,7 +598,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ // a target is specified lookup = _qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings->targets, target); - qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "%s AMQP Attach sender link '%s' for user '%s', rhost '%s', vhost '%s' based on link target name", (lookup ? "ALLOW" : "DENY"), target, qd_conn->user_id, hostip, vhost); @@ -610,7 +610,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ // A sender with no remote target. // This happens all the time with anonymous relay lookup = qd_conn->policy_settings->allowAnonymousSender; - qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "%s AMQP Attach anonymous sender for user '%s', rhost '%s', vhost '%s'", (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost); if (!lookup) { @@ -631,7 +631,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q if (qd_conn->policy_settings->maxReceivers) { if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) { // Max sender limit specified and violated. - qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_INFO, + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, "DENY AMQP Attach receiver for user '%s', rhost '%s', vhost '%s' based on maxReceivers limit", qd_conn->user_id, hostip, vhost); _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); @@ -646,7 +646,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link)); if (dynamic_src) { bool lookup = qd_conn->policy_settings->allowDynamicSource; - qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "%s AMQP Attach receiver dynamic source for user '%s', rhost '%s', vhost '%s',", (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost); // Dynamic source policy rendered the decision @@ -660,7 +660,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q // a source is specified bool lookup = _qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings->sources, source); - qd_log(qd_conn->server->qd->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "%s AMQP Attach receiver link '%s' for user '%s', rhost '%s', vhost '%s' based on link source name", (lookup ? "ALLOW" : "DENY"), source, qd_conn->user_id, hostip, vhost); @@ -670,7 +670,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q } } else { // A receiver with no remote source. - qd_log(qd_conn->server->qd->policy->log_source, QD_LOG_TRACE, + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_TRACE, "DENY AMQP Attach receiver link '' for user '%s', rhost '%s', vhost '%s'", qd_conn->user_id, hostip, vhost); _qd_policy_deny_amqp_receiver_link(pn_link, qd_conn); @@ -688,7 +688,7 @@ void qd_policy_amqp_open(void *context, bool discard) qd_connection_t *qd_conn = (qd_connection_t *)context; if (!discard) { pn_connection_t *conn = qd_connection_pn(qd_conn); - qd_dispatch_t *qd = qd_conn->server->qd; + qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server); qd_policy_t *policy = qd->policy; bool connection_allowed = true; diff --git a/src/server.c b/src/server.c index a86500afd4..7da9f232f0 100644 --- a/src/server.c +++ b/src/server.c @@ -38,6 +38,60 @@ #include #include +typedef struct qd_thread_t { + qd_server_t *qd_server; + int thread_id; + volatile int running; + volatile int canceled; + int using_thread; + sys_thread_t *thread; +} qd_thread_t; + + +typedef struct qd_work_item_t { + DEQ_LINKS(struct qd_work_item_t); + qdpn_connector_t *cxtr; +} qd_work_item_t; + +DEQ_DECLARE(qd_work_item_t, qd_work_list_t); + + +struct qd_server_t { + qd_dispatch_t *qd; + int thread_count; + const char *container_name; + const char *sasl_config_path; + const char *sasl_config_name; + qdpn_driver_t *driver; + qd_log_source_t *log_source; + qd_thread_start_cb_t start_handler; + qd_conn_handler_cb_t conn_handler; + qd_pn_event_handler_cb_t pn_event_handler; + qd_pn_event_complete_cb_t pn_event_complete_handler; + void *start_context; + void *conn_handler_context; + sys_cond_t *cond; + sys_mutex_t *lock; + qd_thread_t **threads; + qd_work_list_t work_queue; + qd_timer_list_t pending_timers; + bool a_thread_is_waiting; + int threads_active; + int pause_requests; + int threads_paused; + int pause_next_sequence; + int pause_now_serving; + qd_signal_handler_cb_t signal_handler; + bool signal_handler_running; + void *signal_context; + int pending_signal; + qd_connection_list_t connections; + qd_timer_t *heartbeat_timer; + uint64_t next_connection_id; + void *py_displayname_obj; + qd_http_server_t *http; +}; + static __thread qd_server_t *thread_server = 0; #define HEARTBEAT_INTERVAL 1000 @@ -1645,3 +1699,13 @@ void qd_server_timer_cancel_LH(qd_timer_t *timer) { DEQ_REMOVE(timer->server->pending_timers, timer); } + +qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; } + +const char* qd_connection_name(const qd_connection_t *c) { + return qdpn_connector_name(c->pn_cxtr); +} + +const char* qd_connection_hostip(const qd_connection_t *c) { + return qdpn_connector_hostip(c->pn_cxtr); +} diff --git a/src/server_private.h b/src/server_private.h index 3bec06385c..896128d2cc 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -34,6 +34,9 @@ void qd_server_timer_pending_LH(qd_timer_t *timer); void qd_server_timer_cancel_LH(qd_timer_t *timer); +struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server); +const char* qd_connection_name(const qd_connection_t *c); +const char* qd_connection_hostip(const qd_connection_t *c); #define CONTEXT_NO_OWNER -1 #define CONTEXT_UNSPECIFIED_OWNER -2 @@ -128,71 +131,8 @@ struct qd_connection_t { qd_pn_free_link_session_list_t free_link_session_list; }; -static inline const char* qd_connection_name(const qd_connection_t *c) { - return qdpn_connector_name(c->pn_cxtr); -} - -static inline const char* qd_connection_hostip(const qd_connection_t *c) { - return qdpn_connector_hostip(c->pn_cxtr); -} - DEQ_DECLARE(qd_connection_t, qd_connection_list_t); -typedef struct qd_thread_t { - qd_server_t *qd_server; - int thread_id; - volatile int running; - volatile int canceled; - int using_thread; - sys_thread_t *thread; -} qd_thread_t; - - -typedef struct qd_work_item_t { - DEQ_LINKS(struct qd_work_item_t); - qdpn_connector_t *cxtr; -} qd_work_item_t; - -DEQ_DECLARE(qd_work_item_t, qd_work_list_t); - - -struct qd_server_t { - qd_dispatch_t *qd; - int thread_count; - const char *container_name; - const char *sasl_config_path; - const char *sasl_config_name; - qdpn_driver_t *driver; - qd_log_source_t *log_source; - qd_thread_start_cb_t start_handler; - qd_conn_handler_cb_t conn_handler; - qd_pn_event_handler_cb_t pn_event_handler; - qd_pn_event_complete_cb_t pn_event_complete_handler; - void *start_context; - void *conn_handler_context; - sys_cond_t *cond; - sys_mutex_t *lock; - qd_thread_t **threads; - qd_work_list_t work_queue; - qd_timer_list_t pending_timers; - bool a_thread_is_waiting; - int threads_active; - int pause_requests; - int threads_paused; - int pause_next_sequence; - int pause_now_serving; - qd_signal_handler_cb_t signal_handler; - bool signal_handler_running; - void *signal_context; - int pending_signal; - qd_connection_list_t connections; - qd_timer_t *heartbeat_timer; - uint64_t next_connection_id; - void *py_displayname_obj; - qd_http_server_t *http; -}; - -ALLOC_DECLARE(qd_work_item_t); ALLOC_DECLARE(qd_listener_t); ALLOC_DECLARE(qd_deferred_call_t); ALLOC_DECLARE(qd_connector_t); From 9b8f0efe9b0cd8278de1a66413da9b14c4ebf0a2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2017 15:59:14 -0500 Subject: [PATCH 4/4] DISPATCH-390: Move move declarations from server_private.h to server.c Move unnecessarily public declarations from server_private.h to server.c --- include/qpid/dispatch/server.h | 1 + src/connection_manager.c | 2 +- src/http-libwebsockets.c | 2 +- src/router_node.c | 4 ++-- src/server.c | 39 ++++++++++++++++++++++++++++++++++ src/server_private.h | 33 ++++++++-------------------- 6 files changed, 53 insertions(+), 28 deletions(-) diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 0562222d79..dd868a6fdb 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -603,6 +603,7 @@ pn_connection_t *qd_connection_pn(qd_connection_t *conn); bool qd_connection_inbound(qd_connection_t *conn); +/* FIXME aconway 2017-01-19: hide for batching */ /** * Get the event collector for a connection. * diff --git a/src/connection_manager.c b/src/connection_manager.c index 70ebe6dbae..c785dea04b 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -499,7 +499,7 @@ void qd_connection_manager_start(qd_dispatch_t *qd) if (cl->listener == 0 ) if (cl->state == QD_BIND_NONE) { //Try to start listening only if we have never tried to listen on that port before cl->listener = qd_server_listen(qd, &cl->configuration, cl); - if (cl->listener && cl->listener->pn_listener) + if (cl->listener) cl->state = QD_BIND_SUCCESSFUL; else { cl->state = QD_BIND_FAILED; diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index 798851a2db..9ceaa49fe0 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -265,7 +265,7 @@ static void check_timer(void *void_http_server) { static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) { qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c)); - return ql->http; + return qd_listener_http(ql); } static void http_connector_process(qdpn_connector_t *c) { diff --git a/src/router_node.c b/src/router_node.c index fa466a68b4..d3baf8e426 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -622,9 +622,9 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool const char *host = 0; const qd_server_config_t *config; - if (conn->connector) { + if (qd_connection_connector(conn)) { char host_local[255]; - config = conn->connector->config; + config = qd_connector_config(qd_connection_connector(conn)); snprintf(host_local, strlen(config->host)+strlen(config->port)+2, "%s:%s", config->host, config->port); host = &host_local[0]; } diff --git a/src/server.c b/src/server.c index 7da9f232f0..4ba100c383 100644 --- a/src/server.c +++ b/src/server.c @@ -92,6 +92,33 @@ struct qd_server_t { qd_http_server_t *http; }; +/** + * Listener objects represent the desire to accept incoming transport connections. + */ +struct qd_listener_t { + qd_server_t *server; + const qd_server_config_t *config; + void *context; + qdpn_listener_t *pn_listener; + qd_http_listener_t *http; +}; + + +/** + * Connector objects represent the desire to create and maintain an outgoing transport connection. + */ +struct qd_connector_t { + qd_server_t *server; + cxtr_state_t state; + const qd_server_config_t *config; + void *context; + qd_connection_t *ctx; + qd_timer_t *timer; + long delay; +}; + + + static __thread qd_server_t *thread_server = 0; #define HEARTBEAT_INTERVAL 1000 @@ -1709,3 +1736,15 @@ const char* qd_connection_name(const qd_connection_t *c) { const char* qd_connection_hostip(const qd_connection_t *c) { return qdpn_connector_hostip(c->pn_cxtr); } + +qd_connector_t* qd_connection_connector(const qd_connection_t *c) { + return c->connector; +} + +const qd_server_config_t *qd_connector_config(const qd_connector_t *c) { + return c->config; +} + +qd_http_listener_t *qd_listener_http(qd_listener_t *l) { + return l->http; +} diff --git a/src/server_private.h b/src/server_private.h index 896128d2cc..576b5eccb7 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -34,9 +34,18 @@ void qd_server_timer_pending_LH(qd_timer_t *timer); void qd_server_timer_cancel_LH(qd_timer_t *timer); + +/* FIXME aconway 2017-01-19: to include/server.h? */ + struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server); + const char* qd_connection_name(const qd_connection_t *c); const char* qd_connection_hostip(const qd_connection_t *c); +qd_connector_t* qd_connection_connector(const qd_connection_t *c); + +const qd_server_config_t *qd_connector_config(const qd_connector_t *c); + +qd_http_listener_t *qd_listener_http(qd_listener_t *l); #define CONTEXT_NO_OWNER -1 #define CONTEXT_UNSPECIFIED_OWNER -2 @@ -54,30 +63,6 @@ typedef enum { } cxtr_state_t; -/** - * Listener objects represent the desire to accept incoming transport connections. - */ -struct qd_listener_t { - qd_server_t *server; - const qd_server_config_t *config; - void *context; - qdpn_listener_t *pn_listener; - qd_http_listener_t *http; -}; - - -/** - * Connector objects represent the desire to create and maintain an outgoing transport connection. - */ -struct qd_connector_t { - qd_server_t *server; - cxtr_state_t state; - const qd_server_config_t *config; - void *context; - qd_connection_t *ctx; - qd_timer_t *timer; - long delay; -}; typedef struct qd_deferred_call_t {