Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][httpjson] Add http metrics to httpjson input #35392

Merged
merged 30 commits into from Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bcb0104
Add http metrics to httpjson input
marc-gr May 9, 2023
0e830be
Add changelog entry
marc-gr May 9, 2023
15c0b3c
Document the exposed metrics
marc-gr May 9, 2023
43e8791
Move response total counter out of error block
marc-gr May 9, 2023
6a1bcfa
Instrument chain HTTP client
marc-gr May 9, 2023
e71d625
Add httpjson interval metrics
marc-gr May 10, 2023
411a5d6
Update docs
marc-gr May 10, 2023
b142b01
Merge branch 'main' into feat/add-httpjson-metrics
marc-gr May 10, 2023
8f8bde5
Format imports
marc-gr May 10, 2023
565e131
Merge remote-tracking branch 'origin/feat/add-httpjson-metrics' into …
marc-gr May 10, 2023
6f3d807
Merge branch 'main' into feat/add-httpjson-metrics
marc-gr May 10, 2023
f5012c8
Account for possible existing metric on passed in registry
marc-gr May 10, 2023
8b59275
Merge remote-tracking branch 'origin/feat/add-httpjson-metrics' into …
marc-gr May 10, 2023
6b9fcb5
Merge branch 'main' into feat/add-httpjson-metrics
marc-gr May 11, 2023
67e6e2e
Count pages for all responses
marc-gr May 11, 2023
1a33725
Merge remote-tracking branch 'origin/feat/add-httpjson-metrics' into …
marc-gr May 11, 2023
aaf1164
Rename metrics to be flat and add metrics gathering test
marc-gr May 26, 2023
9220ff7
Merge remote-tracking branch 'upstream/main' into feat/add-httpjson-m…
marc-gr May 26, 2023
1240b64
Rename in docs
marc-gr May 26, 2023
f4d3dfd
Rename in docs
marc-gr May 26, 2023
0d20516
Add total body bytes metrics
marc-gr May 26, 2023
5f26918
Merge branch 'main' into feat/add-httpjson-metrics
marc-gr May 26, 2023
39fc186
Add _total suffix to all relevant metrics
marc-gr May 31, 2023
35818fc
Merge remote-tracking branch 'origin/feat/add-httpjson-metrics' into …
marc-gr May 31, 2023
800b4f9
Merge remote-tracking branch 'upstream/main' into feat/add-httpjson-m…
marc-gr May 31, 2023
92f5d24
Make test check against runtime values
marc-gr May 31, 2023
cc31f76
Use http variables
marc-gr May 31, 2023
686ecaa
Merge remote-tracking branch 'upstream/main' into feat/add-httpjson-m…
marc-gr May 31, 2023
44cc9ea
Simplify test for runtime variability
marc-gr May 31, 2023
e1c03bd
Merge remote-tracking branch 'upstream/main' into feat/add-httpjson-m…
marc-gr May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Expand Up @@ -291,13 +291,12 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]
- Add support for collecting `httpjson` metrics. {pull}35392[35392]
- Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

*Libbeat*
- Added support for apache parquet file reader. {issue}34662[34662] {pull}35183[35183]

Expand Down
39 changes: 39 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Expand Up @@ -1525,6 +1525,45 @@ image:images/input-httpjson-chain-lifecycle.png[Chain Request lifecycle]
. Go back to step-2 for the next step.
. Publish collected responses from the last chain step.


