Skip to content
Browse files

High: core: Internal tls api improvements for reuse with future LRMD …

…tls backend.
  • Loading branch information...
1 parent df60384 commit 564f7cc2a51dcd2f28ab12a13394f31be5aa3c93 @davidvossel davidvossel committed Jan 5, 2013
Showing with 939 additions and 460 deletions.
  1. +9 −4 cib/callbacks.c
  2. +5 −1 cib/callbacks.h
  3. +1 −1 cib/notify.c
  4. +205 −121 cib/remote.c
  5. +34 −2 include/crm_internal.h
  6. +139 −151 lib/cib/cib_remote.c
  7. +1 −0 lib/common/mainloop.c
  8. +544 −179 lib/common/remote.c
  9. +1 −1 tools/crm_mon.c
View
13 cib/callbacks.c
@@ -347,13 +347,18 @@ do_local_notify(xmlNode * notify_src, const char *client_id,
int rid = 0;
if(sync_reply) {
- CRM_LOG_ASSERT(client_obj->request_id);
+ if (client_obj->ipc) {
+ CRM_LOG_ASSERT(client_obj->request_id);
- rid = client_obj->request_id;
- client_obj->request_id = 0;
+ rid = client_obj->request_id;
+ client_obj->request_id = 0;
- crm_trace("Sending response %d to %s %s",
+ crm_trace("Sending response %d to %s %s",
rid, client_obj->name, from_peer?"(originator of delegated request)":"");
+ } else {
+ crm_trace("Sending response to %s %s",
+ client_obj->name, from_peer?"(originator of delegated request)":"");
+ }
} else {
crm_trace("Sending an event to %s %s",
View
6 cib/callbacks.h
@@ -41,25 +41,29 @@ typedef struct cib_client_s {
char *name;
char *callback_id;
char *user;
+ char *recv_buf;
int request_id;
qb_ipcs_connection_t *ipc;
#ifdef HAVE_GNUTLS_GNUTLS_H
gnutls_session *session;
+ gboolean handshake_complete;
#else
void *session;
#endif
gboolean encrypted;
+ gboolean remote_auth;
mainloop_io_t *remote;
-
+
unsigned long num_calls;
int pre_notify;
int post_notify;
int confirmations;
int replace;
int diffs;
+ int remote_auth_timeout;
GList *delegated_calls;
} cib_client_t;
View
2 cib/notify.c
@@ -83,7 +83,7 @@ cib_notify_client(gpointer key, gpointer value, gpointer user_data)
CRM_CHECK(client != NULL, return TRUE);
CRM_CHECK(update_msg != NULL, return TRUE);
- if (client->ipc == NULL) {
+ if (client->ipc == NULL && client->session == NULL) {
crm_warn("Skipping client with NULL channel");
return FALSE;
}
View
326 cib/remote.c
@@ -60,10 +60,6 @@
# endif
#endif
-#ifdef HAVE_DECL_NANOSLEEP
-# include <time.h>
-#endif
-
extern int remote_tls_fd;
extern gboolean cib_shutdown_flag;
@@ -73,17 +69,16 @@ void cib_remote_connection_destroy(gpointer user_data);
#ifdef HAVE_GNUTLS_GNUTLS_H
# define DH_BITS 1024
gnutls_dh_params dh_params;
-extern gnutls_anon_server_credentials anon_cred_s;
+gnutls_anon_server_credentials anon_cred_s;
static void
debug_log(int level, const char *str)
{
fputs(str, stderr);
}
-
-extern gnutls_session *create_tls_session(int csock, int type);
-
#endif
+#define REMOTE_AUTH_TIMEOUT 10000
+
int num_clients;
int authenticate_user(const char *user, const char *passwd);
int cib_remote_listen(gpointer data);
@@ -121,7 +116,7 @@ init_remote_listener(int port, gboolean encrypted)
#else
crm_notice("Starting a tls listener on port %d.", port);
gnutls_global_init();
-/* gnutls_global_set_log_level (10); */
+ /* gnutls_global_set_log_level (10); */
gnutls_global_set_log_function(debug_log);
gnutls_dh_params_init(&dh_params);
gnutls_dh_params_generate2(dh_params, DH_BITS);
@@ -215,37 +210,89 @@ check_group_membership(const char *usr, const char *grp)
return FALSE;
}
+static gboolean
+cib_remote_auth(xmlNode *login)
+{
+ const char *user = NULL;
+ const char *pass = NULL;
+ const char *tmp = NULL;
+
+ crm_log_xml_info(login, "Login: ");
+ if (login == NULL) {
+ return FALSE;
+ }
+
+ tmp = crm_element_name(login);
+ if (safe_str_neq(tmp, "cib_command")) {
+ crm_err("Wrong tag: %s", tmp);
+ return FALSE;
+ }
+
+ tmp = crm_element_value(login, "op");
+ if (safe_str_neq(tmp, "authenticate")) {
+ crm_err("Wrong operation: %s", tmp);
+ return FALSE;
+ }
+
+ user = crm_element_value(login, "user");
+ pass = crm_element_value(login, "password");
+
+ if (!user || !pass) {
+ crm_err("missing auth credentials");
+ return FALSE;
+ }
+
+ /* Non-root daemons can only validate the password of the
+ * user they're running as
+ */
+ if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) {
+ crm_err("User is not a member of the required group");
+ return FALSE;
+
+ } else if (authenticate_user(user, pass) == FALSE) {
+ crm_err("PAM auth failed");
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+remote_auth_timeout_cb(gpointer data)
+{
+ cib_client_t *client = data;
+
+ client->remote_auth_timeout = 0;
+
+ if (client->remote_auth == TRUE) {
+ return FALSE;
+ }
+
+ mainloop_del_fd(client->remote);
+ crm_err("Remote client authentication timed out");
+
+ return FALSE;
+}
int
cib_remote_listen(gpointer data)
{
- int lpc = 0;
int csock = 0;
unsigned laddr;
- time_t now = 0;
- time_t start = time(NULL);
struct sockaddr_in addr;
int ssock = *(int *)data;
+ int flag;
#ifdef HAVE_GNUTLS_GNUTLS_H
gnutls_session *session = NULL;
#endif
cib_client_t *new_client = NULL;
- xmlNode *login = NULL;
- const char *user = NULL;
- const char *pass = NULL;
- const char *tmp = NULL;
-
-#ifdef HAVE_DECL_NANOSLEEP
- const struct timespec sleepfast = { 0, 10000000 }; /* 10 millisec */
-#endif
-
static struct mainloop_fd_callbacks remote_client_fd_callbacks =
{
.dispatch = cib_remote_msg,
.destroy = cib_remote_connection_destroy,
- };
-
+ };
+
/* accept the connection */
laddr = sizeof(addr);
csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
@@ -257,10 +304,22 @@ cib_remote_listen(gpointer data)
return TRUE;
}
+ if ((flag = fcntl(csock, F_GETFL)) >= 0) {
+ if (fcntl(csock, F_SETFL, flag | O_NONBLOCK) < 0) {
+ crm_err( "fcntl() write failed");
+ close(csock);
+ return TRUE;
+ }
+ } else {
+ crm_err( "fcntl() read failed");
+ close(csock);
+ return TRUE;
+ }
+
if (ssock == remote_tls_fd) {
#ifdef HAVE_GNUTLS_GNUTLS_H
/* create gnutls session for the server socket */
- session = create_tls_session(csock, GNUTLS_SERVER);
+ session = crm_create_anon_tls_session(csock, GNUTLS_SERVER, anon_cred_s);
if (session == NULL) {
crm_err("TLS session creation failed");
close(csock);
@@ -269,73 +328,13 @@ cib_remote_listen(gpointer data)
#endif
}
- do {
- crm_trace("Iter: %d", lpc++);
- if (ssock == remote_tls_fd) {
-#ifdef HAVE_GNUTLS_GNUTLS_H
- login = crm_recv_remote_msg(session, TRUE);
-#endif
- } else {
- login = crm_recv_remote_msg(GINT_TO_POINTER(csock), FALSE);
- }
- if (login != NULL) {
- break;
- }
-#ifdef HAVE_DECL_NANOSLEEP
- nanosleep(&sleepfast, NULL);
-#else
- sleep(1);
-#endif
- now = time(NULL);
-
- /* Peers have 3s to connect */
- } while (login == NULL && (start - now) < 4);
-
- crm_log_xml_info(login, "Login: ");
- if (login == NULL) {
- goto bail;
- }
-
- tmp = crm_element_name(login);
- if (safe_str_neq(tmp, "cib_command")) {
- crm_err("Wrong tag: %s", tmp);
- goto bail;
- }
-
- tmp = crm_element_value(login, "op");
- if (safe_str_neq(tmp, "authenticate")) {
- crm_err("Wrong operation: %s", tmp);
- goto bail;
- }
-
- user = crm_element_value(login, "user");
- pass = crm_element_value(login, "password");
-
- /* Non-root daemons can only validate the password of the
- * user they're running as
- */
- if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) {
- crm_err("User is not a member of the required group");
- goto bail;
-
- } else if (authenticate_user(user, pass) == FALSE) {
- crm_err("PAM auth failed");
- goto bail;
- }
-
- /* send ACK */
num_clients++;
new_client = calloc(1, sizeof(cib_client_t));
- new_client->name = crm_element_value_copy(login, "name");
-
- CRM_CHECK(new_client->id == NULL, free(new_client->id));
new_client->id = crm_generate_uuid();
-
-#if ENABLE_ACL
- new_client->user = strdup(user);
-#endif
-
new_client->callback_id = NULL;
+ /* clients have a few seconds to perform handshake. */
+ new_client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, new_client);
+
if (ssock == remote_tls_fd) {
#ifdef HAVE_GNUTLS_GNUTLS_H
new_client->encrypted = TRUE;
@@ -345,37 +344,19 @@ cib_remote_listen(gpointer data)
new_client->session = GINT_TO_POINTER(csock);
}
- free_xml(login);
- login = create_xml_node(NULL, "cib_result");
- crm_xml_add(login, F_CIB_OPERATION, CRM_OP_REGISTER);
- crm_xml_add(login, F_CIB_CLIENTID, new_client->id);
- crm_send_remote_msg(new_client->session, login, new_client->encrypted);
- free_xml(login);
-
new_client->remote = mainloop_add_fd(
"cib-remote-client", G_PRIORITY_DEFAULT, csock, new_client, &remote_client_fd_callbacks);
g_hash_table_insert(client_list, new_client->id, new_client);
return TRUE;
-
- bail:
- if (ssock == remote_tls_fd) {
-#ifdef HAVE_GNUTLS_GNUTLS_H
- gnutls_bye(*session, GNUTLS_SHUT_RDWR);
- gnutls_deinit(*session);
- gnutls_free(session);
-#endif
- }
- close(csock);
- free_xml(login);
- return TRUE;
}
void
cib_remote_connection_destroy(gpointer user_data)
{
cib_client_t *client = user_data;
+ int csock = 0;
if (client == NULL) {
return;
@@ -393,10 +374,36 @@ cib_remote_connection_destroy(gpointer user_data)
crm_trace("Destroying %s (%p)", client->name, user_data);
num_clients--;
crm_trace("Num unfree'd clients: %d", num_clients);
+ if (client->remote_auth_timeout) {
+ g_source_remove(client->remote_auth_timeout);
+ }
+
+ if (client->encrypted) {
+#ifdef HAVE_GNUTLS_GNUTLS_H
+ if (client->session) {
+ void *sock_ptr = gnutls_transport_get_ptr(*client->session);
+ csock = GPOINTER_TO_INT(sock_ptr);
+ if (client->handshake_complete) {
+ gnutls_bye(*client->session, GNUTLS_SHUT_WR);
+ }
+ gnutls_deinit(*client->session);
+ gnutls_free(client->session);
+ }
+#endif
+ } else {
+ csock = GPOINTER_TO_INT(client->session);
+ }
+ client->session = NULL;
+
+ if (csock > 0) {
+ close(csock);
+ }
+
free(client->name);
free(client->callback_id);
free(client->id);
free(client->user);
+ free(client->recv_buf);
free(client);
crm_trace("Freed the cib client");
@@ -406,24 +413,15 @@ cib_remote_connection_destroy(gpointer user_data)
return;
}
-int
-cib_remote_msg(gpointer data)
+static void
+cib_handle_remote_msg(cib_client_t *client, xmlNode *command)
{
const char *value = NULL;
- xmlNode *command = NULL;
- cib_client_t *client = data;
-
- crm_trace("%s callback", client->encrypted ? "secure" : "clear-text");
-
- command = crm_recv_remote_msg(client->session, client->encrypted);
- if (command == NULL) {
- return -1;
- }
value = crm_element_name(command);
if (safe_str_neq(value, "cib_command")) {
crm_log_xml_trace(command, "Bad command: ");
- goto bail;
+ return;
}
if (client->name == NULL) {
@@ -472,9 +470,95 @@ cib_remote_msg(gpointer data)
crm_log_xml_trace(command, "Remote command: ");
cib_common_callback_worker(0, 0, command, client, TRUE);
- bail:
- free_xml(command);
- command = NULL;
+}
+
+int
+cib_remote_msg(gpointer data)
+{
+ xmlNode *command = NULL;
+ cib_client_t *client = data;
+ int disconnected = 0;
+ int timeout = client->remote_auth ? -1 : 1000;
+
+ crm_trace("%s callback", client->encrypted ? "secure" : "clear-text");
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+ if (client->encrypted && (client->handshake_complete == FALSE)) {
+ int rc = 0;
+
+ /* Muliple calls to handshake will be required, this callback
+ * will be invoked once the client sends more handshake data. */
+ do {
+ rc = gnutls_handshake(*client->session);
+
+ if (rc < 0 && rc != GNUTLS_E_AGAIN) {
+ crm_err("Remote cib tls handshake failed");
+ return -1;
+ }
+ } while (rc == GNUTLS_E_INTERRUPTED);
+
+ if (rc == 0) {
+ crm_debug("Remote cib tls handshake completed");
+ client->handshake_complete = TRUE;
+ if (client->remote_auth_timeout) {
+ g_source_remove(client->remote_auth_timeout);
+ }
+ /* after handshake, clients must send auth in a few seconds */
+ client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, client);
+ }
+ return 0;
+ }
+#endif
+
+ crm_recv_remote_msg(client->session, &client->recv_buf, client->encrypted, timeout, &disconnected);
+
+ /* must pass auth before we will process anything else */
+ if (client->remote_auth == FALSE) {
+ xmlNode *reg;
+#if ENABLE_ACL
+ const char *user = NULL;
+#endif
+ command = crm_parse_remote_buffer(&client->recv_buf);
+ if (cib_remote_auth(command) == FALSE) {
+ free_xml(command);
+ return -1;
+ }
+
+ crm_debug("remote connection authenticated successfully");
+ client->remote_auth = TRUE;
+ g_source_remove(client->remote_auth_timeout);
+ client->remote_auth_timeout = 0;
+ client->name = crm_element_value_copy(command, "name");
+
+#if ENABLE_ACL
+ user = crm_element_value(command, "user");
+ if (user) {
+ new_client->user = strdup(user);
+ }
+#endif
+
+ /* send ACK */
+ reg = create_xml_node(NULL, "cib_result");
+ crm_xml_add(reg, F_CIB_OPERATION, CRM_OP_REGISTER);
+ crm_xml_add(reg, F_CIB_CLIENTID, client->id);
+ crm_send_remote_msg(client->session, reg, client->encrypted);
+ free_xml(reg);
+ free_xml(command);
+ }
+
+ command = crm_parse_remote_buffer(&client->recv_buf);
+ while (command) {
+ crm_trace("command received");
+ cib_handle_remote_msg(client, command);
+ free_xml(command);
+ command = crm_parse_remote_buffer(&client->recv_buf);
+ }
+
+ if (disconnected) {
+ crm_trace("disconnected while receiving remote cib msg.");
+ return -1;
+ }
+
return 0;
}
View
36 include/crm_internal.h
@@ -199,8 +199,40 @@ void g_hash_destroy_str(gpointer data);
long long crm_int_helper(const char *text, char **end_text);
char *crm_concat(const char *prefix, const char *suffix, char join);
char *generate_hash_key(const char *crm_msg_reference, const char *sys);
-xmlNode *crm_recv_remote_msg(void *session, gboolean encrypted);
-void crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted);
+
+
+/*! remote tcp/tls helper functions */
+gboolean crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout_ms, int *disconnected);
+char *crm_recv_remote_raw(void *data, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected);
+int crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted);
+int crm_recv_remote_ready(void *session, gboolean encrypted, int timeout_ms);
+xmlNode *crm_parse_remote_buffer(char **msg_buf);
+int crm_remote_tcp_connect(const char *host, int port);
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+/*!
+ * \internal
+ * \brief Initiate the client handshake after establishing the tcp socket.
+ * \note This is a blocking function, it will block until the entire handshake
+ * is complete or until the timeout period is reached.
+ * \retval 0 success
+ * \retval negative, failure
+ */
+int crm_initiate_client_tls_handshake(void *session_data, int timeout_ms);
+/*!
+ * \internal
+ * \brief Create client or server session for anon DH encryption credentials
+ * \param sock, the socket the session will use for transport
+ * \param type, GNUTLS_SERVER or GNUTLS_CLIENT
+ * \param credentials, gnutls_anon_server_credentials_t or gnutls_anon_client_credentials_t
+ *
+ * \retval gnutls_session * on success
+ * \retval NULL on failure
+ */
+void *crm_create_anon_tls_session(int sock, int type, void *credentials);
+#endif
+
+#define REMOTE_MSG_TERMINATOR "\r\n\r\n"
const char *daemon_option(const char *option);
void set_daemon_option(const char *option, const char *value);
View
290 lib/cib/cib_remote.c
@@ -38,14 +38,15 @@
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
-extern gnutls_anon_client_credentials anon_cred_c;
-extern gnutls_session *create_tls_session(int csock, int type);
+gnutls_anon_client_credentials anon_cred_c;
+#define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */
const int kx_prio[] = {
GNUTLS_KX_ANON_DH,
0
};
+static gboolean remote_gnutls_credentials_init = FALSE;
#else
typedef void gnutls_session;
#endif
@@ -61,6 +62,7 @@ struct remote_connection_s {
gnutls_session *session;
mainloop_io_t *source;
char *token;
+ char *recv_buf;
};
typedef struct cib_remote_opaque_s {
@@ -76,7 +78,8 @@ typedef struct cib_remote_opaque_s {
} cib_remote_opaque_t;
void cib_remote_connection_destroy(gpointer user_data);
-int cib_remote_dispatch(gpointer user_data);
+int cib_remote_callback_dispatch(gpointer user_data);
+int cib_remote_command_dispatch(gpointer user_data);
int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type);
int cib_remote_signoff(cib_t * cib);
int cib_remote_free(cib_t * cib);
@@ -158,117 +161,91 @@ cib_tls_close(cib_t * cib)
{
cib_remote_opaque_t *private = cib->variant_opaque;
- shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */
- shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */
- close(private->command.socket);
- close(private->callback.socket);
-
#ifdef HAVE_GNUTLS_GNUTLS_H
if (private->command.encrypted) {
- gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR);
- gnutls_deinit(*(private->command.session));
- gnutls_free(private->command.session);
-
- gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR);
- gnutls_deinit(*(private->callback.session));
- gnutls_free(private->callback.session);
+ if (private->command.session) {
+ gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR);
+ gnutls_deinit(*(private->command.session));
+ gnutls_free(private->command.session);
+ }
- gnutls_anon_free_client_credentials(anon_cred_c);
- gnutls_global_deinit();
+ if (private->callback.session) {
+ gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR);
+ gnutls_deinit(*(private->callback.session));
+ gnutls_free(private->callback.session);
+ }
+ private->command.session = NULL;
+ private->callback.session = NULL;
+ if (remote_gnutls_credentials_init) {
+ gnutls_anon_free_client_credentials(anon_cred_c);
+ gnutls_global_deinit();
+ remote_gnutls_credentials_init = FALSE;
+ }
}
#endif
+
+ if (private->command.socket) {
+ shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */
+ close(private->command.socket);
+ }
+ if (private->callback.socket) {
+ shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */
+ close(private->callback.socket);
+ }
+ private->command.socket = 0;
+ private->callback.socket = 0;
+
+ free(private->command.recv_buf);
+ free(private->callback.recv_buf);
+ private->command.recv_buf = NULL;
+ private->callback.recv_buf = NULL;
+
return 0;
}
static int
-cib_tls_signon(cib_t * cib, struct remote_connection_s *connection)
+cib_tls_signon(cib_t * cib, struct remote_connection_s *connection, gboolean event_channel)
{
int sock;
cib_remote_opaque_t *private = cib->variant_opaque;
- struct sockaddr_in addr;
int rc = 0;
- char *server = private->server;
-
- int ret_ga;
- struct addrinfo *res;
- struct addrinfo hints;
+ int disconnected = 0;
xmlNode *answer = NULL;
xmlNode *login = NULL;
- static struct mainloop_fd_callbacks cib_fd_callbacks =
- {
- .dispatch = cib_remote_dispatch,
- .destroy = cib_remote_connection_destroy,
- };
+ static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, };
+
+ cib_fd_callbacks.dispatch = event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch;
+ cib_fd_callbacks.destroy = cib_remote_connection_destroy;
connection->socket = 0;
connection->session = NULL;
- /* create socket */
- sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (sock == -1) {
- crm_perror(LOG_ERR, "Socket creation failed");
- return -1;
- }
-
- /* getaddrinfo */
- bzero(&hints, sizeof(struct addrinfo));
- hints.ai_flags = AI_CANONNAME;
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_RAW;
-
- if (hints.ai_family == AF_INET6) {
- hints.ai_protocol = IPPROTO_ICMPV6;
- } else {
- hints.ai_protocol = IPPROTO_ICMP;
- }
-
- crm_debug("Looking up %s", server);
- ret_ga = getaddrinfo(server, NULL, &hints, &res);
- if (ret_ga) {
- crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
- close(sock);
- return -1;
- }
-
- if (res->ai_canonname) {
- server = res->ai_canonname;
- }
-
- crm_debug("Got address %s for %s", server, private->server);
-
- if (!res->ai_addr) {
- fprintf(stderr, "getaddrinfo failed");
- crm_exit(1);
- }
-#if 1
- memcpy(&addr, res->ai_addr, res->ai_addrlen);
-#else
- /* connect to server */
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = inet_addr(server);
-#endif
- addr.sin_port = htons(private->port);
-
- if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
- crm_perror(LOG_ERR, "Connection to %s:%d failed", server, private->port);
- close(sock);
- return -1;
+ sock = crm_remote_tcp_connect(private->server, private->port);
+ if (sock <= 0) {
+ crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port);
}
+ connection->socket = sock;
if (connection->encrypted) {
/* initialize GnuTls lib */
#ifdef HAVE_GNUTLS_GNUTLS_H
- gnutls_global_init();
- gnutls_anon_allocate_client_credentials(&anon_cred_c);
+ if (remote_gnutls_credentials_init == FALSE) {
+ gnutls_global_init();
+ gnutls_anon_allocate_client_credentials(&anon_cred_c);
+ remote_gnutls_credentials_init = TRUE;
+ }
/* bind the socket to GnuTls lib */
- connection->session = create_tls_session(sock, GNUTLS_CLIENT);
- if (connection->session == NULL) {
- crm_perror(LOG_ERR, "Session creation for %s:%d failed", server, private->port);
- close(sock);
+ connection->session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c);
+
+ if (crm_initiate_client_tls_handshake(connection->session, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
+ crm_err("Session creation for %s:%d failed", private->server, private->port);
+
+ gnutls_deinit(*connection->session);
+ gnutls_free(connection->session);
+ connection->session = NULL;
cib_tls_close(cib);
return -1;
}
@@ -289,7 +266,14 @@ cib_tls_signon(cib_t * cib, struct remote_connection_s *connection)
crm_send_remote_msg(connection->session, login, connection->encrypted);
free_xml(login);
- answer = crm_recv_remote_msg(connection->session, connection->encrypted);
+ crm_recv_remote_msg(connection->session, &connection->recv_buf, connection->encrypted, -1, &disconnected);
+
+ if (disconnected) {
+ rc = -ENOTCONN;
+ }
+
+ answer = crm_parse_remote_buffer(&connection->recv_buf);
+
crm_log_xml_trace(answer, "Reply");
if (answer == NULL) {
rc = -EPROTO;
@@ -310,12 +294,15 @@ cib_tls_signon(cib_t * cib, struct remote_connection_s *connection)
connection->token = strdup(tmp_ticket);
}
}
+ free_xml(answer);
+ answer = NULL;
if (rc != 0) {
cib_tls_close(cib);
+ return rc;
}
- connection->socket = sock;
+ crm_trace("remote client connection established");
connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->socket, cib, &cib_fd_callbacks);
return rc;
}
@@ -331,35 +318,61 @@ cib_remote_connection_destroy(gpointer user_data)
}
int
-cib_remote_dispatch(gpointer user_data)
+cib_remote_command_dispatch(gpointer user_data)
+{
+ int disconnected = 0;
+ cib_t *cib = user_data;
+ cib_remote_opaque_t *private = cib->variant_opaque;
+
+ crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, -1, &disconnected);
+
+ free(private->command.recv_buf);
+ private->command.recv_buf = NULL;
+ crm_err("received late reply for remote cib connection, discarding");
+
+ if (disconnected) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+cib_remote_callback_dispatch(gpointer user_data)
{
cib_t *cib = user_data;
cib_remote_opaque_t *private = cib->variant_opaque;
xmlNode *msg = NULL;
- const char *type = NULL;
+ int disconnected = 0;
crm_info("Message on callback channel");
- msg = crm_recv_remote_msg(private->callback.session, private->callback.encrypted);
- type = crm_element_value(msg, F_TYPE);
- crm_trace("Activating %s callbacks...", type);
+ crm_recv_remote_msg(private->callback.session, &private->callback.recv_buf, private->callback.encrypted, -1, &disconnected);
- if (safe_str_eq(type, T_CIB)) {
- cib_native_callback(cib, msg, 0, 0);
+ msg = crm_parse_remote_buffer(&private->callback.recv_buf);
+ while (msg) {
+ const char *type = crm_element_value(msg, F_TYPE);
+ crm_trace("Activating %s callbacks...", type);
- } else if (safe_str_eq(type, T_CIB_NOTIFY)) {
- g_list_foreach(cib->notify_list, cib_native_notify, msg);
+ if (safe_str_eq(type, T_CIB)) {
+ cib_native_callback(cib, msg, 0, 0);
- } else {
- crm_err("Unknown message type: %s", type);
- }
+ } else if (safe_str_eq(type, T_CIB_NOTIFY)) {
+ g_list_foreach(cib->notify_list, cib_native_notify, msg);
+
+ } else {
+ crm_err("Unknown message type: %s", type);
+ }
- if (msg != NULL) {
free_xml(msg);
- return 0;
+ msg = crm_parse_remote_buffer(&private->callback.recv_buf);
+ }
+
+ if (disconnected) {
+ return -1;
}
- return -1;
+
+ return 0;
}
int
@@ -394,11 +407,11 @@ cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type)
}
if (rc == pcmk_ok) {
- rc = cib_tls_signon(cib, &(private->command));
+ rc = cib_tls_signon(cib, &(private->command), FALSE);
}
if (rc == pcmk_ok) {
- rc = cib_tls_signon(cib, &(private->callback));
+ rc = cib_tls_signon(cib, &(private->callback), TRUE);
}
if (rc == pcmk_ok) {
@@ -463,37 +476,20 @@ cib_remote_free(cib_t * cib)
return rc;
}
-static gboolean timer_expired = FALSE;
-static struct timer_rec_s *sync_timer = NULL;
-static gboolean
-cib_timeout_handler(gpointer data)
-{
- struct timer_rec_s *timer = data;
-
- timer_expired = TRUE;
- crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout);
-
- /* Always return TRUE, never remove the handler
- * We do that after the while-loop in cib_native_perform_op()
- */
- return TRUE;
-}
-
int
cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section,
xmlNode * data, xmlNode ** output_data, int call_options, const char *name)
{
int rc = pcmk_ok;
+ int disconnected = 0;
+ int remaining_time = 0;
+ time_t start_time;
xmlNode *op_msg = NULL;
xmlNode *op_reply = NULL;
cib_remote_opaque_t *private = cib->variant_opaque;
- if (sync_timer == NULL) {
- sync_timer = calloc(1, sizeof(struct timer_rec_s));
- }
-
if (cib->state == cib_disconnected) {
return -ENOTCONN;
}
@@ -524,7 +520,11 @@ cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char
}
crm_trace("Sending %s message to CIB service", op);
- crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted);
+ if (!(call_options & cib_sync_call)) {
+ crm_send_remote_msg(private->callback.session, op_msg, private->command.encrypted);
+ } else {
+ crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted);
+ }
free_xml(op_msg);
if ((call_options & cib_discard_reply)) {
@@ -537,30 +537,21 @@ cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char
crm_trace("Waiting for a syncronous reply");
- if (cib->call_timeout > 0) {
- /* We need this, even with msgfromIPC_timeout(), because we might
- * get other/older replies that don't match the active request
- */
- timer_expired = FALSE;
- sync_timer->call_id = cib->call_id;
- sync_timer->timeout = cib->call_timeout * 1000;
- sync_timer->ref = g_timeout_add(sync_timer->timeout, cib_timeout_handler, sync_timer);
- }
+ start_time = time(NULL);
+ remaining_time = cib->call_timeout ? cib->call_timeout : 60;
- while (timer_expired == FALSE) {
+ while (remaining_time > 0 && !disconnected) {
int reply_id = -1;
int msg_id = cib->call_id;
- op_reply = crm_recv_remote_msg(private->command.session, private->command.encrypted);
- if (op_reply == NULL) {
+ crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, remaining_time * 1000, &disconnected);
+ op_reply = crm_parse_remote_buffer(&private->command.recv_buf);
+
+ if (!op_reply) {
break;
}
crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id);
- CRM_CHECK(reply_id > 0, free_xml(op_reply);
- if (sync_timer->ref > 0) {
- g_source_remove(sync_timer->ref); sync_timer->ref = 0;}
- return -ENOMSG) ;
if (reply_id == msg_id) {
break;
@@ -579,15 +570,9 @@ cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char
free_xml(op_reply);
op_reply = NULL;
- }
-
- if (sync_timer->ref > 0) {
- g_source_remove(sync_timer->ref);
- sync_timer->ref = 0;
- }
- if (timer_expired) {
- return -ETIME;
+ /* wasn't the right reply, try and read some more */
+ remaining_time = time(NULL) - start_time;
}
/* if(IPC_ISRCONN(native->command_channel) == FALSE) { */
@@ -596,7 +581,10 @@ cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char
/* cib->state = cib_disconnected; */
/* } */
- if (op_reply == NULL) {
+ if (disconnected) {
+ crm_err("Disconnected while waiting for reply.");
+ return -ENOTCONN;
+ } else if (op_reply == NULL) {
crm_err("No reply message - empty");
return -ENOMSG;
}
View
1 lib/common/mainloop.c
@@ -178,6 +178,7 @@ mainloop_destroy_trigger(crm_trigger_t * source)
source->trigger = FALSE;
if (source->id > 0) {
g_source_remove(source->id);
+ source->id = 0;
}
return TRUE;
}
View
723 lib/common/remote.c
@@ -25,8 +25,10 @@
#include <sys/stat.h>
#include <unistd.h>
#include <sys/socket.h>
-
+#include <arpa/inet.h>
#include <netinet/ip.h>
+#include <netdb.h>
+
#include <stdlib.h>
#include <errno.h>
@@ -42,30 +44,40 @@
#endif
#ifdef HAVE_GNUTLS_GNUTLS_H
-const int tls_kx_order[] = {
+const int anon_tls_kx_order[] = {
GNUTLS_KX_ANON_DH,
GNUTLS_KX_DHE_RSA,
GNUTLS_KX_DHE_DSS,
GNUTLS_KX_RSA,
0
};
-gnutls_anon_client_credentials anon_cred_c;
-gnutls_anon_server_credentials anon_cred_s;
-static char *cib_send_tls(gnutls_session * session, xmlNode * msg);
-static char *cib_recv_tls(gnutls_session * session);
-#endif
+int
+crm_initiate_client_tls_handshake(void *session_data, int timeout_ms)
+{
+ int rc = 0;
+ int pollrc = 0;
+ time_t start = time(NULL);
+ gnutls_session *session = session_data;
-char *cib_recv_plaintext(int sock);
-char *cib_send_plaintext(int sock, xmlNode * msg);
+ do {
+ rc = gnutls_handshake(*session);
+ if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
+ pollrc = crm_recv_remote_ready(session, TRUE, 1000);
+ if (pollrc < 0) {
+ /* poll returned error, there is no hope */
+ rc = -1;
+ }
+ }
+ } while (((time(NULL) - start) < (timeout_ms/1000)) &&
+ (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
-#ifdef HAVE_GNUTLS_GNUTLS_H
-gnutls_session *create_tls_session(int csock, int type);
+ return rc;
+}
-gnutls_session *
-create_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ )
+void *
+crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */, void *credentials)
{
- int rc = 0;
gnutls_session *session = gnutls_malloc(sizeof(gnutls_session));
gnutls_init(session, type);
@@ -75,266 +87,619 @@ create_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ )
/* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
# else
gnutls_set_default_priority(*session);
- gnutls_kx_set_priority(*session, tls_kx_order);
+ gnutls_kx_set_priority(*session, anon_tls_kx_order);
# endif
gnutls_transport_set_ptr(*session, (gnutls_transport_ptr) GINT_TO_POINTER(csock));
switch (type) {
- case GNUTLS_SERVER:
- gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_s);
- break;
- case GNUTLS_CLIENT:
- gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_c);
- break;
+ case GNUTLS_SERVER:
+ gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_server_credentials_t) credentials);
+ break;
+ case GNUTLS_CLIENT:
+ gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_client_credentials_t) credentials);
+ break;
}
- do {
- rc = gnutls_handshake(*session);
- } while (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN);
-
- if (rc < 0) {
- crm_err("Handshake failed: %s", gnutls_strerror(rc));
- gnutls_deinit(*session);
- gnutls_free(session);
- return NULL;
- }
return session;
}
-static char *
-cib_send_tls(gnutls_session * session, xmlNode * msg)
+static int
+crm_send_tls(gnutls_session * session, const char *buf, size_t len)
{
- char *xml_text = NULL;
-
-# if 0
- const char *name = crm_element_name(msg);
+ const char *unsent = buf;
+ int rc = 0;
+ int total_send;
- if (safe_str_neq(name, "cib_command")) {
- xmlNodeSetName(msg, "cib_result");
+ if (buf == NULL) {
+ return -1;
}
-# endif
- xml_text = dump_xml_unformatted(msg);
- if (xml_text != NULL) {
- char *unsent = xml_text;
- int len = strlen(xml_text);
- int rc = 0;
- len++; /* null char */
- crm_trace("Message size: %d", len);
+ total_send = len;
+ crm_trace("Message size: %d", len);
- while (TRUE) {
- rc = gnutls_record_send(*session, unsent, len);
- crm_debug("Sent %d bytes", rc);
+ while (TRUE) {
+ rc = gnutls_record_send(*session, unsent, len);
- if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
- crm_debug("Retry");
+ if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
+ crm_debug("Retry");
- } else if (rc < 0) {
- crm_debug("Connection terminated");
- break;
+ } else if (rc < 0) {
+ crm_err("Connection terminated rc = %d", rc);
+ break;
- } else if (rc < len) {
- crm_debug("Only sent %d of %d bytes", rc, len);
- len -= rc;
- unsent += rc;
- } else {
- break;
- }
+ } else if (rc < len) {
+ crm_debug("Only sent %d of %d bytes", rc, len);
+ len -= rc;
+ unsent += rc;
+ } else {
+ crm_debug("Sent %d bytes", rc);
+ break;
}
-
}
- free(xml_text);
- return NULL;
+ return rc < 0 ? rc : total_send;
}
+
+/*!
+ * \internal
+ * \brief Read bytes off non blocking tls session.
+ *
+ * \param session - tls session to read
+ * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
+ *
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
+ * This function will return once max_size is met, the socket read buffer
+ * is empty, or an error is encountered.
+ *
+ * \retval '\0' terminated buffer on success
+ */
static char *
-cib_recv_tls(gnutls_session * session)
+crm_recv_tls(gnutls_session * session, size_t max_size, size_t *recv_len, int *disconnected)
{
char *buf = NULL;
-
int rc = 0;
- int len = 0;
- int chunk_size = 1024;
+ size_t len = 0;
+ size_t chunk_size = max_size ? max_size : 1024;
+ size_t buf_size = 0;
+ size_t read_size = 0;
if (session == NULL) {
- return NULL;
+ if (disconnected) {
+ *disconnected = 1;
+ }
+ goto done;
}
- buf = calloc(1, chunk_size);
+ buf = calloc(1, chunk_size + 1);
+ buf_size = chunk_size;
while (TRUE) {
- errno = 0;
- rc = gnutls_record_recv(*session, buf + len, chunk_size);
- crm_trace("Got %d more bytes. errno=%d", rc, errno);
+ read_size = buf_size - len;
- if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
- crm_trace("Retry");
+ /* automatically grow the buffer when needed if max_size is not set.*/
+ if (!max_size && (read_size < (chunk_size / 2))) {
+ buf_size += chunk_size;
+ crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size);
+ buf = realloc(buf, buf_size + 1);
+ CRM_ASSERT(buf != NULL);
- } else if (rc == GNUTLS_E_UNEXPECTED_PACKET_LENGTH) {
- crm_trace("Session disconnected");
- goto bail;
+ read_size = buf_size - len;
+ }
- } else if (rc < 0) {
- crm_err("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
- goto bail;
+ rc = gnutls_record_recv(*session, buf + len, read_size);
- } else if (rc == chunk_size) {
+ if (rc > 0) {
+ crm_trace("Got %d more bytes.", rc);
len += rc;
- chunk_size *= 2;
- buf = realloc(buf, len + chunk_size);
- crm_trace("Retry with %d more bytes", (int)chunk_size);
- CRM_ASSERT(buf != NULL);
-
- } else if (buf[len + rc - 1] != 0) {
- crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]);
- crm_trace("Retry with %d more bytes", (int)chunk_size);
- len += rc;
- buf = realloc(buf, len + chunk_size);
- CRM_ASSERT(buf != NULL);
+ /* always null terminate buffer, the +1 to alloc always allows for this.*/
+ buf[len] = '\0';
+ }
+ if (max_size && (max_size == read_size)) {
+ crm_trace("Buffer max read size %d met" , max_size);
+ goto done;
+ }
- } else {
- crm_trace("Got %d more bytes", (int)rc);
- return buf;
+ /* process any errors. */
+ if (rc == GNUTLS_E_INTERRUPTED) {
+ crm_trace("EINTR encoutered, retry tls read");
+ } else if (rc == GNUTLS_E_AGAIN) {
+ crm_trace("non-blocking, exiting read on rc = %d", rc);
+ goto done;
+ } else if (rc <= 0) {
+ if (rc == 0) {
+ crm_debug("EOF encoutered during TLS read");
+ } else {
+ crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
+ }
+ if (disconnected) {
+ *disconnected = 1;
+ }
+ goto done;
}
}
- bail:
- free(buf);
- return NULL;
+
+done:
+ if (recv_len) {
+ *recv_len = len;
+ }
+ if (!len) {
+ free(buf);
+ buf = NULL;
+ }
+ return buf;
}
#endif
-char *
-cib_send_plaintext(int sock, xmlNode * msg)
+static int
+crm_send_plaintext(int sock, const char *buf, size_t len)
{
- char *xml_text = dump_xml_unformatted(msg);
- if (xml_text != NULL) {
- int rc = 0;
- char *unsent = xml_text;
- int len = strlen(xml_text);
+ int rc = 0;
+ const char *unsent = buf;
+ int total_send;
- len++; /* null char */
- crm_trace("Message on socket %d: size=%d", sock, len);
- retry:
- rc = write(sock, unsent, len);
- if (rc < 0) {
- switch (errno) {
- case EINTR:
- case EAGAIN:
- crm_trace("Retry");
- goto retry;
- default:
- crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, len);
- break;
- }
+ if (buf == NULL) {
+ return -1;
+ }
+ total_send = len;
- } else if (rc < len) {
- crm_trace("Only sent %d of %d remaining bytes", rc, len);
- len -= rc;
- unsent += rc;
+ crm_trace("Message on socket %d: size=%d", sock, len);
+ retry:
+ rc = write(sock, unsent, len);
+ if (rc < 0) {
+ switch (errno) {
+ case EINTR:
+ case EAGAIN:
+ crm_trace("Retry");
goto retry;
-
- } else {
- crm_trace("Sent %d bytes: %.100s", rc, xml_text);
+ default:
+ crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int) len);
+ break;
}
+
+ } else if (rc < len) {
+ crm_trace("Only sent %d of %d remaining bytes", rc, len);
+ len -= rc;
+ unsent += rc;
+ goto retry;
+
+ } else {
+ crm_trace("Sent %d bytes: %.100s", rc, buf);
}
- free(xml_text);
- return NULL;
+
+ return rc < 0 ? rc : total_send;
}
-char *
-cib_recv_plaintext(int sock)
+/*!
+ * \internal
+ * \brief Read bytes off non blocking socket.
+ *
+ * \param session - tls session to read
+ * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
+ *
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
+ * This function will return once max_size is met, the socket read buffer
+ * is empty, or an error is encountered.
+ *
+ * \retval '\0' terminated buffer on success
+ */
+static char *
+crm_recv_plaintext(int sock, size_t max_size, size_t *recv_len, int *disconnected)
{
char *buf = NULL;
-
ssize_t rc = 0;
ssize_t len = 0;
- ssize_t chunk_size = 512;
+ ssize_t chunk_size = max_size ? max_size : 1024;
+ size_t buf_size = 0;
+ size_t read_size = 0;
- buf = calloc(1, chunk_size);
+ if (sock <= 0) {
+ if (disconnected) {
+ *disconnected = 1;
+ }
+ goto done;
+ }
- while (1) {
- errno = 0;
- rc = read(sock, buf + len, chunk_size);
- crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
-
- if (errno == EINTR || errno == EAGAIN) {
- crm_trace("Retry: %d", (int)rc);
- if (rc > 0) {
- len += rc;
- buf = realloc(buf, len + chunk_size);
- CRM_ASSERT(buf != NULL);
- }
+ buf = calloc(1, chunk_size + 1);
+ buf_size = chunk_size;
- } else if (rc < 0) {
- crm_perror(LOG_ERR, "Error receiving message: %d", (int)rc);
- goto bail;
+ while (TRUE) {
+ errno = 0;
+ read_size = buf_size - len;
- } else if (rc == chunk_size) {
- len += rc;
- chunk_size *= 2;
- buf = realloc(buf, len + chunk_size);
- crm_trace("Retry with %d more bytes", (int)chunk_size);
+ /* automatically grow the buffer when needed if max_size is not set.*/
+ if (!max_size && (read_size < (chunk_size / 2))) {
+ buf_size += chunk_size;
+ crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size);
+ buf = realloc(buf, buf_size + 1);
CRM_ASSERT(buf != NULL);
- } else if (buf[len + rc - 1] != 0) {
- crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]);
- crm_trace("Retry with %d more bytes", (int)chunk_size);
+ read_size = buf_size - len;
+ }
+
+ rc = read(sock, buf + len, chunk_size);
+
+ if (rc > 0) {
+ crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
len += rc;
- buf = realloc(buf, len + chunk_size);
- CRM_ASSERT(buf != NULL);
+ /* always null terminate buffer, the +1 to alloc always allows for this.*/
+ buf[len] = '\0';
+ }
+ if (max_size && (max_size == read_size)) {
+ crm_trace("Buffer max read size %d met" , max_size);
+ goto done;
+ }
- } else {
- return buf;
+ if (rc > 0) {
+ continue;
+ } else if (rc == 0) {
+ if (disconnected) {
+ *disconnected = 1;
+ }
+ crm_trace("EOF encoutered during read");
+ goto done;
+ }
+
+ /* process errors */
+ if (errno == EINTR) {
+ crm_trace("EINTER encoutered, retry socket read.");
+ } else if (errno == EAGAIN) {
+ crm_trace("non-blocking, exiting read on rc = %d", rc);
+ goto done;
+ } else if (errno <= 0) {
+ if (disconnected) {
+ *disconnected = 1;
+ }
+ crm_debug("Error receiving message: %d", (int)rc);
+ goto done;
}
}
- bail:
- free(buf);
- return NULL;
+done:
+ if (recv_len) {
+ *recv_len = len;
+ }
+ if (!len) {
+ free(buf);
+ buf = NULL;
+ }
+ return buf;
}
-void
-crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted)
+static int
+crm_send_remote_msg_raw(void *session, const char *buf, size_t len, gboolean encrypted)
{
+ int rc = -1;
if (encrypted) {
#ifdef HAVE_GNUTLS_GNUTLS_H
- cib_send_tls(session, msg);
+ rc = crm_send_tls(session, buf, len);
#else
CRM_ASSERT(encrypted == FALSE);
#endif
} else {
- cib_send_plaintext(GPOINTER_TO_INT(session), msg);
+ rc = crm_send_plaintext(GPOINTER_TO_INT(session), buf, len);
}
+ return rc;
}
+int
+crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted)
+{
+ int rc = -1;
+ char *xml_text = NULL;
+ int len = 0;
+
+ xml_text = dump_xml_unformatted(msg);
+ if (xml_text) {
+ len = strlen(xml_text);
+ } else {
+ crm_err("Invalid XML, can not send msg");
+ return -1;
+ }
+
+ rc = crm_send_remote_msg_raw(session, xml_text, len, encrypted);
+ if (rc < 0) {
+ goto done;
+ }
+ rc = crm_send_remote_msg_raw(session, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR), encrypted);
+
+done:
+ if (rc < 0) {
+ crm_err("Failed to send remote msg, rc = %d", rc);
+ }
+
+ free(xml_text);
+ return rc;
+}
+
+/*!
+ * \internal
+ * \brief handles the recv buffer and parsing out msgs.
+ * \note new_data is owned by this function once it is passed in.
+ */
xmlNode *
-crm_recv_remote_msg(void *session, gboolean encrypted)
+crm_parse_remote_buffer(char **msg_buf)
{
- char *reply = NULL;
+ char *buf = NULL;
+ char *start = NULL;
+ char *end = NULL;
xmlNode *xml = NULL;
+ if (*msg_buf == NULL) {
+ return NULL;
+ }
+
+ /* take ownership of the buffer */
+ buf = *msg_buf;
+ *msg_buf = NULL;
+
+ /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */
+ start = buf;
+ end = strstr(start, REMOTE_MSG_TERMINATOR);
+
+ while (!xml && end) {
+
+ /* grab the message */
+ end[0] = '\0';
+ end += strlen(REMOTE_MSG_TERMINATOR);
+
+ xml = string2xml(start);
+ if (xml == NULL) {
+ crm_err("Couldn't parse: '%.120s'", start);
+ }
+ start = end;
+ end = strstr(start, REMOTE_MSG_TERMINATOR);
+ }
+
+ if (xml && start) {
+ /* we have msgs left over, save it until next time */
+ *msg_buf = strdup(start);
+ free(buf);
+ } else if (!xml) {
+ /* no msg present */
+ *msg_buf = buf;
+ }
+
+ return xml;
+}
+
+/*!
+ * \internal
+ * \brief Determine if a remote session has data to read
+ *
+ * \retval 0, timeout occured.
+ * \retval positive, data is ready to be read
+ * \retval negative, session has ended
+ */
+int
+crm_recv_remote_ready(void *session, gboolean encrypted, int timeout /* ms */)
+{
+ struct pollfd fds = { 0, };
+ int sock = 0;
+ void *sock_ptr = NULL;
+ int rc = 0;
+ time_t start;
+
+ if (encrypted) {
+#ifdef HAVE_GNUTLS_GNUTLS_H
+ gnutls_session *tls_session = session;
+ sock_ptr = gnutls_transport_get_ptr(*tls_session);
+#else
+ CRM_ASSERT(encrypted == FALSE);
+#endif
+ } else {
+ sock_ptr = session;
+ }
+
+ sock = GPOINTER_TO_INT(sock_ptr);
+ if (sock <= 0) {
+ return -ENOTCONN;
+ }
+
+ start = time(NULL);
+ errno = 0;
+ do {
+ fds.fd = sock;
+ fds.events = POLLIN;
+
+ /* If we got an EINTR while polling, and we have a
+ * specific timeout we are trying to honor, attempt
+ * to adjust the timeout to the closest second. */
+ if (errno == EINTR && (timeout > 0)) {
+ timeout = timeout - ((time(NULL) - start) * 1000);
+ if (timeout < 1000) {
+ timeout = 1000;
+ }
+ }
+
+ rc = poll(&fds, 1, timeout);
+ } while (rc < 0 && errno == EINTR);
+
+ return rc;
+}
+
+char *
+crm_recv_remote_raw(void *session, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected)
+{
+ char *reply = NULL;
+ if (recv_len) {
+ *recv_len = 0;
+ }
+
+ if (disconnected) {
+ *disconnected = 0;
+ }
+
if (encrypted) {
#ifdef HAVE_GNUTLS_GNUTLS_H
- reply = cib_recv_tls(session);
+ reply = crm_recv_tls(session, max_recv, recv_len, disconnected);
#else
CRM_ASSERT(encrypted == FALSE);
#endif
} else {
- reply = cib_recv_plaintext(GPOINTER_TO_INT(session));
+ reply = crm_recv_plaintext(GPOINTER_TO_INT(session), max_recv, recv_len, disconnected);
}
if (reply == NULL || strlen(reply) == 0) {
crm_trace("Empty reply");
+ }
- } else {
- xml = string2xml(reply);
- if (xml == NULL) {
- crm_err("Couldn't parse: '%.120s'", reply);
+ return reply;
+}
+
+/*!
+ * \internal
+ * \brief Read data off the socket until at least one full message is present or timeout occures.
+ * \retval TRUE message read
+ * \retval FALSE full message not read
+ */
+
+gboolean
+crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout /*ms */, int *disconnected)
+{
+ int ret;
+ size_t request_len = 0;
+ time_t start = time(NULL);
+ char *raw_request = NULL;
+ int remaining_timeout = 0;
+
+ if (total_timeout == 0) {
+ total_timeout = 10000;
+ } else if (total_timeout < 0) {
+ total_timeout = 60000;
+ }
+ *disconnected = 0;
+
+ remaining_timeout = total_timeout;
+ while ((remaining_timeout > 0) && !(*disconnected)) {
+
+ /* read some more off the tls buffer if we still have time left. */
+ crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d", total_timeout, remaining_timeout);
+ ret = crm_recv_remote_ready(session, encrypted, remaining_timeout);
+ raw_request = NULL;
+
+ if (ret == 0) {
+ crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
+ return FALSE;
+
+ } else if (ret < 0) {
+ if (errno != EINTR) {
+ crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret, errno);
+ *disconnected = 1;
+ return FALSE;
+ }
+ crm_debug("poll EINTR encountered during poll, retrying");
+ } else {
+ raw_request = crm_recv_remote_raw(session, encrypted, 0, &request_len, disconnected);
+ }
+
+ remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
+
+ if (!raw_request) {
+ crm_debug("Empty msg received after poll");
+ continue;
+ }
+
+ if (*recv_buf) {
+ int old_len = strlen(*recv_buf);
+
+ crm_trace("Expanding recv buffer from %d to %d", old_len, old_len+request_len);
+
+ *recv_buf = realloc(*recv_buf, old_len + request_len + 1);
+ memcpy(*recv_buf + old_len, raw_request, request_len);
+ *(*recv_buf+old_len+request_len) = '\0';
+ free(raw_request);
+ } else {
+ *recv_buf = raw_request;
+ }
+
+ if (strstr(*recv_buf, REMOTE_MSG_TERMINATOR)) {
+ return TRUE;
}
}
- free(reply);
- return xml;
+ return FALSE;
}
+
+/*!
+ * \internal
+ * \brief tcp connection to server at specified port
+ * \retval positive, socket fd.
+ * \retval negative, failed to connect.
+ */
+int
+crm_remote_tcp_connect(const char *host, int port)
+{
+ struct addrinfo *res;
+ struct addrinfo *rp;
+ struct addrinfo hints;
+ const char *server = host;
+ int ret_ga;
+ int sock;
+
+ /* getaddrinfo */
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_CANONNAME;
+
+ crm_debug("Looking up %s", server);
+ ret_ga = getaddrinfo(server, NULL, &hints, &res);
+ if (ret_ga) {
+ crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
+ return -1;
+ }
+
+ if (!res || !res->ai_addr) {
+ crm_err("getaddrinfo failed");
+ return -1;
+ }
+
+ for (rp = res; rp != NULL; rp = rp->ai_next) {
+ struct sockaddr *addr = rp->ai_addr;
+ int flag = 0;
+ if (!addr) {
+ continue;
+ }
+
+ if (rp->ai_canonname) {
+ server = res->ai_canonname;
+ }
+ crm_debug("Got address %s for %s", server, host);
+
+ /* create socket */
+ sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
+ if (sock == -1) {
+ crm_err("Socket creation failed for remote client connection.");
+ continue;
+ }
+ if (addr->sa_family == AF_INET6) {
+ struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *) addr;
+ addr_in->sin6_port = htons(port);
+ } else {
+ struct sockaddr_in *addr_in = (struct sockaddr_in *) addr;
+ addr_in->sin_port = htons(port);
+ crm_info("Attempting to connect to remote server at %s:%d", inet_ntoa(addr_in->sin_addr), port);
+ }
+
+ if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
+ if ((flag = fcntl(sock, F_GETFL)) >= 0) {
+ if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
+ crm_err( "fcntl() write failed");
+ close(sock);
+ sock = -1;
+ continue;
+ }
+ }
+ break; /* Success */
+ }
+
+ close(sock);
+ sock = -1;
+ }
+ freeaddrinfo(res);
+
+ return sock;
+}
+
View
2 tools/crm_mon.c
@@ -275,7 +275,7 @@ cib_connect(gboolean full)
if (rc == pcmk_ok) {
rc = cib->cmds->set_connection_dnotify(cib, mon_cib_connection_destroy);
if (rc == -EPROTONOSUPPORT) {
- print_as("Notification setup failed, won't be able to reconnect after failure");
+ print_as("Notification setup not supported, won't be able to reconnect after failure");
if (as_console) {
sleep(2);
}

0 comments on commit 564f7cc

Please sign in to comment.
Something went wrong with that request. Please try again.