Skip to content

Commit

Permalink
feat(sumologicexporter): ensure immutability (#1383)
Browse files Browse the repository at this point in the history
* feat: ensure immutability for non-otlp metrics and logs

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* tests(sumologicexporter): set objects to read only in sender and exporter tests

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* chore: changelog

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

---------

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
  • Loading branch information
sumo-drosiek committed Dec 8, 2023
1 parent 86be332 commit 3055aea
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 6 deletions.
1 change: 1 addition & 0 deletions .changelog/1383.changed.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(sumologicexporter): ensure immutability
20 changes: 19 additions & 1 deletion pkg/exporter/sumologicexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func TestAllSuccess(t *testing.T) {
})

logs := LogRecordsToLogs(exampleLog())
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.NoError(t, err)
Expand Down Expand Up @@ -200,6 +201,7 @@ func TestLogsResourceAttributesSentAsFields(t *testing.T) {
logs := LogRecordsToLogs(buffer)
logs.ResourceLogs().At(0).Resource().Attributes().PutStr("res_attr1", "1")
logs.ResourceLogs().At(0).Resource().Attributes().PutStr("res_attr2", "2")
logs.MarkReadOnly()
return logs
},
},
Expand Down Expand Up @@ -236,6 +238,8 @@ func TestAllFailed(t *testing.T) {
logsRecords2 := logsSlice.ScopeLogs().AppendEmpty().LogRecords()
logsRecords2.AppendEmpty().Body().SetStr("Another example log")

logs.MarkReadOnly()

logsExpected := plog.NewLogs()
logsSlice.CopyTo(logsExpected.ResourceLogs().AppendEmpty())

Expand Down Expand Up @@ -273,6 +277,8 @@ func TestPartiallyFailed(t *testing.T) {
logsRecords2 := logsSlice2.ScopeLogs().AppendEmpty().LogRecords()
logsRecords2.AppendEmpty().Body().SetStr("Another example log")

logs.MarkReadOnly()

logsExpected := plog.NewLogs()
logsSlice2.CopyTo(logsExpected.ResourceLogs().AppendEmpty())

Expand Down Expand Up @@ -311,6 +317,7 @@ func TestPushInvalidCompressor(t *testing.T) {
test.exp.config.CompressEncoding = "invalid"

logs := LogRecordsToLogs(exampleLog())
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid")
Expand Down Expand Up @@ -346,6 +353,7 @@ func TestPushFailedBatch(t *testing.T) {

logs := LogRecordsToLogs(exampleLog())
logs.ResourceLogs().EnsureCapacity(maxBufferSize + 1)
logs.MarkReadOnly()
log := logs.ResourceLogs().At(0)
rLogs := logs.ResourceLogs()

Expand All @@ -362,6 +370,7 @@ func TestPushOTLPLogsClearTimestamp(t *testing.T) {
exampleLogs := exampleLog()
exampleLogs[0].SetTimestamp(12345)
logs := LogRecordsToLogs(exampleLogs)
logs.MarkReadOnly()
return logs
}

Expand Down Expand Up @@ -438,6 +447,7 @@ func TestPushLogs_DontRemoveSourceAttributes(t *testing.T) {
resourceAttrs.PutStr("_sourceCategory", "my-source-category")
resourceAttrs.PutStr("_sourceHost", "my-source-host")
resourceAttrs.PutStr("_sourceName", "my-source-name")
logs.MarkReadOnly()

return logs
}
Expand Down Expand Up @@ -506,6 +516,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
test.exp.config.MetricFormat = PrometheusFormat

metric := metricAndAttributesToPdataMetrics(tc.metricFunc())
metric.MarkReadOnly()

err := test.exp.pushMetricsData(context.Background(), metric)
assert.NoError(t, err)
Expand Down Expand Up @@ -575,6 +586,7 @@ gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",ur
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()
return metrics
},
expectedError: "failed sending data: status: 500 Internal Server Error",
Expand Down Expand Up @@ -632,6 +644,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1

func TestPushMetricsInvalidCompressor(t *testing.T) {
metrics := metricAndAttributesToPdataMetrics(exampleIntMetric())
metrics.MarkReadOnly()

// Expect no requests
test := prepareExporterTest(t, createTestConfig(), nil)
Expand All @@ -658,6 +671,8 @@ func TestMetricsPrometheusFormatMetadataFilter(t *testing.T) {
attrs.PutStr("key1", "value1")
attrs.PutStr("key2", "value2")

metrics.MarkReadOnly()

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.NoError(t, err)
}
Expand Down Expand Up @@ -692,7 +707,9 @@ func Benchmark_ExporterPushLogs(b *testing.B) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
err := exp.pushLogsData(context.Background(), LogRecordsToLogs(exampleNLogs(128)))
logs := LogRecordsToLogs(exampleNLogs(128))
logs.MarkReadOnly()
err := exp.pushLogsData(context.Background(), logs)
if err != nil {
b.Logf("Failed pushing logs: %v", err)
}
Expand All @@ -710,6 +727,7 @@ func TestSendEmptyLogsOTLP(t *testing.T) {
})

logs := plog.NewLogs()
logs.MarkReadOnly()

err := test.exp.pushLogsData(context.Background(), logs)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), previousFields); err != nil {
errs = append(errs, err)
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
body.Reset()
Expand Down Expand Up @@ -589,7 +589,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
// failed at sending, add the resource to the dropped metrics
// move instead of copy here to avoid duplicating data in memory on failure
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
}
Expand All @@ -607,7 +607,7 @@ func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pm
if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), flds); err != nil {
errs = append(errs, err)
for _, resource := range currentResources {
resource.MoveTo(droppedMetrics.ResourceMetrics().AppendEmpty())
resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty())
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ func TestSendLogsOTLP(t *testing.T) {
logRecords[i].MoveTo(ls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty())
}

l.MarkReadOnly()

assert.NoError(t, test.s.sendOTLPLogs(context.Background(), l))
assert.EqualValues(t, 1, *test.reqCounter)
}
Expand Down Expand Up @@ -1126,6 +1128,7 @@ func TestSendMetrics(t *testing.T) {
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand Down Expand Up @@ -1153,6 +1156,7 @@ func TestSendMetricsSplit(t *testing.T) {
attrs,
metricSum, metricGauge,
)
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand All @@ -1179,6 +1183,7 @@ func TestSendOTLPHistogram(t *testing.T) {
rms := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rms.Resource().Attributes())
metricHistogram.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

err := test.s.sendOTLPMetrics(context.Background(), metrics)
assert.NoError(t, err)
Expand Down Expand Up @@ -1216,6 +1221,7 @@ func TestSendMetricsSplitBySource(t *testing.T) {
attrs.CopyTo(rmsGauge.Resource().Attributes())
rmsGauge.Resource().Attributes().PutStr("_sourceHost", "value2")
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

_, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Empty(t, errs)
Expand Down Expand Up @@ -1253,6 +1259,7 @@ func TestSendMetricsSplitFailedOne(t *testing.T) {
rmsGauge := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rmsGauge.Resource().Attributes())
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 1)
Expand Down Expand Up @@ -1295,6 +1302,7 @@ func TestSendMetricsSplitFailedAll(t *testing.T) {
rmsGauge := metrics.ResourceMetrics().AppendEmpty()
attrs.CopyTo(rmsGauge.Resource().Attributes())
metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 2)
Expand All @@ -1320,6 +1328,7 @@ func TestSendMetricsUnexpectedFormat(t *testing.T) {

metricSum, attrs := exampleIntMetric()
metrics := metricAndAttrsToPdataMetrics(attrs, metricSum)
metrics.MarkReadOnly()

dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics)
assert.Len(t, errs, 1)
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/sumologicexporter/test_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func metricPairToMetrics(mp ...metricPair) pmetric.Metrics {
record.metric.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty())
}

metrics.MarkReadOnly()
return metrics
}

Expand Down Expand Up @@ -316,5 +317,6 @@ func exampleTrace() ptrace.Traces {
span.SetStartTimestamp(1544712660000000000)
span.SetEndTimestamp(1544712661000000000)
span.Attributes().PutInt("attr1", 55)
td.MarkReadOnly()
return td
}
4 changes: 2 additions & 2 deletions pkg/exporter/syslogexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (se *syslogexporter) pushLogsData(ctx context.Context, ld plog.Logs) error
for i := range dropped {
rls := ld.ResourceLogs().AppendEmpty()
logRecords := rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
dropped[i].resource.MoveTo(rls.Resource())
dropped[i].resource.CopyTo(rls.Resource())
for j := 0; j < len(dropped[i].records); j++ {
dropped[i].records[j].MoveTo(logRecords)
dropped[i].records[j].CopyTo(logRecords)
}
}
errs = deduplicateErrors(errs)
Expand Down

0 comments on commit 3055aea

Please sign in to comment.