Skip to content

Commit fdff29b

Browse files
mergify[bot]efd6
andauthored
[8.19](backport #44365) x-pack/filebeat/input/httpjson: add fleet input status updating (#44941)
* x-pack/filebeat/input/httpjson: add fleet input status updating (#44365) (cherry picked from commit c203b82) # Conflicts: # docs/reference/filebeat/filebeat-input-httpjson.md # x-pack/filebeat/input/httpjson/transform_set_test.go * remove irrelevant changelog entry * remove incorrectly cherry-picked markdown docs * resolve conflicts * add asciidoc documentation --------- Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
1 parent a383dd2 commit fdff29b

28 files changed

+273
-161
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
411411
- Introduce lastSync start position to AWS CloudWatch input backed by state registry. {pull}43251[43251]
412412
- Add Fleet status update functionality to udp input. {issue}44419[44419] {pull}44785[44785]
413413
- Filestream now logs at level warn the number of files that are too small to be ingested {pull}44751[44751]
414+
- Add Fleet status updating to HTTP JSON input. {issue}44282[44282] {pull}44365[44365]
414415

415416
*Auditbeat*
416417

x-pack/filebeat/docs/inputs/input-httpjson.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,15 @@ e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occu
572572

573573
It is not set by default (by default the rate-limiting as specified in the Response is followed).
574574

575+
[[request-rate-limit-max-non-degraded]]
576+
[float]
577+
==== `request.rate_limit.max_non_degraded`
578+
579+
The maximum number of consecutive 429 rate limit responses from the endpoint to accept
580+
before making an input health status update. If this is not set, rate limit responses
581+
will not be reported as health updates. Default behavior is to not report rate limiting
582+
responses.
583+
575584
[[request-transforms]]
576585
[float]
577586
==== `request.transforms`

x-pack/filebeat/input/httpjson/config_request.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ func (c retryConfig) getWaitMax() time.Duration {
6060
}
6161

6262
type rateLimitConfig struct {
63-
Limit *valueTpl `config:"limit"`
64-
Reset *valueTpl `config:"reset"`
65-
Remaining *valueTpl `config:"remaining"`
66-
EarlyLimit *float64 `config:"early_limit"`
63+
Limit *valueTpl `config:"limit"`
64+
Reset *valueTpl `config:"reset"`
65+
Remaining *valueTpl `config:"remaining"`
66+
EarlyLimit *float64 `config:"early_limit"`
67+
MaxNonDegraded *int `config:"max_non_degraded"`
6768
}
6869

6970
func (c rateLimitConfig) Validate() error {
@@ -160,7 +161,7 @@ func (c *requestConfig) Validate() error {
160161
return fmt.Errorf("unsupported method %q", c.Method)
161162
}
162163

163-
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, requestNamespace, nil); err != nil {
164+
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, requestNamespace, noopReporter{}, nil); err != nil {
164165
return err
165166
}
166167

x-pack/filebeat/input/httpjson/config_response.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ type splitConfig struct {
3737
}
3838

3939
func (c *responseConfig) Validate() error {
40-
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
40+
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, noopReporter{}, nil); err != nil {
4141
return err
4242
}
43-
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Pagination, paginationNamespace, nil); err != nil {
43+
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Pagination, paginationNamespace, noopReporter{}, nil); err != nil {
4444
return err
4545
}
4646
if c.DecodeAs != "" {
@@ -52,7 +52,7 @@ func (c *responseConfig) Validate() error {
5252
}
5353

5454
func (c *splitConfig) Validate() error {
55-
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
55+
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, noopReporter{}, nil); err != nil {
5656
return err
5757
}
5858

@@ -71,7 +71,7 @@ func (c *splitConfig) Validate() error {
7171
return fmt.Errorf("invalid split type: %s", c.Type)
7272
}
7373

74-
if _, err := newSplitResponse(c, nil); err != nil {
74+
if _, err := newSplitResponse(c, noopReporter{}, nil); err != nil {
7575
return err
7676
}
7777

x-pack/filebeat/input/httpjson/cursor.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package httpjson
66

77
import (
88
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
9+
"github.com/elastic/beats/v7/libbeat/management/status"
910
"github.com/elastic/elastic-agent-libs/logp"
1011
"github.com/elastic/elastic-agent-libs/mapstr"
1112
)
@@ -15,11 +16,12 @@ type cursor struct {
1516

1617
state mapstr.M
1718

18-
log *logp.Logger
19+
status status.StatusReporter
20+
log *logp.Logger
1921
}
2022

21-
func newCursor(cfg cursorConfig, log *logp.Logger) *cursor {
22-
return &cursor{cfg: cfg, log: log}
23+
func newCursor(cfg cursorConfig, stat status.StatusReporter, log *logp.Logger) *cursor {
24+
return &cursor{cfg: cfg, status: stat, log: log}
2325
}
2426

2527
func (c *cursor) load(cursor *inputcursor.Cursor) {
@@ -34,6 +36,7 @@ func (c *cursor) load(cursor *inputcursor.Cursor) {
3436

3537
if err := cursor.Unpack(&c.state); err != nil {
3638
c.log.Errorf("Reset cursor state. Failed to read from registry: %v", err)
39+
c.status.UpdateStatus(status.Degraded, "failed to load cursor: "+err.Error())
3740
return
3841
}
3942

@@ -50,7 +53,7 @@ func (c *cursor) update(trCtx *transformContext) {
5053
}
5154

5255
for k, cfg := range c.cfg {
53-
v, _ := cfg.Value.Execute(trCtx, transformable{}, k, cfg.Default, c.log)
56+
v, _ := cfg.Value.Execute(trCtx, transformable{}, k, cfg.Default, c.status, c.log)
5457
if v != "" || !cfg.mustIgnoreEmptyValue() {
5558
_, _ = c.state.Put(k, v)
5659
c.log.Debugf("cursor.%s stored with %s", k, v)

x-pack/filebeat/input/httpjson/cursor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestCursorUpdate(t *testing.T) {
110110
conf := cursorConfig{}
111111
require.NoError(t, cfg.Unpack(&conf))
112112

113-
c := newCursor(conf, logp.NewLogger("cursor-test"))
113+
c := newCursor(conf, noopReporter{}, logp.NewLogger("cursor-test"))
114114
c.state = tc.initialState
115115
c.update(tc.trCtx)
116116
assert.Equal(t, tc.expectedState, c.state)

x-pack/filebeat/input/httpjson/input.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
3131
"github.com/elastic/beats/v7/libbeat/beat"
3232
"github.com/elastic/beats/v7/libbeat/feature"
33+
"github.com/elastic/beats/v7/libbeat/management/status"
3334
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
3435
"github.com/elastic/beats/v7/libbeat/statestore"
3536
"github.com/elastic/beats/v7/libbeat/version"
@@ -175,8 +176,14 @@ func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr
175176
}
176177

177178
func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error {
178-
log := ctx.Logger.With("input_url", cfg.Request.URL)
179+
stat := ctx.StatusReporter
180+
if stat == nil {
181+
stat = noopReporter{}
182+
}
183+
stat.UpdateStatus(status.Starting, "")
184+
stat.UpdateStatus(status.Configuring, "")
179185

186+
log := ctx.Logger.With("input_url", cfg.Request.URL)
180187
stdCtx := ctxtool.FromCanceller(ctx.Cancelation)
181188

182189
if cfg.Request.Tracer != nil {
@@ -196,30 +203,33 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso
196203

197204
metrics := newInputMetrics(reg)
198205

199-
client, err := newHTTPClient(stdCtx, cfg, log, reg)
206+
client, err := newHTTPClient(stdCtx, cfg, stat, log, reg)
200207
if err != nil {
208+
stat.UpdateStatus(status.Failed, "failed to create HTTP client: "+err.Error())
201209
return err
202210
}
203211

204-
requestFactory, err := newRequestFactory(stdCtx, cfg, log, metrics, reg)
212+
requestFactory, err := newRequestFactory(stdCtx, cfg, stat, log, metrics, reg)
205213
if err != nil {
206214
log.Errorf("Error while creating requestFactory: %v", err)
215+
stat.UpdateStatus(status.Failed, "failed to create request factory: "+err.Error())
207216
return err
208217
}
209218
var xmlDetails map[string]xml.Detail
210219
if cfg.Response.XSD != "" {
211220
xmlDetails, err = xml.Details([]byte(cfg.Response.XSD))
212221
if err != nil {
213222
log.Errorf("error while collecting xml decoder type hints: %v", err)
223+
stat.UpdateStatus(status.Failed, "error while collecting xml decoder type hints: "+err.Error())
214224
return err
215225
}
216226
}
217-
pagination := newPagination(cfg, client, log)
218-
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
219-
requester := newRequester(client, requestFactory, responseProcessor, metrics, log)
227+
pagination := newPagination(cfg, client, stat, log)
228+
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, stat, log)
229+
requester := newRequester(client, requestFactory, responseProcessor, metrics, stat, log)
220230

221231
trCtx := emptyTransformContext()
222-
trCtx.cursor = newCursor(cfg.Cursor, log)
232+
trCtx.cursor = newCursor(cfg.Cursor, stat, log)
223233
trCtx.cursor.load(crsr)
224234

225235
doFunc := func() error {
@@ -241,6 +251,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso
241251
metrics.updateIntervalMetrics(err, startTime)
242252

243253
if err := stdCtx.Err(); err != nil {
254+
stat.UpdateStatus(status.Stopping, "")
244255
return err
245256
}
246257

@@ -254,10 +265,14 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso
254265
}
255266

256267
log.Infof("Input stopped because context was cancelled with: %v", err)
257-
268+
stat.UpdateStatus(status.Stopped, "")
258269
return nil
259270
}
260271

272+
type noopReporter struct{}
273+
274+
func (noopReporter) UpdateStatus(status.Status, string) {}
275+
261276
// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances.
262277
// The request.tracer.filename may have ":" when a httpjson input has cursor config and
263278
// the macOS Finder will treat this as path-separator and causes to show up strange filepaths.
@@ -267,7 +282,7 @@ func sanitizeFileName(name string) string {
267282
return strings.ReplaceAll(name, string(filepath.Separator), "_")
268283
}
269284

270-
func newHTTPClient(ctx context.Context, config config, log *logp.Logger, reg *monitoring.Registry) (*httpClient, error) {
285+
func newHTTPClient(ctx context.Context, config config, stat status.StatusReporter, log *logp.Logger, reg *monitoring.Registry) (*httpClient, error) {
271286
client, err := newNetHTTPClient(ctx, config.Request, log, reg)
272287
if err != nil {
273288
return nil, err
@@ -286,7 +301,7 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger, reg *mo
286301
}).StandardClient()
287302
}
288303

289-
limiter := newRateLimiterFromConfig(config.Request.RateLimit, log)
304+
limiter := newRateLimiterFromConfig(config.Request.RateLimit, stat, log)
290305

291306
if config.Auth.OAuth2.isEnabled() {
292307
authClient, err := config.Auth.OAuth2.client(ctx, client)
@@ -356,7 +371,7 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
356371
return netHTTPClient, nil
357372
}
358373

359-
func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, reg *monitoring.Registry, p ...*Policy) (*httpClient, error) {
374+
func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, stat status.StatusReporter, log *logp.Logger, reg *monitoring.Registry, p ...*Policy) (*httpClient, error) {
360375
client, err := newNetHTTPClient(ctx, requestCfg, log, reg)
361376
if err != nil {
362377
return nil, err
@@ -382,7 +397,7 @@ func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *re
382397
}).StandardClient()
383398
}
384399

385-
limiter := newRateLimiterFromConfig(requestCfg.RateLimit, log)
400+
limiter := newRateLimiterFromConfig(requestCfg.RateLimit, stat, log)
386401

387402
if authCfg != nil && authCfg.OAuth2.isEnabled() {
388403
authClient, err := authCfg.OAuth2.client(ctx, client)

x-pack/filebeat/input/httpjson/pagination.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"net/url"
1313

14+
"github.com/elastic/beats/v7/libbeat/management/status"
1415
"github.com/elastic/mito/lib/xml"
1516

1617
"github.com/elastic/elastic-agent-libs/logp"
@@ -23,11 +24,12 @@ type pagination struct {
2324
client *httpClient
2425
requestFactory *requestFactory
2526
decoder decoderFunc
27+
status status.StatusReporter
2628
log *logp.Logger
2729
}
2830

29-
func newPagination(config config, client *httpClient, log *logp.Logger) *pagination {
30-
pagination := &pagination{client: client, log: log}
31+
func newPagination(config config, client *httpClient, stat status.StatusReporter, log *logp.Logger) *pagination {
32+
pagination := &pagination{client: client, status: stat, log: log}
3133
if config.Response == nil {
3234
return pagination
3335
}
@@ -38,8 +40,8 @@ func newPagination(config config, client *httpClient, log *logp.Logger) *paginat
3840
return pagination
3941
}
4042

41-
rts, _ := newBasicTransformsFromConfig(registeredTransforms, config.Request.Transforms, requestNamespace, log)
42-
pts, _ := newBasicTransformsFromConfig(registeredTransforms, config.Response.Pagination, paginationNamespace, log)
43+
rts, _ := newBasicTransformsFromConfig(registeredTransforms, config.Request.Transforms, requestNamespace, stat, log)
44+
pts, _ := newBasicTransformsFromConfig(registeredTransforms, config.Response.Pagination, paginationNamespace, stat, log)
4345

4446
body := func() *mapstr.M {
4547
if config.Response.RequestBodyOnPagination {
@@ -55,13 +57,14 @@ func newPagination(config config, client *httpClient, log *logp.Logger) *paginat
5557
body,
5658
append(rts, pts...),
5759
config.Auth,
60+
stat,
5861
log,
5962
)
6063
pagination.requestFactory = requestFactory
6164
return pagination
6265
}
6366

64-
func newPaginationRequestFactory(method, encodeAs string, url url.URL, body *mapstr.M, ts []basicTransform, authConfig *authConfig, log *logp.Logger) *requestFactory {
67+
func newPaginationRequestFactory(method, encodeAs string, url url.URL, body *mapstr.M, ts []basicTransform, authConfig *authConfig, stat status.StatusReporter, log *logp.Logger) *requestFactory {
6568
// config validation already checked for errors here
6669
rf := &requestFactory{
6770
url: url,
@@ -92,16 +95,19 @@ type pageIterator struct {
9295
done bool
9396

9497
n int64
98+
99+
status status.StatusReporter
95100
}
96101

97-
func (p *pagination) newPageIterator(stdCtx context.Context, trCtx *transformContext, resp *http.Response, xmlDetails map[string]xml.Detail) *pageIterator {
102+
func (p *pagination) newPageIterator(stdCtx context.Context, trCtx *transformContext, resp *http.Response, xmlDetails map[string]xml.Detail, stat status.StatusReporter) *pageIterator {
98103
return &pageIterator{
99104
pagination: p,
100105
stdCtx: stdCtx,
101106
trCtx: trCtx,
102107
resp: resp,
103108
xmlDetails: xmlDetails,
104109
isFirst: true,
110+
status: stat,
105111
}
106112
}
107113

@@ -161,6 +167,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
161167
func (iter *pageIterator) getPage() (*response, error) {
162168
bodyBytes, err := io.ReadAll(iter.resp.Body)
163169
if err != nil {
170+
iter.status.UpdateStatus(status.Degraded, "failed to read page body: "+err.Error())
164171
return nil, err
165172
}
166173
iter.resp.Body.Close()
@@ -182,6 +189,7 @@ func (iter *pageIterator) getPage() (*response, error) {
182189
err = decode(iter.resp.Header.Get("Content-Type"), bodyBytes, &r)
183190
}
184191
if err != nil {
192+
iter.status.UpdateStatus(status.Degraded, "failed to decode page: "+err.Error())
185193
return nil, err
186194
}
187195
}

x-pack/filebeat/input/httpjson/policy.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ import (
1515
"net/url"
1616
"regexp"
1717

18+
"github.com/elastic/beats/v7/libbeat/management/status"
1819
"github.com/elastic/elastic-agent-libs/logp"
1920
)
2021

2122
var (
22-
2323
// A regular expression to match the error returned by net/http when the
2424
// configured number of redirects is exhausted. This error isn't typed
2525
// specifically so we resort to matching on the error string.
@@ -36,20 +36,22 @@ var (
3636
// field value present in data using the defined operator/function in the given expression.
3737
// Example : [[ eq .last_response.body.status "completed" ]] -- which means here data is a http response
3838
// containing a field "status" under the field "body" , and value status should be equal to the string "completed"
39-
type Evaluate func(expression *valueTpl, data []byte, log *logp.Logger) (bool, error)
39+
type Evaluate func(expression *valueTpl, data []byte, stat status.StatusReporter, log *logp.Logger) (bool, error)
4040

4141
// Policy is responsible for maintaining different http client policies
4242
// Currently just contains a retry policy function
4343
type Policy struct {
4444
fn Evaluate
4545
expression *valueTpl
46+
status status.StatusReporter
4647
log *logp.Logger
4748
}
4849

49-
func newHTTPPolicy(fn Evaluate, expression *valueTpl, log *logp.Logger) *Policy {
50+
func newHTTPPolicy(fn Evaluate, expression *valueTpl, stat status.StatusReporter, log *logp.Logger) *Policy {
5051
return &Policy{
5152
fn: fn,
5253
expression: expression,
54+
status: stat,
5355
log: log,
5456
}
5557
}
@@ -115,7 +117,7 @@ func (p *Policy) CustomRetryPolicy(ctx context.Context, resp *http.Response, err
115117
}
116118
resp.Body = io.NopCloser(bytes.NewBuffer(body))
117119

118-
result, err := p.fn(p.expression, body, p.log)
120+
result, err := p.fn(p.expression, body, p.status, p.log)
119121
if err != nil {
120122
return retry, err
121123
}

x-pack/filebeat/input/httpjson/policy_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func TestPolicy_CustomRetryPolicy(t *testing.T) {
136136
p := &Policy{
137137
fn: tt.fields.fn,
138138
expression: tt.fields.expression,
139+
status: noopReporter{},
139140
log: tt.fields.log,
140141
}
141142
got, err := p.CustomRetryPolicy(tt.args.ctx, tt.args.resp, tt.args.err)

0 commit comments

Comments
 (0)