Skip to content

Commit

Permalink
lib: Add multiplex stream support
Browse files Browse the repository at this point in the history
This allows having multiple channels of data in single stream.
  • Loading branch information
cmouse authored and Timo Sirainen committed Oct 5, 2017
1 parent b0421c7 commit 1de7b73
Show file tree
Hide file tree
Showing 9 changed files with 1,192 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/lib/Makefile.am
Expand Up @@ -73,6 +73,7 @@ liblib_la_SOURCES = \
istream-jsonstr.c \
istream-limit.c \
istream-mmap.c \
istream-multiplex.c \
istream-rawlog.c \
istream-seekable.c \
istream-sized.c \
Expand Down Expand Up @@ -115,6 +116,7 @@ liblib_la_SOURCES = \
ostream-failure-at.c \
ostream-file.c \
ostream-hash.c \
ostream-multiplex.c \
ostream-null.c \
ostream-rawlog.c \
ostream-unix.c \
Expand Down Expand Up @@ -224,6 +226,7 @@ headers = \
istream-file-private.h \
istream-hash.h \
istream-jsonstr.h \
istream-multiplex.h \
istream-private.h \
istream-rawlog.h \
istream-seekable.h \
Expand Down Expand Up @@ -259,6 +262,7 @@ headers = \
ostream-failure-at.h \
ostream-file-private.h \
ostream-hash.h \
ostream-multiplex.h \
ostream-private.h \
ostream-null.h \
ostream-rawlog.h \
Expand Down Expand Up @@ -348,6 +352,7 @@ test_lib_SOURCES = \
test-istream-crlf.c \
test-istream-failure-at.c \
test-istream-jsonstr.c \
test-istream-multiplex.c \
test-istream-seekable.c \
test-istream-sized.c \
test-istream-tee.c \
Expand All @@ -367,6 +372,8 @@ test_lib_SOURCES = \
test-ostream-escaped.c \
test-ostream-failure-at.c \
test-ostream-file.c \
test-ostream-multiplex.c \
test-multiplex.c \
test-path-util.c \
test-primes.c \
test-printf-format-fix.c \
Expand Down
270 changes: 270 additions & 0 deletions src/lib/istream-multiplex.c
@@ -0,0 +1,270 @@
/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "istream-private.h"
#include "istream-multiplex.h"

/* all multiplex packets are [1 byte cid][4 byte length][data] */

struct multiplex_istream;

struct multiplex_ichannel {
struct istream_private istream;
struct multiplex_istream *mstream;
uint8_t cid;
size_t pending_pos;
bool closed:1;
};

struct multiplex_istream {
struct istream *parent;

/* channel 0 is main channel */
uint8_t cur_channel;
unsigned int remain;
size_t bufsize;
ARRAY(struct multiplex_ichannel *) channels;

bool blocking:1;
};

static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream);

static struct multiplex_ichannel *
get_channel(struct multiplex_istream *mstream, uint8_t cid)
{
struct multiplex_ichannel **channelp;
i_assert(mstream != NULL);
array_foreach_modifiable(&mstream->channels, channelp) {
if (*channelp != NULL && (*channelp)->cid == cid)
return *channelp;
}
return NULL;
}

static void propagate_error(struct multiplex_istream *mstream, int stream_errno)
{
struct multiplex_ichannel **channelp;
array_foreach_modifiable(&mstream->channels, channelp)
if (*channelp != NULL)
(*channelp)->istream.istream.stream_errno = stream_errno;
}

static void propagate_eof(struct multiplex_istream *mstream)
{
struct multiplex_ichannel **channelp;
array_foreach_modifiable(&mstream->channels, channelp) {
if (*channelp != NULL) {
(*channelp)->istream.istream.eof = TRUE;
}
}
}

