Skip to content

Commit

Permalink
lib: Simplified and clarified o_stream_send_istream() API
Browse files Browse the repository at this point in the history
  • Loading branch information
sirainen authored and GitLab committed May 18, 2016
1 parent 8b2cf1c commit 6adf683
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 100 deletions.
7 changes: 4 additions & 3 deletions src/lib-fs/ostream-metawrap.c
Expand Up @@ -37,19 +37,20 @@ o_stream_metawrap_sendv(struct ostream_private *stream,
return ret;
}

static off_t
static int
o_stream_metawrap_send_istream(struct ostream_private *_outstream,
struct istream *instream)
{
struct metawrap_ostream *outstream =
(struct metawrap_ostream *)_outstream;
uoff_t orig_outstream_offset = _outstream->ostream.offset;
off_t ret;

o_stream_metawrap_call_callback(outstream);
if ((ret = o_stream_send_istream(_outstream->parent, instream)) < 0)
o_stream_copy_error_from_parent(_outstream);
else
_outstream->ostream.offset += ret;
_outstream->ostream.offset +=
_outstream->ostream.offset - orig_outstream_offset;
return ret;
}

Expand Down
28 changes: 11 additions & 17 deletions src/lib/iostream-temp.c
Expand Up @@ -128,28 +128,27 @@ static int o_stream_temp_dup_cancel(struct temp_ostream *tstream)
struct istream *input;
uoff_t size = tstream->dupstream_offset -
tstream->dupstream_start_offset;
off_t ret;
int ret = -1;

i_stream_seek(tstream->dupstream, tstream->dupstream_start_offset);

input = i_stream_create_limit(tstream->dupstream, size);
do {
ret = io_stream_copy(&tstream->ostream.ostream, input);
} while (input->v_offset < tstream->dupstream_offset && ret > 0);
if (ret < 0 && tstream->ostream.ostream.stream_errno == 0) {
if (io_stream_copy(&tstream->ostream.ostream, input) > 0) {
/* everything copied */
ret = 0;
} else if (tstream->ostream.ostream.stream_errno == 0) {
i_assert(input->stream_errno != 0);
tstream->ostream.ostream.stream_errno = input->stream_errno;
}
i_stream_destroy(&input);
i_stream_unref(&tstream->dupstream);
return ret < 0 ? -1 : 0;
return ret;
}

static int o_stream_temp_dup_istream(struct temp_ostream *outstream,
struct istream *instream)
{
uoff_t in_size;
off_t ret;

if (!instream->readable_fd || i_stream_get_fd(instream) == -1)
return 0;
Expand All @@ -170,25 +169,20 @@ static int o_stream_temp_dup_istream(struct temp_ostream *outstream,
outstream->dupstream_offset > in_size)
return o_stream_temp_dup_cancel(outstream);
}
ret = in_size - instream->v_offset;
i_stream_seek(instream, in_size);
outstream->dupstream_offset = instream->v_offset;
return ret;
return 1;
}

