Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove queue.Consumer #31502

Merged
merged 18 commits into from
May 12, 2022
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestClient(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

pipeline := makePipeline(Settings{}, makeBlockingQueue())
pipeline := makePipeline(Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
Expand Down
10 changes: 1 addition & 9 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ func (c *eventConsumer) run() {
// The output channel (and associated parameters) that will receive
// the batches we're loading.
target consumerTarget

// The queue.Consumer we get the raw batches from. Reset whenever
// the target changes.
consumer queue.Consumer = c.queue.Consumer()
)

outerLoop:
Expand All @@ -130,7 +126,7 @@ outerLoop:
if queueBatch == nil && !pendingRead {
pendingRead = true
queueReader.req <- queueReaderRequest{
consumer: consumer,
queue: c.queue,
retryer: c,
batchSize: target.batchSize,
timeToLive: target.timeToLive,
Expand Down Expand Up @@ -197,10 +193,6 @@ outerLoop:
}
}

// Close the queue.Consumer, otherwise queueReader can get blocked
// waiting on a read.
consumer.Close()

// Close the queueReader request channel so it knows to shutdown.
close(queueReader.req)

Expand Down
5 changes: 0 additions & 5 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ func loadOutput(
monitors Monitors,
makeOutput OutputFactory,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}
Expand Down
84 changes: 10 additions & 74 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,14 @@ type testQueue struct {
close func() error
bufferConfig func() queue.BufferConfig
producer func(queue.ProducerConfig) queue.Producer
consumer func() queue.Consumer
get func(sz int) (queue.Batch, error)
}

// testProducer is a stub queue.Producer for pipeline tests; its behavior
// is injected through function fields by makeTestQueue.
type testProducer struct {
	// publish reports whether the event was accepted; try presumably
	// selects the non-blocking (TryPublish) path — see makeTestQueue.
	publish func(try bool, event publisher.Event) bool
	// cancel returns the number of events cancelled.
	cancel func() int
}

// testConsumer is a stub queue.Consumer for pipeline tests; its behavior
// is injected through function fields.
type testConsumer struct {
	// get returns the next batch of up to sz events.
	get func(sz int) (queue.Batch, error)
	// close shuts the consumer down.
	close func() error
}

// Metrics satisfies the queue.Queue interface; the test queue always
// reports an empty metrics snapshot and never fails.
func (q *testQueue) Metrics() (queue.Metrics, error) {
	var m queue.Metrics
	return m, nil
}
Expand All @@ -67,11 +62,11 @@ func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
return nil
}

func (q *testQueue) Consumer() queue.Consumer {
if q.consumer != nil {
return q.consumer()
func (p *testQueue) Get(sz int) (queue.Batch, error) {
if p.get != nil {
return p.get(sz)
}
return nil
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
Expand All @@ -95,39 +90,14 @@ func (p *testProducer) Cancel() int {
return 0
}

// Get returns the next batch via the injected get callback, or a nil
// batch with no error when no callback was configured.
func (p *testConsumer) Get(sz int) (queue.Batch, error) {
	if p.get == nil {
		return nil, nil
	}
	return p.get(sz)
}

// Close invokes the injected close callback once, or is a no-op when no
// callback was configured.
func (p *testConsumer) Close() error {
	// Check the field for nil, not its result: the original
	// `if p.close() != nil` panics when the field is unset (calling a
	// nil func value) and invokes the callback twice when it returns a
	// non-nil error. Test this field the same way Get tests p.get.
	if p.close != nil {
		return p.close()
	}
	return nil
}

// makeBlockingQueue builds a testQueue whose consumers' Get calls block
// until the consumer is closed (emptyConsumer) and whose producers are
// created by blockingProducer.
func makeBlockingQueue() queue.Queue {
	return makeTestQueue(emptyConsumer, blockingProducer)
}

func makeTestQueue(
makeConsumer func() queue.Consumer,
makeProducer func(queue.ProducerConfig) queue.Producer,
) queue.Queue {
func makeTestQueue() queue.Queue {
var mux sync.Mutex
var wg sync.WaitGroup
consumers := map[*testConsumer]struct{}{}
producers := map[queue.Producer]struct{}{}

return &testQueue{
close: func() error {
mux.Lock()
for consumer := range consumers {
consumer.Close()
}
for producer := range producers {
producer.Cancel()
}
Expand All @@ -136,34 +106,14 @@ func makeTestQueue(
wg.Wait()
return nil
},

consumer: func() queue.Consumer {
var consumer *testConsumer
c := makeConsumer()
consumer = &testConsumer{
get: func(sz int) (queue.Batch, error) { return c.Get(sz) },
close: func() error {
err := c.Close()

mux.Lock()
defer mux.Unlock()
delete(consumers, consumer)
wg.Done()

return err
},
}

mux.Lock()
defer mux.Unlock()
consumers[consumer] = struct{}{}
wg.Add(1)
return consumer
get: func(count int) (queue.Batch, error) {
//<-done
return nil, nil
},

producer: func(cfg queue.ProducerConfig) queue.Producer {
var producer *testProducer
p := makeProducer(cfg)
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
if try {
Expand Down Expand Up @@ -192,20 +142,6 @@ func makeTestQueue(
}
}

// emptyConsumer returns a queue.Consumer that never yields events: Get
// blocks until the consumer is closed, then reports a nil batch.
func emptyConsumer() queue.Consumer {
	stop := make(chan struct{})
	c := &testConsumer{}
	c.get = func(_ int) (queue.Batch, error) {
		<-stop
		return nil, nil
	}
	c.close = func() error {
		close(stop)
		return nil
	}
	return c
}

func blockingProducer(_ queue.ProducerConfig) queue.Producer {
sig := make(chan struct{})
waiting := atomic.MakeInt(0)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/queue_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type queueReader struct {
}

type queueReaderRequest struct {
consumer queue.Consumer
queue queue.Queue
retryer retryer
batchSize int
timeToLive int
Expand All @@ -53,7 +53,7 @@ func (qr *queueReader) run(logger *logp.Logger) {
logger.Debug("pipeline event consumer queue reader: stop")
return
}
queueBatch, _ := req.consumer.Get(req.batchSize)
queueBatch, _ := req.queue.Get(req.batchSize)
var batch *ttlBatch
if queueBatch != nil {
batch = newBatch(req.retryer, queueBatch, req.timeToLive)
Expand Down
5 changes: 0 additions & 5 deletions libbeat/publisher/pipeline/sync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ type ISyncClient interface {

// SyncClient wraps an existing beat.Client and provide a sync interface.
type SyncClient struct {
// Chain callbacks already defined in the original ClientConfig
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})

client beat.Client
wg sync.WaitGroup
log *logp.Logger
Expand Down
18 changes: 6 additions & 12 deletions libbeat/publisher/pipeline/sync_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error)

func TestSyncClient(t *testing.T) {
receiver := func(c *dummyClient, sc *SyncClient) {
select {
case i := <-c.Received:
sc.onACK(i)
return
}
i := <-c.Received
sc.onACK(i)
}

t.Run("Publish", func(t *testing.T) {
Expand Down Expand Up @@ -120,13 +117,10 @@ func TestSyncClient(t *testing.T) {
defer sc.Close()

go func(c *dummyClient, sc *SyncClient) {
select {
case <-c.Received:
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
return
}
<-c.Received
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
}(c, sc)

err = sc.PublishAll(make([]beat.Event, 10))
Expand Down
43 changes: 10 additions & 33 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,28 @@ package diskqueue
import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

type diskQueueConsumer struct {
queue *diskQueue
closed atomic.Bool
done chan struct{}
}

type diskQueueBatch struct {
queue *diskQueue
frames []*readFrame
}

//
// diskQueueConsumer implementation of the queue.Consumer interface
//

func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) {
func (queue *diskQueue) Get(eventCount int) (queue.Batch, error) {
// We can always eventually read at least one frame unless the queue or the
// consumer is closed.
var frames []*readFrame
select {
case frame, ok := <-consumer.queue.readerLoop.output:
if !ok {
return nil, fmt.Errorf("tried to read from a closed disk queue")
}
frames = []*readFrame{frame}
case <-consumer.done:
return nil, fmt.Errorf("tried to read from a closed disk queue consumer")
frame, ok := <-queue.readerLoop.output
if !ok {
return nil, fmt.Errorf("tried to read from a closed disk queue")
}
frames := []*readFrame{frame}

eventLoop:
for eventCount <= 0 || len(frames) < eventCount {
select {
case frame, ok := <-consumer.queue.readerLoop.output:
case frame, ok := <-queue.readerLoop.output:
if !ok {
// The queue was closed while we were reading it, just send back
// what we have so far.
Expand All @@ -74,7 +59,7 @@ eventLoop:
// written to readerLoop.output may have been buffered before the
// queue was closed, and we may be reading its leftovers afterwards.
// We could try to detect this case here by checking the
// consumer.queue.done channel, and return nothing if it's been closed.
// queue.done channel, and return nothing if it's been closed.
// But this gives rise to another race: maybe the queue was
// closed _after_ we read those frames, and we _ought_ to return them
// to the reader. The queue interface doesn't specify the proper
Expand All @@ -85,23 +70,15 @@ eventLoop:
// "read," so we lose no consistency by returning them. If someone closes
// the queue while we are draining the channel, nothing changes functionally
// except that any ACKs after that point will be ignored. A well-behaved
// Beats shutdown will always ACK / close its consumers before closing the
// Beats shutdown will always ACK its batches before closing the
// queue itself, so we expect this corner case not to arise in practice, but
// if it does it is innocuous.
return &diskQueueBatch{
queue: consumer.queue,
queue: queue,
frames: frames,
}, nil
}

// Close marks the consumer as closed and unblocks any Get call waiting
// on the done channel. A second Close returns an error instead of
// closing the channel again.
func (consumer *diskQueueConsumer) Close() error {
	// Swap returns the previous value atomically, so exactly one caller
	// observes false and performs the close; later callers bail out
	// (re-closing a closed channel would panic).
	if consumer.closed.Swap(true) {
		return fmt.Errorf("already closed")
	}
	close(consumer.done)
	return nil
}

//
// diskQueueBatch implementation of the queue.Batch interface
//
Expand Down
4 changes: 0 additions & 4 deletions libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,6 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
}
}

func (dq *diskQueue) Consumer() queue.Consumer {
return &diskQueueConsumer{queue: dq, done: make(chan struct{})}
}

func (dq *diskQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, queue.ErrMetricsNotImplemented
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/state_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func queuePositionFromHandle(
}
if version > currentStateFileVersion {
return queuePosition{},
fmt.Errorf("Unsupported queue metadata version (%d)", version)
fmt.Errorf("unsupported queue metadata version (%d)", version)
}

position := queuePosition{}
Expand Down
Loading