diff --git a/configure.ac b/configure.ac index b8556270f18b..a5b6d5c79a6c 100644 --- a/configure.ac +++ b/configure.ac @@ -421,6 +421,7 @@ AC_CONFIG_FILES( \ src/common/libschedutil/Makefile \ src/common/libeventlog/Makefile \ src/common/libioencode/Makefile \ + src/common/librouter/Makefile \ src/bindings/Makefile \ src/bindings/lua/Makefile \ src/bindings/python/Makefile \ diff --git a/doc/man3/Makefile.am b/doc/man3/Makefile.am index 9aff29d2430f..cc03b026674e 100644 --- a/doc/man3/Makefile.am +++ b/doc/man3/Makefile.am @@ -12,7 +12,6 @@ MAN3_FILES_PRIMARY = \ flux_event_publish.3 \ flux_pollevents.3 \ flux_msg_encode.3 \ - flux_msg_sendfd.3 \ flux_rpc.3 \ flux_mrpc.3 \ flux_get_rank.3 \ @@ -70,7 +69,6 @@ MAN3_FILES_SECONDARY = \ flux_event_publish_get_seq.3 \ flux_pollfd.3 \ flux_msg_decode.3 \ - flux_msg_recvfd.3 \ flux_get_size.3 \ flux_attr_set.3 \ flux_set_reactor.3 \ @@ -213,7 +211,6 @@ flux_event_publish_raw.3: flux_event_publish.3 flux_event_publish_get_seq.3: flux_event_publish.3 flux_pollfd.3: flux_pollevents.3 flux_msg_decode.3: flux_msg_encode.3 -flux_msg_recvfd.3: flux_msg_sendfd.3 flux_get_size.3: flux_get_rank.3 flux_attr_set.3: flux_attr_get.3 flux_set_reactor.3: flux_get_reactor.3 diff --git a/doc/man3/flux_msg_sendfd.adoc b/doc/man3/flux_msg_sendfd.adoc deleted file mode 100644 index bdc2507e6019..000000000000 --- a/doc/man3/flux_msg_sendfd.adoc +++ /dev/null @@ -1,97 +0,0 @@ -flux_msg_sendfd(3) -================== -:doctype: manpage - - -NAME ----- -flux_msg_sendfd, flux_msg_recvfd - send/recv a Flux message by file descriptor - - -SYNOPSIS --------- -#include - -int flux_msg_sendfd (int fd, const flux_msg_t *msg, - struct flux_msg_iobuf *iobuf); - -flux_msg_t *flux_msg_recvfd (int fd, struct flux_msg_iobuf *iobuf); - -void flux_msg_iobuf_init (struct flux_msg_iobuf *iobuf); - -void flux_msg_iobuf_clean (struct flux_msg_iobuf *iobuf); - -DESCRIPTION ------------ - -`flux_msg_sendfd()` encodes and sends a message on _fd_. 
- -`flux_msg_recvfd()` receives and decodes a message on _fd_. -The caller must dispose of the returned message with flux_msg_destroy(). - -If the file descriptor is open in non-blocking mode, _iobuf_ may passed in -to capture intermediate state on internal EWOULDBLOCK or EAGAIN errors. -The function may be called again with identical arguments to continue -the operation once the file descriptor is ready. - -_iobuf_ should first be initialized with `flux_msg_iobuf_init()`. - -While EWOULDBLOCK or EAGAIN handling is in progress, and only then, _iobuf_ -contains internally allocated storage. If you need to abort an -in-progress call, `flux_msg_iobuf_clean()` should be called to free -this internal storage. This function is a no-op if there is no storage -allocated. - -ENCODING --------- - -Message are internally encoded with `flux_msg_encode(3)` and decoded -with `flux_msg_decode(3)`. In addition, a 4 byte message size is encoded -before the encoded message, in network byte order. - - -RETURN VALUE ------------- - -`flux_msg_sendfd()` returns 0 on success. On error, -1 is returned, -and errno is set appropriately. - -`flux_msg_recvfd()` returns the decoded message on success. On error, NULL -is returned, and errno is set appropriately. - -ERRORS ------- - -EINVAL:: -Some arguments were invalid. - -ENOMEM:: -Out of memory. - -EAGAIN or EWOULDBLOCK:: -See ERRORS in read(2) and write(2). - -EPROTO:: -Message was malformed; EOF was received on socket; or EWOULDBLOCK, EAGAIN -was internally converted to EPROTO because _iobuf_ was NULL. -After an EPROTO, the file descriptor may not be positioned at a message -boundary, and should not be used. - -AUTHOR ------- -This page is maintained by the Flux community. 
- - -RESOURCES ---------- -Github: - - -COPYRIGHT ---------- -include::COPYRIGHT.adoc[] - - -SEE ALSO --------- -flux_msg_encode(3) diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index 691415d29667..eea207a4d682 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -33,6 +33,7 @@ #include "src/common/libutil/oom.h" #include "src/common/libutil/cleanup.h" #include "src/common/libutil/fdutils.h" +#include "src/common/librouter/sendfd.h" #define LISTEN_BACKLOG 5 @@ -63,8 +64,8 @@ typedef struct { int wfd; flux_watcher_t *inw; flux_watcher_t *outw; - struct flux_msg_iobuf inbuf; - struct flux_msg_iobuf outbuf; + struct iobuf inbuf; + struct iobuf outbuf; zlist_t *outqueue; /* queue of outbound flux_msg_t */ proxy_ctx_t *ctx; zhash_t *disconnect_notify; @@ -137,8 +138,8 @@ static client_t * client_create (proxy_ctx_t *ctx, int rfd, int wfd) goto error; } flux_watcher_start (c->inw); - flux_msg_iobuf_init (&c->inbuf); - flux_msg_iobuf_init (&c->outbuf); + iobuf_init (&c->inbuf); + iobuf_init (&c->outbuf); if (fd_set_nonblocking (c->rfd) < 0) { flux_log_error (h, "fd_set_nonblocking"); goto error; @@ -159,7 +160,7 @@ static int client_send_try (client_t *c) flux_msg_t *msg = zlist_head (c->outqueue); if (msg) { - if (flux_msg_sendfd (c->wfd, msg, &c->outbuf) < 0) { + if (sendfd (c->wfd, msg, &c->outbuf) < 0) { if (errno != EWOULDBLOCK && errno != EAGAIN) return -1; //flux_log (c->ctx->h, LOG_DEBUG, "send: client not ready"); @@ -424,11 +425,11 @@ static void client_destroy (client_t *c) } flux_watcher_stop (c->outw); flux_watcher_destroy (c->outw); - flux_msg_iobuf_clean (&c->outbuf); + iobuf_clean (&c->outbuf); flux_watcher_stop (c->inw); flux_watcher_destroy (c->inw); - flux_msg_iobuf_clean (&c->inbuf); + iobuf_clean (&c->inbuf); if (c->rfd != -1) close (c->rfd); @@ -509,13 +510,13 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, * EWOULDBLOCK, EAGAIN stores state in c->inbuf for continuation */ //flux_log (h, 
LOG_DEBUG, "recv: client ready"); - if (!(msg = flux_msg_recvfd (c->rfd, &c->inbuf))) { + if (!(msg = recvfd (c->rfd, &c->inbuf))) { if (errno == EWOULDBLOCK || errno == EAGAIN) { //flux_log (h, LOG_DEBUG, "recv: client not ready"); return; } if (errno != ECONNRESET) - flux_log_error (h, "flux_msg_recvfd"); + flux_log_error (h, "recvfd"); goto disconnect; } if (flux_msg_get_type (msg, &type) < 0) { diff --git a/src/common/Makefile.am b/src/common/Makefile.am index daa2f4f724c8..7c2f2eff701e 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -13,7 +13,8 @@ SUBDIRS = libtap \ libaggregate \ libschedutil \ libeventlog \ - libioencode + libioencode \ + librouter AM_CFLAGS = $(WARNING_CFLAGS) $(CODE_COVERAGE_CFLAGS) AM_LDFLAGS = $(CODE_COVERAGE_LIBS) @@ -31,6 +32,7 @@ libflux_internal_la_LIBADD = \ $(builddir)/libtomlc99/libtomlc99.la \ $(builddir)/libeventlog/libeventlog.la \ $(builddir)/libioencode/libioencode.la \ + $(builddir)/librouter/librouter.la \ $(JANSSON_LIBS) $(ZMQ_LIBS) $(LIBPTHREAD) $(LIBUTIL) \ $(LIBDL) $(LIBRT) $(FLUX_SECURITY_LIBS) $(LIBSODIUM_LIBS) libflux_internal_la_LDFLAGS = $(san_ld_zdef_flag) diff --git a/src/common/libflux/message.c b/src/common/libflux/message.c index de6d88f2270a..5939117f4352 100644 --- a/src/common/libflux/message.c +++ b/src/common/libflux/message.c @@ -1418,133 +1418,6 @@ void flux_msg_fprint (FILE *f, const flux_msg_t *msg) zframe_fprint (proto, prefix, f); } -#define IOBUF_MAGIC 0xffee0012 - -void flux_msg_iobuf_init (struct flux_msg_iobuf *iobuf) -{ - memset (iobuf, 0, sizeof (*iobuf)); -} - -void flux_msg_iobuf_clean (struct flux_msg_iobuf *iobuf) -{ - if (iobuf->buf && iobuf->buf != iobuf->buf_fixed) - free (iobuf->buf); - memset (iobuf, 0, sizeof (*iobuf)); -} - -int flux_msg_sendfd (int fd, const flux_msg_t *msg, - struct flux_msg_iobuf *iobuf) -{ - struct flux_msg_iobuf local; - struct flux_msg_iobuf *io = iobuf ? 
iobuf : &local; - int rc = -1; - - if (fd < 0 || !msg) { - errno = EINVAL; - goto done; - } - if (!iobuf) - flux_msg_iobuf_init (&local); - if (!io->buf) { - io->size = flux_msg_encode_size (msg) + 8; - if (io->size <= sizeof (io->buf_fixed)) - io->buf = io->buf_fixed; - else if (!(io->buf = malloc (io->size))) { - errno = ENOMEM; - goto done; - } - *(uint32_t *)&io->buf[0] = IOBUF_MAGIC; - *(uint32_t *)&io->buf[4] = htonl (io->size - 8); - if (flux_msg_encode (msg, &io->buf[8], io->size - 8) < 0) - goto done; - io->done = 0; - } - do { - rc = write (fd, io->buf + io->done, io->size - io->done); - if (rc < 0) - goto done; - io->done += rc; - } while (io->done < io->size); - rc = 0; -done: - if (iobuf) { - if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) - flux_msg_iobuf_clean (iobuf); - } else { - if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - errno = EPROTO; - flux_msg_iobuf_clean (&local); - } - return rc; -} - -flux_msg_t *flux_msg_recvfd (int fd, struct flux_msg_iobuf *iobuf) -{ - struct flux_msg_iobuf local; - struct flux_msg_iobuf *io = iobuf ? 
iobuf : &local; - flux_msg_t *msg = NULL; - int rc = -1; - - if (fd < 0) { - errno = EINVAL; - goto done; - } - if (!iobuf) - flux_msg_iobuf_init (&local); - if (!io->buf) { - io->buf = io->buf_fixed; - io->size = sizeof (io->buf_fixed); - } - do { - if (io->done < 8) { - rc = read (fd, io->buf + io->done, 8 - io->done); - if (rc < 0) - goto done; - if (rc == 0) { - errno = ECONNRESET; - goto done; - } - io->done += rc; - if (io->done == 8) { - if (*(uint32_t *)&io->buf[0] != IOBUF_MAGIC) { - errno = EPROTO; - goto done; - } - io->size = ntohl (*(uint32_t *)&io->buf[4]) + 8; - if (io->size > sizeof (io->buf_fixed)) { - if (!(io->buf = malloc (io->size))) { - errno = EPROTO; - goto done; - } - memcpy (io->buf, io->buf_fixed, 8); - } - } - } - if (io->done >= 8 && io->done < io->size) { - rc = read (fd, io->buf + io->done, io->size - io->done); - if (rc < 0) - goto done; - if (rc == 0) { - errno = ECONNRESET; - goto done; - } - io->done += rc; - } - } while (io->done < io->size); - if (!(msg = flux_msg_decode (io->buf + 8, io->size - 8))) - goto done; -done: - if (iobuf) { - if (msg != NULL || (errno != EAGAIN && errno != EWOULDBLOCK)) - flux_msg_iobuf_clean (iobuf); - } else { - if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - errno = EPROTO; - flux_msg_iobuf_clean (&local); - } - return msg; -} - int flux_msg_sendzsock (void *sock, const flux_msg_t *msg) { int rc = -1; diff --git a/src/common/libflux/message.h b/src/common/libflux/message.h index 1d341a9dee8b..c4737e5e9b9b 100644 --- a/src/common/libflux/message.h +++ b/src/common/libflux/message.h @@ -77,13 +77,6 @@ struct flux_match { .topic_glob = NULL, \ } -struct flux_msg_iobuf { - uint8_t *buf; - size_t size; - size_t done; - uint8_t buf_fixed[4096]; -}; - /* Create a new Flux message. * Returns new message or null on failure, with errno set (e.g. ENOMEM, EINVAL) * Caller must destroy message with flux_msg_destroy() or equivalent. 
@@ -123,19 +116,6 @@ int flux_msg_frames (const flux_msg_t *msg); */ flux_msg_t *flux_msg_decode (const void *buf, size_t size); -/* Send message to file descriptor. - * iobuf captures intermediate state to make EAGAIN/EWOULDBLOCK restartable. - * Returns 0 on success, -1 on failure with errno set. - */ -int flux_msg_sendfd (int fd, const flux_msg_t *msg, - struct flux_msg_iobuf *iobuf); - -/* Receive message from file descriptor. - * iobuf captures intermediate state to make EAGAIN/EWOULDBLOCK restartable. - * Returns message on success, NULL on failure with errno set. - */ -flux_msg_t *flux_msg_recvfd (int fd, struct flux_msg_iobuf *iobuf); - /* Send message to zeromq socket. * Returns 0 on success, -1 on failure with errno set. */ @@ -146,15 +126,6 @@ int flux_msg_sendzsock (void *dest, const flux_msg_t *msg); */ flux_msg_t *flux_msg_recvzsock (void *dest); -/* Initialize iobuf members. - */ -void flux_msg_iobuf_init (struct flux_msg_iobuf *iobuf); - -/* Free any internal memory allocated to iobuf. - * Only necessary if destroying with partial I/O in progress. - */ -void flux_msg_iobuf_clean (struct flux_msg_iobuf *iobuf); - /* Get/set message type * For FLUX_MSGTYPE_REQUEST: set_type initializes nodeid to FLUX_NODEID_ANY * For FLUX_MSGTYPE_RESPONSE: set_type initializes errnum to 0 diff --git a/src/common/libflux/test/message.c b/src/common/libflux/test/message.c index d3826dc6043c..b68b201616cc 100644 --- a/src/common/libflux/test/message.c +++ b/src/common/libflux/test/message.c @@ -519,39 +519,6 @@ void check_encode (void) flux_msg_destroy (msg2); } -/* Send a small message over a blocking pipe. - * We assume that there's enough buffer to do this in one go. 
- */ -void check_sendfd (void) -{ - int pfd[2]; - flux_msg_t *msg, *msg2; - const char *topic; - int type; - - ok (pipe2 (pfd, O_CLOEXEC) == 0, - "got blocking pipe"); - ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL, - "flux_msg_create works"); - ok (flux_msg_set_topic (msg, "foo.bar") == 0, - "flux_msg_set_topic works"); - ok (flux_msg_sendfd (pfd[1], msg, NULL) == 0, - "flux_msg_sendfd works"); - ok ((msg2 = flux_msg_recvfd (pfd[0], NULL)) != NULL, - "flux_msg_recvfd works"); - ok (flux_msg_get_type (msg2, &type) == 0 && type == FLUX_MSGTYPE_REQUEST, - "decoded expected message type"); - ok (flux_msg_get_topic (msg2, &topic) == 0 && !strcmp (topic, "foo.bar"), - "decoded expected topic string"); - ok (flux_msg_has_payload (msg2) == false, - "decoded expected (lack of) payload"); - - flux_msg_destroy (msg); - flux_msg_destroy (msg2); - close (pfd[1]); - close (pfd[0]); -} - void check_sendzsock (void) { zsock_t *zsock[2] = { NULL, NULL }; @@ -876,7 +843,6 @@ int main (int argc, char *argv[]) check_cmp (); check_encode (); - check_sendfd (); check_sendzsock (); check_params (); diff --git a/src/common/librouter/Makefile.am b/src/common/librouter/Makefile.am new file mode 100644 index 000000000000..e4e5fb6db5cb --- /dev/null +++ b/src/common/librouter/Makefile.am @@ -0,0 +1,43 @@ +AM_CFLAGS = \ + $(WARNING_CFLAGS) \ + $(CODE_COVERAGE_CFLAGS) + +AM_LDFLAGS = \ + $(CODE_COVERAGE_LDFLAGS) + +AM_CPPFLAGS = \ + -I$(top_srcdir) \ + -I$(top_srcdir)/src/include \ + -I$(top_builddir)/src/common/libflux \ + $(ZMQ_CFLAGS) + +noinst_LTLIBRARIES = \ + librouter.la + +librouter_la_SOURCES = \ + sendfd.h \ + sendfd.c + +TESTS = \ + test_sendfd.t + +check_PROGRAMS = \ + $(TESTS) + +TEST_EXTENSIONS = .t +T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \ + $(top_srcdir)/config/tap-driver.sh + +test_ldadd = \ + $(top_builddir)/src/common/librouter/librouter.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la \ + 
$(top_builddir)/src/common/libtap/libtap.la + +test_cppflags = \ + $(AM_CPPFLAGS) \ + -I$(top_srcdir)/src/common/libtap + +test_sendfd_t_SOURCES = test/sendfd.c +test_sendfd_t_CPPFLAGS = $(test_cppflags) +test_sendfd_t_LDADD = $(test_ldadd) diff --git a/src/common/librouter/sendfd.c b/src/common/librouter/sendfd.c new file mode 100644 index 000000000000..ce91ec8aec2c --- /dev/null +++ b/src/common/librouter/sendfd.c @@ -0,0 +1,181 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* sendfd.c - send and receive flux_msg_t over file descriptors + * + * These functions use the following encoding for each message: + * + * 4 bytes - IOBUF_MAGIC + * 4 bytes - size of encoded message in network byte order, excludes magic and size + * N bytes - message encoded with flux_msg_encode() + * + * These functions work with file descriptors configured for either + * blocking or non-blocking modes. In blocking mode, the iobuf + * argument may be set to NULL. In non-blocking mode, an iobuf should + * be provided to allow messages to be assembled across multiple calls. + * + * In non-blocking mode, sendfd() or recvfd() may fail with EWOULDBLOCK + * or EAGAIN. This should not be treated as an error. When poll(2) or + * equivalent indicates that the file descriptor is ready again, sendfd() + * or recvfd() may be called again, continuing I/O to/from the same iobuf. + * + * Separate iobufs are required for sendfd() and recvfd(). + * Call iobuf_init() on an iobuf before its first use. + * Call iobuf_clean() on an iobuf after its last use. + * The iobuf is managed by sendfd() and recvfd() across multiple messages. 
+ * + * Notes: + * + * - to decrease small message latency, the iobuf contains a fixed size + * static buffer. When a message requires more than this fixed size for + * assembly, a dynamic buffer is allocated temporarily while that message + * is assembled, then it is freed. The static buffer is sized somewhat + * arbitrarily at 4K. + * + * - sendfd/recvfd do not encrypt messages, therefore this transport + * is only appropriate for use on AF_LOCAL sockets or on file descriptors + * tunneled through a secure channel. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include + +#include "sendfd.h" + +#define IOBUF_MAGIC 0xffee0012 + +void iobuf_init (struct iobuf *iobuf) +{ + memset (iobuf, 0, sizeof (*iobuf)); +} + +void iobuf_clean (struct iobuf *iobuf) +{ + if (iobuf->buf && iobuf->buf != iobuf->buf_fixed) + free (iobuf->buf); + memset (iobuf, 0, sizeof (*iobuf)); +} + +int sendfd (int fd, const flux_msg_t *msg, struct iobuf *iobuf) +{ + struct iobuf local; + struct iobuf *io = iobuf ? 
iobuf : &local; + int rc = -1; + + if (fd < 0 || !msg) { + errno = EINVAL; + return -1; + } + if (!iobuf) + iobuf_init (&local); + if (!io->buf) { + io->size = flux_msg_encode_size (msg) + 8; + if (io->size <= sizeof (io->buf_fixed)) + io->buf = io->buf_fixed; + else if (!(io->buf = malloc (io->size))) + goto done; + *(uint32_t *)&io->buf[0] = IOBUF_MAGIC; + *(uint32_t *)&io->buf[4] = htonl (io->size - 8); + if (flux_msg_encode (msg, &io->buf[8], io->size - 8) < 0) + goto done; + io->done = 0; + } + do { + rc = write (fd, io->buf + io->done, io->size - io->done); + if (rc < 0) + goto done; + io->done += rc; + } while (io->done < io->size); + rc = 0; +done: + if (iobuf) { + if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + iobuf_clean (iobuf); + } else { + if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + errno = EPROTO; + iobuf_clean (&local); + } + return rc; +} + +flux_msg_t *recvfd (int fd, struct iobuf *iobuf) +{ + struct iobuf local; + struct iobuf *io = iobuf ? iobuf : &local; + flux_msg_t *msg = NULL; + int rc = -1; + + if (fd < 0) { + errno = EINVAL; + return NULL; + } + if (!iobuf) + iobuf_init (&local); + if (!io->buf) { + io->buf = io->buf_fixed; + io->size = sizeof (io->buf_fixed); + } + do { + if (io->done < 8) { + rc = read (fd, io->buf + io->done, 8 - io->done); + if (rc < 0) + goto done; + if (rc == 0) { + errno = ECONNRESET; + goto done; + } + io->done += rc; + if (io->done == 8) { + if (*(uint32_t *)&io->buf[0] != IOBUF_MAGIC) { + errno = EPROTO; + goto done; + } + io->size = ntohl (*(uint32_t *)&io->buf[4]) + 8; + if (io->size > sizeof (io->buf_fixed)) { + if (!(io->buf = malloc (io->size))) + goto done; + memcpy (io->buf, io->buf_fixed, 8); + } + } + } + if (io->done >= 8 && io->done < io->size) { + rc = read (fd, io->buf + io->done, io->size - io->done); + if (rc < 0) + goto done; + if (rc == 0) { + errno = ECONNRESET; + goto done; + } + io->done += rc; + } + } while (io->done < io->size); + if (!(msg = flux_msg_decode 
(io->buf + 8, io->size - 8))) + goto done; +done: + if (iobuf) { + if (msg != NULL || (errno != EAGAIN && errno != EWOULDBLOCK)) + iobuf_clean (iobuf); + } else { + if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + errno = EPROTO; + iobuf_clean (&local); + } + return msg; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/librouter/sendfd.h b/src/common/librouter/sendfd.h new file mode 100644 index 000000000000..b36023728bd5 --- /dev/null +++ b/src/common/librouter/sendfd.h @@ -0,0 +1,49 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _ROUTER_SENDFD_H +#define _ROUTER_SENDFD_H + +#include + +struct iobuf { + uint8_t *buf; + size_t size; + size_t done; + uint8_t buf_fixed[4096]; +}; + +/* Send message to file descriptor. + * iobuf captures intermediate state to make EAGAIN/EWOULDBLOCK restartable. + * Returns 0 on success, -1 on failure with errno set. + */ +int sendfd (int fd, const flux_msg_t *msg, struct iobuf *iobuf); + +/* Receive message from file descriptor. + * iobuf captures intermediate state to make EAGAIN/EWOULDBLOCK restartable. + * Returns message on success, NULL on failure with errno set. + */ +flux_msg_t *recvfd (int fd, struct iobuf *iobuf); + +/* Initialize iobuf members. + */ +void iobuf_init (struct iobuf *iobuf); + +/* Free any internal memory allocated to iobuf. + * Only necessary if destroying with partial I/O in progress. 
+ */ +void iobuf_clean (struct iobuf *iobuf); + +#endif /* !_ROUTER_SENDFD_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/librouter/test/sendfd.c b/src/common/librouter/test/sendfd.c new file mode 100644 index 000000000000..600b2f08ccb7 --- /dev/null +++ b/src/common/librouter/test/sendfd.c @@ -0,0 +1,336 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include + +#include + +#include "src/common/librouter/sendfd.h" +#include "src/common/libutil/fdutils.h" +#include "src/common/libtap/tap.h" + +/* Send a small message over a blocking pipe. + * We assume that there's enough buffer to do this in one go. + */ +void test_basic (void) +{ + int pfd[2]; + flux_msg_t *msg, *msg2; + const char *topic; + const char *payload = NULL; + + if (pipe2 (pfd, O_CLOEXEC) < 0) + BAIL_OUT ("pipe2 failed"); + if (!(msg = flux_request_encode ("foo.bar", NULL))) + BAIL_OUT ("flux_request_encode failed"); + ok (sendfd (pfd[1], msg, NULL) == 0, + "sendfd works"); + ok ((msg2 = recvfd (pfd[0], NULL)) != NULL, + "recvfd works"); + ok (flux_request_decode (msg2, &topic, &payload) == 0, + "received request can be decoded"); + ok (!strcmp (topic, "foo.bar"), + "decoded request has expected topic string"); + ok (payload == NULL, + "decoded request has expected (lack of) payload"); + + flux_msg_destroy (msg); + flux_msg_destroy (msg2); + close (pfd[1]); + close (pfd[0]); +} + +/* Send a large (>4k static buffer) message over a blocking pipe. 
+ */ +void test_large (void) +{ + int pfd[2]; + flux_msg_t *msg, *msg2; + char buf[8192]; + const char *topic; + const void *buf2; + int buf2len; + + memset (buf, 0x0f, sizeof (buf)); + + if (pipe2 (pfd, O_CLOEXEC) < 0) + BAIL_OUT ("pipe2 failed"); + if (!(msg = flux_request_encode_raw ("foo.bar", buf, sizeof (buf)))) + BAIL_OUT ("flux_request_encode failed"); + ok (sendfd (pfd[1], msg, NULL) == 0, + "sendfd works"); + ok ((msg2 = recvfd (pfd[0], NULL)) != NULL, + "recvfd works"); + ok (flux_request_decode_raw (msg2, &topic, &buf2, &buf2len) == 0, + "received request can be decoded"); + ok (!strcmp (topic, "foo.bar"), + "decoded request has expected topic string"); + ok (buf2 != NULL + && buf2len == sizeof (buf) + && memcmp (buf, buf2, buf2len) == 0, + "decoded request has expected payload"); + + flux_msg_destroy (msg); + flux_msg_destroy (msg2); + close (pfd[1]); + close (pfd[0]); +} + +/* Close the sending end of a blocking pipe and ensure the + * receiving end gets ECONNRESET. + */ +void test_eof (void) +{ + int pfd[2]; + + if (pipe2 (pfd, O_CLOEXEC) < 0) + BAIL_OUT ("pipe2 failed"); + close (pfd[1]); + errno = 0; + ok (recvfd (pfd[0], NULL) == NULL && errno == ECONNRESET, + "recvfd fails with ECONNRESET when sender closes pipe"); + close (pfd[0]); +} + +struct io { + zlist_t *queue; + struct iobuf iobuf; + int fd; + flux_watcher_t *w; + int max; +}; + +void recv_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) +{ + struct io *io = arg; + + if ((revents & FLUX_POLLERR)) + BAIL_OUT ("recv_cb POLLERR"); + if ((revents & FLUX_POLLIN)) { + flux_msg_t *msg; + if (!(msg = recvfd (io->fd, &io->iobuf))) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + diag ("recv EWOULDBLOCK"); + return; + } + BAIL_OUT ("recvfd error: %s", strerror (errno)); + } + if (zlist_append (io->queue, msg) < 0) + BAIL_OUT ("zlist_append failed"); + if (zlist_size (io->queue) == io->max) { + diag ("recv queue full, stopping receiver"); + flux_watcher_stop (io->w); + } + } +} 
+ +void send_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) +{ + struct io *io = arg; + + if ((revents & FLUX_POLLERR)) + BAIL_OUT ("recv_cb POLLERR"); + if ((revents & FLUX_POLLOUT)) { + flux_msg_t *msg; + + if ((msg = zlist_first (io->queue))) { + if (sendfd (io->fd, msg, &io->iobuf) < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + diag ("send EWOULDBLOCK"); + return; + } + BAIL_OUT ("sendfd error: %s", strerror (errno)); + } + (void)zlist_pop (io->queue); + flux_msg_destroy (msg); + } + else { + diag ("send queue empty, stopping sender"); + flux_watcher_stop (io->w); + } + } +} + +void io_destroy (struct io *io) +{ + if (io) { + int saved_errno = errno; + if (io->queue) { + flux_msg_t *msg; + while ((msg = zlist_pop (io->queue))) + flux_msg_destroy (msg); + zlist_destroy (&io->queue); + } + flux_watcher_destroy (io->w); + iobuf_clean (&io->iobuf); + free (io); + errno = saved_errno; + } +} + +struct io *io_create (flux_reactor_t *r, + int fd, + int flags, + flux_watcher_f cb) +{ + struct io *io; + if (!(io = calloc (1, sizeof (*io)))) + return NULL; + iobuf_init (&io->iobuf); + if (!(io->queue = zlist_new ())) + goto error; + io->fd = fd; + if (fd_set_nonblocking (fd) < 0) + goto error; + if (!(io->w = flux_fd_watcher_create (r, fd, flags, cb, io))) + goto error; + flux_watcher_start (io->w); + return io; +error: + io_destroy (io); + return NULL; +} + +/* Enqueue 'count' messages with payload 'size'. + * Set up nonblocking sender and receiver. + * Run the reactor: + * - sender sends all enqueued messages + * - receiver enqueues all received messages + * Verify that messages are all received intact. 
+ */ +void test_nonblock (int size, int count) +{ + int pfd[2]; + struct io *iow; + struct io *ior; + flux_reactor_t *r; + int i; + char *buf; + int errors; + flux_msg_t *msg; + + if (!(buf = malloc (size))) + BAIL_OUT ("malloc failed"); + memset (buf, 0xf0, size); + + if (!(r = flux_reactor_create (0))) + BAIL_OUT ("flux_reactor_create failed"); + if (pipe2 (pfd, O_CLOEXEC) < 0) + BAIL_OUT ("pipe2 failed"); + if (!(iow = io_create (r, pfd[1], FLUX_POLLOUT, send_cb))) + BAIL_OUT ("io_create failed: %s", flux_strerror (errno)); + if (!(ior = io_create (r, pfd[0], FLUX_POLLIN, recv_cb))) + BAIL_OUT ("io_create failed: %s", flux_strerror (errno)); + + for (i = 0; i < count; i++) { + if (!(msg = flux_request_encode_raw ("foo.bar", buf, size))) + BAIL_OUT ("flux_request_encode failed"); + if (zlist_append (iow->queue, msg) < 0) + BAIL_OUT ("zlist_append failed"); + } + ior->max = count; + + diag ("messages enqueued, starting reactor", count); + + ok (flux_reactor_run (r, 0) == 0, + "nonblock %d,%d: reactor ran", count, size); + + ok (zlist_size (ior->queue) == count, + "nonblock %d,%d: all messages received", + count, + size); + + errors = 0; + while ((msg = zlist_pop (ior->queue))) { + const char *topic; + const void *buf2; + int buf2len; + + if (flux_request_decode_raw (msg, &topic, &buf2, &buf2len) < 0) { + diag ("flux_request_decode_raw: %s", flux_strerror (errno)); + errors++; + goto next; + } + if (strcmp (topic, "foo.bar") != 0) { + diag ("decoded wrong topic: %s", topic); + errors++; + goto next; + } + if (buf2len != size || memcmp (buf, buf2, buf2len) != 0) { + diag ("decoded payload incorrectly"); + errors++; + goto next; + } +next: + flux_msg_destroy (msg); + } + + ok (errors == 0, + "nonblock %d,%d: received messages are intact", + count, + size); + + io_destroy (iow); + io_destroy (ior); + close (pfd[1]); + close (pfd[0]); + flux_reactor_destroy (r); + free (buf); +} + +void test_inval (void) +{ + flux_msg_t *msg; + + if (!(msg = flux_msg_create 
(FLUX_MSGTYPE_REQUEST))) + BAIL_OUT ("flux_msg_create failed"); + + errno = 0; + ok (recvfd (-1, NULL) == NULL && errno == EINVAL, + "recvfd fd=-1 fails with EINVAL"); + + errno = 0; + ok (sendfd (-1, msg, NULL) < 0 && errno == EINVAL, + "sendfd fd=-1 fails with EINVAL"); + errno = 0; + ok (sendfd (0, NULL, NULL) < 0 && errno == EINVAL, + "senfd msg=NULL fails with EINVAL"); + + flux_msg_destroy (msg); +} + +int main (int argc, char *argv[]) +{ + plan (NO_PLAN); + + test_basic (); + test_large (); + test_eof (); + test_nonblock (1024, 1024); + test_nonblock (4096, 256); + test_nonblock (16384, 64); + test_nonblock (1048586, 1); + test_inval (); + + done_testing(); + return (0); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/connectors/local/local.c b/src/connectors/local/local.c index 6a6e771267ae..1646ba8418d9 100644 --- a/src/connectors/local/local.c +++ b/src/connectors/local/local.c @@ -26,14 +26,15 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/macros.h" #include "src/common/libutil/fdutils.h" +#include "src/common/librouter/sendfd.h" #define CTX_MAGIC 0xf434aaab typedef struct { int magic; int fd; int fd_nonblock; - struct flux_msg_iobuf outbuf; - struct flux_msg_iobuf inbuf; + struct iobuf outbuf; + struct iobuf inbuf; uint32_t testing_userid; uint32_t testing_rolemask; flux_t *h; @@ -88,7 +89,7 @@ static int send_normal (local_ctx_t *c, const flux_msg_t *msg, int flags) { if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) return -1; - if (flux_msg_sendfd (c->fd, msg, &c->outbuf) < 0) + if (sendfd (c->fd, msg, &c->outbuf) < 0) return -1; return 0; } @@ -129,7 +130,7 @@ static flux_msg_t *op_recv (void *impl, int flags) if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) return NULL; - return flux_msg_recvfd (c->fd, &c->inbuf); + return recvfd (c->fd, &c->inbuf); } static int op_event (void *impl, const char *topic, const char *msg_topic) @@ -197,8 +198,8 @@ static void op_fini (void *impl) local_ctx_t *c = 
impl; assert (c->magic == CTX_MAGIC); - flux_msg_iobuf_clean (&c->outbuf); - flux_msg_iobuf_clean (&c->inbuf); + iobuf_clean (&c->outbuf); + iobuf_clean (&c->inbuf); if (c->fd >= 0) (void)close (c->fd); c->magic = ~CTX_MAGIC; @@ -284,8 +285,8 @@ flux_t *connector_init (const char *path, int flags) errno = e; goto error; } - flux_msg_iobuf_init (&c->outbuf); - flux_msg_iobuf_init (&c->inbuf); + iobuf_init (&c->outbuf); + iobuf_init (&c->inbuf); if (!(c->h = flux_handle_create (c, &handle_ops, flags))) goto error; return c->h; diff --git a/src/connectors/ssh/ssh.c b/src/connectors/ssh/ssh.c index fce60331fd21..493164f6fefc 100644 --- a/src/connectors/ssh/ssh.c +++ b/src/connectors/ssh/ssh.c @@ -27,14 +27,15 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/popen2.h" #include "src/common/libutil/fdutils.h" +#include "src/common/librouter/sendfd.h" #define CTX_MAGIC 0xe534babb typedef struct { int magic; int fd; int fd_nonblock; - struct flux_msg_iobuf outbuf; - struct flux_msg_iobuf inbuf; + struct iobuf outbuf; + struct iobuf inbuf; const char *ssh_cmd; char *ssh_argz; size_t ssh_argz_len; @@ -96,7 +97,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) return -1; - if (flux_msg_sendfd (c->fd, msg, &c->outbuf) < 0) + if (sendfd (c->fd, msg, &c->outbuf) < 0) return -1; return 0; } @@ -108,7 +109,7 @@ static flux_msg_t *op_recv (void *impl, int flags) if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) return NULL; - return flux_msg_recvfd (c->fd, &c->inbuf); + return recvfd (c->fd, &c->inbuf); } static int op_event_subscribe (void *impl, const char *topic) @@ -152,8 +153,8 @@ static void op_fini (void *impl) ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); - flux_msg_iobuf_clean (&c->outbuf); - flux_msg_iobuf_clean (&c->inbuf); + iobuf_clean (&c->outbuf); + iobuf_clean (&c->inbuf); if (c->fd >= 0) (void)close (c->fd); if (c->ssh_argz) @@ -386,8 +387,8 @@ flux_t 
*connector_init (const char *path, int flags) } c->fd = popen2_get_fd (c->p); c->fd_nonblock = -1; - flux_msg_iobuf_init (&c->outbuf); - flux_msg_iobuf_init (&c->inbuf); + iobuf_init (&c->outbuf); + iobuf_init (&c->inbuf); if (!(c->h = flux_handle_create (c, &handle_ops, flags))) goto error; if (test_broker_connection (c) < 0) diff --git a/src/modules/connector-local/local.c b/src/modules/connector-local/local.c index 97e65a91aa80..3d001e6e523e 100644 --- a/src/modules/connector-local/local.c +++ b/src/modules/connector-local/local.c @@ -32,6 +32,7 @@ #include "src/common/libutil/cleanup.h" #include "src/common/libutil/iterators.h" #include "src/common/libutil/fdutils.h" +#include "src/common/librouter/sendfd.h" enum { DEBUG_AUTHFAIL_ONESHOT = 1, /* force auth to fail one time */ @@ -66,8 +67,8 @@ typedef struct { int fd; flux_watcher_t *inw; flux_watcher_t *outw; - struct flux_msg_iobuf inbuf; - struct flux_msg_iobuf outbuf; + struct iobuf inbuf; + struct iobuf outbuf; zlist_t *outqueue; /* queue of outbound flux_msg_t */ mod_local_ctx_t *ctx; zhash_t *disconnect_notify; @@ -250,8 +251,8 @@ static client_t * client_create (mod_local_ctx_t *ctx, int fd) client_write_cb, c))) goto error; flux_watcher_start (c->inw); - flux_msg_iobuf_init (&c->inbuf); - flux_msg_iobuf_init (&c->outbuf); + iobuf_init (&c->inbuf); + iobuf_init (&c->outbuf); if (send_auth_response (fd, 0) < 0) goto error_noresponse; if (fd_set_nonblocking (fd) < 0) @@ -270,7 +271,7 @@ static int client_send_try (client_t *c) flux_msg_t *msg = zlist_head (c->outqueue); if (msg) { - if (flux_msg_sendfd (c->fd, msg, &c->outbuf) < 0) { + if (sendfd (c->fd, msg, &c->outbuf) < 0) { if (errno != EWOULDBLOCK && errno != EAGAIN) return -1; //flux_log (c->ctx->h, LOG_DEBUG, "send: client not ready"); @@ -931,11 +932,11 @@ static void client_destroy (client_t *c) } flux_watcher_stop (c->outw); flux_watcher_destroy (c->outw); - flux_msg_iobuf_clean (&c->outbuf); + iobuf_clean (&c->outbuf); flux_watcher_stop 
(c->inw); flux_watcher_destroy (c->inw); - flux_msg_iobuf_clean (&c->inbuf); + iobuf_clean (&c->inbuf); if (c->fd != -1) close (c->fd); @@ -1023,13 +1024,13 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, * EWOULDBLOCK, EAGAIN stores state in c->inbuf for continuation */ //flux_log (h, LOG_DEBUG, "recv: client ready"); - if (!(msg = flux_msg_recvfd (c->fd, &c->inbuf))) { + if (!(msg = recvfd (c->fd, &c->inbuf))) { if (errno == EWOULDBLOCK || errno == EAGAIN) { //flux_log (h, LOG_DEBUG, "recv: client not ready"); return; } if (errno != ECONNRESET && errno != EPROTO) - flux_log_error (h, "flux_msg_recvfd"); + flux_log_error (h, "recvfd"); goto error_disconnect; } if (flux_msg_get_type (msg, &type) < 0) {