Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: wire in trace logging
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Nov 1, 2023
1 parent 3f992b5 commit 35b49cf
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ is collected by it.
- Avoid unwanted publication of Okta entity records. {pull}36770[36770]
- Add support for Digest Authentication to CEL input. {issue}35514[35514] {pull}36932[36932]
- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950]
- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]

*Auditbeat*

Expand Down
40 changes: 40 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,46 @@ The secret token provided by the webhook owner for the CRC validation. It is req
The HTTP method handled by the endpoint. If specified, `method` must be `POST`, `PUT` or `PATCH`.
The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data.

[float]
==== `tracer.filename`

It is possible to log HTTP requests and responses in a CEL program to a local file-system for debugging configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
For Example, `http-request-trace-*.ndjson`.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `tracer.maxsize`

This value sets the maximum size, in megabytes, the log file will reach before it is rotated. By default
logs are allowed to reach 1MB before rotation.

[float]
==== `tracer.maxage`

This specifies the number days to retain rotated log files. If it is not set, log files are retained
indefinitely.

[float]
==== `tracer.maxbackups`

The number of old logs to retain. If it is not set all old logs are retained subject to the `tracer.maxage`
setting.

[float]
==== `tracer.localtime`

Whether to use the host's local time rather that UTC for timestamping rotated log file names.

[float]
==== `tracer.compress`

This determines whether rotated logs should be gzip compressed.

[float]
=== Metrics

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"net/textproto"
"strings"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

Expand Down Expand Up @@ -45,6 +47,7 @@ type config struct {
CRCSecret string `config:"crc.secret"`
IncludeHeaders []string `config:"include_headers"`
PreserveOriginalEvent bool `config:"preserve_original_event"`
Tracer *lumberjack.Logger `config:"tracer"`
}

func defaultConfig() config {
Expand Down
31 changes: 31 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
"net/http"
"time"

"go.uber.org/zap"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -35,6 +38,9 @@ type handler struct {
log *logp.Logger
validator apiValidator

reqLogger *zap.Logger
host, scheme string

messageField string
responseCode int
responseBody string
Expand All @@ -49,6 +55,10 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if h.reqLogger != nil {
h.logRequest(r)
}

start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
Expand Down Expand Up @@ -106,6 +116,27 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.metrics.batchesPublished.Add(1)
}

func (h *handler) logRequest(r *http.Request) {
// Populate and preserve scheme and host if they are missing;
// they are required for httputil.DumpRequestOut.
var scheme, host string
if r.URL.Scheme == "" {
scheme = r.URL.Scheme
r.URL.Scheme = h.scheme
}
if r.URL.Host == "" {
host = r.URL.Host
r.URL.Host = h.host
}
httplog.LogRequest(h.reqLogger, r)
if scheme != "" {
r.URL.Scheme = scheme
}
if host != "" {
r.URL.Host = host
}
}

func sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package http_endpoint
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -316,13 +317,14 @@ func Test_apiResponse(t *testing.T) {
},
}

ctx := context.Background()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
32 changes: 28 additions & 4 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"time"

"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
Expand Down Expand Up @@ -146,15 +149,14 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
s.mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
return s.getErr()
}

mux := http.NewServeMux()
mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux, ReadHeaderTimeout: 5 * time.Second}
s = &server{
idOf: map[string]string{pattern: ctx.ID},
Expand All @@ -163,6 +165,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
srv: srv,
}
s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() })
mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
p.servers[e.addr] = s
p.mu.Unlock()

Expand Down Expand Up @@ -284,8 +287,8 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
return &handler{
func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
publisher: pub,
metrics: metrics,
Expand All @@ -309,6 +312,27 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *in
preserveOriginalEvent: c.PreserveOriginalEvent,
crc: newCRC(c.CRCProvider, c.CRCSecret),
}
if c.Tracer != nil {
w := zapcore.AddSync(c.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
c.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
h.reqLogger = zap.New(core)
h.host = c.ListenAddress + ":" + c.ListenPort
if c.TLS != nil && c.TLS.IsEnabled() {
h.scheme = "https"
} else {
h.scheme = "http"
}
}
return h
}

// inputMetrics handles the input's metric reporting.
Expand Down

0 comments on commit 35b49cf

Please sign in to comment.