Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing support for setting document id in decoder_json pr… #15859

Merged
merged 3 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ occur.

*`document_id`*:: Option configuration setting that specifies the JSON key to
set the document id. If configured, the field will be removed from the original
json document and stored in `@metadata.id`
json document and stored in `@metadata._id`

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (h *Harvester) onMessage(

if id != "" {
meta = common.MapStr{
"id": id,
"_id": id,
}
}
} else if &text != nil {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def test_id_in_message(self):

assert len(output) == 3
for i in xrange(len(output)):
assert("@metadata.id" in output[i])
assert(output[i]["@metadata.id"] == str(i))
assert("@metadata._id" in output[i])
assert(output[i]["@metadata._id"] == str(i))
assert("json.id" not in output[i])

def test_with_generic_filtering(self):
Expand Down
2 changes: 1 addition & 1 deletion libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = common.MapStr{}
}
e.Meta["id"] = id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the special nature of this field name and the desire to keep it consistent in multiple places, do you think we should make it an exported const?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have more than one field that is special to meta. Let's clean these up (the other fields as well) in a follow up PR.

e.Meta["_id"] = id
}

func (e *Event) GetValue(key string) (interface{}, error) {
Expand Down
8 changes: 4 additions & 4 deletions libbeat/beat/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEventPutGetTimestamp(t *testing.T) {

func TestEventMetadata(t *testing.T) {
const id = "123"
newMeta := func() common.MapStr { return common.MapStr{"id": id} }
newMeta := func() common.MapStr { return common.MapStr{"_id": id} }

t.Run("put", func(t *testing.T) {
evt := newEmptyEvent()
Expand All @@ -75,7 +75,7 @@ func TestEventMetadata(t *testing.T) {
t.Run("put sub-key", func(t *testing.T) {
evt := newEmptyEvent()

evt.PutValue("@metadata.id", id)
evt.PutValue("@metadata._id", id)

assert.Equal(t, newMeta(), evt.Meta)
assert.Empty(t, evt.Fields)
Expand All @@ -85,7 +85,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

v, err := evt.GetValue("@metadata.id")
v, err := evt.GetValue("@metadata._id")

assert.NoError(t, err)
assert.Equal(t, id, v)
Expand All @@ -105,7 +105,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

err := evt.Delete("@metadata.id")
err := evt.Delete("@metadata._id")

assert.NoError(t, err)
assert.Empty(t, evt.Meta)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func createEventBulkMeta(

var id string
if m := event.Meta; m != nil {
if tmp := m["id"]; tmp != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
Expand Down
31 changes: 30 additions & 1 deletion libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type decodeJSONFields struct {
overwriteKeys bool
addErrorKey bool
processArray bool
documentID string
target *string
}

Expand All @@ -50,6 +51,7 @@ type config struct {
AddErrorKey bool `config:"add_error_key"`
ProcessArray bool `config:"process_array"`
Target *string `config:"target"`
DocumentID string `config:"document_id"`
}

var (
Expand Down Expand Up @@ -81,7 +83,15 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := &decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, target: config.Target}
f := &decodeJSONFields{
fields: config.Fields,
maxDepth: config.MaxDepth,
overwriteKeys: config.OverwriteKeys,
addErrorKey: config.AddErrorKey,
processArray: config.ProcessArray,
documentID: config.DocumentID,
target: config.Target,
}
return f, nil
}

Expand Down Expand Up @@ -115,6 +125,18 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
target = *f.target
}

var id string
if key := f.documentID; key != "" {
if dict, ok := output.(map[string]interface{}); ok {
if tmp, err := common.MapStr(dict).GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
common.MapStr(dict).Delete(key)
}
}
}
}

if target != "" {
_, err = event.PutValue(target, output)
} else {
Expand All @@ -131,6 +153,13 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
errs = append(errs, err.Error())
continue
}

if id != "" {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["_id"] = id
}
}

if len(errs) > 0 {
Expand Down
33 changes: 33 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -94,6 +95,38 @@ func TestInvalidJSONMultiple(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestDocumentID(t *testing.T) {
logp.TestingSetup()

input := common.MapStr{
"msg": `{"log": "message", "myid": "myDocumentID"}`,
}

config := common.MustNewConfigFrom(map[string]interface{}{
"fields": []string{"msg"},
"document_id": "myid",
})

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
t.Fatal(err)
}

actual, err := p.Run(&beat.Event{Fields: input})
require.NoError(t, err)

wantFields := common.MapStr{
"msg": map[string]interface{}{"log": "message"},
}
wantMeta := common.MapStr{
"_id": "myDocumentID",
}

assert.Equal(t, wantFields, actual.Fields)
assert.Equal(t, wantMeta, actual.Meta)
}

func TestValidJSONDepthOne(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/actions/docs/decode_json_fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ default value is false.
`error` field is going to be part of event with error message. If it set to false, there
will not be any error in event's field. Even error occurs while decoding json keys. The
default value is false
`document_id`:: (Optional) JSON key to use as the document id. If configured,
the field will be removed from the original json document and stored in
`@metadata._id`
4 changes: 2 additions & 2 deletions libbeat/processors/add_id/add_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDefaultTargetField(t *testing.T) {
newEvent, err := p.Run(testEvent)
assert.NoError(t, err)

v, err := newEvent.GetValue("@metadata.id")
v, err := newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.NotEmpty(t, v)
}
Expand All @@ -59,7 +59,7 @@ func TestNonDefaultTargetField(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, v)

v, err = newEvent.GetValue("@metadata.id")
v, err = newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.Empty(t, v)
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type config struct {

func defaultConfig() config {
return config{
TargetField: "@metadata.id",
TargetField: "@metadata._id",
Type: "elasticsearch",
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/docs/add_id.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ processors:

The following settings are supported:

`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`.
`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata._id`.

`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default.
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating
Expand Down