Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the queue's public API accept interface{} instead of publisher.Event #31699

Merged
merged 12 commits into from
May 26, 2022
11 changes: 5 additions & 6 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand All @@ -33,7 +32,7 @@ type testQueue struct {
}

type testProducer struct {
publish func(try bool, event publisher.Event) bool
publish func(try bool, event interface{}) bool
cancel func() int
}

Expand Down Expand Up @@ -69,14 +68,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) {
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
func (p *testProducer) Publish(event interface{}) bool {
if p.publish != nil {
return p.publish(false, event)
}
return false
}

func (p *testProducer) TryPublish(event publisher.Event) bool {
func (p *testProducer) TryPublish(event interface{}) bool {
if p.publish != nil {
return p.publish(true, event)
}
Expand Down Expand Up @@ -115,7 +114,7 @@ func makeTestQueue() queue.Queue {
var producer *testProducer
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
publish: func(try bool, event interface{}) bool {
if try {
return p.TryPublish(event)
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
waiting := atomic.MakeInt(0)

return &testProducer{
publish: func(_ bool, _ publisher.Event) bool {
publish: func(_ bool, _ interface{}) bool {
waiting.Inc()
<-sig
return false
Expand Down
46 changes: 22 additions & 24 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package pipeline

import (
"sync"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -29,7 +27,9 @@ type retryer interface {
}

type ttlBatch struct {
original queue.Batch
// The callback to inform the queue (and possibly the producer)
// that this batch has been acknowledged.
ack func()

// The internal hook back to the eventConsumer, used to implement the
// publisher.Batch retry interface.
Expand All @@ -43,44 +43,42 @@ type ttlBatch struct {
events []publisher.Event
}

var batchPool = sync.Pool{
New: func() interface{} {
return &ttlBatch{}
},
}

func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
if original == nil {
panic("empty batch")
}

b := batchPool.Get().(*ttlBatch)
*b = ttlBatch{
original: original,
retryer: retryer,
ttl: ttl,
events: original.Events(),
count := original.Count()
events := make([]publisher.Event, 0, count)
for i := 0; i <= count; i++ {
event, ok := original.Event(i).(publisher.Event)
if ok {
// In Beats this conversion will always succeed because only
// publisher.Event objects are inserted into the queue, but
// there's no harm in making sure.
events = append(events, event)
}
}
return b
}

func releaseBatch(b *ttlBatch) {
*b = ttlBatch{} // clear batch
batchPool.Put(b)
b := &ttlBatch{
ack: original.ACK,
retryer: retryer,
ttl: ttl,
events: events,
}
return b
}

func (b *ttlBatch) Events() []publisher.Event {
return b.events
}

func (b *ttlBatch) ACK() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Drop() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Retry() {
Expand Down
13 changes: 6 additions & 7 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package diskqueue
import (
"fmt"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -83,12 +82,12 @@ eventLoop:
// diskQueueBatch implementation of the queue.Batch interface
//

func (batch *diskQueueBatch) Events() []publisher.Event {
events := make([]publisher.Event, len(batch.frames))
for i, frame := range batch.frames {
events[i] = frame.event
}
return events
func (batch *diskQueueBatch) Count() int {
return len(batch.frames)
}

func (batch *diskQueueBatch) Event(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ACK() {
Expand Down
9 changes: 4 additions & 5 deletions libbeat/publisher/queue/diskqueue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package diskqueue

import (
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -50,21 +49,21 @@ type producerWriteRequest struct {
// diskQueueProducer implementation of the queue.Producer interface
//

func (producer *diskQueueProducer) Publish(event publisher.Event) bool {
func (producer *diskQueueProducer) Publish(event interface{}) bool {
return producer.publish(event, true)
}

func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool {
func (producer *diskQueueProducer) TryPublish(event interface{}) bool {
return producer.publish(event, false)
}

func (producer *diskQueueProducer) publish(
event publisher.Event, shouldBlock bool,
event interface{}, shouldBlock bool,
) bool {
if producer.cancelled {
return false
}
serialized, err := producer.encoder.encode(&event)
serialized, err := producer.encoder.encode(event)
if err != nil {
producer.queue.logger.Errorf(
"Couldn't serialize incoming event: %v", err)
Expand Down
12 changes: 11 additions & 1 deletion libbeat/publisher/queue/diskqueue/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package diskqueue

import (
"bytes"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -82,7 +83,16 @@ func (e *eventEncoder) reset() {
e.folder = folder
}

func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) {
func (e *eventEncoder) encode(evt interface{}) ([]byte, error) {
event, ok := evt.(publisher.Event)
if !ok {
// In order to support events of varying type, the disk queue needs
// to know how to encode them. When we decide to do this, we'll need
// to add an encoder to the settings passed in when creating a disk
// queue. For now, just return an error.
return nil, fmt.Errorf("disk queue only supports publisher.Event")
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
}

e.buf.Reset()

err := e.folder.Fold(entry{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSerialize(t *testing.T) {
},
},
}
serialized, err := encoder.encode(&event)
serialized, err := encoder.encode(event)
if err != nil {
t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
}
Expand Down
28 changes: 11 additions & 17 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package memqueue

import "fmt"

// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
Expand All @@ -36,8 +38,6 @@ type ackLoop struct {

func (l *ackLoop) run() {
var (
// log = l.broker.logger

// Buffer up event counter in ackCount. If ackCount > 0, acks will be set to
// the broker.acks channel for sending the ACKs while potentially receiving
// new batches from the broker event loop.
Expand All @@ -46,39 +46,31 @@ func (l *ackLoop) run() {
// loop, as the ack loop will not block on any channel
ackCount int
ackChan chan int
sig chan batchAckMsg
)

for {
nextBatchChan := l.ackChans.nextBatchChannel()

select {
case <-l.broker.done:
return

case ackChan <- ackCount:
fmt.Printf("ackLoop sent %v to ackChan\n", ackCount)
ackChan, ackCount = nil, 0

case chanList := <-l.broker.scheduledACKs:
fmt.Printf("ackLoop read from scheduledACKs, adding to ackChans\n")
l.ackChans.concat(&chanList)

case <-sig:
case <-nextBatchChan:
fmt.Printf("ackLoop read from ackChans.channel()\n")
ackCount += l.handleBatchSig()
fmt.Printf("ackCount is %d\n", ackCount)
if ackCount > 0 {
ackChan = l.broker.ackChan
}
}

// log.Debug("ackloop INFO")
// log.Debug("ackloop: total events scheduled = ", l.totalSched)
// log.Debug("ackloop: total events ack = ", l.totalACK)
// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
// log.Debug("ackloop: total batches ack = ", l.batchesACKed)

sig = l.ackChans.channel()
// if l.sig == nil {
// log.Debug("ackloop: no ack scheduled")
// } else {
// log.Debug("ackloop: schedule ack: ", l.lst.head.seq)
// }
}
}

Expand Down Expand Up @@ -114,6 +106,7 @@ func (l *ackLoop) handleBatchSig() int {
}

func (l *ackLoop) collectAcked() chanList {
fmt.Printf("collectAcked\n")
lst := chanList{}

acks := l.ackChans.pop()
Expand All @@ -124,6 +117,7 @@ func (l *ackLoop) collectAcked() chanList {
acks := l.ackChans.front()
select {
case <-acks.ackChan:
fmt.Printf("collectAcked received on ackChan, appending to lst\n")
lst.append(l.ackChans.pop())

default:
Expand Down
4 changes: 1 addition & 3 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package memqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

type queueEntry struct {
event interface{}
client clientState
Expand All @@ -36,7 +34,7 @@ func newBatchBuffer(sz int) *batchBuffer {
return b
}

func (b *batchBuffer) add(event *publisher.Event, st clientState) {
func (b *batchBuffer) add(event interface{}, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
}

Expand Down
24 changes: 10 additions & 14 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
c "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -91,7 +90,7 @@ type Settings struct {

type batch struct {
queue *broker
events []publisher.Event
entries []queueEntry
ackChan chan batchAckMsg
}

Expand Down Expand Up @@ -242,15 +241,9 @@ func (b *broker) Get(count int) (queue.Batch, error) {

// if request has been sent, we have to wait for a response
resp := <-responseChan
events := make([]publisher.Event, 0, len(resp.entries))
for _, entry := range resp.entries {
if event, ok := entry.event.(*publisher.Event); ok {
events = append(events, *event)
}
}
return &batch{
queue: b,
events: events,
entries: resp.entries,
ackChan: resp.ackChan,
}, nil
}
Expand All @@ -268,8 +261,7 @@ var ackChanPool = sync.Pool{
}

func newBatchACKState(start, count int, entries []queueEntry) *batchACKState {
//nolint: errcheck // Return value doesn't need to be checked before conversion.
ch := ackChanPool.Get().(*batchACKState)
ch, _ := ackChanPool.Get().(*batchACKState)
ch.next = nil
ch.start = start
ch.count = count
Expand Down Expand Up @@ -321,7 +313,7 @@ func (l *chanList) front() *batchACKState {
return l.head
}

func (l *chanList) channel() chan batchAckMsg {
func (l *chanList) nextBatchChannel() chan batchAckMsg {
if l.head == nil {
return nil
}
Expand Down Expand Up @@ -362,8 +354,12 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) {
return actual
}

func (b *batch) Events() []publisher.Event {
return b.events
func (b *batch) Count() int {
return len(b.entries)
}

func (b *batch) Event(i int) interface{} {
return b.entries[i].event
}

func (b *batch) ACK() {
Expand Down
Loading