Skip to content

Commit

Permalink
[filebeat][httpjson] Add http metrics to httpjson input (#35392)
Browse files Browse the repository at this point in the history
* Add http metrics to httpjson input

* Add changelog entry

* Document the exposed metrics

* Move response total counter out of error block

* Instrument chain HTTP client

* Add httpjson interval metrics

* Update docs

* Format imports

* Account for possible existing metric on passed in registry

* Count pages for all responses

* Rename metrics to be flat and add metrics gathering test

* Rename in docs

* Rename in docs

* Add total body bytes metrics

* Add _total suffix to all relevant metrics

* Make test check against runtime values

* Use http variables

* Simplify test for runtime variability
  • Loading branch information
marc-gr committed Jun 1, 2023
1 parent b9bdc54 commit 977d996
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 45 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Expand Up @@ -292,14 +292,13 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]
- Add support for collecting `httpjson` metrics. {pull}35392[35392]
- Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372]
- Mark CEL input as GA. {pull}35559[35559]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

*Libbeat*
- Added support for apache parquet file reader. {issue}34662[34662] {pull}35183[35183]

Expand Down
39 changes: 39 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Expand Up @@ -1525,6 +1525,45 @@ image:images/input-httpjson-chain-lifecycle.png[Chain Request lifecycle]
. Go back to step-2 for the next step.
. Publish collected responses from the last chain step.


[float]
=== Metrics

