From 378e6cb162b355d6f103526505bc00b9a78962e7 Mon Sep 17 00:00:00 2001 From: Timo Sirainen Date: Fri, 20 May 2016 15:16:41 +0300 Subject: [PATCH] o_stream_send_istream() API changed again The new API enforces the caller to correctly handle all the possible situations. It also makes in unambiguous whether to wait for input or output stream. --- src/doveadm/server-connection.c | 30 +++++-- src/imap/cmd-getmetadata.c | 20 +++-- src/imap/cmd-urlfetch.c | 24 +++--- src/imap/imap-fetch-body.c | 34 ++++---- src/lib-fs/fs-api.c | 12 +-- src/lib-fs/ostream-metawrap.c | 8 +- src/lib-http/http-client-request.c | 67 +++++++-------- src/lib-http/http-server-response.c | 58 +++++++------ src/lib-http/test-http-payload.c | 40 ++++----- src/lib-http/test-http-request-parser.c | 2 +- src/lib-http/test-http-response-parser.c | 2 +- src/lib-http/test-http-transfer.c | 6 +- src/lib-imap-client/imapc-connection.c | 34 ++++++-- src/lib-storage/index/index-storage.c | 14 +++- src/lib/file-copy.c | 20 +++-- src/lib/iostream-temp.c | 60 ++++++++----- src/lib/ostream-file.c | 82 +++++++++++------- src/lib/ostream-private.h | 8 +- src/lib/ostream.c | 88 ++++++++++++-------- src/lib/ostream.h | 27 +++--- src/lib/test-iostream-temp.c | 10 +-- src/lib/test-ostream-file.c | 8 +- src/plugins/mail-filter/istream-ext-filter.c | 22 +++-- 23 files changed, 411 insertions(+), 265 deletions(-) diff --git a/src/doveadm/server-connection.c b/src/doveadm/server-connection.c index 9f048f6f5a..0b44b5177d 100644 --- a/src/doveadm/server-connection.c +++ b/src/doveadm/server-connection.c @@ -83,26 +83,40 @@ static void print_connection_released(void) static int server_connection_send_cmd_input_more(struct server_connection *conn) { - int ret; + enum ostream_send_istream_result res; + int ret = -1; /* ostream-dot writes only up to max buffer size, so keep it non-zero */ o_stream_set_max_buffer_size(conn->cmd_output, IO_BLOCK_SIZE); - ret = o_stream_send_istream(conn->cmd_output, conn->cmd_input); + res = o_stream_send_istream(conn->cmd_output, conn->cmd_input); o_stream_set_max_buffer_size(conn->cmd_output, (size_t)-1); - if (ret == 0) { - o_stream_set_flush_pending(conn->cmd_output, TRUE); + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + return 1; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: return 0; - } - if (conn->cmd_input->stream_errno != 0) { + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: i_error("read(%s) failed: %s", i_stream_get_name(conn->cmd_input), i_stream_get_error(conn->cmd_input)); - } else if (conn->cmd_output->stream_errno != 0 || - o_stream_flush(conn->cmd_output) < 0) { + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: i_error("write(%s) failed: %s", o_stream_get_name(conn->cmd_output), o_stream_get_error(conn->cmd_output)); + break; + } + if (res == OSTREAM_SEND_ISTREAM_RESULT_FINISHED) { + if ((ret = o_stream_flush(conn->cmd_output)) == 0) + return 0; + else if (ret < 0) { + i_error("write(%s) failed: %s", + o_stream_get_name(conn->cmd_output), + o_stream_get_error(conn->cmd_output)); + } } i_stream_destroy(&conn->cmd_input); diff --git a/src/imap/cmd-getmetadata.c b/src/imap/cmd-getmetadata.c index 891e08f04b..eb350e12c0 100644 --- a/src/imap/cmd-getmetadata.c +++ b/src/imap/cmd-getmetadata.c @@ -211,25 +211,31 @@ static void cmd_getmetadata_send_entry(struct imap_getmetadata_context *ctx, static bool cmd_getmetadata_stream_continue(struct imap_getmetadata_context *ctx) { - int ret; + enum ostream_send_istream_result res; o_stream_set_max_buffer_size(ctx->cmd->client->output, 0); - ret = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream); + res = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream); o_stream_set_max_buffer_size(ctx->cmd->client->output, (size_t)-1); - if (ret > 0) { - /* finished */ + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: return TRUE; - } else if (ret < 0) { + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + return FALSE; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: i_error("read(%s) failed: %s", i_stream_get_name(ctx->cur_stream), i_stream_get_error(ctx->cur_stream)); client_disconnect(ctx->cmd->client, "Internal GETMETADATA failure"); return TRUE; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + /* client disconnected */ + return TRUE; } - o_stream_set_flush_pending(ctx->cmd->client->output, TRUE); - return FALSE; + i_unreached(); } static int diff --git a/src/imap/cmd-urlfetch.c b/src/imap/cmd-urlfetch.c index 611892388d..50299f2478 100644 --- a/src/imap/cmd-urlfetch.c +++ b/src/imap/cmd-urlfetch.c @@ -83,6 +83,7 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd) struct client *client = cmd->client; struct cmd_urlfetch_context *ctx = (struct cmd_urlfetch_context *)cmd->context; + enum ostream_send_istream_result res; int ret; /* are we in the middle of an urlfetch literal? */ @@ -98,27 +99,28 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd) /* transfer literal to client */ o_stream_set_max_buffer_size(client->output, 0); - ret = o_stream_send_istream(client->output, ctx->input); + res = o_stream_send_istream(client->output, ctx->input); o_stream_set_max_buffer_size(client->output, (size_t)-1); - if (ret > 0) { - /* finished successfully */ + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: i_stream_unref(&ctx->input); return 1; - } - if (client->output->closed) { - /* client disconnected */ - return -1; - } - if (ctx->input->stream_errno != 0) { + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + return 0; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: i_error("read(%s) failed: %s (URLFETCH)", i_stream_get_name(ctx->input), i_stream_get_error(ctx->input)); client_disconnect(client, "URLFETCH failed"); return -1; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + /* client disconnected */ + return -1; } - o_stream_set_flush_pending(client->output, TRUE); - return 0; + i_unreached(); } static bool cmd_urlfetch_continue(struct client_command_context *cmd) diff --git a/src/imap/imap-fetch-body.c b/src/imap/imap-fetch-body.c index 0a8aafef17..0cb0bd2b67 100644 --- a/src/imap/imap-fetch-body.c +++ b/src/imap/imap-fetch-body.c @@ -93,10 +93,10 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx) struct imap_fetch_state *state = &ctx->state; const char *disconnect_reason; uoff_t orig_input_offset = state->cur_input->v_offset; - int ret; + enum ostream_send_istream_result res; o_stream_set_max_buffer_size(ctx->client->output, 0); - ret = o_stream_send_istream(ctx->client->output, state->cur_input); + res = o_stream_send_istream(ctx->client->output, state->cur_input); o_stream_set_max_buffer_size(ctx->client->output, (size_t)-1); if (ctx->state.cur_stats_sizep != NULL) { @@ -104,14 +104,9 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx) state->cur_input->v_offset - orig_input_offset; } - if (state->cur_input->v_offset != state->cur_size) { - /* unfinished */ - if (state->cur_input->stream_errno != 0) { - fetch_read_error(ctx, &disconnect_reason); - client_disconnect(ctx->client, disconnect_reason); - return -1; - } - if (!i_stream_have_bytes_left(state->cur_input)) { + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + if (state->cur_input->v_offset != state->cur_size) { /* Input stream gave less data than expected */ mail_set_cache_corrupted_reason(state->cur_mail, state->cur_size_field, t_strdup_printf( @@ -123,15 +118,20 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx) client_disconnect(ctx->client, "FETCH failed"); return -1; } - if (ret < 0) { - /* client probably disconnected */ - return -1; - } - - o_stream_set_flush_pending(ctx->client->output, TRUE); + return 1; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: return 0; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + fetch_read_error(ctx, &disconnect_reason); + client_disconnect(ctx->client, disconnect_reason); + return -1; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + /* client disconnected */ + return -1; } - return 1; + i_unreached(); } static const char * diff --git a/src/lib-fs/fs-api.c b/src/lib-fs/fs-api.c index 785a32212a..f124b2b102 100644 --- a/src/lib-fs/fs-api.c +++ b/src/lib-fs/fs-api.c @@ -831,11 +831,14 @@ int fs_default_copy(struct fs_file *src, struct fs_file *dest) dest->copy_input = fs_read_stream(src, IO_BLOCK_SIZE); dest->copy_output = fs_write_stream(dest); } - if (o_stream_send_istream(dest->copy_output, dest->copy_input) == 0) { + switch (o_stream_send_istream(dest->copy_output, dest->copy_input)) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: fs_set_error_async(dest->fs); return -1; - } - if (dest->copy_input->stream_errno != 0) { + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: errno = dest->copy_input->stream_errno; fs_set_error(dest->fs, "read(%s) failed: %s", i_stream_get_name(dest->copy_input), @@ -843,8 +846,7 @@ int fs_default_copy(struct fs_file *src, struct fs_file *dest) i_stream_unref(&dest->copy_input); fs_write_stream_abort(dest, &dest->copy_output); return -1; - } - if (dest->copy_output->stream_errno != 0) { + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: errno = dest->copy_output->stream_errno; fs_set_error(dest->fs, "write(%s) failed: %s", o_stream_get_name(dest->copy_output), diff --git a/src/lib-fs/ostream-metawrap.c b/src/lib-fs/ostream-metawrap.c index 08cd4caed3..1cde1b983a 100644 --- a/src/lib-fs/ostream-metawrap.c +++ b/src/lib-fs/ostream-metawrap.c @@ -38,20 +38,20 @@ o_stream_metawrap_sendv(struct ostream_private *stream, return ret; } -static int +static enum ostream_send_istream_result o_stream_metawrap_send_istream(struct ostream_private *_outstream, struct istream *instream) { struct metawrap_ostream *outstream = (struct metawrap_ostream *)_outstream; uoff_t orig_instream_offset = instream->v_offset; - int ret; + enum ostream_send_istream_result res; o_stream_metawrap_call_callback(outstream); - if ((ret = o_stream_send_istream(_outstream->parent, instream)) < 0) + if ((res = o_stream_send_istream(_outstream->parent, instream)) == OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT) o_stream_copy_error_from_parent(_outstream); _outstream->ostream.offset += instream->v_offset - orig_instream_offset; - return ret; + return res; } struct ostream * diff --git a/src/lib-http/http-client-request.c b/src/lib-http/http-client-request.c index 25bc3d6a7a..5f2b4cf67e 100644 --- a/src/lib-http/http-client-request.c +++ b/src/lib-http/http-client-request.c @@ -833,7 +833,7 @@ int http_client_request_send_more(struct http_client_request *req, { struct http_client_connection *conn = req->conn; struct ostream *output = req->payload_output; - int ret; + enum ostream_send_istream_result res; i_assert(req->payload_input != NULL); i_assert(req->payload_output != NULL); @@ -843,31 +843,11 @@ int http_client_request_send_more(struct http_client_request *req, /* chunked ostream needs to write to the parent stream's buffer */ o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE); - ret = o_stream_send_istream(output, req->payload_input); + res = o_stream_send_istream(output, req->payload_input); o_stream_set_max_buffer_size(output, (size_t)-1); - if (req->payload_input->stream_errno != 0) { - /* we're in the middle of sending a request, so the connection - will also have to be aborted */ - *error_r = t_strdup_printf("read(%s) failed: %s", - i_stream_get_name(req->payload_input), - i_stream_get_error(req->payload_input)); - - /* the payload stream assigned to this request is broken, - fail this the request immediately */ - http_client_request_error(&req, - HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD, - "Broken payload stream"); - return -1; - } else if (output->stream_errno != 0) { - /* failed to send request */ - *error_r = t_strdup_printf("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - return -1; - } - - if (ret > 0) { + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: /* finished sending */ if (!req->payload_chunked && req->payload_input->v_offset - req->payload_offset != req->payload_size) { @@ -891,22 +871,43 @@ int http_client_request_send_more(struct http_client_request *req, /* finished sending payload */ http_client_request_finish_payload_out(req); } - } else if (i_stream_have_bytes_left(req->payload_input)) { - /* output is blocking (server needs to act; enable timeout) */ - conn->output_locked = TRUE; - if (!pipelined) - http_client_connection_start_request_timeout(conn); - o_stream_set_flush_pending(output, TRUE); - http_client_request_debug(req, "Partially sent payload"); - } else { + return 0; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: /* input is blocking (client needs to act; disable timeout) */ conn->output_locked = TRUE; if (!pipelined) http_client_connection_stop_request_timeout(conn); conn->io_req_payload = io_add_istream(req->payload_input, http_client_request_payload_input, req); + return 0; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + /* output is blocking (server needs to act; enable timeout) */ + conn->output_locked = TRUE; + if (!pipelined) + http_client_connection_start_request_timeout(conn); + http_client_request_debug(req, "Partially sent payload"); + return 0; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + /* we're in the middle of sending a request, so the connection + will also have to be aborted */ + *error_r = t_strdup_printf("read(%s) failed: %s", + i_stream_get_name(req->payload_input), + i_stream_get_error(req->payload_input)); + + /* the payload stream assigned to this request is broken, + fail this the request immediately */ + http_client_request_error(&req, + HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD, + "Broken payload stream"); + return -1; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + /* failed to send request */ + *error_r = t_strdup_printf("write(%s) failed: %s", + o_stream_get_name(output), + o_stream_get_error(output)); + return -1; } - return 0; + i_unreached(); } static int http_client_request_send_real(struct http_client_request *req, diff --git a/src/lib-http/http-server-response.c b/src/lib-http/http-server-response.c index d9ffe98e53..d05eb3bc32 100644 --- a/src/lib-http/http-server-response.c +++ b/src/lib-http/http-server-response.c @@ -481,7 +481,8 @@ int http_server_response_send_more(struct http_server_response *resp, { struct http_server_connection *conn = resp->request->conn; struct ostream *output = resp->payload_output; - off_t ret; + enum ostream_send_istream_result res; + int ret = 0; *error_r = NULL; @@ -494,17 +495,44 @@ int http_server_response_send_more(struct http_server_response *resp, /* chunked ostream needs to write to the parent stream's buffer */ o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE); - ret = o_stream_send_istream(output, resp->payload_input); + res = o_stream_send_istream(output, resp->payload_input); o_stream_set_max_buffer_size(output, (size_t)-1); - if (resp->payload_input->stream_errno != 0) { + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + /* finished sending */ + if (!resp->payload_chunked && + resp->payload_input->v_offset - resp->payload_offset != + resp->payload_size) { + *error_r = t_strdup_printf( + "Input stream %s size changed unexpectedly", + i_stream_get_name(resp->payload_input)); + ret = -1; + } else { + ret = 1; + } + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + /* input is blocking */ + conn->output_locked = TRUE; + conn->io_resp_payload = io_add_istream(resp->payload_input, + http_server_response_payload_input, resp); + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + /* output is blocking */ + conn->output_locked = TRUE; + o_stream_set_flush_pending(output, TRUE); + //http_server_response_debug(resp, "Partially sent payload"); + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: /* we're in the middle of sending a response, so the connection will also have to be aborted */ *error_r = t_strdup_printf("read(%s) failed: %s", i_stream_get_name(resp->payload_input), i_stream_get_error(resp->payload_input)); ret = -1; - } else if (output->stream_errno != 0) { + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: /* failed to send response */ if (output->stream_errno != EPIPE && output->stream_errno != ECONNRESET) { @@ -512,30 +540,12 @@ int http_server_response_send_more(struct http_server_response *resp, o_stream_get_name(output), o_stream_get_error(output)); } ret = -1; + break; } if (ret != 0) { - /* finished sending */ - if (ret > 0 && !resp->payload_chunked && - resp->payload_input->v_offset - resp->payload_offset != - resp->payload_size) { - *error_r = t_strdup_printf( - "Input stream %s size changed unexpectedly", - i_stream_get_name(resp->payload_input)); - ret = -1; - } - /* finished sending payload */ + /* finished sending payload (or error) */ http_server_response_finish_payload_out(resp); - } else if (i_stream_have_bytes_left(resp->payload_input)) { - /* output is blocking */ - conn->output_locked = TRUE; - o_stream_set_flush_pending(output, TRUE); - //http_server_response_debug(resp, "Partially sent payload"); - } else { - /* input is blocking */ - conn->output_locked = TRUE; - conn->io_resp_payload = io_add_istream(resp->payload_input, - http_server_response_payload_input, resp); } return ret < 0 ? -1 : 0; } diff --git a/src/lib-http/test-http-payload.c b/src/lib-http/test-http-payload.c index 01bf55b048..82546a9793 100644 --- a/src/lib-http/test-http-payload.c +++ b/src/lib-http/test-http-payload.c @@ -218,7 +218,6 @@ client_handle_download_request( struct istream *fstream; struct ostream *output; unsigned int status; - int ret; if (strcmp(hreq->method, "GET") != 0) { http_server_request_fail(req, @@ -244,8 +243,7 @@ client_handle_download_request( if (blocking) { output = http_server_response_get_payload_output(resp, TRUE); - ret=o_stream_send_istream(output, fstream); - if (ret < 0) { + if (o_stream_send_istream(output, fstream) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) { i_fatal("test server: download: " "failed to send blocking file payload"); } @@ -273,24 +271,25 @@ client_request_read_echo_more(struct client_request *creq) { struct http_server_response *resp; struct istream *payload_input; - int ret; + enum ostream_send_istream_result res; o_stream_set_max_buffer_size(creq->payload_output, IO_BLOCK_SIZE); - ret = o_stream_send_istream(creq->payload_output, creq->payload_input); + res = o_stream_send_istream(creq->payload_output, creq->payload_input); o_stream_set_max_buffer_size(creq->payload_output, (size_t)-1); - if (ret < 0) { - if (creq->payload_output->stream_errno != 0) { - i_fatal("test server: echo: " - "Failed to write all echo payload [%s]", creq->path); - } - if (creq->payload_input->stream_errno != 0) { - i_fatal("test server: echo: " - "Failed to read all echo payload [%s]", creq->path); - } - i_unreached(); - } - if (i_stream_have_bytes_left(creq->payload_input)) + + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: return; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + i_fatal("test server: echo: " + "Failed to read all echo payload [%s]", creq->path); + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + i_fatal("test server: echo: " + "Failed to write all echo payload [%s]", creq->path); + } io_remove(&creq->io); i_stream_unref(&creq->payload_input); @@ -321,7 +320,6 @@ client_handle_echo_request(struct client_request *creq, struct http_server_response *resp; struct ostream *payload_output; uoff_t size; - int ret; creq->path = p_strdup (http_server_request_get_pool(req), path); @@ -358,8 +356,7 @@ client_handle_echo_request(struct client_request *creq, payload_input = partial; } - ret = o_stream_send_istream(payload_output, payload_input); - if (ret < 0) { + if (o_stream_send_istream(payload_output, payload_input) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) { i_fatal("test server: echo: " "failed to receive blocking echo payload"); } @@ -376,8 +373,7 @@ client_handle_echo_request(struct client_request *creq, http_server_response_add_header(resp, "Content-Type", "text/plain"); payload_output = http_server_response_get_payload_output(resp, TRUE); - ret = o_stream_send_istream(payload_output, payload_input); - if (ret < 0) { + if (o_stream_send_istream(payload_output, payload_input) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) { i_fatal("test server: echo: " "failed to send blocking echo payload"); } diff --git a/src/lib-http/test-http-request-parser.c b/src/lib-http/test-http-request-parser.c index 3011db5522..029aae68e0 100644 --- a/src/lib-http/test-http-request-parser.c +++ b/src/lib-http/test-http-request-parser.c @@ -201,7 +201,7 @@ static void test_http_request_parse_valid(void) buffer_set_used_size(payload_buffer, 0); output = o_stream_create_buffer(payload_buffer); test_out("payload receive", - o_stream_send_istream(output, request.payload)); + o_stream_send_istream(output, request.payload) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); o_stream_destroy(&output); payload = str_c(payload_buffer); } else { diff --git a/src/lib-http/test-http-response-parser.c b/src/lib-http/test-http-response-parser.c index 7d2c11dbd3..5105bfc9b2 100644 --- a/src/lib-http/test-http-response-parser.c +++ b/src/lib-http/test-http-response-parser.c @@ -126,7 +126,7 @@ static void test_http_response_parse_valid(void) buffer_set_used_size(payload_buffer, 0); output = o_stream_create_buffer(payload_buffer); test_out("payload receive", - o_stream_send_istream(output, response.payload)); + o_stream_send_istream(output, response.payload) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); o_stream_destroy(&output); payload = str_c(payload_buffer); } else { diff --git a/src/lib-http/test-http-transfer.c b/src/lib-http/test-http-transfer.c index 95b2e4cc46..2a54af0f9c 100644 --- a/src/lib-http/test-http-transfer.c +++ b/src/lib-http/test-http-transfer.c @@ -104,7 +104,7 @@ static void test_http_transfer_chunked_input_valid(void) buffer_set_used_size(payload_buffer, 0); output = o_stream_create_buffer(payload_buffer); - test_out("payload read", o_stream_send_istream(output, chunked) > 0 + test_out("payload read", o_stream_send_istream(output, chunked) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED && chunked->stream_errno == 0); o_stream_destroy(&output); i_stream_unref(&chunked); @@ -315,8 +315,8 @@ static void test_http_transfer_chunked_output_valid(void) /* read back chunk */ buffer_set_used_size(plain_buffer, 0); output = o_stream_create_buffer(plain_buffer); - ret = o_stream_send_istream(output, ichunked); - test_out("payload unchunk", ret >= 0 + test_out("payload unchunk", + o_stream_send_istream(output, ichunked) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED && ichunked->stream_errno == 0); o_stream_destroy(&output); i_stream_destroy(&ichunked); diff --git a/src/lib-imap-client/imapc-connection.c b/src/lib-imap-client/imapc-connection.c index f0d372378f..8bf5b7d8ea 100644 --- a/src/lib-imap-client/imapc-connection.c +++ b/src/lib-imap-client/imapc-connection.c @@ -1888,21 +1888,33 @@ static int imapc_command_try_send_stream(struct imapc_connection *conn, struct imapc_command *cmd) { struct imapc_command_stream *stream; - int ret; + enum ostream_send_istream_result res; stream = imapc_command_get_sending_stream(cmd); if (stream == NULL) - return -1; + return -2; /* we're sending the stream now */ o_stream_set_max_buffer_size(conn->output, 0); - ret = o_stream_send_istream(conn->output, stream->input); + res = o_stream_send_istream(conn->output, stream->input); o_stream_set_max_buffer_size(conn->output, (size_t)-1); - if (ret == 0) { - o_stream_set_flush_pending(conn->output, TRUE); + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: i_assert(stream->input->v_offset < stream->size); return 0; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + i_error("imapc: read(%s) failed: %s", + i_stream_get_name(stream->input), + i_stream_get_error(stream->input)); + return -1; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + /* disconnected */ + return -1; } i_assert(stream->input->v_offset == stream->size); @@ -2018,9 +2030,19 @@ static void imapc_command_send_more(struct imapc_connection *conn) timeout_reset(conn->to_output); if ((ret = imapc_command_try_send_stream(conn, cmd)) == 0) return; + if (ret == -1) { + memset(&reply, 0, sizeof(reply)); + reply.text_without_resp = reply.text_full = "Mailbox not open"; + reply.state = IMAPC_COMMAND_STATE_DISCONNECTED; + + array_delete(&conn->cmd_send_queue, 0, 1); + imapc_command_reply_free(cmd, &reply); + imapc_command_send_more(conn); + return; + } seek_pos = cmd->send_pos; - if (seek_pos != 0 && ret < 0) { + if (seek_pos != 0 && ret == -2) { /* skip over the literal. we can also get here from AUTHENTICATE command, which doesn't use a literal */ if (parse_sync_literal(cmd->data->data, seek_pos, &size)) { diff --git a/src/lib-storage/index/index-storage.c b/src/lib-storage/index/index-storage.c index c0a93b6ac9..8ea8142e36 100644 --- a/src/lib-storage/index/index-storage.c +++ b/src/lib-storage/index/index-storage.c @@ -1025,9 +1025,17 @@ int index_storage_save_continue(struct mail_save_context *ctx, struct mail_storage *storage = ctx->transaction->box->storage; do { - if (o_stream_send_istream(ctx->data.output, input) < 0) { - if (input->stream_errno != 0) - break; + switch (o_stream_send_istream(ctx->data.output, input)) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + /* handle below */ + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: if (!mail_storage_set_error_from_errno(storage)) { mail_storage_set_critical(storage, "save: write(%s) failed: %s", diff --git a/src/lib/file-copy.c b/src/lib/file-copy.c index aab1f756e5..9e6058f1c7 100644 --- a/src/lib/file-copy.c +++ b/src/lib/file-copy.c @@ -70,11 +70,21 @@ static int file_copy_to_tmp(const char *srcpath, const char *tmppath, input = i_stream_create_fd(fd_in, IO_BLOCK_SIZE, FALSE); output = o_stream_create_fd_file(fd_out, 0, FALSE); - ret = o_stream_send_istream(output, input); - if (ret < 0) - i_error("write(%s) failed: %m", tmppath); - else - i_assert(ret != 0); + switch (o_stream_send_istream(output, input)) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + i_error("read(%s) failed: %s", srcpath, + i_stream_get_error(input)); + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + i_error("write(%s) failed: %s", tmppath, + o_stream_get_error(output)); + break; + } i_stream_destroy(&input); o_stream_destroy(&output); diff --git a/src/lib/iostream-temp.c b/src/lib/iostream-temp.c index af29eba06b..baefc597d3 100644 --- a/src/lib/iostream-temp.c +++ b/src/lib/iostream-temp.c @@ -30,7 +30,8 @@ struct temp_ostream { uoff_t fd_size; }; -static int o_stream_temp_dup_cancel(struct temp_ostream *tstream); +static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream, + enum ostream_send_istream_result *res_r); static void o_stream_temp_close(struct iostream_private *stream, @@ -104,10 +105,12 @@ o_stream_temp_sendv(struct ostream_private *stream, struct temp_ostream *tstream = (struct temp_ostream *)stream; ssize_t ret = 0; unsigned int i; + enum ostream_send_istream_result res; + tstream->flags &= ~IOSTREAM_TEMP_FLAG_TRY_FD_DUP; if (tstream->dupstream != NULL) { - if (o_stream_temp_dup_cancel(tstream) < 0) + if (o_stream_temp_dup_cancel(tstream, &res)) return -1; } @@ -129,12 +132,13 @@ o_stream_temp_sendv(struct ostream_private *stream, return ret; } -static int o_stream_temp_dup_cancel(struct temp_ostream *tstream) +static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream, + enum ostream_send_istream_result *res_r) { struct istream *input; uoff_t size = tstream->dupstream_offset - tstream->dupstream_start_offset; - int ret = -1; + bool ret = TRUE; /* use res_r to return error */ i_stream_seek(tstream->dupstream, tstream->dupstream_start_offset); tstream->ostream.ostream.offset = 0; @@ -142,29 +146,43 @@ static int o_stream_temp_dup_cancel(struct temp_ostream *tstream) input = i_stream_create_limit(tstream->dupstream, size); i_stream_unref(&tstream->dupstream); - if (io_stream_copy(&tstream->ostream.ostream, input) > 0) { + *res_r = io_stream_copy(&tstream->ostream.ostream, input); + switch (*res_r) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: /* everything copied */ - ret = 0; - } else if (tstream->ostream.ostream.stream_errno == 0) { - i_assert(input->stream_errno != 0); + ret = FALSE; + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: tstream->ostream.ostream.stream_errno = input->stream_errno; + io_stream_set_error(&tstream->ostream.iostream, + "iostream-temp: read(%s) failed: %s", + i_stream_get_name(input), + i_stream_get_error(input)); + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + break; } i_stream_destroy(&input); return ret; } -static int o_stream_temp_dup_istream(struct temp_ostream *outstream, - struct istream *instream) +static bool +o_stream_temp_dup_istream(struct temp_ostream *outstream, + struct istream *instream, + enum ostream_send_istream_result *res_r) { uoff_t in_size; if (!instream->readable_fd || i_stream_get_fd(instream) == -1) - return 0; + return FALSE; if (i_stream_get_size(instream, TRUE, &in_size) <= 0) { if (outstream->dupstream != NULL) - return o_stream_temp_dup_cancel(outstream); - return 0; + return o_stream_temp_dup_cancel(outstream, res_r); + return FALSE; } if (outstream->dupstream == NULL) { @@ -175,7 +193,7 @@ static int o_stream_temp_dup_istream(struct temp_ostream *outstream, if (outstream->dupstream != instream || outstream->dupstream_offset != instream->v_offset || outstream->dupstream_offset > in_size) - return o_stream_temp_dup_cancel(outstream); + return o_stream_temp_dup_cancel(outstream, res_r); } i_stream_seek(instream, in_size); /* we should be at EOF now. o_stream_send_istream() asserts if @@ -184,18 +202,20 @@ static int o_stream_temp_dup_istream(struct temp_ostream *outstream, outstream->dupstream_offset = instream->v_offset; outstream->ostream.ostream.offset = outstream->dupstream_offset - outstream->dupstream_start_offset; - return 1; + *res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED; + return TRUE; } -static int o_stream_temp_send_istream(struct ostream_private *_outstream, - struct istream *instream) +static enum ostream_send_istream_result +o_stream_temp_send_istream(struct ostream_private *_outstream, + struct istream *instream) { struct temp_ostream *outstream = (struct temp_ostream *)_outstream; - int ret; + enum ostream_send_istream_result res; if ((outstream->flags & IOSTREAM_TEMP_FLAG_TRY_FD_DUP) != 0) { - if ((ret = o_stream_temp_dup_istream(outstream, instream)) != 0) - return ret; + if (!o_stream_temp_dup_istream(outstream, instream, &res)) + return res; outstream->flags &= ~IOSTREAM_TEMP_FLAG_TRY_FD_DUP; } return io_stream_copy(&outstream->ostream.ostream, instream); diff --git a/src/lib/ostream-file.c b/src/lib/ostream-file.c index 51658c97dc..4ccc1740dd 100644 --- a/src/lib/ostream-file.c +++ b/src/lib/ostream-file.c @@ -687,31 +687,40 @@ o_stream_file_write_at(struct ostream_private *stream, return 0; } -static int io_stream_sendfile(struct ostream_private *outstream, - struct istream *instream, int in_fd, - bool *sendfile_not_supported_r) +static bool +io_stream_sendfile(struct ostream_private *outstream, + struct istream *instream, int in_fd, + enum ostream_send_istream_result *res_r) { struct file_ostream *foutstream = (struct file_ostream *)outstream; uoff_t in_size, offset, send_size, v_offset, abs_start_offset; ssize_t ret; + bool sendfile_not_supported = FALSE; - *sendfile_not_supported_r = FALSE; - - if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) - return -1; + if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) { + *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; + return TRUE; + } if (ret == 0) { - *sendfile_not_supported_r = TRUE; - return -1; + /* size unknown. we can't use sendfile(). */ + return FALSE; } o_stream_socket_cork(foutstream); /* flush out any data in buffer */ - if ((ret = buffer_flush(foutstream)) <= 0) - return ret; + if ((ret = buffer_flush(foutstream)) < 0) { + *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; + return TRUE; + } else if (ret == 0) { + *res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT; + return TRUE; + } - if (o_stream_lseek(foutstream) < 0) - return -1; + if (o_stream_lseek(foutstream) < 0) { + *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; + return TRUE; + } v_offset = instream->v_offset; abs_start_offset = i_stream_get_absolute_offset(instream) - v_offset; @@ -736,7 +745,7 @@ static int io_stream_sendfile(struct ostream_private *outstream, } } if (errno == EINVAL) - *sendfile_not_supported_r = TRUE; + sendfile_not_supported = TRUE; else { io_stream_set_error(&outstream->iostream, "sendfile() failed: %m"); @@ -757,14 +766,22 @@ static int io_stream_sendfile(struct ostream_private *outstream, i_stream_seek(instream, v_offset); if (v_offset == in_size) { instream->eof = TRUE; - return 1; + *res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED; + return TRUE; } i_assert(ret <= 0); - return ret; + if (sendfile_not_supported) + return FALSE; + if (ret < 0) + *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; + else + *res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT; + return TRUE; } -static int io_stream_copy_backwards(struct ostream_private *outstream, - struct istream *instream, uoff_t in_size) +static enum ostream_send_istream_result +io_stream_copy_backwards(struct ostream_private *outstream, + struct istream *instream, uoff_t in_size) { struct file_ostream *foutstream = (struct file_ostream *)outstream; uoff_t in_start_offset, in_offset, in_limit, out_offset; @@ -804,6 +821,7 @@ static int io_stream_copy_backwards(struct ostream_private *outstream, i_stream_seek(instream, in_offset); read_size = in_limit - in_offset; + /* FIXME: something's wrong here */ if (i_stream_read_bytes(instream, &data, &size, read_size) == 0) i_unreached(); @@ -832,7 +850,7 @@ static int io_stream_copy_backwards(struct ostream_private *outstream, if (ret < 0) { /* error */ outstream->ostream.stream_errno = errno; - return -1; + return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT; } i_stream_skip(instream, size); } @@ -841,11 +859,12 @@ static int io_stream_copy_backwards(struct ostream_private *outstream, instream->eof = TRUE; outstream->ostream.offset += in_size - in_start_offset; - return 1; + return OSTREAM_SEND_ISTREAM_RESULT_FINISHED; } -static int io_stream_copy_same_stream(struct ostream_private *outstream, - struct istream *instream) +static enum ostream_send_istream_result +io_stream_copy_same_stream(struct ostream_private *outstream, + struct istream *instream) { uoff_t in_size; off_t in_abs_offset, ret = 0; @@ -853,7 +872,7 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream, /* copying data within same fd. we'll have to be careful with seeks and overlapping writes. */ if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) - return -1; + return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; if (ret == 0) { /* if we couldn't find out the size, it means that instream isn't a regular file_istream. we can be reasonably sure that @@ -868,7 +887,7 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream, if (ret == 0) { /* copying data over itself. we don't really need to do that, just fake it. */ - return 1; + return OSTREAM_SEND_ISTREAM_RESULT_FINISHED; } if (ret > 0 && in_size > (uoff_t)ret) { /* overlapping */ @@ -880,21 +899,20 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream, } } -static int o_stream_file_send_istream(struct ostream_private *outstream, - struct istream *instream) +static enum ostream_send_istream_result +o_stream_file_send_istream(struct ostream_private *outstream, + struct istream *instream) { struct file_ostream *foutstream = (struct file_ostream *)outstream; bool same_stream; - int in_fd, ret; - bool sendfile_not_supported; + int in_fd; + enum ostream_send_istream_result res; in_fd = !instream->readable_fd ? -1 : i_stream_get_fd(instream); if (!foutstream->no_sendfile && in_fd != -1 && in_fd != foutstream->fd && instream->seekable) { - ret = io_stream_sendfile(outstream, instream, in_fd, - &sendfile_not_supported); - if (ret >= 0 || !sendfile_not_supported) - return ret; + if (io_stream_sendfile(outstream, instream, in_fd, &res)) + return res; /* sendfile() not supported (with this fd), fallback to regular sending. */ diff --git a/src/lib/ostream-private.h b/src/lib/ostream-private.h index 7f6da788e5..c792e87dd8 100644 --- a/src/lib/ostream-private.h +++ b/src/lib/ostream-private.h @@ -22,8 +22,9 @@ struct ostream_private { unsigned int iov_count); int (*write_at)(struct ostream_private *stream, const void *data, size_t size, uoff_t offset); - int (*send_istream)(struct ostream_private *outstream, - struct istream *instream); + enum ostream_send_istream_result + (*send_istream)(struct ostream_private *outstream, + struct istream *instream); void (*switch_ioloop)(struct ostream_private *stream); /* data: */ @@ -47,7 +48,8 @@ struct ostream * o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd) ATTR_NULL(2); -int io_stream_copy(struct ostream *outstream, struct istream *instream); +enum ostream_send_istream_result +io_stream_copy(struct ostream *outstream, struct istream *instream); void o_stream_copy_error_from_parent(struct ostream_private *_stream); /* This should be called before sending data to parent stream. It makes sure diff --git a/src/lib/ostream.c b/src/lib/ostream.c index 557d0700d4..d5df233cd7 100644 --- a/src/lib/ostream.c +++ b/src/lib/ostream.c @@ -341,57 +341,71 @@ void o_stream_set_no_error_handling(struct ostream *stream, bool set) stream->real_stream->error_handling_disabled = set; } -int o_stream_send_istream(struct ostream *outstream, struct istream *instream) +enum ostream_send_istream_result +o_stream_send_istream(struct ostream *outstream, struct istream *instream) { struct ostream_private *_outstream = outstream->real_stream; uoff_t old_outstream_offset = outstream->offset; uoff_t old_instream_offset = instream->v_offset; - int ret; - - if (unlikely(outstream->closed || instream->closed || - outstream->stream_errno != 0)) { - errno = outstream->stream_errno; - return -1; - } + enum ostream_send_istream_result res; - ret = _outstream->send_istream(_outstream, instream); - if (instream->stream_errno != 0) { + if (unlikely(instream->closed || instream->stream_errno != 0)) { errno = instream->stream_errno; - return -1; - } else if (outstream->stream_errno != 0) { + return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; + } + if (unlikely(outstream->closed || outstream->stream_errno != 0)) { errno = outstream->stream_errno; - return -1; + return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; } - if (ret == 0) { - /* partial send */ - i_assert(!outstream->blocking || !instream->blocking); - } else { - /* fully sent everything */ - i_assert(ret == 1); + + res = _outstream->send_istream(_outstream, instream); + switch (res) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + i_assert(instream->stream_errno == 0); + i_assert(outstream->stream_errno == 0); i_assert(!i_stream_have_bytes_left(instream)); + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_assert(!instream->blocking); + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + i_assert(!outstream->blocking); + o_stream_set_flush_pending(outstream, TRUE); + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + i_assert(instream->stream_errno != 0); + return res; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + i_assert(outstream->stream_errno != 0); + return res; } + /* non-failure - make sure stream offsets match */ i_assert((outstream->offset - old_outstream_offset) == (instream->v_offset - old_instream_offset)); - return ret; + return res; } void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream) { + i_assert(instream->blocking); + switch (o_stream_send_istream(outstream, instream)) { - case 1: + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: break; - case 0: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_unreached(); + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: outstream->real_stream->noverflow = TRUE; break; - default: - if (outstream->stream_errno != 0) - break; - i_assert(instream->stream_errno != 0); + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: outstream->stream_errno = instream->stream_errno; io_stream_set_error(&outstream->real_stream->iostream, "nsend-istream: read(%s) failed: %s", i_stream_get_name(instream), - o_stream_get_name(outstream)); + i_stream_get_error(instream)); + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + break; } outstream->real_stream->last_errors_not_checked = TRUE; } @@ -415,7 +429,8 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size, return ret; } -int io_stream_copy(struct ostream *outstream, struct istream *instream) +enum ostream_send_istream_result +io_stream_copy(struct ostream *outstream, struct istream *instream) { struct const_iovec iov; const unsigned char *data; @@ -423,14 +438,18 @@ int io_stream_copy(struct ostream *outstream, struct istream *instream) while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) { iov.iov_base = data; - if ((ret = o_stream_sendv(outstream, &iov, 1)) <= 0) - return ret; + if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0) + return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; + else if (ret == 0) + return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT; i_stream_skip(instream, ret); } if (instream->stream_errno != 0) - return -1; - return i_stream_have_bytes_left(instream) ? 0 : 1; + return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; + if (i_stream_have_bytes_left(instream)) + return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT; + return OSTREAM_SEND_ISTREAM_RESULT_FINISHED; } void o_stream_switch_ioloop(struct ostream *stream) @@ -581,8 +600,9 @@ o_stream_default_write_at(struct ostream_private *_stream, return -1; } -static int o_stream_default_send_istream(struct ostream_private *outstream, - struct istream *instream) +static enum ostream_send_istream_result +o_stream_default_send_istream(struct ostream_private *outstream, + struct istream *instream) { return io_stream_copy(&outstream->ostream, instream); } diff --git a/src/lib/ostream.h b/src/lib/ostream.h index 4808f05c10..2b9afe1874 100644 --- a/src/lib/ostream.h +++ b/src/lib/ostream.h @@ -3,6 +3,20 @@ #include "ioloop.h" +enum ostream_send_istream_result { + /* All of the istream was successfully sent to ostream. */ + OSTREAM_SEND_ISTREAM_RESULT_FINISHED, + /* Caller needs to wait for more input from non-blocking istream. */ + OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT, + /* Caller needs to wait for output to non-blocking ostream. + o_stream_set_flush_pending() is automatically called. */ + OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT, + /* Read from istream failed. See istream->stream_errno. */ + OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT, + /* Write to ostream failed. See ostream->stream_errno. */ + OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT +}; + struct ostream { /* Number of bytes sent via o_stream_send*() and similar functions. This is counting the input data. For example with a compressed @@ -151,24 +165,17 @@ void o_stream_ignore_last_errors(struct ostream *stream); When creating wrapper streams, they copy this behavior from the parent stream. */ void o_stream_set_no_error_handling(struct ostream *stream, bool set); -/* Send data from input stream. Returns 1 if the entire instream was sent - without errors, 0 if either instream or outstream is nonblocking and not - everything was sent, or -1 if either instream or outstream failed (see their - stream_errno for which one). +/* Send all of the instream to outstream. On non-failure instream is skips over all data written to outstream. This means that the number of bytes written to outstream is always equal to the number of bytes skipped in instream. - For non-blocking outstreams: Note that this function may not add anything to - the output buffer, so if you want the flush callback to be called when more - data can be written, you'll need to call o_stream_set_flush_pending() - explicitly. - It's also possible to use this function to copy data within same file descriptor, even if the source and destination overlaps. If the file must be grown, you have to do it manually before calling this function. */ -int o_stream_send_istream(struct ostream *outstream, struct istream *instream); +enum ostream_send_istream_result +o_stream_send_istream(struct ostream *outstream, struct istream *instream); /* Same as o_stream_send_istream(), but assume that reads and writes will succeed. If not, o_stream_nfinish() will fail with the correct error message (even istream's). */ diff --git a/src/lib/test-iostream-temp.c b/src/lib/test-iostream-temp.c index d4157ba3c5..0a3bdf7b1b 100644 --- a/src/lib/test-iostream-temp.c +++ b/src/lib/test-iostream-temp.c @@ -61,7 +61,7 @@ static void test_iostream_temp_istream(void) /* a working fd-dup */ output = iostream_temp_create_sized(".nonexistent/", IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 1); - test_assert(o_stream_send_istream(output, input) > 0); + test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); temp_input = iostream_temp_finish(&output, 128); test_assert(i_stream_read(temp_input) == 6); i_stream_destroy(&temp_input); @@ -72,7 +72,7 @@ static void test_iostream_temp_istream(void) IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4); test_assert(o_stream_send(output, "1234", 4) == 4); test_expect_errors(1); - test_assert(o_stream_send_istream(output, input) > 0); + test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_expect_no_more_errors(); o_stream_destroy(&output); @@ -80,7 +80,7 @@ static void test_iostream_temp_istream(void) i_stream_seek(input, 0); output = iostream_temp_create_sized(".intentional-nonexistent-error/", IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4); - test_assert(o_stream_send_istream(output, input) > 0); + test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_expect_errors(1); test_assert(o_stream_send(output, "1", 1) == 1); test_expect_no_more_errors(); @@ -91,9 +91,9 @@ static void test_iostream_temp_istream(void) input2 = i_stream_create_limit(input, (uoff_t)-1); output = iostream_temp_create_sized(".intentional-nonexistent-error/", IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4); - test_assert(o_stream_send_istream(output, input) > 0); + test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_expect_errors(1); - test_assert(o_stream_send_istream(output, input2) > 0); + test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_expect_no_more_errors(); o_stream_destroy(&output); i_stream_unref(&input2); diff --git a/src/lib/test-ostream-file.c b/src/lib/test-ostream-file.c index d7573063fb..281e701c9d 100644 --- a/src/lib/test-ostream-file.c +++ b/src/lib/test-ostream-file.c @@ -95,7 +95,7 @@ static void test_ostream_file_send_istream_file(void) /* test that writing works between two files */ i_stream_seek(input, 3); input2 = i_stream_create_limit(input, 4); - test_assert(o_stream_send_istream(output, input2) > 0); + test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_assert(output->offset == 4); test_assert(pread(fd, buf, sizeof(buf), 0) == 4 && memcmp(buf, "4567", 4) == 0); @@ -109,7 +109,7 @@ static void test_ostream_file_send_istream_file(void) o_stream_seek(output, 1); i_stream_seek(input, 2); input2 = i_stream_create_limit(input, 2); - test_assert(o_stream_send_istream(output, input2) > 0); + test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_assert(output->offset == 3); test_assert(pread(fd, buf, sizeof(buf), 0) == 4 && memcmp(buf, "4677", 4) == 0); @@ -121,7 +121,7 @@ static void test_ostream_file_send_istream_file(void) test_assert(pwrite(fd, buf, 4, 0) == 4); input = i_stream_create_fd(fd, 1024, FALSE); o_stream_seek(output, 1); - test_assert(o_stream_send_istream(output, input) > 0); + test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_assert(output->offset == 5); test_assert(pread(fd, buf, sizeof(buf), 0) == 5 && memcmp(buf, "11234", 5) == 0); @@ -158,7 +158,7 @@ static void test_ostream_file_send_istream_sendfile(void) /* test that sendfile() works */ i_stream_seek(input, 3); input2 = i_stream_create_limit(input, 4); - test_assert(o_stream_send_istream(output, input2) > 0); + test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED); test_assert(output->offset == 4); test_assert(read(sock_fd[1], buf, sizeof(buf)) == 4 && memcmp(buf, "defg", 4) == 0); diff --git a/src/plugins/mail-filter/istream-ext-filter.c b/src/plugins/mail-filter/istream-ext-filter.c index 93112ba582..0c8ac22702 100644 --- a/src/plugins/mail-filter/istream-ext-filter.c +++ b/src/plugins/mail-filter/istream-ext-filter.c @@ -80,13 +80,8 @@ i_stream_mail_filter_read_once(struct mail_filter_istream *mstream) if (mstream->ext_out != NULL) { /* we haven't sent everything yet */ - ret = o_stream_send_istream(mstream->ext_out, stream->parent); - if (mstream->ext_out->stream_errno != 0) { - stream->istream.stream_errno = - mstream->ext_out->stream_errno; - return -1; - } - if (ret > 0) { + switch (o_stream_send_istream(mstream->ext_out, stream->parent)) { + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: o_stream_destroy(&mstream->ext_out); /* if we wanted to be a blocking stream, from now on the rest of the reads are */ @@ -94,6 +89,19 @@ i_stream_mail_filter_read_once(struct mail_filter_istream *mstream) net_set_nonblock(mstream->fd, FALSE); if (shutdown(mstream->fd, SHUT_WR) < 0) i_error("ext-filter: shutdown() failed: %m"); + break; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + break; + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + stream->istream.stream_errno = + mstream->ext_out->stream_errno; + io_stream_set_error(&stream->iostream, + "write(%s) failed: %s", + o_stream_get_name(mstream->ext_out), + o_stream_get_error(mstream->ext_out)); + return -1; } }