Skip to content

Commit

Permalink
Emit x-elastic-product-origin into logs backport(#81381) (#81518)
Browse files Browse the repository at this point in the history
This commit emits the value of x-elastic-product-origin header into elasticsearch.elastic_product_origin field in ES deprecation logs and indexed deprecation logs.
The x-elastic-product-origin header is intended to identify the Elastic Stack component that originated a request, making it possible to ignore deprecations emitted by the stack itself.

ES v7 logging differs from v8 as it creates the JSON layout on its own. The additional custom field has to be declared in log4j2.properties, and the ECSLayout used for indexing deprecation logs also has to be updated.
backports #81381
  • Loading branch information
pgomulka committed Dec 9, 2021
1 parent f4b7127 commit 850aa05
Show file tree
Hide file tree
Showing 41 changed files with 231 additions and 109 deletions.
4 changes: 2 additions & 2 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ appender.deprecation_rolling.name = deprecation_rolling
appender.deprecation_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation.json
appender.deprecation_rolling.layout.type = ESJsonLayout
appender.deprecation_rolling.layout.type_name = deprecation.elasticsearch
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id,key,category
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id,key,category,elasticsearch.elastic_product_origin
appender.deprecation_rolling.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation-%i.json.gz
Expand All @@ -81,7 +81,7 @@ appender.deprecation_rolling_old.type = RollingFile
appender.deprecation_rolling_old.name = deprecation_rolling_old
appender.deprecation_rolling_old.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation.log
appender.deprecation_rolling_old.layout.type = PatternLayout
appender.deprecation_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n
appender.deprecation_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name] [%product_origin]%marker %m%n
appender.deprecation_rolling_old.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse>
/**
 * Collects the x-opaque-id header value of every response, in iteration order.
 * One entry is added per response, taken straight from its headers.
 */
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
    final List<String> opaqueIds = new ArrayList<>(responses.size());
    responses.forEach(response -> opaqueIds.add(response.headers().get(Task.X_OPAQUE_ID_HTTP_HEADER)));
    return opaqueIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class NioHttpClient implements Closeable {
/**
 * Extracts the x-opaque-id header value from each response, preserving the
 * iteration order of the input collection.
 */
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
    final List<String> ids = new ArrayList<>(responses.size());
    for (final HttpResponse resp : responses) {
        ids.add(resp.headers().get(Task.X_OPAQUE_ID_HTTP_HEADER));
    }
    return ids;
}
Expand All @@ -99,7 +99,7 @@ public Collection<FullHttpResponse> get(InetSocketAddress remoteAddress, String.
for (int i = 0; i < uris.length; i++) {
final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
httpRequest.headers().add(HOST, "localhost");
httpRequest.headers().add(Task.X_OPAQUE_ID, String.valueOf(i));
httpRequest.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, String.valueOf(i));
requests.add(httpRequest);
}
return sendRequests(remoteAddress, requests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {

public void testParseFieldEmittingDeprecatedLogs() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
threadContext.putHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, "kibana");

ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
Expand Down Expand Up @@ -160,7 +161,8 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry(DeprecatedMessage.ELASTIC_ORIGIN_FIELD_NAME, "kibana")
),
allOf(
hasEntry("type", "deprecation.elasticsearch"),
Expand All @@ -172,7 +174,8 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry(DeprecatedMessage.ELASTIC_ORIGIN_FIELD_NAME, "kibana")
)

)
Expand All @@ -188,8 +191,9 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {

public void testDeprecatedMessage() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
threadContext.putHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, "kibana");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.critical(DeprecationCategory.OTHER, "someKey", "deprecated message1");

