From 416f70bac869b70f45cb0ad8e85de70bfafd3b8c Mon Sep 17 00:00:00 2001
From: Fae Charlton
Date: Thu, 12 May 2022 11:51:57 -0400
Subject: [PATCH] Remove queue.Consumer (#31502)

---
 CHANGELOG-developer.next.asciidoc             |  1 +
 libbeat/publisher/pipeline/client_test.go     | 12 +--
 libbeat/publisher/pipeline/consumer.go        | 25 +----
 libbeat/publisher/pipeline/module.go          | 16 ++--
 libbeat/publisher/pipeline/pipeline.go        |  5 +-
 libbeat/publisher/pipeline/pipeline_test.go   | 84 ++---------------
 libbeat/publisher/pipeline/queue_reader.go    | 17 +++-
 libbeat/publisher/pipeline/sync_client.go     |  5 -
 .../publisher/pipeline/sync_client_test.go    | 20 ++--
 libbeat/publisher/queue/diskqueue/consumer.go | 43 ++-------
 libbeat/publisher/queue/diskqueue/queue.go    |  4 -
 .../publisher/queue/diskqueue/state_file.go   |  2 +-
 libbeat/publisher/queue/memqueue/broker.go    | 40 +++++++-
 libbeat/publisher/queue/memqueue/consume.go   | 91 ------------------
 libbeat/publisher/queue/queue.go              | 20 +---
 libbeat/publisher/queue/queuetest/log.go      | 17 ++--
 .../queue/queuetest/producer_cancel.go        |  8 +-
 .../publisher/queue/queuetest/queuetest.go    | 72 ++++-----------
 18 files changed, 132 insertions(+), 350 deletions(-)
 delete mode 100644 libbeat/publisher/queue/memqueue/consume.go

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index dfcfbc05746..2438268c106 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -61,6 +61,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Removed deprecated disk spool from Beats. Use disk queue instead. {pull}28869[28869]
 - Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
 - Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
+- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
 
 ==== Bugfixes
 
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index 4d316ee42a5..a0ea83a890d 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -83,16 +83,15 @@ func TestClient(t *testing.T) {
 		},
 	}
 
