
Commit

Add a proxy queue to avoid double-queueing every event when using the shipper output (#34377)

faec authored and chrisberkhout committed Jun 1, 2023
1 parent 18aa0e9 commit f1e8bbd
Showing 17 changed files with 1,873 additions and 103 deletions.
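Editor's note on the title: the "double-queueing" refers to the fact that, before this change, the shipper output retained every published batch — events included — in its own pending list until the shipper acknowledged it, even though the shipper had already queued those same events. The sketch below is illustrative only (all names are hypothetical, not from this commit): after handoff, the output keeps just the metadata needed to acknowledge the batch later and releases the events themselves.

package main

import "fmt"

// event and batch are hypothetical stand-ins for the pipeline's types.
type event struct{ payload string }

type batch struct {
	events []event
	acked  bool
}

// free releases the event payloads while keeping the batch shell around
// as an acknowledgment token.
func (b *batch) free() { b.events = nil }

func main() {
	b := &batch{events: []event{{"a"}, {"b"}}}

	downstream := make(chan []event, 1)
	downstream <- b.events // hand the events to the next queue
	b.free()               // keep only ack metadata; no second copy is retained

	<-downstream // downstream persists the events
	b.acked = true
	fmt.Printf("acked=%v, events still held by the output: %d\n", b.acked, len(b.events))
}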
2 changes: 2 additions & 0 deletions libbeat/outputs/outest/batch.go
@@ -71,6 +71,8 @@ func (b *Batch) RetryEvents(events []publisher.Event) {
 	b.doSignal(BatchSignal{Tag: BatchRetryEvents, Events: events})
 }
 
+func (b *Batch) FreeEntries() {}
+
 func (b *Batch) Cancelled() {
 	b.doSignal(BatchSignal{Tag: BatchCancelled})
 }
229 changes: 137 additions & 92 deletions libbeat/outputs/shipper/shipper.go
@@ -46,26 +46,37 @@ import (
 type pendingBatch struct {
 	batch        publisher.Batch
 	index        uint64
-	serverID     string
 	eventCount   int
 	droppedCount int
 }
 
 type shipper struct {
 	log      *logp.Logger
 	observer outputs.Observer
-
-	config Config
+	config   Config
 
-	conn      *grpc.ClientConn
-	client    sc.ProducerClient
-	ackClient sc.Producer_PersistedIndexClient
-
 	serverID string
 
-	pending      []pendingBatch
-	pendingMutex sync.Mutex
+	// The publish function sends to ackBatchChan to notify the ack worker of
+	// new pending batches
+	ackBatchChan chan pendingBatch
+
+	// The ack RPC listener sends to ackIndexChan to notify the ack worker
+	// of the new persisted index
+	ackIndexChan chan uint64
+
+	conn        *grpc.ClientConn
+	client      sc.ProducerClient
+	clientMutex sync.Mutex
+	// ackWaitGroup is used to synchronize the shutdown of the ack listener
+	// and the ack worker when a connection is closed.
+	ackWaitGroup sync.WaitGroup
 
-	backgroundCtx    context.Context
-	backgroundCancel func()
+	// ackCancel cancels the context for the ack listener and the ack worker,
+	// notifying them to shut down.
+	ackCancel context.CancelFunc
 }

func init() {
@@ -91,9 +102,6 @@ func makeShipper(
 		config: config,
 	}
 
-	// for `Close` function to stop all the background work like acknowledgment loop
-	s.backgroundCtx, s.backgroundCancel = context.WithCancel(context.Background())
-
 	swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max)
 
 	return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb)
@@ -121,68 +129,39 @@ func (s *shipper) Connect() error {
 		grpc.WithTransportCredentials(creds),
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
-	defer cancel()
-
 	s.log.Debugf("trying to connect to %s...", s.config.Server)
 
+	ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
+	defer cancel()
 	conn, err := grpc.DialContext(ctx, s.config.Server, opts...)
 	if err != nil {
 		return fmt.Errorf("shipper connection failed with: %w", err)
 	}
 
 	s.conn = conn
+	s.clientMutex.Lock()
+	defer s.clientMutex.Unlock()
 
 	s.client = sc.NewProducerClient(conn)
 
-	// we don't need a timeout context here anymore, we use the
-	// `s.backgroundCtx` instead, it's going to be a long running client
-	ackCtx, ackCancel := context.WithCancel(s.backgroundCtx)
-	defer func() {
-		// in case we return an error before we start the `ackLoop`
-		// then we don't need this client anymore and must close the stream
-		if err != nil {
-			ackCancel()
-		}
-	}()
-
-	indexClient, err := s.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{
-		PollingInterval: durationpb.New(s.config.AckPollingInterval),
-	})
-	if err != nil {
-		return fmt.Errorf("failed to connect to the server: %w", err)
-	}
-	indexReply, err := indexClient.Recv()
-	if err != nil {
-		return fmt.Errorf("failed to fetch server information: %w", err)
-	}
-	s.serverID = indexReply.GetUuid()
-
-	s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID)
-
-	go func() {
-		defer ackCancel()
-		s.log.Debugf("starting acknowledgment loop with server %s", s.serverID)
-		// the loop returns only in case of error
-		err := s.ackLoop(s.backgroundCtx, indexClient)
-		s.log.Errorf("acknowledgment loop stopped: %s", err)
-	}()
-
-	return nil
+	return s.startACKLoop()
 }

 // Publish converts and sends a batch of events to the shipper server.
 // Also, implements `outputs.Client`
 func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {
-	if s.client == nil {
+	err := s.publish(ctx, batch)
+	if err != nil {
+		// If there was an error then we are dropping our connection.
+		s.Close()
+	}
+	return err
+}
+
+func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error {
+	if s.conn == nil {
 		return fmt.Errorf("connection is not established")
 	}
 
-	st := s.observer
 	events := batch.Events()
-	st.NewBatch(len(events))
+	s.observer.NewBatch(len(events))
 
 	toSend := make([]*messages.Event, 0, len(events))

@@ -204,7 +183,7 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

 	convertedCount := len(toSend)
 
-	st.Dropped(droppedCount)
+	s.observer.Dropped(droppedCount)
 	s.log.Debugf("%d events converted to protobuf, %d dropped", convertedCount, droppedCount)
 
 	var lastAcceptedIndex uint64
@@ -219,8 +198,8 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {
 	})
 
 	if status.Code(err) != codes.OK {
-		batch.Cancelled()         // does not decrease the TTL
-		st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
+		batch.Cancelled()                 // does not decrease the TTL
+		s.observer.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
 		return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err)
 	}

@@ -239,29 +218,31 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

s.log.Debugf("total of %d events have been accepted from batch, %d dropped", convertedCount, droppedCount)

s.pendingMutex.Lock()
s.pending = append(s.pending, pendingBatch{
// We've sent as much as we can to the shipper, release the batch's events and
// save it in the queue of batches awaiting acknowledgment.
batch.FreeEntries()
s.ackBatchChan <- pendingBatch{
batch: batch,
index: lastAcceptedIndex,
serverID: s.serverID,
eventCount: len(events),
droppedCount: droppedCount,
})
s.pendingMutex.Unlock()
}

return nil
}

// Close closes the connection to the shipper server.
// Also, implements `outputs.Client`
func (s *shipper) Close() error {
if s.client == nil {
if s.conn == nil {
return fmt.Errorf("connection is not established")
}
s.backgroundCancel()
s.ackCancel()
s.ackWaitGroup.Wait()

err := s.conn.Close()
s.conn = nil
s.client = nil
s.pending = nil

return err
}
@@ -271,53 +252,117 @@ func (s *shipper) String() string {
return "shipper"
}

func (s *shipper) ackLoop(ctx context.Context, ackClient sc.Producer_PersistedIndexClient) error {
st := s.observer
func (s *shipper) startACKLoop() error {
ctx, cancel := context.WithCancel(context.Background())
s.ackCancel = cancel

indexClient, err := s.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(s.config.AckPollingInterval),
})
if err != nil {
return fmt.Errorf("failed to connect to the server: %w", err)
}
indexReply, err := indexClient.Recv()
if err != nil {
return fmt.Errorf("failed to fetch server information: %w", err)
}
s.serverID = indexReply.GetUuid()

s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID)

s.ackClient = indexClient
s.ackBatchChan = make(chan pendingBatch)
s.ackIndexChan = make(chan uint64)
s.ackWaitGroup.Add(2)

go func() {
s.ackWorker(ctx)
s.ackWaitGroup.Done()
}()

go func() {
err := s.ackListener(ctx)
s.ackWaitGroup.Done()
if err != nil {
s.log.Errorf("acknowledgment listener stopped: %s", err)

// Shut down the connection and clear the output metadata.
// This will not propagate back to the pipeline immediately,
// but the next time Publish is called it will return an error
// because there is no connection, which will signal the pipeline
// to try to revive this output worker via Connect().
s.Close()
}
}()

return nil
}

// ackListener's only job is to listen to the persisted index RPC stream
// and forward its values to the ack worker.
func (s *shipper) ackListener(ctx context.Context) error {
s.log.Debugf("starting acknowledgment listener with server %s", s.serverID)
for {
select {
indexReply, err := s.ackClient.Recv()
if err != nil {
select {
case <-ctx.Done():
// If our context has been closed, this is an intentional closed
// connection, so don't return the error.
return nil
default:
// If the context itself is not closed then this means a real
// connection error.
return fmt.Errorf("ack listener closed connection: %w", err)
}
}
s.ackIndexChan <- indexReply.PersistedIndex
}
}

case <-ctx.Done():
return ctx.Err()
// ackWorker listens for newly published batches awaiting acknowledgment,
// and for new persisted indexes that should be forwarded to already-published
// batches.
func (s *shipper) ackWorker(ctx context.Context) {
s.log.Debugf("starting acknowledgment loop with server %s", s.serverID)

default:
// this sends an update and unblocks only if the `PersistedIndex` value has changed
indexReply, err := ackClient.Recv()
if err != nil {
return fmt.Errorf("acknowledgment failed due to the connectivity error: %w", err)
pending := []pendingBatch{}
for {
select {
case <-ctx.Done():
// If there are any pending batches left when the ack loop returns, then
// they will never be acknowledged, so send the cancel signal.
for _, p := range pending {
p.batch.Cancelled()
}
return

s.pendingMutex.Lock()
lastProcessed := 0
for _, p := range s.pending {
if p.serverID != indexReply.Uuid {
s.log.Errorf("acknowledgment failed due to a connection to a different server %s, batch was accepted by %s", indexReply.Uuid, p.serverID)
p.batch.Cancelled()
st.Cancelled(len(p.batch.Events()))
lastProcessed++
continue
}
case newBatch := <-s.ackBatchChan:
pending = append(pending, newBatch)

case newIndex := <-s.ackIndexChan:
lastProcessed := 0
for _, p := range pending {
// if we met a batch that is ahead of the persisted index
// we stop iterating and wait for another update from the server.
// The latest pending batch has the max(AcceptedIndex).
if p.index > indexReply.PersistedIndex {
if p.index > newIndex {
break
}

p.batch.ACK()
ackedCount := len(p.batch.Events()) - p.droppedCount
st.Acked(ackedCount)
ackedCount := p.eventCount - p.droppedCount
s.observer.Acked(ackedCount)
s.log.Debugf("%d events have been acknowledged, %d dropped", ackedCount, p.droppedCount)
lastProcessed++
}
// so we don't perform any manipulation when the pending list is empty
// or none of the batches were acknowledged by this persisted index update
if lastProcessed != 0 {
copy(s.pending[0:], s.pending[lastProcessed:])
s.pending = s.pending[lastProcessed:]
remaining := len(pending) - lastProcessed
copy(pending[0:], pending[lastProcessed:])
pending = pending[:remaining]
}
s.pendingMutex.Unlock()
}
}
}
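Editor's note: one way to read the diff above is that the old ackLoop shared the pending slice between the publish path and the ack path behind pendingMutex, while the new ackWorker is the sole owner of the pending list and learns about new batches (ackBatchChan) and new persisted indexes (ackIndexChan) purely over channels, so no lock is needed. Below is a minimal standalone sketch of that single-owner pattern, with hypothetical types — not the shipper's actual code.

package main

import (
	"context"
	"fmt"
	"sync"
)

type pendingBatch struct{ index uint64 }

// worker owns the pending list outright; producers and the index listener
// reach it only through channels, so no mutex is required.
func worker(ctx context.Context, batches <-chan pendingBatch, indexes <-chan uint64) {
	pending := []pendingBatch{}
	for {
		select {
		case <-ctx.Done():
			return
		case b := <-batches:
			pending = append(pending, b)
		case idx := <-indexes:
			acked := 0
			for _, p := range pending {
				if p.index > idx {
					break // later batches are not persisted yet
				}
				fmt.Println("acking batch with index", p.index)
				acked++
			}
			pending = pending[acked:]
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	batches := make(chan pendingBatch)
	indexes := make(chan uint64)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); worker(ctx, batches, indexes) }()

	batches <- pendingBatch{index: 1}
	batches <- pendingBatch{index: 2}
	indexes <- 2 // persisted index 2 covers both batches

	cancel()
	wg.Wait()
}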
13 changes: 13 additions & 0 deletions libbeat/publisher/event.go
@@ -41,6 +41,17 @@ type Batch interface {
 	// Try sending the events in this list again; all others are acknowledged.
 	RetryEvents(events []Event)
 
+	// Release the internal pointer to this batch's events but do not yet
+	// acknowledge this batch. This exists specifically for the shipper output,
+	// where there is potentially a long gap between when events are handed off
+	// to the shipper and when they are acknowledged upstream; during that time,
+	// we need to preserve batch metadata for producer end-to-end acknowledgments,
+	// but we do not need the events themselves since they are already queued by
+	// the shipper. It is only guaranteed to release event pointers when using the
+	// proxy queue.
+	// Never call this on a batch that might be retried.
+	FreeEntries()
+
 	// Send was aborted, try again but don't decrease the batch's TTL counter.
 	Cancelled()
 }
@@ -64,6 +75,7 @@ type EventCache struct {

 // Put lets outputs put key-value pairs into the event cache
 func (ec *EventCache) Put(key string, value interface{}) (interface{}, error) {
+	//nolint:typecheck // Nil checks are ok here
 	if ec.m == nil {
 		// uninitialized map
 		ec.m = mapstr.M{}
@@ -74,6 +86,7 @@ func (ec *EventCache) Put(key string, value interface{}) (interface{}, error) {

 // GetValue lets outputs retrieve values from the event cache by key
 func (ec *EventCache) GetValue(key string) (interface{}, error) {
+	//nolint:typecheck // Nil checks are ok here
 	if ec.m == nil {
 		// uninitialized map
 		return nil, mapstr.ErrKeyNotFound
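Editor's note: the FreeEntries contract added above is subtle — the batch object must outlive its events. The following self-contained sketch shows the intended call pattern from an output's point of view, using simplified stand-ins for libbeat's publisher types (not actual libbeat code).

package main

import "fmt"

// Event and Batch are simplified stand-ins for libbeat's publisher types.
type Event struct{ Content string }

type Batch interface {
	Events() []Event
	ACK()
	Cancelled()
	FreeEntries()
}

type memBatch struct{ events []Event }

func (b *memBatch) Events() []Event { return b.events }
func (b *memBatch) ACK()            { fmt.Println("batch acknowledged") }
func (b *memBatch) Cancelled()      { fmt.Println("batch cancelled, eligible for retry") }
func (b *memBatch) FreeEntries()    { b.events = nil } // release event pointers only

// forward hands events downstream, then frees the batch's entries while
// keeping the batch itself as an acknowledgment token. Per the interface
// comment, this must never be done on a batch that might be retried.
func forward(b Batch, send func([]Event) error) error {
	if err := send(b.Events()); err != nil {
		b.Cancelled() // keep the events: the batch may be sent again
		return err
	}
	b.FreeEntries() // the downstream queue owns the events now
	b.ACK()         // in real code this happens later, when persistence is confirmed
	return nil
}

func main() {
	b := &memBatch{events: []Event{{"e1"}, {"e2"}}}
	_ = forward(b, func(evs []Event) error {
		fmt.Println("handed off", len(evs), "events")
		return nil
	})
}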
