Skip to content

Commit

Permalink
Merge 630927d into 569fe2d
Browse files Browse the repository at this point in the history
  • Loading branch information
grondo committed Jun 16, 2018
2 parents 569fe2d + 630927d commit 0732390
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 14 deletions.
34 changes: 25 additions & 9 deletions src/common/libflux/ev_buffer_write.c
Expand Up @@ -24,6 +24,8 @@

#include <stddef.h>
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>

#include "src/common/libev/ev.h"

Expand All @@ -40,7 +42,17 @@ static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents)
if (flux_buffer_read_to_fd (ebw->fb, ebw->fd, -1) < 0)
return;

if (!flux_buffer_bytes (ebw->fb))
if (!flux_buffer_bytes (ebw->fb) && ebw->eof) {
if (close (ebw->fd) < 0)
ebw->close_errno = errno;
ebw->fd = -1;
ebw->closed = true;
ebw->eof = false;
if (ebw->cb)
ebw->cb (loop, ebw, revents);
}

if (!flux_buffer_bytes (ebw->fb) && !ebw->eof)
ev_io_stop (ebw->loop, &(ebw->io_w));
}
else {
Expand All @@ -49,17 +61,21 @@ static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents)
}
}

static void buffer_data_available_cb (flux_buffer_t *fb, void *arg)
/* data is available, start ev io watcher assuming user has
* started the watcher.
*/
void ev_buffer_write_wakeup (struct ev_buffer_write *ebw)
{
struct ev_buffer_write *ebw = arg;

/* data is available, start ev io watcher assuming user has
* started the watcher.
*/
if (ebw->start)
ev_io_start (ebw->loop, &(ebw->io_w));
}

static void buffer_data_available_cb (flux_buffer_t *fb, void *arg)
{
struct ev_buffer_write *ebw = arg;
ev_buffer_write_wakeup (ebw);
}

