Remove queue.Consumer (#31502)
faec committed May 12, 2022
1 parent 682045a commit 416f70b
Showing 18 changed files with 132 additions and 350 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -61,6 +61,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Removed deprecated disk spool from Beats. Use disk queue instead. {pull}28869[28869]
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]

==== Bugfixes

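For context, a minimal sketch of the API change described by the new changelog entry above. The `readOne` helper and its package name are illustrative, not part of this commit; only `queue.Queue.Get` and the removed `queue.Consumer` come from the diff itself.

package example

import (
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// readOne pulls one batch of up to batchSize events from the queue.
func readOne(q queue.Queue, batchSize int) (queue.Batch, error) {
	// Old pattern, removed in this commit:
	//   consumer := q.Consumer()
	//   batch, err := consumer.Get(batchSize)
	//   consumer.Close()
	// New pattern: read directly from the queue object.
	return q.Get(batchSize)
}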
12 changes: 5 additions & 7 deletions libbeat/publisher/pipeline/client_test.go
@@ -83,16 +83,15 @@ func TestClient(t *testing.T) {
},
}

if testing.Verbose() {
logp.TestingSetup()
}
err := logp.TestingSetup()
assert.Nil(t, err)

for name, test := range cases {
t.Run(name, func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

pipeline := makePipeline(Settings{}, makeBlockingQueue())
pipeline := makePipeline(Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
@@ -140,9 +139,8 @@ func TestClientWaitClose(t *testing.T) {

return p
}
if testing.Verbose() {
logp.TestingSetup()
}
err := logp.TestingSetup()
assert.Nil(t, err)

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
25 changes: 2 additions & 23 deletions libbeat/publisher/pipeline/consumer.go
@@ -98,10 +98,8 @@ func (c *eventConsumer) run() {
log.Debug("start pipeline event consumer")

// Create a queueReader to run our queue fetches in the background
c.wg.Add(1)
queueReader := makeQueueReader()
go func() {
defer c.wg.Done()
queueReader.run(log)
}()

@@ -118,10 +116,6 @@ func (c *eventConsumer) run() {
// The output channel (and associated parameters) that will receive
// the batches we're loading.
target consumerTarget

// The queue.Consumer we get the raw batches from. Reset whenever
// the target changes.
consumer queue.Consumer = c.queue.Consumer()
)

outerLoop:
@@ -130,7 +124,7 @@ outerLoop:
if queueBatch == nil && !pendingRead {
pendingRead = true
queueReader.req <- queueReaderRequest{
consumer: consumer,
queue: c.queue,
retryer: c,
batchSize: target.batchSize,
timeToLive: target.timeToLive,
@@ -175,11 +169,10 @@ outerLoop:
pendingRead = false

case req := <-c.retryChan:
alive := true
if req.decreaseTTL {
countFailed := len(req.batch.Events())

alive = req.batch.reduceTTL()
alive := req.batch.reduceTTL()

countDropped := countFailed - len(req.batch.Events())
c.observer.eventsDropped(countDropped)
@@ -197,22 +190,8 @@ outerLoop:
}
}

// Close the queue.Consumer, otherwise queueReader can get blocked
// waiting on a read.
consumer.Close()

// Close the queueReader request channel so it knows to shutdown.
close(queueReader.req)

// If there's an outstanding request, we need to read the response
// to unblock it, but we won't pass on the value.
if pendingRead {
batch := <-queueReader.resp
if batch != nil {
// Inform any listeners that we couldn't deliver this batch.
batch.Drop()
}
}
}

func (c *eventConsumer) setTarget(target consumerTarget) {
16 changes: 9 additions & 7 deletions libbeat/publisher/pipeline/module.go
@@ -118,11 +118,6 @@ func loadOutput(
monitors Monitors,
makeOutput OutputFactory,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}
@@ -138,7 +133,11 @@ func loadOutput(
if monitors.Metrics != nil {
metrics = monitors.Metrics.GetRegistry("output")
if metrics != nil {
metrics.Clear()
err := metrics.Clear()
if err != nil {
return outputs.Group{}, err
}

} else {
metrics = monitors.Metrics.NewRegistry("output")
}
@@ -156,7 +155,10 @@ func loadOutput(
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.GetRegistry("output")
if telemetry != nil {
telemetry.Clear()
err := telemetry.Clear()
if err != nil {
return outputs.Group{}, err
}
} else {
telemetry = monitors.Telemetry.NewRegistry("output")
}
5 changes: 2 additions & 3 deletions libbeat/publisher/pipeline/pipeline.go
@@ -198,9 +198,9 @@ func (p *Pipeline) Close() error {

// Note: active clients are not closed / disconnected.

// close output before shutting down queue
// Closing the queue stops ACKs from propagating, so we close the output first
// to give it a chance to wait for any outstanding events to be acknowledged.
p.output.Close()

// shutdown queue
err := p.queue.Close()
if err != nil {
@@ -211,7 +211,6 @@ func (p *Pipeline) Close() error {
if p.sigNewClient != nil {
close(p.sigNewClient)
}

return nil
}

84 changes: 10 additions & 74 deletions libbeat/publisher/pipeline/pipeline_test.go
@@ -29,19 +29,14 @@ type testQueue struct {
close func() error
bufferConfig func() queue.BufferConfig
producer func(queue.ProducerConfig) queue.Producer
consumer func() queue.Consumer
get func(sz int) (queue.Batch, error)
}

type testProducer struct {
publish func(try bool, event publisher.Event) bool
cancel func() int
}

type testConsumer struct {
get func(sz int) (queue.Batch, error)
close func() error
}

func (q *testQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, nil
}
@@ -67,11 +62,11 @@ func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
return nil
}

func (q *testQueue) Consumer() queue.Consumer {
if q.consumer != nil {
return q.consumer()
func (q *testQueue) Get(sz int) (queue.Batch, error) {
if q.get != nil {
return q.get(sz)
}
return nil
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
@@ -95,39 +90,14 @@ func (p *testProducer) Cancel() int {
return 0
}

func (p *testConsumer) Get(sz int) (queue.Batch, error) {
if p.get != nil {
return p.get(sz)
}
return nil, nil
}

func (p *testConsumer) Close() error {
if p.close() != nil {
return p.close()
}
return nil
}

func makeBlockingQueue() queue.Queue {
return makeTestQueue(emptyConsumer, blockingProducer)
}

func makeTestQueue(
makeConsumer func() queue.Consumer,
makeProducer func(queue.ProducerConfig) queue.Producer,
) queue.Queue {
func makeTestQueue() queue.Queue {
var mux sync.Mutex
var wg sync.WaitGroup
consumers := map[*testConsumer]struct{}{}
producers := map[queue.Producer]struct{}{}

return &testQueue{
close: func() error {
mux.Lock()
for consumer := range consumers {
consumer.Close()
}
for producer := range producers {
producer.Cancel()
}
@@ -136,34 +106,14 @@ func makeTestQueue(
wg.Wait()
return nil
},

consumer: func() queue.Consumer {
var consumer *testConsumer
c := makeConsumer()
consumer = &testConsumer{
get: func(sz int) (queue.Batch, error) { return c.Get(sz) },
close: func() error {
err := c.Close()

mux.Lock()
defer mux.Unlock()
delete(consumers, consumer)
wg.Done()

return err
},
}

mux.Lock()
defer mux.Unlock()
consumers[consumer] = struct{}{}
wg.Add(1)
return consumer
get: func(count int) (queue.Batch, error) {
//<-done
return nil, nil
},

producer: func(cfg queue.ProducerConfig) queue.Producer {
var producer *testProducer
p := makeProducer(cfg)
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
if try {
@@ -192,20 +142,6 @@ }
}
}

func emptyConsumer() queue.Consumer {
done := make(chan struct{})
return &testConsumer{
get: func(sz int) (queue.Batch, error) {
<-done
return nil, nil
},
close: func() error {
close(done)
return nil
},
}
}

func blockingProducer(_ queue.ProducerConfig) queue.Producer {
sig := make(chan struct{})
waiting := atomic.MakeInt(0)
17 changes: 13 additions & 4 deletions libbeat/publisher/pipeline/queue_reader.go
@@ -18,8 +18,9 @@
package pipeline

import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// queueReader is a standalone stateless helper goroutine to dispatch
@@ -30,7 +31,7 @@ type queueReader struct {
}

type queueReaderRequest struct {
consumer queue.Consumer
queue queue.Queue
retryer retryer
batchSize int
timeToLive int
@@ -53,11 +54,19 @@ func (qr *queueReader) run(logger *logp.Logger) {
logger.Debug("pipeline event consumer queue reader: stop")
return
}
queueBatch, _ := req.consumer.Get(req.batchSize)
queueBatch, _ := req.queue.Get(req.batchSize)
var batch *ttlBatch
if queueBatch != nil {
batch = newBatch(req.retryer, queueBatch, req.timeToLive)
}
qr.resp <- batch
select {
case qr.resp <- batch:
case <-qr.req:
// If the request channel unblocks before we've sent our response,
// it means we're shutting down and the pending request can be
// discarded.
logger.Debug("pipeline event consumer queue reader: stop")
return
}
}
}
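Taken together with the consumer.go hunk above, the new shutdown handshake works roughly as in the standalone sketch below: the consumer closes the request channel, and a reader with a pending response notices that closure in its select and exits instead of blocking. All names and types here are illustrative stand-ins, not the actual pipeline types.

package main

import "fmt"

// batch stands in for queue.Batch in this sketch.
type batch struct{ size int }

// runReader mirrors the queueReader loop: it serves read requests until its
// request channel is closed, and it can shut down even while a response is
// still pending, because the select also watches the (now closed) request
// channel.
func runReader(get func(int) (*batch, error), req chan int, resp chan *batch) {
	for {
		size, ok := <-req
		if !ok {
			return // request channel closed: shut down
		}
		b, _ := get(size)
		select {
		case resp <- b:
			// Normal path: the requester reads the response.
		case <-req:
			// The requester closed req before reading the response. It never
			// has more than one request in flight, so this can only mean
			// shutdown: discard the batch and exit.
			return
		}
	}
}

func main() {
	req := make(chan int)
	resp := make(chan *batch)
	done := make(chan struct{})
	go func() {
		runReader(func(n int) (*batch, error) { return &batch{size: n}, nil }, req, resp)
		close(done)
	}()

	req <- 32                  // issue a read request
	fmt.Println((<-resp).size) // read the response: prints 32

	req <- 32  // issue another request ...
	close(req) // ... then shut down without reading the response
	<-done     // the reader exits instead of blocking on resp
}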
5 changes: 0 additions & 5 deletions libbeat/publisher/pipeline/sync_client.go
@@ -45,11 +45,6 @@ type ISyncClient interface {

// SyncClient wraps an existing beat.Client and provide a sync interface.
type SyncClient struct {
// Chain callbacks already defined in the original ClientConfig
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})

client beat.Client
wg sync.WaitGroup
log *logp.Logger
20 changes: 7 additions & 13 deletions libbeat/publisher/pipeline/sync_client_test.go
@@ -64,11 +64,8 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error)

func TestSyncClient(t *testing.T) {
receiver := func(c *dummyClient, sc *SyncClient) {
select {
case i := <-c.Received:
sc.onACK(i)
return
}
i := <-c.Received
sc.onACK(i)
}

t.Run("Publish", func(t *testing.T) {
@@ -109,7 +106,7 @@ func TestSyncClient(t *testing.T) {
sc.Wait()
})

t.Run("PublishAll multiple independant ACKs", func(t *testing.T) {
t.Run("PublishAll multiple independent ACKs", func(t *testing.T) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
@@ -120,13 +117,10 @@ func TestSyncClient(t *testing.T) {
defer sc.Close()

go func(c *dummyClient, sc *SyncClient) {
select {
case <-c.Received:
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
return
}
<-c.Received
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
}(c, sc)

err = sc.PublishAll(make([]beat.Event, 10))