From 6e1ba3e5508af5f796eda97254ca9506554d7537 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 6 Feb 2024 14:36:09 -0800 Subject: [PATCH] [exporter/elasticsearch] Add `mapping.mode: raw` configuration option (#29619) **Description:** This PR adds a new configuration option, `mapping.mode: raw`, to the Elasticsearch exporter. When set, the Elasticsearch exporter will not prefix log or span attributes with `Attributes.` when forming the Elasticsearch document field names for these fields. Additionally, the exporter will also not prefix span events with `Events.*` with forming the Elasticsearch document field names for these fields. **Link to tracking Issue:** Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26647 **Testing:** Besides adding/updating relevant unit tests in this PR, I also tested the changes in this PR against a local Elasticsearch cluster, using the following collector configurations: 1. Without the new `mapping.mode: raw` setting. ```yaml receivers: tcplog: listen_address: "0.0.0.0:54545" processors: attributes: actions: - action: insert key: first_attribute value: one - action: insert key: second_attribute value: two exporters: debug: verbosity: detailed elasticsearch: endpoints: [ "https://localhost:9200" ] user: elastic password: XXXXXXXX logs_index: otel-logs tls: insecure_skip_verify: true flush: interval: 1s service: pipelines: logs: receivers: [tcplog] processors: [attributes] exporters: [debug,elasticsearch] ``` _Resulting document in Elasticsearch:_ ```json { "_index": "otel-logs", "_id": "l1E5J4wBD9bb2EmZJuDR", "_score": 1, "_source": { "@timestamp": "1970-01-01T00:00:00.000000000Z", "Attributes": { "first_attribute": "one", "second_attribute": "two" }, "Body": "bar", "Scope": { "name": "", "version": "" }, "SeverityNumber": 0, "TraceFlags": 0 } } ``` 2. With the new `mapping.mode: raw` setting. ```yaml receivers: tcplog: listen_address: "0.0.0.0:54545" processors: attributes: actions: - action: insert key: first_attribute value: one - action: insert key: second_attribute value: two exporters: debug: verbosity: detailed elasticsearch: endpoints: [ "https://localhost:9200" ] user: elastic password: XXXXXXXX logs_index: otel-logs tls: insecure_skip_verify: true flush: interval: 1s mapping: mode: raw service: pipelines: logs: receivers: [tcplog] processors: [attributes] exporters: [debug,elasticsearch] ``` _Resulting document in Elasticsearch:_ ```json { "_index": "otel-logs", "_id": "jlE4J4wBD9bb2EmZp-Cd", "_score": 1, "_source": { "@timestamp": "1970-01-01T00:00:00.000000000Z", "Body": "foo bar baz", "Scope": { "name": "", "version": "" }, "SeverityNumber": 0, "TraceFlags": 0, "first_attribute": "one", "second_attribute": "two" } } ``` **Documentation:** Documented the new configuration option in the Elasticsearch exporter's `README.md`. --------- Co-authored-by: Andrzej Stencel --- .chloggen/exp-es-omit-attributes-prefix.yaml | 33 ++++++ exporter/elasticsearchexporter/README.md | 3 + exporter/elasticsearchexporter/config.go | 11 ++ exporter/elasticsearchexporter/config_test.go | 9 ++ .../elasticsearchexporter/logs_exporter.go | 6 +- .../logs_exporter_test.go | 2 +- exporter/elasticsearchexporter/model.go | 23 +++- exporter/elasticsearchexporter/model_test.go | 111 ++++++++++++++++++ .../testdata/config.yaml | 4 + .../elasticsearchexporter/trace_exporter.go | 6 +- .../traces_exporter_test.go | 2 +- 11 files changed, 203 insertions(+), 7 deletions(-) create mode 100755 .chloggen/exp-es-omit-attributes-prefix.yaml diff --git a/.chloggen/exp-es-omit-attributes-prefix.yaml b/.chloggen/exp-es-omit-attributes-prefix.yaml new file mode 100755 index 0000000000000..24d646b3d8a12 --- /dev/null +++ b/.chloggen/exp-es-omit-attributes-prefix.yaml @@ -0,0 +1,33 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `mapping.mode: raw` configuration option" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26647] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Setting `mapping.mode: raw` in the Elasticsearch exporter's configuration + will result logs and traces being indexed into Elasticsearch with their + attribute fields directly at the root level of the document instead inside an + `Attributes` object. Similarly, this setting will also result in traces being + indexed into Elasticsearch with their event fields directly at the root level + of the document instead of inside an `Events` object. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index caed01f440226..ff8b80f73fee4 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -68,6 +68,9 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www - `ecs`: Try to map fields defined in the [OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). + - `raw`: Omit the `Attributes.` string prefixed to field names for log and + span attributes as well as omit the `Events.` string prefixed to + field names for span events. - `fields` (optional): Configure additional fields mappings. - `file` (optional): Read additional field mappings from the provided YAML file. - `dedup` (default=true): Try to find and remove duplicate fields/attributes diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index f755844420bda..b30f486c41160 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -175,6 +175,7 @@ type MappingMode int const ( MappingNone MappingMode = iota MappingECS + MappingRaw ) var ( @@ -188,6 +189,8 @@ func (m MappingMode) String() string { return "" case MappingECS: return "ecs" + case MappingRaw: + return "raw" default: return "" } @@ -198,6 +201,7 @@ var mappingModes = func() map[string]MappingMode { for _, m := range []MappingMode{ MappingNone, MappingECS, + MappingRaw, } { table[strings.ToLower(m.String())] = m } @@ -231,3 +235,10 @@ func (cfg *Config) Validate() error { return nil } + +// MappingMode returns the mapping.mode defined in the given cfg +// object. This method must be called after cfg.Validate() has been +// called without returning an error. +func (cfg *Config) MappingMode() MappingMode { + return mappingModes[cfg.Mapping.Mode] +} diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 5e7ad7a47fa2c..4a896d77e7623 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -85,6 +85,10 @@ func TestLoadConfig(t *testing.T) { defaultLogstashFormatCfg.(*Config).Endpoints = []string{"http://localhost:9200"} defaultLogstashFormatCfg.(*Config).LogstashFormat.Enabled = true + defaultRawCfg := createDefaultConfig() + defaultRawCfg.(*Config).Endpoints = []string{"http://localhost:9200"} + defaultRawCfg.(*Config).Mapping.Mode = "raw" + tests := []struct { configFile string id component.ID @@ -200,6 +204,11 @@ func TestLoadConfig(t *testing.T) { configFile: "config.yaml", expected: defaultLogstashFormatCfg, }, + { + id: component.NewIDWithName(metadata.Type, "raw"), + configFile: "config.yaml", + expected: defaultRawCfg, + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index b74a5f942ae2d..2c94d816418da 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -53,7 +53,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte maxAttempts = cfg.Retry.MaxRequests } - model := &encodeModel{dedup: cfg.Mapping.Dedup, dedot: cfg.Mapping.Dedot} + model := &encodeModel{ + dedup: cfg.Mapping.Dedup, + dedot: cfg.Mapping.Dedot, + mode: cfg.MappingMode(), + } indexStr := cfg.LogsIndex if cfg.Index != "" { diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 705fbaf089244..cf83b736604a9 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -122,7 +122,7 @@ func TestExporter_New(t *testing.T) { cfg.Mapping.Dedot = false cfg.Mapping.Dedup = true }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true}), + want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}), }, } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index c727de45e644c..bbdfedcb10fc0 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -30,6 +30,7 @@ type mappingModel interface { type encodeModel struct { dedup bool dedot bool + mode MappingMode } const ( @@ -47,7 +48,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord document.AddString("SeverityText", record.SeverityText()) document.AddInt("SeverityNumber", int64(record.SeverityNumber())) document.AddAttribute("Body", record.Body()) - document.AddAttributes("Attributes", record.Attributes()) + m.encodeAttributes(&document, record.Attributes()) document.AddAttributes("Resource", resource.Attributes()) document.AddAttributes("Scope", scopeToAttributes(scope)) @@ -74,9 +75,9 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc document.AddInt("TraceStatus", int64(span.Status().Code())) document.AddString("TraceStatusDescription", span.Status().Message()) document.AddString("Link", spanLinksToString(span.Links())) - document.AddAttributes("Attributes", span.Attributes()) + m.encodeAttributes(&document, span.Attributes()) document.AddAttributes("Resource", resource.Attributes()) - document.AddEvents("Events", span.Events()) + m.encodeEvents(&document, span.Events()) document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds document.AddAttributes("Scope", scopeToAttributes(scope)) @@ -91,6 +92,22 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc return buf.Bytes(), err } +func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { + key := "Attributes" + if m.mode == MappingRaw { + key = "" + } + document.AddAttributes(key, attributes) +} + +func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) { + key := "Events" + if m.mode == MappingRaw { + key = "" + } + document.AddEvents(key, events) +} + func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string { linkArray := make([]map[string]any, 0, spanLinkSlice.Len()) for i := 0; i < spanLinkSlice.Len(); i++ { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index e76d1f39e4916..dc318df6e7687 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -4,13 +4,17 @@ package elasticsearchexporter import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":2,"TraceStatusDescription":"Test"}` @@ -63,3 +67,110 @@ func mockResourceSpans() ptrace.Traces { event.Attributes().PutStr("evnetMockBar", "bar") return traces } + +func TestEncodeAttributes(t *testing.T) { + t.Parallel() + + attributes := pcommon.NewMap() + err := attributes.FromRaw(map[string]any{ + "s": "baz", + "o": map[string]any{ + "sub_i": 19, + }, + }) + require.NoError(t, err) + + tests := map[string]struct { + mappingMode MappingMode + want func() objmodel.Document + }{ + "raw": { + mappingMode: MappingRaw, + want: func() objmodel.Document { + return objmodel.DocumentFromAttributes(attributes) + }, + }, + "none": { + mappingMode: MappingNone, + want: func() objmodel.Document { + doc := objmodel.Document{} + doc.AddAttributes("Attributes", attributes) + return doc + }, + }, + "ecs": { + mappingMode: MappingECS, + want: func() objmodel.Document { + doc := objmodel.Document{} + doc.AddAttributes("Attributes", attributes) + return doc + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + m := encodeModel{ + mode: test.mappingMode, + } + + doc := objmodel.Document{} + m.encodeAttributes(&doc, attributes) + require.Equal(t, test.want(), doc) + }) + } +} + +func TestEncodeEvents(t *testing.T) { + t.Parallel() + + events := ptrace.NewSpanEventSlice() + events.EnsureCapacity(4) + for i := 0; i < 4; i++ { + event := events.AppendEmpty() + event.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Duration(i) * time.Minute))) + event.SetName(fmt.Sprintf("event_%d", i)) + } + + tests := map[string]struct { + mappingMode MappingMode + want func() objmodel.Document + }{ + "raw": { + mappingMode: MappingRaw, + want: func() objmodel.Document { + doc := objmodel.Document{} + doc.AddEvents("", events) + return doc + }, + }, + "none": { + mappingMode: MappingNone, + want: func() objmodel.Document { + doc := objmodel.Document{} + doc.AddEvents("Events", events) + return doc + }, + }, + "ecs": { + mappingMode: MappingECS, + want: func() objmodel.Document { + doc := objmodel.Document{} + doc.AddEvents("Events", events) + return doc + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + m := encodeModel{ + mode: test.mappingMode, + } + + doc := objmodel.Document{} + m.encodeEvents(&doc, events) + require.Equal(t, test.want(), doc) + }) + } +} diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 792640222adb2..3054df30355c2 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -44,3 +44,7 @@ elasticsearch/logstash_format: endpoints: [http://localhost:9200] logstash_format: enabled: true +elasticsearch/raw: + endpoints: [http://localhost:9200] + mapping: + mode: raw diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index bfa3c485271f2..0d5f0e28bc752 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -49,7 +49,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp maxAttempts = cfg.Retry.MaxRequests } - model := &encodeModel{dedup: cfg.Mapping.Dedup, dedot: cfg.Mapping.Dedot} + model := &encodeModel{ + dedup: cfg.Mapping.Dedup, + dedot: cfg.Mapping.Dedot, + mode: cfg.MappingMode(), + } return &elasticsearchTracesExporter{ logger: logger, diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go index f3722bfbe5cbd..c42e5d58f6b04 100644 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ b/exporter/elasticsearchexporter/traces_exporter_test.go @@ -99,7 +99,7 @@ func TestTracesExporter_New(t *testing.T) { cfg.Mapping.Dedot = false cfg.Mapping.Dedup = true }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true}), + want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}), }, }