Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: beekhof/pacemaker
...
head fork: beekhof/pacemaker
  • 1 commit
  • 1 file changed
  • 0 commit comments
  • 1 contributor
Commits on Dec 20, 2011
@beekhof wip: Prepare for libqb ipc a19029e
Showing with 242 additions and 90 deletions.
  1. +242 −90 lib/common/ipc.c
View
332 lib/common/ipc.c
@@ -34,7 +34,7 @@
#include <crm/common/cluster.h>
xmlNode *
-xmlfromIPC(IPC_Channel * ch, int timeout)
+xmlfromIPC(crm_ipcc_connection * ch, int timeout)
{
xmlNode *xml = NULL;
HA_Message *msg = NULL;
@@ -82,7 +82,7 @@ xmlfromIPC(IPC_Channel * ch, int timeout)
}
static int
-xml2ipcchan(xmlNode * m, IPC_Channel * ch)
+xml2ipcchan(xmlNode * m, crm_ipcc_connection * ch)
{
HA_Message *msg = NULL;
IPC_Message *imsg = NULL;
@@ -116,7 +116,50 @@ xml2ipcchan(xmlNode * m, IPC_Channel * ch)
/* frees msg */
gboolean
-send_ipc_message(IPC_Channel * ipc_client, xmlNode * msg)
+send_ipc_message(crm_ipcc_connection * ipc_client, xmlNode * msg)
+{
+ gboolean all_is_good = TRUE;
+ int fail_level = LOG_WARNING;
+
+ if (ipc_client != NULL && ipc_client->conntype == IPC_CLIENT) {
+ fail_level = LOG_ERR;
+ }
+
+ if (msg == NULL) {
+ crm_err("cant send NULL message");
+ all_is_good = FALSE;
+
+ } else if (ipc_client == NULL) {
+ crm_err("cant send message without an IPC Channel");
+ all_is_good = FALSE;
+
+ } else if (ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) {
+ do_crm_log(fail_level, "IPC Channel to %d is not connected", (int)ipc_client->farside_pid);
+ all_is_good = FALSE;
+ }
+
+ if (all_is_good && xml2ipcchan(msg, ipc_client) != HA_OK) {
+ do_crm_log(fail_level, "Could not send IPC message to %d", (int)ipc_client->farside_pid);
+ all_is_good = FALSE;
+
+ if (ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) {
+ do_crm_log(fail_level,
+ "IPC Channel to %d is no longer connected", (int)ipc_client->farside_pid);
+
+ } else if (ipc_client->conntype == IPC_CLIENT) {
+ if (ipc_client->send_queue->current_qlen >= ipc_client->send_queue->max_qlen) {
+ crm_err("Send queue to %d (size=%d) full.",
+ ipc_client->farside_pid, (int)ipc_client->send_queue->max_qlen);
+ }
+ }
+ }
+ /* crm_log_xml(all_is_good?LOG_MSG:LOG_WARNING,"IPC[outbound]",msg); */
+
+ return all_is_good;
+}
+
+gboolean
+send_ipcs_message(crm_ipcs_connection * ipc_client, xmlNode * msg)
{
gboolean all_is_good = TRUE;
int fail_level = LOG_WARNING;
@@ -164,13 +207,197 @@ default_ipc_connection_destroy(gpointer user_data)
return;
}
+
+#ifdef LIBQB_IPC
+struct gio_to_qb_poll {
+ int32_t is_used;
+ GIOChannel *channel;
+ int32_t events;
+ void * data;
+ qb_ipcs_dispatch_fn_t fn;
+ enum qb_loop_priority p;
+};
+
+static qb_array_t *gio_map;
+
+static gboolean
+gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+ gint fd = g_io_channel_unix_get_fd(gio);
+
+ return (adaptor->fn(fd, condition, adaptor->data) == 0);
+}
+
+static int32_t pcmk_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ struct gio_to_qb_poll *adaptor;
+ GIOChannel *channel;
+ int32_t res = 0;
+
+ res = qb_array_grow(gio_map, fd + 1);
+ if (res < 0) {
+ return res;
+ }
+ res = qb_array_index(gio_map, fd, (void**)&adaptor);
+ if (res < 0) {
+ return res;
+ }
+ if (adaptor->is_used) {
+ return -EEXIST;
+ }
+
+ channel = g_io_channel_unix_new(fd);
+ if (!channel) {
+ return -ENOMEM;
+ }
+
+ adaptor->channel = channel;
+ adaptor->fn = fn;
+ adaptor->events = evts;
+ adaptor->data = data;
+ adaptor->p = p;
+ adaptor->is_used = QB_TRUE;
+
+ g_io_add_watch(channel, evts, gio_read_socket, adaptor);
+ return 0;
+}
+
+static int32_t pcmk_ipcs_dispatch_del(int32_t fd)
+{
+ struct gio_to_qb_poll *adaptor;
+ if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) {
+ g_io_channel_unref(adaptor->channel);
+ adaptor->is_used = QB_FALSE;
+ }
+ return 0;
+}
+
+
+
+static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
+{
+#if 0
+ if (uid == 0 && gid == 0) {
+ crm_trace("authenticated connection");
+ return 1;
+ }
+ crm_err("BAD user!");
+ return 0;
+#else
+ return 0;
+#endif
+}
+
+
+static void s1_connection_created_fn(qb_ipcs_connection_t *c)
+{
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+ crm_trace("Connection created > active:%d > closed:%d",
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
+}
+
+static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size)
+{
+ struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
+ struct qb_ipc_response_header response;
+ ssize_t res;
+
+ crm_trace("msg:%d, size:%d", req_pt->id, req_pt->size);
+ response.size = sizeof(struct qb_ipc_response_header);
+ response.id = 13;
+ response.error = 0;
+ if (blocking) {
+ res = qb_ipcs_response_send(c, &response, sizeof(response));
+ if (res < 0) {
+ crm_perror(LOG_ERR, "qb_ipcs_response_send");
+ }
+ }
+ if (events) {
+ res = qb_ipcs_event_send(c, &response, sizeof(response));
+ if (res < 0) {
+ crm_perror(LOG_ERR, "qb_ipcs_event_send");
+ }
+ }
+ return 0;
+}
+
+static int32_t s1_connection_closed_fn(qb_ipcs_connection_t *c)
+{
+ struct qb_ipcs_connection_stats stats;
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+
+ qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
+
+ crm_trace(" Connection to pid:%d destroyed > active:%d > closed:%d",
+ stats.client_pid,
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
+
+ crm_trace(" Requests %"PRIu64, stats.requests);
+ crm_trace(" Responses %"PRIu64, stats.responses);
+ crm_trace(" Events %"PRIu64, stats.events);
+ crm_trace(" Send retries %"PRIu64, stats.send_retries);
+ crm_trace(" Recv retries %"PRIu64, stats.recv_retries);
+ crm_trace(" FC state %d", stats.flow_control_state);
+ crm_trace(" FC count %"PRIu64, stats.flow_control_count);
+ return 0;
+}
+
+static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
+{
+ crm_trace("connection about to be freed");
+}
+
+#endif
+
int
init_server_ipc_comms(char *channel_name,
- gboolean(*channel_client_connect) (IPC_Channel * newclient,
- gpointer user_data),
- void (*channel_connection_destroy) (gpointer user_data))
+ gboolean(*connect) (crm_ipcc_connection * newclient, gpointer user_data),
+ void (*destroy) (gpointer user_data))
{
- /* the clients wait channel is the other source of events.
+#ifdef LIBQB_IPC
+ struct qb_ipcs_service_handlers sh = {
+ .connection_accept = ipc_server_connection_accept_fn,
+ .connection_created = ipc_server_connection_created_fn,
+ .msg_process = ipc_server_msg_process_fn,
+ .connection_destroyed = ipc_server_connection_destroyed_fn,
+ .connection_closed = ipc_server_connection_closed_fn,
+ };
+ struct qb_ipcs_poll_handlers glib_ph = {
+ .job_add = NULL, /* FIXME */
+ .dispatch_add = pcmk_ipcs_dispatch_add,
+ .dispatch_mod = NULL,
+ .dispatch_del = pcmk_ipcs_dispatch_del,
+ };
+
+ qb_ipcs_service_t *ipc_server = NULL;
+ enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
+
+ if(gio_map == NULL) {
+ gio_map = qb_array_create(64, sizeof(struct gio_to_qb_poll));
+ }
+
+ ipc_server = qb_ipcs_create(channel_name, sender, QB_IPC_SOCKET, &sh);
+ if (ipc_server == 0) {
+ crm_perror(LOG_ERR, "qb_ipcs_create");
+ return -1;
+ }
+
+ /* qb_ipcs_context_set(struct qb_ipcs_connection *c, channel_name); */
+ qb_ipcs_poll_handlers_set(ipc_server, &glib_ph);
+ qb_ipcs_run(ipc_server);
+
+ res = qb_ipcs_run(ipc_server);
+
+#else
+/* the clients wait channel is the other source of events.
* This source delivers the clients connection events.
* listen to this source at a relatively lower priority.
*/
@@ -190,16 +417,16 @@ init_server_ipc_comms(char *channel_name,
channel_client_connect, channel_name, channel_connection_destroy);
crm_debug_3("Listening on: %s", commpath);
-
+#endif
return 0;
}
GCHSource *
init_client_ipc_comms(const char *channel_name,
- gboolean(*dispatch) (IPC_Channel * source_data, gpointer user_data),
- void *client_data, IPC_Channel ** ch)
+ gboolean(*dispatch) (crm_ipcc_connection * source_data, gpointer user_data),
+ void *client_data, crm_ipcc_connection ** ch)
{
- IPC_Channel *a_ch = NULL;
+ crm_ipcc_connection *a_ch = NULL;
GCHSource *the_source = NULL;
void *callback_data = client_data;
@@ -230,10 +457,10 @@ init_client_ipc_comms(const char *channel_name,
return the_source;
}
-IPC_Channel *
+crm_ipcc_connection *
init_client_ipc_comms_nodispatch(const char *channel_name)
{
- IPC_Channel *ch;
+ crm_ipcc_connection *ch;
GHashTable *attrs;
static char path[] = IPC_PATH_ATTR;
@@ -302,82 +529,7 @@ wait_channel_init(char daemonsocket[])
}
gboolean
-subsystem_msg_dispatch(IPC_Channel * sender, void *user_data)
-{
- int lpc = 0;
- xmlNode *msg = NULL;
- xmlNode *data = NULL;
- gboolean all_is_well = TRUE;
- const char *sys_to;
- const char *task;
-
- gboolean(*process_function)
- (xmlNode * msg, xmlNode * data, IPC_Channel * sender) = NULL;
-
- while (IPC_ISRCONN(sender)) {
- gboolean process = FALSE;
-
- if (sender->ops->is_message_pending(sender) == 0) {
- break;
- }
-
- msg = xmlfromIPC(sender, MAX_IPC_DELAY);
- if (msg == NULL) {
- break;
- }
-
- lpc++;
- crm_log_xml(LOG_MSG, __FUNCTION__, msg);
-
- sys_to = crm_element_value(msg, F_CRM_SYS_TO);
- task = crm_element_value(msg, F_CRM_TASK);
-
- if (safe_str_eq(task, CRM_OP_HELLO)) {
- process = TRUE;
-
- } else if (sys_to == NULL) {
- crm_err("Value of %s was NULL!!", F_CRM_SYS_TO);
-
- } else if (task == NULL) {
- crm_err("Value of %s was NULL!!", F_CRM_TASK);
-
- } else {
- process = TRUE;
- }
-
- if (process == FALSE) {
- free_xml(msg);
- msg = NULL;
- continue;
- }
-
- data = get_message_xml(msg, F_CRM_DATA);
- process_function = user_data;
- if (FALSE == process_function(msg, data, sender)) {
- crm_warn("Received a message destined for %s" " by mistake", sys_to);
- }
-
- free_xml(msg);
- msg = NULL;
-
- if (sender->ch_status == IPC_CONNECT) {
- break;
- }
- }
-
- crm_debug_2("Processed %d messages", lpc);
- if (sender->ch_status != IPC_CONNECT) {
- crm_err("The server %d has left us: Shutting down...NOW", sender->farside_pid);
-
- exit(1); /* shutdown properly later */
-
- return !all_is_well;
- }
- return all_is_well;
-}
-
-gboolean
-is_ipc_empty(IPC_Channel * ch)
+is_ipc_empty(crm_ipcc_connection * ch)
{
if (ch == NULL) {
return TRUE;
@@ -389,7 +541,7 @@ is_ipc_empty(IPC_Channel * ch)
}
void
-send_hello_message(IPC_Channel * ipc_client,
+send_hello_message(crm_ipcc_connection * ipc_client,
const char *uuid,
const char *client_name, const char *major_version, const char *minor_version)
{

No commit comments for this range

Something went wrong with that request. Please try again.