Skip to content

Commit

Permalink
deprecate obsreport.Must functions in favor of obsreport.New (#6510)
Browse files Browse the repository at this point in the history
* deprecate obsreport.Must functions in favor of obsreport.New

* add changelog

* fix lint

* split changelog in two

* Update .chloggen/return_error_for_obsreport_new_functions.yaml

Co-authored-by: Bogdan Drutu <lazy@splunk.com>

Co-authored-by: Bogdan Drutu <lazy@splunk.com>
  • Loading branch information
paivagustavo and Bogdan Drutu committed Nov 10, 2022
1 parent 7a8d102 commit 20e3aac
Show file tree
Hide file tree
Showing 26 changed files with 272 additions and 126 deletions.
20 changes: 20 additions & 0 deletions .chloggen/deprecate_obsreport_must_new_functions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'deprecation'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "deprecate `obsreport.MustNew[Receiver|Scraper|Processor|Exporter]` in favor of `obsreport.New[Receiver|Scraper|Processor|Exporter]`"

# One or more tracking issues or pull requests related to the change
issues: [6458]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Deprecate `obsreport.MustNewReceiver()` in favor of `obsreport.NewReceiver()`
- Deprecate `obsreport.MustNewScraper()` in favor of `obsreport.NewScraper()`
- Deprecate `obsreport.MustNewProcessor()` in favor of `obsreport.NewProcessor()`
- Deprecate `obsreport.MustNewExporter()` in favor of `obsreport.NewExporter()`
16 changes: 16 additions & 0 deletions .chloggen/return_error_for_obsreport_new_functions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "`obsreport.New[Receiver|Scraper|Processor|Exporter]` returns error now"

# One or more tracking issues or pull requests related to the change
issues: [6458]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
11 changes: 8 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(cfg component.ExporterConfig, set component.ExporterCreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) *baseExporter {
func newBaseExporter(cfg component.ExporterConfig, set component.ExporterCreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) {
be := &baseExporter{}

be.obsrep = newObsExporter(obsreport.ExporterSettings{ExporterID: cfg.ID(), ExporterCreateSettings: set}, globalInstruments)
var err error
be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: cfg.ID(), ExporterCreateSettings: set}, globalInstruments)
if err != nil {
return nil, err
}

be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
Expand All @@ -176,7 +181,7 @@ func newBaseExporter(cfg component.ExporterConfig, set component.ExporterCreateS
// Last shutdown the wrapped exporter itself.
return bs.ShutdownFunc.Shutdown(ctx)
}
return be
return be, nil
}

// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
Expand Down
6 changes: 4 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ var (
)

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(), "", nopRequestUnmarshaler())
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
be, err := newBaseExporter(
&defaultExporterCfg,
componenttest.NewNopExporterCreateSettings(),
fromOptions(
Expand All @@ -57,6 +58,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
"",
nopRequestUnmarshaler(),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ func NewLogsExporter(
}

bs := fromOptions(options...)
be := newBaseExporter(cfg, set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher))
be, err := newBaseExporter(cfg, set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher))
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: be.obsrep,
Expand All @@ -108,11 +111,11 @@ func NewLogsExporter(

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ctx, ld, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
serr := be.sender.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
}
return err
return serr
}, bs.consumerOptions...)

return &logsExporter{
Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func NewMetricsExporter(
}

bs := fromOptions(options...)
be := newBaseExporter(cfg, set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher))
be, err := newBaseExporter(cfg, set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher))
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: be.obsrep,
Expand All @@ -109,11 +112,11 @@ func NewMetricsExporter(

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(ctx, md, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
serr := be.sender.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count()))
}
return err
return serr
}, bs.consumerOptions...)

return &metricsExporter{
Expand Down
11 changes: 8 additions & 3 deletions exporter/exporterhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,23 @@ type obsExporter struct {
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg obsreport.ExporterSettings, insts *instruments) *obsExporter {
func newObsExporter(cfg obsreport.ExporterSettings, insts *instruments) (*obsExporter, error) {
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)

exp, err := obsreport.NewExporter(cfg)
if err != nil {
return nil, err
}

return &obsExporter{
Exporter: obsreport.MustNewExporter(cfg),
Exporter: exp,
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
}
}, nil
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ func TestExportEnqueueFailure(t *testing.T) {
exporter := component.NewID("fakeExporter")

insts := newInstruments(metric.NewRegistry())
obsrep := newObsExporter(obsreport.ExporterSettings{
obsrep, err := newObsExporter(obsreport.ExporterSettings{
ExporterID: exporter,
ExporterCreateSettings: tt.ToExporterCreateSettings(),
}, insts)
require.NoError(t, err)

logRecords := int64(7)
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
Expand Down
47 changes: 31 additions & 16 deletions exporter/exporterhelper/queued_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
rCfg := NewDefaultRetrySettings()
mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data")))
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR))
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -73,7 +74,8 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
qCfg := NewDefaultQueueSettings()
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -98,7 +100,8 @@ func TestQueuedRetry_OnError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -124,7 +127,8 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -157,7 +161,8 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -186,7 +191,8 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 100 * time.Millisecond
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -232,7 +238,8 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = 10 * time.Millisecond
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -264,7 +271,8 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg.QueueSize = 1
rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -290,14 +298,15 @@ func TestQueuedRetry_DropOnFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.QueueSize = 0
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
err := be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))
err = be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))
require.Error(t, err)
}

Expand All @@ -308,7 +317,8 @@ func TestQueuedRetryHappyPath(t *testing.T) {

qCfg := NewDefaultQueueSettings()
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -342,7 +352,8 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

checkValueForGlobalManager(t, defaultExporterTags, int64(5000), "exporter/queue_capacity")
Expand Down Expand Up @@ -475,7 +486,8 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
be.qrSender.requeuingEnabled = true
Expand Down Expand Up @@ -506,7 +518,8 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg.QueueSize = 0
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)
be.qrSender.requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand All @@ -529,7 +542,8 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)

var extensions = map[component.ID]component.Extension{
storageID: &mockStorageExtension{},
Expand All @@ -551,7 +565,8 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
require.NoError(t, err)

var extensions = map[component.ID]component.Extension{
storageID: &mockStorageExtension{GetClientError: storageError},
Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func NewTracesExporter(
}

bs := fromOptions(options...)
be := newBaseExporter(cfg, set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher))
be, err := newBaseExporter(cfg, set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher))
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: be.obsrep,
Expand All @@ -109,11 +112,11 @@ func NewTracesExporter(

tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
req := newTracesRequest(ctx, td, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
serr := be.sender.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count()))
}
return err
return serr
}, bs.consumerOptions...)

return &traceExporter{
Expand Down
Loading

0 comments on commit 20e3aac

Please sign in to comment.