Skip to content

Commit

Permalink
lib: ostream: Allow switching to a specific ioloop.
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanbosch authored and villesavolainen committed Mar 12, 2018
1 parent e364ae8 commit 860ba28
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 23 deletions.
7 changes: 4 additions & 3 deletions src/lib-ssl-iostream/ostream-openssl.c
Expand Up @@ -159,11 +159,12 @@ o_stream_ssl_sendv(struct ostream_private *stream,
return bytes_sent;
}

static void o_stream_ssl_switch_ioloop(struct ostream_private *stream)
static void o_stream_ssl_switch_ioloop_to(struct ostream_private *stream,
struct ioloop *ioloop)
{
struct ssl_ostream *sstream = (struct ssl_ostream *)stream;

o_stream_switch_ioloop(sstream->ssl_io->plain_output);
o_stream_switch_ioloop_to(sstream->ssl_io->plain_output, ioloop);
}

static int plain_flush_callback(struct ssl_ostream *sstream)
Expand Down Expand Up @@ -227,7 +228,7 @@ struct ostream *openssl_o_stream_create_ssl(struct ssl_iostream *ssl_io)
sstream->ostream.iostream.destroy = o_stream_ssl_destroy;
sstream->ostream.sendv = o_stream_ssl_sendv;
sstream->ostream.flush = o_stream_ssl_flush;
sstream->ostream.switch_ioloop = o_stream_ssl_switch_ioloop;
sstream->ostream.switch_ioloop_to = o_stream_ssl_switch_ioloop_to;

