diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 2438268c106..2c04e5965d3 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -62,6 +62,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668] - Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420] - Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502] +- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699] ==== Bugfixes diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index 4efff03070a..57cdbd635ea 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -21,7 +21,6 @@ import ( "sync" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -33,7 +32,7 @@ type testQueue struct { } type testProducer struct { - publish func(try bool, event publisher.Event) bool + publish func(try bool, event interface{}) bool cancel func() int } @@ -69,14 +68,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) { return nil, nil } -func (p *testProducer) Publish(event publisher.Event) bool { +func (p *testProducer) Publish(event interface{}) bool { if p.publish != nil { return p.publish(false, event) } return false } -func (p *testProducer) TryPublish(event publisher.Event) bool { +func (p *testProducer) TryPublish(event interface{}) bool { if p.publish != nil { return p.publish(true, event) } @@ -115,7 +114,7 @@ func makeTestQueue() queue.Queue { var producer *testProducer p := blockingProducer(cfg) producer = &testProducer{ - publish: func(try bool, event publisher.Event) bool { + publish: func(try bool, event interface{}) bool { if try { return p.TryPublish(event) } @@ -147,7 +146,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer { waiting := atomic.MakeInt(0) return &testProducer{ - publish: func(_ bool, _ publisher.Event) bool { + publish: func(_ bool, _ interface{}) bool { waiting.Inc() <-sig return false diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 4b3bdb6489b..897c0756719 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -18,8 +18,6 @@ package pipeline import ( - "sync" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -29,7 +27,9 @@ type retryer interface { } type ttlBatch struct { - original queue.Batch + // The callback to inform the queue (and possibly the producer) + // that this batch has been acknowledged. + ack func() // The internal hook back to the eventConsumer, used to implement the // publisher.Batch retry interface. @@ -43,30 +43,30 @@ type ttlBatch struct { events []publisher.Event } -var batchPool = sync.Pool{ - New: func() interface{} { - return &ttlBatch{} - }, -} - func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { if original == nil { panic("empty batch") } - b := batchPool.Get().(*ttlBatch) - *b = ttlBatch{ - original: original, - retryer: retryer, - ttl: ttl, - events: original.Events(), + count := original.Count() + events := make([]publisher.Event, 0, count) + for i := 0; i < count; i++ { + event, ok := original.Event(i).(publisher.Event) + if ok { + // In Beats this conversion will always succeed because only + // publisher.Event objects are inserted into the queue, but + // there's no harm in making sure. + events = append(events, event) + } } - return b -} -func releaseBatch(b *ttlBatch) { - *b = ttlBatch{} // clear batch - batchPool.Put(b) + b := &ttlBatch{ + ack: original.ACK, + retryer: retryer, + ttl: ttl, + events: events, + } + return b } func (b *ttlBatch) Events() []publisher.Event { @@ -74,13 +74,11 @@ func (b *ttlBatch) Events() []publisher.Event { } func (b *ttlBatch) ACK() { - b.original.ACK() - releaseBatch(b) + b.ack() } func (b *ttlBatch) Drop() { - b.original.ACK() - releaseBatch(b) + b.ack() } func (b *ttlBatch) Retry() { diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index e69d6088a4c..181c6ab8c8a 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -101,9 +101,8 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz if err != nil { return err } - events := batch.Events() batch.ACK() - received = received + len(events) + received = received + batch.Count() if received == num_events { break } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 5c316916559..2380fd93717 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -20,7 +20,6 @@ package diskqueue import ( "fmt" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -83,12 +82,12 @@ eventLoop: // diskQueueBatch implementation of the queue.Batch interface // -func (batch *diskQueueBatch) Events() []publisher.Event { - events := make([]publisher.Event, len(batch.frames)) - for i, frame := range batch.frames { - events[i] = frame.event - } - return events +func (batch *diskQueueBatch) Count() int { + return len(batch.frames) +} + +func (batch *diskQueueBatch) Event(i int) interface{} { + return batch.frames[i].event } func (batch *diskQueueBatch) ACK() { diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index f4ff4ef2706..b7ae0be2772 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -18,7 +18,6 @@ package diskqueue import ( - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -50,21 +49,21 @@ type producerWriteRequest struct { // diskQueueProducer implementation of the queue.Producer interface // -func (producer *diskQueueProducer) Publish(event publisher.Event) bool { +func (producer *diskQueueProducer) Publish(event interface{}) bool { return producer.publish(event, true) } -func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool { +func (producer *diskQueueProducer) TryPublish(event interface{}) bool { return producer.publish(event, false) } func (producer *diskQueueProducer) publish( - event publisher.Event, shouldBlock bool, + event interface{}, shouldBlock bool, ) bool { if producer.cancelled { return false } - serialized, err := producer.encoder.encode(&event) + serialized, err := producer.encoder.encode(event) if err != nil { producer.queue.logger.Errorf( "Couldn't serialize incoming event: %v", err) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 2680b4d4eee..3c75534cdce 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -22,6 +22,7 @@ package diskqueue import ( "bytes" + "fmt" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -82,7 +83,17 @@ func (e *eventEncoder) reset() { e.folder = folder } -func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) { +func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { + event, ok := evt.(publisher.Event) + if !ok { + // In order to support events of varying type, the disk queue needs + // to know how to encode them. When we decide to do this, we'll need + // to add an encoder to the settings passed in when creating a disk + // queue. See https://github.com/elastic/elastic-agent-shipper/issues/41. + // For now, just return an error. + return nil, fmt.Errorf("disk queue only supports publisher.Event") + } + e.buf.Reset() err := e.folder.Fold(entry{ @@ -131,12 +142,12 @@ func (d *eventDecoder) Buffer(n int) []byte { } func (d *eventDecoder) Decode() (publisher.Event, error) { - var ( - to entry - err error - ) + var to entry - d.unfolder.SetTarget(&to) + err := d.unfolder.SetTarget(&to) + if err != nil { + return publisher.Event{}, err + } defer d.unfolder.Reset() if d.useJSON { diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go index 75a79fdefdc..64cb3e3342b 100644 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -46,7 +46,7 @@ func TestSerialize(t *testing.T) { }, }, } - serialized, err := encoder.encode(&event) + serialized, err := encoder.encode(event) if err != nil { t.Fatalf("[%v] Couldn't encode event: %v", test.name, err) } diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 0c6c40e5d79..1b1979dc834 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -36,8 +36,6 @@ type ackLoop struct { func (l *ackLoop) run() { var ( - // log = l.broker.logger - // Buffer up event counter in ackCount. If ackCount > 0, acks will be set to // the broker.acks channel for sending the ACKs while potentially receiving // new batches from the broker event loop. @@ -46,10 +44,11 @@ func (l *ackLoop) run() { // loop, as the ack loop will not block on any channel ackCount int ackChan chan int - sig chan batchAckMsg ) for { + nextBatchChan := l.ackChans.nextBatchChannel() + select { case <-l.broker.done: return @@ -60,25 +59,12 @@ func (l *ackLoop) run() { case chanList := <-l.broker.scheduledACKs: l.ackChans.concat(&chanList) - case <-sig: + case <-nextBatchChan: ackCount += l.handleBatchSig() if ackCount > 0 { ackChan = l.broker.ackChan } } - - // log.Debug("ackloop INFO") - // log.Debug("ackloop: total events scheduled = ", l.totalSched) - // log.Debug("ackloop: total events ack = ", l.totalACK) - // log.Debug("ackloop: total batches scheduled = ", l.batchesSched) - // log.Debug("ackloop: total batches ack = ", l.batchesACKed) - - sig = l.ackChans.channel() - // if l.sig == nil { - // log.Debug("ackloop: no ack scheduled") - // } else { - // log.Debug("ackloop: schedule ack: ", l.lst.head.seq) - // } } } diff --git a/libbeat/publisher/queue/memqueue/batchbuf.go b/libbeat/publisher/queue/memqueue/batchbuf.go index 9ba4b659ed5..53c625688f3 100644 --- a/libbeat/publisher/queue/memqueue/batchbuf.go +++ b/libbeat/publisher/queue/memqueue/batchbuf.go @@ -17,8 +17,6 @@ package memqueue -import "github.com/elastic/beats/v7/libbeat/publisher" - type queueEntry struct { event interface{} client clientState @@ -36,7 +34,7 @@ func newBatchBuffer(sz int) *batchBuffer { return b } -func (b *batchBuffer) add(event *publisher.Event, st clientState) { +func (b *batchBuffer) add(event interface{}, st clientState) { b.entries = append(b.entries, queueEntry{event, st}) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 3a99664e827..dafcf1f08ed 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -23,7 +23,6 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -91,7 +90,7 @@ type Settings struct { type batch struct { queue *broker - events []publisher.Event + entries []queueEntry ackChan chan batchAckMsg } @@ -242,15 +241,9 @@ func (b *broker) Get(count int) (queue.Batch, error) { // if request has been sent, we have to wait for a response resp := <-responseChan - events := make([]publisher.Event, 0, len(resp.entries)) - for _, entry := range resp.entries { - if event, ok := entry.event.(*publisher.Event); ok { - events = append(events, *event) - } - } return &batch{ queue: b, - events: events, + entries: resp.entries, ackChan: resp.ackChan, }, nil } @@ -268,8 +261,7 @@ var ackChanPool = sync.Pool{ } func newBatchACKState(start, count int, entries []queueEntry) *batchACKState { - //nolint: errcheck // Return value doesn't need to be checked before conversion. - ch := ackChanPool.Get().(*batchACKState) + ch, _ := ackChanPool.Get().(*batchACKState) ch.next = nil ch.start = start ch.count = count @@ -321,7 +313,7 @@ func (l *chanList) front() *batchACKState { return l.head } -func (l *chanList) channel() chan batchAckMsg { +func (l *chanList) nextBatchChannel() chan batchAckMsg { if l.head == nil { return nil } @@ -362,8 +354,12 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) { return actual } -func (b *batch) Events() []publisher.Event { - return b.events +func (b *batch) Count() int { + return len(b.entries) +} + +func (b *batch) Event(i int) interface{} { + return b.entries[i].event } func (b *batch) ACK() { diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index a2a3a16dd7d..6b16cd7aefd 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -22,6 +22,7 @@ import ( "math" "time" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/logp" ) @@ -189,14 +190,11 @@ func (l *directEventLoop) processACK(lst chanList, N int) { }() } - acks := lst.front() - start := acks.start entries := l.buf.entries - idx := start + N - 1 - if idx >= len(entries) { - idx -= len(entries) - } + firstIndex := lst.front().start + // Position the index at the end of the block of ACKed events + idx := (firstIndex + N - 1) % len(entries) total := 0 for i := N - 1; i >= 0; i-- { @@ -235,7 +233,6 @@ func (l *directEventLoop) processACK(lst chanList, N int) { N, total, )) } - client.state.cb(int(count)) client.state.lastACK = client.seq client.state = nil @@ -532,7 +529,9 @@ func reportCancelledState(log *logp.Logger, req *pushRequest) { st := req.state if cb := st.dropCB; cb != nil { - cb(req.event.Content) + if event, ok := req.event.(publisher.Event); ok { + cb(event.Content) + } } } diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 8d4571ce225..00580d4d39a 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -17,12 +17,10 @@ package memqueue -import "github.com/elastic/beats/v7/libbeat/publisher" - // producer -> broker API type pushRequest struct { - event *publisher.Event + event interface{} seq uint32 state *produceState } diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 762c17b1ef3..dd87c481427 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -19,7 +19,6 @@ package memqueue import ( "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" ) @@ -68,16 +67,12 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel return &forgetfulProducer{broker: b, openState: openState} } -func (p *forgetfulProducer) Publish(event publisher.Event) bool { - return p.openState.publish(p.makeRequest(&event)) +func (p *forgetfulProducer) Publish(event interface{}) bool { + return p.openState.publish(pushRequest{event: event}) } -func (p *forgetfulProducer) TryPublish(event publisher.Event) bool { - return p.openState.tryPublish(p.makeRequest(&event)) -} - -func (p *forgetfulProducer) makeRequest(event *publisher.Event) pushRequest { - return pushRequest{event: event} +func (p *forgetfulProducer) TryPublish(event interface{}) bool { + return p.openState.tryPublish(pushRequest{event: event}) } func (p *forgetfulProducer) Cancel() int { @@ -85,12 +80,12 @@ func (p *forgetfulProducer) Cancel() int { return 0 } -func (p *ackProducer) Publish(event publisher.Event) bool { - return p.updSeq(p.openState.publish(p.makeRequest(&event))) +func (p *ackProducer) Publish(event interface{}) bool { + return p.updSeq(p.openState.publish(p.makeRequest(event))) } -func (p *ackProducer) TryPublish(event publisher.Event) bool { - return p.updSeq(p.openState.tryPublish(p.makeRequest(&event))) +func (p *ackProducer) TryPublish(event interface{}) bool { + return p.updSeq(p.openState.tryPublish(p.makeRequest(event))) } func (p *ackProducer) updSeq(ok bool) bool { @@ -100,7 +95,7 @@ func (p *ackProducer) updSeq(ok bool) bool { return ok } -func (p *ackProducer) makeRequest(event *publisher.Event) pushRequest { +func (p *ackProducer) makeRequest(event interface{}) pushRequest { req := pushRequest{ event: event, seq: p.seq, diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 14291c8ea4a..9ca660ba28e 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/opt" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -112,14 +111,14 @@ type ProducerConfig struct { type Producer interface { // Publish adds an event to the queue, blocking if necessary, and returns // true on success. - Publish(event publisher.Event) bool + Publish(event interface{}) bool // TryPublish adds an event to the queue if doing so will not block the // caller, otherwise it immediately returns. The reasons a publish attempt // might block are defined by the specific queue implementation and its // configuration. Returns true if the event was successfully added, false // otherwise. - TryPublish(event publisher.Event) bool + TryPublish(event interface{}) bool // Cancel closes this Producer endpoint. If the producer is configured to // drop its events on Cancel, the number of dropped events is returned. @@ -132,6 +131,7 @@ type Producer interface { // Batch of events to be returned to Consumers. The `ACK` method will send the // ACK signal to the queue. type Batch interface { - Events() []publisher.Event + Count() int + Event(i int) interface{} ACK() } diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go index 64310adf66f..628080854cb 100644 --- a/libbeat/publisher/queue/queuetest/producer_cancel.go +++ b/libbeat/publisher/queue/queuetest/producer_cancel.go @@ -74,14 +74,16 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { // consume all events total := N2 - N1 - events := make([]publisher.Event, 0, total) + events := make([]interface{}, 0, total) for len(events) < total { batch, err := b.Get(-1) // collect all events if err != nil { panic(err) } - events = append(events, batch.Events()...) + for i := 0; i < batch.Count(); i++ { + events = append(events, batch.Event(i)) + } batch.ACK() } @@ -92,7 +94,9 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { } for i, event := range events { - value, ok := event.Content.Fields["value"].(int) + pubEvent, ok := event.(publisher.Event) + assert.True(t, ok, "queue output should be the same type as its input") + value, ok := pubEvent.Content.Fields["value"].(int) assert.True(t, ok, "event.value should be an int") assert.Equal(t, i+N1, value) } diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 29b1da7a7d2..eb65e0259d0 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -302,10 +302,9 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { return } - collected := batch.Events() - log.Debug("consumer: process batch", len(collected)) + log.Debug("consumer: process batch", batch.Count()) - for range collected { + for j := 0; j < batch.Count(); j++ { events.Done() } batch.ACK()