Skip to content

Commit

Permalink
out_es: support 'id_key' to get id from record(#3303) (#3320)
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>
  • Loading branch information
nokute78 authored and edsiper committed Apr 6, 2021
1 parent fab5fbe commit 7c07328
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 0 deletions.
60 changes: 60 additions & 0 deletions plugins/out_es/es.c
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_aws_credentials.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_ra_key.h>
#include <msgpack.h>

#include <time.h>
Expand Down Expand Up @@ -197,6 +198,40 @@ static int es_pack_array_content(msgpack_packer *tmp_pck,
return 0;
}

/*
* Get _id value from incoming record.
* If it successed, return the value as flb_sds_t.
* If it failed, return NULL.
*/
static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
msgpack_object *map)
{
struct flb_ra_value *rval = NULL;
flb_sds_t tmp_str;
rval = flb_ra_get_value_object(ctx->ra_id_key, *map);
if (rval == NULL) {
flb_plg_warn(ctx->ins, "the value of %s is missing",
ctx->id_key);
return NULL;
}
else if(rval->o.type != MSGPACK_OBJECT_STR) {
flb_plg_warn(ctx->ins, "the value of %s is not string",
ctx->id_key);
flb_ra_key_value_destroy(rval);
return NULL;
}

tmp_str = flb_sds_create_len(rval->o.via.str.ptr,
rval->o.via.str.size);
if (tmp_str == NULL) {
flb_plg_warn(ctx->ins, "cannot create ID string from record");
flb_ra_key_value_destroy(rval);
return NULL;
}
flb_ra_key_value_destroy(rval);
return tmp_str;
}

/*
* Convert the internal Fluent Bit data representation to the required
* one by Elasticsearch.
Expand Down Expand Up @@ -224,6 +259,7 @@ static int elasticsearch_format(struct flb_config *config,
char index_formatted[256];
char es_uuid[37];
flb_sds_t out_buf;
flb_sds_t id_key_str = NULL;
msgpack_unpacked result;
msgpack_object root;
msgpack_object map;
Expand Down Expand Up @@ -468,6 +504,25 @@ static int elasticsearch_format(struct flb_config *config,
es_index, ctx->type, es_uuid);
}
}
if (ctx->ra_id_key) {
id_key_str = es_get_id_value(ctx ,&map);
if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, id_key_str);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, id_key_str);
}
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
}

/* Convert msgpack to JSON */
out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
Expand Down Expand Up @@ -940,6 +995,11 @@ static struct flb_config_map config_map[] = {
"When enabled, generate _id for outgoing records. This prevents duplicate "
"records when retrying ES"
},
{
FLB_CONFIG_MAP_STR, "id_key", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
"If set, _id will be the value of the key from incoming record."
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
Expand Down
5 changes: 5 additions & 0 deletions plugins/out_es/es.h
Expand Up @@ -100,6 +100,11 @@ struct flb_elasticsearch {
/* time key nanoseconds */
int time_key_nanos;


/* id_key */
flb_sds_t id_key;
struct flb_record_accessor *ra_id_key;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
Expand Down
15 changes: 15 additions & 0 deletions plugins/out_es/es_conf.c
Expand Up @@ -233,6 +233,17 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
}


if (ctx->id_key) {
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
if (ctx->ra_id_key == NULL) {
flb_plg_error(ins, "could not create record accessor for Id Key");
}
if (ctx->generate_id == FLB_TRUE) {
flb_plg_warn(ins, "Generate_ID is ignored when ID_key is set");
ctx->generate_id = FLB_FALSE;
}
}

if (ctx->logstash_prefix_key) {
if (ctx->logstash_prefix_key[0] != '$') {
len = flb_sds_len(ctx->logstash_prefix_key);
Expand Down Expand Up @@ -387,6 +398,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
if (ctx->u) {
flb_upstream_destroy(ctx->u);
}
if (ctx->ra_id_key) {
flb_ra_destroy(ctx->ra_id_key);
ctx->ra_id_key = NULL;
}

#ifdef FLB_HAVE_AWS
if (ctx->base_aws_provider) {
Expand Down
57 changes: 57 additions & 0 deletions tests/runtime/out_elasticsearch.c
Expand Up @@ -73,6 +73,18 @@ static void cb_check_replace_dots(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_id_key(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *record = "\"_id\":\"some string\""; // see data/es/json_es.h

p = strstr(out_js, record);
TEST_CHECK(p != NULL);
flb_free(res_data);
}

void flb_test_index_type()
{
Expand Down Expand Up @@ -301,12 +313,57 @@ void flb_test_replace_dots()
flb_destroy(ctx);
}

void flb_test_id_key()
{
int ret;
int size = sizeof(JSON_DOTS) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"id_key", "key_2",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_id_key,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"index_type" , flb_test_index_type },
{"logstash_format" , flb_test_logstash_format },
{"logstash_format_nanos", flb_test_logstash_format_nanos },
{"tag_key" , flb_test_tag_key },
{"replace_dots" , flb_test_replace_dots },
{"id_key" , flb_test_id_key },
{NULL, NULL}
};

0 comments on commit 7c07328

Please sign in to comment.