From f511d43da1b7044b421151210a7e193ef91ccc8c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 12 May 2022 15:42:26 -0400 Subject: [PATCH 01/11] queue api can publish arbitrary event types --- libbeat/publisher/queue/diskqueue/producer.go | 9 ++++---- .../publisher/queue/diskqueue/serialize.go | 12 +++++++++- .../queue/diskqueue/serialize_test.go | 2 +- libbeat/publisher/queue/memqueue/batchbuf.go | 4 +--- libbeat/publisher/queue/memqueue/eventloop.go | 7 +++++- .../publisher/queue/memqueue/internal_api.go | 4 +--- libbeat/publisher/queue/memqueue/produce.go | 23 ++++++++----------- libbeat/publisher/queue/queue.go | 4 ++-- 8 files changed, 35 insertions(+), 30 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index f4ff4ef2706..b7ae0be2772 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -18,7 +18,6 @@ package diskqueue import ( - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -50,21 +49,21 @@ type producerWriteRequest struct { // diskQueueProducer implementation of the queue.Producer interface // -func (producer *diskQueueProducer) Publish(event publisher.Event) bool { +func (producer *diskQueueProducer) Publish(event interface{}) bool { return producer.publish(event, true) } -func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool { +func (producer *diskQueueProducer) TryPublish(event interface{}) bool { return producer.publish(event, false) } func (producer *diskQueueProducer) publish( - event publisher.Event, shouldBlock bool, + event interface{}, shouldBlock bool, ) bool { if producer.cancelled { return false } - serialized, err := producer.encoder.encode(&event) + serialized, err := producer.encoder.encode(event) if err != nil { producer.queue.logger.Errorf( "Couldn't serialize incoming event: %v", err) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 2680b4d4eee..996c637d3c2 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -22,6 +22,7 @@ package diskqueue import ( "bytes" + "fmt" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -82,7 +83,16 @@ func (e *eventEncoder) reset() { e.folder = folder } -func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) { +func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { + event, ok := evt.(publisher.Event) + if !ok { + // In order to support events of varying type, the disk queue needs + // to know how to encode them. When we decide to do this, we'll need + // to add an encoder to the settings passed in when creating a disk + // queue. For now, just return an error. + return nil, fmt.Errorf("disk queue only supports publisher.Event") + } + e.buf.Reset() err := e.folder.Fold(entry{ diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go index 75a79fdefdc..64cb3e3342b 100644 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -46,7 +46,7 @@ func TestSerialize(t *testing.T) { }, }, } - serialized, err := encoder.encode(&event) + serialized, err := encoder.encode(event) if err != nil { t.Fatalf("[%v] Couldn't encode event: %v", test.name, err) } diff --git a/libbeat/publisher/queue/memqueue/batchbuf.go b/libbeat/publisher/queue/memqueue/batchbuf.go index 9ba4b659ed5..53c625688f3 100644 --- a/libbeat/publisher/queue/memqueue/batchbuf.go +++ b/libbeat/publisher/queue/memqueue/batchbuf.go @@ -17,8 +17,6 @@ package memqueue -import "github.com/elastic/beats/v7/libbeat/publisher" - type queueEntry struct { event interface{} client clientState @@ -36,7 +34,7 @@ func newBatchBuffer(sz int) *batchBuffer { return b } -func (b *batchBuffer) add(event *publisher.Event, st clientState) { +func (b *batchBuffer) add(event interface{}, st clientState) { b.entries = append(b.entries, queueEntry{event, st}) } diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index a2a3a16dd7d..543cafe1f96 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -22,6 +22,7 @@ import ( "math" "time" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/logp" ) @@ -532,7 +533,11 @@ func reportCancelledState(log *logp.Logger, req *pushRequest) { st := req.state if cb := st.dropCB; cb != nil { - cb(req.event.Content) + // TODO(fae): should this cast be to `publisher.Event` or `*publisher.Event`? + // interface casts are strange... + if event, ok := req.event.(publisher.Event); ok { + cb(event.Content) + } } } diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 8d4571ce225..00580d4d39a 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -17,12 +17,10 @@ package memqueue -import "github.com/elastic/beats/v7/libbeat/publisher" - // producer -> broker API type pushRequest struct { - event *publisher.Event + event interface{} seq uint32 state *produceState } diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 762c17b1ef3..6ecf5be6af8 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -19,7 +19,6 @@ package memqueue import ( "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" ) @@ -68,16 +67,12 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel return &forgetfulProducer{broker: b, openState: openState} } -func (p *forgetfulProducer) Publish(event publisher.Event) bool { - return p.openState.publish(p.makeRequest(&event)) +func (p *forgetfulProducer) Publish(event interface{}) bool { + return p.openState.publish(pushRequest{event: event}) } -func (p *forgetfulProducer) TryPublish(event publisher.Event) bool { - return p.openState.tryPublish(p.makeRequest(&event)) -} - -func (p *forgetfulProducer) makeRequest(event *publisher.Event) pushRequest { - return pushRequest{event: event} +func (p *forgetfulProducer) TryPublish(event interface{}) bool { + return p.openState.tryPublish(pushRequest{event: event}) } func (p *forgetfulProducer) Cancel() int { @@ -85,12 +80,12 @@ func (p *forgetfulProducer) Cancel() int { return 0 } -func (p *ackProducer) Publish(event publisher.Event) bool { - return p.updSeq(p.openState.publish(p.makeRequest(&event))) +func (p *ackProducer) Publish(event interface{}) bool { + return p.updSeq(p.openState.publish(pushRequest{event: event})) } -func (p *ackProducer) TryPublish(event publisher.Event) bool { - return p.updSeq(p.openState.tryPublish(p.makeRequest(&event))) +func (p *ackProducer) TryPublish(event interface{}) bool { + return p.updSeq(p.openState.tryPublish(p.makeRequest(event))) } func (p *ackProducer) updSeq(ok bool) bool { @@ -100,7 +95,7 @@ func (p *ackProducer) updSeq(ok bool) bool { return ok } -func (p *ackProducer) makeRequest(event *publisher.Event) pushRequest { +func (p *ackProducer) makeRequest(event interface{}) pushRequest { req := pushRequest{ event: event, seq: p.seq, diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 14291c8ea4a..4f2e9870740 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -112,14 +112,14 @@ type ProducerConfig struct { type Producer interface { // Publish adds an event to the queue, blocking if necessary, and returns // true on success. - Publish(event publisher.Event) bool + Publish(event interface{}) bool // TryPublish adds an event to the queue if doing so will not block the // caller, otherwise it immediately returns. The reasons a publish attempt // might block are defined by the specific queue implementation and its // configuration. Returns true if the event was successfully added, false // otherwise. - TryPublish(event publisher.Event) bool + TryPublish(event interface{}) bool // Cancel closes this Producer endpoint. If the producer is configured to // drop its events on Cancel, the number of dropped events is returned. From f08184c8341cf77626859147daf8464d9b643c90 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 18 May 2022 15:14:17 -0400 Subject: [PATCH 02/11] adjusting queue qpi --- libbeat/publisher/queue/memqueue/ackloop.go | 28 ++++++++----------- libbeat/publisher/queue/memqueue/broker.go | 14 ++++++---- libbeat/publisher/queue/memqueue/eventloop.go | 18 +++++++----- libbeat/publisher/queue/memqueue/produce.go | 5 +++- .../publisher/queue/queuetest/queuetest.go | 21 +++++++++++--- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 0c6c40e5d79..f966f55c3d4 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -17,6 +17,8 @@ package memqueue +import "fmt" + // ackLoop implements the brokers asynchronous ACK worker. // Multiple concurrent ACKs from consecutive published batches will be batched up by the // worker, to reduce the number of signals to return to the producer and the @@ -36,8 +38,6 @@ type ackLoop struct { func (l *ackLoop) run() { var ( - // log = l.broker.logger - // Buffer up event counter in ackCount. If ackCount > 0, acks will be set to // the broker.acks channel for sending the ACKs while potentially receiving // new batches from the broker event loop. @@ -46,39 +46,31 @@ func (l *ackLoop) run() { // loop, as the ack loop will not block on any channel ackCount int ackChan chan int - sig chan batchAckMsg ) for { + nextBatchChan := l.ackChans.nextBatchChannel() + select { case <-l.broker.done: return case ackChan <- ackCount: + fmt.Printf("ackLoop sent %v to ackChan\n", ackCount) ackChan, ackCount = nil, 0 case chanList := <-l.broker.scheduledACKs: + fmt.Printf("ackLoop read from scheduledACKs, adding to ackChans\n") l.ackChans.concat(&chanList) - case <-sig: + case <-nextBatchChan: + fmt.Printf("ackLoop read from ackChans.channel()\n") ackCount += l.handleBatchSig() + fmt.Printf("ackCount is %d\n", ackCount) if ackCount > 0 { ackChan = l.broker.ackChan } } - - // log.Debug("ackloop INFO") - // log.Debug("ackloop: total events scheduled = ", l.totalSched) - // log.Debug("ackloop: total events ack = ", l.totalACK) - // log.Debug("ackloop: total batches scheduled = ", l.batchesSched) - // log.Debug("ackloop: total batches ack = ", l.batchesACKed) - - sig = l.ackChans.channel() - // if l.sig == nil { - // log.Debug("ackloop: no ack scheduled") - // } else { - // log.Debug("ackloop: schedule ack: ", l.lst.head.seq) - // } } } @@ -114,6 +106,7 @@ func (l *ackLoop) handleBatchSig() int { } func (l *ackLoop) collectAcked() chanList { + fmt.Printf("collectAcked\n") lst := chanList{} acks := l.ackChans.pop() @@ -124,6 +117,7 @@ func (l *ackLoop) collectAcked() chanList { acks := l.ackChans.front() select { case <-acks.ackChan: + fmt.Printf("collectAcked received on ackChan, appending to lst\n") lst.append(l.ackChans.pop()) default: diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 3a99664e827..c6db3641d5f 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -18,6 +18,7 @@ package memqueue import ( + "fmt" "io" "sync" "time" @@ -244,8 +245,10 @@ func (b *broker) Get(count int) (queue.Batch, error) { resp := <-responseChan events := make([]publisher.Event, 0, len(resp.entries)) for _, entry := range resp.entries { - if event, ok := entry.event.(*publisher.Event); ok { - events = append(events, *event) + if event, ok := entry.event.(publisher.Event); ok { + events = append(events, event) + } else { + panic("idk") } } return &batch{ @@ -268,8 +271,7 @@ var ackChanPool = sync.Pool{ } func newBatchACKState(start, count int, entries []queueEntry) *batchACKState { - //nolint: errcheck // Return value doesn't need to be checked before conversion. - ch := ackChanPool.Get().(*batchACKState) + ch, _ := ackChanPool.Get().(*batchACKState) ch.next = nil ch.start = start ch.count = count @@ -321,7 +323,7 @@ func (l *chanList) front() *batchACKState { return l.head } -func (l *chanList) channel() chan batchAckMsg { +func (l *chanList) nextBatchChannel() chan batchAckMsg { if l.head == nil { return nil } @@ -367,5 +369,7 @@ func (b *batch) Events() []publisher.Event { } func (b *batch) ACK() { + fmt.Printf("batch.ACK()\n") b.ackChan <- batchAckMsg{} + fmt.Printf("batch.ACK() done\n") } diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 543cafe1f96..89e4bacd354 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -113,6 +113,7 @@ func (l *directEventLoop) run() { case count := <-l.broker.ackChan: // Events have been ACKed, remove them from the internal buffer. + fmt.Printf("directEventLoop got %d on ackChan\n", count) l.buf.removeEntries(count) case req := <-l.broker.cancelChan: // producer cancelling active events @@ -123,6 +124,7 @@ func (l *directEventLoop) run() { l.handleGetRequest(&req) case schedACKs <- l.pendingACKs: + fmt.Printf("sent pendingACKs to schedACKs\n") // on send complete list of pending batches has been forwarded -> clear list l.pendingACKs = chanList{} } @@ -135,10 +137,12 @@ func (l *directEventLoop) insert(req *pushRequest) { st := req.state if st == nil { + fmt.Printf("directEventLoop.insert nil state\n") l.buf.insert(req.event, clientState{}) } else if st.cancelled { reportCancelledState(log, req) } else { + fmt.Printf("directEventLoop.insert non-nil state\n") l.buf.insert(req.event, clientState{ seq: req.seq, state: st, @@ -181,6 +185,8 @@ func (l *directEventLoop) handleGetRequest(req *getRequest) { // processACK is called by the ackLoop to process the list of acked batches func (l *directEventLoop) processACK(lst chanList, N int) { + fmt.Printf("processACK(%v)\n", N) + defer fmt.Printf("processACK finished\n") log := l.broker.logger { start := time.Now() @@ -190,14 +196,11 @@ func (l *directEventLoop) processACK(lst chanList, N int) { }() } - acks := lst.front() - start := acks.start entries := l.buf.entries - idx := start + N - 1 - if idx >= len(entries) { - idx -= len(entries) - } + firstIndex := lst.front().start + // Position the index at the end of the block of ACKed events + idx := (firstIndex + N - 1) % len(entries) total := 0 for i := N - 1; i >= 0; i-- { @@ -210,6 +213,7 @@ func (l *directEventLoop) processACK(lst chanList, N int) { idx-- if client.state == nil { + fmt.Printf("no state set\n") log.Debug("no state set") continue } @@ -236,7 +240,7 @@ func (l *directEventLoop) processACK(lst chanList, N int) { N, total, )) } - + fmt.Printf("calling produceState callback\n") client.state.cb(int(count)) client.state.lastACK = client.seq client.state = nil diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 6ecf5be6af8..325242fcf63 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -18,6 +18,8 @@ package memqueue import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" @@ -59,6 +61,7 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel } if cb != nil { + fmt.Printf("creating producer with non-nil ack handler\n") p := &ackProducer{broker: b, seq: 1, dropOnCancel: dropOnCancel, openState: openState} p.state.cb = cb p.state.dropCB = dropCB @@ -81,7 +84,7 @@ func (p *forgetfulProducer) Cancel() int { } func (p *ackProducer) Publish(event interface{}) bool { - return p.updSeq(p.openState.publish(pushRequest{event: event})) + return p.updSeq(p.openState.publish(p.makeRequest(event))) } func (p *ackProducer) TryPublish(event interface{}) bool { diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 29b1da7a7d2..4d134d4ed56 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -18,6 +18,7 @@ package queuetest import ( + "fmt" "sync" "testing" @@ -73,14 +74,14 @@ func TestMultiProducerConsumer( queueFactory QueueFactory, ) { tests := []testCase{ - { + /*{ "2 producers, 1 consumer, without ack, complete batches", multiple( makeProducer(events, false, countEvent), makeProducer(events, false, countEvent), ), makeConsumer(events*2, -1), - }, + },*/ { "2 producers, 1 consumer, all ack, complete batches", multiple( @@ -89,7 +90,7 @@ func TestMultiProducerConsumer( ), makeConsumer(events*2, -1), }, - { + /*{ "2 producers, 1 consumer, 1 ack, complete batches", multiple( makeProducer(events, true, countEvent), @@ -180,7 +181,7 @@ func TestMultiProducerConsumer( makeProducer(events, false, countEvent), ), multiConsumer(2, events*2, batchSize), - }, + },*/ } runTestCases(t, tests, queueFactory) @@ -210,7 +211,9 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) { go test.producers(&wg, nil, log, queue)() go test.consumers(&wg, nil, log, queue)() + fmt.Printf("waiting on wg\n") wg.Wait() + fmt.Printf("done waiting\n") })) } } @@ -242,6 +245,8 @@ func makeProducer( return func() { defer wg.Done() + fmt.Printf("start producer\n") + defer fmt.Printf("stop producer\n") log.Debug("start producer") defer log.Debug("stop producer") @@ -251,10 +256,12 @@ func makeProducer( ) if waitACK { + fmt.Printf("waitACK is true\n") ackWG.Add(maxEvents) total := 0 ackCB = func(N int) { + fmt.Printf("ackCB(%v)\n", N) total += N log.Debugf("producer ACK: N=%v, total=%v\n", N, total) @@ -296,6 +303,8 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { b := b go func() { + fmt.Printf("start consumer\n") + defer fmt.Printf("end consumer\n") for { batch, err := b.Get(batchSize) if err != nil { @@ -303,17 +312,21 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { } collected := batch.Events() + fmt.Printf("got batch of size %d\n", len(collected)) log.Debug("consumer: process batch", len(collected)) for range collected { events.Done() } batch.ACK() + fmt.Printf("batch acked\n") } }() } + fmt.Printf("waiting on events wg\n") events.Wait() + fmt.Printf("done waiting on events wg\n") } } } From c11b568303ffda31b3e752e0577e328f17b31c33 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 20 May 2022 11:30:39 -0400 Subject: [PATCH 03/11] Expose type-agnostic queue API --- libbeat/publisher/pipeline/pipeline_test.go | 11 ++++---- libbeat/publisher/pipeline/ttl_batch.go | 28 ++++++++++++++----- libbeat/publisher/queue/diskqueue/consumer.go | 13 ++++----- libbeat/publisher/queue/memqueue/broker.go | 24 ++++++---------- libbeat/publisher/queue/queue.go | 4 +-- .../queue/queuetest/producer_cancel.go | 10 +++++-- .../publisher/queue/queuetest/queuetest.go | 7 ++--- 7 files changed, 52 insertions(+), 45 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index 4efff03070a..57cdbd635ea 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -21,7 +21,6 @@ import ( "sync" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -33,7 +32,7 @@ type testQueue struct { } type testProducer struct { - publish func(try bool, event publisher.Event) bool + publish func(try bool, event interface{}) bool cancel func() int } @@ -69,14 +68,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) { return nil, nil } -func (p *testProducer) Publish(event publisher.Event) bool { +func (p *testProducer) Publish(event interface{}) bool { if p.publish != nil { return p.publish(false, event) } return false } -func (p *testProducer) TryPublish(event publisher.Event) bool { +func (p *testProducer) TryPublish(event interface{}) bool { if p.publish != nil { return p.publish(true, event) } @@ -115,7 +114,7 @@ func makeTestQueue() queue.Queue { var producer *testProducer p := blockingProducer(cfg) producer = &testProducer{ - publish: func(try bool, event publisher.Event) bool { + publish: func(try bool, event interface{}) bool { if try { return p.TryPublish(event) } @@ -147,7 +146,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer { waiting := atomic.MakeInt(0) return &testProducer{ - publish: func(_ bool, _ publisher.Event) bool { + publish: func(_ bool, _ interface{}) bool { waiting.Inc() <-sig return false diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 4b3bdb6489b..8edc67d6aea 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -29,7 +29,9 @@ type retryer interface { } type ttlBatch struct { - original queue.Batch + // The callback to inform the queue (and possibly the producer) + // that this batch has been acknowledged. + ack func() // The internal hook back to the eventConsumer, used to implement the // publisher.Batch retry interface. @@ -54,12 +56,24 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { panic("empty batch") } + count := original.Count() + events := make([]publisher.Event, 0, count) + for i := 0; i <= count; i++ { + event, ok := original.Event(i).(publisher.Event) + if ok { + // In Beats this conversion will always succeed because only + // publisher.Event objects are inserted into the queue, but + // there's no harm in making sure. + events = append(events, event) + } + } + b := batchPool.Get().(*ttlBatch) *b = ttlBatch{ - original: original, - retryer: retryer, - ttl: ttl, - events: original.Events(), + ack: original.ACK, + retryer: retryer, + ttl: ttl, + events: events, } return b } @@ -74,12 +88,12 @@ func (b *ttlBatch) Events() []publisher.Event { } func (b *ttlBatch) ACK() { - b.original.ACK() + b.ack() releaseBatch(b) } func (b *ttlBatch) Drop() { - b.original.ACK() + b.ack() releaseBatch(b) } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 5c316916559..2380fd93717 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -20,7 +20,6 @@ package diskqueue import ( "fmt" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -83,12 +82,12 @@ eventLoop: // diskQueueBatch implementation of the queue.Batch interface // -func (batch *diskQueueBatch) Events() []publisher.Event { - events := make([]publisher.Event, len(batch.frames)) - for i, frame := range batch.frames { - events[i] = frame.event - } - return events +func (batch *diskQueueBatch) Count() int { + return len(batch.frames) +} + +func (batch *diskQueueBatch) Event(i int) interface{} { + return batch.frames[i].event } func (batch *diskQueueBatch) ACK() { diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index c6db3641d5f..dafcf1f08ed 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -18,13 +18,11 @@ package memqueue import ( - "fmt" "io" "sync" "time" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -92,7 +90,7 @@ type Settings struct { type batch struct { queue *broker - events []publisher.Event + entries []queueEntry ackChan chan batchAckMsg } @@ -243,17 +241,9 @@ func (b *broker) Get(count int) (queue.Batch, error) { // if request has been sent, we have to wait for a response resp := <-responseChan - events := make([]publisher.Event, 0, len(resp.entries)) - for _, entry := range resp.entries { - if event, ok := entry.event.(publisher.Event); ok { - events = append(events, event) - } else { - panic("idk") - } - } return &batch{ queue: b, - events: events, + entries: resp.entries, ackChan: resp.ackChan, }, nil } @@ -364,12 +354,14 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) { return actual } -func (b *batch) Events() []publisher.Event { - return b.events +func (b *batch) Count() int { + return len(b.entries) +} + +func (b *batch) Event(i int) interface{} { + return b.entries[i].event } func (b *batch) ACK() { - fmt.Printf("batch.ACK()\n") b.ackChan <- batchAckMsg{} - fmt.Printf("batch.ACK() done\n") } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 4f2e9870740..9ca660ba28e 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/opt" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -132,6 +131,7 @@ type Producer interface { // Batch of events to be returned to Consumers. The `ACK` method will send the // ACK signal to the queue. type Batch interface { - Events() []publisher.Event + Count() int + Event(i int) interface{} ACK() } diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go index 64310adf66f..628080854cb 100644 --- a/libbeat/publisher/queue/queuetest/producer_cancel.go +++ b/libbeat/publisher/queue/queuetest/producer_cancel.go @@ -74,14 +74,16 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { // consume all events total := N2 - N1 - events := make([]publisher.Event, 0, total) + events := make([]interface{}, 0, total) for len(events) < total { batch, err := b.Get(-1) // collect all events if err != nil { panic(err) } - events = append(events, batch.Events()...) + for i := 0; i < batch.Count(); i++ { + events = append(events, batch.Event(i)) + } batch.ACK() } @@ -92,7 +94,9 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { } for i, event := range events { - value, ok := event.Content.Fields["value"].(int) + pubEvent, ok := event.(publisher.Event) + assert.True(t, ok, "queue output should be the same type as its input") + value, ok := pubEvent.Content.Fields["value"].(int) assert.True(t, ok, "event.value should be an int") assert.Equal(t, i+N1, value) } diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 4d134d4ed56..545d6d402b4 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -311,11 +311,10 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { return } - collected := batch.Events() - fmt.Printf("got batch of size %d\n", len(collected)) - log.Debug("consumer: process batch", len(collected)) + fmt.Printf("got batch of size %d\n", batch.Count()) + log.Debug("consumer: process batch", batch.Count()) - for range collected { + for j := 0; j <= batch.Count(); j++ { events.Done() } batch.ACK() From 5fad678829f882729d0e1f41ca133da5f6c20da0 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 20 May 2022 13:26:27 -0400 Subject: [PATCH 04/11] remove old todo --- libbeat/publisher/queue/memqueue/eventloop.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 89e4bacd354..6847cae615d 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -537,8 +537,6 @@ func reportCancelledState(log *logp.Logger, req *pushRequest) { st := req.state if cb := st.dropCB; cb != nil { - // TODO(fae): should this cast be to `publisher.Event` or `*publisher.Event`? - // interface casts are strange... if event, ok := req.event.(publisher.Event); ok { cb(event.Content) } From e184644b45b95e4597bb2dd9e4767bb8d9da73bf Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 20 May 2022 13:28:53 -0400 Subject: [PATCH 05/11] remove the ttlBatch pool --- libbeat/publisher/pipeline/ttl_batch.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 8edc67d6aea..36c817f1cf3 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -18,8 +18,6 @@ package pipeline import ( - "sync" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -45,12 +43,6 @@ type ttlBatch struct { events []publisher.Event } -var batchPool = sync.Pool{ - New: func() interface{} { - return &ttlBatch{} - }, -} - func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { if original == nil { panic("empty batch") @@ -68,8 +60,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { } } - b := batchPool.Get().(*ttlBatch) - *b = ttlBatch{ + b := &ttlBatch{ ack: original.ACK, retryer: retryer, ttl: ttl, @@ -78,23 +69,16 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { return b } -func releaseBatch(b *ttlBatch) { - *b = ttlBatch{} // clear batch - batchPool.Put(b) -} - func (b *ttlBatch) Events() []publisher.Event { return b.events } func (b *ttlBatch) ACK() { b.ack() - releaseBatch(b) } func (b *ttlBatch) Drop() { b.ack() - releaseBatch(b) } func (b *ttlBatch) Retry() { From 7f10c618ce96d7b10897baf899e77985434dff5e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 10:42:59 -0400 Subject: [PATCH 06/11] queue and pipeline tests pass --- libbeat/publisher/pipeline/ttl_batch.go | 2 +- libbeat/publisher/queue/memqueue/ackloop.go | 8 -------- libbeat/publisher/queue/memqueue/eventloop.go | 2 -- libbeat/publisher/queue/queuetest/queuetest.go | 14 ++++++++------ 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 36c817f1cf3..897c0756719 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -50,7 +50,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { count := original.Count() events := make([]publisher.Event, 0, count) - for i := 0; i <= count; i++ { + for i := 0; i < count; i++ { event, ok := original.Event(i).(publisher.Event) if ok { // In Beats this conversion will always succeed because only diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index f966f55c3d4..1b1979dc834 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -17,8 +17,6 @@ package memqueue -import "fmt" - // ackLoop implements the brokers asynchronous ACK worker. // Multiple concurrent ACKs from consecutive published batches will be batched up by the // worker, to reduce the number of signals to return to the producer and the @@ -56,17 +54,13 @@ func (l *ackLoop) run() { return case ackChan <- ackCount: - fmt.Printf("ackLoop sent %v to ackChan\n", ackCount) ackChan, ackCount = nil, 0 case chanList := <-l.broker.scheduledACKs: - fmt.Printf("ackLoop read from scheduledACKs, adding to ackChans\n") l.ackChans.concat(&chanList) case <-nextBatchChan: - fmt.Printf("ackLoop read from ackChans.channel()\n") ackCount += l.handleBatchSig() - fmt.Printf("ackCount is %d\n", ackCount) if ackCount > 0 { ackChan = l.broker.ackChan } @@ -106,7 +100,6 @@ func (l *ackLoop) handleBatchSig() int { } func (l *ackLoop) collectAcked() chanList { - fmt.Printf("collectAcked\n") lst := chanList{} acks := l.ackChans.pop() @@ -117,7 +110,6 @@ func (l *ackLoop) collectAcked() chanList { acks := l.ackChans.front() select { case <-acks.ackChan: - fmt.Printf("collectAcked received on ackChan, appending to lst\n") lst.append(l.ackChans.pop()) default: diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 6847cae615d..7203d552fb3 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -142,7 +142,6 @@ func (l *directEventLoop) insert(req *pushRequest) { } else if st.cancelled { reportCancelledState(log, req) } else { - fmt.Printf("directEventLoop.insert non-nil state\n") l.buf.insert(req.event, clientState{ seq: req.seq, state: st, @@ -213,7 +212,6 @@ func (l *directEventLoop) processACK(lst chanList, N int) { idx-- if client.state == nil { - fmt.Printf("no state set\n") log.Debug("no state set") continue } diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 545d6d402b4..54d7f0d3ff1 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -74,14 +74,14 @@ func TestMultiProducerConsumer( queueFactory QueueFactory, ) { tests := []testCase{ - /*{ + { "2 producers, 1 consumer, without ack, complete batches", multiple( makeProducer(events, false, countEvent), makeProducer(events, false, countEvent), ), makeConsumer(events*2, -1), - },*/ + }, { "2 producers, 1 consumer, all ack, complete batches", multiple( @@ -90,7 +90,7 @@ func TestMultiProducerConsumer( ), makeConsumer(events*2, -1), }, - /*{ + { "2 producers, 1 consumer, 1 ack, complete batches", multiple( makeProducer(events, true, countEvent), @@ -181,7 +181,7 @@ func TestMultiProducerConsumer( makeProducer(events, false, countEvent), ), multiConsumer(2, events*2, batchSize), - },*/ + }, } runTestCases(t, tests, queueFactory) @@ -292,6 +292,7 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b queue.Queue) func() { wg.Add(1) return func() { + total := 0 defer wg.Done() var events sync.WaitGroup @@ -311,10 +312,11 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { return } - fmt.Printf("got batch of size %d\n", batch.Count()) + total += batch.Count() + fmt.Printf("got batch of size %d, total %d / %d\n", batch.Count(), total, maxEvents) log.Debug("consumer: process batch", batch.Count()) - for j := 0; j <= batch.Count(); j++ { + for j := 0; j < batch.Count(); j++ { events.Done() } batch.ACK() From 85acb26ecca97a7292f2ec30359b141129d0a2f9 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 10:45:20 -0400 Subject: [PATCH 07/11] remove debug prints --- libbeat/publisher/queue/memqueue/eventloop.go | 6 ------ libbeat/publisher/queue/memqueue/produce.go | 3 --- libbeat/publisher/queue/queuetest/queuetest.go | 15 --------------- 3 files changed, 24 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 7203d552fb3..6b16cd7aefd 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -113,7 +113,6 @@ func (l *directEventLoop) run() { case count := <-l.broker.ackChan: // Events have been ACKed, remove them from the internal buffer. - fmt.Printf("directEventLoop got %d on ackChan\n", count) l.buf.removeEntries(count) case req := <-l.broker.cancelChan: // producer cancelling active events @@ -124,7 +123,6 @@ func (l *directEventLoop) run() { l.handleGetRequest(&req) case schedACKs <- l.pendingACKs: - fmt.Printf("sent pendingACKs to schedACKs\n") // on send complete list of pending batches has been forwarded -> clear list l.pendingACKs = chanList{} } @@ -137,7 +135,6 @@ func (l *directEventLoop) insert(req *pushRequest) { st := req.state if st == nil { - fmt.Printf("directEventLoop.insert nil state\n") l.buf.insert(req.event, clientState{}) } else if st.cancelled { reportCancelledState(log, req) @@ -184,8 +181,6 @@ func (l *directEventLoop) handleGetRequest(req *getRequest) { // processACK is called by the ackLoop to process the list of acked batches func (l *directEventLoop) processACK(lst chanList, N int) { - fmt.Printf("processACK(%v)\n", N) - defer fmt.Printf("processACK finished\n") log := l.broker.logger { start := time.Now() @@ -238,7 +233,6 @@ func (l *directEventLoop) processACK(lst chanList, N int) { N, total, )) } - fmt.Printf("calling produceState callback\n") client.state.cb(int(count)) client.state.lastACK = client.seq client.state = nil diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 325242fcf63..dd87c481427 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -18,8 +18,6 @@ package memqueue import ( - "fmt" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" @@ -61,7 +59,6 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel } if cb != nil { - fmt.Printf("creating producer with non-nil ack handler\n") p := &ackProducer{broker: b, seq: 1, dropOnCancel: dropOnCancel, openState: openState} p.state.cb = cb p.state.dropCB = dropCB diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 54d7f0d3ff1..eb65e0259d0 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -18,7 +18,6 @@ package queuetest import ( - "fmt" "sync" "testing" @@ -211,9 +210,7 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) { go test.producers(&wg, nil, log, queue)() go test.consumers(&wg, nil, log, queue)() - fmt.Printf("waiting on wg\n") wg.Wait() - fmt.Printf("done waiting\n") })) } } @@ -245,8 +242,6 @@ func makeProducer( return func() { defer wg.Done() - fmt.Printf("start producer\n") - defer fmt.Printf("stop producer\n") log.Debug("start producer") defer log.Debug("stop producer") @@ -256,12 +251,10 @@ func makeProducer( ) if waitACK { - fmt.Printf("waitACK is true\n") ackWG.Add(maxEvents) total := 0 ackCB = func(N int) { - fmt.Printf("ackCB(%v)\n", N) total += N log.Debugf("producer ACK: N=%v, total=%v\n", N, total) @@ -292,7 +285,6 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b queue.Queue) func() { wg.Add(1) return func() { - total := 0 defer wg.Done() var events sync.WaitGroup @@ -304,30 +296,23 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { b := b go func() { - fmt.Printf("start consumer\n") - defer fmt.Printf("end consumer\n") for { batch, err := b.Get(batchSize) if err != nil { return } - total += batch.Count() - fmt.Printf("got batch of size %d, total %d / %d\n", batch.Count(), total, maxEvents) log.Debug("consumer: process batch", batch.Count()) for j := 0; j < batch.Count(); j++ { events.Done() } batch.ACK() - fmt.Printf("batch acked\n") } }() } - fmt.Printf("waiting on events wg\n") events.Wait() - fmt.Printf("done waiting on events wg\n") } } } From ffd434aeacfe70ce5a6f6e344d6c7145fdea9409 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 10:48:04 -0400 Subject: [PATCH 08/11] link outstanding issue in comment --- libbeat/publisher/queue/diskqueue/serialize.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 996c637d3c2..13b52732ba3 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -89,7 +89,8 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { // In order to support events of varying type, the disk queue needs // to know how to encode them. When we decide to do this, we'll need // to add an encoder to the settings passed in when creating a disk - // queue. For now, just return an error. + // queue. See https://github.com/elastic/elastic-agent-shipper/issues/41. + // For now, just return an error. return nil, fmt.Errorf("disk queue only supports publisher.Event") } From 5b67bfdaad447966d491593c9f9690567971d480 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 11:20:37 -0400 Subject: [PATCH 09/11] update new test to the modified batch api --- libbeat/publisher/queue/diskqueue/benchmark_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index e69d6088a4c..181c6ab8c8a 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -101,9 +101,8 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz if err != nil { return err } - events := batch.Events() batch.ACK() - received = received + len(events) + received = received + batch.Count() if received == num_events { break } From af7a0160f400432cd560df33da72306ad10d269f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 11:23:13 -0400 Subject: [PATCH 10/11] update developer changelog --- CHANGELOG-developer.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 2438268c106..2c04e5965d3 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -62,6 +62,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668] - Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420] - Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502] +- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699] ==== Bugfixes From 47fa7283095e7e390e5ceeaae5636d4291c9735a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 23 May 2022 17:12:43 -0400 Subject: [PATCH 11/11] lint --- libbeat/publisher/queue/diskqueue/serialize.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 13b52732ba3..3c75534cdce 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -142,12 +142,12 @@ func (d *eventDecoder) Buffer(n int) []byte { } func (d *eventDecoder) Decode() (publisher.Event, error) { - var ( - to entry - err error - ) + var to entry - d.unfolder.SetTarget(&to) + err := d.unfolder.SetTarget(&to) + if err != nil { + return publisher.Event{}, err + } defer d.unfolder.Reset() if d.useJSON {