
Add more precise tests of the new behavior
faec committed Feb 12, 2024
1 parent c169973 commit 5d14cf4
Showing 4 changed files with 199 additions and 62 deletions.
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/ackloop.go
@@ -40,7 +40,7 @@ func (l *ackLoop) run() {
nextBatchChan := l.pendingBatches.nextBatchChannel()

select {
- case <-b.done:
+ case <-b.ctx.Done():
// The queue is shutting down.
return

48 changes: 33 additions & 15 deletions libbeat/publisher/queue/memqueue/broker.go
@@ -18,6 +18,7 @@
package memqueue

import (
"context"
"io"
"sync"
"time"
@@ -43,7 +44,8 @@ type broker struct {
settings Settings
logger *logp.Logger

- done chan struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc

// The ring buffer backing the queue. All buffer positions should be taken
// modulo the size of this array.
@@ -158,6 +160,32 @@ func NewQueue(
ackCallback func(eventCount int),
settings Settings,
inputQueueSize int,
+ ) *broker {
+ b := newQueue(logger, ackCallback, settings, inputQueueSize)
+
+ // Start the queue workers
+ b.wg.Add(2)
+ go func() {
+ defer b.wg.Done()
+ b.runLoop.run()
+ }()
+ go func() {
+ defer b.wg.Done()
+ b.ackLoop.run()
+ }()
+
+ return b
+ }
+
+ // newQueue does most of the work of creating a queue from the given
+ // parameters, but doesn't start the runLoop or ackLoop workers. This
+ // lets us perform more granular / deterministic tests by controlling
+ // when the workers are active.
+ func newQueue(
+ logger *logp.Logger,
+ ackCallback func(eventCount int),
+ settings Settings,
+ inputQueueSize int,
) *broker {
chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events)

@@ -181,7 +209,6 @@ func NewQueue(

b := &broker{
settings: settings,
- done: make(chan struct{}),
logger: logger,

buf: make([]queueEntry, settings.Events),
@@ -198,25 +225,16 @@

ackCallback: ackCallback,
}
+ b.ctx, b.ctxCancel = context.WithCancel(context.Background())

b.runLoop = newRunLoop(b)
b.ackLoop = newACKLoop(b)

- b.wg.Add(2)
- go func() {
- defer b.wg.Done()
- b.runLoop.run()
- }()
- go func() {
- defer b.wg.Done()
- b.ackLoop.run()
- }()

return b
}

func (b *broker) Close() error {
- close(b.done)
+ b.ctxCancel()
return nil
}

@@ -237,7 +255,7 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
func (b *broker) Get(count int) (queue.Batch, error) {
responseChan := make(chan *batch, 1)
select {
- case <-b.done:
+ case <-b.ctx.Done():
return nil, io.EOF
case b.getChan <- getRequest{
entryCount: count, responseChan: responseChan}:
@@ -252,7 +270,7 @@ func (b *broker) Metrics() (queue.Metrics, error) {

responseChan := make(chan memQueueMetrics, 1)
select {
- case <-b.done:
+ case <-b.ctx.Done():
return queue.Metrics{}, io.EOF
case b.metricChan <- metricsRequest{
responseChan: responseChan}:
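The broker.go change above replaces the broker's `done chan struct{}` with a cancelable context: Close() now calls ctxCancel(), and every select that previously watched <-b.done (Get, Metrics, the ackLoop) watches <-b.ctx.Done() instead. What follows is a minimal, self-contained sketch of that shutdown pattern, not Beats code; the queueStub type, newQueueStub, and getChan are invented names for illustration.

package main

import (
    "context"
    "fmt"
    "io"
)

// queueStub is a hypothetical stand-in for the broker in the diff above:
// a cancelable context replaces the old `done chan struct{}` field.
type queueStub struct {
    ctx       context.Context
    ctxCancel context.CancelFunc
    getChan   chan int
}

func newQueueStub() *queueStub {
    q := &queueStub{getChan: make(chan int)}
    q.ctx, q.ctxCancel = context.WithCancel(context.Background())
    return q
}

// Close mirrors broker.Close: cancelling the context replaces close(done).
func (q *queueStub) Close() error {
    q.ctxCancel()
    return nil
}

// Get mirrors the shape of broker.Get: a blocking request races against
// shutdown via <-q.ctx.Done() and reports io.EOF once the queue is closed.
func (q *queueStub) Get(count int) (int, error) {
    select {
    case <-q.ctx.Done():
        return 0, io.EOF
    case q.getChan <- count:
        return count, nil
    }
}

func main() {
    q := newQueueStub()
    q.Close()
    if _, err := q.Get(10); err == io.EOF {
        fmt.Println("Get reports shutdown after Close:", err)
    }
}

One practical difference from close(done) is that a context's cancel function is safe to call more than once, so a repeated Close() cannot panic, and the same context can be passed to any downstream API that already accepts a context.Context.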
98 changes: 52 additions & 46 deletions libbeat/publisher/queue/memqueue/runloop.go
@@ -81,63 +81,69 @@ func newRunLoop(broker *broker) *runLoop {
}

func (l *runLoop) run() {
- for {
- var pushChan chan pushRequest
- // Push requests are enabled if the queue isn't yet full.
- if l.eventCount < len(l.broker.buf) {
- pushChan = l.broker.pushChan
- }
+ for l.broker.ctx.Err() == nil {
+ l.runIteration()
+ }
+ }

- var getChan chan getRequest
- // Get requests are enabled if the queue has events that weren't yet sent
- // to consumers, and no existing request is active.
- if l.pendingGetRequest == nil && l.eventCount > l.consumedCount {
- getChan = l.broker.getChan
- }
+ // Perform one iteration of the queue's main run loop. Broken out into a
+ // standalone helper function to allow testing of loop invariants.
+ func (l *runLoop) runIteration() {
+ var pushChan chan pushRequest
+ // Push requests are enabled if the queue isn't yet full.
+ if l.eventCount < len(l.broker.buf) {
+ pushChan = l.broker.pushChan
+ }

- var consumedChan chan batchList
- // Enable sending to the scheduled ACKs channel if we have
- // something to send.
- if !l.consumedBatches.empty() {
- consumedChan = l.broker.consumedChan
- }
+ var getChan chan getRequest
+ // Get requests are enabled if the queue has events that weren't yet sent
+ // to consumers, and no existing request is active.
+ if l.pendingGetRequest == nil && l.eventCount > l.consumedCount {
+ getChan = l.broker.getChan
+ }

- var timeoutChan <-chan time.Time
- // Enable the timeout channel if a get request is waiting for events
- if l.pendingGetRequest != nil {
- timeoutChan = l.getTimer.C
- }
+ var consumedChan chan batchList
+ // Enable sending to the scheduled ACKs channel if we have
+ // something to send.
+ if !l.consumedBatches.empty() {
+ consumedChan = l.broker.consumedChan
+ }

- select {
- case <-l.broker.done:
- return
+ var timeoutChan <-chan time.Time
+ // Enable the timeout channel if a get request is waiting for events
+ if l.pendingGetRequest != nil {
+ timeoutChan = l.getTimer.C
+ }

- case req := <-pushChan: // producer pushing new event
- l.handleInsert(&req)
+ select {
+ case <-l.broker.ctx.Done():
+ return

- case req := <-l.broker.cancelChan: // producer cancelling active events
- l.handleCancel(&req)
+ case req := <-pushChan: // producer pushing new event
+ l.handleInsert(&req)

- case req := <-getChan: // consumer asking for next batch
- l.handleGetRequest(&req)
+ case req := <-l.broker.cancelChan: // producer cancelling active events
+ l.handleCancel(&req)

- case consumedChan <- l.consumedBatches:
- // We've sent all the pending batches to the ackLoop for processing,
- // clear the pending list.
- l.consumedBatches = batchList{}
+ case req := <-getChan: // consumer asking for next batch
+ l.handleGetRequest(&req)

- case count := <-l.broker.deleteChan:
- l.handleDelete(count)
+ case consumedChan <- l.consumedBatches:
+ // We've sent all the pending batches to the ackLoop for processing,
+ // clear the pending list.
+ l.consumedBatches = batchList{}

- case req := <-l.broker.metricChan: // asking broker for queue metrics
- l.handleMetricsRequest(&req)
+ case count := <-l.broker.deleteChan:
+ l.handleDelete(count)

- case <-timeoutChan:
- // The get timer has expired, handle the blocked request
- l.getTimer.Stop()
- l.handleGetReply(l.pendingGetRequest)
- l.pendingGetRequest = nil
- }
+ case req := <-l.broker.metricChan: // asking broker for queue metrics
+ l.handleMetricsRequest(&req)

+ case <-timeoutChan:
+ // The get timer has expired, handle the blocked request
+ l.getTimer.Stop()
+ l.handleGetReply(l.pendingGetRequest)
+ l.pendingGetRequest = nil
+ }
}

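The runloop.go refactor above moves the body of run() into a runIteration() helper so a test can drive the loop one step at a time, and it keeps the existing idiom of disabling select cases by leaving their channel nil (a nil channel never fires in a select). Below is a small, self-contained sketch of those two ideas under invented names; loopStub and its fields are not Beats code.

package main

import "fmt"

// loopStub is a hypothetical miniature of the runLoop above.
type loopStub struct {
    pushChan   chan string
    stopChan   chan struct{}
    eventCount int
    capacity   int
    stopped    bool
}

// runIteration performs exactly one pass of the loop, which is what makes
// its behavior easy to assert on from a test.
func (l *loopStub) runIteration() {
    var pushChan chan string
    // Pushes are only enabled while the buffer has room; otherwise the
    // case is disabled by leaving pushChan nil.
    if l.eventCount < l.capacity {
        pushChan = l.pushChan
    }

    select {
    case <-l.stopChan:
        l.stopped = true
    case <-pushChan:
        l.eventCount++
    }
}

// run has the production shape: repeat single iterations until stopped.
func (l *loopStub) run() {
    for !l.stopped {
        l.runIteration()
    }
}

func main() {
    l := &loopStub{
        pushChan: make(chan string, 1),
        stopChan: make(chan struct{}),
        capacity: 1,
    }

    l.pushChan <- "event"
    l.runIteration() // deterministic: consumes exactly one pending push
    fmt.Println("events after one iteration:", l.eventCount)

    // The buffer is now full, so pushChan stays nil next time and only the
    // stop case can fire.
    close(l.stopChan)
    l.runIteration()
    fmt.Println("stopped:", l.stopped)
}

The tests in runloop_test.go below exercise the real versions of these pieces: newQueue builds a broker without starting its workers, and each Publish or Get is paired with one runIteration() call so the test can make deterministic assertions on pendingGetRequest and consumedCount.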
113 changes: 113 additions & 0 deletions libbeat/publisher/queue/memqueue/runloop_test.go
@@ -0,0 +1,113 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memqueue

import (
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"

[Check failure on line 24, GitHub Actions / lint (windows): File is not `goimports`-ed with -local github.com/elastic (goimports)]
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
// In previous versions of the queue, setting flush.min_events (currently
// corresponding to memqueue.Settings.MaxGetRequest) to a high value would
// delay get requests even if the number of requested events was immediately
// available. This test verifies that Get requests that can be completely
// filled do not wait for the flush timer.

broker := newQueue(
logp.NewLogger("testing"),
nil,
Settings{
Events: 1000,
MaxGetRequest: 500,
FlushTimeout: 10 * time.Second,
},
10)

producer := newProducer(broker, nil, nil, false)
rl := broker.runLoop
for i := 0; i < 100; i++ {
// Pair each publish call with an iteration of the run loop so we
// get a response.
go rl.runIteration()
_, ok := producer.Publish(i)
require.True(t, ok, "Queue publish call must succeed")
}

// The queue now has 100 events, but MaxGetRequest is 500.
// In the old queue, a Get call now would block until the flush
// timer expires. With current changes, it should return
// immediately on any request size up to 100.
go func() {
// Run the Get asynchronously so the test itself doesn't block if
// there's a logical error.
_, _ = broker.Get(100)
}()
rl.runIteration()
assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since the request should succeed immediately")
assert.Equal(t, 100, rl.consumedCount, "Queue should have a consumedCount of 100 after a consumer requested all its events")
}

func TestFlushSettingsBlockPartialBatches(t *testing.T) {
// The previous test confirms that Get requests are handled immediately if
// there are enough events. This one uses the same setup to confirm that
// Get requests are delayed if there aren't enough events.

broker := newQueue(
logp.NewLogger("testing"),
nil,
Settings{
Events: 1000,
MaxGetRequest: 500,
FlushTimeout: 10 * time.Second,
},
10)

producer := newProducer(broker, nil, nil, false)
rl := broker.runLoop
for i := 0; i < 100; i++ {
// Pair each publish call with an iteration of the run loop so we
// get a response.
go rl.runIteration()
_, ok := producer.Publish("some event")
require.True(t, ok, "Queue publish call must succeed")
}

// The queue now has 100 events, and a positive flush timeout, so a
// request for 101 events should block.
go func() {
// Run the Get asynchronously so the test itself doesn't block if
// there's a logical error.
_, _ = broker.Get(101)
}()
rl.runIteration()
assert.NotNil(t, rl.pendingGetRequest, "Queue should have a pending get request since the queue doesn't have the requested event count")
assert.Equal(t, 0, rl.consumedCount, "Queue should have a consumedCount of 0 since the Get request couldn't be completely filled")

// Now confirm that adding one more event unblocks the request
go func() {
_, _ = producer.Publish("some event")
}()
rl.runIteration()
assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since adding an event should unblock the previous one")
assert.Equal(t, 101, rl.consumedCount, "Queue should have a consumedCount of 101 after adding an event unblocked the pending get request")
}
