Skip to content

Commit

Permalink
Logging: add minimal support for ECS (#2457)
Browse files Browse the repository at this point in the history
* Add some ECS fields

* Handle "error" and "source" fields
  • Loading branch information
barkbay committed Jan 27, 2020
1 parent b1355ce commit b96e23f
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 11 deletions.
4 changes: 4 additions & 0 deletions config/e2e/global_operator.yaml
Expand Up @@ -157,6 +157,10 @@ spec:
serviceName: {{ .GlobalOperator.Name }}
template:
metadata:
annotations:
# Rename the fields "error" to "error.message" and "source" to "event.source"
# This is to avoid a conflict with the ECS "error" and "source" documents.
"co.elastic.logs/raw": "[{\"type\":\"container\",\"json.keys_under_root\":true,\"paths\":[\"/var/log/containers/*${data.kubernetes.container.id}.log\"],\"processors\":[{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"error\",\"to\":\"_error\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_error\",\"to\":\"error.message\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"source\",\"to\":\"_source\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_source\",\"to\":\"event.source\"}]}}]}]"
labels:
control-plane: {{ .GlobalOperator.Name }}
test-run: {{ .TestRun }}
Expand Down
4 changes: 4 additions & 0 deletions config/e2e/namespace_operator.yaml
Expand Up @@ -193,6 +193,10 @@ spec:
serviceName: {{ .NamespaceOperator.Name }}
template:
metadata:
annotations:
# Rename the fields "error" to "error.message" and "source" to "event.source"
# This is to avoid a conflict with the ECS "error" and "source" documents.
"co.elastic.logs/raw": "[{\"type\":\"container\",\"json.keys_under_root\":true,\"paths\":[\"/var/log/containers/*${data.kubernetes.container.id}.log\"],\"processors\":[{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"error\",\"to\":\"_error\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_error\",\"to\":\"error.message\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"source\",\"to\":\"_source\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_source\",\"to\":\"event.source\"}]}}]}]"
labels:
control-plane: {{ .NamespaceOperator.Name }}
test-run: {{ $testRun }}
Expand Down
4 changes: 4 additions & 0 deletions config/operator/all-in-one/operator.template.yaml
Expand Up @@ -14,6 +14,10 @@ spec:
serviceName: elastic-operator
template:
metadata:
annotations:
# Rename the fields "error" to "error.message" and "source" to "event.source"
# This is to avoid a conflict with the ECS "error" and "source" documents.
"co.elastic.logs/raw": "[{\"type\":\"container\",\"json.keys_under_root\":true,\"paths\":[\"/var/log/containers/*${data.kubernetes.container.id}.log\"],\"processors\":[{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"error\",\"to\":\"_error\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_error\",\"to\":\"error.message\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"source\",\"to\":\"_source\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_source\",\"to\":\"event.source\"}]}}]}]"
labels:
control-plane: elastic-operator
spec:
Expand Down
4 changes: 4 additions & 0 deletions config/operator/global/operator.template.yaml
Expand Up @@ -14,6 +14,10 @@ spec:
serviceName: elastic-global-operator
template:
metadata:
annotations:
# Rename the fields "error" to "error.message" and "source" to "event.source"
# This is to avoid a conflict with the ECS "error" and "source" documents.
"co.elastic.logs/raw": "[{\"type\":\"container\",\"json.keys_under_root\":true,\"paths\":[\"/var/log/containers/*${data.kubernetes.container.id}.log\"],\"processors\":[{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"error\",\"to\":\"_error\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_error\",\"to\":\"error.message\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"source\",\"to\":\"_source\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_source\",\"to\":\"event.source\"}]}}]}]"
labels:
control-plane: elastic-global-operator
spec:
Expand Down
4 changes: 4 additions & 0 deletions config/operator/namespace/operator.template.yaml
Expand Up @@ -14,6 +14,10 @@ spec:
serviceName: elastic-namespace-operator
template:
metadata:
annotations:
# Rename the fields "error" to "error.message" and "source" to "event.source"
# This is to avoid a conflict with the ECS "error" and "source" documents.
"co.elastic.logs/raw": "[{\"type\":\"container\",\"json.keys_under_root\":true,\"paths\":[\"/var/log/containers/*${data.kubernetes.container.id}.log\"],\"processors\":[{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"error\",\"to\":\"_error\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_error\",\"to\":\"error.message\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"source\",\"to\":\"_source\"}]}},{\"convert\":{\"mode\":\"rename\",\"ignore_missing\":true,\"fields\":[{\"from\":\"_source\",\"to\":\"event.source\"}]}}]}]"
labels:
control-plane: elastic-namespace-operator
spec:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/apmserver/apmserver_controller.go
Expand Up @@ -176,7 +176,7 @@ var _ driver.Interface = &ReconcileApmServer{}
// Reconcile reads that state of the cluster for a ApmServer object and makes changes based on the state read
// and what is in the ApmServer.Spec
func (r *ReconcileApmServer) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "as_name", &r.iteration)()

