From 338227ae43d338d519d7ff58f057d0b68c7777a0 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 6 May 2024 13:14:29 +0930 Subject: [PATCH 01/13] x-pack/filebeat/input/http_endpoint: make input GA --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-http-endpoint.asciidoc | 47 ++++++++++++++++--- x-pack/filebeat/input/http_endpoint/input.go | 2 +- .../tests/system/test_http_endpoint.py | 39 +++++++++++++++ 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e63eabc3f1d..fc9ee6c44bf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587] - Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}[] - Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}39588[39588] +- Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index a669eae489a..9a235f31076 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -9,13 +9,12 @@ HTTP Endpoint ++++ -beta[] - The HTTP Endpoint input initializes a listening HTTP server that collects -incoming HTTP POST requests containing a JSON body. The body must be either an -object or an array of objects. Any other data types will result in an HTTP 400 -(Bad Request) response. For arrays, one document is created for each object in -the array. +incoming HTTP POST requests containing a JSON body. The body must be either +an object or an array of objects, otherwise a Common Expression Language +expression that converts the JSON body to these types can be provided. 
+Any other data types will result in an HTTP 400 (Bad Request) response. For +arrays, one document is created for each object in the array. gzip encoded request bodies are supported if a `Content-Encoding: gzip` header is sent with the request. @@ -171,6 +170,40 @@ Preserving original event and including headers in document include_headers: ["TestHeader"] ---- +Common Expression Language example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + program: | + obj.records.map(r, { + "requestId": obj.requestId, + "timestamp": string(obj.timestamp), + "event": r, + }) +---- +This example would allow handling of a JSON body that is an object containing +more than one event that each should be ingested as separate documents with +the common timestamp and request ID: +["source","json",subs="attributes"] +---- +{ + "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", + "timestamp": 1578090901599, + "records": [ + { + "data": "event record 1" + }, + { + "data": "event record 2" + } + ] +} +---- + ==== Configuration options The `http_endpoint` input supports the following configuration options plus the @@ -230,7 +263,7 @@ In certain scenarios when the source of the request is not able to do that, it c [float] ==== `program` -The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. 
CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported. +The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. The name of the object in the CEL program is `obj`. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported. [float] ==== `response_code` diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index e9d9bfe7ba9..f89631f22d7 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -52,7 +52,7 @@ type httpEndpoint struct { func Plugin() v2.Plugin { return v2.Plugin{ Name: inputName, - Stability: feature.Beta, + Stability: feature.Stable, Deprecated: false, Manager: stateless.NewInputManager(configure), } diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 9a42896d4ae..119bc03ce3d 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -90,6 +90,45 @@ def test_http_endpoint_request(self): assert output[0]["input.type"] == "http_endpoint" assert output[0]["json.{}".format(self.prefix)] == message + def test_http_endpoint_cel_request(self): + """ + Test http_endpoint input with HTTP events using CEL. 
+ """ + options = """ + content_type: application/x-ndjson + program: | + {{ + "testmessage": obj.testmessage+'_'+obj.testmessage, + }} +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains( + "Starting HTTP server on {}:{}".format(self.host, self.port))) + + N = 10 + message = "somerandommessage_{}" + payload = "\n".join( + [json.dumps({self.prefix: message.format(i)}) for i in range(N)]) + headers = {"Content-Type": "application/x-ndjson", + "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=payload) + + self.wait_until(lambda: self.output_count(lambda x: x == N)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + print("response:", r.status_code, r.text) + + assert r.text == '{"message": "success"}' + + assert len(output) == N + for i in range(N): + assert output[i]["input.type"] == "http_endpoint" + assert output[i]["json.{}".format( + self.prefix)] == message.format(i)+'_'+message.format(i) + def test_http_endpoint_request_multiple_documents(self): """ Test http_endpoint input with multiple documents on a single HTTP request. 
From 3d87bdf116957022c1ac68af69c166b68b3c45c3 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 8 May 2024 11:09:48 +0930 Subject: [PATCH 02/13] x-pack/filebeat/input/http_endpoint: add acking --- x-pack/filebeat/input/http_endpoint/ack.go | 77 +++++++++++++++++++ .../filebeat/input/http_endpoint/ack_test.go | 50 ++++++++++++ .../filebeat/input/http_endpoint/handler.go | 16 ++-- .../input/http_endpoint/handler_test.go | 2 +- x-pack/filebeat/input/http_endpoint/input.go | 33 +++++--- .../input/http_endpoint/input_test.go | 4 +- 6 files changed, 165 insertions(+), 17 deletions(-) create mode 100644 x-pack/filebeat/input/http_endpoint/ack.go create mode 100644 x-pack/filebeat/input/http_endpoint/ack_test.go diff --git a/x-pack/filebeat/input/http_endpoint/ack.go b/x-pack/filebeat/input/http_endpoint/ack.go new file mode 100644 index 00000000000..aa396deae7d --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/ack.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" +) + +// newEventACKHandler returns a beat ACKer that can receive callbacks when +// an event has been ACKed by an output. If the event contains a private metadata +// pointing to a batchACKTracker then it will invoke the tracker's ACK() method +// to decrement the number of pending ACKs. 
+func newEventACKHandler() beat.EventListener { + return acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, private := range privates { + if ack, ok := private.(*batchACKTracker); ok { + ack.ACK() + } + } + }), + ) +} + +// batchACKTracker invokes batchACK when all events associated to the batch +// have been published and acknowledged by an output. +type batchACKTracker struct { + batchACK func() + + mu sync.Mutex + pending int64 +} + +// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function +// is invoked after the full batch has been acknowledged. Ready() must be invoked +// after all events in the batch are published. +func newBatchACKTracker(fn func()) *batchACKTracker { + return &batchACKTracker{ + batchACK: fn, + pending: 1, // Ready() must be called to consume this "1". + } +} + +// Ready signals that the batch has been fully consumed. Only +// after the batch is marked as "ready" can the lumberjack batch +// be ACKed. This prevents the batch from being ACKed prematurely. +func (t *batchACKTracker) Ready() { + t.ACK() +} + +// Add increments the number of pending ACKs. +func (t *batchACKTracker) Add() { + t.mu.Lock() + t.pending++ + t.mu.Unlock() +} + +// ACK decrements the number of pending event ACKs. When all pending ACKs are +// received then the event batch is ACKed. +func (t *batchACKTracker) ACK() { + t.mu.Lock() + defer t.mu.Unlock() + + if t.pending <= 0 { + panic("misuse detected: negative ACK counter") + } + + t.pending-- + if t.pending == 0 { + t.batchACK() + } +} diff --git a/x-pack/filebeat/input/http_endpoint/ack_test.go b/x-pack/filebeat/input/http_endpoint/ack_test.go new file mode 100644 index 00000000000..59b67a39fb8 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/ack_test.go @@ -0,0 +1,50 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBatchACKTracker(t *testing.T) { + t.Run("empty", func(t *testing.T) { + tracker := make(ack) + + acker := newBatchACKTracker(tracker.ACK) + require.False(t, tracker.wasACKed()) + + acker.Ready() + require.True(t, tracker.wasACKed()) + }) + + t.Run("single_event", func(t *testing.T) { + tracker := make(ack) + + acker := newBatchACKTracker(tracker.ACK) + acker.Add() + acker.ACK() + require.False(t, tracker.wasACKed()) + + acker.Ready() + require.True(t, tracker.wasACKed()) + }) +} + +type ack chan struct{} + +func (a ack) ACK() { + close(a) +} + +func (a ack) wasACKed() bool { + select { + case <-a: + return true + default: + return false + } +} diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index d21ac145174..2e8523a9068 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -25,7 +25,6 @@ import ( "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/structpb" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/jsontransform" @@ -45,7 +44,7 @@ var ( type handler struct { metrics *inputMetrics - publisher stateless.Publisher + publish func(beat.Event) log *logp.Logger validator apiValidator txBaseID string // Random value to make transaction IDs unique. 
@@ -71,6 +70,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } start := time.Now() + acker := newBatchACKTracker(func() { + h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds()) + h.metrics.batchesACKedTotal.Inc() + }) + defer acker.Ready() h.metrics.batchesReceived.Add(1) h.metrics.contentLength.Update(r.ContentLength) body, status, err := getBodyReader(r) @@ -124,7 +128,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - if err = h.publishEvent(obj, headers); err != nil { + acker.Add() + if err = h.publishEvent(obj, headers, acker); err != nil { h.metrics.apiErrors.Add(1) h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err) return @@ -219,10 +224,11 @@ func (h *handler) sendResponse(w http.ResponseWriter, status int, message string } } -func (h *handler) publishEvent(obj, headers mapstr.M) error { +func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) error { event := beat.Event{ Timestamp: time.Now().UTC(), Fields: mapstr.M{}, + Private: acker, } if h.preserveOriginalEvent { event.Fields["event"] = mapstr.M{ @@ -237,7 +243,7 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error { return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err) } - h.publisher.Publish(event) + h.publish(event) return nil } diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index cb911f8ab18..61bedb7e679 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -378,7 +378,7 @@ func Test_apiResponse(t *testing.T) { pub := new(publisher) metrics := newInputMetrics("") defer metrics.Close() - apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics) + apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, 
logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. respRec := httptest.NewRecorder() diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index f89631f22d7..c4705cc3001 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -27,7 +27,7 @@ import ( "go.uber.org/zap/zapcore" v2 "github.com/elastic/beats/v7/filebeat/input/v2" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" @@ -54,11 +54,11 @@ func Plugin() v2.Plugin { Name: inputName, Stability: feature.Stable, Deprecated: false, - Manager: stateless.NewInputManager(configure), + Manager: v2.ConfigureWith(configure), } } -func configure(cfg *conf.C) (stateless.Input, error) { +func configure(cfg *conf.C) (v2.Input, error) { conf := defaultConfig() if err := cfg.Unpack(&conf); err != nil { return nil, err @@ -100,10 +100,19 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error { return l.Close() } -func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error { +func (e *httpEndpoint) Run(ctx v2.Context, pipeline beat.Pipeline) error { metrics := newInputMetrics(ctx.ID) defer metrics.Close() - err := servers.serve(ctx, e, publisher, metrics) + + client, err := pipeline.ConnectWith(beat.ClientConfig{ + EventListener: newEventACKHandler(), + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) + } + defer client.Close() + + err = servers.serve(ctx, e, client.Publish, metrics) if err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("unable to start server due to error: %w", err) } @@ -125,7 +134,7 @@ type pool struct { // cancelled or the context of another end-point sharing the same address // has had 
its context cancelled. If an end-point is re-registered with // the same address and mux pattern, serve will return an error. -func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error { +func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub func(beat.Event), metrics *inputMetrics) error { log := ctx.Logger.With("address", e.addr) pattern := e.config.URL @@ -300,14 +309,14 @@ func (s *server) getErr() error { return s.err } -func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler { +func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event), log *logp.Logger, metrics *inputMetrics) http.Handler { h := &handler{ log: log, txBaseID: newID(), txIDCounter: atomic.NewUint64(0), - publisher: pub, - metrics: metrics, + publish: pub, + metrics: metrics, validator: apiValidator{ basicAuth: c.BasicAuth, username: c.Username, @@ -375,10 +384,12 @@ type inputMetrics struct { apiErrors *monitoring.Uint // number of API errors batchesReceived *monitoring.Uint // number of event arrays received batchesPublished *monitoring.Uint // number of event arrays published + batchesACKedTotal *monitoring.Uint // Number of event arrays ACKed. eventsPublished *monitoring.Uint // number of events published contentLength metrics.Sample // histogram of request content lengths. batchSize metrics.Sample // histogram of the received batch sizes. batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of handler start to time of ACK for non-empty batches). + batchACKTime metrics.Sample // histogram of the elapsed successful batch acking times in nanoseconds (time of handler start to time of ACK for non-empty batches). 
} func newInputMetrics(id string) *inputMetrics { @@ -391,10 +402,12 @@ func newInputMetrics(id string) *inputMetrics { apiErrors: monitoring.NewUint(reg, "api_errors_total"), batchesReceived: monitoring.NewUint(reg, "batches_received_total"), batchesPublished: monitoring.NewUint(reg, "batches_published_total"), + batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"), eventsPublished: monitoring.NewUint(reg, "events_published_total"), contentLength: metrics.NewUniformSample(1024), batchSize: metrics.NewUniformSample(1024), batchProcessingTime: metrics.NewUniformSample(1024), + batchACKTime: metrics.NewUniformSample(1024), } _ = adapter.NewGoMetrics(reg, "size", adapter.Accept). Register("histogram", metrics.NewHistogram(out.contentLength)) @@ -402,6 +415,8 @@ func newInputMetrics(id string) *inputMetrics { Register("histogram", metrics.NewHistogram(out.batchSize)) _ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) + _ = adapter.NewGoMetrics(reg, "batch_ack_time", adapter.Accept). 
+ Register("histogram", metrics.NewHistogram(out.batchACKTime)) return out } diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go index c7c1b89bf3a..3f530454e1d 100644 --- a/x-pack/filebeat/input/http_endpoint/input_test.go +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -280,7 +280,7 @@ func TestServerPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub, metrics) + err := servers.serve(ctx, cfg, pub.Publish, metrics) if err != http.ErrServerClosed { select { case fails <- err: @@ -331,7 +331,7 @@ func TestServerPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub, metrics) + err := servers.serve(ctx, cfg, pub.Publish, metrics) if err != nil && err != http.ErrServerClosed && test.wantErr == nil { t.Errorf("failed to re-register %v: %v", cfg.addr, err) } From 8972f9e29864d11bdec5f9b952d3d0ca65d8ca6b Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 9 May 2024 07:05:30 +0930 Subject: [PATCH 03/13] address pr comments --- x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc | 2 ++ x-pack/filebeat/input/http_endpoint/ack.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 9a235f31076..66d0281e1e8 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -379,10 +379,12 @@ observe the activity of the input. | `api_errors_total` | Number of API errors. | `batches_received_total` | Number of event arrays received. | `batches_published_total` | Number of event arrays published. +| `batches_acked_total` | Number of event arrays ACKed. | `events_published_total` | Number of events published. | `size` | Histogram of request content lengths. 
| `batch_size` | Histogram of the received event array length. | `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +| `batch_ack_time` | Histogram of the elapsed successful batch ACKing times in nanoseconds (time of handler start to time of ACK for non-empty batches). |======= [id="{beatname_lc}-input-{type}-common-options"] diff --git a/x-pack/filebeat/input/http_endpoint/ack.go b/x-pack/filebeat/input/http_endpoint/ack.go index aa396deae7d..9dfc5a656b5 100644 --- a/x-pack/filebeat/input/http_endpoint/ack.go +++ b/x-pack/filebeat/input/http_endpoint/ack.go @@ -47,8 +47,8 @@ func newBatchACKTracker(fn func()) *batchACKTracker { } // Ready signals that the batch has been fully consumed. Only -// after the batch is marked as "ready" can the lumberjack batch -// be ACKed. This prevents the batch from being ACKed prematurely. +// after the batch is marked as "ready" can the batch be ACKed. +// This prevents the batch from being ACKed prematurely. 
func (t *batchACKTracker) Ready() { t.ACK() } From f2748c1455a145aa4994ce53949d637780effe66 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 9 May 2024 15:40:09 +0930 Subject: [PATCH 04/13] x-pack/filebeat/input/http_endpoint: add e2e acking --- .../filebeat/input/http_endpoint/handler.go | 58 +++++++++++++++++-- x-pack/filebeat/input/http_endpoint/input.go | 1 + 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 2e8523a9068..c76f11fe52f 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -6,12 +6,14 @@ package http_endpoint import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io" "net" "net/http" + "net/url" "reflect" "strconv" "time" @@ -43,6 +45,8 @@ var ( ) type handler struct { + ctx context.Context + metrics *inputMetrics publish func(beat.Event) log *logp.Logger @@ -69,12 +73,23 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + wait := getTimeoutWait(r.URL, h.log) + var ( + acked chan struct{} + timeout *time.Timer + ) + if wait != 0 { + acked = make(chan struct{}) + timeout = time.NewTimer(wait) + } start := time.Now() acker := newBatchACKTracker(func() { h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds()) h.metrics.batchesACKedTotal.Inc() + if acked != nil { + close(acked) + } }) - defer acker.Ready() h.metrics.batchesReceived.Add(1) h.metrics.contentLength.Update(r.ContentLength) body, status, err := getBodyReader(r) @@ -138,14 +153,49 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { respCode, respBody = h.responseCode, h.responseBody } - h.sendResponse(w, respCode, respBody) - if h.reqLogger != nil { - h.logRequest(r, respCode, nil) + acker.Ready() + if acked == nil { + h.sendResponse(w, respCode, respBody) + } else { + select { + case <-acked: + if !timeout.Stop() { + <-timeout.C + } + 
h.sendResponse(w, respCode, respBody) + case <-timeout.C: + h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, errTookTooLong) + case <-h.ctx.Done(): + h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err()) + } + if h.reqLogger != nil { + h.logRequest(r, respCode, nil) + } } h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) h.metrics.batchesPublished.Add(1) } +var errTookTooLong = errors.New("could not publish event within timeout") + +func getTimeoutWait(u *url.URL, log *logp.Logger) time.Duration { + p := u.Query().Get("wait_for_completion_timeout") + if p == "" { + return 0 + } + log.Debugw("wait_for_completion_timeout parameter", "value", p) + t, err := time.ParseDuration(p) + if err != nil { + log.Warnw("could not parse wait_for_completion_timeout parameter", "error", err) + return 0 + } + if t < 0 { + log.Warnw("negative wait_for_completion_timeout parameter", "error", err) + return 0 + } + return t +} + func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(status) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index c4705cc3001..1968ced0bb6 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -311,6 +311,7 @@ func (s *server) getErr() error { func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event), log *logp.Logger, metrics *inputMetrics) http.Handler { h := &handler{ + ctx: ctx, log: log, txBaseID: newID(), txIDCounter: atomic.NewUint64(0), From 9bbcac271661c7d230d105c8d9fad5c1e0d83bc1 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 10 May 2024 07:06:08 +0930 Subject: [PATCH 05/13] x-pack/filebeat/input/http_endpoint: add debug URL logging --- .../filebeat/input/http_endpoint/handler.go | 28 +++++++++++-------- 1 file 
changed, 16 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index c76f11fe52f..a2926cac8f2 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -67,9 +67,11 @@ type handler struct { } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + txID := h.nextTxID() + h.log.Debugw("request", "url", r.URL, "tx_id", txID) status, err := h.validator.validateRequest(r) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) return } @@ -94,7 +96,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.metrics.contentLength.Update(r.ContentLength) body, status, err := getBodyReader(r) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) h.metrics.apiErrors.Add(1) return } @@ -113,7 +115,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { objs, _, status, err := httpReadJSON(body, h.program) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) h.metrics.apiErrors.Add(1) return } @@ -138,7 +140,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { break } else if !errors.Is(err, errNotCRC) { h.metrics.apiErrors.Add(1) - h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err) return } } @@ -146,7 +148,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { acker.Add() if err = h.publishEvent(obj, headers, acker); err != nil { h.metrics.apiErrors.Add(1) - h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusInternalServerError, err) return } 
h.metrics.eventsPublished.Add(1) @@ -159,17 +161,20 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } else { select { case <-acked: + h.log.Debugw("request acked", "tx_id", txID) if !timeout.Stop() { <-timeout.C } h.sendResponse(w, respCode, respBody) case <-timeout.C: - h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, errTookTooLong) + h.log.Debugw("request timed out", "tx_id", txID) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, errTookTooLong) case <-h.ctx.Done(): - h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err()) + h.log.Debugw("request context cancelled", "tx_id", txID) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err()) } if h.reqLogger != nil { - h.logRequest(r, respCode, nil) + h.logRequest(txID, r, respCode, nil) } } h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) @@ -196,7 +201,7 @@ func getTimeoutWait(u *url.URL, log *logp.Logger) time.Duration { return t } -func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { +func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(status) @@ -214,11 +219,11 @@ func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, l log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr) } if h.reqLogger != nil { - h.logRequest(r, status, buf.Bytes()) + h.logRequest(txID, r, status, buf.Bytes()) } } -func (h *handler) logRequest(r *http.Request, status int, respBody []byte) { +func (h *handler) logRequest(txID string, r *http.Request, status int, respBody []byte) { // Populate and preserve scheme and host if they are missing; // they are required for httputil.DumpRequestOut. 
var scheme, host string @@ -244,7 +249,6 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) { zap.ByteString("http.response.body.content", respBody), ) } - txID := h.nextTxID() h.log.Debugw("new request trace transaction", "id", txID) // Limit request logging body size to 10kiB. const maxBodyLen = 10 * (1 << 10) From 4e9325e2f23885f42073225d5364fb57c4f13d82 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 10 May 2024 17:44:49 +0930 Subject: [PATCH 06/13] x-pack/filebeat/docs/inputs: add timeout documentation --- .../filebeat/docs/inputs/input-http-endpoint.asciidoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 66d0281e1e8..5ab6be2fc24 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -40,8 +40,18 @@ These are the possible response codes from the server. | 406 | Not Acceptable | Returned if the POST request does not contain a body. | 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip. | 500 | Internal Server Error | Returned if an I/O error occurs reading the request. +| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout. |========================================================================================================================================================= +The endpoint will enforce end-to-end ACK when a URL query parameter +`wait_for_completion_timeout` with a duration is provided. For example +`http://localhost:8080/?wait_for_completion_timeout=1m` will wait up +to 1min for the event to be published to the cluster and then return +the user-defined response message. 
In the case that the publication +does not happen within the timeout duration, the HTTP response will +have a 504 Gateway Timeout status code. The syntax for durations is +a number followed by units which may be h, m and s. + Example configurations: Basic example: From d08dad22f34a925da3c9e3631a0bfabab21e19e0 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 11 May 2024 06:55:00 +0930 Subject: [PATCH 07/13] address pr comment --- x-pack/filebeat/input/lumberjack/ack.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go index 809a7e7d135..6e9e65cb695 100644 --- a/x-pack/filebeat/input/lumberjack/ack.go +++ b/x-pack/filebeat/input/lumberjack/ack.go @@ -62,7 +62,7 @@ func (t *batchACKTracker) ACK() { } // newEventACKHandler returns a beat ACKer that can receive callbacks when -// an event has been ACKed an output. If the event contains a private metadata +// an event has been ACKed by an output. If the event contains a private metadata // pointing to a batchACKTracker then it will invoke the tracker's ACK() method // to decrement the number of pending ACKs. func newEventACKHandler() beat.EventListener { From 310ce79e5af6b30e4b98f1f87b112bd575caa732 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 13 May 2024 06:17:40 +0930 Subject: [PATCH 08/13] validate queries --- .../docs/inputs/input-http-endpoint.asciidoc | 7 ++- .../filebeat/input/http_endpoint/handler.go | 44 +++++++++++++++---- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 5ab6be2fc24..9292ec6b8a9 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -34,7 +34,7 @@ These are the possible response codes from the server. 
|========================================================================================================================================================= | HTTP Response Code | Name | Reason | 200 | OK | Returned on success. -| 400 | Bad Request | Returned if JSON body decoding fails. +| 400 | Bad Request | Returned if JSON body decoding fails or if `wait_for_completion_timeout` query validation fails. | 401 | Unauthorized | Returned when basic auth, secret header, or HMAC validation fails. | 405 | Method Not Allowed | Returned if methods other than POST are used. | 406 | Not Acceptable | Returned if the POST request does not contain a body. @@ -50,7 +50,10 @@ to 1min for the event to be published to the cluster and then return the user-defined response message. In the case that the publication does not happen within the timeout duration, the HTTP response will have a 504 Gateway Timeout status code. The syntax for durations is -a number followed by units which may be h, m and s. +a number followed by units which may be h, m and s. No other HTTP query +is accepted. If another query parameter is provided or duration syntax +is incorrect, the request will fail with an HTTP 400 "Bad Request" +status. 
Example configurations: diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index a2926cac8f2..bdb98e0d3c7 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -15,7 +15,9 @@ import ( "net/http" "net/url" "reflect" + "sort" "strconv" + "strings" "time" "github.com/google/cel-go/cel" @@ -75,7 +77,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - wait := getTimeoutWait(r.URL, h.log) + wait, err := getTimeoutWait(r.URL, h.log) + if err != nil { + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err) + return + } var ( acked chan struct{} timeout *time.Timer @@ -183,22 +189,42 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var errTookTooLong = errors.New("could not publish event within timeout") -func getTimeoutWait(u *url.URL, log *logp.Logger) time.Duration { - p := u.Query().Get("wait_for_completion_timeout") +func getTimeoutWait(u *url.URL, log *logp.Logger) (time.Duration, error) { + q := u.Query() + switch len(q) { + case 0: + return 0, nil + case 1: + if _, ok := q["wait_for_completion_timeout"]; !ok { + var k string + for k = range q { + break + } + return 0, fmt.Errorf("unexpected URL query: %s", k) + } + default: + delete(q, "wait_for_completion_timeout") + keys := make([]string, 0, len(q)) + for k := range q { + keys = append(keys, k) + } + sort.Strings(keys) + return 0, fmt.Errorf("unexpected URL query: %s", strings.Join(keys, ", ")) + } + p := q.Get("wait_for_completion_timeout") if p == "" { - return 0 + // This will never happen; it is already handled in the check switch above. 
+ return 0, nil } log.Debugw("wait_for_completion_timeout parameter", "value", p) t, err := time.ParseDuration(p) if err != nil { - log.Warnw("could not parse wait_for_completion_timeout parameter", "error", err) - return 0 + return 0, fmt.Errorf("could not parse wait_for_completion_timeout parameter: %w", err) } if t < 0 { - log.Warnw("negative wait_for_completion_timeout parameter", "error", err) - return 0 + return 0, fmt.Errorf("negative wait_for_completion_timeout parameter: %w", err) } - return t + return t, nil } func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { From db44a7b214dca6c0924dab5864af13a42b656d9f Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 13 May 2024 08:05:42 +0930 Subject: [PATCH 09/13] add system test for acking --- .../tests/system/test_http_endpoint.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 119bc03ce3d..761b5b78977 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -90,6 +90,33 @@ def test_http_endpoint_request(self): assert output[0]["input.type"] == "http_endpoint" assert output[0]["json.{}".format(self.prefix)] == message + def test_http_endpoint_request_acked(self): + """ + Test http_endpoint input with HTTP events requiring ACK. 
+ """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains( + "Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + query = {"wait_for_completion_timeout": "1m"} + headers = {"Content-Type": "application/json", + "Accept": "application/json"} + r = requests.post(self.url, params=query, headers=headers, data=json.dumps(payload)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + print("response:", r.status_code, r.text) + + assert r.text == '{"message": "success"}' + assert output[0]["input.type"] == "http_endpoint" + assert output[0]["json.{}".format(self.prefix)] == message + def test_http_endpoint_cel_request(self): """ Test http_endpoint input with HTTP events using CEL. From e9017535a124dc68c362021056da822a32fdebfb Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 14 May 2024 08:47:55 +0930 Subject: [PATCH 10/13] x-pack/filebeat/input/http_endpoint: allow user to place events at doc root --- .../docs/inputs/input-http-endpoint.asciidoc | 13 +++++++++++- .../filebeat/input/http_endpoint/handler.go | 12 ++++++----- .../input/http_endpoint/handler_test.go | 20 +++++++++++++++++++ 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 9292ec6b8a9..1cd445fe452 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -81,6 +81,17 @@ Custom response example: prefix: "json" ---- +Map request to root of document example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + prefix: "." 
+---- + Multiple endpoints example: ["source","yaml",subs="attributes"] ---- @@ -306,7 +317,7 @@ This options specific which URL path to accept requests on. Defaults to `/` [float] ==== `prefix` -This option specifies which prefix the incoming request will be mapped to. +This option specifies which prefix the incoming request will be mapped to. If `prefix` is "`.`", the request will be mapped to the root of the resulting document. [float] ==== `include_headers` diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index bdb98e0d3c7..4555a869c0d 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -307,9 +307,15 @@ func (h *handler) sendResponse(w http.ResponseWriter, status int, message string func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) error { event := beat.Event{ Timestamp: time.Now().UTC(), - Fields: mapstr.M{}, Private: acker, } + if h.messageField == "." 
{ + event.Fields = obj + } else { + if _, err := event.PutValue(h.messageField, obj); err != nil { + return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err) + } + } if h.preserveOriginalEvent { event.Fields["event"] = mapstr.M{ "original": obj.String(), @@ -319,10 +325,6 @@ func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) er event.Fields["headers"] = headers } - if _, err := event.PutValue(h.messageField, obj); err != nil { - return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err) - } - h.publish(event) return nil } diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index 61bedb7e679..4c464a34f50 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -239,6 +239,26 @@ func Test_apiResponse(t *testing.T) { wantStatus: http.StatusOK, wantResponse: `{"message": "success"}`, }, + { + name: "single_event_root", + conf: func() config { + c := defaultConfig() + c.Prefix = "." 
+ return c + }(), + request: func() *http.Request { + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`)) + req.Header.Set("Content-Type", "application/json") + return req + }(), + events: []mapstr.M{ + { + "id": int64(0), + }, + }, + wantStatus: http.StatusOK, + wantResponse: `{"message": "success"}`, + }, { name: "single_event_gzip", conf: defaultConfig(), From b7d7f50f7e650a832384ac1583e85920c873ee40 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 15 May 2024 06:56:19 +0930 Subject: [PATCH 11/13] address pr comment --- x-pack/filebeat/input/http_endpoint/handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 4555a869c0d..b799248a935 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -196,6 +196,8 @@ func getTimeoutWait(u *url.URL, log *logp.Logger) (time.Duration, error) { return 0, nil case 1: if _, ok := q["wait_for_completion_timeout"]; !ok { + // Get the only key in q. We don't know what it is, so range + // over the map and stop at its single entry. var k string for k = range q { break From 19ce29d48c34c9f8a3bd431e09178fad0ecb1aed Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 17 May 2024 10:51:47 +0930 Subject: [PATCH 12/13] address pr comments --- x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 1cd445fe452..bc0ec78cdf8 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -46,9 +46,9 @@ These are the possible response codes from the server. The endpoint will enforce end-to-end ACK when a URL query parameter `wait_for_completion_timeout` with a duration is provided.
For example `http://localhost:8080/?wait_for_completion_timeout=1m` will wait up -to 1min for the event to be published to the cluster and then return +to 1 minute for the event to be published to the cluster and then return the user-defined response message. In the case that the publication -does not happen within the timeout duration, the HTTP response will +does not complete within the timeout duration, the HTTP response will have a 504 Gateway Timeout status code. The syntax for durations is a number followed by units which may be h, m and s. No other HTTP query is accepted. If another query parameter is provided or duration syntax From 0d7f5818679cf8f4c38697ba87c279a74c948101 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 17 May 2024 11:15:46 +0930 Subject: [PATCH 13/13] x-pack/filebeat/input/http_endpoint: fix request trace filename handling The filename coming from integrations may include a * which is intended to be replaced with the data stream ID. The code in place does not do this, so add it. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/http_endpoint/input.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fc9ee6c44bf..96462205dc8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -144,6 +144,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861] - Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] - Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] +- Fix request trace filename handling in http_endpoint input. 
{pull}39410[39410] *Heartbeat* diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 1968ced0bb6..6737a9b9aa0 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -16,7 +16,9 @@ import ( "net" "net/http" "net/url" + "path/filepath" "reflect" + "strings" "sync" "time" @@ -104,6 +106,11 @@ func (e *httpEndpoint) Run(ctx v2.Context, pipeline beat.Pipeline) error { metrics := newInputMetrics(ctx.ID) defer metrics.Close() + if e.config.Tracer != nil { + id := sanitizeFileName(ctx.ID) + e.config.Tracer.Filename = strings.ReplaceAll(e.config.Tracer.Filename, "*", id) + } + client, err := pipeline.ConnectWith(beat.ClientConfig{ EventListener: newEventACKHandler(), }) @@ -119,6 +126,15 @@ func (e *httpEndpoint) Run(ctx v2.Context, pipeline beat.Pipeline) error { return nil } +// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +// The request.tracer.filename may have ":" when an http_endpoint input has cursor config, and +// the macOS Finder treats ":" as a path separator, which causes strange file paths to be shown. +func sanitizeFileName(name string) string { + name = strings.ReplaceAll(name, ":", string(filepath.Separator)) + name = filepath.Clean(name) + return strings.ReplaceAll(name, string(filepath.Separator), "_") +} + // servers is the package-level server pool. var servers = pool{servers: make(map[string]*server)}