This input exposes metrics under the `/inputs` path of the
<<http-endpoint, HTTP monitoring endpoint>>. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
| `http_request_get_total` | Total number of `GET` requests.
| `http_request_head_total` | Total number of `HEAD` requests.
| `http_request_options_total` | Total number of `OPTIONS` requests.
| `http_request_patch_total` | Total number of `PATCH` requests.
| `http_request_post_total` | Total number of `POST` requests.
| `http_request_put_total` | Total number of `PUT` requests.
| `http_request_body_bytes_total` | Total size, in bytes, of all request bodies.
| `http_request_body_bytes` | Histogram of request body sizes, in bytes.
| `http_response_total` | Total number of responses received.
| `http_response_errors_total` | Total number of response errors.
| `http_response_1xx_total` | Total number of `1xx` responses.
| `http_response_2xx_total` | Total number of `2xx` responses.
| `http_response_3xx_total` | Total number of `3xx` responses.
| `http_response_4xx_total` | Total number of `4xx` responses.
| `http_response_5xx_total` | Total number of `5xx` responses.
| `http_response_body_bytes_total` | Total size, in bytes, of all response bodies.
| `http_response_body_bytes` | Histogram of response body sizes, in bytes.
| `http_round_trip_time` | Histogram of the round trip time.
| `httpjson_interval_total` | Total number of intervals executed.
| `httpjson_interval_errors_total` | Total number of interval errors.
| `httpjson_interval_execution_time` | Histogram of the execution time of each interval, in nanoseconds.
| `httpjson_interval_pages_total` | Histogram of the number of pages per interval.
| `httpjson_interval_pages_execution_time` | Histogram of the execution time of each page within an interval, in nanoseconds.
|=======

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
87 changes: 61 additions & 26 deletions x-pack/filebeat/input/httpjson/input.go
Expand Up @@ -26,10 +26,12 @@ import (
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
Expand Down Expand Up @@ -105,11 +107,23 @@ func test(url *url.URL) error {
return nil
}

// runWithMetrics creates a monitoring registry for this httpjson input
// instance, keyed by ctx.ID, and passes it on to run. The function returned
// alongside the registry is deferred, so it is invoked once run returns.
func runWithMetrics(
	ctx v2.Context,
	config config,
	publisher inputcursor.Publisher,
	cursor *inputcursor.Cursor,
) error {
	registry, unregister := inputmon.NewInputRegistry("httpjson", ctx.ID, nil)
	defer unregister()

	return run(ctx, config, publisher, cursor, registry)
}

func run(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
reg *monitoring.Registry,
) error {
log := ctx.Logger.With("input_url", config.Request.URL)

Expand All @@ -120,12 +134,14 @@ func run(
config.Request.Tracer.Filename = strings.ReplaceAll(config.Request.Tracer.Filename, "*", id)
}

httpClient, err := newHTTPClient(stdCtx, config, log)
metrics := newInputMetrics(reg)

httpClient, err := newHTTPClient(stdCtx, config, log, reg)
if err != nil {
return err
}

requestFactory, err := newRequestFactory(stdCtx, config, log)
requestFactory, err := newRequestFactory(stdCtx, config, log, metrics, reg)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
Expand All @@ -139,7 +155,7 @@ func run(
}
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, metrics, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)

trCtx := emptyTransformContext()
Expand All @@ -149,11 +165,16 @@ func run(
doFunc := func() error {
log.Info("Process another repeated request.")

if err := requester.doRequest(stdCtx, trCtx, publisher); err != nil {
startTime := time.Now()

var err error
if err = requester.doRequest(stdCtx, trCtx, publisher); err != nil {
log.Errorf("Error while processing http request: %v", err)
}

if stdCtx.Err() != nil {
metrics.updateIntervalMetrics(err, startTime)

if err := stdCtx.Err(); err != nil {
return err
}

Expand All @@ -180,32 +201,13 @@ func sanitizeFileName(name string) string {
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) {
func newHTTPClient(ctx context.Context, config config, log *logp.Logger, reg *monitoring.Registry) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.settings())...)
netHTTPClient, err := newNetHTTPClient(ctx, config.Request, log, reg)
if err != nil {
return nil, err
}

if config.Request.Tracer != nil {
w := zapcore.AddSync(config.Request.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
config.Request.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
}

netHTTPClient.CheckRedirect = checkRedirect(config.Request, log)

client := &retryablehttp.Client{
HTTPClient: netHTTPClient,
Logger: newRetryLogger(log),
Expand All @@ -229,6 +231,39 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

// newNetHTTPClient builds the *http.Client described by cfg.
//
// The client is created from cfg.Transport with the options produced by
// clientOptions. Its transport is then wrapped, in this order:
//   - with a logging round tripper writing to cfg.Tracer, when a tracer is
//     configured; the tracer is closed once ctx is done.
//   - with a metrics round tripper reporting into reg, when reg is non-nil.
//
// Finally the client's CheckRedirect is set from checkRedirect(cfg, log).
func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger, reg *monitoring.Registry) (*http.Client, error) {
	opts := clientOptions(cfg.URL.URL, cfg.KeepAlive.settings())
	client, err := cfg.Transport.Client(opts...)
	if err != nil {
		return nil, err
	}

	if cfg.Tracer != nil {
		sink := zapcore.AddSync(cfg.Tracer)
		// Release the tracer once the surrounding context ends.
		go func() {
			<-ctx.Done()
			cfg.Tracer.Close()
		}()
		traceLogger := zap.New(ecszap.NewCore(
			ecszap.NewDefaultEncoderConfig(),
			sink,
			zap.DebugLevel,
		))

		client.Transport = httplog.NewLoggingRoundTripper(client.Transport, traceLogger)
	}

	// Metrics wrap whatever transport is in place at this point, so the
	// logging round tripper (if any) sits inside the metrics one.
	if reg != nil {
		client.Transport = httplog.NewMetricsRoundTripper(client.Transport, reg)
	}

	client.CheckRedirect = checkRedirect(cfg, log)

	return client, nil
}

// clientOptions returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input_cursor.go
Expand Up @@ -52,5 +52,5 @@ func (in *cursorInput) Run(
publisher inputcursor.Publisher,
) error {
s := src.(*source)
return run(ctx, s.config, publisher, &cursor)
return runWithMetrics(ctx, s.config, publisher, &cursor)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input_stateless.go
Expand Up @@ -48,5 +48,5 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
// It will return on context cancellation, any other error will be retried.
func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error {
pub := statelessPublisher{wrapped: publisher}
return run(ctx, in.config, pub, nil)
return runWithMetrics(ctx, in.config, pub, nil)
}
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/httpjson/metrics.go
@@ -0,0 +1,70 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

type inputMetrics struct {
intervalExecutionTime metrics.Sample // histogram of the total time elapsed during an interval
intervalPageExecutionTime metrics.Sample // histogram of per page execution time during an interval
intervalPages metrics.Sample // histogram of pages per interval
intervals *monitoring.Uint // total number of intervals executed
intervalErrs *monitoring.Uint // total number of interval errors
}

// newInputMetrics registers the httpjson interval metrics on reg and returns
// the handle used to update them. When reg is nil, nothing is registered and
// nil is returned; the update methods on *inputMetrics accept a nil receiver.
func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
	if reg == nil {
		return nil
	}

	// Reservoir size used for every uniform sample below.
	const reservoirSize = 1024

	m := &inputMetrics{
		intervals:    monitoring.NewUint(reg, "httpjson_interval_total"),
		intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"),
	}
	m.intervalExecutionTime = metrics.NewUniformSample(reservoirSize)
	m.intervalPageExecutionTime = metrics.NewUniformSample(reservoirSize)
	m.intervalPages = metrics.NewUniformSample(reservoirSize)

	// Each sample is exposed as a "histogram" entry under its own metric name.
	histograms := []struct {
		name   string
		sample metrics.Sample
	}{
		{"httpjson_interval_execution_time", m.intervalExecutionTime},
		{"httpjson_interval_pages_execution_time", m.intervalPageExecutionTime},
		{"httpjson_interval_pages_total", m.intervalPages},
	}
	for _, h := range histograms {
		// The returned metric is not needed: updates go through the sample.
		_ = adapter.GetGoMetrics(reg, h.name, adapter.Accept).
			GetOrRegister("histogram", metrics.NewHistogram(h.sample))
	}

	return m
}

// updateIntervalMetrics records one finished interval: it bumps the interval
// counter, adds the time elapsed since t (in nanoseconds) to the execution
// time sample and, when err is non-nil, bumps the interval error counter.
// It is a no-op on a nil receiver.
func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) {
	if m == nil {
		return
	}

	m.intervals.Add(1)

	elapsed := time.Since(t)
	m.intervalExecutionTime.Update(elapsed.Nanoseconds())

	if err == nil {
		return
	}
	m.intervalErrs.Add(1)
}

// updatePageExecutionTime adds the time elapsed since t, in nanoseconds, to
// the per-page execution time sample. It is a no-op on a nil receiver.
func (m *inputMetrics) updatePageExecutionTime(t time.Time) {
	if m == nil {
		return
	}

	elapsed := time.Since(t)
	m.intervalPageExecutionTime.Update(elapsed.Nanoseconds())
}

// updatePagesPerInterval adds npages to the pages-per-interval sample.
// It is a no-op on a nil receiver.
func (m *inputMetrics) updatePagesPerInterval(npages int64) {
	if m != nil {
		m.intervalPages.Update(npages)
	}
}

0 comments on commit 977d996

Please sign in to comment.