static off_t o_stream_temp_send_istream(struct ostream_private *_outstream,
struct istream *instream)
static int o_stream_temp_send_istream(struct ostream_private *_outstream,
struct istream *instream)
{
struct temp_ostream *outstream = (struct temp_ostream *)_outstream;
uoff_t orig_offset;
int ret;

if ((outstream->flags & IOSTREAM_TEMP_FLAG_TRY_FD_DUP) != 0) {
orig_offset = outstream->dupstream_offset;
if ((ret = o_stream_temp_dup_istream(outstream, instream)) > 0)
return outstream->dupstream_offset - orig_offset;
if (ret < 0)
return -1;
if ((ret = o_stream_temp_dup_istream(outstream, instream)) != 0)
return ret;
outstream->flags &= ~IOSTREAM_TEMP_FLAG_TRY_FD_DUP;
}
return io_stream_copy(&outstream->ostream.ostream, instream);
Expand Down
62 changes: 26 additions & 36 deletions src/lib/ostream-file.c
Expand Up @@ -687,20 +687,20 @@ o_stream_file_write_at(struct ostream_private *stream,
return 0;
}

static off_t io_stream_sendfile(struct ostream_private *outstream,
struct istream *instream, int in_fd,
bool *sendfile_not_supported_r)
static int io_stream_sendfile(struct ostream_private *outstream,
struct istream *instream, int in_fd,
bool *sendfile_not_supported_r)
{
struct file_ostream *foutstream = (struct file_ostream *)outstream;
uoff_t start_offset;
uoff_t in_size, offset, send_size, v_offset;
ssize_t ret;

*sendfile_not_supported_r = FALSE;

if ((ret = i_stream_get_size(instream, TRUE, &in_size)) <= 0) {
outstream->ostream.stream_errno = ret == 0 ? ESPIPE :
instream->stream_errno;
if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
return -1;
if (ret == 0) {
*sendfile_not_supported_r = TRUE;
return -1;
}

Expand All @@ -713,8 +713,8 @@ static off_t io_stream_sendfile(struct ostream_private *outstream,
if (o_stream_lseek(foutstream) < 0)
return -1;

start_offset = v_offset = instream->v_offset;
do {
v_offset = instream->v_offset;
for (;;) {
offset = instream->real_stream->abs_start_offset + v_offset;
send_size = in_size - v_offset;

Expand All @@ -737,6 +737,8 @@ static off_t io_stream_sendfile(struct ostream_private *outstream,
if (errno == EINVAL)
*sendfile_not_supported_r = TRUE;
else {
io_stream_set_error(&outstream->iostream,
"sendfile() failed: %m");
outstream->ostream.stream_errno = errno;
/* close only if error wasn't because
sendfile() isn't supported */
Expand All @@ -749,24 +751,14 @@ static off_t io_stream_sendfile(struct ostream_private *outstream,
foutstream->real_offset += ret;
foutstream->buffer_offset += ret;
outstream->ostream.offset += ret;
} while ((uoff_t)ret != send_size);
}

i_stream_seek(instream, v_offset);
if (ret == 0) {
/* we should be at EOF. if not, write more. */
i_assert(!foutstream->file ||
instream->v_offset - start_offset == in_size);
if (i_stream_read(instream) > 0) {
if (io_stream_sendfile(outstream, instream, in_fd,
sendfile_not_supported_r) < 0)
return -1;
}
}
return ret < 0 ? -1 : (off_t)(instream->v_offset - start_offset);
return ret <= 0 ? ret : 1;
}

static off_t io_stream_copy_backwards(struct ostream_private *outstream,
struct istream *instream, uoff_t in_size)
static int io_stream_copy_backwards(struct ostream_private *outstream,
struct istream *instream, uoff_t in_size)
{
struct file_ostream *foutstream = (struct file_ostream *)outstream;
uoff_t in_start_offset, in_offset, in_limit, out_offset;
Expand Down Expand Up @@ -806,8 +798,9 @@ static off_t io_stream_copy_backwards(struct ostream_private *outstream,
i_stream_seek(instream, in_offset);
read_size = in_limit - in_offset;

(void)i_stream_read_bytes(instream, &data, &size,
read_size);
if (i_stream_read_bytes(instream, &data, &size,
read_size) == 0)
i_unreached();
if (size >= read_size) {
size = read_size;
if (instream->mmaped) {
Expand Down Expand Up @@ -839,21 +832,19 @@ static off_t io_stream_copy_backwards(struct ostream_private *outstream,
}

outstream->ostream.offset += in_size - in_start_offset;
return (off_t) (in_size - in_start_offset);
return 1;
}

static off_t io_stream_copy_same_stream(struct ostream_private *outstream,
struct istream *instream)
static int io_stream_copy_same_stream(struct ostream_private *outstream,
struct istream *instream)
{
uoff_t in_size;
off_t in_abs_offset, ret = 0;

/* copying data within same fd. we'll have to be careful with
seeks and overlapping writes. */
if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) {
outstream->ostream.stream_errno = instream->stream_errno;
if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
return -1;
}
if (ret == 0) {
/* if we couldn't find out the size, it means that instream
isn't a regular file_istream. we can be reasonably sure that
Expand All @@ -869,7 +860,7 @@ static off_t io_stream_copy_same_stream(struct ostream_private *outstream,
if (ret == 0) {
/* copying data over itself. we don't really
need to do that, just fake it. */
return in_size - instream->v_offset;
return 1;
}
if (ret > 0 && in_size > (uoff_t)ret) {
/* overlapping */
Expand All @@ -881,13 +872,12 @@ static off_t io_stream_copy_same_stream(struct ostream_private *outstream,
}
}

static off_t o_stream_file_send_istream(struct ostream_private *outstream,
struct istream *instream)
static int o_stream_file_send_istream(struct ostream_private *outstream,
struct istream *instream)
{
struct file_ostream *foutstream = (struct file_ostream *)outstream;
bool same_stream;
int in_fd;
off_t ret;
int in_fd, ret;
bool sendfile_not_supported;

in_fd = !instream->readable_fd ? -1 : i_stream_get_fd(instream);
Expand Down
6 changes: 3 additions & 3 deletions src/lib/ostream-private.h
Expand Up @@ -22,8 +22,8 @@ struct ostream_private {
unsigned int iov_count);
int (*write_at)(struct ostream_private *stream,
const void *data, size_t size, uoff_t offset);
off_t (*send_istream)(struct ostream_private *outstream,
struct istream *instream);
int (*send_istream)(struct ostream_private *outstream,
struct istream *instream);
void (*switch_ioloop)(struct ostream_private *stream);

/* data: */
Expand All @@ -47,7 +47,7 @@ struct ostream *
o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
ATTR_NULL(2);

off_t io_stream_copy(struct ostream *outstream, struct istream *instream);
int io_stream_copy(struct ostream *outstream, struct istream *instream);

void o_stream_copy_error_from_parent(struct ostream_private *_stream);
/* This should be called before sending data to parent stream. It makes sure
Expand Down
60 changes: 29 additions & 31 deletions src/lib/ostream.c
Expand Up @@ -341,11 +341,12 @@ void o_stream_set_no_error_handling(struct ostream *stream, bool set)
stream->real_stream->error_handling_disabled = set;
}

off_t o_stream_send_istream(struct ostream *outstream,
struct istream *instream)
int o_stream_send_istream(struct ostream *outstream, struct istream *instream)
{
struct ostream_private *_outstream = outstream->real_stream;
off_t ret;
uoff_t old_outstream_offset = outstream->offset;
uoff_t old_instream_offset = instream->v_offset;
int ret;

if (unlikely(outstream->closed || instream->closed ||
outstream->stream_errno != 0)) {
Expand All @@ -354,13 +355,22 @@ off_t o_stream_send_istream(struct ostream *outstream,
}

ret = _outstream->send_istream(_outstream, instream);
if (unlikely(ret < 0)) {
if (outstream->stream_errno != 0) {
errno = outstream->stream_errno;
} else {
i_assert(instream->stream_errno != 0);
}
if (instream->stream_errno != 0) {
errno = instream->stream_errno;
return -1;
} else if (outstream->stream_errno != 0) {
errno = outstream->stream_errno;
return -1;
}
if (ret == 0) {
/* partial send */
i_assert(!outstream->blocking || !instream->blocking);
} else {
/* fully sent everything */
i_assert(!i_stream_have_bytes_left(instream));
}
i_assert((outstream->offset - old_outstream_offset) ==
(instream->v_offset - old_instream_offset));
return ret;
}

Expand All @@ -383,34 +393,22 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
return ret;
}

off_t io_stream_copy(struct ostream *outstream, struct istream *instream)
int io_stream_copy(struct ostream *outstream, struct istream *instream)
{
uoff_t start_offset;
struct const_iovec iov;
const unsigned char *data;
ssize_t ret;

start_offset = instream->v_offset;
do {
(void)i_stream_read_more(instream, &data, &iov.iov_len);
if (iov.iov_len == 0) {
/* all sent */
if (instream->stream_errno != 0)
return -1;
break;
}

while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) {
iov.iov_base = data;
ret = o_stream_sendv(outstream, &iov, 1);
if (ret <= 0) {
if (ret == 0)
break;
return -1;
}
if ((ret = o_stream_sendv(outstream, &iov, 1)) <= 0)
return ret;
i_stream_skip(instream, ret);
} while ((size_t)ret == iov.iov_len);
}

return (off_t)(instream->v_offset - start_offset);
if (instream->stream_errno != 0)
return -1;
return i_stream_have_bytes_left(instream) ? 0 : 1;
}

void o_stream_switch_ioloop(struct ostream *stream)
Expand Down Expand Up @@ -561,8 +559,8 @@ o_stream_default_write_at(struct ostream_private *_stream,
return -1;
}

static off_t o_stream_default_send_istream(struct ostream_private *outstream,
struct istream *instream)
static int o_stream_default_send_istream(struct ostream_private *outstream,
struct istream *instream)
{
return io_stream_copy(&outstream->ostream, instream);
}
Expand Down
25 changes: 15 additions & 10 deletions src/lib/ostream.h
Expand Up @@ -143,19 +143,24 @@ void o_stream_ignore_last_errors(struct ostream *stream);
When creating wrapper streams, they copy this behavior from the parent
stream. */
void o_stream_set_no_error_handling(struct ostream *stream, bool set);
/* Send data from input stream. Returns number of bytes sent, or -1 if error
in either outstream or instream. Note that this function may block if either
instream or outstream is blocking.
/* Send data from input stream. Returns 1 if the entire instream was sent
without errors, 0 if either instream or outstream is nonblocking and not
everything was sent, or -1 if either instream or outstream failed (see their
stream_errno for which one).
Also note that this function may not add anything to the output buffer, so
if you want the flush callback to be called when more data can be written,
you'll need to call o_stream_set_flush_pending() manually.
On non-failure instream is skips over all data written to outstream.
This means that the number of bytes written to outstream is always equal to
the number of bytes skipped in instream.
For non-blocking outstreams: Note that this function may not add anything to
the output buffer, so if you want the flush callback to be called when more
data can be written, you'll need to call o_stream_set_flush_pending()
explicitly.
It's also possible to use this function to copy data within same file
descriptor. If the file must be grown, you have to do it manually before
calling this function. */
off_t o_stream_send_istream(struct ostream *outstream,
struct istream *instream);
descriptor, even if the source and destination overlaps. If the file must
be grown, you have to do it manually before calling this function. */
int o_stream_send_istream(struct ostream *outstream, struct istream *instream);

/* Write data to specified offset. Returns 0 if successful, -1 if error. */
int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
Expand Down

0 comments on commit 6adf683

Please sign in to comment.