Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ECS compliant logging when enabled. #3829

Merged
merged 19 commits into from Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions _meta/beat.yml
Expand Up @@ -1059,8 +1059,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
4 changes: 2 additions & 2 deletions apm-server.docker.yml
Expand Up @@ -1059,8 +1059,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
4 changes: 2 additions & 2 deletions apm-server.yml
Expand Up @@ -1059,8 +1059,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
111 changes: 57 additions & 54 deletions beater/middleware/log_middleware.go
Expand Up @@ -34,73 +34,76 @@ import (

// LogMiddleware returns a middleware taking care of logging processing a request in the middleware and the request handler
func LogMiddleware() Middleware {
logger := logp.NewLogger(logs.Request)
return func(h request.Handler) (request.Handler, error) {

return func(c *request.Context) {
var reqID, transactionID, traceID string
start := time.Now()
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}
reqID = uuid.String()
}

reqLogger := logger.With(
"request_id", reqID,
"method", c.Request.Method,
"URL", c.Request.URL,
"content_length", c.Request.ContentLength,
"remote_address", utility.RemoteAddr(c.Request),
"user-agent", c.Request.Header.Get(headers.UserAgent))

if traceID != "" {
reqLogger = reqLogger.With(
"trace.id", traceID,
"transaction.id", transactionID,
)
c.Logger = loggerWithRequestContext(c)
var err error
if c.Logger, err = loggerWithTraceContext(c); err != nil {
id := request.IDResponseErrorsInternal
c.Logger.Error(request.MapResultIDToStatus[id].Keyword, logp.Error(err))
c.Result.SetWithError(id, err)
c.Write()
return
}

c.Logger = reqLogger
h(c)
reqLogger = reqLogger.With("event.duration", time.Since(start))

c.Logger = c.Logger.With("event.duration", time.Since(start))
if c.MultipleWriteAttempts() {
reqLogger.Warn("multiple write attempts")
c.Logger.Warn("multiple write attempts")
}

keyword := c.Result.Keyword
if keyword == "" {
keyword = "handled request"
}

keysAndValues := []interface{}{"response_code", c.Result.StatusCode}
if c.Result.Err != nil {
keysAndValues = append(keysAndValues, "error", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace)
}

c.Logger = loggerWithResult(c)
if c.Result.Failure() {
reqLogger.Errorw(keyword, keysAndValues...)
} else {
reqLogger.Infow(keyword, keysAndValues...)
c.Logger.Error(keyword)
return
}

c.Logger.Info(keyword)
}, nil
}
}

func loggerWithRequestContext(c *request.Context) *logp.Logger {
logger := logp.NewLogger(logs.Request).With(
"url.original", c.Request.URL.String(),
"http.request.method", c.Request.Method,
"user_agent.original", c.Request.Header.Get(headers.UserAgent),
axw marked this conversation as resolved.
Show resolved Hide resolved
"source.address", utility.RemoteAddr(c.Request))
if c.Request.ContentLength != -1 {
logger = logger.With("http.request.body.bytes", c.Request.ContentLength)
}
return logger
}

func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) {
tx := apm.TransactionFromContext(c.Request.Context())
if tx == nil {
uuid, err := uuid.NewV4()
if err != nil {
return c.Logger, err
}
return c.Logger.With("http.request.id", uuid.String()), nil
}
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID := traceContext.Span.String()
return c.Logger.With(
"trace.id", traceContext.Trace.String(),
"transaction.id", transactionID,
"http.request.id", transactionID,
), nil
}

