diff --git a/cib/main.c b/cib/main.c index 79b79d903c..08a0c19d9e 100644 --- a/cib/main.c +++ b/cib/main.c @@ -440,7 +440,6 @@ cib_ais_dispatch(AIS_Message * wrapper, char *data, int sender) static void cib_ais_destroy(gpointer user_data) { - ais_fd_sync = -1; if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); } else { diff --git a/crmd/corosync.c b/crmd/corosync.c index 3d846b4cd2..c68ab7e6e3 100644 --- a/crmd/corosync.c +++ b/crmd/corosync.c @@ -117,7 +117,7 @@ crmd_cman_dispatch(unsigned long long seq, gboolean quorate) } static void -crmd_cman_destroy(gpointer user_data) +crmd_quorum_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); @@ -129,7 +129,7 @@ crmd_cman_destroy(gpointer user_data) } static void -crmd_quorum_destroy(gpointer user_data) +crmd_ais_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); @@ -140,8 +140,9 @@ crmd_quorum_destroy(gpointer user_data) } } +#if SUPPORT_CMAN static void -crmd_ais_destroy(gpointer user_data) +crmd_cman_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); @@ -151,6 +152,7 @@ crmd_ais_destroy(gpointer user_data) crm_info("connection closed"); } } +#endif extern gboolean crm_connect_corosync(void); @@ -169,10 +171,12 @@ crm_connect_corosync(void) init_quorum_connection(crmd_cman_dispatch, crmd_quorum_destroy); } +#if SUPPORT_CMAN if (rc && is_cman_cluster()) { init_cman_connection(crmd_cman_dispatch, crmd_cman_destroy); set_bit_inplace(fsa_input_register, R_MEMBERSHIP); } +#endif return rc; } diff --git a/fencing/main.c b/fencing/main.c index 81cd65480e..317738d542 100644 --- a/fencing/main.c +++ b/fencing/main.c @@ -282,7 +282,6 @@ static void stonith_peer_ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); - ais_fd_sync = -1; exit(1); } #endif diff --git a/include/crm/common/cluster.h b/include/crm/common/cluster.h index 11d8f632c4..a3c1d6c466 100644 --- a/include/crm/common/cluster.h +++ b/include/crm/common/cluster.h @@ -61,8 +61,6 @@ extern void destroy_crm_node(gpointer data); extern crm_node_t *crm_get_peer(unsigned int id, const char *uname); -extern crm_node_t *crm_update_ais_node(xmlNode * member, long long seq); -extern crm_node_t *crm_update_cman_node(xmlNode * member, long long seq); extern void crm_update_peer_proc(const char *uname, uint32_t flag, const char *status); extern crm_node_t *crm_update_peer(unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, diff --git a/lib/cluster/Makefile.am b/lib/cluster/Makefile.am index 2b11f987a5..e5dce425b1 100644 --- a/lib/cluster/Makefile.am +++ b/lib/cluster/Makefile.am @@ -33,8 +33,12 @@ libcrmcluster_la_LIBADD = $(top_builddir)/lib/common/libcrmcommon.la $(top_buil libcrmcluster_la_DEPENDENCIES = $(top_builddir)/lib/common/libcrmcommon.la $(top_builddir)/lib/fencing/libstonithd.la if BUILD_CS_SUPPORT +if BUILD_CS_PLUGIN +libcrmcluster_la_SOURCES += legacy.c +else libcrmcluster_la_SOURCES += corosync.c endif +endif if BUILD_HEARTBEAT_SUPPORT libcrmcluster_la_SOURCES += heartbeat.c diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index 1975e589f8..87ffff0607 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -18,53 +18,35 @@ #include #include +#include +#include +#include + #include #include #include #include #include "stack.h" -#if SUPPORT_COROSYNC -# if CS_USES_LIBQB -# include -# include -# include -# include -# include -# if HAVE_CONFDB -# include -# endif -# else -# include -# include -# endif -# include + +#include +#include + +#include +#include +#include +#include +#include +#include + cpg_handle_t pcmk_cpg_handle = 0; struct cpg_name pcmk_cpg_group = { .length = 0, .value[0] = 0, }; -#endif - -#if HAVE_CMAP -# include -#endif - -#if SUPPORT_CMAN -# include -cman_handle_t pcmk_cman_handle = NULL; -#endif - -#ifdef SUPPORT_CS_QUORUM -# include -# include -# include - -# include quorum_handle_t pcmk_quorum_handle = 0; - -#endif +gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; static char *pcmk_uname = NULL; static int pcmk_uname_len = 0; @@ -124,107 +106,10 @@ text2msg_type(const char *text) return type; } -char * -get_ais_data(const AIS_Message * msg) -{ - int rc = BZ_OK; - char *uncompressed = NULL; - unsigned int new_size = msg->size + 1; - - if (msg->is_compressed == FALSE) { - crm_trace("Returning uncompressed message data"); - uncompressed = strdup(msg->data); - - } else { - crm_trace("Decompressing message data"); - crm_malloc0(uncompressed, new_size); - - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data, - msg->compressed_size, 1, 0); - - CRM_ASSERT(rc == BZ_OK); - CRM_ASSERT(new_size == msg->size); - } - - return uncompressed; -} - -#if SUPPORT_COROSYNC -int ais_fd_sync = -1; -int ais_fd_async = -1; /* never send messages via this channel */ -void *ais_ipc_ctx = NULL; - -# if CS_USES_LIBQB -qb_ipcc_connection_t *ais_ipc_handle = NULL; -# else -hdb_handle_t ais_ipc_handle = 0; -# endif -GFDSource *ais_source = NULL; -GFDSource *ais_source_sync = NULL; -GFDSource *cman_source = NULL; GFDSource *cpg_source = NULL; GFDSource *quorumd_source = NULL; static char *ais_cluster_name = NULL; -gboolean -get_ais_nodeid(uint32_t * id, char **uname) -{ - struct iovec iov; - int retries = 0; - int rc = CS_OK; - cs_ipc_header_response_t header; - struct crm_ais_nodeid_resp_s answer; - - header.error = CS_OK; - header.id = crm_class_nodeid; - header.size = sizeof(cs_ipc_header_response_t); - - CRM_CHECK(id != NULL, return FALSE); - CRM_CHECK(uname != NULL, return FALSE); - - iov.iov_base = &header; - iov.iov_len = header.size; - - retry: - errno = 0; -# if CS_USES_LIBQB - rc = qb_to_cs_error(qb_ipcc_sendv_recv(ais_ipc_handle, &iov, 1, &answer, sizeof(answer), -1)); -# else - rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, &answer, sizeof(answer)); -# endif - if (rc == CS_OK) { - CRM_CHECK(answer.header.size == sizeof(struct crm_ais_nodeid_resp_s), - crm_err("Odd message: id=%d, size=%d, error=%d", - answer.header.id, answer.header.size, answer.header.error)); - CRM_CHECK(answer.header.id == crm_class_nodeid, - crm_err("Bad response id: %d", answer.header.id)); - } - - if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) { - retries++; - crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); - sleep(retries); /* Proportional back off */ - goto retry; - } - - if (rc != CS_OK) { - crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); - return FALSE; - - } else if (answer.header.error != CS_OK) { - crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); - return FALSE; - } - - crm_info("Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname); - - *id = answer.id; - *uname = crm_strdup(answer.uname); - ais_cluster_name = crm_strdup(answer.cname); - - return TRUE; -} - gboolean crm_get_cluster_name(char **cname) { @@ -242,7 +127,6 @@ send_ais_text(int class, const char *data, { static int msg_id = 0; static int local_pid = 0; - enum cluster_type_e cluster_type = get_cluster_type(); int retries = 0; int rc = CS_OK; @@ -251,7 +135,6 @@ send_ais_text(int class, const char *data, char *buf = NULL; struct iovec iov; const char *transport = "pcmk"; - cs_ipc_header_response_t *header = NULL; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); @@ -354,53 +237,22 @@ send_ais_text(int class, const char *data, } errno = 0; - switch (cluster_type) { - case pcmk_cluster_classic_ais: -# if CS_USES_LIBQB - rc = qb_to_cs_error(qb_ipcc_sendv_recv(ais_ipc_handle, &iov, 1, buf, buf_len, -1)); -# else - rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, buf, buf_len); -# endif - header = (cs_ipc_header_response_t *) buf; - if (rc == CS_OK) { - CRM_CHECK(header->size == sizeof(cs_ipc_header_response_t), - crm_err("Odd message: id=%d, size=%d, class=%d, error=%d", - header->id, header->size, class, header->error)); - - CRM_ASSERT(buf_len >= header->size); - CRM_CHECK(header->id == CRM_MESSAGE_IPC_ACK, - crm_err("Bad response id (%d) for request (%d)", header->id, - ais_msg->header.id)); - CRM_CHECK(header->error == CS_OK, rc = header->error); - } - break; + transport = "cpg"; + CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); + rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); + if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { + cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; + int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); - case pcmk_cluster_corosync: - case pcmk_cluster_cman: - transport = "cpg"; - CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); - rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); - if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { - cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; - int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); - - if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { - crm_warn("Connection overloaded, cannot send messages"); - goto bail; - - } else if (rc2 != CS_OK) { - crm_warn("Could not determin the connection state: %s (%d)", - ais_error2text(rc2), rc2); - goto bail; - } - } - break; + if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { + crm_warn("Connection overloaded, cannot send messages"); + goto bail; - case pcmk_cluster_unknown: - case pcmk_cluster_invalid: - case pcmk_cluster_heartbeat: - CRM_ASSERT(is_openais_cluster()); - break; + } else if (rc2 != CS_OK) { + crm_warn("Could not determin the connection state: %s (%d)", + ais_error2text(rc2), rc2); + goto bail; + } } } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20); @@ -423,16 +275,7 @@ gboolean send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; - char *data = NULL; - - if (is_classic_ais_cluster()) { - if (ais_fd_async < 0 || ais_source == NULL) { - crm_err("Not connected to AIS: %d %p", ais_fd_async, ais_source); - return FALSE; - } - } - - data = dump_xml_unformatted(msg); + char *data = dump_xml_unformatted(msg); rc = send_ais_text(crm_class_cluster, data, local, node, dest); crm_free(data); return rc; @@ -443,61 +286,24 @@ terminate_ais_connection(void) { crm_notice("Disconnecting from Corosync"); -/* G_main_del_fd(ais_source); */ -/* G_main_del_fd(ais_source_sync); */ - - if (is_classic_ais_cluster()) { - if(ais_ipc_handle) { - crm_trace("Disconnecting plugin"); -# if CS_USES_LIBQB - qb_ipcc_disconnect(ais_ipc_handle); - ais_ipc_handle = NULL; -# else - coroipcc_service_disconnect(ais_ipc_handle); - ais_ipc_handle = 0; -# endif - } else { - crm_info("No plugin connection"); - } - + if(pcmk_cpg_handle) { + crm_trace("Disconnecting CPG"); + cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); + cpg_finalize(pcmk_cpg_handle); + pcmk_cpg_handle = 0; + } else { - if(pcmk_cpg_handle) { - crm_trace("Disconnecting CPG"); - cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); - cpg_finalize(pcmk_cpg_handle); - pcmk_cpg_handle = 0; - - } else { - crm_info("No CPG connection"); - } - } - -# ifdef SUPPORT_CS_QUORUM - if (is_corosync_cluster()) { - if(pcmk_quorum_handle) { - crm_trace("Disconnecting quorum"); - quorum_finalize(pcmk_quorum_handle); - pcmk_quorum_handle = 0; - - } else { - crm_info("No Quorum connection"); - } + crm_info("No CPG connection"); } -# endif -# if SUPPORT_CMAN - if (is_cman_cluster()) { - if(pcmk_cman_handle) { - crm_trace("Disconnecting cman"); - cman_stop_notification(pcmk_cman_handle); - cman_finish(pcmk_cman_handle); - } else { - crm_info("No cman connection"); - } + if(pcmk_quorum_handle) { + crm_trace("Disconnecting quorum"); + quorum_finalize(pcmk_quorum_handle); + pcmk_quorum_handle = 0; + + } else { + crm_info("No Quorum connection"); } -# endif - ais_fd_async = -1; - ais_fd_sync = -1; } int ais_membership_timer = 0; @@ -563,49 +369,6 @@ ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); goto done; - - } else if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { - - xml = string2xml(data); - if (xml == NULL) { - crm_err("Invalid membership update: %s", data); - goto badmsg; - } - - if (is_classic_ais_cluster() == FALSE) { - xmlNode *node = NULL; - - for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { - crm_update_cman_node(node, crm_peer_seq); - } - - } else { - xmlNode *node = NULL; - const char *value = NULL; - gboolean quorate = FALSE; - - value = crm_element_value(xml, "quorate"); - CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); goto badmsg); - if (crm_is_true(value)) { - quorate = TRUE; - } - - value = crm_element_value(xml, "id"); - CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); goto badmsg); - crm_peer_seq = crm_int_helper(value, NULL); - - if (quorate != crm_have_quorum) { - crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost"); - crm_have_quorum = quorate; - - } else { - crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost"); - } - - for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { - crm_update_ais_node(node, crm_peer_seq); - } - } } crm_trace("Payload: %s", data); @@ -628,65 +391,8 @@ ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char goto done; } -gboolean -ais_dispatch(int sender, gpointer user_data) -{ - int rc = CS_OK; - gboolean good = TRUE; - - gboolean(*dispatch) (AIS_Message *, char *, int) = user_data; - - do { -# if CS_USES_LIBQB - char buffer[AIS_IPC_MESSAGE_SIZE]; - - rc = qb_to_cs_error(qb_ipcc_event_recv(ais_ipc_handle, (void *)buffer, - AIS_IPC_MESSAGE_SIZE, 100)); -# else - char *buffer = NULL; - - rc = coroipcc_dispatch_get(ais_ipc_handle, (void **)&buffer, 0); -# endif - - if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { - return TRUE; - } - if (rc != CS_OK) { - crm_perror(LOG_ERR, "Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); - goto bail; - } -# if !CS_USES_LIBQB - if (buffer == NULL) { - /* NULL is a legal "no message afterall" value */ - return TRUE; - } -# endif - good = ais_dispatch_message((AIS_Message *) buffer, dispatch); -# if !CS_USES_LIBQB - coroipcc_dispatch_put(ais_ipc_handle); -# endif - - } while (good && ais_ipc_handle); - - return good; - - bail: - if(ais_ipc_handle) { - crm_err("AIS connection failed: %p", (void*)ais_ipc_handle); - } - return FALSE; -} - -static void -ais_destroy(gpointer user_data) -{ - crm_err("AIS connection terminated"); - ais_fd_sync = -1; - exit(1); -} - static gboolean -pcmk_proc_dispatch(IPC_Channel * ch, gpointer user_data) +pcmk_mcp_dispatch(IPC_Channel * ch, gpointer user_data) { xmlNode *msg = NULL; gboolean stay_connected = TRUE; @@ -728,138 +434,6 @@ pcmk_proc_dispatch(IPC_Channel * ch, gpointer user_data) return stay_connected; } -# if SUPPORT_CMAN - -static gboolean -pcmk_cman_dispatch(int sender, gpointer user_data) -{ - int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL); - - if (rc < 0) { - crm_err("Connection to cman failed: %d", rc); - return FALSE; - } - return TRUE; -} - -# define MAX_NODES 256 - -static void -cman_event_callback(cman_handle_t handle, void *privdata, int reason, int arg) -{ - int rc = 0, lpc = 0, node_count = 0; - - cman_cluster_t cluster; - static cman_node_t cman_nodes[MAX_NODES]; - - gboolean(*dispatch) (unsigned long long, gboolean) = privdata; - - switch (reason) { - case CMAN_REASON_STATECHANGE: - - memset(&cluster, 0, sizeof(cluster)); - rc = cman_get_cluster(pcmk_cman_handle, &cluster); - if (rc < 0) { - crm_err("Couldn't query cman cluster details: %d %d", rc, errno); - return; - } - - crm_peer_seq = cluster.ci_generation; - if (arg != crm_have_quorum) { - crm_notice("Membership %llu: quorum %s", crm_peer_seq, arg ? "acquired" : "lost"); - crm_have_quorum = arg; - - } else { - crm_info("Membership %llu: quorum %s", crm_peer_seq, - arg ? "retained" : "still lost"); - } - - rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes); - if (rc < 0) { - crm_err("Couldn't query cman node list: %d %d", rc, errno); - return; - } - - for (lpc = 0; lpc < node_count; lpc++) { - if (cman_nodes[lpc].cn_nodeid == 0) { - /* Never allow node ID 0 to be considered a member #315711 */ - cman_nodes[lpc].cn_member = 0; - } - crm_update_peer(cman_nodes[lpc].cn_nodeid, cman_nodes[lpc].cn_incarnation, - cman_nodes[lpc].cn_member ? crm_peer_seq : 0, 0, 0, - cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_name, NULL, - cman_nodes[lpc].cn_member ? CRM_NODE_MEMBER : CRM_NODE_LOST); - } - - if (dispatch) { - dispatch(crm_peer_seq, crm_have_quorum); - } - break; - - case CMAN_REASON_TRY_SHUTDOWN: - /* Always reply with a negative - pacemaker needs to be stopped first */ - crm_info("CMAN wants to shut down: %s", arg ? "forced" : "optional"); - cman_replyto_shutdown(pcmk_cman_handle, 0); - break; - - case CMAN_REASON_CONFIG_UPDATE: - /* Ignore */ - break; - } -} -# endif - -gboolean -init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) -{ -# if SUPPORT_CMAN - int rc = -1, fd = -1; - cman_cluster_t cluster; - - crm_info("Configuring Pacemaker to obtain quorum from cman"); - - memset(&cluster, 0, sizeof(cluster)); - - pcmk_cman_handle = cman_init(dispatch); - if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) { - crm_err("Couldn't connect to cman"); - goto cman_bail; - } - - rc = cman_get_cluster(pcmk_cman_handle, &cluster); - if (rc < 0) { - crm_err("Couldn't query cman cluster details: %d %d", rc, errno); - goto cman_bail; - } - ais_cluster_name = crm_strdup(cluster.ci_name); - - rc = cman_start_notification(pcmk_cman_handle, cman_event_callback); - if (rc < 0) { - crm_err("Couldn't register for cman notifications: %d %d", rc, errno); - goto cman_bail; - } - - /* Get the current membership state */ - cman_event_callback(pcmk_cman_handle, dispatch, CMAN_REASON_STATECHANGE, - cman_is_quorate(pcmk_cman_handle)); - - fd = cman_get_fd(pcmk_cman_handle); - crm_debug("Adding fd=%d to mainloop", fd); - cman_source = G_main_add_fd(G_PRIORITY_HIGH, fd, FALSE, pcmk_cman_dispatch, dispatch, destroy); - - cman_bail: - if (rc < 0) { - cman_finish(pcmk_cman_handle); - return FALSE; - } -# else - crm_err("cman qorum is not supported in this build"); - exit(100); -# endif - return TRUE; -} - -# ifdef SUPPORT_COROSYNC gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL; static gboolean @@ -935,9 +509,55 @@ cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = pcmk_cpg_deliver, .cpg_confchg_fn = pcmk_cpg_membership, }; -# endif -# ifdef SUPPORT_CS_QUORUM +static gboolean +init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), + uint32_t * nodeid) +{ + int rc = -1; + int fd = 0; + int retries = 0; + + strcpy(pcmk_cpg_group.value, crm_system_name); + pcmk_cpg_group.length = strlen(crm_system_name) + 1; + + cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); + if (rc != CS_OK) { + crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); + goto bail; + } + + retries = 0; + cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); + if (rc != CS_OK) { + crm_err("Could not get local node id from the CPG API"); + goto bail; + } + + retries = 0; + cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); + if (rc != CS_OK) { + crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); + goto bail; + } + + rc = cpg_fd_get(pcmk_cpg_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CPG API connection: %d\n", rc); + goto bail; + } + + crm_debug("Adding fd=%d to mainloop", fd); + cpg_source = G_main_add_fd(G_PRIORITY_HIGH, fd, FALSE, pcmk_cpg_dispatch, dispatch, destroy); + + bail: + if (rc != CS_OK) { + cpg_finalize(pcmk_cpg_handle); + return FALSE; + } + return TRUE; +} + static gboolean pcmk_quorum_dispatch(int sender, gpointer user_data) { @@ -951,8 +571,6 @@ pcmk_quorum_dispatch(int sender, gpointer user_data) return TRUE; } -gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; - static void corosync_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data) { @@ -1012,66 +630,10 @@ quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = pcmk_quorum_notification, }; -# endif - -static gboolean -init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), - uint32_t * nodeid) -{ -# ifdef SUPPORT_COROSYNC - int rc = -1; - int fd = 0; - int retries = 0; - - strcpy(pcmk_cpg_group.value, crm_system_name); - pcmk_cpg_group.length = strlen(crm_system_name) + 1; - - cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); - if (rc != CS_OK) { - crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); - goto bail; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); - if (rc != CS_OK) { - crm_err("Could not get local node id from the CPG API"); - goto bail; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); - if (rc != CS_OK) { - crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); - goto bail; - } - - rc = cpg_fd_get(pcmk_cpg_handle, &fd); - if (rc != CS_OK) { - crm_err("Could not obtain the CPG API connection: %d\n", rc); - goto bail; - } - - crm_debug("Adding fd=%d to mainloop", fd); - cpg_source = G_main_add_fd(G_PRIORITY_HIGH, fd, FALSE, pcmk_cpg_dispatch, dispatch, destroy); - - bail: - if (rc != CS_OK) { - cpg_finalize(pcmk_cpg_handle); - return FALSE; - } -# else - crm_err("The Corosync CPG API is not supported in this build"); - exit(100); -# endif - return TRUE; -} - gboolean init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { -# ifdef SUPPORT_CS_QUORUM int rc = -1; int fd = 0; int quorate = 0; @@ -1094,6 +656,7 @@ init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), crm_err("Could not obtain the current Quorum API state: %d\n", rc); goto bail; } + crm_notice("Quorum %s", quorate ? "acquired" : "lost"); quorum_app_callback = dispatch; crm_have_quorum = quorate; @@ -1118,87 +681,6 @@ init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), quorum_finalize(pcmk_quorum_handle); return FALSE; } -# else - crm_err("The Corosync quorum API is not supported in this build"); - exit(100); -# endif - return TRUE; -} - -static gboolean -init_ais_connection_classic(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, char **our_uname, - int *nodeid) -{ - int rc; - int pid = 0; - char *pid_s = NULL; - struct utsname name; - - crm_info("Creating connection to our Corosync plugin"); -# if CS_USES_LIBQB - rc = CS_OK; - ais_ipc_handle = qb_ipcc_connect("pacemaker.engine", AIS_IPC_MESSAGE_SIZE); -# else - rc = coroipcc_service_connect(COROSYNC_SOCKET_NAME, PCMK_SERVICE_ID, - AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, - &ais_ipc_handle); -# endif - if (ais_ipc_handle) { -# if CS_USES_LIBQB - qb_ipcc_fd_get(ais_ipc_handle, &ais_fd_async); -# else - coroipcc_fd_get(ais_ipc_handle, &ais_fd_async); -# endif - } else { - crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", - PCMK_SERVICE_ID, strerror(errno), errno); - return FALSE; - } - if (ais_fd_async <= 0 && rc == CS_OK) { - crm_err("No context created, but connection reported 'ok'"); - rc = CS_ERR_LIBRARY; - } - if (rc != CS_OK) { - crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, - ais_error2text(rc), rc); - } - - if (rc != CS_OK) { - return FALSE; - } - - if (destroy == NULL) { - destroy = ais_destroy; - } - - if (dispatch) { - crm_debug("Adding fd=%d to mainloop", ais_fd_async); - ais_source = - G_main_add_fd(G_PRIORITY_HIGH, ais_fd_async, FALSE, ais_dispatch, dispatch, destroy); - } - - crm_info("AIS connection established"); - - pid = getpid(); - pid_s = crm_itoa(pid); - send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); - crm_free(pid_s); - - if (uname(&name) < 0) { - crm_perror(LOG_ERR, "Could not determin the current host"); - exit(100); - } - - get_ais_nodeid(&pcmk_nodeid, &pcmk_uname); - if (safe_str_neq(name.nodename, pcmk_uname)) { - crm_crit("Node name mismatch! OpenAIS supplied %s, our lookup returned %s", - pcmk_uname, name.nodename); - crm_notice - ("Node name mismatches usually occur when assigned automatically by DHCP servers"); - crm_notice("If this node was part of the cluster with a different name," - " you will need to remove the old entry with crm_node --remove"); - } return TRUE; } @@ -1209,16 +691,13 @@ init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*des int retries = 0; while (retries++ < 30) { + IPC_Channel *ch = NULL; int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid); switch (rc) { case CS_OK: - if (getenv("HA_mcp")) { - IPC_Channel *ch = init_client_ipc_comms_nodispatch("pcmk"); - - G_main_add_IPC_Channel(G_PRIORITY_HIGH, ch, FALSE, pcmk_proc_dispatch, NULL, - destroy); - } + ch = init_client_ipc_comms_nodispatch("pcmk"); + G_main_add_IPC_Channel(G_PRIORITY_HIGH, ch, FALSE, pcmk_mcp_dispatch, NULL, destroy); return TRUE; break; case CS_ERR_TRY_AGAIN: @@ -1233,74 +712,29 @@ init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*des return FALSE; } -static char * -get_local_node_name(void) -{ - char *name = NULL; - struct utsname res; - - if (is_cman_cluster()) { -# if SUPPORT_CMAN - cman_node_t us; - cman_handle_t cman; - - cman = cman_init(NULL); - if (cman != NULL && cman_is_active(cman)) { - us.cn_name[0] = 0; - cman_get_node(cman, CMAN_NODEID_US, &us); - name = crm_strdup(us.cn_name); - crm_info("Using CMAN node name: %s", name); - - } else { - crm_err("Couldn't determin node name from CMAN"); - } - - cman_finish(cman); -# endif - - } else if (uname(&res) < 0) { - crm_perror(LOG_ERR, "Could not determin the current host"); - exit(100); - - } else { - name = crm_strdup(res.nodename); - } - return name; -} - -extern int set_cluster_type(enum cluster_type_e type); - gboolean init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid) { + struct utsname res; enum cluster_type_e stack = get_cluster_type(); crm_peer_init(); /* Here we just initialize comms */ - switch (stack) { - case pcmk_cluster_classic_ais: - if (init_ais_connection_classic(dispatch, destroy, our_uuid, &pcmk_uname, nodeid) == - FALSE) { - return FALSE; - } - break; - case pcmk_cluster_cman: - case pcmk_cluster_corosync: - if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) { - return FALSE; - } - pcmk_uname = get_local_node_name(); - break; - case pcmk_cluster_heartbeat: - crm_info("Could not find an active corosync based cluster"); - return FALSE; - break; - default: - crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); - return FALSE; - break; + if(stack != pcmk_cluster_corosync) { + crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); + return FALSE; + } + + if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) { + return FALSE; + } else if (uname(&res) < 0) { + crm_perror(LOG_ERR, "Could not determin the current host"); + exit(100); + + } else { + pcmk_uname = crm_strdup(res.nodename); } crm_info("Connection to '%s': established", name_for_cluster_type(stack)); @@ -1397,150 +831,7 @@ check_message_sanity(const AIS_Message * msg, const char *data) return sane; } -#endif - -#if HAVE_CONFDB -static int -get_config_opt(confdb_handle_t config, - hdb_handle_t object_handle, const char *key, char **value, const char *fallback) -{ - size_t len = 0; - char *env_key = NULL; - const char *env_value = NULL; - char buffer[256]; - - if (*value) { - crm_free(*value); - *value = NULL; - } - - if (object_handle > 0) { - if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) { - *value = crm_strdup(buffer); - } - } - - if (*value) { - crm_info("Found '%s' for option: %s", *value, key); - return 0; - } - - env_key = crm_concat("HA", key, '_'); - env_value = getenv(env_key); - crm_free(env_key); - - if (*value) { - crm_info("Found '%s' in ENV for option: %s", *value, key); - *value = crm_strdup(env_value); - return 0; - } - - if (fallback) { - crm_info("Defaulting to '%s' for option: %s", fallback, key); - *value = crm_strdup(fallback); - - } else { - crm_info("No default for option: %s", key); - } - - return -1; -} - -static confdb_handle_t -config_find_init(confdb_handle_t config) -{ - cs_error_t rc = CS_OK; - confdb_handle_t local_handle = OBJECT_PARENT_HANDLE; - - rc = confdb_object_find_start(config, local_handle); - if (rc == CS_OK) { - return local_handle; - } else { - crm_err("Couldn't create search context: %d", rc); - } - return 0; -} - -static hdb_handle_t -config_find_next(confdb_handle_t config, const char *name, confdb_handle_t top_handle) -{ - cs_error_t rc = CS_OK; - hdb_handle_t local_handle = 0; - - if (top_handle == 0) { - crm_err("Couldn't search for %s: no valid context", name); - return 0; - } - - crm_trace("Searching for %s in " HDB_X_FORMAT, name, top_handle); - rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle); - if (rc != CS_OK) { - crm_info("No additional configuration supplied for: %s", name); - local_handle = 0; - } else { - crm_info("Processing additional %s options...", name); - } - return local_handle; -} - -enum cluster_type_e -find_corosync_variant(void) -{ - confdb_handle_t config; - enum cluster_type_e found = pcmk_cluster_unknown; - - int rc; - char *value = NULL; - confdb_handle_t top_handle = 0; - hdb_handle_t local_handle = 0; - static confdb_callbacks_t callbacks = { }; - - rc = confdb_initialize(&config, &callbacks); - if (rc != CS_OK) { - crm_debug("Could not initialize Cluster Configuration Database API instance error %d", rc); - return found; - } - - top_handle = config_find_init(config); - local_handle = config_find_next(config, "service", top_handle); - while (local_handle) { - crm_free(value); - get_config_opt(config, local_handle, "name", &value, NULL); - if (safe_str_eq("pacemaker", value)) { - found = pcmk_cluster_classic_ais; - - crm_free(value); - get_config_opt(config, local_handle, "ver", &value, "0"); - crm_trace("Found Pacemaker plugin version: %s", value); - break; - } - - local_handle = config_find_next(config, "service", top_handle); - } - crm_free(value); - - if (found == pcmk_cluster_unknown) { - top_handle = config_find_init(config); - local_handle = config_find_next(config, "quorum", top_handle); - get_config_opt(config, local_handle, "provider", &value, NULL); - - if (safe_str_eq("quorum_cman", value)) { - crm_trace("Found CMAN quorum provider"); - found = pcmk_cluster_cman; - } - } - crm_free(value); - - if (found == pcmk_cluster_unknown) { - crm_trace("Defaulting to a 'bare' corosync cluster"); - found = pcmk_cluster_corosync; - } - - confdb_finalize(config); - return found; -} -#else enum cluster_type_e find_corosync_variant(void) { @@ -1557,4 +848,3 @@ find_corosync_variant(void) cmap_finalize(handle); return pcmk_cluster_corosync; } -#endif diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c new file mode 100644 index 0000000000..3baf44ccf7 --- /dev/null +++ b/lib/cluster/legacy.c @@ -0,0 +1,1373 @@ +/* + * Copyright (C) 2004 Andrew Beekhof + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include "stack.h" +#if SUPPORT_COROSYNC +# include +# include +# include +cpg_handle_t pcmk_cpg_handle = 0; + +struct cpg_name pcmk_cpg_group = { + .length = 0, + .value[0] = 0, +}; +#endif + +#if HAVE_CMAP +# include +#endif + +#if SUPPORT_CMAN +# include +cman_handle_t pcmk_cman_handle = NULL; +#endif + +static char *pcmk_uname = NULL; +static int pcmk_uname_len = 0; +static uint32_t pcmk_nodeid = 0; +int ais_membership_timer = 0; +gboolean ais_membership_force = FALSE; + +#define cs_repeat(counter, max, code) do { \ + code; \ + if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ + counter++; \ + crm_debug("Retrying operation after %ds", counter); \ + sleep(counter); \ + } else { \ + break; \ + } \ + } while(counter < max) + +enum crm_ais_msg_types +text2msg_type(const char *text) +{ + int type = crm_msg_none; + + CRM_CHECK(text != NULL, return type); + if (safe_str_eq(text, "ais")) { + type = crm_msg_ais; + } else if (safe_str_eq(text, "crm_plugin")) { + type = crm_msg_ais; + } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { + type = crm_msg_cib; + } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { + type = crm_msg_crmd; + } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { + type = crm_msg_crmd; + } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { + type = crm_msg_te; + } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { + type = crm_msg_pe; + } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { + type = crm_msg_lrmd; + } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { + type = crm_msg_stonithd; + } else if (safe_str_eq(text, "stonith-ng")) { + type = crm_msg_stonith_ng; + } else if (safe_str_eq(text, "attrd")) { + type = crm_msg_attrd; + + } else { + /* This will normally be a transient client rather than + * a cluster daemon. Set the type to the pid of the client + */ + int scan_rc = sscanf(text, "%d", &type); + + if (scan_rc != 1) { + /* Ensure its sane */ + type = crm_msg_none; + } + } + return type; +} + +char * +get_ais_data(const AIS_Message * msg) +{ + int rc = BZ_OK; + char *uncompressed = NULL; + unsigned int new_size = msg->size + 1; + + if (msg->is_compressed == FALSE) { + crm_trace("Returning uncompressed message data"); + uncompressed = strdup(msg->data); + + } else { + crm_trace("Decompressing message data"); + crm_malloc0(uncompressed, new_size); + + rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data, + msg->compressed_size, 1, 0); + + CRM_ASSERT(rc == BZ_OK); + CRM_ASSERT(new_size == msg->size); + } + + return uncompressed; +} + +#if SUPPORT_COROSYNC +int ais_fd_sync = -1; +int ais_fd_async = -1; /* never send messages via this channel */ +void *ais_ipc_ctx = NULL; + +hdb_handle_t ais_ipc_handle = 0; +GFDSource *ais_source = NULL; +GFDSource *ais_source_sync = NULL; +GFDSource *cman_source = NULL; +GFDSource *cpg_source = NULL; +static char *ais_cluster_name = NULL; + +gboolean +get_ais_nodeid(uint32_t * id, char **uname) +{ + struct iovec iov; + int retries = 0; + int rc = CS_OK; + cs_ipc_header_response_t header; + struct crm_ais_nodeid_resp_s answer; + + header.error = CS_OK; + header.id = crm_class_nodeid; + header.size = sizeof(cs_ipc_header_response_t); + + CRM_CHECK(id != NULL, return FALSE); + CRM_CHECK(uname != NULL, return FALSE); + + iov.iov_base = &header; + iov.iov_len = header.size; + + retry: + errno = 0; + rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, &answer, sizeof(answer)); + if (rc == CS_OK) { + CRM_CHECK(answer.header.size == sizeof(struct crm_ais_nodeid_resp_s), + crm_err("Odd message: id=%d, size=%d, error=%d", + answer.header.id, answer.header.size, answer.header.error)); + CRM_CHECK(answer.header.id == crm_class_nodeid, + crm_err("Bad response id: %d", answer.header.id)); + } + + if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) { + retries++; + crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); + sleep(retries); /* Proportional back off */ + goto retry; + } + + if (rc != CS_OK) { + crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); + return FALSE; + + } else if (answer.header.error != CS_OK) { + crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); + return FALSE; + } + + crm_info("Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname); + + *id = answer.id; + *uname = crm_strdup(answer.uname); + ais_cluster_name = crm_strdup(answer.cname); + + return TRUE; +} + +gboolean +crm_get_cluster_name(char **cname) +{ + CRM_CHECK(cname != NULL, return FALSE); + if (ais_cluster_name) { + *cname = crm_strdup(ais_cluster_name); + return TRUE; + } + return FALSE; +} + +gboolean +send_ais_text(int class, const char *data, + gboolean local, const char *node, enum crm_ais_msg_types dest) +{ + static int msg_id = 0; + static int local_pid = 0; + enum cluster_type_e cluster_type = get_cluster_type(); + + int retries = 0; + int rc = CS_OK; + int buf_len = sizeof(cs_ipc_header_response_t); + + char *buf = NULL; + struct iovec iov; + const char *transport = "pcmk"; + cs_ipc_header_response_t *header = NULL; + AIS_Message *ais_msg = NULL; + enum crm_ais_msg_types sender = text2msg_type(crm_system_name); + + /* There are only 6 handlers registered to crm_lib_service in plugin.c */ + CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); + + if (data == NULL) { + data = ""; + } + + if (local_pid == 0) { + local_pid = getpid(); + } + + if (sender == crm_msg_none) { + sender = local_pid; + } + + crm_malloc0(ais_msg, sizeof(AIS_Message)); + + ais_msg->id = msg_id++; + ais_msg->header.id = class; + ais_msg->header.error = CS_OK; + + ais_msg->host.type = dest; + ais_msg->host.local = local; + if (node) { + ais_msg->host.size = strlen(node); + memset(ais_msg->host.uname, 0, MAX_NAME); + memcpy(ais_msg->host.uname, node, ais_msg->host.size); + ais_msg->host.id = 0; + + } else { + ais_msg->host.size = 0; + memset(ais_msg->host.uname, 0, MAX_NAME); + ais_msg->host.id = 0; + } + + ais_msg->sender.id = 0; + ais_msg->sender.type = sender; + ais_msg->sender.pid = local_pid; + ais_msg->sender.size = pcmk_uname_len; + memset(ais_msg->sender.uname, 0, MAX_NAME); + memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size); + + ais_msg->size = 1 + strlen(data); + + if (ais_msg->size < CRM_BZ2_THRESHOLD) { + failback: + crm_realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); + memcpy(ais_msg->data, data, ais_msg->size); + + } else { + char *compressed = NULL; + char *uncompressed = crm_strdup(data); + unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ + + crm_trace("Compressing message payload"); + crm_malloc(compressed, len); + + rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, + 0, CRM_BZ2_WORK); + + crm_free(uncompressed); + + if (rc != BZ_OK) { + crm_err("Compression failed: %d", rc); + crm_free(compressed); + goto failback; + } + + crm_realloc(ais_msg, sizeof(AIS_Message) + len + 1); + memcpy(ais_msg->data, compressed, len); + ais_msg->data[len] = 0; + crm_free(compressed); + + ais_msg->is_compressed = TRUE; + ais_msg->compressed_size = len; + + crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); + } + + ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); + + crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)", + ais_msg->is_compressed ? " compressed" : "", + ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), + ais_data_len(ais_msg), ais_msg->header.size); + + iov.iov_base = ais_msg; + iov.iov_len = ais_msg->header.size; + crm_realloc(buf, buf_len); + + do { + if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { + retries++; + crm_info("Peer overloaded or membership in flux:" + " Re-sending message (Attempt %d of 20)", retries); + sleep(retries); /* Proportional back off */ + } + + errno = 0; + switch (cluster_type) { + case pcmk_cluster_classic_ais: + rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, buf, buf_len); + header = (cs_ipc_header_response_t *) buf; + if (rc == CS_OK) { + CRM_CHECK(header->size == sizeof(cs_ipc_header_response_t), + crm_err("Odd message: id=%d, size=%d, class=%d, error=%d", + header->id, header->size, class, header->error)); + + CRM_ASSERT(buf_len >= header->size); + CRM_CHECK(header->id == CRM_MESSAGE_IPC_ACK, + crm_err("Bad response id (%d) for request (%d)", header->id, + ais_msg->header.id)); + CRM_CHECK(header->error == CS_OK, rc = header->error); + } + break; + + case pcmk_cluster_cman: + transport = "cpg"; + CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); + rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); + if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { + cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; + int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); + + if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { + crm_warn("Connection overloaded, cannot send messages"); + goto bail; + + } else if (rc2 != CS_OK) { + crm_warn("Could not determin the connection state: %s (%d)", + ais_error2text(rc2), rc2); + goto bail; + } + } + break; + + case pcmk_cluster_unknown: + case pcmk_cluster_invalid: + case pcmk_cluster_heartbeat: + CRM_ASSERT(is_openais_cluster()); + break; + } + + } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20); + + bail: + if (rc != CS_OK) { + crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s", + ais_msg->id, transport, rc, ais_error2text(rc)); + + } else { + crm_trace("Message %d: sent", ais_msg->id); + } + + crm_free(buf); + crm_free(ais_msg); + return (rc == CS_OK); +} + +gboolean +send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest) +{ + gboolean rc = TRUE; + char *data = NULL; + + if (is_classic_ais_cluster()) { + if (ais_fd_async < 0 || ais_source == NULL) { + crm_err("Not connected to AIS: %d %p", ais_fd_async, ais_source); + return FALSE; + } + } + + data = dump_xml_unformatted(msg); + rc = send_ais_text(crm_class_cluster, data, local, node, dest); + crm_free(data); + return rc; +} + +void +terminate_ais_connection(void) +{ + crm_notice("Disconnecting from Corosync"); + +/* G_main_del_fd(ais_source); */ +/* G_main_del_fd(ais_source_sync); */ + + if (is_classic_ais_cluster()) { + if(ais_ipc_handle) { + crm_trace("Disconnecting plugin"); + coroipcc_service_disconnect(ais_ipc_handle); + ais_ipc_handle = 0; + } else { + crm_info("No plugin connection"); + } + + } else { + if(pcmk_cpg_handle) { + crm_trace("Disconnecting CPG"); + cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); + cpg_finalize(pcmk_cpg_handle); + pcmk_cpg_handle = 0; + + } else { + crm_info("No CPG connection"); + } + } + +# if SUPPORT_CMAN + if (is_cman_cluster()) { + if(pcmk_cman_handle) { + crm_trace("Disconnecting cman"); + cman_stop_notification(pcmk_cman_handle); + cman_finish(pcmk_cman_handle); + + } else { + crm_info("No cman connection"); + } + } +# endif + ais_fd_async = -1; + ais_fd_sync = -1; +} + + +static crm_node_t * +crm_update_ais_node(xmlNode * member, long long seq) +{ + const char *id_s = crm_element_value(member, "id"); + const char *addr = crm_element_value(member, "addr"); + const char *uname = crm_element_value(member, "uname"); + const char *state = crm_element_value(member, "state"); + const char *born_s = crm_element_value(member, "born"); + const char *seen_s = crm_element_value(member, "seen"); + const char *votes_s = crm_element_value(member, "votes"); + const char *procs_s = crm_element_value(member, "processes"); + + int votes = crm_int_helper(votes_s, NULL); + unsigned int id = crm_int_helper(id_s, NULL); + unsigned int procs = crm_int_helper(procs_s, NULL); + + /* TODO: These values will contain garbage if version < 0.7.1 */ + uint64_t born = crm_int_helper(born_s, NULL); + uint64_t seen = crm_int_helper(seen_s, NULL); + + return crm_update_peer(id, born, seen, votes, procs, uname, uname, addr, state); +} + +static crm_node_t * +crm_update_cman_node(xmlNode * member, long long seq) +{ + const char *id_s = crm_element_value(member, "id"); + const char *uname = crm_element_value(member, "uname"); + const char *procs_s = crm_element_value(member, "processes"); + + unsigned int id = crm_int_helper(id_s, NULL); + unsigned int procs = crm_int_helper(procs_s, NULL); + + crm_info("Updating peer processes for %s", crm_str(uname)); + return crm_update_peer(id, 0, 0, 0, procs, uname, uname, NULL, NULL); +} + + +static gboolean +ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char *, int)) +{ + char *data = NULL; + char *uncompressed = NULL; + + xmlNode *xml = NULL; + + CRM_ASSERT(msg != NULL); + + crm_trace("Got new%s message (size=%d, %d, %d)", + msg->is_compressed ? " compressed" : "", + ais_data_len(msg), msg->size, msg->compressed_size); + + data = msg->data; + if (msg->is_compressed && msg->size > 0) { + int rc = BZ_OK; + unsigned int new_size = msg->size + 1; + + if (check_message_sanity(msg, NULL) == FALSE) { + goto badmsg; + } + + crm_trace("Decompressing message data"); + crm_malloc0(uncompressed, new_size); + rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); + + if (rc != BZ_OK) { + crm_err("Decompression failed: %d", rc); + goto badmsg; + } + + CRM_ASSERT(rc == BZ_OK); + CRM_ASSERT(new_size == msg->size); + + data = uncompressed; + + } else if (check_message_sanity(msg, data) == FALSE) { + goto badmsg; + + } else if (safe_str_eq("identify", data)) { + int pid = getpid(); + char *pid_s = crm_itoa(pid); + + send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); + crm_free(pid_s); + goto done; + } + + if (msg->header.id != crm_class_members) { + crm_update_peer(msg->sender.id, 0, 0, 0, 0, msg->sender.uname, msg->sender.uname, NULL, + NULL); + } + + if (msg->header.id == crm_class_rmpeer) { + uint32_t id = crm_int_helper(data, NULL); + + crm_info("Removing peer %s/%u", data, id); + reap_crm_member(id); + goto done; + + } else if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { + + xml = string2xml(data); + if (xml == NULL) { + crm_err("Invalid membership update: %s", data); + goto badmsg; + } + + if (is_classic_ais_cluster() == FALSE) { + xmlNode *node = NULL; + + for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { + crm_update_cman_node(node, crm_peer_seq); + } + + } else { + xmlNode *node = NULL; + const char *value = NULL; + gboolean quorate = FALSE; + + value = crm_element_value(xml, "quorate"); + CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); goto badmsg); + if (crm_is_true(value)) { + quorate = TRUE; + } + + value = crm_element_value(xml, "id"); + CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); goto badmsg); + crm_peer_seq = crm_int_helper(value, NULL); + + if (quorate != crm_have_quorum) { + crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost"); + crm_have_quorum = quorate; + + } else { + crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost"); + } + + for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { + crm_update_ais_node(node, crm_peer_seq); + } + } + } + + crm_trace("Payload: %s", data); + if (dispatch != NULL) { + dispatch(msg, data, 0); + } + + done: + crm_free(uncompressed); + free_xml(xml); + return TRUE; + + badmsg: + crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" + " min=%d, total=%d, size=%d, bz2_size=%d", + msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), + ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), + msg->sender.pid, (int)sizeof(AIS_Message), + msg->header.size, msg->size, msg->compressed_size); + goto done; +} + +gboolean +ais_dispatch(int sender, gpointer user_data) +{ + int rc = CS_OK; + gboolean good = TRUE; + + gboolean(*dispatch) (AIS_Message *, char *, int) = user_data; + + do { + char *buffer = NULL; + + rc = coroipcc_dispatch_get(ais_ipc_handle, (void **)&buffer, 0); + if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { + return TRUE; + } + if (rc != CS_OK) { + crm_perror(LOG_ERR, "Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); + goto bail; + } + if (buffer == NULL) { + /* NULL is a legal "no message afterall" value */ + return TRUE; + } + good = ais_dispatch_message((AIS_Message *) buffer, dispatch); + coroipcc_dispatch_put(ais_ipc_handle); + + } while (good && ais_ipc_handle); + + return good; + + bail: + if(ais_ipc_handle) { + crm_err("AIS connection failed: %p", (void*)ais_ipc_handle); + } + return FALSE; +} + +static void +ais_destroy(gpointer user_data) +{ + crm_err("AIS connection terminated"); + ais_fd_sync = -1; + exit(1); +} + +static gboolean +pcmk_proc_dispatch(IPC_Channel * ch, gpointer user_data) +{ + xmlNode *msg = NULL; + gboolean stay_connected = TRUE; + + while (IPC_ISRCONN(ch)) { + if (ch->ops->is_message_pending(ch) == 0) { + break; + } + + msg = xmlfromIPC(ch, MAX_IPC_DELAY); + + if (msg) { + xmlNode *node = NULL; + + for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) { + int id = 0; + int children = 0; + const char *uname = crm_element_value(node, "uname"); + + crm_element_value_int(node, "id", &id); + crm_element_value_int(node, "processes", &children); + if (id == 0) { + crm_log_xml_err(msg, "Bad Update"); + } else { + crm_update_peer(id, 0, 0, 0, children, NULL, uname, NULL, NULL); + } + } + free_xml(msg); + } + + if (ch->ch_status != IPC_CONNECT) { + break; + } + } + + if (ch->ch_status != IPC_CONNECT) { + stay_connected = FALSE; + } + return stay_connected; +} + +# if SUPPORT_CMAN + +static gboolean +pcmk_cman_dispatch(int sender, gpointer user_data) +{ + int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL); + + if (rc < 0) { + crm_err("Connection to cman failed: %d", rc); + return FALSE; + } + return TRUE; +} + +# define MAX_NODES 256 + +static void +cman_event_callback(cman_handle_t handle, void *privdata, int reason, int arg) +{ + int rc = 0, lpc = 0, node_count = 0; + + cman_cluster_t cluster; + static cman_node_t cman_nodes[MAX_NODES]; + + gboolean(*dispatch) (unsigned long long, gboolean) = privdata; + + switch (reason) { + case CMAN_REASON_STATECHANGE: + + memset(&cluster, 0, sizeof(cluster)); + rc = cman_get_cluster(pcmk_cman_handle, &cluster); + if (rc < 0) { + crm_err("Couldn't query cman cluster details: %d %d", rc, errno); + return; + } + + crm_peer_seq = cluster.ci_generation; + if (arg != crm_have_quorum) { + crm_notice("Membership %llu: quorum %s", crm_peer_seq, arg ? "acquired" : "lost"); + crm_have_quorum = arg; + + } else { + crm_info("Membership %llu: quorum %s", crm_peer_seq, + arg ? "retained" : "still lost"); + } + + rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes); + if (rc < 0) { + crm_err("Couldn't query cman node list: %d %d", rc, errno); + return; + } + + for (lpc = 0; lpc < node_count; lpc++) { + if (cman_nodes[lpc].cn_nodeid == 0) { + /* Never allow node ID 0 to be considered a member #315711 */ + cman_nodes[lpc].cn_member = 0; + } + crm_update_peer(cman_nodes[lpc].cn_nodeid, cman_nodes[lpc].cn_incarnation, + cman_nodes[lpc].cn_member ? crm_peer_seq : 0, 0, 0, + cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_name, NULL, + cman_nodes[lpc].cn_member ? CRM_NODE_MEMBER : CRM_NODE_LOST); + } + + if (dispatch) { + dispatch(crm_peer_seq, crm_have_quorum); + } + break; + + case CMAN_REASON_TRY_SHUTDOWN: + /* Always reply with a negative - pacemaker needs to be stopped first */ + crm_info("CMAN wants to shut down: %s", arg ? "forced" : "optional"); + cman_replyto_shutdown(pcmk_cman_handle, 0); + break; + + case CMAN_REASON_CONFIG_UPDATE: + /* Ignore */ + break; + } +} +# endif + +gboolean +init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) +{ +# if SUPPORT_CMAN + int rc = -1, fd = -1; + cman_cluster_t cluster; + + crm_info("Configuring Pacemaker to obtain quorum from cman"); + + memset(&cluster, 0, sizeof(cluster)); + + pcmk_cman_handle = cman_init(dispatch); + if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) { + crm_err("Couldn't connect to cman"); + goto cman_bail; + } + + rc = cman_get_cluster(pcmk_cman_handle, &cluster); + if (rc < 0) { + crm_err("Couldn't query cman cluster details: %d %d", rc, errno); + goto cman_bail; + } + ais_cluster_name = crm_strdup(cluster.ci_name); + + rc = cman_start_notification(pcmk_cman_handle, cman_event_callback); + if (rc < 0) { + crm_err("Couldn't register for cman notifications: %d %d", rc, errno); + goto cman_bail; + } + + /* Get the current membership state */ + cman_event_callback(pcmk_cman_handle, dispatch, CMAN_REASON_STATECHANGE, + cman_is_quorate(pcmk_cman_handle)); + + fd = cman_get_fd(pcmk_cman_handle); + crm_debug("Adding fd=%d to mainloop", fd); + cman_source = G_main_add_fd(G_PRIORITY_HIGH, fd, FALSE, pcmk_cman_dispatch, dispatch, destroy); + + cman_bail: + if (rc < 0) { + cman_finish(pcmk_cman_handle); + return FALSE; + } +# else + crm_err("cman qorum is not supported in this build"); + exit(100); +# endif + return TRUE; +} + +# ifdef SUPPORT_COROSYNC +gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL; + +static gboolean +pcmk_cpg_dispatch(int sender, gpointer user_data) +{ + int rc = 0; + + pcmk_cpg_dispatch_fn = user_data; + rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); + if (rc != CS_OK) { + crm_err("Connection to the CPG API failed: %d", rc); + return FALSE; + } + return TRUE; +} + +static void +pcmk_cpg_deliver(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + AIS_Message *ais_msg = (AIS_Message *) msg; + + if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { + crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); + return; + + } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) { + /* Not for us */ + return; + } + + ais_msg->sender.id = nodeid; + if (ais_msg->sender.size == 0) { + crm_node_t *peer = crm_get_peer(nodeid, NULL); + + if (peer == NULL) { + crm_err("Peer with nodeid=%u is unknown", nodeid); + + } else if (peer->uname == NULL) { + crm_err("No uname for peer with nodeid=%u", nodeid); + + } else { + crm_notice("Fixing uname for peer with nodeid=%u", nodeid); + ais_msg->sender.size = strlen(peer->uname); + memset(ais_msg->sender.uname, 0, MAX_NAME); + memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); + } + } + + ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); +} + +static void +pcmk_cpg_membership(cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + int i; + + for (i = 0; i < member_list_entries; i++) { + crm_debug("Member[%d] %d ", i, member_list[i].nodeid); + } + + for (i = 0; i < left_list_entries; i++) { + crm_debug("Left[%d] %d ", i, left_list[i].nodeid); + } +} + +cpg_callbacks_t cpg_callbacks = { + .cpg_deliver_fn = pcmk_cpg_deliver, + .cpg_confchg_fn = pcmk_cpg_membership, +}; +# endif + +static gboolean +init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), + uint32_t * nodeid) +{ +# ifdef SUPPORT_COROSYNC + int rc = -1; + int fd = 0; + int retries = 0; + + strcpy(pcmk_cpg_group.value, crm_system_name); + pcmk_cpg_group.length = strlen(crm_system_name) + 1; + + cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); + if (rc != CS_OK) { + crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); + goto bail; + } + + retries = 0; + cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); + if (rc != CS_OK) { + crm_err("Could not get local node id from the CPG API"); + goto bail; + } + + retries = 0; + cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); + if (rc != CS_OK) { + crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); + goto bail; + } + + rc = cpg_fd_get(pcmk_cpg_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CPG API connection: %d\n", rc); + goto bail; + } + + crm_debug("Adding fd=%d to mainloop", fd); + cpg_source = G_main_add_fd(G_PRIORITY_HIGH, fd, FALSE, pcmk_cpg_dispatch, dispatch, destroy); + + bail: + if (rc != CS_OK) { + cpg_finalize(pcmk_cpg_handle); + return FALSE; + } +# else + crm_err("The Corosync CPG API is not supported in this build"); + exit(100); +# endif + return TRUE; +} + +gboolean +init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), + void (*destroy) (gpointer)) +{ + crm_err("The Corosync quorum API is not supported in this build"); + exit(100); + return TRUE; +} + +static gboolean +init_ais_connection_classic(gboolean(*dispatch) (AIS_Message *, char *, int), + void (*destroy) (gpointer), char **our_uuid, char **our_uname, + int *nodeid) +{ + int rc; + int pid = 0; + char *pid_s = NULL; + struct utsname name; + + crm_info("Creating connection to our Corosync plugin"); + rc = coroipcc_service_connect(COROSYNC_SOCKET_NAME, PCMK_SERVICE_ID, + AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, + &ais_ipc_handle); + if (ais_ipc_handle) { + coroipcc_fd_get(ais_ipc_handle, &ais_fd_async); + } else { + crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", + PCMK_SERVICE_ID, strerror(errno), errno); + return FALSE; + } + if (ais_fd_async <= 0 && rc == CS_OK) { + crm_err("No context created, but connection reported 'ok'"); + rc = CS_ERR_LIBRARY; + } + if (rc != CS_OK) { + crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, + ais_error2text(rc), rc); + } + + if (rc != CS_OK) { + return FALSE; + } + + if (destroy == NULL) { + destroy = ais_destroy; + } + + if (dispatch) { + crm_debug("Adding fd=%d to mainloop", ais_fd_async); + ais_source = + G_main_add_fd(G_PRIORITY_HIGH, ais_fd_async, FALSE, ais_dispatch, dispatch, destroy); + } + + crm_info("AIS connection established"); + + pid = getpid(); + pid_s = crm_itoa(pid); + send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); + crm_free(pid_s); + + if (uname(&name) < 0) { + crm_perror(LOG_ERR, "Could not determin the current host"); + exit(100); + } + + get_ais_nodeid(&pcmk_nodeid, &pcmk_uname); + if (safe_str_neq(name.nodename, pcmk_uname)) { + crm_crit("Node name mismatch! OpenAIS supplied %s, our lookup returned %s", + pcmk_uname, name.nodename); + crm_notice + ("Node name mismatches usually occur when assigned automatically by DHCP servers"); + crm_notice("If this node was part of the cluster with a different name," + " you will need to remove the old entry with crm_node --remove"); + } + return TRUE; +} + +gboolean +init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), + char **our_uuid, char **our_uname, int *nodeid) +{ + int retries = 0; + + while (retries++ < 30) { + int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid); + + switch (rc) { + case CS_OK: + if (getenv("HA_mcp")) { + IPC_Channel *ch = init_client_ipc_comms_nodispatch("pcmk"); + + G_main_add_IPC_Channel(G_PRIORITY_HIGH, ch, FALSE, pcmk_proc_dispatch, NULL, + destroy); + } + return TRUE; + break; + case CS_ERR_TRY_AGAIN: + case CS_ERR_QUEUE_FULL: + break; + default: + return FALSE; + } + } + + crm_err("Retry count exceeded: %d", retries); + return FALSE; +} + +static char * +get_local_node_name(void) +{ + char *name = NULL; + struct utsname res; + + if (is_cman_cluster()) { +# if SUPPORT_CMAN + cman_node_t us; + cman_handle_t cman; + + cman = cman_init(NULL); + if (cman != NULL && cman_is_active(cman)) { + us.cn_name[0] = 0; + cman_get_node(cman, CMAN_NODEID_US, &us); + name = crm_strdup(us.cn_name); + crm_info("Using CMAN node name: %s", name); + + } else { + crm_err("Couldn't determin node name from CMAN"); + } + + cman_finish(cman); +# endif + + } else if (uname(&res) < 0) { + crm_perror(LOG_ERR, "Could not determin the current host"); + exit(100); + + } else { + name = crm_strdup(res.nodename); + } + return name; +} + +extern int set_cluster_type(enum cluster_type_e type); + +gboolean +init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int), + void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid) +{ + enum cluster_type_e stack = get_cluster_type(); + + crm_peer_init(); + + /* Here we just initialize comms */ + switch (stack) { + case pcmk_cluster_classic_ais: + if (init_ais_connection_classic(dispatch, destroy, our_uuid, &pcmk_uname, nodeid) == + FALSE) { + return FALSE; + } + break; + case pcmk_cluster_cman: + if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) { + return FALSE; + } + pcmk_uname = get_local_node_name(); + break; + case pcmk_cluster_heartbeat: + crm_info("Could not find an active corosync based cluster"); + return FALSE; + break; + default: + crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); + return FALSE; + break; + } + + crm_info("Connection to '%s': established", name_for_cluster_type(stack)); + + CRM_ASSERT(pcmk_uname != NULL); + pcmk_uname_len = strlen(pcmk_uname); + + if (pcmk_nodeid != 0) { + /* Ensure the local node always exists */ + crm_update_peer(pcmk_nodeid, 0, 0, 0, 0, pcmk_uname, pcmk_uname, NULL, NULL); + } + + if (our_uuid != NULL) { + *our_uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); + } + + if (our_uname != NULL) { + *our_uname = crm_strdup(pcmk_uname); + } + + if (nodeid != NULL) { + *nodeid = pcmk_nodeid; + } + + return TRUE; +} + +gboolean +check_message_sanity(const AIS_Message * msg, const char *data) +{ + gboolean sane = TRUE; + gboolean repaired = FALSE; + int dest = msg->host.type; + int tmp_size = msg->header.size - sizeof(AIS_Message); + + if (sane && msg->header.size == 0) { + crm_warn("Message with no size"); + sane = FALSE; + } + + if (sane && msg->header.error != CS_OK) { + crm_warn("Message header contains an error: %d", msg->header.error); + sane = FALSE; + } + + if (sane && ais_data_len(msg) != tmp_size) { + crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), + tmp_size); + sane = TRUE; + } + + if (sane && ais_data_len(msg) == 0) { + crm_warn("Message with no payload"); + sane = FALSE; + } + + if (sane && data && msg->is_compressed == FALSE) { + int str_size = strlen(data) + 1; + + if (ais_data_len(msg) != str_size) { + int lpc = 0; + + crm_warn("Message payload is corrupted: expected %d bytes, got %d", + ais_data_len(msg), str_size); + sane = FALSE; + for (lpc = (str_size - 10); lpc < msg->size; lpc++) { + if (lpc < 0) { + lpc = 0; + } + crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); + } + } + } + + if (sane == FALSE) { + crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", + msg->id, ais_dest(&(msg->host)), msg_type2text(dest), + ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), + msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); + + } else if (repaired) { + crm_err + ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", + msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), + msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, + ais_data_len(msg), msg->header.size); + } else { + crm_trace + ("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", + msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), + msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, + ais_data_len(msg), msg->header.size); + } + + return sane; +} +#endif + +static int +get_config_opt(confdb_handle_t config, + hdb_handle_t object_handle, const char *key, char **value, const char *fallback) +{ + size_t len = 0; + char *env_key = NULL; + const char *env_value = NULL; + char buffer[256]; + + if (*value) { + crm_free(*value); + *value = NULL; + } + + if (object_handle > 0) { + if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) { + *value = crm_strdup(buffer); + } + } + + if (*value) { + crm_info("Found '%s' for option: %s", *value, key); + return 0; + } + + env_key = crm_concat("HA", key, '_'); + env_value = getenv(env_key); + crm_free(env_key); + + if (*value) { + crm_info("Found '%s' in ENV for option: %s", *value, key); + *value = crm_strdup(env_value); + return 0; + } + + if (fallback) { + crm_info("Defaulting to '%s' for option: %s", fallback, key); + *value = crm_strdup(fallback); + + } else { + crm_info("No default for option: %s", key); + } + + return -1; +} + +static confdb_handle_t +config_find_init(confdb_handle_t config) +{ + cs_error_t rc = CS_OK; + confdb_handle_t local_handle = OBJECT_PARENT_HANDLE; + + rc = confdb_object_find_start(config, local_handle); + if (rc == CS_OK) { + return local_handle; + } else { + crm_err("Couldn't create search context: %d", rc); + } + return 0; +} + +static hdb_handle_t +config_find_next(confdb_handle_t config, const char *name, confdb_handle_t top_handle) +{ + cs_error_t rc = CS_OK; + hdb_handle_t local_handle = 0; + + if (top_handle == 0) { + crm_err("Couldn't search for %s: no valid context", name); + return 0; + } + + crm_trace("Searching for %s in " HDB_X_FORMAT, name, top_handle); + rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle); + if (rc != CS_OK) { + crm_info("No additional configuration supplied for: %s", name); + local_handle = 0; + } else { + crm_info("Processing additional %s options...", name); + } + return local_handle; +} + +enum cluster_type_e +find_corosync_variant(void) +{ + confdb_handle_t config; + enum cluster_type_e found = pcmk_cluster_unknown; + + int rc; + char *value = NULL; + confdb_handle_t top_handle = 0; + hdb_handle_t local_handle = 0; + static confdb_callbacks_t callbacks = { }; + + rc = confdb_initialize(&config, &callbacks); + if (rc != CS_OK) { + crm_debug("Could not initialize Cluster Configuration Database API instance error %d", rc); + return found; + } + + top_handle = config_find_init(config); + local_handle = config_find_next(config, "service", top_handle); + while (local_handle) { + crm_free(value); + get_config_opt(config, local_handle, "name", &value, NULL); + if (safe_str_eq("pacemaker", value)) { + found = pcmk_cluster_classic_ais; + + crm_free(value); + get_config_opt(config, local_handle, "ver", &value, "0"); + crm_trace("Found Pacemaker plugin version: %s", value); + break; + } + + local_handle = config_find_next(config, "service", top_handle); + } + crm_free(value); + + if (found == pcmk_cluster_unknown) { + top_handle = config_find_init(config); + local_handle = config_find_next(config, "quorum", top_handle); + get_config_opt(config, local_handle, "provider", &value, NULL); + + if (safe_str_eq("quorum_cman", value)) { + crm_trace("Found CMAN quorum provider"); + found = pcmk_cluster_cman; + } + } + crm_free(value); + + confdb_finalize(config); + return found; +} + diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c index b00318cfb1..25762a4f80 100644 --- a/lib/cluster/membership.c +++ b/lib/cluster/membership.c @@ -402,43 +402,6 @@ crm_update_peer(unsigned int id, uint64_t born, uint64_t seen, int32_t votes, ui return node; } -crm_node_t * -crm_update_ais_node(xmlNode * member, long long seq) -{ - const char *id_s = crm_element_value(member, "id"); - const char *addr = crm_element_value(member, "addr"); - const char *uname = crm_element_value(member, "uname"); - const char *state = crm_element_value(member, "state"); - const char *born_s = crm_element_value(member, "born"); - const char *seen_s = crm_element_value(member, "seen"); - const char *votes_s = crm_element_value(member, "votes"); - const char *procs_s = crm_element_value(member, "processes"); - - int votes = crm_int_helper(votes_s, NULL); - unsigned int id = crm_int_helper(id_s, NULL); - unsigned int procs = crm_int_helper(procs_s, NULL); - - /* TODO: These values will contain garbage if version < 0.7.1 */ - uint64_t born = crm_int_helper(born_s, NULL); - uint64_t seen = crm_int_helper(seen_s, NULL); - - return crm_update_peer(id, born, seen, votes, procs, uname, uname, addr, state); -} - -crm_node_t * -crm_update_cman_node(xmlNode * member, long long seq) -{ - const char *id_s = crm_element_value(member, "id"); - const char *uname = crm_element_value(member, "uname"); - const char *procs_s = crm_element_value(member, "processes"); - - unsigned int id = crm_int_helper(id_s, NULL); - unsigned int procs = crm_int_helper(procs_s, NULL); - - crm_info("Updating peer processes for %s", crm_str(uname)); - return crm_update_peer(id, 0, 0, 0, procs, uname, uname, NULL, NULL); -} - #if SUPPORT_HEARTBEAT crm_node_t * crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq) diff --git a/tools/attrd.c b/tools/attrd.c index ece61c5a0a..d3c88f35e2 100644 --- a/tools/attrd.c +++ b/tools/attrd.c @@ -409,7 +409,6 @@ attrd_ais_dispatch(AIS_Message * wrapper, char *data, int sender) static void attrd_ais_destroy(gpointer unused) { - ais_fd_sync = -1; if (need_shutdown) { /* we signed out, so this is expected */ crm_info("OpenAIS disconnection complete");