x-pack/filebeat/input/http_endpoint: make input GA #39410

Merged
merged 13 commits into from
May 17, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -144,6 +144,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861]
- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131]
- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]

*Heartbeat*

@@ -273,6 +274,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587]
- Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}[]
- Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}39588[39588]
- Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410]

*Auditbeat*

77 changes: 68 additions & 9 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
@@ -9,13 +9,12 @@
<titleabbrev>HTTP Endpoint</titleabbrev>
++++

beta[]

The HTTP Endpoint input initializes a listening HTTP server that collects
incoming HTTP POST requests containing a JSON body. The body must be either an
object or an array of objects. Any other data types will result in an HTTP 400
(Bad Request) response. For arrays, one document is created for each object in
the array.
incoming HTTP POST requests containing a JSON body. The body must be either
an object or an array of objects, otherwise a Common Expression Language
expression that converts the JSON body to these types can be provided.
Any other data types will result in an HTTP 400 (Bad Request) response. For
arrays, one document is created for each object in the array.

gzip encoded request bodies are supported if a `Content-Encoding: gzip` header
is sent with the request.
@@ -35,14 +34,27 @@ These are the possible response codes from the server.
|=========================================================================================================================================================
| HTTP Response Code | Name | Reason
| 200 | OK | Returned on success.
| 400 | Bad Request | Returned if JSON body decoding fails.
| 400 | Bad Request | Returned if JSON body decoding fails or if `wait_for_completion_timeout` query validation fails.
| 401 | Unauthorized | Returned when basic auth, secret header, or HMAC validation fails.
| 405 | Method Not Allowed | Returned if methods other than POST are used.
| 406 | Not Acceptable | Returned if the POST request does not contain a body.
| 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip.
| 500 | Internal Server Error | Returned if an I/O error occurs reading the request.
| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout.
|=========================================================================================================================================================

The endpoint will enforce end-to-end ACK when a URL query parameter
`wait_for_completion_timeout` with a duration is provided. For example
`http://localhost:8080/?wait_for_completion_timeout=1m` will wait up
to 1 minute for the event to be published to the cluster and then return
the user-defined response message. In the case that the publication
does not complete within the timeout duration, the HTTP response will
have a 504 Gateway Timeout status code. The syntax for durations is
a number followed by units which may be h, m and s. No other HTTP query
is accepted. If another query parameter is provided or duration syntax
is incorrect, the request will fail with an HTTP 400 "Bad Request"
status.

Example configurations:

Basic example:
@@ -69,6 +81,17 @@ Custom response example:
prefix: "json"
----

Map request to root of document example:
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: http_endpoint
  enabled: true
  listen_address: 192.168.1.1
  listen_port: 8080
  prefix: "."
----

Multiple endpoints example:
["source","yaml",subs="attributes"]
----
@@ -171,6 +194,40 @@ Preserving original event and including headers in document
include_headers: ["TestHeader"]
----

Common Expression Language example:
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: http_endpoint
  enabled: true
  listen_address: 192.168.1.1
Review comment (Contributor):

[Suggestion]
Use the loopback IP address so people can copy/paste the config as is for testing/experimenting.

Suggested change
- listen_address: 192.168.1.1
+ listen_address: 127.0.0.1

Reply (Contributor Author):

I think in this case documentational consistency and clarity trumps copy/paste convenience; the other examples all use 192.168.1.1 and that address is an address that is not special. Changing either of those things I think harms clarity.

  listen_port: 8080
  program: |
    obj.records.map(r, {
      "requestId": obj.requestId,
      "timestamp": string(obj.timestamp),
      "event": r,
    })
----
This example would allow handling of a JSON body that is an object containing
more than one event that each should be ingested as separate documents with
the common timestamp and request ID:
["source","json",subs="attributes"]
----
{
  "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
  "timestamp": 1578090901599,
  "records": [
    {
      "data": "event record 1"
    },
    {
      "data": "event record 2"
    }
  ]
}
----
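
To make the effect of the CEL program above concrete, here is a plain-Go sketch of the same transformation. It does not use cel-go and is not how the input evaluates the program; `splitRecords` is a hypothetical name, and the decimal string form chosen for the timestamp is an assumption, since the exact output of CEL's `string()` for a JSON number is not confirmed here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// splitRecords mirrors obj.records.map(r, {...}) from the example: each
// element of "records" becomes its own event carrying the shared request ID
// and timestamp. (Hypothetical helper, plain Go rather than CEL.)
func splitRecords(body []byte) ([]map[string]any, error) {
	var obj struct {
		RequestID string           `json:"requestId"`
		Timestamp int64            `json:"timestamp"`
		Records   []map[string]any `json:"records"`
	}
	if err := json.Unmarshal(body, &obj); err != nil {
		return nil, err
	}
	events := make([]map[string]any, 0, len(obj.Records))
	for _, r := range obj.Records {
		events = append(events, map[string]any{
			"requestId": obj.RequestID,
			// Assumption: the timestamp is rendered as a decimal string.
			"timestamp": strconv.FormatInt(obj.Timestamp, 10),
			"event":     r,
		})
	}
	return events, nil
}

func main() {
	body := []byte(`{
		"requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
		"timestamp": 1578090901599,
		"records": [{"data": "event record 1"}, {"data": "event record 2"}]
	}`)
	events, err := splitRecords(body)
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		out, _ := json.Marshal(e)
		fmt.Println(string(out))
	}
}
```

With the sample body, this yields two events, one per element of `records`, which matches the "separate documents" behaviour the example describes.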

==== Configuration options

The `http_endpoint` input supports the following configuration options plus the
@@ -230,7 +287,7 @@ In certain scenarios when the source of the request is not able to do that, it c
[float]
==== `program`

The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported.
The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. The name of the object in the CEL program is `obj`. No CEL extensions are provided beyond the functions in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported.

[float]
==== `response_code`
@@ -260,7 +317,7 @@ This options specific which URL path to accept requests on. Defaults to `/`
[float]
==== `prefix`

This option specifies which prefix the incoming request will be mapped to.
This option specifies which prefix the incoming request will be mapped to. If `prefix` is "`.`", the request will be mapped to the root of the resulting document.

[float]
==== `include_headers`
@@ -346,10 +403,12 @@ observe the activity of the input.
| `api_errors_total` | Number of API errors.
| `batches_received_total` | Number of event arrays received.
| `batches_published_total` | Number of event arrays published.
| `batches_acked_total` | Number of event arrays ACKed.
| `events_published_total` | Number of events published.
| `size` | Histogram of request content lengths.
| `batch_size` | Histogram of the received event array length.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
| `batch_ack_time` | Histogram of the elapsed successful batch ACKing times in nanoseconds (time of handler start to time of ACK for non-empty batches).
|=======

[id="{beatname_lc}-input-{type}-common-options"]
77 changes: 77 additions & 0 deletions x-pack/filebeat/input/http_endpoint/ack.go
@@ -0,0 +1,77 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
	"sync"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// newEventACKHandler returns a beat ACKer that can receive callbacks when
// an event has been ACKed an output. If the event contains a private metadata
// pointing to a batchACKTracker then it will invoke the tracker's ACK() method
// to decrement the number of pending ACKs.
Review comment on lines +14 to +17 (Contributor):

Typo

Suggested change
- // newEventACKHandler returns a beat ACKer that can receive callbacks when
- // an event has been ACKed an output. If the event contains a private metadata
- // pointing to a batchACKTracker then it will invoke the tracker's ACK() method
- // to decrement the number of pending ACKs.
+ // newEventACKHandler returns a beat ACKer that can receive callbacks when
+ // an event has been ACKed by an output. If the event contains a private metadata
+ // pointing to a batchACKTracker then it will invoke the tracker's ACK() method
+ // to decrement the number of pending ACKs.

func newEventACKHandler() beat.EventListener {
	return acker.ConnectionOnly(
		acker.EventPrivateReporter(func(_ int, privates []interface{}) {
			for _, private := range privates {
				if ack, ok := private.(*batchACKTracker); ok {
					ack.ACK()
				}
			}
		}),
	)
}

// batchACKTracker invokes batchACK when all events associated to the batch
// have been published and acknowledged by an output.
type batchACKTracker struct {
	batchACK func()

	mu      sync.Mutex
	pending int64
}

// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function
// is invoked after the full batch has been acknowledged. Ready() must be invoked
// after all events in the batch are published.
func newBatchACKTracker(fn func()) *batchACKTracker {
	return &batchACKTracker{
		batchACK: fn,
		pending:  1, // Ready() must be called to consume this "1".
	}
}

// Ready signals that the batch has been fully consumed. Only
// after the batch is marked as "ready" can the batch be ACKed.
// This prevents the batch from being ACKed prematurely.
func (t *batchACKTracker) Ready() {
	t.ACK()
}

// Add increments the number of pending ACKs.
func (t *batchACKTracker) Add() {
	t.mu.Lock()
	t.pending++
	t.mu.Unlock()
}

// ACK decrements the number of pending event ACKs. When all pending ACKs are
// received then the event batch is ACKed.
func (t *batchACKTracker) ACK() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.pending <= 0 {
		panic("misuse detected: negative ACK counter")
	}

	t.pending--
	if t.pending == 0 {
		t.batchACK()
	}
}
50 changes: 50 additions & 0 deletions x-pack/filebeat/input/http_endpoint/ack_test.go
@@ -0,0 +1,50 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestBatchACKTracker(t *testing.T) {
	t.Run("empty", func(t *testing.T) {
		tracker := make(ack)

		acker := newBatchACKTracker(tracker.ACK)
		require.False(t, tracker.wasACKed())

		acker.Ready()
		require.True(t, tracker.wasACKed())
	})

	t.Run("single_event", func(t *testing.T) {
		tracker := make(ack)

		acker := newBatchACKTracker(tracker.ACK)
		acker.Add()
		acker.ACK()
		require.False(t, tracker.wasACKed())

		acker.Ready()
		require.True(t, tracker.wasACKed())
	})
}

type ack chan struct{}

func (a ack) ACK() {
	close(a)
}

func (a ack) wasACKed() bool {
	select {
	case <-a:
		return true
	default:
		return false
	}
}