int ev_buffer_write_init (struct ev_buffer_write *ebw,
int fd,
int size,
Expand Down Expand Up @@ -105,8 +121,8 @@ void ev_buffer_write_start (struct ev_loop *loop, struct ev_buffer_write *ebw)
{
if (!ebw->start) {
ebw->start = true;
/* do not start io watcher unless there is data to be written out */
if (flux_buffer_bytes (ebw->fb))
/* do not start watcher unless there is data or EOF to be written out */
if (flux_buffer_bytes (ebw->fb) || ebw->eof)
ev_io_start (ebw->loop, &(ebw->io_w));
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/common/libflux/ev_buffer_write.h
Expand Up @@ -17,6 +17,9 @@ struct ev_buffer_write {
flux_buffer_t *fb;
struct ev_loop *loop;
bool start; /* flag, if user started reactor */
bool eof; /* flag, eof written */
bool closed; /* flag, fd has been closed */
int close_errno; /* errno from close */
void *data;
};

Expand All @@ -28,5 +31,5 @@ int ev_buffer_write_init (struct ev_buffer_write *ebw,
void ev_buffer_write_cleanup (struct ev_buffer_write *ebw);
void ev_buffer_write_start (struct ev_loop *loop, struct ev_buffer_write *ebw);
void ev_buffer_write_stop (struct ev_loop *loop, struct ev_buffer_write *ebw);

void ev_buffer_write_wakeup (struct ev_buffer_write *ebw);
#endif /* !_EV_BUFFER_WRITE_H */
33 changes: 33 additions & 0 deletions src/common/libflux/reactor.c
Expand Up @@ -563,6 +563,39 @@ flux_buffer_t *flux_buffer_write_watcher_get_buffer (flux_watcher_t *w)
return NULL;
}

int flux_buffer_write_watcher_close (flux_watcher_t *w)
{
struct ev_buffer_write *evw;
if (!w) {
errno = EINVAL;
return (-1);
}
evw = w->data;
if (evw->eof) {
errno = EINPROGRESS;
return (-1);
}
if (evw->closed) {
errno = EINVAL;
return (-1);
}
evw->eof = true;
flux_buffer_readonly (evw->fb);
ev_buffer_write_wakeup (evw);
return (0);
}

int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *errp)
{
if (w) {
struct ev_buffer_write *evw = w->data;
if (evw->closed && errp != NULL)
*errp = evw->close_errno;
return (evw->closed);
}
return (0);
}

/* 0MQ sockets
*/

Expand Down
16 changes: 15 additions & 1 deletion src/common/libflux/reactor.h
Expand Up @@ -91,13 +91,27 @@ flux_watcher_t *flux_buffer_read_watcher_create (flux_reactor_t *r, int fd,

flux_buffer_t *flux_buffer_read_watcher_get_buffer (flux_watcher_t *w);

/* 'cb' is used only for FLUX_POLLERR */
/* 'cb' only called after fd closed (FLUX_POLLOUT) or error (FLUX_POLLERR) */
flux_watcher_t *flux_buffer_write_watcher_create (flux_reactor_t *r, int fd,
int size, flux_watcher_f cb,
int flags, void *arg);

flux_buffer_t *flux_buffer_write_watcher_get_buffer (flux_watcher_t *w);

/* "write" EOF to buffer write watcher 'w'. The underlying fd will be closed
* once the buffer is emptied. The underlying flux_buffer_t will be marked
* readonly and subsequent flux_buffer_write* calls will return EROFS.
*
* Once close(2) completes, the watcher callback is called with FLUX_POLLOUT.
* Use flux_buffer_write_watcher_is_closed() to check for errors.
*
* Returns 0 on success, -1 on error with errno set.
*/
int flux_buffer_write_watcher_close (flux_watcher_t *w);

/* Returns 1 if write watcher is closed, errnum from close in close_err */
int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *close_err);

/* zmq socket
*/

Expand Down
66 changes: 64 additions & 2 deletions src/common/libflux/test/reactor.c
Expand Up @@ -287,8 +287,8 @@ static void buffer_write (flux_reactor_t *r, flux_watcher_t *w,
"buffer: write callback called with FLUX_POLLERR");
}
else {
ok (false,
"buffer: write callback called");
ok (flux_buffer_write_watcher_is_closed (w, NULL),
"buffer: write callback called after close");
}

(*count)++;
Expand Down Expand Up @@ -366,7 +366,9 @@ static void buffer_read_overflow (flux_reactor_t *r, flux_watcher_t *w,

static void test_buffer (flux_reactor_t *reactor)
{
int errnum = 0;
int fd[2];
int pfds[2];
flux_watcher_t *w;
flux_buffer_t *fb;
int count;
Expand Down Expand Up @@ -577,6 +579,66 @@ static void test_buffer (flux_reactor_t *reactor)
flux_watcher_stop (w);
flux_watcher_destroy (w);

/* write buffer watcher close() testcase */

ok (flux_buffer_write_watcher_close (NULL) == -1 && errno == EINVAL,
"buffer: flux_buffer_write_watcher_close handles NULL argument");

count = 0;
ok (pipe (pfds) == 0,
"buffer: hey I can has a pipe!");
w = flux_buffer_write_watcher_create (reactor,
pfds[1],
1024,
buffer_write,
0,
&count);
ok (w != NULL,
"buffer: write watcher close: watcher created");
fb = flux_buffer_write_watcher_get_buffer (w);
ok (fb != NULL,
"buffer: write watcher close: buffer retrieved");

ok (flux_buffer_write (fb, "foobaz", 6) == 6,
"buffer: write to buffer success");

ok (flux_buffer_write_watcher_is_closed (w, NULL) == 0,
"buffer: flux_buffer_write_watcher_is_closed returns false");
ok (flux_buffer_write_watcher_close (w) == 0,
"buffer: flux_buffer_write_watcher_close: Success");
ok (flux_buffer_write_watcher_is_closed (w, NULL) == 0,
"buffer: watcher still not closed (close(2) not called yet)");
ok (flux_buffer_write_watcher_close (w) == -1 && errno == EINPROGRESS,
"buffer: flux_buffer_write_watcher_close: In progress");

ok (flux_buffer_write (fb, "shouldfail", 10) == -1 && errno == EROFS,
"buffer: flux_buffer_write after close fails with EROFS");

flux_watcher_start (w);

ok (flux_reactor_run (reactor, 0) == 0,
"buffer: reactor ran to completion");

ok (count == 1,
"buffer: write callback called once");
ok (flux_buffer_write_watcher_is_closed (w, &errnum) == 1 && errnum == 0,
"buffer: flux_buffer_write_watcher_is_closed returns true");
ok (flux_buffer_write_watcher_close (w) == -1 && errno == EINVAL,
"buffer: flux_buffer_write_watcher_close after close returns EINVAL");

ok (read (pfds[0], buf, 1024) == 6,
"buffer: read from pipe success");

ok (!memcmp (buf, "foobaz", 6),
"buffer: read from pipe returned correct data");

ok (read (pfds[0], buf, 1024) == 0,
"buffer: read from pipe got EOF");

flux_watcher_stop (w);
flux_watcher_destroy (w);

close (pfds[0]);
close (fd[0]);
close (fd[1]);
}
Expand Down
8 changes: 7 additions & 1 deletion t/Makefile.am
Expand Up @@ -203,7 +203,8 @@ check_PROGRAMS = \
module/basic \
request/treq \
barrier/tbarrier \
wreck/rcalc
wreck/rcalc \
reactor/reactorcat

check_LTLIBRARIES = \
module/parent.la \
Expand Down Expand Up @@ -397,3 +398,8 @@ wreck_sched_dummy_la_CPPFLAGS = $(test_cppflags)
wreck_sched_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher
wreck_sched_dummy_la_LIBADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

reactor_reactorcat_SOURCES = reactor/reactorcat.c
reactor_reactorcat_CPPFLAGS = $(test_cppflags)
reactor_reactorcat_LDADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)
98 changes: 98 additions & 0 deletions t/reactor/reactorcat.c
@@ -0,0 +1,98 @@

#include <unistd.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#include <flux/core.h>

static int total_bytes = 0;

static void die (const char *fmt, ...)
{
va_list ap;
va_start (ap, fmt);
vfprintf (stderr, fmt, ap);
va_end (ap);
exit (1);
}

static void write_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
int errnum;
if (revents & FLUX_POLLERR)
die ("got POLLERR on stdout. Aborting\n");

if (flux_buffer_write_watcher_is_closed (w, &errnum)) {
if (errnum)
fprintf (stderr, "error: close: %s\n", strerror (errnum));
flux_watcher_stop (w);
}
}

