Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write deprecation logs to a data stream #58924

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
36ac092
Introduce DeprecationIndexingService
pugnascotia Jun 17, 2020
cf5a677
First go at ensuring the data stream template is loaded
pugnascotia Jun 17, 2020
86c3d7d
Tweaks index name
pugnascotia Jun 30, 2020
d74dc9a
WIP - Adding tests for DeprecationIndexingService
pugnascotia Jul 2, 2020
6457f97
More tests
pugnascotia Jul 2, 2020
f0e0192
Fix template index pattern
pugnascotia Jul 2, 2020
581be1f
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 2, 2020
14c929a
Rework deprecation code so that log handlers are more generic
pugnascotia Jul 3, 2020
e902fb9
Move DeprecationIndexingService to x-pack depreacation plugin
pugnascotia Jul 3, 2020
40bea2c
Don't need a custom template any more :tada:
pugnascotia Jul 7, 2020
013bf55
Remove unused import
pugnascotia Jul 7, 2020
4c189da
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 7, 2020
d61b145
License header fix
pugnascotia Jul 7, 2020
ae62d06
Remove unused imports
pugnascotia Jul 7, 2020
32d8316
Add Javadoc
pugnascotia Jul 7, 2020
25ea88f
Silence log checker error
pugnascotia Jul 7, 2020
dce2a5d
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 7, 2020
3482d31
Subtitute log message params, simplify things
pugnascotia Jul 8, 2020
34e5529
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 8, 2020
f7f9ea7
Fixes
pugnascotia Jul 8, 2020
46c5aca
Use BulkProcessor in DeprecationIndexingService
pugnascotia Jul 9, 2020
c8d8086
Use the opaque ID from the EsLogMessage
pugnascotia Jul 9, 2020
4bf9b34
Record cluster and node IDs
pugnascotia Jul 13, 2020
75eac35
Tweaks
pugnascotia Jul 14, 2020
abd8381
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 14, 2020
8e4339f
WIP - trying a log4j implementation
pugnascotia Jul 15, 2020
d33b72b
Reimplement deprecation logging using log4j
pugnascotia Jul 17, 2020
379ca5d
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 17, 2020
4aead94
Reimplement using log4j abstractions
pugnascotia Jul 20, 2020
e8d246b
Polishing
pugnascotia Jul 20, 2020
31ff5fe
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 21, 2020
95e3fc3
More polishing
pugnascotia Jul 21, 2020
c0b8f65
Include key in DeprecatedMessage.of(...)
pugnascotia Jul 21, 2020
07c34cc
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 22, 2020
d6a75bd
Strip out UnionFilter
pugnascotia Jul 22, 2020
074ef8d
Build deprecation msg docs using EcsJsonLayout
pugnascotia Jul 22, 2020
516ce6d
Checkstyle
pugnascotia Jul 22, 2020
a750f49
Update docker log4j configs
pugnascotia Jul 22, 2020
2352a20
Fix libs tests
pugnascotia Jul 23, 2020
3b80fdd
Configure HeaderWarningAppender in ESTestCase
pugnascotia Jul 23, 2020
1a3f2da
Fix docker log4j deprecation levels
pugnascotia Jul 23, 2020
3db6c7a
Test fixes
pugnascotia Jul 23, 2020
edd0466
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 27, 2020
0d70f53
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Jul 31, 2020
5529912
WIP - trying to write an integration test
pugnascotia Aug 3, 2020
3452367
Implement indexing test
pugnascotia Aug 4, 2020
d17a5e8
WIP - trying to rewrite DeprecationHttpIT as a rest test
pugnascotia Aug 5, 2020
8f70584
Tests execute against custom plugin, but need fixed
pugnascotia Aug 7, 2020
4f6f501
Rework deprecation HTTP tests
pugnascotia Aug 12, 2020
98def13
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Aug 12, 2020
97f4934
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Aug 12, 2020
26a3495
Tweaks
pugnascotia Aug 12, 2020
c070571
Merge branch 'index-deprecation-logs-v2-tests' into index-deprecation…
pugnascotia Aug 12, 2020
022793e
Fix tests finally, by resetting the rate limiting filter
pugnascotia Aug 13, 2020
d8eee69
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Aug 13, 2020
afc4dcf
Add test
pugnascotia Aug 13, 2020
2d1d3fb
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Aug 13, 2020
3c8069b
Fix license checks
pugnascotia Aug 13, 2020
2dac9a3
Formatting
pugnascotia Aug 13, 2020
f0f3edb
Remove redundant line
pugnascotia Aug 13, 2020
9877d57
Merge branch 'master' into index-deprecation-logs-v2
elasticmachine Aug 14, 2020
1309355
Remove unused import
pugnascotia Aug 17, 2020
bd8e3c9
Merge remote-tracking branch 'upstream/master' into index-deprecation…
pugnascotia Aug 18, 2020
76993c9
Add warning headers solely through the log4j appender
pugnascotia Aug 18, 2020
e695753
Merge branch 'master' into index-deprecation-logs-v2
elasticmachine Aug 19, 2020
3ff1015
Fixes to JsonLoggerTest
pugnascotia Aug 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package org.elasticsearch.common.logging;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.common.Strings.isNullOrEmpty;

