Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: add e2e acking
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed May 8, 2024
1 parent 9a800db commit 4f5f26d
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 17 deletions.
77 changes: 77 additions & 0 deletions x-pack/filebeat/input/http_endpoint/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
)

// newEventACKHandler returns a beat ACKer that can receive callbacks when
// an event has been ACKed an output. If the event contains a private metadata
// pointing to a batchACKTracker then it will invoke the tracker's ACK() method
// to decrement the number of pending ACKs.
func newEventACKHandler() beat.EventListener {
return acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, private := range privates {
if ack, ok := private.(*batchACKTracker); ok {
ack.ACK()
}
}
}),
)
}

// batchACKTracker invokes batchACK when all events associated to the batch
// have been published and acknowledged by an output.
type batchACKTracker struct {
batchACK func()

mu sync.Mutex
pending int64
}

// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function
// is invoked after the full batch has been acknowledged. Ready() must be invoked
// after all events in the batch are published.
func newBatchACKTracker(fn func()) *batchACKTracker {
return &batchACKTracker{
batchACK: fn,
pending: 1, // Ready() must be called to consume this "1".
}
}

// Ready signals that the batch has been fully consumed. Only
// after the batch is marked as "ready" can the lumberjack batch
// be ACKed. This prevents the batch from being ACKed prematurely.
func (t *batchACKTracker) Ready() {
t.ACK()
}

// Add increments the number of pending ACKs.
func (t *batchACKTracker) Add() {
t.mu.Lock()
t.pending++
t.mu.Unlock()
}

// ACK decrements the number of pending event ACKs. When all pending ACKs are
// received then the event batch is ACKed.
func (t *batchACKTracker) ACK() {
t.mu.Lock()
defer t.mu.Unlock()

if t.pending <= 0 {
panic("misuse detected: negative ACK counter")
}

t.pending--
if t.pending == 0 {
t.batchACK()
}
}
50 changes: 50 additions & 0 deletions x-pack/filebeat/input/http_endpoint/ack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBatchACKTracker(t *testing.T) {
t.Run("empty", func(t *testing.T) {
tracker := make(ack)

acker := newBatchACKTracker(tracker.ACK)
require.False(t, tracker.wasACKed())

acker.Ready()
require.True(t, tracker.wasACKed())
})

t.Run("single_event", func(t *testing.T) {
tracker := make(ack)

acker := newBatchACKTracker(tracker.ACK)
acker.Add()
acker.ACK()
require.False(t, tracker.wasACKed())

acker.Ready()
require.True(t, tracker.wasACKed())
})
}

type ack chan struct{}

func (a ack) ACK() {
close(a)
}

