Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add settings to control size and count of warning headers in responses #28427

2 changes: 1 addition & 1 deletion docs/reference/modules/cluster/misc.asciidoc
Expand Up @@ -82,4 +82,4 @@ Enable or disable allocation for persistent tasks:
This setting does not affect the persistent tasks that are already being executed.
Only newly created persistent tasks, or tasks that must be reassigned (after a node
left the cluster, for example), are impacted by this setting.
--
--
8 changes: 7 additions & 1 deletion docs/reference/modules/http.asciidoc
Expand Up @@ -20,7 +20,7 @@ http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking].

The settings in the table below can be configured for HTTP. Note that none of
them are dynamically updatable so for them to take effect they should be set in
`elasticsearch.yml`.
the Elasticsearch <<settings, configuration file>>.

[cols="<,<",options="header",]
|=======================================================================
Expand Down Expand Up @@ -100,6 +100,12 @@ simple message will be returned. Defaults to `true`

|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.

|`http.max_warning_header_count` |The maximum number of warning headers in
client HTTP responses, defaults to unbounded.

|`http.max_warning_header_size` |The maximum total size of warning headers in
client HTTP responses, defaults to unbounded.

|=======================================================================

It also uses the common
Expand Down
Expand Up @@ -245,6 +245,8 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH,
HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT,
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT,
HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
Expand Down
Expand Up @@ -23,10 +23,16 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.HttpTransportSettings;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -39,13 +45,14 @@
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.nio.charset.StandardCharsets;


/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
Expand Down Expand Up @@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
private final ContextThreadLocal threadLocal;
private final int maxWarningHeaderCount;
private final long maxWarningHeaderSize;

/**
* Creates a new ThreadContext instance
Expand All @@ -98,6 +107,8 @@ public ThreadContext(Settings settings) {
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
}
threadLocal = new ContextThreadLocal();
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
}

@Override
Expand Down Expand Up @@ -282,7 +293,7 @@ public void addResponseHeader(final String key, final String value) {
* @param uniqueValue the function that produces de-duplication values
*/
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue));
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
}

