Skip to content

Commit

Permalink
o_stream_send_istream() API changed again
Browse files Browse the repository at this point in the history
The new API enforces the caller to correctly handle all the possible
situations. It also makes in unambiguous whether to wait for input or
output stream.
  • Loading branch information
sirainen authored and GitLab committed May 30, 2016
1 parent 1ada45c commit 378e6cb
Show file tree
Hide file tree
Showing 23 changed files with 411 additions and 265 deletions.
30 changes: 22 additions & 8 deletions src/doveadm/server-connection.c
Expand Up @@ -83,26 +83,40 @@ static void print_connection_released(void)

static int server_connection_send_cmd_input_more(struct server_connection *conn)
{
int ret;
enum ostream_send_istream_result res;
int ret = -1;

/* ostream-dot writes only up to max buffer size, so keep it non-zero */
o_stream_set_max_buffer_size(conn->cmd_output, IO_BLOCK_SIZE);
ret = o_stream_send_istream(conn->cmd_output, conn->cmd_input);
res = o_stream_send_istream(conn->cmd_output, conn->cmd_input);
o_stream_set_max_buffer_size(conn->cmd_output, (size_t)-1);

if (ret == 0) {
o_stream_set_flush_pending(conn->cmd_output, TRUE);
switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
break;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
return 1;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
return 0;
}
if (conn->cmd_input->stream_errno != 0) {
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
i_error("read(%s) failed: %s",
i_stream_get_name(conn->cmd_input),
i_stream_get_error(conn->cmd_input));
} else if (conn->cmd_output->stream_errno != 0 ||
o_stream_flush(conn->cmd_output) < 0) {
break;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
i_error("write(%s) failed: %s",
o_stream_get_name(conn->cmd_output),
o_stream_get_error(conn->cmd_output));
break;
}
if (res == OSTREAM_SEND_ISTREAM_RESULT_FINISHED) {
if ((ret = o_stream_flush(conn->cmd_output)) == 0)
return 0;
else if (ret < 0) {
i_error("write(%s) failed: %s",
o_stream_get_name(conn->cmd_output),
o_stream_get_error(conn->cmd_output));
}
}

i_stream_destroy(&conn->cmd_input);
Expand Down
20 changes: 13 additions & 7 deletions src/imap/cmd-getmetadata.c
Expand Up @@ -211,25 +211,31 @@ static void cmd_getmetadata_send_entry(struct imap_getmetadata_context *ctx,
static bool
cmd_getmetadata_stream_continue(struct imap_getmetadata_context *ctx)
{
int ret;
enum ostream_send_istream_result res;

o_stream_set_max_buffer_size(ctx->cmd->client->output, 0);
ret = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream);
res = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream);
o_stream_set_max_buffer_size(ctx->cmd->client->output, (size_t)-1);

if (ret > 0) {
/* finished */
switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
return TRUE;
} else if (ret < 0) {
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
i_unreached();
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
return FALSE;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
i_error("read(%s) failed: %s",
i_stream_get_name(ctx->cur_stream),
i_stream_get_error(ctx->cur_stream));
client_disconnect(ctx->cmd->client,
"Internal GETMETADATA failure");
return TRUE;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
/* client disconnected */
return TRUE;
}
o_stream_set_flush_pending(ctx->cmd->client->output, TRUE);
return FALSE;
i_unreached();
}

static int
Expand Down
24 changes: 13 additions & 11 deletions src/imap/cmd-urlfetch.c
Expand Up @@ -83,6 +83,7 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd)
struct client *client = cmd->client;
struct cmd_urlfetch_context *ctx =
(struct cmd_urlfetch_context *)cmd->context;
enum ostream_send_istream_result res;
int ret;

/* are we in the middle of an urlfetch literal? */
Expand All @@ -98,27 +99,28 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd)

/* transfer literal to client */
o_stream_set_max_buffer_size(client->output, 0);
ret = o_stream_send_istream(client->output, ctx->input);
res = o_stream_send_istream(client->output, ctx->input);
o_stream_set_max_buffer_size(client->output, (size_t)-1);

