diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9e27962b7e5..81da0dfe107 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,9 +15,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] platform, and when viewed from a metadata API standpoint, it is impossible to differentiate it from OpenStack. If you know that your deployments run on Huawei Cloud exclusively, and you wish to have `cloud.provider` value as `huawei`, you can achieve this by overwriting the value using an `add_fields` processor. {pull}35184[35184] - - In managed mode, Beats running under Elastic Agent will report the package +- In managed mode, Beats running under Elastic Agent will report the package version of Elastic Agent as their own version. This includes all additional fields added to events containing the Beats version. {pull}37553[37553] +- The behavior of `queue.mem.flush.min_events` has been simplified. It now serves as a simple maximum on the size of all event batches. There are no longer performance implications in its relationship to `bulk_max_size`. {pull}37795[37795] *Auditbeat* diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index 08ece0f752f..499ab9d4667 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -32,20 +32,32 @@ The memory queue waits for the output to acknowledge or drop events. If the queue is full, no new events can be inserted into the memory queue. Only after the signal from the output will the queue free up space for more events to be accepted. -The memory queue is controlled by the parameters `flush.min_events` and `flush.timeout`. If -`flush.timeout` is `0s` or `flush.min_events` is `0` or `1` then events can be sent by the output as -soon as they are available. If the output supports a `bulk_max_size` parameter it controls the -maximum batch size that can be sent. +The memory queue is controlled by the parameters `flush.min_events` and `flush.timeout`. +`flush.min_events` gives a limit on the number of events that can be included in a +single batch, and `flush.timeout` specifies how long the queue should wait to completely +fill an event request. If the output supports a `bulk_max_size` parameter, the maximum +batch size will be the smaller of `bulk_max_size` and `flush.min_events`. -If `flush.min_events` is greater than `1` and `flush.timeout` is greater than `0s`, events will only -be sent to the output when the queue contains at least `flush.min_events` events or the -`flush.timeout` period has expired. In this mode the maximum size batch that that can be sent by the -output is `flush.min_events`. If the output supports a `bulk_max_size` parameter, values of -`bulk_max_size` greater than `flush.min_events` have no effect. The value of `flush.min_events` -should be evenly divisible by `bulk_max_size` to avoid sending partial batches to the output. +`flush.min_events` is a legacy parameter, and new configurations should prefer to control +batch size with `bulk_max_size`. As of 8.13, there is never a performance advantage to +limiting batch size with `flush.min_events` instead of `bulk_max_size`. -This sample configuration forwards events to the output if 512 events are available or the oldest -available event has been waiting for 5s in the queue: +In synchronous mode, an event request is always filled as soon as events are available, +even if there are not enough events to fill the requested batch. This is useful when +latency must be minimized. 
To use synchronous mode, set `flush.timeout` to `0s`. + +For backwards compatibility, synchronous mode can also be activated by setting `flush.min_events` +to 0 or 1. In this case, batch size will be capped at half the queue capacity. + +In asynchronous mode, an event request will wait up to the specified timeout to try +to fill the requested batch completely. If the timeout expires, the queue returns a +partial batch with all available events. To use asynchronous mode, set `flush.timeout` +to a positive duration, e.g. `5s`. + +This sample configuration forwards events to the output when there are enough events +to fill the output's request (usually controlled by `bulk_max_size`, and limited to at +most 512 events by `flush.min_events`), or when events have been waiting for 5s without +filling the requested size: [source,yaml] ------------------------------------------------------------------------------ @@ -64,8 +76,7 @@ You can specify the following options in the `queue.mem` section of the +{beatna [[queue-mem-events-option]] ===== `events` -Number of events the queue can store. This value should be evenly divisible by `flush.min_events` to -avoid sending partial batches to the output. +Number of events the queue can store. The default value is 3200 events. @@ -73,11 +84,13 @@ The default value is 3200 events. [[queue-mem-flush-min-events-option]] ===== `flush.min_events` -Minimum number of events required for publishing. If this value is set to 0 or 1, events are -available to the output immediately. If this value is greater than 1 the output must wait for the -queue to accumulate this minimum number of events or for `flush.timeout` to expire before -publishing. When greater than `1` this value also defines the maximum possible batch that can be -sent by the output. +If greater than 1, specifies the maximum number of events per batch. In this case the +output must wait for the +queue to accumulate the requested number of events or for `flush.timeout` to expire before +publishing. + +If 0 or 1, sets the maximum number of events per batch to half the queue size, and sets +the queue to synchronous mode (equivalent to a `flush.timeout` of `0s`). The default value is 1600. @@ -85,8 +98,7 @@ The default value is 1600. [[queue-mem-flush-timeout-option]] ===== `flush.timeout` -Maximum wait time for `flush.min_events` to be fulfilled. If set to 0s, events are available to the -output immediately. +Maximum wait time for event requests from the output to be fulfilled. If set to 0s, events are returned immediately. The default value is 10s. 
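The settings interaction described above reduces to a small normalization step, which this change implements in `newQueue` (see broker.go below). As a sketch, here is that rule written as a standalone function over the `Settings` struct from this diff; `normalizeSettings` itself is a hypothetical helper for illustration, not part of the patch:

[source,go]
----
package memqueue

// Sketch of the settings normalization performed by newQueue in broker.go.
func normalizeSettings(s Settings) Settings {
	// Backwards compatibility: "flush.min_events" of 0 or 1 selects
	// synchronous mode, disabling the flush timeout and capping a single
	// Get request at half the queue capacity.
	if s.MaxGetRequest <= 1 {
		s.FlushTimeout = 0
		s.MaxGetRequest = (s.Events + 1) / 2
	}
	// A single Get request can never exceed the queue's total capacity.
	if s.MaxGetRequest > s.Events {
		s.MaxGetRequest = s.Events
	}
	return s
}
----

With the defaults (`events: 3200`, `flush.min_events: 1600`, `flush.timeout: 10s`) the settings pass through unchanged; a legacy `flush.min_events: 1` disables the timeout and caps get requests at `(3200 + 1) / 2 = 1600` events.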
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index a3f0c822b9e..015d8f70c9d 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -128,9 +128,9 @@ func TestClient(t *testing.T) { // a small in-memory queue with a very short flush interval q := memqueue.NewQueue(l, nil, memqueue.Settings{ - Events: 5, - FlushMinEvents: 1, - FlushTimeout: time.Millisecond, + Events: 5, + MaxGetRequest: 1, + FlushTimeout: time.Millisecond, }, 5) // model a processor that we're going to make produce errors after diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index f61439a6d50..1a964d8bb45 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -25,26 +25,28 @@ package memqueue type ackLoop struct { broker *broker - // A list of ACK channels given to queue consumers, + // A list of batches given to queue consumers, // used to maintain sequencing of event acknowledgements. - ackChans chanList + pendingBatches batchList +} - processACK func(chanList, int) +func newACKLoop(broker *broker) *ackLoop { + return &ackLoop{broker: broker} } func (l *ackLoop) run() { + b := l.broker for { - nextBatchChan := l.ackChans.nextBatchChannel() + nextBatchChan := l.pendingBatches.nextBatchChannel() select { - case <-l.broker.done: + case <-b.ctx.Done(): // The queue is shutting down. return - case chanList := <-l.broker.scheduledACKs: - // A new batch has been generated, add its ACK channel to the end of - // the pending list. - l.ackChans.concat(&chanList) + case chanList := <-b.consumedChan: + // New batches have been generated, add them to the pending list + l.pendingBatches.concat(&chanList) case <-nextBatchChan: // The oldest outstanding batch has been acknowledged, advance our @@ -57,11 +59,11 @@ func (l *ackLoop) run() { // handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig // is run by the ackLoop. func (l *ackLoop) handleBatchSig() int { - lst := l.collectAcked() + ackedBatches := l.collectAcked() count := 0 - for current := lst.front(); current != nil; current = current.next { - count += current.count + for batch := ackedBatches.front(); batch != nil; batch = batch.next { + count += batch.count } if count > 0 { @@ -70,11 +72,12 @@ func (l *ackLoop) handleBatchSig() int { } // report acks to waiting clients - l.processACK(lst, count) + l.processACK(ackedBatches, count) } - for !lst.empty() { - releaseACKChan(lst.pop()) + for !ackedBatches.empty() { + // Release finished batch structs into the shared memory pool + releaseBatch(ackedBatches.pop()) } // return final ACK to EventLoop, in order to clean up internal buffer @@ -84,23 +87,63 @@ func (l *ackLoop) handleBatchSig() int { return count } -func (l *ackLoop) collectAcked() chanList { - lst := chanList{} +func (l *ackLoop) collectAcked() batchList { + ackedBatches := batchList{} - acks := l.ackChans.pop() - lst.append(acks) + acks := l.pendingBatches.pop() + ackedBatches.append(acks) done := false - for !l.ackChans.empty() && !done { - acks := l.ackChans.front() + for !l.pendingBatches.empty() && !done { + acks := l.pendingBatches.front() select { case <-acks.doneChan: - lst.append(l.ackChans.pop()) + ackedBatches.append(l.pendingBatches.pop()) default: done = true } } - return lst + return ackedBatches +} + +// Called by ackLoop. 
This function exists to decouple the work of collecting +// and running producer callbacks from logical deletion of the events, so +// input callbacks can't block the queue by occupying the runLoop goroutine. +func (l *ackLoop) processACK(lst batchList, N int) { + ackCallbacks := []func(){} + // First we traverse the entries we're about to remove, collecting any callbacks + // we need to run. + lst.reverse() + for !lst.empty() { + batch := lst.pop() + + // Traverse entries from last to first, so we can acknowledge the most recent + // ones first and skip subsequent producer callbacks. + for i := batch.count - 1; i >= 0; i-- { + entry := batch.rawEntry(i) + if entry.producer == nil { + continue + } + + if entry.producerID <= entry.producer.state.lastACK { + // This index was already acknowledged on a previous iteration, skip. + entry.producer = nil + continue + } + producerState := entry.producer.state + count := int(entry.producerID - producerState.lastACK) + ackCallbacks = append(ackCallbacks, func() { producerState.cb(count) }) + entry.producer.state.lastACK = entry.producerID + entry.producer = nil + } + } + // Signal runLoop to delete the events + l.broker.deleteChan <- N + + // The events have been removed; notify their listeners. + for _, f := range ackCallbacks { + f() + } } diff --git a/libbeat/publisher/queue/memqueue/batchbuf.go b/libbeat/publisher/queue/memqueue/batchbuf.go deleted file mode 100644 index 87c3a1052f3..00000000000 --- a/libbeat/publisher/queue/memqueue/batchbuf.go +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memqueue - -type batchBuffer struct { - next *batchBuffer - flushed bool - entries []queueEntry -} - -func newBatchBuffer(sz int) *batchBuffer { - b := &batchBuffer{} - b.entries = make([]queueEntry, 0, sz) - return b -} - -func (b *batchBuffer) add(entry queueEntry) { - b.entries = append(b.entries, entry) -} - -func (b *batchBuffer) length() int { - return len(b.entries) -} - -func (b *batchBuffer) cancel(producer *ackProducer) int { - entries := b.entries[:0] - - removedCount := 0 - for _, entry := range b.entries { - if entry.producer == producer { - removedCount++ - continue - } - entries = append(entries, entry) - } - b.entries = entries - return removedCount -} diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index ac5b9dc6615..e1d0fd46c00 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -18,6 +18,7 @@ package memqueue import ( + "context" "io" "sync" "time" @@ -35,12 +36,23 @@ const ( maxInputQueueSizeRatio = 0.1 ) +// broker is the main implementation type for the memory queue. 
An active queue +// consists of two goroutines: runLoop, which handles all public API requests +// and owns the buffer state, and ackLoop, which listens for acknowledgments of +// consumed events and runs any appropriate completion handlers. type broker struct { - done chan struct{} + settings Settings + logger *logp.Logger - logger *logp.Logger + ctx context.Context + ctxCancel context.CancelFunc - bufSize int + // The ring buffer backing the queue. All buffer positions should be taken + // modulo the size of this array. + buf []queueEntry + + // wait group for queue workers (runLoop and ackLoop) + wg sync.WaitGroup /////////////////////////// // api channels @@ -55,35 +67,49 @@ type broker struct { // sent so far that have not yet reached a consumer. cancelChan chan producerCancelRequest + // Metrics() sends requests to metricChan to expose internal queue + // metrics to external callers. + metricChan chan metricsRequest + /////////////////////////// // internal channels - // When events are sent to consumers, the ACK channels for their batches - // are collected into chanLists and sent to scheduledACKs. - // These are then read by ackLoop and concatenated to its internal - // chanList of all outstanding ACK channels. - scheduledACKs chan chanList + // Batches sent to consumers are also collected and forwarded to ackLoop + // through this channel so ackLoop can monitor them for acknowledgments. + consumedChan chan batchList - // A callback that should be invoked when ACKs are processed. + // ackCallback is a configurable callback to invoke when ACKs are processed. // ackLoop calls this function when it advances the consumer ACK position. // Right now this forwards the notification to queueACKed() in // the pipeline observer, which updates the beats registry if needed. ackCallback func(eventCount int) - // This channel is used to request/return metrics where such metrics require insight into - // the actual eventloop itself. This seems like it might be overkill, but it seems that - // all communication between the broker and the eventloops - // happens via channels, so we're doing it this way. - metricChan chan metricsRequest + // When batches are acknowledged, ackLoop saves any metadata needed + // for producer callbacks and such, then notifies runLoop that it's + // safe to free these events and advance the queue by sending the + // acknowledged event count to this channel. + deleteChan chan int - // wait group for worker shutdown - wg sync.WaitGroup + /////////////////////////////// + // internal goroutine state + + // The goroutine that manages the queue's core run state + runLoop *runLoop + + // The goroutine that manages ack notifications and callbacks + ackLoop *ackLoop } type Settings struct { - Events int - FlushMinEvents int - FlushTimeout time.Duration + // The number of events the queue can hold. + Events int + + // The most events that will ever be returned from one Get request. + MaxGetRequest int + + // If positive, the amount of time the queue will wait to fill up + // a batch if a Get request asks for more events than we have. + FlushTimeout time.Duration } type queueEntry struct { @@ -95,24 +121,22 @@ type queueEntry struct { } type batch struct { - queue *broker - entries []queueEntry - doneChan chan batchDoneMsg -} + queue *broker + + // Next batch in the containing batchList + next *batch -// batchACKState stores the metadata associated with a batch of events sent to -// a consumer. 
When the consumer ACKs that batch, a batchAckMsg is sent on -// ackChan and received by -type batchACKState struct { - next *batchACKState - doneChan chan batchDoneMsg - start, count int // number of events waiting for ACK - entries []queueEntry + // Position and length of the events within the queue buffer + start, count int + + // batch.Done() sends to doneChan, where ackLoop reads it and handles + // acknowledgment / cleanup. + doneChan chan batchDoneMsg } -type chanList struct { - head *batchACKState - tail *batchACKState +type batchList struct { + head *batch + tail *batch } // FactoryForSettings is a simple wrapper around NewQueue so a concrete @@ -137,23 +161,46 @@ func NewQueue( settings Settings, inputQueueSize int, ) *broker { - var ( - sz = settings.Events - minEvents = settings.FlushMinEvents - flushTimeout = settings.FlushTimeout - ) + b := newQueue(logger, ackCallback, settings, inputQueueSize) - chanSize := AdjustInputQueueSize(inputQueueSize, sz) + // Start the queue workers + b.wg.Add(2) + go func() { + defer b.wg.Done() + b.runLoop.run() + }() + go func() { + defer b.wg.Done() + b.ackLoop.run() + }() - if minEvents < 1 { - minEvents = 1 - } - if minEvents > 1 && flushTimeout <= 0 { - minEvents = 1 - flushTimeout = 0 + return b +} + +// newQueue does most of the work of creating a queue from the given +// parameters, but doesn't start the runLoop or ackLoop workers. This +// lets us perform more granular / deterministic tests by controlling +// when the workers are active. +func newQueue( + logger *logp.Logger, + ackCallback func(eventCount int), + settings Settings, + inputQueueSize int, +) *broker { + chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events) + + // Backwards compatibility: an old way to select synchronous queue + // behavior was to set "flush.min_events" to 0 or 1, in which case the + // timeout was disabled and the max get request was half the queue. + // (Otherwise, it would make sense to leave FlushTimeout unchanged here.) 
+ if settings.MaxGetRequest <= 1 { + settings.FlushTimeout = 0 + settings.MaxGetRequest = (settings.Events + 1) / 2 } - if minEvents > sz { - minEvents = sz + + // Can't request more than the full queue + if settings.MaxGetRequest > settings.Events { + settings.MaxGetRequest = settings.Events } if logger == nil { @@ -161,52 +208,33 @@ func NewQueue( } b := &broker{ - done: make(chan struct{}), - logger: logger, + settings: settings, + logger: logger, + + buf: make([]queueEntry, settings.Events), // broker API channels pushChan: make(chan pushRequest, chanSize), getChan: make(chan getRequest), cancelChan: make(chan producerCancelRequest, 5), + metricChan: make(chan metricsRequest), - // internal broker and ACK handler channels - scheduledACKs: make(chan chanList), + // internal runLoop and ackLoop channels + consumedChan: make(chan batchList), + deleteChan: make(chan int), ackCallback: ackCallback, - metricChan: make(chan metricsRequest), } + b.ctx, b.ctxCancel = context.WithCancel(context.Background()) - var eventLoop interface { - run() - processACK(chanList, int) - } - - if minEvents > 1 { - eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout) - } else { - eventLoop = newDirectEventLoop(b, sz) - } - - b.bufSize = sz - ackLoop := &ackLoop{ - broker: b, - processACK: eventLoop.processACK} - - b.wg.Add(2) - go func() { - defer b.wg.Done() - eventLoop.run() - }() - go func() { - defer b.wg.Done() - ackLoop.run() - }() + b.runLoop = newRunLoop(b) + b.ackLoop = newACKLoop(b) return b } func (b *broker) Close() error { - close(b.done) + b.ctxCancel() return nil } @@ -216,7 +244,7 @@ func (b *broker) QueueType() string { func (b *broker) BufferConfig() queue.BufferConfig { return queue.BufferConfig{ - MaxEvents: b.bufSize, + MaxEvents: len(b.buf), } } @@ -225,9 +253,9 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { } func (b *broker) Get(count int) (queue.Batch, error) { - responseChan := make(chan getResponse, 1) + responseChan := make(chan *batch, 1) select { - case <-b.done: + case <-b.ctx.Done(): return nil, io.EOF case b.getChan <- getRequest{ entryCount: count, responseChan: responseChan}: @@ -235,18 +263,14 @@ func (b *broker) Get(count int) (queue.Batch, error) { // if request has been sent, we have to wait for a response resp := <-responseChan - return &batch{ - queue: b, - entries: resp.entries, - doneChan: resp.ackChan, - }, nil + return resp, nil } func (b *broker) Metrics() (queue.Metrics, error) { responseChan := make(chan memQueueMetrics, 1) select { - case <-b.done: + case <-b.ctx.Done(): return queue.Metrics{}, io.EOF case b.metricChan <- metricsRequest{ responseChan: responseChan}: @@ -255,43 +279,43 @@ func (b *broker) Metrics() (queue.Metrics, error) { return queue.Metrics{ EventCount: opt.UintWith(uint64(resp.currentQueueSize)), - EventLimit: opt.UintWith(uint64(b.bufSize)), + EventLimit: opt.UintWith(uint64(len(b.buf))), UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)), OldestEntryID: resp.oldestEntryID, }, nil } -var ackChanPool = sync.Pool{ +var batchPool = sync.Pool{ New: func() interface{} { - return &batchACKState{ + return &batch{ doneChan: make(chan batchDoneMsg, 1), } }, } -func newBatchACKState(start, count int, entries []queueEntry) *batchACKState { - ch := ackChanPool.Get().(*batchACKState) - ch.next = nil - ch.start = start - ch.count = count - ch.entries = entries - return ch +func newBatch(queue *broker, start, count int) *batch { + batch := batchPool.Get().(*batch) + batch.next = nil + batch.queue = queue + 
batch.start = start + batch.count = count + return batch } -func releaseACKChan(c *batchACKState) { - c.next = nil - ackChanPool.Put(c) +func releaseBatch(b *batch) { + b.next = nil + batchPool.Put(b) } -func (l *chanList) prepend(ch *batchACKState) { - ch.next = l.head - l.head = ch +func (l *batchList) prepend(b *batch) { + b.next = l.head + l.head = b if l.tail == nil { - l.tail = ch + l.tail = b } } -func (l *chanList) concat(other *chanList) { +func (l *batchList) concat(other *batchList) { if other.head == nil { return } @@ -305,31 +329,31 @@ func (l *chanList) concat(other *chanList) { l.tail = other.tail } -func (l *chanList) append(ch *batchACKState) { +func (l *batchList) append(b *batch) { if l.head == nil { - l.head = ch + l.head = b } else { - l.tail.next = ch + l.tail.next = b } - l.tail = ch + l.tail = b } -func (l *chanList) empty() bool { +func (l *batchList) empty() bool { return l.head == nil } -func (l *chanList) front() *batchACKState { +func (l *batchList) front() *batch { return l.head } -func (l *chanList) nextBatchChannel() chan batchDoneMsg { +func (l *batchList) nextBatchChannel() chan batchDoneMsg { if l.head == nil { return nil } return l.head.doneChan } -func (l *chanList) pop() *batchACKState { +func (l *batchList) pop() *batch { ch := l.head if ch != nil { l.head = ch.next @@ -342,9 +366,9 @@ func (l *chanList) pop() *batchACKState { return ch } -func (l *chanList) reverse() { +func (l *batchList) reverse() { tmp := *l - *l = chanList{} + *l = batchList{} for !tmp.empty() { l.prepend(tmp.pop()) @@ -364,11 +388,18 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) { } func (b *batch) Count() int { - return len(b.entries) + return b.count +} + +// Return a pointer to the queueEntry for the i-th element of this batch +func (b *batch) rawEntry(i int) *queueEntry { + // Indexes wrap around the end of the queue buffer + return &b.queue.buf[(b.start+i)%len(b.queue.buf)] } +// Return the event referenced by the i-th element of this batch func (b *batch) Entry(i int) interface{} { - return b.entries[i].event + return b.rawEntry(i).event } func (b *batch) FreeEntries() { diff --git a/libbeat/publisher/queue/memqueue/config.go b/libbeat/publisher/queue/memqueue/config.go index 5e4f78ae41c..7d9593b30e3 100644 --- a/libbeat/publisher/queue/memqueue/config.go +++ b/libbeat/publisher/queue/memqueue/config.go @@ -26,19 +26,23 @@ import ( ) type config struct { - Events int `config:"events" validate:"min=32"` - FlushMinEvents int `config:"flush.min_events" validate:"min=0"` - FlushTimeout time.Duration `config:"flush.timeout"` + Events int `config:"events" validate:"min=32"` + // This field is named MaxGetRequest because its logical effect is to give + // a maximum on the number of events a Get request can return, but the + // user-exposed name is "flush.min_events" for backwards compatibility, + // since it used to control buffer size in the internal buffer chain. 
+ MaxGetRequest int `config:"flush.min_events" validate:"min=0"` + FlushTimeout time.Duration `config:"flush.timeout"` } var defaultConfig = config{ - Events: 3200, - FlushMinEvents: 1600, - FlushTimeout: 10 * time.Second, + Events: 3200, + MaxGetRequest: 1600, + FlushTimeout: 10 * time.Second, } func (c *config) Validate() error { - if c.FlushMinEvents > c.Events { + if c.MaxGetRequest > c.Events { return errors.New("flush.min_events must be less events") } return nil @@ -55,8 +59,8 @@ func SettingsForUserConfig(cfg *c.C) (Settings, error) { } //nolint:gosimple // Actually want this conversion to be explicit since the types aren't definitionally equal. return Settings{ - Events: config.Events, - FlushMinEvents: config.FlushMinEvents, - FlushTimeout: config.FlushTimeout, + Events: config.Events, + MaxGetRequest: config.MaxGetRequest, + FlushTimeout: config.FlushTimeout, }, nil } diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go deleted file mode 100644 index ccb50565365..00000000000 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ /dev/null @@ -1,560 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memqueue - -import ( - "time" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/elastic-agent-libs/logp" -) - -// directEventLoop implements the broker main event loop. It buffers events, -// but tries to forward events as early as possible. -type directEventLoop struct { - broker *broker - buf ringBuffer - deleteChan chan int - - // pendingACKs aggregates a list of ACK channels for batches that have been sent - // to consumers, which is then sent to the broker's scheduledACKs channel. - pendingACKs chanList - - nextEntryID queue.EntryID -} - -// bufferingEventLoop implements the broker main event loop. -// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout. -type bufferingEventLoop struct { - broker *broker - deleteChan chan int - - // The current buffer that incoming events are appended to. When it gets - // full enough, or enough time has passed, it is added to flushList. - // Events will still be added to buf even after it is in flushList, until - // either it reaches minEvents or a consumer requests it. - buf *batchBuffer - - // flushList is the list of buffers that are ready to be sent to consumers. - flushList flushList - - // pendingACKs aggregates a list of ACK channels for batches that have been sent - // to consumers, which is then sent to the broker's scheduledACKs channel. - pendingACKs chanList - - // The number of events currently waiting in the queue, including - // those that have not yet been acked. 
- eventCount int - - // The next entry ID that will be read by a consumer, and the next - // entry ID that has been consumed and is waiting for acknowledgment. - // We need to track these here because bufferingEventLoop discards - // its event buffers when they are sent to consumers, so we can't - // look directly at the event itself to get the current id like we - // do in the unbuffered loop. - nextConsumedID queue.EntryID - nextACKedID queue.EntryID - - minEvents int - maxEvents int - flushTimeout time.Duration - - // buffer flush timer state - timer *time.Timer - idleC <-chan time.Time - - nextEntryID queue.EntryID -} - -type flushList struct { - head *batchBuffer - tail *batchBuffer - count int -} - -func newDirectEventLoop(b *broker, size int) *directEventLoop { - l := &directEventLoop{ - broker: b, - deleteChan: make(chan int), - } - l.buf.init(b.logger, size) - - return l -} - -func (l *directEventLoop) run() { - var ( - broker = l.broker - buf = &l.buf - ) - - for { - var pushChan chan pushRequest - // Push requests are enabled if the queue isn't yet full. - if !l.buf.Full() { - pushChan = l.broker.pushChan - } - - var getChan chan getRequest - // Get requests are enabled if there are events in the queue - // that haven't yet been sent to a consumer. - if buf.Avail() > 0 { - getChan = l.broker.getChan - } - - var schedACKs chan chanList - // Sending pending ACKs to the broker's scheduled ACKs - // channel is enabled if it is nonempty. - if !l.pendingACKs.empty() { - schedACKs = l.broker.scheduledACKs - } - - select { - case <-broker.done: - return - - case req := <-pushChan: // producer pushing new event - l.insert(&req) - - case count := <-l.deleteChan: - l.buf.removeEntries(count) - - case req := <-l.broker.cancelChan: // producer cancelling active events - l.handleCancel(&req) - // re-enable pushRequest if buffer can take new events - - case req := <-getChan: // consumer asking for next batch - l.handleGetRequest(&req) - - case req := <-l.broker.metricChan: // broker asking for queue metrics - l.handleMetricsRequest(&req) - - case schedACKs <- l.pendingACKs: - // on send complete list of pending batches has been forwarded -> clear list - l.pendingACKs = chanList{} - } - } -} - -func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) { - // If the queue is empty, we report the "oldest" ID as the next - // one that will be assigned. Otherwise, we report the ID attached - // to the oldest queueEntry. 
- oldestEntryID := l.nextEntryID - if oldestEntry := l.buf.OldestEntry(); oldestEntry != nil { - oldestEntryID = oldestEntry.id - } - - req.responseChan <- memQueueMetrics{ - currentQueueSize: l.buf.Items(), - occupiedRead: l.buf.reserved, - oldestEntryID: oldestEntryID, - } -} - -func (l *directEventLoop) insert(req *pushRequest) { - log := l.broker.logger - - if req.producer != nil && req.producer.state.cancelled { - reportCancelledState(log, req) - } else { - req.resp <- l.nextEntryID - l.buf.insert(queueEntry{ - event: req.event, - id: l.nextEntryID, - producer: req.producer, - producerID: req.producerID}) - l.nextEntryID++ - } -} - -func (l *directEventLoop) handleCancel(req *producerCancelRequest) { - // log := l.broker.logger - // log.Debug("handle cancel request") - - var removed int - - if producer := req.producer; producer != nil { - producer.state.cancelled = true - removed = l.buf.cancel(producer) - } - - // signal cancel request being finished - if req.resp != nil { - req.resp <- producerCancelResponse{removed: removed} - } -} - -func (l *directEventLoop) handleGetRequest(req *getRequest) { - // log := l.broker.logger - // log.Debugf("try reserve %v events", req.sz) - - start, buf := l.buf.reserve(req.entryCount) - count := len(buf) - if count == 0 { - panic("empty batch returned") - } - - ackCH := newBatchACKState(start, count, l.buf.entries) - - req.responseChan <- getResponse{ackCH.doneChan, buf} - l.pendingACKs.append(ackCH) -} - -// processACK is called by the ackLoop to process the list of acked batches -func (l *directEventLoop) processACK(lst chanList, N int) { - log := l.broker.logger - { - start := time.Now() - log.Debug("handle ACKs: ", N) - defer func() { - log.Debug("handle ACK took: ", time.Since(start)) - }() - } - - entries := l.buf.entries - - firstIndex := lst.front().start - - // We want to acknowledge N events starting at position firstIndex - // in the entries array. - // We iterate over the events from last to first, so we encounter the - // highest producer IDs first and can skip subsequent callbacks to the - // same producer. - producerCallbacks := []func(){} - for i := N - 1; i >= 0; i-- { - // idx is the index in entries of the i-th event after firstIndex, wrapping - // around the end of the array. - idx := (firstIndex + i) % len(entries) - entry := &entries[idx] - - producer := entry.producer - - // Set the producer in the entires array to nil to mark it as visited; a nil - // producer indicates that an entry requires no more ack processing (either - // because it has already been ACKed, or because its producer does not listen to ACKs). - entry.producer = nil - if producer == nil || entry.producerID <= producer.state.lastACK { - // This has a lower index than the previous ACK for this producer, - // so it was covered in the previous call and we can skip it. - continue - } - // This update is safe because lastACK is only used from the event loop. 
- count := int(entry.producerID - producer.state.lastACK) - producer.state.lastACK = entry.producerID - - producerCallbacks = append(producerCallbacks, func() { producer.state.cb(count) }) - } - l.deleteChan <- N - for _, f := range producerCallbacks { - f() - } -} - -func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop { - l := &bufferingEventLoop{ - broker: b, - deleteChan: make(chan int), - maxEvents: size, - minEvents: minEvents, - flushTimeout: flushTimeout, - } - l.buf = newBatchBuffer(l.minEvents) - - l.timer = time.NewTimer(flushTimeout) - if !l.timer.Stop() { - <-l.timer.C - } - - return l -} - -func (l *bufferingEventLoop) run() { - broker := l.broker - - for { - var pushChan chan pushRequest - // Push requests are enabled if the queue isn't yet full. - if l.eventCount < l.maxEvents { - pushChan = l.broker.pushChan - } - - var getChan chan getRequest - // Get requests are enabled if the queue has events that - // weren't yet sent to consumers. - if !l.flushList.empty() { - getChan = l.broker.getChan - } - - var schedACKs chan chanList - // Enable sending to the scheduled ACKs channel if we have - // something to send. - if !l.pendingACKs.empty() { - schedACKs = l.broker.scheduledACKs - } - - select { - case <-broker.done: - return - - case req := <-pushChan: // producer pushing new event - l.handleInsert(&req) - - case req := <-l.broker.cancelChan: // producer cancelling active events - l.handleCancel(&req) - - case req := <-getChan: // consumer asking for next batch - l.handleGetRequest(&req) - - case schedACKs <- l.pendingACKs: - l.pendingACKs = chanList{} - - case count := <-l.deleteChan: - l.handleDelete(count) - - case req := <-l.broker.metricChan: // broker asking for queue metrics - l.handleMetricsRequest(&req) - - case <-l.idleC: - l.idleC = nil - l.timer.Stop() - if l.buf.length() > 0 { - l.flushBuffer() - } - } - } -} - -func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) { - req.responseChan <- memQueueMetrics{ - currentQueueSize: l.eventCount, - occupiedRead: int(l.nextConsumedID - l.nextACKedID), - oldestEntryID: l.nextACKedID, - } -} - -func (l *bufferingEventLoop) handleInsert(req *pushRequest) { - if l.insert(req, l.nextEntryID) { - // Send back the new event id. 
- req.resp <- l.nextEntryID - - l.nextEntryID++ - l.eventCount++ - - L := l.buf.length() - if !l.buf.flushed { - if L < l.minEvents { - l.startFlushTimer() - } else { - l.stopFlushTimer() - l.flushBuffer() - l.buf = newBatchBuffer(l.minEvents) - } - } else if L >= l.minEvents { - l.buf = newBatchBuffer(l.minEvents) - } - } -} - -func (l *bufferingEventLoop) insert(req *pushRequest, id queue.EntryID) bool { - if req.producer != nil && req.producer.state.cancelled { - reportCancelledState(l.broker.logger, req) - return false - } - - l.buf.add(queueEntry{ - event: req.event, - id: id, - producer: req.producer, - producerID: req.producerID, - }) - return true -} - -func (l *bufferingEventLoop) handleCancel(req *producerCancelRequest) { - removed := 0 - if producer := req.producer; producer != nil { - // remove from actively flushed buffers - for buf := l.flushList.head; buf != nil; buf = buf.next { - removed += buf.cancel(producer) - } - if !l.buf.flushed { - removed += l.buf.cancel(producer) - } - - producer.state.cancelled = true - } - - if req.resp != nil { - req.resp <- producerCancelResponse{removed: removed} - } - - // remove flushed but empty buffers: - tmpList := flushList{} - for l.flushList.head != nil { - b := l.flushList.head - l.flushList.head = b.next - - if b.length() > 0 { - tmpList.add(b) - } - } - l.flushList = tmpList - l.eventCount -= removed -} - -func (l *bufferingEventLoop) handleGetRequest(req *getRequest) { - buf := l.flushList.head - if buf == nil { - panic("get from non-flushed buffers") - } - - count := buf.length() - if count == 0 { - panic("empty buffer in flush list") - } - - if sz := req.entryCount; sz > 0 { - if sz < count { - count = sz - } - } - - if count == 0 { - panic("empty batch returned") - } - - entries := buf.entries[:count] - acker := newBatchACKState(0, count, entries) - - req.responseChan <- getResponse{acker.doneChan, entries} - l.pendingACKs.append(acker) - - l.nextConsumedID += queue.EntryID(len(entries)) - buf.entries = buf.entries[count:] - if buf.length() == 0 { - l.advanceFlushList() - } -} - -func (l *bufferingEventLoop) handleDelete(count int) { - l.nextACKedID += queue.EntryID(count) - l.eventCount -= count -} - -func (l *bufferingEventLoop) startFlushTimer() { - if l.idleC == nil { - l.timer.Reset(l.flushTimeout) - l.idleC = l.timer.C - } -} - -func (l *bufferingEventLoop) stopFlushTimer() { - if l.idleC != nil { - l.idleC = nil - if !l.timer.Stop() { - <-l.timer.C - } - } -} - -func (l *bufferingEventLoop) advanceFlushList() { - l.flushList.pop() - if l.flushList.count == 0 && l.buf.flushed { - l.buf = newBatchBuffer(l.minEvents) - } -} - -func (l *bufferingEventLoop) flushBuffer() { - l.buf.flushed = true - l.flushList.add(l.buf) -} - -// Called by ackLoop. This function exists to decouple the work of collecting -// and running producer callbacks from logical deletion of the events, so -// input callbacks can't block the main queue goroutine. -func (l *bufferingEventLoop) processACK(lst chanList, N int) { - ackCallbacks := []func(){} - // First we traverse the entries we're about to remove, collecting any callbacks - // we need to run. - lst.reverse() - for !lst.empty() { - current := lst.pop() - entries := current.entries - - // Traverse entries from last to first, so we can acknowledge the most recent - // ones first and skip subsequent producer callbacks. 
- for i := len(entries) - 1; i >= 0; i-- { - entry := &entries[i] - if entry.producer == nil { - continue - } - - if entry.producerID <= entry.producer.state.lastACK { - // This index was already acknowledged on a previous iteration, skip. - entry.producer = nil - continue - } - producerState := entry.producer.state - count := int(entry.producerID - producerState.lastACK) - ackCallbacks = append(ackCallbacks, func() { producerState.cb(count) }) - entry.producer.state.lastACK = entry.producerID - entry.producer = nil - } - } - // Signal the queue to delete the events - l.deleteChan <- N - - // The events have been removed; notify their listeners. - for _, f := range ackCallbacks { - f() - } -} - -func (l *flushList) pop() { - l.count-- - if l.count > 0 { - l.head = l.head.next - } else { - l.head = nil - l.tail = nil - } -} - -func (l *flushList) empty() bool { - return l.head == nil -} - -func (l *flushList) add(b *batchBuffer) { - l.count++ - b.next = nil - if l.tail == nil { - l.head = b - l.tail = b - } else { - l.tail.next = b - l.tail = b - } -} - -func reportCancelledState(log *logp.Logger, req *pushRequest) { - // do not add waiting events if producer did send cancel signal - if cb := req.producer.state.dropCB; cb != nil { - cb(req.event) - } -} diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 07485af99b4..ae93a5df0d5 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -46,13 +46,8 @@ type producerCancelResponse struct { // consumer -> broker API type getRequest struct { - entryCount int // request entryCount events from the broker - responseChan chan getResponse // channel to send response to -} - -type getResponse struct { - ackChan chan batchDoneMsg - entries []queueEntry + entryCount int // request entryCount events from the broker + responseChan chan *batch // channel to send response to } type batchDoneMsg struct{} diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 531acdce3e9..141514483f3 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -46,10 +46,10 @@ func TestProduceConsumer(t *testing.T) { maxEvents := 1024 minEvents := 32 - rand.Seed(seed) - events := rand.Intn(maxEvents-minEvents) + minEvents - batchSize := rand.Intn(events-8) + 4 - bufferSize := rand.Intn(batchSize*2) + 4 + randGen := rand.New(rand.NewSource(seed)) + events := randGen.Intn(maxEvents-minEvents) + minEvents + batchSize := randGen.Intn(events-8) + 4 + bufferSize := randGen.Intn(batchSize*2) + 4 // events := 4 // batchSize := 1 @@ -90,9 +90,9 @@ func TestProduceConsumer(t *testing.T) { func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { q := NewQueue(nil, nil, Settings{ - Events: 2, // Queue size - FlushMinEvents: 1, // make sure the queue won't buffer events - FlushTimeout: time.Millisecond, + Events: 2, // Queue size + MaxGetRequest: 1, // make sure the queue won't buffer events + FlushTimeout: time.Millisecond, }, 0) p := q.Producer(queue.ProducerConfig{ @@ -155,9 +155,9 @@ func TestQueueMetricsDirect(t *testing.T) { // Test the directEventLoop directSettings := Settings{ - Events: maxEvents, - FlushMinEvents: 1, - FlushTimeout: 0, + Events: maxEvents, + MaxGetRequest: 1, + FlushTimeout: 0, } t.Logf("Testing directEventLoop") queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop") @@ -169,9 +169,9 @@ func 
TestQueueMetricsBuffer(t *testing.T) { maxEvents := 10 // Test Buffered Event Loop bufferedSettings := Settings{ - Events: maxEvents, - FlushMinEvents: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get() - FlushTimeout: time.Millisecond, + Events: maxEvents, + MaxGetRequest: eventsToTest, // The queue can only return MaxGetRequest events per Get() + FlushTimeout: time.Millisecond, } t.Logf("Testing bufferedEventLoop") queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop") @@ -219,9 +219,9 @@ func TestProducerCancelRemovesEvents(t *testing.T) { func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { return NewQueue(nil, nil, Settings{ - Events: sz, - FlushMinEvents: minEvents, - FlushTimeout: flushTimeout, + Events: sz, + MaxGetRequest: minEvents, + FlushTimeout: flushTimeout, }, 0) } } @@ -343,12 +343,12 @@ func TestEntryIDs(t *testing.T) { }) t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0) testForward(testQueue) }) t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0) testBackward(testQueue) }) } diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go deleted file mode 100644 index d593cc2351b..00000000000 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memqueue - -import ( - "fmt" - - "github.com/elastic/elastic-agent-libs/logp" -) - -// Internal event ring buffer. -// The ring is split into 2 contiguous regions. -// Events are appended to region A until it grows to the end of the internal -// buffer. Then region B is created at the beginning of the internal buffer, -// and events are inserted there until region A is emptied. When A becomes empty, -// we rename region B to region A, and the cycle repeats every time we wrap around -// the internal array storage. -type ringBuffer struct { - logger *logp.Logger - - entries []queueEntry - - // The underlying array is divided up into two contiguous regions. - regA, regB region - - // The number of events awaiting ACK at the beginning of region A. 
- reserved int -} - -// region represents a contiguous region in ringBuffer's internal storage (i.e. -// one that does not cross the end of the array). -type region struct { - // The starting position of this region within the full event buffer. - index int - - // The number of events currently stored in this region. - size int -} - -func (b *ringBuffer) init(logger *logp.Logger, size int) { - *b = ringBuffer{ - logger: logger, - entries: make([]queueEntry, size), - } -} - -// Returns true if the ringBuffer is full after handling -// the given insertion, false otherwise. -func (b *ringBuffer) insert(entry queueEntry) { - // always insert into region B, if region B exists. - // That is, we have 2 regions and region A is currently processed by consumers - if b.regB.size > 0 { - // log.Debug(" - push into B region") - - idx := b.regB.index + b.regB.size - avail := b.regA.index - idx - if avail > 0 { - b.entries[idx] = entry - b.regB.size++ - } - return - } - - // region B does not exist yet, check if region A is available for use - idx := b.regA.index + b.regA.size - if b.regA.index+b.regA.size >= len(b.entries) { - // region A extends to the end of the buffer - if b.regA.index > 0 { - // If there is space before region A, create - // region B there. - b.regB = region{index: 0, size: 1} - b.entries[0] = entry - } - return - } - - // space available in region A -> let's append the event - // log.Debug(" - push into region A") - b.entries[idx] = entry - b.regA.size++ -} - -// cancel removes all buffered events matching `st`, not yet reserved by -// any consumer -func (b *ringBuffer) cancel(producer *ackProducer) int { - cancelledB := b.cancelRegion(producer, b.regB) - b.regB.size -= cancelledB - - cancelledA := b.cancelRegion(producer, region{ - index: b.regA.index + b.reserved, - size: b.regA.size - b.reserved, - }) - b.regA.size -= cancelledA - - return cancelledA + cancelledB -} - -// cancelRegion removes the events in the specified range having -// the specified produceState. It returns the number of events -// removed. -func (b *ringBuffer) cancelRegion(producer *ackProducer, reg region) int { - start := reg.index - end := start + reg.size - entries := b.entries[start:end] - - toEntries := entries[:0] - - // filter loop - for i := 0; i < reg.size; i++ { - if entries[i].producer == producer { - continue // remove - } - toEntries = append(toEntries, entries[i]) - } - - // re-initialize old buffer elements to help garbage collector - entries = entries[len(toEntries):] - for i := range entries { - entries[i] = queueEntry{} - } - - return len(entries) -} - -// reserve returns up to `sz` events from the brokerBuffer, -// exclusively marking the events as 'reserved'. Subsequent calls to `reserve` -// will only return enqueued and non-reserved events from the buffer. -// If `sz == -1`, all available events will be reserved. -func (b *ringBuffer) reserve(sz int) (int, []queueEntry) { - use := b.regA.size - b.reserved - - if sz > 0 && use > sz { - use = sz - } - - start := b.regA.index + b.reserved - end := start + use - b.reserved += use - return start, b.entries[start:end] -} - -// Remove the specified number of previously-reserved buffer entries from the -// start of region A. Called by the event loop when events are ACKed by -// consumers. 
-func (b *ringBuffer) removeEntries(count int) { - if b.regA.size < count { - panic(fmt.Errorf("commit region to big (commit region=%v, buffer size=%v)", - count, b.regA.size, - )) - } - - // clear region, so published events can be collected by the garbage collector: - end := b.regA.index + count - for i := b.regA.index; i < end; i++ { - b.entries[i] = queueEntry{} - } - - b.regA.index = end - b.regA.size -= count - b.reserved -= count - if b.regA.size == 0 { - // region A is empty, transfer region B into region A - b.regA = b.regB - b.regB.index = 0 - b.regB.size = 0 - } -} - -// Number of events that consumers can currently request. -func (b *ringBuffer) Avail() int { - return b.regA.size - b.reserved -} - -func (b *ringBuffer) Full() bool { - if b.regB.size > 0 { - return b.regA.index == (b.regB.index + b.regB.size) - } - return b.regA.size == len(b.entries) -} - -func (b *ringBuffer) Size() int { - return len(b.entries) -} - -// Items returns the count of events currently in the buffer -func (b *ringBuffer) Items() int { - return b.regA.size + b.regB.size -} - -func (b *ringBuffer) OldestEntry() *queueEntry { - if b.regA.size == 0 { - return nil - } - return &b.entries[b.regA.index] -} diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go new file mode 100644 index 00000000000..0f7788c6209 --- /dev/null +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -0,0 +1,307 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memqueue + +import ( + "time" + + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +// runLoop internal state. These fields could mostly be local variables +// in runLoop.run(), but they're exposed here to facilitate testing. In a +// live queue, only the runLoop goroutine should read or write these fields. +type runLoop struct { + broker *broker + + // The index of the beginning of the current ring buffer within its backing + // array. If the queue isn't empty, bufPos points to the oldest remaining + // event. + bufPos int + + // The total number of events in the queue. + eventCount int + + // The number of consumed events waiting for acknowledgment. The next Get + // request will return events starting at position + // (bufPos + consumedCount) % len(buf). + consumedCount int + + // The list of batches that have been consumed and are waiting to be sent + // to ackLoop for acknowledgment handling. (This list doesn't contain all + // outstanding batches, only the ones not yet forwarded to ackLoop.) + consumedBatches batchList + + // If there aren't enough events ready to fill an incoming get request, + // the queue may block based on its flush settings. When this happens, + // pendingGetRequest stores the request until we're ready to handle it. 
pendingGetRequest *getRequest + + // getTimer tracks the configured flush timeout for a pending getRequest: + // when it expires, we respond even if we can't fill the requested event count. + // It is active if and only if pendingGetRequest is non-nil. + getTimer *time.Timer + + // TODO (https://github.com/elastic/beats/issues/37893): entry IDs were a + // workaround for an external project that no longer exists. At this point + // they just complicate the API and should be removed. + nextEntryID queue.EntryID +} + +func newRunLoop(broker *broker) *runLoop { + var timer *time.Timer + + // Create the timer we'll use for get requests, but stop it until a + // get request is active. + if broker.settings.FlushTimeout > 0 { + timer = time.NewTimer(broker.settings.FlushTimeout) + if !timer.Stop() { + <-timer.C + } + } + return &runLoop{ + broker: broker, + getTimer: timer, + } +} + +func (l *runLoop) run() { + for l.broker.ctx.Err() == nil { + l.runIteration() + } +} + +// Perform one iteration of the queue's main run loop. Broken out into a +// standalone helper function to allow testing of loop invariants. +func (l *runLoop) runIteration() { + var pushChan chan pushRequest + // Push requests are enabled if the queue isn't yet full. + if l.eventCount < len(l.broker.buf) { + pushChan = l.broker.pushChan + } + + var getChan chan getRequest + // Get requests are enabled if the queue has events that weren't yet sent + // to consumers, and no existing request is active. + if l.pendingGetRequest == nil && l.eventCount > l.consumedCount { + getChan = l.broker.getChan + } + + var consumedChan chan batchList + // Enable sending to the consumed-batches channel (monitored by ackLoop) + // if we have something to send. + if !l.consumedBatches.empty() { + consumedChan = l.broker.consumedChan + } + + var timeoutChan <-chan time.Time + // Enable the timeout channel if a get request is waiting for events + if l.pendingGetRequest != nil { + timeoutChan = l.getTimer.C + } + + select { + case <-l.broker.ctx.Done(): + return + + case req := <-pushChan: // producer pushing new event + l.handleInsert(&req) + + case req := <-l.broker.cancelChan: // producer cancelling active events + l.handleCancel(&req) + + case req := <-getChan: // consumer asking for next batch + l.handleGetRequest(&req) + + case consumedChan <- l.consumedBatches: + // We've sent all the pending batches to the ackLoop for processing, + // clear the pending list. 
+ l.consumedBatches = batchList{} + + case count := <-l.broker.deleteChan: + l.handleDelete(count) + + case req := <-l.broker.metricChan: // asking broker for queue metrics + l.handleMetricsRequest(&req) + + case <-timeoutChan: + // The get timer has expired, handle the blocked request + l.getTimer.Stop() + l.handleGetReply(l.pendingGetRequest) + l.pendingGetRequest = nil + } +} + +func (l *runLoop) handleGetRequest(req *getRequest) { + if req.entryCount <= 0 || req.entryCount > l.broker.settings.MaxGetRequest { + req.entryCount = l.broker.settings.MaxGetRequest + } + if l.getRequestShouldBlock(req) { + l.pendingGetRequest = req + l.getTimer.Reset(l.broker.settings.FlushTimeout) + return + } + l.handleGetReply(req) +} + +func (l *runLoop) getRequestShouldBlock(req *getRequest) bool { + if l.broker.settings.FlushTimeout <= 0 { + // Never block if the flush timeout isn't positive + return false + } + eventsAvailable := l.eventCount - l.consumedCount + // Block if the available events aren't enough to fill the request + return eventsAvailable < req.entryCount +} + +// Respond to the given get request without blocking or waiting for more events +func (l *runLoop) handleGetReply(req *getRequest) { + eventsAvailable := l.eventCount - l.consumedCount + batchSize := req.entryCount + if eventsAvailable < batchSize { + batchSize = eventsAvailable + } + + startIndex := l.bufPos + l.consumedCount + batch := newBatch(l.broker, startIndex, batchSize) + + // Send the batch to the caller and update internal state + req.responseChan <- batch + l.consumedBatches.append(batch) + l.consumedCount += batchSize +} + +func (l *runLoop) handleDelete(count int) { + // Clear the internal event pointers so they can be garbage collected + for i := 0; i < count; i++ { + index := (l.bufPos + i) % len(l.broker.buf) + l.broker.buf[index].event = nil + } + + // Advance position and counters + l.bufPos = (l.bufPos + count) % len(l.broker.buf) + l.eventCount -= count + l.consumedCount -= count +} + +func (l *runLoop) handleInsert(req *pushRequest) { + if l.insert(req, l.nextEntryID) { + // Send back the new event id. + req.resp <- l.nextEntryID + + l.nextEntryID++ + l.eventCount++ + + // See if this gave us enough for a new batch + l.maybeUnblockGetRequest() + } +} + +// Checks if we can handle pendingGetRequest yet, and handles it if so +func (l *runLoop) maybeUnblockGetRequest() { + // If a get request is blocked waiting for more events, check if + // we should unblock it. + if getRequest := l.pendingGetRequest; getRequest != nil { + available := l.eventCount - l.consumedCount + if available >= getRequest.entryCount { + l.pendingGetRequest = nil + if !l.getTimer.Stop() { + <-l.getTimer.C + } + l.handleGetReply(getRequest) + } + } +} + +// Returns true if the event was inserted, false if insertion was cancelled. 
+
+// Returns true if the event was inserted, false if insertion was cancelled.
+func (l *runLoop) insert(req *pushRequest, id queue.EntryID) bool {
+    if req.producer != nil && req.producer.state.cancelled {
+        reportCancelledState(req)
+        return false
+    }
+
+    index := (l.bufPos + l.eventCount) % len(l.broker.buf)
+    l.broker.buf[index] = queueEntry{
+        event:      req.event,
+        id:         id,
+        producer:   req.producer,
+        producerID: req.producerID,
+    }
+    return true
+}
+
+func (l *runLoop) handleMetricsRequest(req *metricsRequest) {
+    oldestEntryID := l.nextEntryID
+    if l.eventCount > 0 {
+        index := l.bufPos % len(l.broker.buf)
+        oldestEntryID = l.broker.buf[index].id
+    }
+
+    req.responseChan <- memQueueMetrics{
+        currentQueueSize: l.eventCount,
+        occupiedRead:     l.consumedCount,
+        oldestEntryID:    oldestEntryID,
+    }
+}
+
+func (l *runLoop) handleCancel(req *producerCancelRequest) {
+    var removedCount int
+
+    // Traverse all unconsumed events in the buffer, removing any with
+    // the specified producer. As we go, we condense all the remaining
+    // events to be sequential.
+    buf := l.broker.buf
+    startIndex := l.bufPos + l.consumedCount
+    unconsumedEventCount := l.eventCount - l.consumedCount
+    for i := 0; i < unconsumedEventCount; i++ {
+        readIndex := (startIndex + i) % len(buf)
+        if buf[readIndex].producer == req.producer {
+            // The producer matches; skip this event.
+            removedCount++
+        } else {
+            // Move the event to its final position after accounting for any
+            // earlier indices that were removed.
+            // (Count backwards from (startIndex + i), not from readIndex, to
+            // avoid sign issues when the buffer wraps.)
+            writeIndex := (startIndex + i - removedCount) % len(buf)
+            buf[writeIndex] = buf[readIndex]
+        }
+    }
+
+    // Clear the event pointers at the end of the buffer so we don't keep
+    // old events in memory by accident.
+    for i := 0; i < removedCount; i++ {
+        index := (l.bufPos + l.eventCount - removedCount + i) % len(buf)
+        buf[index].event = nil
+    }
+
+    // Subtract the removed events from the internal event count.
+    l.eventCount -= removedCount
+
+    // Signal that the cancel request has finished.
+    if req.resp != nil {
+        req.resp <- producerCancelResponse{removed: removedCount}
+    }
+}
+
+func reportCancelledState(req *pushRequest) {
+    // Don't add the event if the producer has already sent a cancel signal.
+    if cb := req.producer.state.dropCB; cb != nil {
+        cb(req.event)
+    }
+}
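handleCancel above is a stable in-place compaction: each surviving entry is shifted left by the number of removals seen so far, so a single pass both filters and keeps the survivors contiguous. A minimal sketch of the same pass on a plain slice, leaving out the ring-buffer wraparound; the event values are illustrative:

    package main

    import "fmt"

    func main() {
        events := []string{"keep1", "drop", "keep2", "drop", "keep3"}

        removed := 0
        for i, ev := range events {
            if ev == "drop" {
                // Skip this entry; later survivors shift one more slot left.
                removed++
            } else {
                events[i-removed] = ev
            }
        }
        // Truncate the now-unused tail, mirroring the pointer-clearing loop.
        events = events[:len(events)-removed]

        fmt.Println(events) // [keep1 keep2 keep3]
    }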
diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go
new file mode 100644
index 00000000000..9b3a467647a
--- /dev/null
+++ b/libbeat/publisher/queue/memqueue/runloop_test.go
@@ -0,0 +1,114 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package memqueue
+
+import (
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
+    "github.com/elastic/elastic-agent-libs/logp"
+)
+
+func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
+    // In previous versions of the queue, setting flush.min_events (now
+    // corresponding to memqueue.Settings.MaxGetRequest) to a high value would
+    // delay get requests even when the requested number of events was
+    // immediately available. This test verifies that Get requests that can be
+    // completely filled do not wait for the flush timer.
+
+    broker := newQueue(
+        logp.NewLogger("testing"),
+        nil,
+        Settings{
+            Events:        1000,
+            MaxGetRequest: 500,
+            FlushTimeout:  10 * time.Second,
+        },
+        10)
+
+    producer := newProducer(broker, nil, nil, false)
+    rl := broker.runLoop
+    for i := 0; i < 100; i++ {
+        // Pair each publish call with an iteration of the run loop so we
+        // get a response.
+        go rl.runIteration()
+        _, ok := producer.Publish(i)
+        require.True(t, ok, "Queue publish call must succeed")
+    }
+
+    // The queue now has 100 events, but MaxGetRequest is 500.
+    // In the old queue, a Get call now would block until the flush
+    // timer expired. With the current changes, it should return
+    // immediately on any request size up to 100.
+    go func() {
+        // Run the Get asynchronously so the test itself doesn't block if
+        // there's a logical error.
+        _, _ = broker.Get(100)
+    }()
+    rl.runIteration()
+    assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since the request should succeed immediately")
+    assert.Equal(t, 100, rl.consumedCount, "Queue should have a consumedCount of 100 after a consumer requested all its events")
+}
+
+func TestFlushSettingsBlockPartialBatches(t *testing.T) {
+    // The previous test confirms that Get requests are handled immediately if
+    // there are enough events. This one uses the same setup to confirm that
+    // Get requests are delayed if there aren't enough events.
+
+    broker := newQueue(
+        logp.NewLogger("testing"),
+        nil,
+        Settings{
+            Events:        1000,
+            MaxGetRequest: 500,
+            FlushTimeout:  10 * time.Second,
+        },
+        10)
+
+    producer := newProducer(broker, nil, nil, false)
+    rl := broker.runLoop
+    for i := 0; i < 100; i++ {
+        // Pair each publish call with an iteration of the run loop so we
+        // get a response.
+        go rl.runIteration()
+        _, ok := producer.Publish("some event")
+        require.True(t, ok, "Queue publish call must succeed")
+    }
+
+    // The queue now has 100 events and a positive flush timeout, so a
+    // request for 101 events should block.
+    go func() {
+        // Run the Get asynchronously so the test itself doesn't block if
+        // there's a logical error.
+        _, _ = broker.Get(101)
+    }()
+    rl.runIteration()
+    assert.NotNil(t, rl.pendingGetRequest, "Queue should have a pending get request since the queue doesn't have the requested event count")
+    assert.Equal(t, 0, rl.consumedCount, "Queue should have a consumedCount of 0 since the Get request couldn't be completely filled")
+
+    // Now confirm that adding one more event unblocks the request.
+    go func() {
+        _, _ = producer.Publish("some event")
+    }()
+    rl.runIteration()
+    assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since adding an event should unblock the previous one")
+    assert.Equal(t, 101, rl.consumedCount, "Queue should have a consumedCount of 101 after adding an event unblocked the pending get request")
+}
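Both tests drive the queue with the same technique: every call that would block on the run loop is issued from its own goroutine, and the loop body is stepped exactly once with runIteration, so each assertion observes a deterministic loop state. A minimal standalone sketch of that stepping pattern, using a stand-in loop type rather than the memqueue API:

    package main

    import "fmt"

    type miniLoop struct {
        reqChan chan int
        pending int
    }

    // step runs exactly one iteration of the loop: it blocks until one
    // request arrives, records it, and returns.
    func (l *miniLoop) step() {
        l.pending = <-l.reqChan
    }

    func main() {
        l := &miniLoop{reqChan: make(chan int)}

        // Issue the blocking request from a goroutine, like the tests'
        // pairing of go rl.runIteration() with producer.Publish.
        done := make(chan struct{})
        go func() {
            l.reqChan <- 42
            close(done)
        }()

        l.step() // exactly one deterministic iteration
        <-done
        fmt.Println("state after one step:", l.pending) // 42
    }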