Skip to content

Commit

Permalink
lib: istream: Allow switching to a specific ioloop.
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanbosch authored and villesavolainen committed Mar 12, 2018
1 parent 0cb6dc4 commit e364ae8
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 19 deletions.
6 changes: 4 additions & 2 deletions src/lib-http/http-server-request.c
Expand Up @@ -461,14 +461,16 @@ struct http_server_istream {
};

static void
http_server_istream_switch_ioloop(struct istream_private *stream)
http_server_istream_switch_ioloop_to(struct istream_private *stream,
struct ioloop *ioloop)
{
struct http_server_istream *hsristream =
(struct http_server_istream *)stream;

if (hsristream->istream.istream.blocking)
return;

i_assert(ioloop == current_ioloop);
http_server_connection_switch_ioloop(hsristream->req->conn);
}

Expand Down Expand Up @@ -581,7 +583,7 @@ http_server_request_get_payload_input(struct http_server_request *req,
hsristream->istream.stream_size_passthrough = TRUE;

hsristream->istream.read = http_server_istream_read;
hsristream->istream.switch_ioloop = http_server_istream_switch_ioloop;
hsristream->istream.switch_ioloop_to = http_server_istream_switch_ioloop_to;
hsristream->istream.iostream.destroy = http_server_istream_destroy;

hsristream->istream.istream.readable_fd = FALSE;
Expand Down
7 changes: 4 additions & 3 deletions src/lib/istream-multiplex.c
Expand Up @@ -189,11 +189,12 @@ static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream)
}

static void
i_stream_multiplex_ichannel_switch_ioloop(struct istream_private *stream)
i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private *stream,
struct ioloop *ioloop)
{
struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;

i_stream_switch_ioloop(channel->mstream->parent);
i_stream_switch_ioloop_to(channel->mstream->parent, ioloop);
}