/**
* This service is responsible for writing deprecation messages to a data stream. It also creates
* the data stream if necessary. The writing of messages can be toggled using the
* {@link #WRITE_DEPRECATION_LOGS_TO_INDEX} setting.
*/
public class DeprecationIndexingService implements ClusterStateListener {
private static final Logger LOGGER = LogManager.getLogger(DeprecationIndexingService.class);

private static final String DATA_STREAM_NAME = "logs-deprecation-elasticsearch";
private static final String TEMPLATE_NAME = DATA_STREAM_NAME + "-template";
private static final String TEMPLATE_MAPPING = TEMPLATE_NAME + ".json";

private static final String DEPRECATION_ORIGIN = "deprecation";

public static final Setting<Boolean> WRITE_DEPRECATION_LOGS_TO_INDEX = Setting.boolSetting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the setting allow to flex between writing to a log, an index, or both ?

"cluster.deprecation_indexing.enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Client client;
private boolean hasTriedToLoadTemplate = false;
private volatile boolean isEnabled = false;

public DeprecationIndexingService(ClusterService clusterService, Client client) {
this.client = new OriginSettingClient(client, DEPRECATION_ORIGIN);

clusterService.addListener(this);
}

/**
* Indexes a deprecation message.
* @param key the key that was used to determine if this deprecation should have been be logged.
* Useful when aggregating the recorded messages.
* @param message the message to log
* @param xOpaqueId the associated "X-Opaque-ID" header value if any, or <code>null</code>
* @param params parameters to the message, if any
*/
public void writeMessage(String key, String message, String xOpaqueId, Object[] params) {
if (this.isEnabled == false) {
return;
}

Map<String, Object> payload = new HashMap<>();
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved
payload.put("@timestamp", Instant.now().toString());
payload.put("key", key);
payload.put("message", message);
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved

if (isNullOrEmpty(xOpaqueId) == false) {
payload.put("x-opaque-id", xOpaqueId);
}

if (params != null && params.length > 0) {
payload.put("params", params);
}

new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(DATA_STREAM_NAME)
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(payload)
.execute(new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// Nothing to do
}

@Override
public void onFailure(Exception e) {
LOGGER.error("Failed to index deprecation message", e);
}
});
}

/**
* Listens for changes to the cluster state, in order to know whether to toggle indexing.
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
this.isEnabled = WRITE_DEPRECATION_LOGS_TO_INDEX.get(event.state().getMetadata().settings());

if (this.isEnabled == false || this.hasTriedToLoadTemplate == true) {
return;
}

// We only ever try to load the template once, because if there's a problem, we'll spam
// the log with the failure on every cluster state update
this.hasTriedToLoadTemplate = true;

if (event.state().getMetadata().templatesV2().containsKey(TEMPLATE_NAME)) {
return;
}

loadTemplate();
}

/*
* Attempts to load a template for the deprecation logs data stream
*/
private void loadTemplate() {
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved
try (InputStream is = getClass().getResourceAsStream(TEMPLATE_MAPPING)) {
final XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, null, is);

PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(TEMPLATE_NAME);
request.cause("auto (deprecation indexing service)");
request.indexTemplate(ComposableIndexTemplate.parse(parser));

this.client.execute(PutComposableIndexTemplateAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged() == false) {
LOGGER.error("The attempt to create a deprecations index template was not acknowledged.");
}
}