[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
| `http_request_get_total` | Total number of `GET` requests.
| `http_request_head_total` | Total number of `HEAD` requests.
| `http_request_options_total` | Total number of `OPTIONS` requests.
| `http_request_patch_total` | Total number of `PATCH` requests.
| `http_request_post_total` | Total number of `POST` requests.
| `http_request_put_total` | Total number of `PUT` requests.
| `http_request_body_bytes_total` | Total of the requests body size.
| `http_request_body_bytes` | Histogram of the requests body size.
| `http_response_total` | Total number of responses received.
| `http_response_errors_total` | Total number of response errors.
| `http_response_1xx_total` | Total number of `1xx` responses.
| `http_response_2xx_total` | Total number of `2xx` responses.
| `http_response_3xx_total` | Total number of `3xx` responses.
| `http_response_4xx_total` | Total number of `4xx` responses.
| `http_response_5xx_total` | Total number of `5xx` responses.
| `http_response_body_bytes_total` | Total of the responses body size.
| `http_response_body_bytes` | Histogram of the responses body size.
| `http_round_trip_time` | Histogram of the round trip time.
| `httpjson_interval_total` | Total number of intervals executed.
| `httpjson_interval_errors_total` | Total number of interval errors.
| `httpjson_interval_execution_time` | Histogram of the interval execution time.
| `httpjson_interval_pages_total` | Histogram of the total number of pages per interval.
| `httpjson_interval_pages_execution_time` | Histogram of the interval pages execution time.
|=======

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
87 changes: 61 additions & 26 deletions x-pack/filebeat/input/httpjson/input.go
Expand Up @@ -26,10 +26,12 @@ import (
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
Expand Down Expand Up @@ -105,11 +107,23 @@ func test(url *url.URL) error {
return nil
}

func runWithMetrics(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
) error {
reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil)
defer unreg()
return run(ctx, config, publisher, cursor, reg)
}

func run(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
reg *monitoring.Registry,
) error {
log := ctx.Logger.With("input_url", config.Request.URL)

Expand All @@ -120,12 +134,14 @@ func run(
config.Request.Tracer.Filename = strings.ReplaceAll(config.Request.Tracer.Filename, "*", id)
}

httpClient, err := newHTTPClient(stdCtx, config, log)
metrics := newInputMetrics(reg)

httpClient, err := newHTTPClient(stdCtx, config, log, reg)
if err != nil {
return err
}

requestFactory, err := newRequestFactory(stdCtx, config, log)
requestFactory, err := newRequestFactory(stdCtx, config, log, metrics, reg)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
Expand All @@ -139,7 +155,7 @@ func run(
}
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, metrics, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)

trCtx := emptyTransformContext()
Expand All @@ -149,11 +165,16 @@ func run(
doFunc := func() error {
log.Info("Process another repeated request.")

if err := requester.doRequest(stdCtx, trCtx, publisher); err != nil {
startTime := time.Now()

var err error
if err = requester.doRequest(stdCtx, trCtx, publisher); err != nil {
log.Errorf("Error while processing http request: %v", err)
}

if stdCtx.Err() != nil {
metrics.updateIntervalMetrics(err, startTime)

if err := stdCtx.Err(); err != nil {
return err
}

Expand All @@ -180,32 +201,13 @@ func sanitizeFileName(name string) string {
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) {
func newHTTPClient(ctx context.Context, config config, log *logp.Logger, reg *monitoring.Registry) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.settings())...)
netHTTPClient, err := newNetHTTPClient(ctx, config.Request, log, reg)
if err != nil {
return nil, err
}

if config.Request.Tracer != nil {
w := zapcore.AddSync(config.Request.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
config.Request.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
}

netHTTPClient.CheckRedirect = checkRedirect(config.Request, log)

client := &retryablehttp.Client{
HTTPClient: netHTTPClient,
Logger: newRetryLogger(log),
Expand All @@ -229,6 +231,39 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger, reg *monitoring.Registry) (*http.Client, error) {
// Make retryable HTTP client
netHTTPClient, err := cfg.Transport.Client(clientOptions(cfg.URL.URL, cfg.KeepAlive.settings())...)
if err != nil {
return nil, err
}

if cfg.Tracer != nil {
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
}

if reg != nil {
netHTTPClient.Transport = httplog.NewMetricsRoundTripper(netHTTPClient.Transport, reg)
}

netHTTPClient.CheckRedirect = checkRedirect(cfg, log)

return netHTTPClient, nil
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input_cursor.go
Expand Up @@ -52,5 +52,5 @@ func (in *cursorInput) Run(
publisher inputcursor.Publisher,
) error {
s := src.(*source)
return run(ctx, s.config, publisher, &cursor)
return runWithMetrics(ctx, s.config, publisher, &cursor)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input_stateless.go
Expand Up @@ -48,5 +48,5 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
// It will return on context cancellation, any other error will be retried.
func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error {
pub := statelessPublisher{wrapped: publisher}
return run(ctx, in.config, pub, nil)
return runWithMetrics(ctx, in.config, pub, nil)
}
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/httpjson/metrics.go
@@ -0,0 +1,70 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

type inputMetrics struct {
intervalExecutionTime metrics.Sample // histogram of the total time elapsed during an interval
intervalPageExecutionTime metrics.Sample // histogram of per page execution time during an interval
intervalPages metrics.Sample // histogram of pages per interval
intervals *monitoring.Uint // total number of intervals executed
intervalErrs *monitoring.Uint // total number of interval errors
}

func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
if reg == nil {
return nil
}

out := &inputMetrics{
intervals: monitoring.NewUint(reg, "httpjson_interval_total"),
intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"),
intervalExecutionTime: metrics.NewUniformSample(1024),
intervalPageExecutionTime: metrics.NewUniformSample(1024),
intervalPages: metrics.NewUniformSample(1024),
}

_ = adapter.GetGoMetrics(reg, "httpjson_interval_execution_time", adapter.Accept).
GetOrRegister("histogram", metrics.NewHistogram(out.intervalExecutionTime))
_ = adapter.GetGoMetrics(reg, "httpjson_interval_pages_execution_time", adapter.Accept).
GetOrRegister("histogram", metrics.NewHistogram(out.intervalPageExecutionTime))
_ = adapter.GetGoMetrics(reg, "httpjson_interval_pages_total", adapter.Accept).
GetOrRegister("histogram", metrics.NewHistogram(out.intervalPages))

return out
}

func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) {
if m == nil {
return
}
m.intervals.Add(1)
m.intervalExecutionTime.Update(time.Since(t).Nanoseconds())
if err != nil {
m.intervalErrs.Add(1)
}
}

func (m *inputMetrics) updatePageExecutionTime(t time.Time) {
if m == nil {
return
}
m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds())
}

func (m *inputMetrics) updatePagesPerInterval(npages int64) {
if m == nil {
return
}
m.intervalPages.Update(npages)
}