Skip to content

Commit

Permalink
Adds minimal traceparent header support to Elasticsearch backport(#74210
Browse files Browse the repository at this point in the history
) (#75331)

This adds just enough support for the traceparent header to be useful in es7
Contrary to es8 in 7.x branch the logs are in custom JSON format - not in ECS.
trace.id is added to deprecation, slow logs and server JSON logs

backport #74210
  • Loading branch information
pgomulka committed Aug 3, 2021
1 parent 90fa1a4 commit 6d2dd1f
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 37 deletions.
2 changes: 1 addition & 1 deletion distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ appender.deprecation_rolling.name = deprecation_rolling
appender.deprecation_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation.json
appender.deprecation_rolling.layout.type = ESJsonLayout
appender.deprecation_rolling.layout.type_name = deprecation.elasticsearch
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id,key
appender.deprecation_rolling.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation-%i.json.gz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void testLayout() {
"\"node.name\": \"%node_name\", " +
"\"message\": \"%notEmpty{%enc{%marker}{JSON} }%enc{%.-10000m}{JSON}\"" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}

Expand All @@ -62,6 +63,7 @@ public void testLayoutWithAdditionalFields() {
"%notEmpty{, \"x-opaque-id\": \"%ESMessageField{x-opaque-id}\"}" +
"%notEmpty{, \"someOtherField\": \"%ESMessageField{someOtherField}\"}" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}

Expand All @@ -82,6 +84,7 @@ public void testLayoutWithAdditionalFieldOverride() {
"\"node.name\": \"%node_name\"" +
"%notEmpty{, \"message\": \"%ESMessageField{message}\"}" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testParseFieldEmittingLogs() throws Exception {


public void testParseFieldEmittingDeprecatedLogs() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");

ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
Expand All @@ -102,7 +105,9 @@ public void testParseFieldEmittingLogs() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
hasEntry("x-opaque-id", "someId")
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
),
allOf(
hasEntry("type", "deprecation.elasticsearch"),
Expand All @@ -111,7 +116,9 @@ public void testParseFieldEmittingLogs() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
hasEntry("x-opaque-id", "someId")
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
)

)
Expand All @@ -126,6 +133,7 @@ public void testParseFieldEmittingLogs() throws Exception {
public void testDeprecatedMessage() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1");

Expand All @@ -147,6 +155,7 @@ public void testDeprecatedMessage() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "deprecated message1"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
hasEntry("x-opaque-id", "someId")
)
)
Expand All @@ -157,7 +166,6 @@ public void testDeprecatedMessage() throws Exception {
});
}


public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "a key", "deprecated message1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ appender.deprecated.name = deprecated
appender.deprecated.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecated.json
appender.deprecated.layout.type = ESJsonLayout
appender.deprecated.layout.type_name = deprecation.elasticsearch
appender.deprecated.layout.esmessagefields = x-opaque-id
appender.deprecated.layout.esmessagefields = x-opaque-id,key
appender.deprecated.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling_old.type = File
Expand All @@ -28,7 +28,7 @@ appender.deprecatedconsole.type = Console
appender.deprecatedconsole.name = deprecatedconsole
appender.deprecatedconsole.layout.type = ESJsonLayout
appender.deprecatedconsole.layout.type_name = deprecation.elasticsearch
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id,key
appender.deprecatedconsole.filter.rate_limit.type = RateLimitingFilter

appender.index_search_slowlog_rolling.type = File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(Task.TRACE_PARENT, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
public class DeprecatedMessage extends ESLogMessage {
public static final String X_OPAQUE_ID_FIELD_NAME = "x-opaque-id";
public static final String KEY_FIELD_NAME = "key";

public DeprecatedMessage(DeprecationCategory category, String key, String xOpaqueId, String messagePattern, Object... args) {
super(fieldMap(category, key, xOpaqueId), messagePattern, args);
Expand All @@ -35,7 +36,7 @@ private static Map<String, Object> fieldMap(DeprecationCategory category, String
builder.put("category", category.name().toLowerCase(Locale.ROOT));

if (Strings.isNullOrEmpty(key) == false) {
builder.put("key", key);
builder.put(KEY_FIELD_NAME, key);
}
if (Strings.isNullOrEmpty(xOpaqueId) == false) {
builder.put(X_OPAQUE_ID_FIELD_NAME, xOpaqueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private String createPattern(Map<String, Object> map, Set<String> esMessageField
separator = ", ";
}
sb.append(notEmpty(", %node_and_cluster_id "));
sb.append(notEmpty(", %trace_id "));
sb.append("%exceptionAsJson ");
sb.append("}");
sb.append(System.lineSeparator());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

/**
* Pattern converter to format the trace id provided in the traceparent header into JSON fields <code>trace.id</code>.
*/
@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter")
@ConverterKeys({"trace_id"})
public final class TraceIdConverter extends LogEventPatternConverter {
/**
* Called by log4j2 to initialize this converter.
*/
public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) {
return new TraceIdConverter();
}

public TraceIdConverter() {
super("trace_id", "trace_id");
}

public static String getTraceId() {
return HeaderWarning.THREAD_CONTEXT.stream()
.map(t -> t.<String>getHeader(Task.TRACE_ID))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

/**
* Formats the trace.id into json fields.
*
* @param event - a log event is ignored in this method as it uses the clusterId value
* from <code>NodeAndClusterIdStateListener</code> to format
*/
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
String traceId = getTraceId();
if (traceId != null) {
toAppendTo.append("\"trace.id\": \"" + traceId + "\"");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -109,15 +108,21 @@ public StoredContext stashContext() {
/**
* X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* The same is applied to Task.TRACE_ID.
* Otherwise when context is stash, it should be empty.
*/
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
ThreadContextStruct threadContextStruct =
DEFAULT_CONTEXT.putHeaders(MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap());
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
}
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
}
else {
threadLocal.set(DEFAULT_CONTEXT);
}
return () -> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ protected Node(final Environment initialEnvironment,
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
Stream.of(Task.X_OPAQUE_ID, Task.TRACE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
Expand Down
55 changes: 35 additions & 20 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.usage.UsageService;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -309,29 +310,15 @@ private void sendContentTypeErrorMessage(@Nullable List<String> contentTypeHeade
}

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
channel.sendResponse(
BytesRestResponse.
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
return;
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
channel.sendResponse(
BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
try {
copyRestHeaders(request, threadContext);
validateErrorTrace(request, channel);
} catch (IllegalArgumentException e) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
return;
}


final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;
Expand Down Expand Up @@ -365,6 +352,34 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
handleBadRequest(uri, requestMethod, channel);
}

private void validateErrorTrace(RestRequest request, RestChannel channel) {
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
throw new IllegalArgumentException("error traces in responses are disabled.");
}
}

private void copyRestHeaders(RestRequest request, ThreadContext threadContext) throws IOException {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
throw new IllegalArgumentException("multiple values for single-valued header [" + name + "].");
} else if (name.equals(Task.TRACE_PARENT)) {
String traceparent = distinctHeaderValues.get(0);
if (traceparent.length() >= 55) {
threadContext.putHeader(Task.TRACE_ID, traceparent.substring(3, 35));
}
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
}

Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
final Supplier<Map<String, String>> paramsSupplier;
if (requestParamsRef == null) {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ public class Task {
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

/**
* The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context.
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
* has to be declared as a header copied over from http request.
*/
public static final String TRACE_PARENT = "traceparent";

/**
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
* Has to be declared as a header copied over for tasks.
*/
public static final String TRACE_ID = "trace.id";

private final long id;

private final String type;
Expand Down

0 comments on commit 6d2dd1f

Please sign in to comment.