static void
Expand Down Expand Up @@ -244,7 +245,7 @@ i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid)
channel->cid = cid;
channel->mstream = mstream;
channel->istream.read = i_stream_multiplex_ichannel_read;
channel->istream.switch_ioloop = i_stream_multiplex_ichannel_switch_ioloop;
channel->istream.switch_ioloop_to = i_stream_multiplex_ichannel_switch_ioloop_to;
channel->istream.iostream.close = i_stream_multiplex_ichannel_close;
channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy;
channel->istream.max_buffer_size = mstream->bufsize;
Expand Down
3 changes: 2 additions & 1 deletion src/lib/istream-private.h
Expand Up @@ -19,7 +19,8 @@ struct istream_private {
void (*sync)(struct istream_private *stream);
int (*stat)(struct istream_private *stream, bool exact);
int (*get_size)(struct istream_private *stream, bool exact, uoff_t *size_r);
void (*switch_ioloop)(struct istream_private *stream);
void (*switch_ioloop_to)(struct istream_private *stream,
struct ioloop *ioloop);
struct istream_snapshot *
(*snapshot)(struct istream_private *stream,
struct istream_snapshot *prev_snapshot);
Expand Down
19 changes: 12 additions & 7 deletions src/lib/istream-timeout.c
Expand Up @@ -27,16 +27,18 @@ static void i_stream_timeout_close(struct iostream_private *stream,
i_stream_close(tstream->istream.parent);
}

static void i_stream_timeout_switch_ioloop(struct istream_private *stream)
static void i_stream_timeout_switch_ioloop_to(struct istream_private *stream,
struct ioloop *ioloop)
{
struct timeout_istream *tstream = (struct timeout_istream *)stream;

if (tstream->to != NULL)
tstream->to = io_loop_move_timeout(&tstream->to);
tstream->to = io_loop_move_timeout_to(ioloop, &tstream->to);
}

static void i_stream_timeout(struct timeout_istream *tstream)
{
struct iostream_private *iostream = &tstream->istream.iostream;
unsigned int over_msecs;
int diff;

Expand All @@ -54,8 +56,9 @@ static void i_stream_timeout(struct timeout_istream *tstream)
/* we haven't reached the read timeout yet, update it */
if (diff < 0)
diff = 0;
tstream->to = timeout_add(tstream->timeout_msecs - diff,
i_stream_timeout, tstream);
tstream->to = timeout_add_to(io_stream_get_ioloop(iostream),
tstream->timeout_msecs - diff,
i_stream_timeout, tstream);
return;
}
over_msecs = diff - tstream->timeout_msecs;
Expand Down Expand Up @@ -86,6 +89,7 @@ static ssize_t
i_stream_timeout_read(struct istream_private *stream)
{
struct timeout_istream *tstream = (struct timeout_istream *)stream;
struct iostream_private *iostream = &tstream->istream.iostream;
ssize_t ret;

i_stream_seek(stream->parent, stream->parent_start_offset +
Expand All @@ -105,8 +109,9 @@ i_stream_timeout_read(struct istream_private *stream)
/* first read. add the timeout here instead of in init
in case the stream is created long before it's actually
read from. */
tstream->to = timeout_add(tstream->timeout_msecs,
i_stream_timeout, tstream);
tstream->to = timeout_add_to(io_stream_get_ioloop(iostream),
tstream->timeout_msecs,
i_stream_timeout, tstream);
i_stream_timeout_set_pending(tstream);
} else if (ret > 0 && tstream->to != NULL) {
/* we read something, reset the timeout */
Expand All @@ -131,7 +136,7 @@ i_stream_create_timeout(struct istream *input, unsigned int timeout_msecs)
tstream->created = ioloop_time;

tstream->istream.read = i_stream_timeout_read;
tstream->istream.switch_ioloop = i_stream_timeout_switch_ioloop;
tstream->istream.switch_ioloop_to = i_stream_timeout_switch_ioloop_to;
tstream->istream.iostream.close = i_stream_timeout_close;

tstream->istream.istream.readable_fd = input->readable_fd;
Expand Down
16 changes: 11 additions & 5 deletions src/lib/istream.c
Expand Up @@ -926,18 +926,24 @@ void i_stream_set_input_pending(struct istream *stream, bool pending)
io_set_pending(stream->real_stream->io);
}

void i_stream_switch_ioloop(struct istream *stream)
void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop)
{
io_stream_switch_ioloop_to(&stream->real_stream->iostream,
current_ioloop);
io_stream_switch_ioloop_to(&stream->real_stream->iostream, ioloop);

do {
if (stream->real_stream->switch_ioloop != NULL)
stream->real_stream->switch_ioloop(stream->real_stream);
if (stream->real_stream->switch_ioloop_to != NULL) {
stream->real_stream->switch_ioloop_to(
stream->real_stream, ioloop);
}
stream = stream->real_stream->parent;
} while (stream != NULL);
}

void i_stream_switch_ioloop(struct istream *stream)
{
i_stream_switch_ioloop_to(stream, current_ioloop);
}

void i_stream_set_io(struct istream *stream, struct io *io)
{
stream = i_stream_get_root_io(stream);
Expand Down
5 changes: 4 additions & 1 deletion src/lib/istream.h
Expand Up @@ -4,6 +4,8 @@
/* Note that some systems (Solaris) may use a macro to redefine struct stat */
#include <sys/stat.h>

struct ioloop;

struct istream {
uoff_t v_offset;

Expand Down Expand Up @@ -233,7 +235,8 @@ bool i_stream_add_data(struct istream *stream, const unsigned char *data,
void i_stream_set_input_pending(struct istream *stream, bool pending);

/* If there are any I/O loop items associated with the stream, move all of
them to current_ioloop. */
them to provided/current ioloop. */
void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop);
void i_stream_switch_ioloop(struct istream *stream);

#endif

0 comments on commit e364ae8

Please sign in to comment.