sstream->ostream.get_used_size = o_stream_ssl_get_used_size;
sstream->ostream.flush_pending = o_stream_ssl_flush_pending;
Expand Down
32 changes: 21 additions & 11 deletions src/lib/ostream-file.c
Expand Up @@ -333,6 +333,7 @@ static int buffer_flush(struct file_ostream *fstream)
static void o_stream_file_cork(struct ostream_private *stream, bool set)
{
struct file_ostream *fstream = (struct file_ostream *)stream;
struct iostream_private *iostream = &fstream->ostream.iostream;
int ret;

if (stream->corked != set && !stream->ostream.closed) {
Expand All @@ -345,8 +346,10 @@ static void o_stream_file_cork(struct ostream_private *stream, bool set)
if (fstream->io == NULL &&
(ret == 0 || fstream->flush_pending) &&
!stream->ostream.closed) {
fstream->io = io_add(fstream->fd, IO_WRITE,
stream_send_io, fstream);
fstream->io = io_add_to(
io_stream_get_ioloop(iostream),
fstream->fd, IO_WRITE,
stream_send_io, fstream);
}
}

Expand All @@ -371,11 +374,13 @@ static void
o_stream_file_flush_pending(struct ostream_private *stream, bool set)
{
struct file_ostream *fstream = (struct file_ostream *) stream;
struct iostream_private *iostream = &fstream->ostream.iostream;

fstream->flush_pending = set;
if (set && !stream->corked && fstream->io == NULL) {
fstream->io = io_add(fstream->fd, IO_WRITE,
stream_send_io, fstream);
fstream->io = io_add_to(io_stream_get_ioloop(iostream),
fstream->fd, IO_WRITE,
stream_send_io, fstream);
}
}

Expand Down Expand Up @@ -459,6 +464,7 @@ static void o_stream_grow_buffer(struct file_ostream *fstream, size_t bytes)
static void stream_send_io(struct file_ostream *fstream)
{
struct ostream *ostream = &fstream->ostream.ostream;
struct iostream_private *iostream = &fstream->ostream.iostream;
bool use_cork = !fstream->ostream.corked;
int ret;

Expand Down Expand Up @@ -488,8 +494,9 @@ static void stream_send_io(struct file_ostream *fstream)
might have just returned 0 without there being any data
to be sent. */
if (fstream->io == NULL) {
fstream->io = io_add(fstream->fd, IO_WRITE,
stream_send_io, fstream);
fstream->io = io_add_to(io_stream_get_ioloop(iostream),
fstream->fd, IO_WRITE,
stream_send_io, fstream);
}
}

Expand All @@ -499,6 +506,7 @@ static void stream_send_io(struct file_ostream *fstream)
static size_t o_stream_add(struct file_ostream *fstream,
const void *data, size_t size)
{
struct iostream_private *iostream = &fstream->ostream.iostream;
size_t unused, sent;
int i;

Expand Down Expand Up @@ -529,8 +537,9 @@ static size_t o_stream_add(struct file_ostream *fstream,

if (sent != 0 && fstream->io == NULL &&
!fstream->ostream.corked && !fstream->file) {
fstream->io = io_add(fstream->fd, IO_WRITE, stream_send_io,
fstream);
fstream->io = io_add_to(io_stream_get_ioloop(iostream),
fstream->fd, IO_WRITE, stream_send_io,
fstream);
}

return sent;
Expand Down Expand Up @@ -927,12 +936,13 @@ o_stream_file_send_istream(struct ostream_private *outstream,
return io_stream_copy_same_stream(outstream, instream);
}

static void o_stream_file_switch_ioloop(struct ostream_private *stream)
static void o_stream_file_switch_ioloop_to(struct ostream_private *stream,
struct ioloop *ioloop)
{
struct file_ostream *fstream = (struct file_ostream *)stream;

if (fstream->io != NULL)
fstream->io = io_loop_move_io(&fstream->io);
fstream->io = io_loop_move_io_to(ioloop, &fstream->io);
}

struct ostream *
Expand All @@ -956,7 +966,7 @@ o_stream_create_file_common(struct file_ostream *fstream,
fstream->ostream.sendv = o_stream_file_sendv;
fstream->ostream.write_at = o_stream_file_write_at;
fstream->ostream.send_istream = o_stream_file_send_istream;
fstream->ostream.switch_ioloop = o_stream_file_switch_ioloop;
fstream->ostream.switch_ioloop_to = o_stream_file_switch_ioloop_to;

fstream->writev = o_stream_file_writev;

Expand Down
3 changes: 2 additions & 1 deletion src/lib/ostream-private.h
Expand Up @@ -25,7 +25,8 @@ struct ostream_private {
enum ostream_send_istream_result
(*send_istream)(struct ostream_private *outstream,
struct istream *instream);
void (*switch_ioloop)(struct ostream_private *stream);
void (*switch_ioloop_to)(struct ostream_private *stream,
struct ioloop *ioloop);

/* data: */
struct ostream ostream;
Expand Down
21 changes: 14 additions & 7 deletions src/lib/ostream.c
Expand Up @@ -499,13 +499,18 @@ io_stream_copy(struct ostream *outstream, struct istream *instream)
return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
}

void o_stream_switch_ioloop(struct ostream *stream)
void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop)
{
struct ostream_private *_stream = stream->real_stream;

io_stream_switch_ioloop_to(&_stream->iostream, current_ioloop);
io_stream_switch_ioloop_to(&_stream->iostream, ioloop);

_stream->switch_ioloop_to(_stream, ioloop);
}

_stream->switch_ioloop(_stream);
void o_stream_switch_ioloop(struct ostream *stream)
{
o_stream_switch_ioloop_to(stream, current_ioloop);
}

static void o_stream_default_close(struct iostream_private *stream,
Expand Down Expand Up @@ -669,10 +674,12 @@ o_stream_default_send_istream(struct ostream_private *outstream,
return io_stream_copy(&outstream->ostream, instream);
}

static void o_stream_default_switch_ioloop(struct ostream_private *_stream)
static void
o_stream_default_switch_ioloop_to(struct ostream_private *_stream,
struct ioloop *ioloop)
{
if (_stream->parent != NULL)
o_stream_switch_ioloop(_stream->parent);
o_stream_switch_ioloop_to(_stream->parent, ioloop);
}

struct ostream *
Expand Down Expand Up @@ -723,8 +730,8 @@ o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
_stream->write_at = o_stream_default_write_at;
if (_stream->send_istream == NULL)
_stream->send_istream = o_stream_default_send_istream;
if (_stream->switch_ioloop == NULL)
_stream->switch_ioloop = o_stream_default_switch_ioloop;
if (_stream->switch_ioloop_to == NULL)
_stream->switch_ioloop_to = o_stream_default_switch_ioloop_to;

io_stream_init(&_stream->iostream);
return &_stream->ostream;
Expand Down
3 changes: 2 additions & 1 deletion src/lib/ostream.h
Expand Up @@ -210,7 +210,8 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
void o_stream_get_last_write_time(struct ostream *stream, struct timeval *tv_r);

/* If there are any I/O loop items associated with the stream, move all of
them to current_ioloop. */
them to provided/current ioloop. */
void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop);
void o_stream_switch_ioloop(struct ostream *stream);

#endif

0 comments on commit 860ba28

Please sign in to comment.