Skip to content

Commit

Permalink
Merge pull request #2375 from garlick/librouter_sendfd
Browse files Browse the repository at this point in the history
librouter: move sendfd, recvfd out of public API
  • Loading branch information
grondo committed Sep 15, 2019
2 parents e7c28a2 + 9e64a8e commit a7e2a6d
Show file tree
Hide file tree
Showing 15 changed files with 651 additions and 325 deletions.
1 change: 1 addition & 0 deletions configure.ac
Expand Up @@ -421,6 +421,7 @@ AC_CONFIG_FILES( \
src/common/libschedutil/Makefile \
src/common/libeventlog/Makefile \
src/common/libioencode/Makefile \
src/common/librouter/Makefile \
src/bindings/Makefile \
src/bindings/lua/Makefile \
src/bindings/python/Makefile \
Expand Down
3 changes: 0 additions & 3 deletions doc/man3/Makefile.am
Expand Up @@ -12,7 +12,6 @@ MAN3_FILES_PRIMARY = \
flux_event_publish.3 \
flux_pollevents.3 \
flux_msg_encode.3 \
flux_msg_sendfd.3 \
flux_rpc.3 \
flux_mrpc.3 \
flux_get_rank.3 \
Expand Down Expand Up @@ -70,7 +69,6 @@ MAN3_FILES_SECONDARY = \
flux_event_publish_get_seq.3 \
flux_pollfd.3 \
flux_msg_decode.3 \
flux_msg_recvfd.3 \
flux_get_size.3 \
flux_attr_set.3 \
flux_set_reactor.3 \
Expand Down Expand Up @@ -213,7 +211,6 @@ flux_event_publish_raw.3: flux_event_publish.3
flux_event_publish_get_seq.3: flux_event_publish.3
flux_pollfd.3: flux_pollevents.3
flux_msg_decode.3: flux_msg_encode.3
flux_msg_recvfd.3: flux_msg_sendfd.3
flux_get_size.3: flux_get_rank.3
flux_attr_set.3: flux_attr_get.3
flux_set_reactor.3: flux_get_reactor.3
Expand Down
97 changes: 0 additions & 97 deletions doc/man3/flux_msg_sendfd.adoc

This file was deleted.

19 changes: 10 additions & 9 deletions src/cmd/builtin/proxy.c
Expand Up @@ -33,6 +33,7 @@
#include "src/common/libutil/oom.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/fdutils.h"
#include "src/common/librouter/sendfd.h"

#define LISTEN_BACKLOG 5

Expand Down Expand Up @@ -63,8 +64,8 @@ typedef struct {
int wfd;
flux_watcher_t *inw;
flux_watcher_t *outw;
struct flux_msg_iobuf inbuf;
struct flux_msg_iobuf outbuf;
struct iobuf inbuf;
struct iobuf outbuf;
zlist_t *outqueue; /* queue of outbound flux_msg_t */
proxy_ctx_t *ctx;
zhash_t *disconnect_notify;
Expand Down Expand Up @@ -137,8 +138,8 @@ static client_t * client_create (proxy_ctx_t *ctx, int rfd, int wfd)
goto error;
}
flux_watcher_start (c->inw);
flux_msg_iobuf_init (&c->inbuf);
flux_msg_iobuf_init (&c->outbuf);
iobuf_init (&c->inbuf);
iobuf_init (&c->outbuf);
if (fd_set_nonblocking (c->rfd) < 0) {
flux_log_error (h, "fd_set_nonblocking");
goto error;
Expand All @@ -159,7 +160,7 @@ static int client_send_try (client_t *c)
flux_msg_t *msg = zlist_head (c->outqueue);

if (msg) {
if (flux_msg_sendfd (c->wfd, msg, &c->outbuf) < 0) {
if (sendfd (c->wfd, msg, &c->outbuf) < 0) {
if (errno != EWOULDBLOCK && errno != EAGAIN)
return -1;
//flux_log (c->ctx->h, LOG_DEBUG, "send: client not ready");
Expand Down Expand Up @@ -424,11 +425,11 @@ static void client_destroy (client_t *c)
}
flux_watcher_stop (c->outw);
flux_watcher_destroy (c->outw);
flux_msg_iobuf_clean (&c->outbuf);
iobuf_clean (&c->outbuf);

flux_watcher_stop (c->inw);
flux_watcher_destroy (c->inw);
flux_msg_iobuf_clean (&c->inbuf);
iobuf_clean (&c->inbuf);

if (c->rfd != -1)
close (c->rfd);
Expand Down Expand Up @@ -509,13 +510,13 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w,
* EWOULDBLOCK, EAGAIN stores state in c->inbuf for continuation
*/
//flux_log (h, LOG_DEBUG, "recv: client ready");
if (!(msg = flux_msg_recvfd (c->rfd, &c->inbuf))) {
if (!(msg = recvfd (c->rfd, &c->inbuf))) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
//flux_log (h, LOG_DEBUG, "recv: client not ready");
return;
}
if (errno != ECONNRESET)
flux_log_error (h, "flux_msg_recvfd");
flux_log_error (h, "recvfd");
goto disconnect;
}
if (flux_msg_get_type (msg, &type) < 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/common/Makefile.am
Expand Up @@ -13,7 +13,8 @@ SUBDIRS = libtap \
libaggregate \
libschedutil \
libeventlog \
libioencode
libioencode \
librouter

AM_CFLAGS = $(WARNING_CFLAGS) $(CODE_COVERAGE_CFLAGS)
AM_LDFLAGS = $(CODE_COVERAGE_LIBS)
Expand All @@ -31,6 +32,7 @@ libflux_internal_la_LIBADD = \
$(builddir)/libtomlc99/libtomlc99.la \
$(builddir)/libeventlog/libeventlog.la \
$(builddir)/libioencode/libioencode.la \
$(builddir)/librouter/librouter.la \
$(JANSSON_LIBS) $(ZMQ_LIBS) $(LIBPTHREAD) $(LIBUTIL) \
$(LIBDL) $(LIBRT) $(FLUX_SECURITY_LIBS) $(LIBSODIUM_LIBS)
libflux_internal_la_LDFLAGS = $(san_ld_zdef_flag)
Expand Down
127 changes: 0 additions & 127 deletions src/common/libflux/message.c
Expand Up @@ -1418,133 +1418,6 @@ void flux_msg_fprint (FILE *f, const flux_msg_t *msg)
zframe_fprint (proto, prefix, f);
}

#define IOBUF_MAGIC 0xffee0012

void flux_msg_iobuf_init (struct flux_msg_iobuf *iobuf)
{
memset (iobuf, 0, sizeof (*iobuf));
}

void flux_msg_iobuf_clean (struct flux_msg_iobuf *iobuf)
{
if (iobuf->buf && iobuf->buf != iobuf->buf_fixed)
free (iobuf->buf);
memset (iobuf, 0, sizeof (*iobuf));
}

int flux_msg_sendfd (int fd, const flux_msg_t *msg,
struct flux_msg_iobuf *iobuf)
{
struct flux_msg_iobuf local;
struct flux_msg_iobuf *io = iobuf ? iobuf : &local;
int rc = -1;

if (fd < 0 || !msg) {
errno = EINVAL;
goto done;
}
if (!iobuf)
flux_msg_iobuf_init (&local);
if (!io->buf) {
io->size = flux_msg_encode_size (msg) + 8;
if (io->size <= sizeof (io->buf_fixed))
io->buf = io->buf_fixed;
else if (!(io->buf = malloc (io->size))) {
errno = ENOMEM;
goto done;
}
*(uint32_t *)&io->buf[0] = IOBUF_MAGIC;
*(uint32_t *)&io->buf[4] = htonl (io->size - 8);
if (flux_msg_encode (msg, &io->buf[8], io->size - 8) < 0)
goto done;
io->done = 0;
}
do {
rc = write (fd, io->buf + io->done, io->size - io->done);
if (rc < 0)
goto done;
io->done += rc;
} while (io->done < io->size);
rc = 0;
done:
if (iobuf) {
if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
flux_msg_iobuf_clean (iobuf);
} else {
if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
errno = EPROTO;
flux_msg_iobuf_clean (&local);
}
return rc;
}

flux_msg_t *flux_msg_recvfd (int fd, struct flux_msg_iobuf *iobuf)
{
struct flux_msg_iobuf local;
struct flux_msg_iobuf *io = iobuf ? iobuf : &local;
flux_msg_t *msg = NULL;
int rc = -1;

if (fd < 0) {
errno = EINVAL;
goto done;
}
if (!iobuf)
flux_msg_iobuf_init (&local);
if (!io->buf) {
io->buf = io->buf_fixed;
io->size = sizeof (io->buf_fixed);
}
do {
if (io->done < 8) {
rc = read (fd, io->buf + io->done, 8 - io->done);
if (rc < 0)
goto done;
if (rc == 0) {
errno = ECONNRESET;
goto done;
}
io->done += rc;
if (io->done == 8) {
if (*(uint32_t *)&io->buf[0] != IOBUF_MAGIC) {
errno = EPROTO;
goto done;
}
io->size = ntohl (*(uint32_t *)&io->buf[4]) + 8;
if (io->size > sizeof (io->buf_fixed)) {
if (!(io->buf = malloc (io->size))) {
errno = EPROTO;
goto done;
}
memcpy (io->buf, io->buf_fixed, 8);
}
}
}
if (io->done >= 8 && io->done < io->size) {
rc = read (fd, io->buf + io->done, io->size - io->done);
if (rc < 0)
goto done;
if (rc == 0) {
errno = ECONNRESET;
goto done;
}
io->done += rc;
}
} while (io->done < io->size);
if (!(msg = flux_msg_decode (io->buf + 8, io->size - 8)))
goto done;
done:
if (iobuf) {
if (msg != NULL || (errno != EAGAIN && errno != EWOULDBLOCK))
flux_msg_iobuf_clean (iobuf);
} else {
if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
errno = EPROTO;
flux_msg_iobuf_clean (&local);
}
return msg;
}

int flux_msg_sendzsock (void *sock, const flux_msg_t *msg)
{
int rc = -1;
Expand Down

0 comments on commit a7e2a6d

Please sign in to comment.