if (ret > 0) {
/* finished successfully */
switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
i_stream_unref(&ctx->input);
return 1;
}
if (client->output->closed) {
/* client disconnected */
return -1;
}
if (ctx->input->stream_errno != 0) {
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
i_unreached();
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
return 0;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
i_error("read(%s) failed: %s (URLFETCH)",
i_stream_get_name(ctx->input),
i_stream_get_error(ctx->input));
client_disconnect(client, "URLFETCH failed");
return -1;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
/* client disconnected */
return -1;
}
o_stream_set_flush_pending(client->output, TRUE);
return 0;
i_unreached();
}

static bool cmd_urlfetch_continue(struct client_command_context *cmd)
Expand Down
34 changes: 17 additions & 17 deletions src/imap/imap-fetch-body.c
Expand Up @@ -93,25 +93,20 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx)
struct imap_fetch_state *state = &ctx->state;
const char *disconnect_reason;
uoff_t orig_input_offset = state->cur_input->v_offset;
int ret;
enum ostream_send_istream_result res;

o_stream_set_max_buffer_size(ctx->client->output, 0);
ret = o_stream_send_istream(ctx->client->output, state->cur_input);
res = o_stream_send_istream(ctx->client->output, state->cur_input);
o_stream_set_max_buffer_size(ctx->client->output, (size_t)-1);

if (ctx->state.cur_stats_sizep != NULL) {
*ctx->state.cur_stats_sizep +=
state->cur_input->v_offset - orig_input_offset;
}