/**
Expand Down Expand Up @@ -359,7 +370,7 @@ private static final class ThreadContextStruct {
private final Map<String, Object> transientHeaders;
private final Map<String, List<String>> responseHeaders;
private final boolean isSystemContext;

private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
private ThreadContextStruct(StreamInput in) throws IOException {
final int numRequest = in.readVInt();
Map<String, String> requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest);
Expand All @@ -371,6 +382,7 @@ private ThreadContextStruct(StreamInput in) throws IOException {
this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
this.transientHeaders = Collections.emptyMap();
isSystemContext = false; // we never serialize this it's a transient flag
this.warningHeadersSize = 0L;
}

private ThreadContextStruct setSystemContext() {
Expand All @@ -387,6 +399,18 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = 0L;
}

private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, List<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext,
long warningHeadersSize) {
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = warningHeadersSize;
}

/**
Expand Down Expand Up @@ -440,30 +464,58 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
}

private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue) {
private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue,
final int maxWarningHeaderCount, final long maxWarningHeaderSize) {
assert value != null;
long newWarningHeaderSize = warningHeadersSize;
//check if we can add another warning header - if max size within limits
if (key.equals("Warning") && (maxWarningHeaderSize != -1)) { //if size is NOT unbounded, check its limits
if (warningHeadersSize > maxWarningHeaderSize) { // if max size has already been reached before
final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
maxWarningHeaderSize + "] bytes set in [" +
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!";
ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
return this;
}
newWarningHeaderSize += "Warning".getBytes(StandardCharsets.UTF_8).length + value.getBytes(StandardCharsets.UTF_8).length;
if (newWarningHeaderSize > maxWarningHeaderSize) {
final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
maxWarningHeaderSize + "] bytes set in [" +
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!";
ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
}
}

final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
final List<String> existingValues = newResponseHeaders.get(key);

if (existingValues != null) {
final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet());
assert existingValues.size() == existingUniqueValues.size();
if (existingUniqueValues.contains(uniqueValue.apply(value))) {
return this;
}

final List<String> newValues = new ArrayList<>(existingValues);
newValues.add(value);

newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
} else {
newResponseHeaders.put(key, Collections.singletonList(value));
}

return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
//check if we can add another warning header - if max count within limits
if ((key.equals("Warning")) && (maxWarningHeaderCount != -1)) { //if count is NOT unbounded, check its limits
final int warningHeaderCount = newResponseHeaders.containsKey("Warning") ? newResponseHeaders.get("Warning").size() : 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a problem here. Imagine that maxWarningHeaderCount is set (not equal to -1) and then a user dynamically sets it to unbounded before the if statement on the following line (i.e., after we have passed the set condition). Then we will see the updated value and warningHeaderCount will always be greater than maxWarningHeaderCount. We want to avoid these sorts of race conditions.

if (warningHeaderCount > maxWarningHeaderCount) {
final String message = "Dropping a warning header, as their total count reached the maximum allowed of [" +
maxWarningHeaderCount + "] set in [" + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT.getKey() + "]!";
ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
return this;
}
}
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
}


private ThreadContextStruct putTransient(String key, Object value) {
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
if (newTransient.putIfAbsent(key, value) != null) {
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -93,6 +92,10 @@ public final class HttpTransportSettings {
Setting.byteSizeSetting("http.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_HEADER_SIZE =
Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_MAX_WARNING_HEADER_COUNT =
Setting.intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_WARNING_HEADER_SIZE =
Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_INITIAL_LINE_LENGTH =
Setting.byteSizeSetting("http.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB), Property.NodeScope);
// don't reset cookies by default, since I don't think we really need to
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -93,6 +93,7 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import java.nio.charset.StandardCharsets;

import static org.elasticsearch.common.logging.DeprecationLogger.WARNING_HEADER_PATTERN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
Expand Down Expand Up @@ -246,6 +247,60 @@ public void testEncode() {
assertThat(DeprecationLogger.encode(s), IsSame.sameInstance(s));
}


public void testWarningHeaderCountSetting() throws IOException{
// Test that the number of warning headers don't exceed 'http.max_warning_header_count'
final int maxWarningHeaderCount = 2;
Settings settings = Settings.builder()
.put("http.max_warning_header_count", maxWarningHeaderCount)
.build();
try (ThreadContext threadContext = new ThreadContext(settings)) {
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, "A simple message 1");
logger.deprecated(threadContexts, "A simple message 2");
logger.deprecated(threadContexts, "A simple message 3");
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
final List<String> responses = responseHeaders.get("Warning");

assertEquals(maxWarningHeaderCount, responses.size());
assertThat(responses.get(0), warningValueMatcher);
assertThat(responses.get(0), containsString("\"A simple message 1"));
assertThat(responses.get(1), warningValueMatcher);
assertThat(responses.get(1), containsString("\"A simple message 2"));
}
}

public void testWarningHeaderSizeSetting() throws IOException{
// Test that the size of warning headers don't exceed 'http.max_warning_header_size'
Settings settings = Settings.builder()
.put("http.max_warning_header_size", "1Kb")
.build();

byte [] arr = new byte[300];
String message1 = new String(arr, StandardCharsets.UTF_8) + "1";
String message2 = new String(arr, StandardCharsets.UTF_8) + "2";
String message3 = new String(arr, StandardCharsets.UTF_8) + "3";

try (ThreadContext threadContext = new ThreadContext(settings)) {
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, message1);
logger.deprecated(threadContexts, message2);
logger.deprecated(threadContexts, message3);
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
final List<String> responses = responseHeaders.get("Warning");

long warningHeadersSize = 0L;
for (String response : responses){
warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length +
response.getBytes(StandardCharsets.UTF_8).length;
}
// assert that the size of all warning headers is less or equal to 1Kb
assertTrue(warningHeadersSize <= 1024);
}
}

private String range(int lowerInclusive, int upperInclusive) {
return IntStream
.range(lowerInclusive, upperInclusive + 1)
Expand Down