func loggerWithResult(c *request.Context) *logp.Logger {
logger := c.Logger.With(
"http.response.status_code", c.Result.StatusCode)
if c.Result.Err != nil {
logger = logger.With("error.message", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
logger = logger.With("error.stack_trace", c.Result.Stacktrace)
}
return logger
}
84 changes: 39 additions & 45 deletions beater/middleware/log_middleware_test.go
Expand Up @@ -21,15 +21,16 @@ import (
"net/http"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/headers"
Expand All @@ -38,31 +39,30 @@ import (
)

func TestLogMiddleware(t *testing.T) {
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

testCases := []struct {
name, message string
level zapcore.Level
handler request.Handler
code int
error error
stacktrace bool
traced bool
ecsKeys []string
}{
{
name: "Accepted",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original"},
},
{
name: "Traced",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original", "trace.id", "transaction.id"},
traced: true,
},
{
Expand All @@ -71,16 +71,15 @@ func TestLogMiddleware(t *testing.T) {
level: zapcore.ErrorLevel,
handler: beatertest.Handler403,
code: http.StatusForbidden,
error: errors.New("forbidden request"),
ecsKeys: []string{"url.original", "error.message"},
},
{
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
error: errors.New("panic on Handle"),
stacktrace: true,
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
ecsKeys: []string{"url.original", "error.message", "error.stack_trace"},
},
{
name: "Error without keyword",
Expand All @@ -90,12 +89,19 @@ func TestLogMiddleware(t *testing.T) {
c.Result.StatusCode = http.StatusForbidden
c.Write()
},
code: http.StatusForbidden,
code: http.StatusForbidden,
ecsKeys: []string{"url.original"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// log setup
configure.Logging("APM Server test",
common.MustNewConfigFrom(`{"ecs":true}`))
require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))

// prepare and record request
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
Expand All @@ -105,39 +111,27 @@ func TestLogMiddleware(t *testing.T) {
}
Apply(LogMiddleware(), tc.handler)(c)

// check log lines
assert.Equal(t, tc.code, rec.Code)
for i, entry := range logp.ObserverLogs().TakeAll() {
// expect only one log entry per request
assert.Equal(t, i, 0)
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)
entries := logp.ObserverLogs().TakeAll()
require.Equal(t, 1, len(entries))
entry := entries[0]
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)

ec := entry.ContextMap()
t.Logf("context map: %v", ec)

assert.NotEmpty(t, ec["request_id"])
assert.NotEmpty(t, ec["method"])
assert.Equal(t, c.Request.URL.String(), ec["URL"])
assert.NotEmpty(t, ec["remote_address"])
assert.Contains(t, ec, "event.duration")
assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"])
// zap encoded type
assert.Equal(t, tc.code, int(ec["response_code"].(int64)))
if tc.error != nil {
assert.Equal(t, tc.error.Error(), ec["error"])
}
if tc.stacktrace {
assert.NotZero(t, ec["stacktrace"])
}
if tc.traced {
assert.NotEmpty(t, ec, "trace.id")
assert.NotEmpty(t, ec, "transaction.id")
assert.Equal(t, ec["request_id"], ec["transaction.id"])
} else {
assert.NotContains(t, ec, "trace.id")
assert.NotContains(t, ec, "transaction.id")
}
encoder := zapcore.NewMapObjectEncoder()
ec := common.MapStr{}
for _, f := range entry.Context {
f.AddTo(encoder)
ec.DeepUpdate(encoder.Fields)
}
keys := []string{"http.request.id", "http.request.method", "http.request.body.bytes",
"source.address", "user_agent.original", "http.response.status_code", "event.duration"}
keys = append(keys, tc.ecsKeys...)
for _, key := range keys {
ok, _ := ec.HasKey(key)
assert.True(t, ok, key)
}
})
}
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Expand Up @@ -27,3 +27,4 @@ https://github.com/elastic/apm-server/compare/7.10\...master[View commits]
* Experimental support for data streams {pull}4409[4409]
* Label/custom/mark keys are now sanitized (rather than validated and rejected) by the server {pull}4465[4465]
* Upgrade Go to 1.14.12 {pull}4478[4478]
* Switch logging format to be ECS compliant where possible {pull}3829[3829]
1 change: 1 addition & 0 deletions cmd/root.go
Expand Up @@ -48,6 +48,7 @@ var libbeatConfigOverrides = []cfgfile.ConditionalOverride{{
"metrics": map[string]interface{}{
"enabled": false,
},
"ecs": true,
},
}),
}}
Expand Down
3 changes: 1 addition & 2 deletions docs/copied-from-beats/docs/loggingconfig.asciidoc
Expand Up @@ -238,8 +238,7 @@ When true, logs messages in JSON format. The default is false.
[float]
==== `logging.ecs`

When true, logs messages with minimal required Elastic Common Schema (ECS)
information.
When true, logs messages in Elastic Common Schema (ECS) compliant format.

ifndef::serverless[]
[float]
Expand Down
4 changes: 2 additions & 2 deletions systemtest/apmservertest/server.go
Expand Up @@ -288,7 +288,7 @@ func (s *Server) consumeStderr(procStderr io.Reader) {
s.Stderr = stderrPipeReader

type logEntry struct {
Timestamp logpTimestamp `json:"@timestamp"`
Timestamp logpTimestamp `json:"timestamp"`
simitt marked this conversation as resolved.
Show resolved Hide resolved
Message string `json:"message"`
Level zapcore.Level `json:"log.level"`
Logger string `json:"log.logger"`
Expand All @@ -312,7 +312,7 @@ func (s *Server) consumeStderr(procStderr io.Reader) {
if err := json.Unmarshal(raw, &fields); err != nil {
break
}
delete(fields, "@timestamp")
delete(fields, "timestamp")
simitt marked this conversation as resolved.
Show resolved Hide resolved
delete(fields, "log.level")
delete(fields, "log.logger")
delete(fields, "log.origin")
Expand Down
2 changes: 2 additions & 0 deletions systemtest/export_test.go
Expand Up @@ -49,6 +49,7 @@ func TestExportConfigDefaults(t *testing.T) {

expectedConfig := strings.ReplaceAll(`
logging:
ecs: true
metrics:
enabled: false
path:
Expand All @@ -69,6 +70,7 @@ func TestExportConfigOverrideDefaults(t *testing.T) {

expectedConfig := strings.ReplaceAll(`
logging:
ecs: true
metrics:
enabled: true
path:
Expand Down