if (state->cur_input->v_offset != state->cur_size) {
/* unfinished */
if (state->cur_input->stream_errno != 0) {
fetch_read_error(ctx, &disconnect_reason);
client_disconnect(ctx->client, disconnect_reason);
return -1;
}
if (!i_stream_have_bytes_left(state->cur_input)) {
switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
if (state->cur_input->v_offset != state->cur_size) {
/* Input stream gave less data than expected */
mail_set_cache_corrupted_reason(state->cur_mail,
state->cur_size_field, t_strdup_printf(
Expand All @@ -123,15 +118,20 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx)
client_disconnect(ctx->client, "FETCH failed");
return -1;
}
if (ret < 0) {
/* client probably disconnected */
return -1;
}

o_stream_set_flush_pending(ctx->client->output, TRUE);
return 1;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
i_unreached();
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
return 0;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
fetch_read_error(ctx, &disconnect_reason);
client_disconnect(ctx->client, disconnect_reason);
return -1;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
/* client disconnected */
return -1;
}
return 1;
i_unreached();
}

static const char *
Expand Down
12 changes: 7 additions & 5 deletions src/lib-fs/fs-api.c
Expand Up @@ -831,20 +831,22 @@ int fs_default_copy(struct fs_file *src, struct fs_file *dest)
dest->copy_input = fs_read_stream(src, IO_BLOCK_SIZE);
dest->copy_output = fs_write_stream(dest);
}
if (o_stream_send_istream(dest->copy_output, dest->copy_input) == 0) {
switch (o_stream_send_istream(dest->copy_output, dest->copy_input)) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
break;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
fs_set_error_async(dest->fs);
return -1;
}
if (dest->copy_input->stream_errno != 0) {
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
errno = dest->copy_input->stream_errno;
fs_set_error(dest->fs, "read(%s) failed: %s",
i_stream_get_name(dest->copy_input),
i_stream_get_error(dest->copy_input));
i_stream_unref(&dest->copy_input);
fs_write_stream_abort(dest, &dest->copy_output);
return -1;
}
if (dest->copy_output->stream_errno != 0) {
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
errno = dest->copy_output->stream_errno;
fs_set_error(dest->fs, "write(%s) failed: %s",
o_stream_get_name(dest->copy_output),
Expand Down
8 changes: 4 additions & 4 deletions src/lib-fs/ostream-metawrap.c
Expand Up @@ -38,20 +38,20 @@ o_stream_metawrap_sendv(struct ostream_private *stream,
return ret;
}

static int
static enum ostream_send_istream_result
o_stream_metawrap_send_istream(struct ostream_private *_outstream,
struct istream *instream)
{
struct metawrap_ostream *outstream =
(struct metawrap_ostream *)_outstream;
uoff_t orig_instream_offset = instream->v_offset;
int ret;
enum ostream_send_istream_result res;

o_stream_metawrap_call_callback(outstream);
if ((ret = o_stream_send_istream(_outstream->parent, instream)) < 0)
if ((res = o_stream_send_istream(_outstream->parent, instream)) == OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT)
o_stream_copy_error_from_parent(_outstream);
_outstream->ostream.offset += instream->v_offset - orig_instream_offset;
return ret;
return res;
}

struct ostream *
Expand Down
67 changes: 34 additions & 33 deletions src/lib-http/http-client-request.c
Expand Up @@ -833,7 +833,7 @@ int http_client_request_send_more(struct http_client_request *req,
{
struct http_client_connection *conn = req->conn;
struct ostream *output = req->payload_output;
int ret;
enum ostream_send_istream_result res;

i_assert(req->payload_input != NULL);
i_assert(req->payload_output != NULL);
Expand All @@ -843,31 +843,11 @@ int http_client_request_send_more(struct http_client_request *req,

/* chunked ostream needs to write to the parent stream's buffer */
o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE);
ret = o_stream_send_istream(output, req->payload_input);
res = o_stream_send_istream(output, req->payload_input);
o_stream_set_max_buffer_size(output, (size_t)-1);

if (req->payload_input->stream_errno != 0) {
/* we're in the middle of sending a request, so the connection
will also have to be aborted */
*error_r = t_strdup_printf("read(%s) failed: %s",
i_stream_get_name(req->payload_input),
i_stream_get_error(req->payload_input));

/* the payload stream assigned to this request is broken,
fail this the request immediately */
http_client_request_error(&req,
HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD,
"Broken payload stream");
return -1;
} else if (output->stream_errno != 0) {
/* failed to send request */
*error_r = t_strdup_printf("write(%s) failed: %s",
o_stream_get_name(output),
o_stream_get_error(output));
return -1;
}

if (ret > 0) {
switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
/* finished sending */
if (!req->payload_chunked &&
req->payload_input->v_offset - req->payload_offset != req->payload_size) {
Expand All @@ -891,22 +871,43 @@ int http_client_request_send_more(struct http_client_request *req,
/* finished sending payload */
http_client_request_finish_payload_out(req);
}
} else if (i_stream_have_bytes_left(req->payload_input)) {
/* output is blocking (server needs to act; enable timeout) */
conn->output_locked = TRUE;
if (!pipelined)
http_client_connection_start_request_timeout(conn);
o_stream_set_flush_pending(output, TRUE);
http_client_request_debug(req, "Partially sent payload");
} else {
return 0;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
/* input is blocking (client needs to act; disable timeout) */
conn->output_locked = TRUE;
if (!pipelined)
http_client_connection_stop_request_timeout(conn);
conn->io_req_payload = io_add_istream(req->payload_input,
http_client_request_payload_input, req);
return 0;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
/* output is blocking (server needs to act; enable timeout) */
conn->output_locked = TRUE;
if (!pipelined)
http_client_connection_start_request_timeout(conn);
http_client_request_debug(req, "Partially sent payload");
return 0;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
/* we're in the middle of sending a request, so the connection
will also have to be aborted */
*error_r = t_strdup_printf("read(%s) failed: %s",
i_stream_get_name(req->payload_input),
i_stream_get_error(req->payload_input));

/* the payload stream assigned to this request is broken,
fail this the request immediately */
http_client_request_error(&req,
HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD,
"Broken payload stream");
return -1;
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
/* failed to send request */
*error_r = t_strdup_printf("write(%s) failed: %s",
o_stream_get_name(output),
o_stream_get_error(output));
return -1;
}
return 0;
i_unreached();
}

static int http_client_request_send_real(struct http_client_request *req,
Expand Down

0 comments on commit 378e6cb

Please sign in to comment.