static void read_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
const void *data;
int len, n = 0;
flux_watcher_t *writer = arg;
flux_buffer_t *wfb = NULL;
flux_buffer_t *rfb = NULL;;

if (!(wfb = flux_buffer_write_watcher_get_buffer (writer))
|| !(rfb = flux_buffer_read_watcher_get_buffer (w)))
die ("failed to get read/write buffers from watchers!\n");

if (!(data = flux_buffer_peek (rfb, -1, &len)))
die ("flux_buffer_peek: %s\n", strerror (errno));

if ((len > 0) && ((n = flux_buffer_write (wfb, data, len)) < 0))
die ("flux_buffer_write: %s\n", strerror (errno));
else if (len == 0) {
/* Propagate EOF to writer, stop reader */
flux_buffer_write_watcher_close (writer);
flux_watcher_stop (w);
}

/* Drop data in read buffer that was successfully written to writer */
flux_buffer_drop (rfb, n);
total_bytes += n;
}

int main (int argc, char *argv[])
{
int rc;
flux_watcher_t *rw, *ww;
flux_reactor_t *r;

if (!(r = flux_reactor_create (0)))
die ("flux_reactor_create failed\n");

ww = flux_buffer_write_watcher_create (r, STDOUT_FILENO, 4096,
write_cb, 0, NULL);
rw = flux_buffer_read_watcher_create (r, STDIN_FILENO, 4096,
read_cb, 0, (void *) ww);
if (!rw || !ww)
die ("flux buffer watcher create failed\n");

flux_watcher_start (rw);
flux_watcher_start (ww);

if ((rc = flux_reactor_run (r, 0)) < 0)
die ("flux_reactor_run() returned nonzero\n");

fprintf (stderr, "debug: %d bytes transferred.\n", total_bytes);

flux_watcher_destroy (rw);
flux_watcher_destroy (ww);
flux_reactor_destroy (r);

return (0);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/

9 changes: 9 additions & 0 deletions t/t0001-basic.t
Expand Up @@ -378,4 +378,13 @@ test_expect_success 'passing NULL to flux_log functions logs to stderr (#1191)'
grep "err: world: No such file or directory" std.err
'

reactorcat=${SHARNESS_TEST_DIRECTORY}/reactor/reactorcat
test_expect_success 'reactor: reactorcat example program works' '
dd if=/dev/urandom bs=1024 count=4 >reactorcat.in &&
$reactorcat <reactorcat.in >reactorcat.out &&
test_cmp reactorcat.in reactorcat.out &&
$reactorcat </dev/null >reactorcat.devnull.out &&
test -f reactorcat.devnull.out &&
test_must_fail test -s reactorcat.devnull.out
'
test_done

0 comments on commit 0732390

Please sign in to comment.