-	if testing.Verbose() {
-		logp.TestingSetup()
-	}
+	err := logp.TestingSetup()
+	assert.Nil(t, err)
 
 	for name, test := range cases {
 		t.Run(name, func(t *testing.T) {
 			routinesChecker := resources.NewGoroutinesChecker()
 			defer routinesChecker.Check(t)
 
-			pipeline := makePipeline(Settings{}, makeBlockingQueue())
+			pipeline := makePipeline(Settings{}, makeTestQueue())
 			defer pipeline.Close()
 
 			var ctx context.Context
@@ -140,9 +139,8 @@ func TestClientWaitClose(t *testing.T) {
 		return p
 	}
 
-	if testing.Verbose() {
-		logp.TestingSetup()
-	}
+	err := logp.TestingSetup()
+	assert.Nil(t, err)
 
 	q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
 	pipeline := makePipeline(Settings{}, q)
diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go
index 6d3b97d8a58..7501844d5b8 100644
--- a/libbeat/publisher/pipeline/consumer.go
+++ b/libbeat/publisher/pipeline/consumer.go
@@ -98,10 +98,8 @@ func (c *eventConsumer) run() {
 	log.Debug("start pipeline event consumer")
 
 	// Create a queueReader to run our queue fetches in the background
-	c.wg.Add(1)
 	queueReader := makeQueueReader()
 	go func() {
-		defer c.wg.Done()
 		queueReader.run(log)
 	}()
 
@@ -118,10 +116,6 @@ func (c *eventConsumer) run() {
 		// The output channel (and associated parameters) that will receive
 		// the batches we're loading.
 		target consumerTarget
-
-		// The queue.Consumer we get the raw batches from. Reset whenever
-		// the target changes.
-		consumer queue.Consumer = c.queue.Consumer()
 	)
 
 outerLoop:
@@ -130,7 +124,7 @@ outerLoop:
 		if queueBatch == nil && !pendingRead {
 			pendingRead = true
 			queueReader.req <- queueReaderRequest{
-				consumer:   consumer,
+				queue:      c.queue,
 				retryer:    c,
 				batchSize:  target.batchSize,
 				timeToLive: target.timeToLive,
@@ -175,11 +169,10 @@ outerLoop:
 			pendingRead = false
 
 		case req := <-c.retryChan:
-			alive := true
 			if req.decreaseTTL {
 				countFailed := len(req.batch.Events())
-				alive = req.batch.reduceTTL()
+				alive := req.batch.reduceTTL()
 
 				countDropped := countFailed - len(req.batch.Events())
 				c.observer.eventsDropped(countDropped)
@@ -197,22 +190,8 @@
 		}
 	}
 
-	// Close the queue.Consumer, otherwise queueReader can get blocked
-	// waiting on a read.
-	consumer.Close()
-
 	// Close the queueReader request channel so it knows to shutdown.
 	close(queueReader.req)
-
-	// If there's an outstanding request, we need to read the response
-	// to unblock it, but we won't pass on the value.
-	if pendingRead {
-		batch := <-queueReader.resp
-		if batch != nil {
-			// Inform any listeners that we couldn't deliver this batch.
-			batch.Drop()
-		}
-	}
 }
 
 func (c *eventConsumer) setTarget(target consumerTarget) {
diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go
index f4ad28c8e1a..7f08e85e2af 100644
--- a/libbeat/publisher/pipeline/module.go
+++ b/libbeat/publisher/pipeline/module.go
@@ -118,11 +118,6 @@ func loadOutput(
 	monitors Monitors,
 	makeOutput OutputFactory,
 ) (outputs.Group, error) {
-	log := monitors.Logger
-	if log == nil {
-		log = logp.L()
-	}
-
 	if publishDisabled {
 		return outputs.Group{}, nil
 	}
@@ -138,7 +133,11 @@ func loadOutput(
 	if monitors.Metrics != nil {
 		metrics = monitors.Metrics.GetRegistry("output")
 		if metrics != nil {
-			metrics.Clear()
+			err := metrics.Clear()
+			if err != nil {
+				return outputs.Group{}, err
+			}
+
 		} else {
 			metrics = monitors.Metrics.NewRegistry("output")
 		}
@@ -156,7 +155,10 @@ func loadOutput(
 	if monitors.Telemetry != nil {
 		telemetry := monitors.Telemetry.GetRegistry("output")
 		if telemetry != nil {
-			telemetry.Clear()
+			err := telemetry.Clear()
+			if err != nil {
+				return outputs.Group{}, err
+			}
 		} else {
 			telemetry = monitors.Telemetry.NewRegistry("output")
 		}
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index a952cba6157..2d3799c7277 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -198,9 +198,9 @@ func (p *Pipeline) Close() error {
 
 	// Note: active clients are not closed / disconnected.
 
-	// close output before shutting down queue
+	// Closing the queue stops ACKs from propagating, so we close the output first
+	// to give it a chance to wait for any outstanding events to be acknowledged.
 	p.output.Close()
 
-	// shutdown queue
 	err := p.queue.Close()
 	if err != nil {
@@ -211,7 +211,6 @@ func (p *Pipeline) Close() error {
 	if p.sigNewClient != nil {
 		close(p.sigNewClient)
 	}
-
 	return nil
 }
 
diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go
index 42a0e1762bd..4efff03070a 100644
--- a/libbeat/publisher/pipeline/pipeline_test.go
+++ b/libbeat/publisher/pipeline/pipeline_test.go
@@ -29,7 +29,7 @@ type testQueue struct {
 	close        func() error
 	bufferConfig func() queue.BufferConfig
 	producer     func(queue.ProducerConfig) queue.Producer
-	consumer     func() queue.Consumer
+	get          func(sz int) (queue.Batch, error)
 }
 
 type testProducer struct {
@@ -37,11 +37,6 @@ type testProducer struct {
 	cancel  func() int
 }
 
-type testConsumer struct {
-	get   func(sz int) (queue.Batch, error)
-	close func() error
-}
-
 func (q *testQueue) Metrics() (queue.Metrics, error) {
 	return queue.Metrics{}, nil
 }
@@ -67,11 +62,11 @@ func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
 	return nil
 }
 
-func (q *testQueue) Consumer() queue.Consumer {
-	if q.consumer != nil {
-		return q.consumer()
+func (q *testQueue) Get(sz int) (queue.Batch, error) {
+	if q.get != nil {
+		return q.get(sz)
 	}
-	return nil
+	return nil, nil
 }
 
 func (p *testProducer) Publish(event publisher.Event) bool {
@@ -95,39 +90,14 @@ func (p *testProducer) Cancel() int {
 	return 0
 }
 
-func (p *testConsumer) Get(sz int) (queue.Batch, error) {
-	if p.get != nil {
-		return p.get(sz)
-	}
-	return nil, nil
-}
-
-func (p *testConsumer) Close() error {
-	if p.close() != nil {
-		return p.close()
-	}
-	return nil
-}
-
-func makeBlockingQueue() queue.Queue {
-	return makeTestQueue(emptyConsumer, blockingProducer)
-}
-
-func makeTestQueue(
-	makeConsumer func() queue.Consumer,
-	makeProducer func(queue.ProducerConfig) queue.Producer,
-) queue.Queue {
+func makeTestQueue() queue.Queue {
 	var mux sync.Mutex
 	var wg sync.WaitGroup
-	consumers := map[*testConsumer]struct{}{}
 	producers := map[queue.Producer]struct{}{}
 
 	return &testQueue{
 		close: func() error {
 			mux.Lock()
-			for consumer := range consumers {
-				consumer.Close()
-			}
 			for producer := range producers {
 				producer.Cancel()
 			}
@@ -136,34 +106,14 @@ func makeTestQueue(
 			wg.Wait()
 			return nil
 		},
-
-		consumer: func() queue.Consumer {
-			var consumer *testConsumer
-			c := makeConsumer()
-			consumer = &testConsumer{
-				get: func(sz int) (queue.Batch, error) { return c.Get(sz) },
-				close: func() error {
-					err := c.Close()
-
-					mux.Lock()
-					defer mux.Unlock()
-					delete(consumers, consumer)
-					wg.Done()
-
-					return err
-				},
-			}
-
-			mux.Lock()
-			defer mux.Unlock()
-			consumers[consumer] = struct{}{}
-			wg.Add(1)
-			return consumer
+		get: func(count int) (queue.Batch, error) {
+			//<-done
+			return nil, nil
 		},
 
 		producer: func(cfg queue.ProducerConfig) queue.Producer {
 			var producer *testProducer
-			p := makeProducer(cfg)
+			p := blockingProducer(cfg)
 			producer = &testProducer{
 				publish: func(try bool, event publisher.Event) bool {
 					if try {
@@ -192,20 +142,6 @@ func makeTestQueue(
 	}
 }
 
-func emptyConsumer() queue.Consumer {
-	done := make(chan struct{})
-	return &testConsumer{
-		get: func(sz int) (queue.Batch, error) {
-			<-done
-			return nil, nil
-		},
-		close: func() error {
-			close(done)
-			return nil
-		},
-	}
-}
-
 func blockingProducer(_ queue.ProducerConfig) queue.Producer {
 	sig := make(chan struct{})
 	waiting := atomic.MakeInt(0)
diff --git a/libbeat/publisher/pipeline/queue_reader.go b/libbeat/publisher/pipeline/queue_reader.go
index c4d4969969d..fa68b83739c 100644
--- a/libbeat/publisher/pipeline/queue_reader.go
+++ b/libbeat/publisher/pipeline/queue_reader.go
@@ -18,8 +18,9 @@
 package pipeline
 
 import (
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/elastic-agent-libs/logp"
+
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
 
 // queueReader is a standalone stateless helper goroutine to dispatch
@@ -30,7 +31,7 @@ type queueReader struct {
 }
 
 type queueReaderRequest struct {
-	consumer   queue.Consumer
+	queue      queue.Queue
 	retryer    retryer
 	batchSize  int
 	timeToLive int
 }
@@ -53,11 +54,19 @@ func (qr *queueReader) run(logger *logp.Logger) {
 			logger.Debug("pipeline event consumer queue reader: stop")
 			return
 		}
-		queueBatch, _ := req.consumer.Get(req.batchSize)
+		queueBatch, _ := req.queue.Get(req.batchSize)
 		var batch *ttlBatch
 		if queueBatch != nil {
 			batch = newBatch(req.retryer, queueBatch, req.timeToLive)
 		}
-		qr.resp <- batch
+		select {
+		case qr.resp <- batch:
+		case <-qr.req:
+			// If the request channel unblocks before we've sent our response,
+			// it means we're shutting down and the pending request can be
+			// discarded.
+			logger.Debug("pipeline event consumer queue reader: stop")
+			return
+		}
 	}
 }
diff --git a/libbeat/publisher/pipeline/sync_client.go b/libbeat/publisher/pipeline/sync_client.go
index ea9a2ffce23..9dcf69ff155 100644
--- a/libbeat/publisher/pipeline/sync_client.go
+++ b/libbeat/publisher/pipeline/sync_client.go
@@ -45,11 +45,6 @@ type ISyncClient interface {
 
 // SyncClient wraps an existing beat.Client and provide a sync interface.
 type SyncClient struct {
-	// Chain callbacks already defined in the original ClientConfig
-	ackCount     func(int)
-	ackEvents    func([]interface{})
-	ackLastEvent func(interface{})
-
 	client beat.Client
 	wg     sync.WaitGroup
 	log    *logp.Logger
diff --git a/libbeat/publisher/pipeline/sync_client_test.go b/libbeat/publisher/pipeline/sync_client_test.go
index 69a42164e1c..09eac919bd5 100644
--- a/libbeat/publisher/pipeline/sync_client_test.go
+++ b/libbeat/publisher/pipeline/sync_client_test.go
@@ -64,11 +64,8 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error)
 
 func TestSyncClient(t *testing.T) {
 	receiver := func(c *dummyClient, sc *SyncClient) {
-		select {
-		case i := <-c.Received:
-			sc.onACK(i)
-			return
-		}
+		i := <-c.Received
+		sc.onACK(i)
 	}
 
 	t.Run("Publish", func(t *testing.T) {
@@ -109,7 +106,7 @@ func TestSyncClient(t *testing.T) {
 		sc.Wait()
 	})
 
-	t.Run("PublishAll multiple independant ACKs", func(t *testing.T) {
+	t.Run("PublishAll multiple independent ACKs", func(t *testing.T) {
 		c := newDummyClient()
 		pipeline := newDummyPipeline(c)
 
@@ -120,13 +117,10 @@ func TestSyncClient(t *testing.T) {
 		defer sc.Close()
 
 		go func(c *dummyClient, sc *SyncClient) {
-			select {
-			case <-c.Received:
-				// simulate multiple acks
-				sc.onACK(5)
-				sc.onACK(5)
-				return
-			}
+			<-c.Received
+			// simulate multiple acks
+			sc.onACK(5)
+			sc.onACK(5)
 		}(c, sc)
 
 		err = sc.PublishAll(make([]beat.Event, 10))
diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go
index 649df8717b6..5c316916559 100644
--- a/libbeat/publisher/queue/diskqueue/consumer.go
+++ b/libbeat/publisher/queue/diskqueue/consumer.go
@@ -20,43 +20,28 @@ package diskqueue
 import (
 	"fmt"
 
-	"github.com/elastic/beats/v7/libbeat/common/atomic"
 	"github.com/elastic/beats/v7/libbeat/publisher"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
 
-type diskQueueConsumer struct {
-	queue  *diskQueue
-	closed atomic.Bool
-	done   chan struct{}
-}
-
 type diskQueueBatch struct {
 	queue  *diskQueue
 	frames []*readFrame
 }
 
-//
-// diskQueueConsumer implementation of the queue.Consumer interface
-//
-
-func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) {
+func (dq *diskQueue) Get(eventCount int) (queue.Batch, error) {
 	// We can always eventually read at least one frame unless the queue or the
 	// consumer is closed.
-	var frames []*readFrame
-	select {
-	case frame, ok := <-consumer.queue.readerLoop.output:
-		if !ok {
-			return nil, fmt.Errorf("tried to read from a closed disk queue")
-		}
-		frames = []*readFrame{frame}
-	case <-consumer.done:
-		return nil, fmt.Errorf("tried to read from a closed disk queue consumer")
+	frame, ok := <-dq.readerLoop.output
+	if !ok {
+		return nil, fmt.Errorf("tried to read from a closed disk queue")
 	}
+	frames := []*readFrame{frame}
+
 eventLoop:
 	for eventCount <= 0 || len(frames) < eventCount {
 		select {
-		case frame, ok := <-consumer.queue.readerLoop.output:
+		case frame, ok := <-dq.readerLoop.output:
 			if !ok {
 				// The queue was closed while we were reading it, just send back
 				// what we have so far.
@@ -74,7 +59,7 @@ eventLoop:
 			// written to readerLoop.output may have been buffered before the
 			// queue was closed, and we may be reading its leftovers afterwards.
 			// We could try to detect this case here by checking the
-			// consumer.queue.done channel, and return nothing if it's been closed.
+			// queue.done channel, and return nothing if it's been closed.
 			// But this gives rise to another race: maybe the queue was
 			// closed _after_ we read those frames, and we _ought_ to return them
 			// to the reader. The queue interface doesn't specify the proper
@@ -85,23 +70,15 @@ eventLoop:
 	// "read," so we lose no consistency by returning them. If someone closes
 	// the queue while we are draining the channel, nothing changes functionally
 	// except that any ACKs after that point will be ignored. A well-behaved
-	// Beats shutdown will always ACK / close its consumers before closing the
+	// Beats shutdown will always ACK its batches before closing the
 	// queue itself, so we expect this corner case not to arise in practice, but
 	// if it does it is innocuous.
 	return &diskQueueBatch{
-		queue:  consumer.queue,
+		queue:  dq,
 		frames: frames,
 	}, nil
 }
 
-func (consumer *diskQueueConsumer) Close() error {
-	if consumer.closed.Swap(true) {
-		return fmt.Errorf("already closed")
-	}
-	close(consumer.done)
-	return nil
-}
-
 //
 // diskQueueBatch implementation of the queue.Batch interface
 //
diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go
index 3e9fd5ca2a1..df6519edd01 100644
--- a/libbeat/publisher/queue/diskqueue/queue.go
+++ b/libbeat/publisher/queue/diskqueue/queue.go
@@ -275,10 +275,6 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
 	}
 }
 
-func (dq *diskQueue) Consumer() queue.Consumer {
-	return &diskQueueConsumer{queue: dq, done: make(chan struct{})}
-}
-
 func (dq *diskQueue) Metrics() (queue.Metrics, error) {
 	return queue.Metrics{}, queue.ErrMetricsNotImplemented
 }
diff --git a/libbeat/publisher/queue/diskqueue/state_file.go b/libbeat/publisher/queue/diskqueue/state_file.go
index 9247598eaeb..edb238f96c4 100644
--- a/libbeat/publisher/queue/diskqueue/state_file.go
+++ b/libbeat/publisher/queue/diskqueue/state_file.go
@@ -44,7 +44,7 @@ func queuePositionFromHandle(
 	}
 	if version > currentStateFileVersion {
 		return queuePosition{},
-			fmt.Errorf("Unsupported queue metadata version (%d)", version)
+			fmt.Errorf("unsupported queue metadata version (%d)", version)
 	}
 
 	position := queuePosition{}
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 95a45d677d6..3a99664e827 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -18,10 +18,12 @@
 package memqueue
 
 import (
+	"io"
 	"sync"
 	"time"
 
 	"github.com/elastic/beats/v7/libbeat/feature"
+	"github.com/elastic/beats/v7/libbeat/publisher"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	c "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
@@ -87,6 +89,12 @@ type Settings struct {
 	InputQueueSize int
 }
 
+type batch struct {
+	queue   *broker
+	events  []publisher.Event
+	ackChan chan batchAckMsg
+}
+
 // batchACKState stores the metadata associated with a batch of events sent to
 // a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
 // ackChan and received by
@@ -223,8 +231,28 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
 	return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel)
 }
 
-func (b *broker) Consumer() queue.Consumer {
-	return newConsumer(b)
+func (b *broker) Get(count int) (queue.Batch, error) {
+	responseChan := make(chan getResponse, 1)
+	select {
+	case <-b.done:
+		return nil, io.EOF
+	case b.getChan <- getRequest{
+		entryCount: count, responseChan: responseChan}:
+	}
+
+	// if request has been sent, we have to wait for a response
+	resp := <-responseChan
+	events := make([]publisher.Event, 0, len(resp.entries))
+	for _, entry := range resp.entries {
+		if event, ok := entry.event.(*publisher.Event); ok {
+			events = append(events, *event)
+		}
+	}
+	return &batch{
+		queue:   b,
+		events:  events,
+		ackChan: resp.ackChan,
+	}, nil
 }
 
 func (b *broker) Metrics() (queue.Metrics, error) {
@@ -333,3 +361,11 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) {
 	}
 	return actual
 }
+
+func (b *batch) Events() []publisher.Event {
+	return b.events
+}
+
+func (b *batch) ACK() {
+	b.ackChan <- batchAckMsg{}
+}
diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go
deleted file mode 100644
index 03018ad4fc5..00000000000
--- a/libbeat/publisher/queue/memqueue/consume.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Licensed to Elasticsearch B.V. under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Elasticsearch B.V. licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package memqueue
-
-import (
-	"errors"
-	"io"
-
-	"github.com/elastic/beats/v7/libbeat/common/atomic"
-	"github.com/elastic/beats/v7/libbeat/publisher"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
-)
-
-type consumer struct {
-	broker *broker
-	resp   chan getResponse
-
-	done   chan struct{}
-	closed atomic.Bool
-}
-
-type batch struct {
-	consumer *consumer
-	events   []publisher.Event
-	ackChan  chan batchAckMsg
-}
-
-func newConsumer(b *broker) *consumer {
-	return &consumer{
-		broker: b,
-		resp:   make(chan getResponse),
-		done:   make(chan struct{}),
-	}
-}
-
-func (c *consumer) Get(sz int) (queue.Batch, error) {
-	if c.closed.Load() {
-		return nil, io.EOF
-	}
-
-	select {
-	case c.broker.getChan <- getRequest{entryCount: sz, responseChan: c.resp}:
-	case <-c.done:
-		return nil, io.EOF
-	}
-
-	// if request has been send, we do have to wait for a response
-	resp := <-c.resp
-	events := make([]publisher.Event, 0, len(resp.entries))
-	for _, entry := range resp.entries {
-		if event, ok := entry.event.(*publisher.Event); ok {
-			events = append(events, *event)
-		}
-	}
-	return &batch{
-		consumer: c,
-		events:   events,
-		ackChan:  resp.ackChan,
-	}, nil
-}
-
-func (c *consumer) Close() error {
-	if c.closed.Swap(true) {
-		return errors.New("already closed")
-	}
-	close(c.done)
-	return nil
-}
-
-func (b *batch) Events() []publisher.Event {
-	return b.events
-}
-
-func (b *batch) ACK() {
-	b.ackChan <- batchAckMsg{}
-}
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index ecea33400af..14291c8ea4a 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -19,7 +19,6 @@ package queue
 
 import (
 	"errors"
-	"io"
 
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
@@ -65,12 +64,15 @@ var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented")
 // consumer or flush to some other intermediate storage), it will send an ACK signal
 // with the number of ACKed events to the Producer (ACK happens in batches).
 type Queue interface {
-	io.Closer
+	Close() error
 
 	BufferConfig() BufferConfig
 
 	Producer(cfg ProducerConfig) Producer
-	Consumer() Consumer
+
+	// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
+	// there is no bound on the number of returned events.
+	Get(eventCount int) (Batch, error)
 
 	Metrics() (Metrics, error)
 }
@@ -127,18 +129,6 @@ type Producer interface {
 	Cancel() int
 }
 
-// Consumer is an interface to be used by the pipeline output workers,
-// used to read events from the head of the queue.
-type Consumer interface {
-	// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
-	// there is no bound on the number of returned events.
-	Get(eventCount int) (Batch, error)
-
-	// Close closes this Consumer. Returns an error if the Consumer is
-	// already closed.
-	Close() error
-}
-
 // Batch of events to be returned to Consumers. The `ACK` method will send the
 // ACK signal to the queue.
 type Batch interface {
diff --git a/libbeat/publisher/queue/queuetest/log.go b/libbeat/publisher/queue/queuetest/log.go
index 49812d23aec..04b1a118564 100644
--- a/libbeat/publisher/queue/queuetest/log.go
+++ b/libbeat/publisher/queue/queuetest/log.go
@@ -25,6 +25,8 @@ import (
 	"sync"
 	"testing"
 
+	"gotest.tools/assert"
+
 	"github.com/elastic/elastic-agent-libs/logp"
 )
 
@@ -42,15 +44,6 @@ func init() {
 	flag.BoolVar(&printLog, "debug-print", false, "print test log messages right away")
 }
 
-type testLogWriter struct {
-	t *testing.T
-}
-
-func (w *testLogWriter) Write(p []byte) (int, error) {
-	w.t.Log(string(p))
-	return len(p), nil
-}
-
 func withOptLogOutput(capture bool, fn func(*testing.T)) func(*testing.T) {
 	if !capture {
 		return fn
 	}
@@ -95,7 +88,8 @@ func withOptLogOutput(capture bool, fn func(*testing.T)) func(*testing.T) {
 		if debug {
 			level = logp.DebugLevel
 		}
-		logp.DevelopmentSetup(logp.WithLevel(level))
+		err = logp.DevelopmentSetup(logp.WithLevel(level))
+		assert.NilError(t, err)
 		fn(t)
 	}
 }
@@ -148,14 +142,17 @@ func (l *TestLogger) Errf(format string, v ...interface{}) {
 
 func print(vs []interface{}) {
 	if printLog {
+		//nolint: forbidigo // Printing is ok during specialized tests.
 		fmt.Println(vs...)
 	}
 }
 
 func printf(format string, vs []interface{}) {
 	if printLog {
+		//nolint: forbidigo // Printing is ok during specialized tests.
 		fmt.Printf(format, vs...)
 		if format[len(format)-1] != '\n' {
+			//nolint: forbidigo // Printing is ok during specialized tests.
 			fmt.Println("")
 		}
 	}
diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go
index 5cfa2222a74..64310adf66f 100644
--- a/libbeat/publisher/queue/queuetest/producer_cancel.go
+++ b/libbeat/publisher/queue/queuetest/producer_cancel.go
@@ -72,12 +72,11 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
 		}))
 	}
 
-	// consumer all events
-	consumer := b.Consumer()
+	// consume all events
 	total := N2 - N1
 	events := make([]publisher.Event, 0, total)
 	for len(events) < total {
-		batch, err := consumer.Get(-1) // collect all events
+		batch, err := b.Get(-1) // collect all events
 		if err != nil {
 			panic(err)
 		}
@@ -93,7 +92,8 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
 		}
 
 		for i, event := range events {
-			value := event.Content.Fields["value"].(int)
+			value, ok := event.Content.Fields["value"].(int)
+			assert.True(t, ok, "event.value should be an int")
 			assert.Equal(t, i+N1, value)
 		}
 	})
diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go
index e56adfcfaad..29b1da7a7d2 100644
--- a/libbeat/publisher/queue/queuetest/queuetest.go
+++ b/libbeat/publisher/queue/queuetest/queuetest.go
@@ -30,15 +30,17 @@ type QueueFactory func(t *testing.T) queue.Queue
 
 type workerFactory func(*sync.WaitGroup, interface{}, *TestLogger, queue.Queue) func()
 
+type testCase struct {
+	name                 string
+	producers, consumers workerFactory
+}
+
 func TestSingleProducerConsumer(
 	t *testing.T,
 	events, batchSize int,
-	factory QueueFactory,
+	queueFactory QueueFactory,
 ) {
-	tests := []struct {
-		name                 string
-		producers, consumers workerFactory
-	}{
+	tests := []testCase{
 		{
 			"single producer, consumer without ack, complete batches",
 			makeProducer(events, false, countEvent),
@@ -62,44 +64,15 @@ func TestSingleProducerConsumer(
 		},
 	}
 
-	for _, test := range tests {
-		verbose := testing.Verbose()
-		t.Run(test.name, withOptLogOutput(verbose, func(t *testing.T) {
-			if !verbose {
-				t.Parallel()
-			}
-
-			log := NewTestLogger(t)
-			log.Debug("run test: ", test.name)
-
-			queue := factory(t)
-			defer func() {
-				err := queue.Close()
-				if err != nil {
-					t.Error(err)
-				}
-			}()
-
-			var wg sync.WaitGroup
-
-			go test.producers(&wg, nil, log, queue)()
-			go test.consumers(&wg, nil, log, queue)()
-
-			wg.Wait()
-		}))
-	}
-
+	runTestCases(t, tests, queueFactory)
 }
 
 func TestMultiProducerConsumer(
 	t *testing.T,
 	events, batchSize int,
-	factory QueueFactory,
+	queueFactory QueueFactory,
 ) {
-	tests := []struct {
-		name                 string
-		producers, consumers workerFactory
-	}{
+	tests := []testCase{
 		{
 			"2 producers, 1 consumer, without ack, complete batches",
 			multiple(
@@ -210,6 +183,10 @@ func TestMultiProducerConsumer(
 		},
 	}
 
+	runTestCases(t, tests, queueFactory)
+}
+
+func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) {
 	for _, test := range tests {
 		verbose := testing.Verbose()
 		t.Run(test.name, withOptLogOutput(verbose, func(t *testing.T) {
@@ -220,7 +197,7 @@ func TestMultiProducerConsumer(
 			log := NewTestLogger(t)
 			log.Debug("run test: ", test.name)
 
-			queue := factory(t)
+			queue := queueFactory(t)
 			defer func() {
 				err := queue.Close()
 				if err != nil {
@@ -312,23 +289,15 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory {
 
 			var events sync.WaitGroup
 
-			consumers := make([]queue.Consumer, numConsumers)
-			for i := range consumers {
-				consumers[i] = b.Consumer()
-			}
-
 			log.Debugf("consumer: wait for %v events\n", maxEvents)
 			events.Add(maxEvents)
 
-			for _, c := range consumers {
-				c := c
+			for i := 0; i < numConsumers; i++ {
+				b := b
 
-				wg.Add(1)
 				go func() {
-					defer wg.Done()
-
 					for {
-						batch, err := c.Get(batchSize)
+						batch, err := b.Get(batchSize)
 						if err != nil {
 							return
 						}
@@ -345,11 +314,6 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory {
 			}
 
 			events.Wait()
-
-			// disconnect consumers
-			for _, c := range consumers {
-				c.Close()
-			}
 		}
 	}
 }