static ssize_t
i_stream_multiplex_read(struct multiplex_istream *mstream, uint8_t cid)
{
struct multiplex_ichannel *req_channel = get_channel(mstream, cid);
const unsigned char *data;
size_t len = 0, used, wanted, avail;
ssize_t ret, got = 0;

if (mstream->parent == NULL) {
req_channel->istream.istream.eof = TRUE;
return -1;
}

data = i_stream_get_data(mstream->parent, &len);

if (len == 0 && mstream->parent->closed) {
req_channel->istream.istream.eof = TRUE;
return -1;
}

if (((mstream->remain > 0 && len == 0) ||
(mstream->remain == 0 && len < 5)) &&
(ret = i_stream_read(mstream->parent)) <= 0) {
propagate_error(mstream, mstream->parent->stream_errno);
if (mstream->parent->eof)
propagate_eof(mstream);
return ret;
}

for(;;) {
data = i_stream_get_data(mstream->parent, &len);
if (len == 0) {
if (got == 0 && mstream->blocking)
got += i_stream_multiplex_read(mstream, cid);
break;
}

if (mstream->remain > 0) {
struct multiplex_ichannel *channel =
get_channel(mstream, mstream->cur_channel);
wanted = I_MIN(len, mstream->remain);
/* is it open? */
if (channel != NULL && !channel->closed) {
struct istream_private *stream = &channel->istream;
stream->pos += channel->pending_pos;
bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail);
stream->pos -= channel->pending_pos;
if (!alloc_ret) {
i_stream_set_input_pending(&stream->istream, TRUE);
if (channel->cid != cid)
return 0;
if (got > 0)
break;
return -2;
}

used = I_MIN(wanted, avail);

/* dump into buffer */
if (channel->cid != cid) {
i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size);
memcpy(stream->w_buffer + stream->pos + channel->pending_pos,
data, used);
channel->pending_pos += used;
i_stream_set_input_pending(&stream->istream, TRUE);
} else {
i_assert(stream->pos + used <= stream->buffer_size);
memcpy(stream->w_buffer + stream->pos, data, used);
stream->pos += used;
got += used;
}
} else {
used = wanted;
}
mstream->remain -= used;
i_stream_skip(mstream->parent, used);
/* see if there is more to read */
continue;
}
if (mstream->remain == 0) {
/* need more data */
if (len < 5) {
ret = i_stream_multiplex_ichannel_read(&req_channel->istream);
if (ret > 0)
got += ret;
break;
}
/* channel ID */
mstream->cur_channel = data[0];
/* data length */
mstream->remain = be32_to_cpu_unaligned(data+1);
i_stream_skip(mstream->parent, 5);
}
}

propagate_error(mstream, mstream->parent->stream_errno);
if (mstream->parent->eof)
propagate_eof(mstream);

return got;
}

static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream)
{
struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
/* if previous multiplex read dumped data for us
actually serve it here. */
if (channel->pending_pos > 0) {
ssize_t ret = channel->pending_pos;
stream->pos += channel->pending_pos;
channel->pending_pos = 0;
return ret;
}
return i_stream_multiplex_read(channel->mstream, channel->cid);
}

static void
i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent)
{
struct multiplex_ichannel *const *channelp;
struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
channel->closed = TRUE;
if (close_parent) {
array_foreach(&channel->mstream->channels, channelp)
if (*channelp != NULL && !(*channelp)->closed)
return;
i_stream_close(channel->mstream->parent);
}
}

static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream)
{
struct multiplex_ichannel **channelp;
/* can't do anything until they are all closed */
array_foreach_modifiable(&mstream->channels, channelp)
if (*channelp != NULL)
return;
i_stream_unref(&mstream->parent);
array_free(&mstream->channels);
i_free(mstream);
}

static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream)
{
struct multiplex_ichannel **channelp;
struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
i_stream_multiplex_ichannel_close(stream, TRUE);
i_free(channel->istream.w_buffer);
array_foreach_modifiable(&channel->mstream->channels, channelp) {
if (*channelp == channel) {
*channelp = NULL;
break;
}
}
i_stream_multiplex_try_destroy(channel->mstream);
}

static struct istream *
i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid)
{
struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1);
channel->cid = cid;
channel->mstream = mstream;
channel->istream.read = i_stream_multiplex_ichannel_read;
channel->istream.iostream.close = i_stream_multiplex_ichannel_close;
channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy;
channel->istream.max_buffer_size = mstream->bufsize;
channel->istream.istream.blocking = mstream->blocking;
if (cid == 0)
channel->istream.fd = i_stream_get_fd(mstream->parent);
else
channel->istream.fd = -1;
array_append(&channel->mstream->channels, &channel, 1);

return i_stream_create(&channel->istream, NULL, channel->istream.fd);
}

struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid)
{
struct multiplex_ichannel *chan =
(struct multiplex_ichannel *)stream->real_stream;
i_assert(get_channel(chan->mstream, cid) == NULL);

return i_stream_add_channel_real(chan->mstream, cid);
}

struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize)
{
struct multiplex_istream *mstream;

mstream = i_new(struct multiplex_istream, 1);
mstream->parent = parent;
mstream->bufsize = bufsize;
mstream->blocking = parent->blocking;
i_array_init(&mstream->channels, 8);
i_stream_ref(parent);

return i_stream_add_channel_real(mstream, 0);
}

uint8_t i_stream_multiplex_get_channel_id(struct istream *stream)
{
struct multiplex_ichannel *channel =
(struct multiplex_ichannel *)stream->real_stream;
return channel->cid;
}
8 changes: 8 additions & 0 deletions src/lib/istream-multiplex.h
@@ -0,0 +1,8 @@
#ifndef ISTREAM_MULTIPLEX
#define ISTREAM_MULTIPLEX 1

struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize);
struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid);
uint8_t i_stream_multiplex_get_channel_id(struct istream *stream);

#endif

0 comments on commit 1de7b73

Please sign in to comment.