Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add settings to control size and count of warning headers in responses #28427

21 changes: 21 additions & 0 deletions docs/reference/modules/cluster/misc.asciidoc
Expand Up @@ -56,3 +56,24 @@ PUT /_cluster/settings
}
-------------------------------
// CONSOLE

[[cluster-warning-headers]]
==== Warning Headers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the documentation for these settings should be moved to the HTTP docs.

For every distinct warning, elastic cluster will return a new warning header in the client HTTP response.
Sometimes the amount of returned warning headers can be too large and exceed client configuration settings.
These dynamic settings allow to set the maximum count and size of warning headers in client http responses.
Once the maximum count or size is reached, any extra warning will not produce an additional warning header.
The default value for `http.max_warning_header_count` is unbounded.
The default value for `http.max_warning_header_size` is unbounded.

[source,js]
-------------------------------
PUT /_cluster/settings
{
"persistent" : {
"http.max_warning_header_count" : 62,
"http.max_warning_header_size" : "7Kb"
}
}
-------------------------------
// CONSOLE
Expand Up @@ -245,6 +245,8 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH,
HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT,
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT,
HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
Expand Down
Expand Up @@ -23,10 +23,16 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.HttpTransportSettings;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -39,13 +45,14 @@
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.nio.charset.StandardCharsets;


/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
Expand Down Expand Up @@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
private final ContextThreadLocal threadLocal;
private static volatile int maxWrnHeaderCount;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use traditional Java variable style names here (e.g., maxWarningHeaderCount).

private static volatile long maxWrnHeaderSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxWrnHeaderSize -> maxWarningHeaderSize


