Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sumologicexporter): ensure immutability #1383

Merged
merged 3 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/1383.changed.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(sumologicexporter): ensure immutability
20 changes: 19 additions & 1 deletion pkg/exporter/sumologicexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func TestAllSuccess(t *testing.T) {
})

logs := LogRecordsToLogs(exampleLog())
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.NoError(t, err)
Expand Down Expand Up @@ -200,6 +201,7 @@ func TestLogsResourceAttributesSentAsFields(t *testing.T) {
logs := LogRecordsToLogs(buffer)
logs.ResourceLogs().At(0).Resource().Attributes().PutStr("res_attr1", "1")
logs.ResourceLogs().At(0).Resource().Attributes().PutStr("res_attr2", "2")
logs.MarkReadOnly()
return logs
},
},
Expand Down Expand Up @@ -236,6 +238,8 @@ func TestAllFailed(t *testing.T) {
logsRecords2 := logsSlice.ScopeLogs().AppendEmpty().LogRecords()
logsRecords2.AppendEmpty().Body().SetStr("Another example log")

logs.MarkReadOnly()

logsExpected := plog.NewLogs()
logsSlice.CopyTo(logsExpected.ResourceLogs().AppendEmpty())

Expand Down Expand Up @@ -273,6 +277,8 @@ func TestPartiallyFailed(t *testing.T) {
logsRecords2 := logsSlice2.ScopeLogs().AppendEmpty().LogRecords()
logsRecords2.AppendEmpty().Body().SetStr("Another example log")

logs.MarkReadOnly()

logsExpected := plog.NewLogs()
logsSlice2.CopyTo(logsExpected.ResourceLogs().AppendEmpty())

Expand Down Expand Up @@ -311,6 +317,7 @@ func TestPushInvalidCompressor(t *testing.T) {
test.exp.config.CompressEncoding = "invalid"

logs := LogRecordsToLogs(exampleLog())
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid")
Expand Down Expand Up @@ -346,6 +353,7 @@ func TestPushFailedBatch(t *testing.T) {

logs := LogRecordsToLogs(exampleLog())
logs.ResourceLogs().EnsureCapacity(maxBufferSize + 1)
logs.MarkReadOnly()
log := logs.ResourceLogs().At(0)
rLogs := logs.ResourceLogs()

Expand All @@ -362,6 +370,7 @@ func TestPushOTLPLogsClearTimestamp(t *testing.T) {
exampleLogs := exampleLog()
exampleLogs[0].SetTimestamp(12345)
logs := LogRecordsToLogs(exampleLogs)
logs.MarkReadOnly()
return logs
}

Expand Down Expand Up @@ -438,6 +447,7 @@ func TestPushLogs_DontRemoveSourceAttributes(t *testing.T) {
resourceAttrs.PutStr("_sourceCategory", "my-source-category")
resourceAttrs.PutStr("_sourceHost", "my-source-host")
resourceAttrs.PutStr("_sourceName", "my-source-name")
logs.MarkReadOnly()

return logs
}
Expand Down Expand Up @@ -506,6 +516,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
test.exp.config.MetricFormat = PrometheusFormat

metric := metricAndAttributesToPdataMetrics(tc.metricFunc())
metric.MarkReadOnly()

err := test.exp.pushMetricsData(context.Background(), metric)
assert.NoError(t, err)
Expand Down Expand Up @@ -575,6 +586,7 @@ gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",ur
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()
return metrics
},
expectedError: "failed sending data: status: 500 Internal Server Error",
Expand Down Expand Up @@ -632,6 +644,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1

func TestPushMetricsInvalidCompressor(t *testing.T) {
metrics := metricAndAttributesToPdataMetrics(exampleIntMetric())
metrics.MarkReadOnly()

// Expect no requests
test := prepareExporterTest(t, createTestConfig(), nil)
Expand All @@ -658,6 +671,8 @@ func TestMetricsPrometheusFormatMetadataFilter(t *testing.T) {
attrs.PutStr("key1", "value1")
attrs.PutStr("key2", "value2")

metrics.MarkReadOnly()

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.NoError(t, err)
}
Expand Down Expand Up @@ -692,7 +707,9 @@ func Benchmark_ExporterPushLogs(b *testing.B) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
err := exp.pushLogsData(context.Background(), LogRecordsToLogs(exampleNLogs(128)))
logs := LogRecordsToLogs(exampleNLogs(128))
logs.MarkReadOnly()
err := exp.pushLogsData(context.Background(), logs)
if err != nil {
b.Logf("Failed pushing logs: %v", err)
}
Expand All @@ -710,6 +727,7 @@ func TestSendEmptyLogsOTLP(t *testing.T) {
})

logs := plog.NewLogs()
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), previousFields); err != nil {
errs = append(errs, err)
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
body.Reset()
Expand Down Expand Up @@ -589,7 +589,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
// failed at sending, add the resource to the dropped metrics
// move instead of copy here to avoid duplicating data in memory on failure
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
}
Expand All @@ -607,7 +607,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), flds); err != nil {
errs = append(errs, err)
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ func TestSendLogsOTLP(t *testing.T) {
logRecords[i].MoveTo(ls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty())
}

l.MarkReadOnly()

assert.NoError(t, test.s.sendOTLPLogs(context.Background(), l))
assert.EqualValues(t, 1, *test.reqCounter)
}
Expand Down Expand Up @@ -1126,6 +1128,7 @@ func TestSendMetrics(t *testing.T) {
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand Down Expand Up @@ -1153,6 +1156,7 @@ func TestSendMetricsSplit(t *testing.T) {
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand All @@ -1179,6 +1183,7 @@ func TestSendOTLPHistogram(t *testing.T) {
rms := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rms.Resource().Attributes())
metricHistogram.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

err := test.s.sendOTLPMetrics(context.Background(), metrics)
assert.NoError(t, err)
Expand Down Expand Up @@ -1216,6 +1221,7 @@ func TestSendMetricsSplitBySource(t *testing.T) {
attrs.CopyTo(rmsGauge.Resource().Attributes())
rmsGauge.Resource().Attributes().PutStr("_sourceHost", "value2")
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand Down Expand Up @@ -1253,6 +1259,7 @@ func TestSendMetricsSplitFailedOne(t *testing.T) {
rmsGauge := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rmsGauge.Resource().Attributes())
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 1)
Expand Down Expand Up @@ -1295,6 +1302,7 @@ func TestSendMetricsSplitFailedAll(t *testing.T) {
rmsGauge := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rmsGauge.Resource().Attributes())
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 2)
Expand All @@ -1320,6 +1328,7 @@ func TestSendMetricsUnexpectedFormat(t *testing.T) {

metricSum, attrs := exampleIntMetric()
metrics := metricAndAttrsToPdataMetrics(attrs, metricSum)
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 1)
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/sumologicexporter/test_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func metricPairToMetrics(mp ...metricPair) pmetric.Metrics {
record.metric.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
}

metrics.MarkReadOnly()
return metrics
}

Expand Down Expand Up @@ -316,5 +317,6 @@ func exampleTrace() ptrace.Traces {
span.SetStartTimestamp(1544712660000000000)
span.SetEndTimestamp(1544712661000000000)
span.Attributes().PutInt("attr1", 55)
td.MarkReadOnly()
return td
}
4 changes: 2 additions & 2 deletions pkg/exporter/syslogexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (se *syslogexporter) pushLogsData(ctx context.Context, ld plog.Logs) error
for i := range dropped {
rls := ld.ResourceLogs().AppendEmpty()
logRecords := rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
dropped[i].resource.MoveTo(rls.Resource())
dropped[i].resource.CopyTo(rls.Resource())
for j := 0; j < len(dropped[i].records); j++ {
dropped[i].records[j].MoveTo(logRecords)
dropped[i].records[j].CopyTo(logRecords)
}
}
errs = deduplicateErrors(errs)
Expand Down