func (a ack) wasACKed() bool {
select {
case <-a:
return true
default:
return false
}
}
16 changes: 11 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
Expand All @@ -45,7 +44,7 @@ var (

type handler struct {
metrics *inputMetrics
publisher stateless.Publisher
publish func(beat.Event)
log *logp.Logger
validator apiValidator
txBaseID string // Random value to make transaction IDs unique.
Expand All @@ -71,6 +70,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

start := time.Now()
acker := newBatchACKTracker(func() {
h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesACKedTotal.Inc()
})
defer acker.Ready()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
Expand Down Expand Up @@ -124,7 +128,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

if err = h.publishEvent(obj, headers); err != nil {
acker.Add()
if err = h.publishEvent(obj, headers, acker); err != nil {
h.metrics.apiErrors.Add(1)
h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -219,10 +224,11 @@ func (h *handler) sendResponse(w http.ResponseWriter, status int, message string
}
}

func (h *handler) publishEvent(obj, headers mapstr.M) error {
func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) error {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{},
Private: acker,
}
if h.preserveOriginalEvent {
event.Fields["event"] = mapstr.M{
Expand All @@ -237,7 +243,7 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error {
return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err)
}

h.publisher.Publish(event)
h.publish(event)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func Test_apiResponse(t *testing.T) {
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
33 changes: 24 additions & 9 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
Expand All @@ -54,11 +54,11 @@ func Plugin() v2.Plugin {
Name: inputName,
Stability: feature.Stable,
Deprecated: false,
Manager: stateless.NewInputManager(configure),
Manager: v2.ConfigureWith(configure),
}
}

func configure(cfg *conf.C) (stateless.Input, error) {
func configure(cfg *conf.C) (v2.Input, error) {
conf := defaultConfig()
if err := cfg.Unpack(&conf); err != nil {
return nil, err
Expand Down Expand Up @@ -100,10 +100,19 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error {
return l.Close()
}

func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error {
func (e *httpEndpoint) Run(ctx v2.Context, pipeline beat.Pipeline) error {
metrics := newInputMetrics(ctx.ID)
defer metrics.Close()
err := servers.serve(ctx, e, publisher, metrics)

client, err := pipeline.ConnectWith(beat.ClientConfig{
EventListener: newEventACKHandler(),
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

err = servers.serve(ctx, e, client.Publish, metrics)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("unable to start server due to error: %w", err)
}
Expand All @@ -125,7 +134,7 @@ type pool struct {
// cancelled or the context of another end-point sharing the same address
// has had its context cancelled. If an end-point is re-registered with
// the same address and mux pattern, serve will return an error.
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error {
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub func(beat.Event), metrics *inputMetrics) error {
log := ctx.Logger.With("address", e.addr)
pattern := e.config.URL

Expand Down Expand Up @@ -300,14 +309,14 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event), log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),

publisher: pub,
metrics: metrics,
publish: pub,
metrics: metrics,
validator: apiValidator{
basicAuth: c.BasicAuth,
username: c.Username,
Expand Down Expand Up @@ -375,10 +384,12 @@ type inputMetrics struct {
apiErrors *monitoring.Uint // number of API errors
batchesReceived *monitoring.Uint // number of event arrays received
batchesPublished *monitoring.Uint // number of event arrays published
batchesACKedTotal *monitoring.Uint // Number of event arrays ACKed.
eventsPublished *monitoring.Uint // number of events published
contentLength metrics.Sample // histogram of request content lengths.
batchSize metrics.Sample // histogram of the received batch sizes.
batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of handler start to time of ACK for non-empty batches).
batchACKTime metrics.Sample // histogram of the elapsed successful batch acking times in nanoseconds (time of handler start to time of ACK for non-empty batches).
}

func newInputMetrics(id string) *inputMetrics {
Expand All @@ -391,17 +402,21 @@ func newInputMetrics(id string) *inputMetrics {
apiErrors: monitoring.NewUint(reg, "api_errors_total"),
batchesReceived: monitoring.NewUint(reg, "batches_received_total"),
batchesPublished: monitoring.NewUint(reg, "batches_published_total"),
batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"),
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
contentLength: metrics.NewUniformSample(1024),
batchSize: metrics.NewUniformSample(1024),
batchProcessingTime: metrics.NewUniformSample(1024),
batchACKTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "size", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.contentLength))
_ = adapter.NewGoMetrics(reg, "batch_size", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchSize))
_ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchProcessingTime))
_ = adapter.NewGoMetrics(reg, "batch_ack_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchACKTime))

return out
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestServerPool(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub, metrics)
err := servers.serve(ctx, cfg, pub.Publish, metrics)
if err != http.ErrServerClosed {

Check failure on line 284 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 284 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
select {
case fails <- err:
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestServerPool(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub, metrics)
err := servers.serve(ctx, cfg, pub.Publish, metrics)
if err != nil && err != http.ErrServerClosed && test.wantErr == nil {

Check failure on line 335 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 335 in x-pack/filebeat/input/http_endpoint/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Errorf("failed to re-register %v: %v", cfg.addr, err)
}
Expand Down

0 comments on commit 4f5f26d

Please sign in to comment.