Skip to content

Commit

Permalink
filter_multiline: address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
  • Loading branch information
PettitWesley committed Apr 13, 2022
1 parent cbe2f6e commit 5864186
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 63 deletions.
31 changes: 16 additions & 15 deletions plugins/filter_multiline/ml.c
Expand Up @@ -502,7 +502,7 @@ static void partial_timer_cb(struct flb_config *config, void *data)
unsigned long long diff;
int ret;

now = current_timestamp();
now = ml_current_timestamp();

mk_list_foreach_safe(head, tmp, &ctx->split_message_packers) {
packer = mk_list_entry(head, struct split_message_packer, _head);
Expand All @@ -513,7 +513,7 @@ static void partial_timer_cb(struct flb_config *config, void *data)
}

mk_list_del(&packer->_head);
split_message_packer_complete(packer);
ml_split_message_packer_complete(packer);
/* re-emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
Expand All @@ -524,7 +524,7 @@ static void partial_timer_cb(struct flb_config *config, void *data)
flb_plg_warn(ctx->ins, "Couldn't send concatenated record of size %zu bytes to in_emitter %s",
packer->mp_sbuf.size, ctx->ins_emitter->name);
}
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
}

}
Expand Down Expand Up @@ -571,7 +571,8 @@ static int ml_filter_partial(const void *data, size_t bytes,
sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->flush_ms, partial_timer_cb, ctx, NULL);
ctx->flush_ms / 2, partial_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create flush timer");
} else {
Expand All @@ -591,22 +592,22 @@ static int ml_filter_partial(const void *data, size_t bytes,
total_records++;
flb_time_pop_from_msgpack(&tm, &result, &obj);

partial = is_partial(obj);
partial = ml_is_partial(obj);
if (partial == FLB_TRUE) {
partial_records++;
ret = get_partial_id(obj, &partial_id_str, &partial_id_size);
ret = ml_get_partial_id(obj, &partial_id_str, &partial_id_size);
if (ret == -1) {
flb_plg_warn(ctx->ins, "Could not find partial_id but partial_message key is FLB_TRUE for record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
packer = get_packer(&ctx->split_message_packers, tag,
i_ins->name, partial_id_str, partial_id_size);
packer = ml_get_packer(&ctx->split_message_packers, tag,
i_ins->name, partial_id_str, partial_id_size);
if (packer == NULL) {
flb_plg_trace(ctx->ins, "Found new partial record with tag %s", tag);
packer = create_packer(tag, i_ins->name, partial_id_str, partial_id_size,
obj, "log", &tm);
packer = ml_create_packer(tag, i_ins->name, partial_id_str, partial_id_size,
obj, ctx->key_content, &tm);
if (packer == NULL) {
flb_plg_warn(ctx->ins, "Could not create packer for partial record with tag %s", tag);
/* handle this record as non-partial */
Expand All @@ -615,21 +616,21 @@ static int ml_filter_partial(const void *data, size_t bytes,
}
mk_list_add(&packer->_head, &ctx->split_message_packers);
}
ret = split_message_packer_write(packer, obj, "log");
ret = ml_split_message_packer_write(packer, obj, ctx->key_content);
if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not append content for partial record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
is_last_partial = is_partial_last(obj);
is_last_partial = ml_is_partial_last(obj);
if (is_last_partial == FLB_TRUE) {
/* emit the record in this filter invocation */
return_records++;
split_message_packer_complete(packer);
append_complete_record(packer->mp_sbuf.data, packer->mp_sbuf.size, &tmp_pck);
ml_split_message_packer_complete(packer);
ml_append_complete_record(packer->mp_sbuf.data, packer->mp_sbuf.size, &tmp_pck);
mk_list_del(&packer->_head);
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
}
} else {

Expand Down
61 changes: 32 additions & 29 deletions plugins/filter_multiline/ml_concat.c
Expand Up @@ -31,7 +31,7 @@

#include "ml_concat.h"

msgpack_object_kv *get_key(msgpack_object *map, char *check_for_key)
msgpack_object_kv *ml_get_key(msgpack_object *map, char *check_for_key)
{
int i;
char *key_str = NULL;
Expand Down Expand Up @@ -66,13 +66,13 @@ msgpack_object_kv *get_key(msgpack_object *map, char *check_for_key)
return NULL;
}

int is_partial(msgpack_object *map)
int ml_is_partial(msgpack_object *map)
{
char *val_str = NULL;
msgpack_object_kv *kv;
msgpack_object val;

kv = get_key(map, FLB_MULTILINE_PARTIAL_MESSAGE_KEY);
kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_MESSAGE_KEY);

if (kv == NULL) {
return FLB_FALSE;
Expand All @@ -92,13 +92,13 @@ int is_partial(msgpack_object *map)
return FLB_FALSE;
}

int is_partial_last(msgpack_object *map)
int ml_is_partial_last(msgpack_object *map)
{
char *val_str = NULL;
msgpack_object_kv *kv;
msgpack_object val;

kv = get_key(map, FLB_MULTILINE_PARTIAL_LAST_KEY);
kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_LAST_KEY);

if (kv == NULL) {
return FLB_FALSE;
Expand All @@ -118,7 +118,7 @@ int is_partial_last(msgpack_object *map)
return FLB_FALSE;
}

int get_partial_id(msgpack_object *map,
int ml_get_partial_id(msgpack_object *map,
char **partial_id_str,
size_t *partial_id_size)
{
Expand All @@ -127,7 +127,7 @@ int get_partial_id(msgpack_object *map,
msgpack_object_kv *kv;
msgpack_object val;

kv = get_key(map, FLB_MULTILINE_PARTIAL_ID_KEY);
kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_ID_KEY);

if (kv == NULL) {
return -1;
Expand All @@ -149,9 +149,9 @@ int get_partial_id(msgpack_object *map,
return 0;
}

struct split_message_packer *get_packer(struct mk_list *packers, const char *tag,
char *input_name,
char *partial_id_str, size_t partial_id_size)
struct split_message_packer *ml_get_packer(struct mk_list *packers, const char *tag,
char *input_name,
char *partial_id_str, size_t partial_id_size)
{
struct mk_list *tmp;
struct mk_list *head;
Expand Down Expand Up @@ -180,10 +180,10 @@ struct split_message_packer *get_packer(struct mk_list *packers, const char *tag
return NULL;
}

struct split_message_packer *create_packer(const char *tag, char *input_name,
char *partial_id_str, size_t partial_id_size,
msgpack_object *map, char *multiline_key_content,
struct flb_time *tm)
struct split_message_packer *ml_create_packer(const char *tag, char *input_name,
char *partial_id_str, size_t partial_id_size,
msgpack_object *map, char *multiline_key_content,
struct flb_time *tm)
{
struct split_message_packer *packer;
msgpack_object_kv *kv;
Expand Down Expand Up @@ -214,34 +214,34 @@ struct split_message_packer *create_packer(const char *tag, char *input_name,
tmp = flb_sds_create(tag);
if (!tmp) {
flb_errno();
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
return NULL;
}
packer->tag = tmp;

tmp = flb_sds_create_len(partial_id_str, partial_id_size);
if (!tmp) {
flb_errno();
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
return NULL;
}
packer->partial_id = tmp;

packer->buf = flb_sds_create_size(FLB_MULTILINE_PARTIAL_BUF_SIZE);
if (!packer->buf) {
flb_errno();
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
return NULL;
}

msgpack_sbuffer_init(&packer->mp_sbuf);
msgpack_packer_init(&packer->mp_pck, &packer->mp_sbuf, msgpack_sbuffer_write);

/* get the key that is split */
split_kv = get_key(map, multiline_key_content);
split_kv = ml_get_key(map, multiline_key_content);
if (split_kv == NULL) {
flb_error("[partial message concat] Could not find key %s in record", multiline_key_content);
split_message_packer_destroy(packer);
ml_split_message_packer_destroy(packer);
return NULL;
}

Expand Down Expand Up @@ -324,23 +324,23 @@ struct split_message_packer *create_packer(const char *tag, char *input_name,
return packer;
}

unsigned long long current_timestamp() {
unsigned long long ml_current_timestamp() {
struct timeval te;
unsigned long long milliseconds;
gettimeofday(&te, NULL);
milliseconds = te.tv_sec*1000LL + te.tv_usec/1000;
return milliseconds;
}

int split_message_packer_write(struct split_message_packer *packer,
msgpack_object *map, char *multiline_key_content)
int ml_split_message_packer_write(struct split_message_packer *packer,
msgpack_object *map, char *multiline_key_content)
{
char *val_str = NULL;
size_t val_str_size = 0;
msgpack_object_kv *kv;
msgpack_object val;

kv = get_key(map, multiline_key_content);
kv = ml_get_key(map, multiline_key_content);

if (kv == NULL) {
flb_error("[partial message concat] Could not find key %s in record", multiline_key_content);
Expand All @@ -351,27 +351,30 @@ int split_message_packer_write(struct split_message_packer *packer,
if (val.type == MSGPACK_OBJECT_BIN) {
val_str = (char *) val.via.bin.ptr;
val_str_size = val.via.bin.size;
}
if (val.type == MSGPACK_OBJECT_STR) {
} else if (val.type == MSGPACK_OBJECT_STR) {
val_str = (char *) val.via.str.ptr;
val_str_size = val.via.str.size;
} else {
return -1;
}



flb_sds_cat_safe(&packer->buf, val_str, val_str_size);
packer->last_write_time = current_timestamp();
packer->last_write_time = ml_current_timestamp();

return 0;
}

void split_message_packer_complete(struct split_message_packer *packer)
void ml_split_message_packer_complete(struct split_message_packer *packer)
{
int len;
len = flb_sds_len(packer->buf);
msgpack_pack_str(&packer->mp_pck, len);
msgpack_pack_str_body(&packer->mp_pck, packer->buf, len);
}

void append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck)
void ml_append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck)
{
int ok = MSGPACK_UNPACK_SUCCESS;
size_t off = 0;
Expand All @@ -388,7 +391,7 @@ void append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck)
}
}

void split_message_packer_destroy(struct split_message_packer *packer)
void ml_split_message_packer_destroy(struct split_message_packer *packer)
{
if (!packer) {
return;
Expand Down
38 changes: 19 additions & 19 deletions plugins/filter_multiline/ml_concat.h
Expand Up @@ -56,25 +56,25 @@ struct split_message_packer {
struct mk_list _head;
};

msgpack_object_kv *get_key(msgpack_object *map, char *check_for_key);
int is_partial(msgpack_object *map);
int is_partial_last(msgpack_object *map);
int get_partial_id(msgpack_object *map,
char **partial_id_str,
size_t *partial_id_size);
struct split_message_packer *get_packer(struct mk_list *packers, const char *tag,
char *input_name,
char *partial_id_str, size_t partial_id_size);
struct split_message_packer *create_packer(const char *tag, char *input_name,
char *partial_id_str, size_t partial_id_size,
msgpack_object *map, char *multiline_key_content,
struct flb_time *tm);
int split_message_packer_write(struct split_message_packer *packer,
msgpack_object *map, char *multiline_key_content);
void split_message_packer_complete(struct split_message_packer *packer);
void split_message_packer_destroy(struct split_message_packer *packer);
void append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck);
unsigned long long current_timestamp();
msgpack_object_kv *ml_get_key(msgpack_object *map, char *check_for_key);
int ml_is_partial(msgpack_object *map);
int ml_is_partial_last(msgpack_object *map);
int ml_get_partial_id(msgpack_object *map,
char **partial_id_str,
size_t *partial_id_size);
struct split_message_packer *ml_get_packer(struct mk_list *packers, const char *tag,
char *input_name,
char *partial_id_str, size_t partial_id_size);
struct split_message_packer *ml_create_packer(const char *tag, char *input_name,
char *partial_id_str, size_t partial_id_size,
msgpack_object *map, char *multiline_key_content,
struct flb_time *tm);
int ml_split_message_packer_write(struct split_message_packer *packer,
msgpack_object *map, char *multiline_key_content);
void ml_split_message_packer_complete(struct split_message_packer *packer);
void ml_split_message_packer_destroy(struct split_message_packer *packer);
void ml_append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck);
unsigned long long ml_current_timestamp();


#endif

0 comments on commit 5864186

Please sign in to comment.