Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Nov 11, 2023
1 parent adad0c1 commit 38001b7
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 37 deletions.
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ The default method is `POST`. If `PUT` or `PATCH` are specified, requests using
[float]
==== `tracer.filename`

It is possible to log HTTP requests and responses in a CEL program to a local file-system for debugging configurations.
It is possible to log HTTP requests to a local file-system for debugging configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
trace_logs
86 changes: 62 additions & 24 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -50,35 +52,43 @@ type handler struct {
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if status, err := h.validator.validateRequest(r); err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
status, err := h.validator.validateRequest(r)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
return
}

if h.reqLogger != nil {
h.logRequest(r)
}

start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()

if h.reqLogger != nil {
// If we are logging, keep a copy of the body for the logger.
// This is stashed in the r.Body field. This is only safe
// because we are closing the original body in a defer and
// r.Body is not otherwise referenced by the non-logging logic
// after the call to getBodyReader above.
var buf bytes.Buffer
body = io.NopCloser(io.TeeReader(body, &buf))
r.Body = io.NopCloser(&buf)
}

objs, _, status, err := httpReadJSON(body)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}

var headers map[string]interface{}
if len(h.includeHeaders) > 0 {
if len(h.includeHeaders) != 0 {
headers = getIncludedHeaders(r, h.includeHeaders)
}

Expand All @@ -97,26 +107,51 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
return
}
}

if err = h.publishEvent(obj, headers); err != nil {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
}
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

func (h *handler) logRequest(r *http.Request) {
func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

var (
mw io.Writer = w
buf bytes.Buffer
)
if h.reqLogger != nil {
mw = io.MultiWriter(mw, &buf)
}
enc := json.NewEncoder(mw)
enc.SetEscapeHTML(false)
err := enc.Encode(map[string]interface{}{"message": apiError.Error()})
if err != nil {
log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr)
}
if h.reqLogger != nil {
h.logRequest(r, status, buf.Bytes())
}
}

func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
// Populate and preserve scheme and host if they are missing;
// they are required for httputil.DumpRequestOut.
var scheme, host string
Expand All @@ -128,7 +163,21 @@ func (h *handler) logRequest(r *http.Request) {
host = r.URL.Host
r.URL.Host = h.host
}
httplog.LogRequest(h.reqLogger, r)
extra := make([]zapcore.Field, 1, 4)
extra[0] = zap.Int("status", status)
addr, port, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
extra = append(extra,
zap.String("source.ip", addr),
zap.String("source.port", port),
)
}
if len(respBody) != 0 {
extra = append(extra,
zap.ByteString("http.response.body.content", respBody),
)
}
httplog.LogRequest(h.reqLogger, r, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
Expand All @@ -137,17 +186,6 @@ func (h *handler) logRequest(r *http.Request) {
}
}

func sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

enc := json.NewEncoder(w)
enc.SetEscapeHTML(false)
if err := enc.Encode(map[string]interface{}{"message": apiError.Error()}); err != nil {
log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr)
}
}

func (h *handler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
Expand Down
44 changes: 37 additions & 7 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,31 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"flag"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var withTraces = flag.Bool("log-traces", false, "specify logging request traces during tests")

const traceLogsDir = "trace_logs"

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -159,6 +169,16 @@ func (p *publisher) Publish(e beat.Event) {
}

func Test_apiResponse(t *testing.T) {
if *withTraces {
err := os.RemoveAll(traceLogsDir)
if err != nil && errors.Is(err, fs.ErrExist) {
t.Fatalf("failed to remove trace logs directory: %v", err)
}
err = os.Mkdir(traceLogsDir, 0o750)
if err != nil {
t.Fatalf("failed to make trace logs directory: %v", err)
}
}
testCases := []struct {
name string // Sub-test name.
conf config // Load configuration.
Expand All @@ -168,7 +188,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse string // Expected response message.
}{
{
name: "single event",
name: "single_event",
conf: defaultConfig(),
request: func() *http.Request {
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`))
Expand All @@ -186,7 +206,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "single event gzip",
name: "single_event_gzip",
conf: defaultConfig(),
request: func() *http.Request {
buf := new(bytes.Buffer)
Expand All @@ -210,7 +230,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "multiple events gzip",
name: "multiple_events_gzip",
conf: defaultConfig(),
request: func() *http.Request {
events := []string{
Expand Down Expand Up @@ -244,7 +264,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "validate CRC request",
name: "validate_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -268,7 +288,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"encryptedToken":"70c1f2e2e6ca2d39297490d1f9142c7d701415ea8e6151f6562a08fa657a40ff","plainToken":"qgg8vlvZRS6UYooatFL8Aw"}`,
},
{
name: "malformed CRC request",
name: "malformed_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -292,7 +312,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message":"malformed JSON object at stream position 0: invalid character '\\n' in string literal"}`,
},
{
name: "empty CRC challenge",
name: "empty_CRC_challenge",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand Down Expand Up @@ -324,7 +344,7 @@ func Test_apiResponse(t *testing.T) {
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(ctx, tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand All @@ -341,3 +361,13 @@ func Test_apiResponse(t *testing.T) {
})
}
}

func tracerConfig(name string, cfg config, withTrace bool) config {
if !withTrace {
return cfg
}
cfg.Tracer = &lumberjack.Logger{
Filename: filepath.Join(traceLogsDir, name+".ndjson"),
}
return cfg
}
13 changes: 8 additions & 5 deletions x-pack/filebeat/input/internal/httplog/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
// http.request.body.bytes
// http.request.mime_type
// event.original (the request without body from httputil.DumpRequestOut)
func LogRequest(log *zap.Logger, req *http.Request) *http.Request {
req, _, _ = logRequest(log, req)
//
// Additional fields in extra will also be logged.
func LogRequest(log *zap.Logger, req *http.Request, extra ...zapcore.Field) *http.Request {
req, _, _ = logRequest(log, req, extra...)
return req
}

func logRequest(log *zap.Logger, req *http.Request) (_ *http.Request, parts []zapcore.Field, errorsMessages []string) {
reqParts := []zapcore.Field{
func logRequest(log *zap.Logger, req *http.Request, extra ...zapcore.Field) (_ *http.Request, parts []zapcore.Field, errorsMessages []string) {
reqParts := append([]zapcore.Field{
zap.String("url.original", req.URL.String()),
zap.String("url.scheme", req.URL.Scheme),
zap.String("url.path", req.URL.Path),
Expand All @@ -161,7 +163,8 @@ func logRequest(log *zap.Logger, req *http.Request) (_ *http.Request, parts []za
zap.String("url.query", req.URL.RawQuery),
zap.String("http.request.method", req.Method),
zap.String("user_agent.original", req.Header.Get("User-Agent")),
}
}, extra...)

var (
body []byte
err error
Expand Down

0 comments on commit 38001b7

Please sign in to comment.