/**
* Creates a new ThreadContext instance
Expand All @@ -98,13 +107,23 @@ public ThreadContext(Settings settings) {
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
}
threadLocal = new ContextThreadLocal();
maxWrnHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
maxWrnHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
}

@Override
public void close() throws IOException {
threadLocal.close();
}

public static void setMaxWarningHeaderCount(int newMaxWrnHeaderCount){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: spacing between the ) and {: ){ -> ) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newMaxWrnHeaderCount -> maxWarningHeaderCount(and reference the member field by dereferencingthis` then).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the method parameter final too; this is a safety guard against accidentally assigning to the method parameter instead of the member field.

maxWrnHeaderCount = newMaxWrnHeaderCount;
}

public static void setMaxWarningHeaderSize(ByteSizeValue newMaxWarningHeaderSize){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

){ -> ) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the method parameter final as a guard against accidentally assigning to it instead of the to the member field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newMaxWarningHeaderSize -> maxWarningHeaderSize, and reference the member field by dereferencing this.

maxWrnHeaderSize = newMaxWarningHeaderSize.getBytes();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we accept a ByteSizeValue parameter yet store it as a long? Should we preserve the ByteSizeValue?

}

/**
* Removes the current context and resets a default context. The removed context can be
* restored when closing the returned {@link StoredContext}
Expand Down Expand Up @@ -359,7 +378,8 @@ private static final class ThreadContextStruct {
private final Map<String, Object> transientHeaders;
private final Map<String, List<String>> responseHeaders;
private final boolean isSystemContext;

private long wrnHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrnHeaderSize -> warningHeaderSize

private boolean isWrnLmtReached;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this field strictly needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes. this field once the limit is reached ensures that we don't check and log "warnings limit reached" again and again with a new warning header.

private ThreadContextStruct(StreamInput in) throws IOException {
final int numRequest = in.readVInt();
Map<String, String> requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest);
Expand All @@ -371,6 +391,8 @@ private ThreadContextStruct(StreamInput in) throws IOException {
this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
this.transientHeaders = Collections.emptyMap();
isSystemContext = false; // we never serialize this it's a transient flag
wrnHeadersSize = 0L;
isWrnLmtReached = false;
}

private ThreadContextStruct setSystemContext() {
Expand All @@ -387,6 +409,20 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.wrnHeadersSize = 0L;
isWrnLmtReached = false;
}

private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, List<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext,
long wrnHeadersSize, boolean isWrnLmtReached) {
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.wrnHeadersSize = wrnHeadersSize;
this.isWrnLmtReached = isWrnLmtReached;
}

/**
Expand Down Expand Up @@ -442,6 +478,19 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers

private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue) {
assert value != null;
long curWrnHeaderSize = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curWrnHeaderSize -> currentWarningHeaderSize

//check if we can add another warning header (max count or size within limits)
if (key.equals("Warning")) {
if (isWrnLmtReached) return this; //can't add warning headers - limit reached
if (maxWrnHeaderCount != -1) { //if count is NOT unbounded, check its limits
int wrnHeaderCount = this.responseHeaders.containsKey("Warning") ? this.responseHeaders.get("Warning").size() : 0;
if (wrnHeaderCount >= maxWrnHeaderCount) return addWrnLmtReachedHeader();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take care, the warning headers are de-duplicated below. The accounting should happen after de-duplication, otherwise we truncate when a warning header would not have been added because it is a duplicate of an existing warning header.

}
if (maxWrnHeaderSize != -1) { //if size is NOT unbounded, check its limits
curWrnHeaderSize = "Warning".getBytes(StandardCharsets.UTF_8).length + value.getBytes(StandardCharsets.UTF_8).length;
if ((wrnHeadersSize + curWrnHeaderSize) > maxWrnHeaderSize) return addWrnLmtReachedHeader();
}
}

final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
final List<String> existingValues = newResponseHeaders.get(key);
Expand All @@ -460,8 +509,37 @@ private ThreadContextStruct putResponse(final String key, final String value, fi
} else {
newResponseHeaders.put(key, Collections.singletonList(value));
}
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders,
isSystemContext, wrnHeadersSize + curWrnHeaderSize, isWrnLmtReached);
}

return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
//replace last warning header(s) with "headers limit reached" warning
//respecting limitations on headers size if it is set by user
private ThreadContextStruct addWrnLmtReachedHeader(){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user already expects if they have set this setting that warning headers will be truncated. Therefore, I don't think we should do this, losing a warning header (or more) at the expense of a warning header saying that there are more warnings that can be found in the logs. Instead I think that we should log (in the main Elasticsearch log) when we truncate.

if ((maxWrnHeaderSize == 0) || (maxWrnHeaderCount ==0)) //can't even add "headers limit reached" warning
return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders,
isSystemContext, wrnHeadersSize, true);
final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
final List<String> wrns = new ArrayList<>(newResponseHeaders.get("Warning"));
final String lastWrnMessage = DeprecationLogger.formatWarning(
"There were more warnings, but they were dropped as [" +
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT.getKey() + "] or [" +
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "] were reached!");

if (maxWrnHeaderSize > 0) {
final long wrnSize = "Warning".getBytes(StandardCharsets.UTF_8).length;
wrnHeadersSize = wrnHeadersSize + wrnSize + lastWrnMessage.getBytes(StandardCharsets.UTF_8).length;
do {
String wrn = wrns.remove(wrns.size() - 1);
wrnHeadersSize = wrnHeadersSize - wrnSize - wrn.getBytes(StandardCharsets.UTF_8).length;
} while(wrnHeadersSize > maxWrnHeaderSize);
} else { //we don't care about size as it is unbounded
wrns.remove(wrns.size() - 1);
}
wrns.add(lastWrnMessage);
newResponseHeaders.put("Warning", Collections.unmodifiableList(wrns));
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders,
isSystemContext, wrnHeadersSize, true);
}

private ThreadContextStruct putTransient(String key, Object value) {
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -88,6 +87,10 @@ public final class HttpTransportSettings {
Setting.byteSizeSetting("http.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_HEADER_SIZE =
Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_MAX_WARNING_HEADER_COUNT =
Setting.intSetting("http.max_warning_header_count", -1, -1, Setting.Property.Dynamic, Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_WARNING_HEADER_SIZE =
Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Setting.Property.Dynamic, Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_INITIAL_LINE_LENGTH =
Setting.byteSizeSetting("http.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB), Property.NodeScope);
// don't reset cookies by default, since I don't think we really need to
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.node;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -351,6 +353,12 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
listener::onNewInfo);
final UsageService usageService = new UsageService(settings);


clusterService.getClusterSettings().addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT,
org.elasticsearch.common.util.concurrent.ThreadContext::setMaxWarningHeaderCount);
clusterService.getClusterSettings().addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE,
org.elasticsearch.common.util.concurrent.ThreadContext::setMaxWarningHeaderSize);

ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import java.nio.charset.StandardCharsets;

import static org.elasticsearch.common.logging.DeprecationLogger.WARNING_HEADER_PATTERN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
Expand Down Expand Up @@ -246,6 +247,60 @@ public void testEncode() {
assertThat(DeprecationLogger.encode(s), IsSame.sameInstance(s));
}


public void testWarningHeaderCountSetting() throws IOException{
// Test that the number of warning headers don't exceed 'http.max_warning_header_count'
final int maxWarningHeaderCount = 2;
Settings settings = Settings.builder()
.put("http.max_warning_header_count", maxWarningHeaderCount)
.build();
try (ThreadContext threadContext = new ThreadContext(settings)) {
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, "A simple message 1");
logger.deprecated(threadContexts, "A simple message 2");
logger.deprecated(threadContexts, "A simple message 3");
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
final List<String> responses = responseHeaders.get("Warning");

assertEquals(maxWarningHeaderCount, responses.size());
assertThat(responses.get(0), warningValueMatcher);
assertThat(responses.get(0), containsString("\"A simple message 1"));
assertThat(responses.get(1), warningValueMatcher);
assertThat(responses.get(1), containsString("\"There were more warnings, but they were dropped as "));
}
}

public void testWarningHeaderSizeSetting() throws IOException{
// Test that the size of warning headers don't exceed 'http.max_warning_header_size'
Settings settings = Settings.builder()
.put("http.max_warning_header_size", "1Kb")
.build();

byte [] arr = new byte[300];
String message1 = new String(arr, StandardCharsets.UTF_8) + "1";
String message2 = new String(arr, StandardCharsets.UTF_8) + "2";
String message3 = new String(arr, StandardCharsets.UTF_8) + "3";

try (ThreadContext threadContext = new ThreadContext(settings)) {
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, message1);
logger.deprecated(threadContexts, message2);
logger.deprecated(threadContexts, message3);
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
final List<String> responses = responseHeaders.get("Warning");

long warningHeadersSize = 0L;
for (String response : responses){
warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length +
response.getBytes(StandardCharsets.UTF_8).length;
}
// assert that the size of all warning headers is less or equal to 1Kb
assertTrue(warningHeadersSize <= 1024);
}
}

private String range(int lowerInclusive, int upperInclusive) {
return IntStream
.range(lowerInclusive, upperInclusive + 1)
Expand Down