Skip to content

Commit

Permalink
x-pack/filebeat/input/{cel,httpjson,http_endpoint}: prevent complete …
Browse files Browse the repository at this point in the history
…loss of long request trace data (elastic#37836)

The lumberjack logger drops lines that are longer than the max size, so
truncate bodies that are near the limit to ensure that at least some
logging data is retained. Also truncate requests that are too long,
including in http_endpoint.
  • Loading branch information
efd6 authored and Scholar-Li committed Feb 5, 2024
1 parent f08c0a1 commit bb0808b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -184,6 +184,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add request trace logging for chained API requests. {issue}37551[36551] {pull}37682[37682]
- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714]
- Add support for PEM-based Okta auth in HTTPJSON. {pull}37772[37772]
- Prevent complete loss of long request trace data. {issue}37826[37826] {pull}37836[37836]

*Auditbeat*

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/cel/input.go
Expand Up @@ -723,7 +723,9 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
)
traceLogger := zap.New(core)

trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger)
const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Resource.Tracer.MaxSize * 1e6
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin))
c.Transport = trace
}

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/http_endpoint/handler.go
Expand Up @@ -177,7 +177,9 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
zap.ByteString("http.response.body.content", respBody),
)
}
httplog.LogRequest(h.reqLogger, r, extra...)
// Limit request logging body size to 10kiB.
const maxBodyLen = 10 * (1 << 10)
httplog.LogRequest(h.reqLogger, r, maxBodyLen, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
Expand Down
7 changes: 6 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
Expand Up @@ -253,7 +253,12 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Tracer.MaxSize*1e6 - margin
if maxSize < 0 {
maxSize = 0
}
netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxSize)
}

if reg != nil {
Expand Down
21 changes: 14 additions & 7 deletions x-pack/filebeat/input/internal/httplog/roundtripper.go
Expand Up @@ -32,9 +32,10 @@ type contextKey string

// NewLoggingRoundTripper returns a LoggingRoundTripper that logs requests and
// responses to the provided logger.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger) *LoggingRoundTripper {
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger, maxBodyLen int) *LoggingRoundTripper {
return &LoggingRoundTripper{
transport: next,
maxBodyLen: maxBodyLen,
logger: logger,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
Expand All @@ -44,6 +45,7 @@ func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger) *Logging
// LoggingRoundTripper is an http.RoundTripper that logs requests and responses.
type LoggingRoundTripper struct {
transport http.RoundTripper
maxBodyLen int // The maximum length of a body. Longer bodies will be truncated.
logger *zap.Logger // Destination logger.
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.
Expand All @@ -63,6 +65,7 @@ type LoggingRoundTripper struct {
// http.request
// user_agent.original
// http.request.body.content
// http.request.body.truncated
// http.request.body.bytes
// http.request.mime_type
// event.original (the request without body from httputil.DumpRequestOut)
Expand All @@ -71,6 +74,7 @@ type LoggingRoundTripper struct {
//
// http.response.status_code
// http.response.body.content
// http.response.body.truncated
// http.response.body.bytes
// http.response.mime_type
// event.original (the response without body from httputil.DumpResponse)
Expand All @@ -86,7 +90,7 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
}
}

req, respParts, errorsMessages := logRequest(log, req)
req, respParts, errorsMessages := logRequest(log, req, rt.maxBodyLen)

resp, err := rt.transport.RoundTrip(req)
if err != nil {
Expand All @@ -107,7 +111,8 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read response body: %s", err))
} else {
respParts = append(respParts,
zap.ByteString("http.response.body.content", body),
zap.ByteString("http.response.body.content", body[:min(len(body), rt.maxBodyLen)]),
zap.Bool("http.response.body.truncated", rt.maxBodyLen < len(body)),
zap.Int("http.response.body.bytes", len(body)),
zap.String("http.response.mime_type", resp.Header.Get("Content-Type")),
)
Expand Down Expand Up @@ -143,17 +148,18 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
// http.request
// user_agent.original
// http.request.body.content
// http.request.body.truncated
// http.request.body.bytes
// http.request.mime_type
// event.original (the request without body from httputil.DumpRequestOut)
//
// Additional fields in extra will also be logged.
func LogRequest(log *zap.Logger, req *http.Request, extra ...zapcore.Field) *http.Request {
req, _, _ = logRequest(log, req, extra...)
func LogRequest(log *zap.Logger, req *http.Request, maxBodyLen int, extra ...zapcore.Field) *http.Request {
req, _, _ = logRequest(log, req, maxBodyLen, extra...)
return req
}

func logRequest(log *zap.Logger, req *http.Request, extra ...zapcore.Field) (_ *http.Request, parts []zapcore.Field, errorsMessages []string) {
func logRequest(log *zap.Logger, req *http.Request, maxBodyLen int, extra ...zapcore.Field) (_ *http.Request, parts []zapcore.Field, errorsMessages []string) {
reqParts := append([]zapcore.Field{
zap.String("url.original", req.URL.String()),
zap.String("url.scheme", req.URL.Scheme),
Expand All @@ -174,7 +180,8 @@ func logRequest(log *zap.Logger, req *http.Request, extra ...zapcore.Field) (_ *
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read request body: %s", err))
} else {
reqParts = append(reqParts,
zap.ByteString("http.request.body.content", body),
zap.ByteString("http.request.body.content", body[:min(len(body), maxBodyLen)]),
zap.Bool("http.request.body.truncated", maxBodyLen < len(body)),
zap.Int("http.request.body.bytes", len(body)),
zap.String("http.request.mime_type", req.Header.Get("Content-Type")),
)
Expand Down

0 comments on commit bb0808b

Please sign in to comment.