[exporterhelper] Fix small batch due to scheduling in batch sender
Decrement the count of active request goroutines by a whole batch at once, when the batch is exported, to work around unfavorable goroutine scheduling.

Fixes open-telemetry#9952
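
In rough terms, the accounting change looks like the following minimal sketch (a hypothetical illustration, not the actual exporterhelper code; the toySender/toyBatch names are invented): every caller that joins a batch bumps an atomic in-flight counter, and instead of each caller decrementing the counter on its own as it returns, the export goroutine's callback releases the whole batch's share in one step, so unlucky scheduling of individual callers can no longer make the counter dip early and trigger an undersized flush.

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

// toyBatch and toySender are hypothetical stand-ins for batch/batchSender.
type toyBatch struct {
    requests int64         // number of callers merged into this batch
    done     chan struct{} // closed once the batch has been exported
}

type toySender struct {
    mu             sync.Mutex
    active         *toyBatch
    activeRequests atomic.Int64 // callers currently blocked on a pending batch
}

// export sends the active batch asynchronously and starts a fresh one.
// The callback releases the whole batch's activeRequests share at once.
func (s *toySender) export(callback func(*toyBatch)) {
    go func(b *toyBatch) {
        defer callback(b)
        // ... send b to the next sender here ...
        close(b.done)
    }(s.active)
    s.active = &toyBatch{done: make(chan struct{})}
}

func main() {
    s := &toySender{active: &toyBatch{done: make(chan struct{})}}

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.mu.Lock()
            s.activeRequests.Add(1)
            s.active.requests++
            b := s.active
            if b.requests == 4 { // pretend the size threshold was reached
                s.export(func(b *toyBatch) { s.activeRequests.Add(-b.requests) })
            }
            s.mu.Unlock()
            <-b.done // block until the batch containing this request was exported
        }()
    }
    wg.Wait()

    // The export callback may run just after done is closed; wait for it.
    for s.activeRequests.Load() != 0 {
        runtime.Gosched()
    }
    fmt.Println("all four requests released in a single batch")
}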
carsonip committed Jun 5, 2024
1 parent 760f773 commit 01b07b7
Showing 4 changed files with 128 additions and 42 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fix_batch_sender_small_batch.yaml
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix small batch due to unfavorable goroutine scheduling in batch sender

# One or more tracking issues or pull requests related to the change
issues: [9952]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
64 changes: 42 additions & 22 deletions exporter/exporterhelper/batch_sender.go
@@ -30,8 +30,8 @@ type batchSender struct {
// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit uint64
activeRequests atomic.Uint64
concurrencyLimit int64
activeRequests atomic.Int64

resetTimerCh chan struct{}

@@ -74,7 +74,9 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
for bs.activeRequests.Load() > 0 {
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
bs.exportActiveBatch(func(b *batch) {
bs.activeRequests.Add(-b.requests)
})
}
bs.mu.Unlock()
}
@@ -86,7 +88,9 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
case <-timer.C:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
bs.exportActiveBatch(func(b *batch) {
bs.activeRequests.Add(-b.requests)
})
}
bs.mu.Unlock()
timer.Reset(bs.cfg.FlushTimeout)
@@ -103,10 +107,11 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
}

type batch struct {
ctx context.Context
request Request
done chan struct{}
err error
ctx context.Context
request Request
done chan struct{}
err error
requests int64 // number of requests in this batch
}

func newEmptyBatch() *batch {
@@ -118,8 +123,9 @@ func newEmptyBatch() *batch {

// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
func (bs *batchSender) exportActiveBatch(callback func(*batch)) {
go func(b *batch) {
defer callback(b)
b.err = bs.nextSender.send(b.ctx, b.request)
close(b.done)
}(bs.activeBatch)
@@ -155,28 +161,40 @@ func (bs *batchSender) send(ctx context.Context, req Request) error {
// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
bs.mu.Lock()
bs.activeRequests.Add(1)
defer bs.activeRequests.Add(^uint64(0))

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
}

bs.activeRequests.Add(1)
if len(reqs) == 1 || bs.activeBatch.request != nil {
bs.updateActiveBatch(ctx, reqs[0])
batch := bs.activeBatch
if bs.isActiveBatchReady() || len(reqs) > 1 {
bs.exportActiveBatch()
b := bs.activeBatch
b.requests++
leftover := len(reqs) > 1
if bs.isActiveBatchReady() || leftover {
bs.exportActiveBatch(func(b *batch) {
if leftover {
// Since there will be some leftover requests after exporting active batch,
// release all requests in this batch except one.
bs.activeRequests.Add(-b.requests + 1)
defer bs.activeRequests.Add(-1)
} else {
bs.activeRequests.Add(-b.requests)
}
})
bs.resetTimer()
}
bs.mu.Unlock()
<-batch.done
if batch.err != nil {
return batch.err
<-b.done
if b.err != nil {
return b.err
}
reqs = reqs[1:]
} else {
defer bs.activeRequests.Add(-1)
bs.mu.Unlock()
}

@@ -194,7 +212,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
bs.mu.Lock()
bs.activeRequests.Add(1)
defer bs.activeRequests.Add(^uint64(0))

if bs.activeBatch.request != nil {
var err error
@@ -205,14 +222,17 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
}
}
bs.updateActiveBatch(ctx, req)
batch := bs.activeBatch
b := bs.activeBatch
b.requests++
if bs.isActiveBatchReady() {
bs.exportActiveBatch()
bs.exportActiveBatch(func(b *batch) {
bs.activeRequests.Add(-b.requests)
})
bs.resetTimer()
}
bs.mu.Unlock()
<-batch.done
return batch.err
<-b.done
return b.err
}

// updateActiveBatch update the active batch to the new merged request and context.
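
One subtlety in the split path above, shown as a hedged sketch (releaseBatch is a hypothetical helper, not part of this change): when mergeSplitFunc leaves the caller with leftover requests still to send, the export callback releases all of the batch's slots except one, and that last slot is given back only after the caller has finished sending the leftovers.

package main

import (
    "fmt"
    "sync/atomic"
)

// releaseBatch mirrors the callback accounting: release the exported batch's
// share of activeRequests, but keep one slot reserved if the caller still has
// leftover (split) requests to send.
func releaseBatch(active *atomic.Int64, batchRequests int64, leftover bool) {
    if leftover {
        active.Add(-batchRequests + 1) // release all but the caller's own slot
    } else {
        active.Add(-batchRequests)
    }
}

func main() {
    var active atomic.Int64
    active.Add(3) // three callers joined the active batch

    // A split left the current caller with more requests to send.
    releaseBatch(&active, 3, true)
    fmt.Println(active.Load()) // 1: the caller's slot is still reserved

    active.Add(-1) // the caller finishes its leftover requests
    fmt.Println(active.Load()) // 0
}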
84 changes: 65 additions & 19 deletions exporter/exporterhelper/batch_sender_test.go
@@ -136,7 +136,7 @@ func TestBatchSender_BatchExportError(t *testing.T) {
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests &&
sink.itemsCount.Load() == tt.expectedItems &&
be.batchSender.(*batchSender).activeRequests.Load() == uint64(0) &&
be.batchSender.(*batchSender).activeRequests.Load() == 0 &&
be.queueSender.(*queueSender).queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)
})
@@ -270,27 +270,73 @@ func TestBatchSender_PostShutdown(t *testing.T) {
}

func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
cfg := exporterbatcher.NewDefaultConfig()
tests := []struct {
name string
batcherOption Option
expectedRequests uint64
expectedItems uint64
}{
{
name: "merge_only",
batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
expectedRequests: 3,
expectedItems: 36,
},
{
name: "merge_without_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 200
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
}(),
expectedRequests: 3,
expectedItems: 36,
},
{
name: "merge_with_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 10
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
}(),
expectedRequests: 5,
expectedItems: 36,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
tt.batcherOption,
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

sink := newFakeRequestSink()
assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
sink := newFakeRequestSink()
assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink}))

// the second request should be sent by reaching max concurrency limit.
assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
// the second request should be sent by reaching max concurrency limit.
assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 16
}, 100*time.Millisecond, 10*time.Millisecond)
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4
}, 100*time.Millisecond, 10*time.Millisecond)

// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
for i := 0; i < 4; i++ {
assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
}
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestBatchSender_BatchBlocking(t *testing.T) {
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common.go
@@ -280,7 +280,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf
if bs, ok := be.batchSender.(*batchSender); ok {
// If queue sender is enabled assign to the batch sender the same number of workers.
if qs, ok := be.queueSender.(*queueSender); ok {
bs.concurrencyLimit = uint64(qs.numConsumers)
bs.concurrencyLimit = int64(qs.numConsumers)
}
// Batcher sender mutates the data.
be.consumerOptions = append(be.consumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
