diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 8043968cfa8..16df09f2fad 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -50,9 +50,7 @@ #include "cloudwatch_api.h" #define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException" -#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException" #define ERR_CODE_NOT_FOUND "ResourceNotFoundException" -#define ERR_CODE_DATA_ALREADY_ACCEPTED "DataAlreadyAcceptedException" #define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId" @@ -229,23 +227,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, goto error; } - if (stream->sequence_token) { - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\"sequenceToken\":\"", 17)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - stream->sequence_token, 0)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\",", 2)) { - goto error; - } - } - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"logEvents\":[", 13)) { goto error; @@ -493,9 +474,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) { if (buf->current_stream != NULL) { buf->data_size += strlen(buf->current_stream->name); buf->data_size += strlen(buf->current_stream->group); - if (buf->current_stream->sequence_token) { - buf->data_size += strlen(buf->current_stream->sequence_token); - } } } @@ -1153,7 +1131,6 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream struct flb_aws_client *cw_client; flb_sds_t body; flb_sds_t tmp; - flb_sds_t error; flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days); @@ -1196,17 +1173,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream /* Check error */ if (c->resp.payload_size > 0) { - error = flb_aws_error(c->resp.payload, c->resp.payload_size); - if (error != NULL) { - /* some other error occurred; notify user */ - flb_aws_print_error(c->resp.payload, c->resp.payload_size, - "PutRetentionPolicy", ctx->ins); - flb_sds_destroy(error); - } - else { - /* error can not be parsed, print raw response to debug */ - flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); - } + /* some error occurred; notify user */ + flb_aws_print_error(c->resp.payload, c->resp.payload_size, + "PutRetentionPolicy", ctx->ins); } } @@ -1287,8 +1256,8 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream) flb_sds_destroy(error); } else { - /* error can not be parsed, print raw response to debug */ - flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); + /* error can not be parsed, print raw response */ + flb_plg_error(ctx->ins, "Raw response: %s", c->resp.payload); } } } @@ -1402,8 +1371,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, flb_sds_destroy(error); } else { - /* error can not be parsed, print raw response to debug */ - flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); + /* error can not be parsed, print raw response */ + flb_plg_error(ctx->ins, "Raw response: %s", c->resp.payload); } } } @@ -1417,8 +1386,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, } /* - * Returns -1 on failure, 0 on success, and 1 for a sequence token error, - * which means the caller can retry. + * Returns -1 on failure, 0 on success */ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, size_t payload_size) @@ -1427,7 +1395,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct flb_http_client *c = NULL; struct flb_aws_client *cw_client; flb_sds_t tmp; - flb_sds_t error; int num_headers = 1; int retry = FLB_TRUE; @@ -1460,8 +1427,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) { /* code was 200, but response is invalid, treat as failure */ if (c->resp.data != NULL) { - flb_plg_debug(ctx->ins, "Could not find sequence token in " - "response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data); + flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data); } flb_http_client_destroy(c); @@ -1474,27 +1440,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, AMZN_REQUEST_ID_HEADER); return -1; } - - - /* success */ - if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, "Sent events to %s", stream->name); - tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size, - "nextSequenceToken"); - if (tmp) { - if (stream->sequence_token != NULL) { - flb_sds_destroy(stream->sequence_token); - } - stream->sequence_token = tmp; - - flb_http_client_destroy(c); - return 0; - } - else { - flb_plg_error(ctx->ins, "Could not find sequence token in " - "response: %s", c->resp.payload); - } - } flb_http_client_destroy(c); return 0; @@ -1502,45 +1447,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, /* Check error */ if (c->resp.payload_size > 0) { - error = flb_aws_error(c->resp.payload, c->resp.payload_size); - if (error != NULL) { - if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) { - /* - * This case will happen when we do not know the correct - * sequence token; we can find it in the error response - * and retry. - */ - flb_plg_debug(ctx->ins, "Sequence token was invalid, " - "will retry"); - tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size, - "expectedSequenceToken"); - if (tmp) { - if (stream->sequence_token != NULL) { - flb_sds_destroy(stream->sequence_token); - } - stream->sequence_token = tmp; - flb_sds_destroy(error); - flb_http_client_destroy(c); - /* tell the caller to retry */ - return 1; - } - } else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) { - /* not sure what causes this but it counts as success */ - flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED); - flb_sds_destroy(error); - flb_http_client_destroy(c); - /* success */ - return 0; - } - /* some other error occurred; notify user */ - flb_aws_print_error(c->resp.payload, c->resp.payload_size, - "PutLogEvents", ctx->ins); - flb_sds_destroy(error); - } - else { - /* error could not be parsed, print raw response to debug */ - flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); - } + flb_aws_print_error(c->resp.payload, c->resp.payload_size, + "PutLogEvents", ctx->ins); } } diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index f6aef224088..87c31949a55 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, const char *tmp; char *session_name = NULL; struct flb_cloudwatch *ctx = NULL; - struct cw_flush *buf = NULL; int ret; flb_sds_t tmp_sds = NULL; (void) config; @@ -348,18 +347,33 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, flb_output_upstream_set(upstream, ctx->ins); ctx->cw_client->host = ctx->endpoint; - /* alloc the payload/processing buffer */ + /* Export context */ + flb_output_set_context(ins, ctx); + + return 0; + +error: + flb_free(session_name); + flb_plg_error(ctx->ins, "Initialization failed"); + flb_cloudwatch_ctx_destroy(ctx); + return -1; +} + +struct cw_flush *new_buffer() +{ + struct cw_flush *buf; + buf = flb_calloc(1, sizeof(struct cw_flush)); if (!buf) { flb_errno(); - goto error; + return NULL; } buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE); if (!buf->out_buf) { flb_errno(); cw_flush_destroy(buf); - goto error; + return NULL; } buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE; @@ -367,7 +381,7 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, if (!buf->tmp_buf) { flb_errno(); cw_flush_destroy(buf); - goto error; + return NULL; } buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE; @@ -375,23 +389,11 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, if (!buf->events) { flb_errno(); cw_flush_destroy(buf); - goto error; + return NULL; } buf->events_capacity = MAX_EVENTS_PER_PUT; - ctx->buf = buf; - - - /* Export context */ - flb_output_set_context(ins, ctx); - - return 0; - -error: - flb_free(session_name); - flb_plg_error(ctx->ins, "Initialization failed"); - flb_cloudwatch_ctx_destroy(ctx); - return -1; + return buf; } static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk, @@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; - event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag, - event_chunk->data, event_chunk->size); + struct cw_flush *buf; + + buf = new_buffer(); + if (!buf) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size); if (event_count < 0) { flb_plg_error(ctx->ins, "Failed to send events"); + cw_flush_destroy(buf); FLB_OUTPUT_RETURN(FLB_RETRY); } - // TODO: this msg is innaccurate if events are skipped - flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count); + cw_flush_destroy(buf); FLB_OUTPUT_RETURN(FLB_OK); } @@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx) flb_aws_provider_destroy(ctx->base_aws_provider); } - if (ctx->buf) { - cw_flush_destroy(ctx->buf); - } - if (ctx->aws_provider) { flb_aws_provider_destroy(ctx->aws_provider); } @@ -496,9 +500,6 @@ void log_stream_destroy(struct log_stream *stream) if (stream->name) { flb_sds_destroy(stream->name); } - if (stream->sequence_token) { - flb_sds_destroy(stream->sequence_token); - } if (stream->group) { flb_sds_destroy(stream->group); } @@ -657,12 +658,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = { .cb_init = cb_cloudwatch_init, .cb_flush = cb_cloudwatch_flush, .cb_exit = cb_cloudwatch_exit, - - /* - * Allow cloudwatch to use async network stack synchronously by opting into - * FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler - */ - .flags = FLB_OUTPUT_SYNCHRONOUS, + .flags = 0, .workers = 1, /* Configuration */ diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 7fe8bf0b764..b08ef9df0cf 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -70,7 +70,7 @@ struct cw_event { struct log_stream { flb_sds_t name; flb_sds_t group; - flb_sds_t sequence_token; + /* * log streams in CloudWatch do not expire; but our internal representations * of them are periodically cleaned up if they have been unused for too long @@ -87,8 +87,6 @@ struct log_stream { struct mk_list _head; }; -void log_stream_destroy(struct log_stream *stream); - struct flb_cloudwatch { /* * TLS instances can not be re-used. So we have one for: @@ -138,8 +136,6 @@ struct flb_cloudwatch { /* stores log streams we're putting to */ struct mk_list streams; - /* buffers for data processing and request payload */ - struct cw_flush *buf; /* The namespace to use for the metric */ flb_sds_t metric_namespace; @@ -155,4 +151,6 @@ struct flb_cloudwatch { void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx); +void log_stream_destroy(struct log_stream *stream); + #endif diff --git a/src/aws/flb_aws_util.c b/src/aws/flb_aws_util.c index 533bba7eb4d..b8550b3bcfd 100644 --- a/src/aws/flb_aws_util.c +++ b/src/aws/flb_aws_util.c @@ -581,6 +581,8 @@ void flb_aws_print_error(char *response, size_t response_len, error = flb_json_get_val(response, response_len, "__type"); if (!error) { + /* error can not be parsed, print raw response */ + flb_plg_error(ins, "Raw response: %s", response); return; } @@ -596,6 +598,7 @@ void flb_aws_print_error(char *response, size_t response_len, } flb_sds_destroy(error); + return; } /* parses AWS JSON API error responses and returns the value of the __type field */