Skip to content

Commit

Permalink
processor/otel: copy shared labels for logs (#7358)
Browse files Browse the repository at this point in the history
(cherry picked from commit afdd45e)

# Conflicts:
#	changelogs/head.asciidoc
#	processor/otel/logs_test.go
  • Loading branch information
axw authored and mergify-bot committed Feb 21, 2022
1 parent 1faf1e6 commit 5ddd12d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 15 deletions.
32 changes: 32 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[[release-notes-head]]
== APM version HEAD

https://github.com/elastic/apm-server/compare/8.0\...main[View commits]

Starting in version 8.0.0, {fleet} uses the APM integration to set up and manage APM index templates,
ILM policies, and ingest pipelines. APM Server will only send data to {es} _after_ the APM integration has been installed.
See <<apm-server-configuration>> for more information.

[float]
==== Breaking Changes
- APM Server now emits events with `event.duration`, and renames the field to `<event>.duration.us` in an ingest pipeline {pull}7261[7261]

[float]
==== Bug fixes
- Fix infinite loop in tail-based sampling subscriber causing high CPU and repeated Elasticsearch searches {pull}7211[7211]
- Do not overwrite `service version` if no transaction/error/... specific `service.name` is givven {pull}7281[7281]
- Fix panic when processing OpenTelemetry histogram metrics without bounds {pull}7316[7316]
- Fix mixing of labels across OpenTelemetry log records {pull}7358[7358]

[float]
==== Intake API Changes

[float]
==== Added
- `apm-server` artifacts now have the apm java-attacher.jar packaged alongside them {pull}6593[6593]
- Run the java attacher jar when configured and not in a cloud environment {pull}6617[6617]
- Added several dimensions to the aggregated transaction metrics {pull}7033[7033]
- Implemented translation of OpenTelemetry host system metrics (CPU utilization / Memory usage) {pull}7090[7090]
- Added support for storing OpenTelemetry span links as `span.links` {pull}7291[7291]
- seccomp is disabled by default {pull}7308[7308]
- Added data stream namespace configuration for standalone with `apm-server.data_streams.namespace` {pull}7314[7314]
5 changes: 2 additions & 3 deletions processor/otel/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (c *Consumer) convertResourceLogs(resourceLogs pdata.ResourceLogs, receiveT
resource := resourceLogs.Resource()
baseEvent := model.APMEvent{Processor: model.LogProcessor}
translateResourceMetadata(resource, &baseEvent)

if exportTimestamp, ok := exportTimestamp(resource); ok {
timeDelta = receiveTimestamp.Sub(exportTimestamp)
}
Expand Down Expand Up @@ -100,6 +101,7 @@ func (c *Consumer) convertLogRecord(
timeDelta time.Duration,
) model.APMEvent {
event := baseEvent
initEventLabels(&event)
event.Timestamp = record.Timestamp().AsTime().Add(timeDelta)
event.Event.Severity = int64(record.SeverityNumber())
event.Event.Action = record.Name()
Expand All @@ -126,9 +128,6 @@ func (c *Consumer) convertLogRecord(
}

func setLabels(m pdata.AttributeMap, event *model.APMEvent) {
if event.Labels == nil || event.NumericLabels == nil {
initEventLabels(event)
}
m.Range(func(k string, v pdata.AttributeValue) bool {
setLabel(k, event, ifaceAttributeValue(v))
return true
Expand Down
75 changes: 63 additions & 12 deletions processor/otel/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,19 @@ func TestConsumerConsumeLogs(t *testing.T) {
Severity: int64(pdata.SeverityNumberINFO),
Action: "doOperation()",
},
Log: model.Log{Level: "Info"},
Span: &model.Span{ID: "0200000000000000"},
Trace: model.Trace{ID: "01000000000000000000000000000000"},
Labels: model.Labels{
"key": model.LabelValue{Value: "value"},
},
NumericLabels: model.NumericLabels{
"numeric_key": model.NumericLabelValue{Value: 1234},
},
Log: model.Log{Level: "Info"},
Span: &model.Span{ID: "0200000000000000"},
Trace: model.Trace{ID: "01000000000000000000000000000000"},
Labels: model.Labels{},
NumericLabels: model.NumericLabels{},
}
test := func(name string, body interface{}, expectedMessage string) {
t.Run(name, func(t *testing.T) {
logs := newLogs(body)
logs := pdata.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
logs.ResourceLogs().At(0).Resource().Attributes().InsertString(semconv.AttributeTelemetrySDKLanguage, "go")
instrumentationLogs := resourceLogs.InstrumentationLibraryLogs().AppendEmpty()
newLogRecord(body).CopyTo(instrumentationLogs.LogRecords().AppendEmpty())

var processed model.Batch
var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error {
Expand All @@ -113,9 +113,10 @@ func TestConsumerConsumeLogs(t *testing.T) {
// TODO(marclop): How to test map body
}

func newLogs(body interface{}) pdata.Logs {
func TestConsumerConsumeLogsLabels(t *testing.T) {
logs := pdata.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
<<<<<<< HEAD
logs.ResourceLogs().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{
semconv.AttributeTelemetrySDKLanguage: pdata.NewAttributeValueString("go"),
})
Expand All @@ -131,6 +132,56 @@ func newLogs(body interface{}) pdata.Logs {
"key": pdata.NewAttributeValueString("value"),
"numeric_key": pdata.NewAttributeValueDouble(1234),
})
=======
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
resourceAttrs.InsertString(semconv.AttributeTelemetrySDKLanguage, "go")
resourceAttrs.InsertString("key0", "zero")
instrumentationLogs := resourceLogs.InstrumentationLibraryLogs().AppendEmpty()

record1 := newLogRecord("whatever")
record1.Attributes().InsertString("key1", "one")
record1.CopyTo(instrumentationLogs.LogRecords().AppendEmpty())

record2 := newLogRecord("andever")
record2.Attributes().InsertDouble("key2", 2)
record2.CopyTo(instrumentationLogs.LogRecords().AppendEmpty())

record3 := newLogRecord("amen")
record3.Attributes().InsertString("key3", "three")
record3.Attributes().InsertInt("key4", 4)
record3.CopyTo(instrumentationLogs.LogRecords().AppendEmpty())

var processed model.Batch
var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error {
if processed != nil {
panic("already processes batch")
}
processed = *batch
assert.NotNil(t, processed[0].Timestamp)
processed[0].Timestamp = time.Time{}
return nil
}
consumer := otel.Consumer{Processor: processor}
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))

assert.Len(t, processed, 3)
assert.Equal(t, model.Labels{"key0": {Value: "zero"}, "key1": {Value: "one"}}, processed[0].Labels)
assert.Empty(t, processed[0].NumericLabels)
assert.Equal(t, model.Labels{"key0": {Value: "zero"}}, processed[1].Labels)
assert.Equal(t, model.NumericLabels{"key2": {Value: 2}}, processed[1].NumericLabels)
assert.Equal(t, model.Labels{"key0": {Value: "zero"}, "key3": {Value: "three"}}, processed[2].Labels)
assert.Equal(t, model.NumericLabels{"key4": {Value: 4}}, processed[2].NumericLabels)
}

func newLogRecord(body interface{}) pdata.LogRecord {
otelLogRecord := pdata.NewLogRecord()
otelLogRecord.SetTraceID(pdata.NewTraceID([16]byte{1}))
otelLogRecord.SetSpanID(pdata.NewSpanID([8]byte{2}))
otelLogRecord.SetName("doOperation()")
otelLogRecord.SetSeverityNumber(pdata.SeverityNumberINFO)
otelLogRecord.SetSeverityText("Info")
otelLogRecord.SetTimestamp(pdata.NewTimestampFromTime(time.Now()))
>>>>>>> afdd45ef (processor/otel: copy shared labels for logs (#7358))

switch b := body.(type) {
case string:
Expand All @@ -146,5 +197,5 @@ func newLogs(body interface{}) pdata.Logs {
// as a map.
// otelLog.Body()
}
return logs
return otelLogRecord
}

0 comments on commit 5ddd12d

Please sign in to comment.