@Override
public void onFailure(Exception e) {
LOGGER.error("Failed to create the deprecations index template: " + e.getMessage(), e);
}
});
} catch (IOException e) {
LOGGER.error("Failed to load " + TEMPLATE_MAPPING + ": " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* @see ThrottlingAndHeaderWarningLogger for throttling and header warnings implementation details
*/
public class DeprecationLogger {
private static DeprecationIndexingService indexingService;
private final ThrottlingAndHeaderWarningLogger deprecationLogger;

/**
Expand All @@ -45,8 +46,8 @@ public class DeprecationLogger {
* it replaces "org.elasticsearch" with "org.elasticsearch.deprecation" to maintain
* the "org.elasticsearch" namespace.
*/
private DeprecationLogger(Logger parentLogger) {
deprecationLogger = new ThrottlingAndHeaderWarningLogger(parentLogger);
public DeprecationLogger(Logger parentLogger) {
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved
deprecationLogger = new ThrottlingAndHeaderWarningLogger(deprecatedLoggerName(parentLogger.getName()), indexingService);
pugnascotia marked this conversation as resolved.
Show resolved Hide resolved
}

public static DeprecationLogger getLogger(Class<?> aClass) {
Expand Down Expand Up @@ -79,6 +80,14 @@ public static void removeThreadContext(ThreadContext threadContext) {
HeaderWarning.removeThreadContext(threadContext);
}

public static void setIndexingService(DeprecationIndexingService indexingService) {
DeprecationLogger.indexingService = indexingService;
}

public static void removeIndexingService() {
DeprecationLogger.indexingService = null;
}

/**
* Logs a deprecation message, adding a formatted warning message as a response header on the thread context.
* The deprecation message will be throttled to deprecation log.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.logging;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
* <p>
* This class limits the number of times that actions are carried out.
* <p>
* The throttling algorithm relies on a LRU set of keys, which evicts entries when its size exceeds 128.
* When a {@code key} is seen for the first time, the {@code runnable} will be executed, but then will not be
* executed again for that key until the key is removed from the set.
*/
class RateLimiter {

// LRU set of keys used to determine if a message should be emitted to the logs
private final Set<String> keys = Collections.newSetFromMap(Collections.synchronizedMap(new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, Boolean> eldest) {
return size() > 128;
}
}));

void limit(String key, Runnable runnable) {
boolean shouldRun = keys.add(key);
if (shouldRun) {
runnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,32 @@
package org.elasticsearch.common.logging;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.common.SuppressLoggerChecks;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.security.AccessController;
import java.security.PrivilegedAction;

/**
* This class wraps both <code>HeaderWarningLogger</code> and <code>ThrottlingLogger</code>
* which is a common use case across Elasticsearch
* This class composes {@link HeaderWarning}, {@link DeprecationIndexingService} and {@link Logger},
* in order to apply a single message to multiple destination.
* <p>
* Logging and indexing are throttled in order to avoid filling the destination with duplicates.
* Throttling is implemented using a mandatory per-message key combined with any <code>X-Opaque-Id</code>
* HTTP header value. This header allows throttling per user. This value is set in {@link ThreadContext}.
* <p>
* TODO wrapping logging this way limits the usage of %location. It will think this is used from that class.
*/
class ThrottlingAndHeaderWarningLogger {
private final ThrottlingLogger throttlingLogger;
private final Logger logger;
private final RateLimiter rateLimiter;
private final DeprecationIndexingService indexingService;

ThrottlingAndHeaderWarningLogger(Logger logger) {
this.throttlingLogger = new ThrottlingLogger(logger);
ThrottlingAndHeaderWarningLogger(Logger logger, DeprecationIndexingService indexingService) {
this.logger = logger;
this.rateLimiter = new RateLimiter();
this.indexingService = indexingService;
}

/**
Expand All @@ -43,7 +59,24 @@ void throttleLogAndAddWarning(final String key, ESLogMessage message) {
String messagePattern = message.getMessagePattern();
Object[] arguments = message.getArguments();
HeaderWarning.addWarning(messagePattern, arguments);
throttlingLogger.throttleLog(key, message);

String xOpaqueId = HeaderWarning.getXOpaqueId();
this.rateLimiter.limit(xOpaqueId + key, () -> {
log(message);
if (indexingService != null) {
indexingService.writeMessage(key, messagePattern, xOpaqueId, arguments);
}
});
}

private void log(Message message) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@SuppressLoggerChecks(reason = "safely delegates to logger")
@Override
public Void run() {
logger.warn(message);
return null;
}
});
}
}

This file was deleted.

Loading