Skip to content

Commit

Permalink
multiline: respect emitter paused status
Browse files Browse the repository at this point in the history
When the multiline plugin tries to send logs to the emitter, it never
used to check if the emitter was paused or not. The input chunk
appending routines would subsequently reject the log when it got to the
point of checking if the current input plugin is paused. This would
return -1 and cause the esoteric "error registering chunk for tag"
message to be spammed constantly while the emitter is paused.

This PR adds a check before appending logs to the emitter to see if it
is currently paused. If it is, then the log will not be sent to the
emitter and will be dropped (just as it would have been if it went
through the rest of the chunk appending procedure, but cuts out all the
extra steps).

I have an automatic whitespace trimmer that runs on save, so there's
lots of trailing whitespace deleted too sorry lol

Signed-off-by: braydonk <braydonk@google.com>
  • Loading branch information
braydonk committed Feb 22, 2024
1 parent 202da13 commit 1f250c5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
55 changes: 40 additions & 15 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ static int flush_callback(struct flb_ml_parser *parser,
int ret;
struct ml_ctx *ctx = data;
struct ml_stream *stream;
char *name;
uint64_t ts;

if (ctx->debug_flush) {
flb_ml_flush_stdout(parser, mst, data, buf_data, buf_size);
Expand All @@ -166,6 +168,16 @@ static int flush_callback(struct flb_ml_parser *parser,
return 0;

} else { /* buffered mode */
if (ctx->ins_emitter->mem_buf_status == FLB_INPUT_PAUSED) {
flb_plg_debug(ctx->ins, "emitter is paused, dropping log");
#ifdef FLB_HAVE_METRICS
name = (char*) flb_filter_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_dropped_by_emitter, ts, 1, (char* []) {name});
#endif
return -1;
}

stream = get_by_id(ctx, mst->id);
if (!stream) {
flb_plg_error(ctx->ins, "Could not find tag to re-emit from stream %s",
Expand Down Expand Up @@ -204,9 +216,9 @@ static int cb_ml_init(struct flb_filter_instance *ins,
ctx->config = config;
ctx->timer_created = FLB_FALSE;

/*
/*
* Config map is not yet set at this point in the code
* user must explicitly set buffer to false to turn it off
* user must explicitly set buffer to false to turn it off
*/
ctx->use_buffer = FLB_TRUE;
tmp = (char *) flb_filter_get_property("buffer", ins);
Expand All @@ -221,7 +233,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
} else if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARSER) == 0) {
ctx->partial_mode = FLB_FALSE;
} else {
flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE,
FLB_MULTILINE_MODE_PARSER);
return -1;
Expand Down Expand Up @@ -278,7 +290,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
flb_free(ctx);
return -1;
}

/* Set plugin context */
flb_filter_set_context(ins, ctx);

Expand Down Expand Up @@ -313,7 +325,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
flb_free(ctx);
return -1;
}

/* Create the emitter context */
ret = emitter_create(ctx);
if (ret == -1) {
Expand All @@ -328,6 +340,10 @@ static int cb_ml_init(struct flb_filter_instance *ins,
"Total number of emitted records",
1, (char *[]) {"name"});

ctx->cmt_dropped_by_emitter = cmt_counter_create(ins->cmt,
"fluentbit", "filter", "dropped_by_emitter",
"Total number of records dropped because emitter was paused",
1, (char *[]) {"name"});
/* OLD api */
flb_metrics_add(FLB_MULTILINE_METRIC_EMITTED,
"emit_records", ctx->ins->metrics);
Expand Down Expand Up @@ -412,7 +428,7 @@ static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id)
}

static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
struct flb_input_instance *i_ins,
struct flb_input_instance *i_ins,
const char *tag, int tag_len)
{
uint64_t stream_id;
Expand Down Expand Up @@ -504,22 +520,31 @@ static void partial_timer_cb(struct flb_config *config, void *data)
struct split_message_packer *packer;
unsigned long long now;
unsigned long long diff;
int ret;
int ret;

now = ml_current_timestamp();

mk_list_foreach_safe(head, tmp, &ctx->split_message_packers) {
packer = mk_list_entry(head, struct split_message_packer, _head);

diff = now - packer->last_write_time;
if (diff <= ctx->flush_ms) {
continue;
}

mk_list_del(&packer->_head);
ml_split_message_packer_complete(packer);

if (ctx->ins_emitter->mem_buf_status == FLB_INPUT_PAUSED) {
flb_plg_debug(ctx->ins, "emitter is paused, dropping log");
#ifdef FLB_HAVE_METRICS
name = (char*) flb_filter_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_dropped_by_emitter, ts, 1, (char* []) {name});
#endif
}
/* re-emit record with original tag */
if (packer->log_encoder.output_buffer != NULL &&
else if (packer->log_encoder.output_buffer != NULL &&
packer->log_encoder.output_length > 0) {

flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
Expand Down Expand Up @@ -601,7 +626,7 @@ static int ml_filter_partial(const void *data, size_t bytes,
sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->flush_ms / 2, partial_timer_cb,
ctx->flush_ms / 2, partial_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create flush timer");
Expand All @@ -610,7 +635,7 @@ static int ml_filter_partial(const void *data, size_t bytes,
}
}

/*
/*
* Create temporary msgpack buffer
* for non-partial messages which are passed on as-is
*/
Expand All @@ -630,7 +655,7 @@ static int ml_filter_partial(const void *data, size_t bytes,
partial_records--;
goto pack_non_partial;
}
packer = ml_get_packer(&ctx->split_message_packers, tag,
packer = ml_get_packer(&ctx->split_message_packers, tag,
i_ins->name, partial_id_str, partial_id_size);
if (packer == NULL) {
flb_plg_trace(ctx->ins, "Found new partial record with tag %s", tag);
Expand Down Expand Up @@ -796,7 +821,7 @@ static int cb_ml_filter(const void *data, size_t bytes,

/* unlikely to happen.. but just in case */
return FLB_FILTER_NOTOUCH;

} else { /* buffered mode */
stream = get_or_create_stream(ctx, i_ins, tag, tag_len);

Expand All @@ -820,7 +845,7 @@ static int cb_ml_filter(const void *data, size_t bytes,

flb_log_event_decoder_destroy(&decoder);

/*
/*
* always returned modified, which will be 0 records, since the emitter takes
* all records.
*/
Expand Down
3 changes: 2 additions & 1 deletion plugins/filter_multiline/ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#define FLB_MULTILINE_MODE_PARTIAL_MESSAGE "partial_message"
#define FLB_MULTILINE_MODE_PARSER "parser"

/*
/*
* input instance + tag is the unique identifier
* for a multiline stream
* TODO: implement clean up of streams that haven't been used recently
Expand Down Expand Up @@ -76,6 +76,7 @@ struct ml_ctx {

#ifdef FLB_HAVE_METRICS
struct cmt_counter *cmt_emitted;
struct cmt_counter *cmt_dropped_by_emitter;
#endif
};

Expand Down

0 comments on commit 1f250c5

Please sign in to comment.