var as apmv1.ApmServer
if err := association.FetchWithAssociation(r.Client, request, &as); err != nil {
Expand Down
Expand Up @@ -134,7 +134,7 @@ func (r *ReconcileApmServerElasticsearchAssociation) onDelete(obj types.Namespac
// Reconcile reads that state of the cluster for a ApmServerElasticsearchAssociation object and makes changes based on the state read
// and what is in the ApmServerElasticsearchAssociation.Spec
func (r *ReconcileApmServerElasticsearchAssociation) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "as_name", &r.iteration)()

var apmServer apmv1.ApmServer
if err := association.FetchWithAssociation(r.Client, request, &apmServer); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/common/logging.go
Expand Up @@ -13,12 +13,12 @@ import (
)

// LogReconciliationRun is the common logging function used to record a reconciliation run.
func LogReconciliationRun(log logr.Logger, request reconcile.Request, iteration *uint64) func() {
func LogReconciliationRun(log logr.Logger, request reconcile.Request, nameField string, iteration *uint64) func() {
currentIteration := atomic.AddUint64(iteration, 1)
startTime := time.Now()
log.Info("Starting reconciliation run", "iteration", currentIteration, "namespace", request.Namespace, "name", request.Name)
log.Info("Starting reconciliation run", "iteration", currentIteration, "namespace", request.Namespace, nameField, request.Name)
return func() {
totalTime := time.Since(startTime)
log.Info("Ending reconciliation run", "iteration", currentIteration, "namespace", request.Namespace, "name", request.Name, "took", totalTime)
log.Info("Ending reconciliation run", "iteration", currentIteration, "namespace", request.Namespace, nameField, request.Name, "took", totalTime)
}
}
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/elasticsearch_controller.go
Expand Up @@ -174,7 +174,7 @@ type ReconcileElasticsearch struct {
// Reconcile reads the state of the cluster for an Elasticsearch object and makes changes based on the state read and
// what is in the Elasticsearch.Spec
func (r *ReconcileElasticsearch) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "es_name", &r.iteration)()

// Fetch the Elasticsearch instance
es := esv1.Elasticsearch{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/kibana/kibana_controller.go
Expand Up @@ -127,7 +127,7 @@ type ReconcileKibana struct {
// Reconcile reads that state of the cluster for a Kibana object and makes changes based on the state read and what is
// in the Kibana.Spec
func (r *ReconcileKibana) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "kibana_name", &r.iteration)()

// retrieve the kibana object
var kb kbv1.Kibana
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/kibanaassociation/association_controller.go
Expand Up @@ -126,7 +126,7 @@ func (r *ReconcileAssociation) onDelete(obj types.NamespacedName) error {
// Reconcile reads that state of the cluster for an Association object and makes changes based on the state read and what is in
// the Association.Spec
func (r *ReconcileAssociation) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "kibana_name", &r.iteration)()

var kibana kbv1.Kibana
if err := association.FetchWithAssociation(r.Client, request, &kibana); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/license/license_controller.go
Expand Up @@ -54,7 +54,7 @@ var log = logf.Log.WithName(name)
// In any case it schedules a new reconcile request to be processed when the license is about to expire.
// This happens independently from any watch triggered reconcile request.
func (r *ReconcileLicenses) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "es_name", &r.iteration)()
results := r.reconcileInternal(request)
current, err := results.Aggregate()
log.V(1).Info("Reconcile result", "requeue", current.Requeue, "requeueAfter", current.RequeueAfter)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/webhook/webhook_certificates_controller.go
Expand Up @@ -47,7 +47,7 @@ type ReconcileWebhookResources struct {
}

func (r *ReconcileWebhookResources) Reconcile(request reconcile.Request) (reconcile.Result, error) {
defer common.LogReconciliationRun(log, request, &r.iteration)()
defer common.LogReconciliationRun(log, request, "validating_webhook_configuration", &r.iteration)()
res := r.reconcileInternal()
return res.Aggregate()
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/utils/log/log.go
Expand Up @@ -20,6 +20,11 @@ import (
crzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
)

const (
EcsVersion = "1.4.0"
EcsServiceType = "eck"
)

var verbosity = flag.Int("log-verbosity", 0, "Verbosity level of logs (-2=Error, -1=Warn, 0=Info, >0=Debug)")

// BindFlags attaches logging flags to the given flag set.
Expand Down Expand Up @@ -55,6 +60,10 @@ func setLogger(v *int) {
_ = flagset.Set("v", strconv.Itoa(int(zapLevel.Level())*-1))
}

opts := []zap.Option{zap.Fields(
zap.String("service.version", getVersionString()),
)}

var encoder zapcore.Encoder
if dev.Enabled {
encoderConf := zap.NewDevelopmentEncoderConfig()
Expand All @@ -64,11 +73,18 @@ func setLogger(v *int) {
encoderConf := zap.NewProductionEncoderConfig()
encoderConf.MessageKey = "message"
encoderConf.TimeKey = "@timestamp"
encoderConf.LevelKey = "log.level"
encoderConf.NameKey = "log.logger"
encoderConf.StacktraceKey = "error.stack_trace"
encoderConf.EncodeTime = zapcore.ISO8601TimeEncoder
encoder = zapcore.NewJSONEncoder(encoderConf)
opts = append(opts,
zap.Fields(
zap.String("service.type", EcsServiceType),
zap.String("ecs.version", EcsVersion),
))
}

opts := []zap.Option{zap.Fields(zap.String("ver", getVersionString()))}
stackTraceLevel := zap.NewAtomicLevelAt(zapcore.ErrorLevel)
crlog.SetLogger(crzap.New(func(o *crzap.Options) {
o.DestWritter = os.Stderr
Expand Down

0 comments on commit b96e23f

Please sign in to comment.