Expand All @@ -213,7 +217,8 @@ public void testDeprecatedMessage() throws Exception {
hasEntry("node.name", "sample-name"),
hasEntry("message", "deprecated message1"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
hasEntry("x-opaque-id", "someId")
hasEntry("x-opaque-id", "someId"),
hasEntry(DeprecatedMessage.ELASTIC_ORIGIN_FIELD_NAME, "kibana")
)
)
);
Expand Down Expand Up @@ -367,7 +372,7 @@ public void testDuplicateLogMessages() throws Exception {

// For the same key and X-Opaque-ID deprecation should be once
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
threadContext.putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, "ID1");
deprecationLogger.critical(DeprecationCategory.OTHER, "key", "message1");
deprecationLogger.critical(DeprecationCategory.OTHER, "key", "message2");
assertWarnings("message1", "message2");
Expand Down Expand Up @@ -403,7 +408,7 @@ public void testDuplicateLogMessages() throws Exception {
// For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id
// continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
threadContext.putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, "ID2");
deprecationLogger.critical(DeprecationCategory.OTHER, "key", "message1");
deprecationLogger.critical(DeprecationCategory.OTHER, "key", "message2");
assertWarnings("message1", "message2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ appender.deprecated.name = deprecated
appender.deprecated.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecated.json
appender.deprecated.layout.type = ESJsonLayout
appender.deprecated.layout.type_name = deprecation.elasticsearch
appender.deprecated.layout.esmessagefields = x-opaque-id,key,category
appender.deprecated.layout.esmessagefields = x-opaque-id,key,category,elasticsearch.elastic_product_origin
appender.deprecated.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling_old.type = File
Expand All @@ -28,7 +28,7 @@ appender.deprecatedconsole.type = Console
appender.deprecatedconsole.name = deprecatedconsole
appender.deprecatedconsole.layout.type = ESJsonLayout
appender.deprecatedconsole.layout.type_name = deprecation.elasticsearch
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id,key,category
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id,key,category,elasticsearch.elastic_product_origin
appender.deprecatedconsole.filter.rate_limit.type = RateLimitingFilter

appender.index_search_slowlog_rolling.type = File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void testSearchTaskDescriptions() {
.get();

Map<String, String> headers = new HashMap<>();
headers.put(Task.X_OPAQUE_ID, "my_id");
headers.put(Task.X_OPAQUE_ID_HTTP_HEADER, "my_id");
headers.put("Foo-Header", "bar");
headers.put("Custom-Task-Header", "my_value");
assertSearchResponse(
Expand Down Expand Up @@ -407,7 +407,7 @@ public void testSearchTaskHeaderLimit() {
int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1);

Map<String, String> headers = new HashMap<>();
headers.put(Task.X_OPAQUE_ID, "my_id");
headers.put(Task.X_OPAQUE_ID_HTTP_HEADER, "my_id");
headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
Expand All @@ -418,7 +418,7 @@ public void testSearchTaskHeaderLimit() {

/**
 * Verifies that the task stored exactly the two expected request headers:
 * x-opaque-id ("my_id") and Custom-Task-Header ("my_value").
 */
private void assertTaskHeaders(TaskInfo info) {
    assertThat(info.getHeaders().keySet(), hasSize(2));
    assertEquals("my_id", info.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER));
    assertEquals("my_value", info.getHeaders().get("Custom-Task-Header"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,11 @@ public ActionModule(
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false), new RestHeaderDefinition(Task.TRACE_PARENT, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID_HTTP_HEADER, false),
new RestHeaderDefinition(Task.TRACE_PARENT_HTTP_HEADER, false),
new RestHeaderDefinition(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@
* Carries the x-opaque-id and elastic_product_origin fields if the corresponding headers were provided. Will populate those fields in JSON logs.
*/
public class DeprecatedMessage extends ESLogMessage {
public static final String ELASTIC_ORIGIN_FIELD_NAME = "elasticsearch.elastic_product_origin";
public static final String X_OPAQUE_ID_FIELD_NAME = "x-opaque-id";
public static final String KEY_FIELD_NAME = "key";

/**
 * Builds a deprecation log message enriched with request-context fields.
 *
 * @param category       deprecation category recorded on the log line
 * @param key            identifier of this particular deprecation message
 * @param xOpaqueId      value of the x-opaque-id request header; may be null or empty
 * @param productOrigin  value of the x-elastic-product-origin request header; may be null or empty
 * @param messagePattern log message pattern
 * @param args           arguments substituted into the message pattern
 */
public DeprecatedMessage(
    DeprecationCategory category,
    String key,
    String xOpaqueId,
    String productOrigin,
    String messagePattern,
    Object... args
) {
    super(fieldMap(category, key, xOpaqueId, productOrigin), messagePattern, args);
}

private static Map<String, Object> fieldMap(DeprecationCategory category, String key, String xOpaqueId) {
private static Map<String, Object> fieldMap(DeprecationCategory category, String key, String xOpaqueId, String productOrigin) {
final MapBuilder<String, Object> builder = MapBuilder.newMapBuilder();

// The fields below are emitted using ECS keys in `EcsJsonLayout`
Expand All @@ -41,6 +49,9 @@ private static Map<String, Object> fieldMap(DeprecationCategory category, String
if (Strings.isNullOrEmpty(xOpaqueId) == false) {
    builder.put(X_OPAQUE_ID_FIELD_NAME, xOpaqueId);
}
// BUG FIX: this guard previously re-checked xOpaqueId, so the product-origin
// field was silently dropped whenever x-opaque-id was absent, and a null
// productOrigin could be stored when x-opaque-id was present. Guard on
// productOrigin itself, mirroring the xOpaqueId check above.
if (Strings.isNullOrEmpty(productOrigin) == false) {
    builder.put(ELASTIC_ORIGIN_FIELD_NAME, productOrigin);
}
return builder.immutableMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ public DeprecationLogger warn(final DeprecationCategory category, final String k
}

/**
 * Builds a {@link DeprecatedMessage} carrying the caller's x-opaque-id and
 * x-elastic-product-origin header values (read from the current thread context
 * via {@code HeaderWarning}) and logs it at the given level.
 *
 * Note: the stale pre-change statement that constructed the message with the old
 * 5-argument constructor was removed — it redeclared {@code deprecationMessage}
 * (a duplicate local variable) and no longer matched the constructor signature.
 *
 * @param level    log level to emit at
 * @param category deprecation category recorded on the message
 * @param key      identifier of this deprecation message
 * @param msg      message pattern
 * @param params   arguments for the message pattern
 * @return this logger, to allow call chaining
 */
private DeprecationLogger logDeprecation(Level level, DeprecationCategory category, String key, String msg, Object[] params) {
    String opaqueId = HeaderWarning.getXOpaqueId();
    String productOrigin = HeaderWarning.getProductOrigin();
    ESLogMessage deprecationMessage = new DeprecatedMessage(category, key, opaqueId, productOrigin, msg, params);
    logger.log(level, deprecationMessage);
    return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,19 @@ private static char hex(int b) {
}
}

/** Returns the x-elastic-product-origin header value from the tracked thread contexts, or "" when absent. */
public static String getProductOrigin() {
    return getSingleValue(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
}

/** Returns the x-opaque-id header value from the tracked thread contexts, or "" when absent. */
public static String getXOpaqueId() {
    return getSingleValue(Task.X_OPAQUE_ID_HTTP_HEADER);
}

/**
 * Returns the value of {@code headerName} from the first tracked thread context
 * that carries it, or an empty string when no context has that header.
 *
 * Note: stale pre-change pipeline stages that hard-coded Task.X_OPAQUE_ID were
 * removed — they duplicated the filter/map stages and ignored the
 * {@code headerName} parameter this method was generalized to accept.
 */
private static String getSingleValue(String headerName) {
    return THREAD_CONTEXT.stream()
        .filter(t -> t.getHeader(headerName) != null)
        .findFirst()
        .map(t -> t.getHeader(headerName))
        .orElse("");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

/**
 * Pattern converter to format the X-elastic-product-origin header into plaintext logs.
 */
@Plugin(category = PatternConverter.CATEGORY, name = "ProductOriginConverter")
@ConverterKeys({ "product_origin" })
public final class ProductOriginConverter extends LogEventPatternConverter {
    /**
     * Called by log4j2 to initialize this converter.
     */
    public static ProductOriginConverter newInstance(@SuppressWarnings("unused") final String[] options) {
        return new ProductOriginConverter();
    }

    public ProductOriginConverter() {
        super("product_origin", "product_origin");
    }

    /**
     * Returns the first non-null X-elastic-product-origin header value found in
     * the thread contexts tracked by {@code HeaderWarning}, or {@code null}
     * when no request supplied the header.
     */
    public static String getProductOrigin() {
        return HeaderWarning.THREAD_CONTEXT.stream()
            .map(t -> t.<String>getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER))
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(null);
    }

    /**
     * Formats the X-elastic-product-origin into plaintext logs.
     *
     * @param event - a log event is ignored in this method as it uses the value from ThreadContext
     * @param toAppendTo - buffer to append the product origin to; left unchanged when the header is absent
     */
    @Override
    public void format(LogEvent event, StringBuilder toAppendTo) {
        String productOrigin = getProductOrigin();
        if (productOrigin != null) {
            toAppendTo.append(productOrigin);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public static String getTraceId() {
/**
* Formats the trace.id into json fields.
*
* @param event - a log event is ignored in this method as it uses the clusterId value
* from <code>NodeAndClusterIdStateListener</code> to format
* @param event - a log event is ignored in this method as it uses the value from ThreadContext
*/
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.tasks.Task;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +40,7 @@

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
import static org.elasticsearch.tasks.Task.HEADERS_TO_COPY;

/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
Expand Down Expand Up @@ -110,19 +110,15 @@ public StoredContext stashContext() {
* The same is applied to Task.TRACE_ID.
* Otherwise when context is stash, it should be empty.
*/
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
}

if (HEADERS_TO_COPY.stream().anyMatch(header -> context.requestHeaders.containsKey(header))) {
Map<String, String> map = headers(context, HEADERS_TO_COPY);
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
threadLocal.set(DEFAULT_CONTEXT);
}

return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
Expand All @@ -131,6 +127,16 @@ public StoredContext stashContext() {
};
}

/**
 * Copies the subset of {@code headersToCopy} that is present in the given
 * context's request headers into a new map.
 */
private Map<String, String> headers(ThreadContextStruct context, Set<String> headersToCopy) {
    final Map<String, String> copied = new HashMap<>(headersToCopy.size(), 1);
    headersToCopy.stream()
        .filter(context.requestHeaders::containsKey)
        .forEach(name -> copied.put(name, context.requestHeaders.get(name)));
    return copied;
}

/**
* Captures the current thread context as writeable, allowing it to be serialized out later
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt
if (logThreshold > 0 && took > logThreshold) {
logger.warn(
"handling request [{}][{}][{}][{}] took [{}ms] which is above the warn threshold of [{}ms]",
httpRequest.header(Task.X_OPAQUE_ID),
httpRequest.header(Task.X_OPAQUE_ID_HTTP_HEADER),
httpRequest.method(),
httpRequest.uri(),
httpChannel,
Expand Down

0 comments on commit 850aa05

Please sign in to comment.