Skip to content

Commit

Permalink
Block too many concurrent mapping updates (#51038)
Browse files Browse the repository at this point in the history
Ensures that there are not too many concurrent dynamic mapping updates going out from the
data nodes to the master.

Closes #50670
  • Loading branch information
ywelsch committed Jan 15, 2020
1 parent ac26392 commit 7b98184
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
Expand All @@ -47,19 +50,30 @@ public class MappingUpdatedAction {
Setting.positiveTimeSetting("indices.mapping.dynamic_timeout", TimeValue.timeValueSeconds(30),
Property.Dynamic, Property.NodeScope);

public static final Setting<Integer> INDICES_MAX_IN_FLIGHT_UPDATES_SETTING =
Setting.intSetting("indices.mapping.max_in_flight_updates", 10, 1, 1000,
Property.Dynamic, Property.NodeScope);

private IndicesAdminClient client;
private volatile TimeValue dynamicMappingUpdateTimeout;
private final AdjustableSemaphore semaphore;

@Inject
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) {
this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings);
this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true);
clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates);
}

private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) {
this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
}

private void setMaxInFlightUpdates(int maxInFlightUpdates) {
semaphore.setMaxPermits(maxInFlightUpdates);
}

public void setClient(Client client) {
this.client = client.admin().indices();
}
Expand All @@ -74,7 +88,34 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat
if (type.equals(MapperService.DEFAULT_MAPPING)) {
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)

final RunOnce release = new RunOnce(() -> semaphore.release());
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.onFailure(e);
return;
}
boolean successFullySent = false;
try {
sendUpdateMapping(index, type, mappingUpdate, ActionListener.runBefore(listener, release::run));
successFullySent = true;
} finally {
if (successFullySent == false) {
release.run();
}
}
}

// used by tests
int blockedThreads() {
return semaphore.getQueueLength();
}

// can be overridden by tests
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
.execute(new ActionListener<AcknowledgedResponse>() {
@Override
Expand All @@ -101,4 +142,30 @@ private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
}
return new UncategorizedExecutionException("Failed execution", root);
}

static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MappingUpdatedActionTests extends ESTestCase {

public void testAdjustableSemaphore() {
AdjustableSemaphore sem = new AdjustableSemaphore(1, randomBoolean());
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());
assertEquals(0, sem.availablePermits());

// increase the number of max permits to 2
sem.setMaxPermits(2);
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());

// release all current permits
sem.release();
assertEquals(1, sem.availablePermits());
sem.release();
assertEquals(2, sem.availablePermits());

// reduce number of max permits to 1
sem.setMaxPermits(1);
assertEquals(1, sem.availablePermits());
// set back to 2
sem.setMaxPermits(2);
assertEquals(2, sem.availablePermits());

// take both permits and reduce max permits
assertTrue(sem.tryAcquire());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());
sem.setMaxPermits(1);
assertEquals(-1, sem.availablePermits());
assertFalse(sem.tryAcquire());

// release one permit
sem.release();
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());

// release second permit
sem.release();
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
}

public void testMappingUpdatedActionBlocks() throws Exception {
List<ActionListener<Void>> inFlightListeners = new CopyOnWriteArrayList<>();
final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder()
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {

@Override
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
inFlightListeners.add(listener);
}
};

PlainActionFuture<Void> fut1 = new PlainActionFuture<>();
mua.updateMappingOnMaster(null, "test", null, fut1);
assertEquals(1, inFlightListeners.size());
assertEquals(0, mua.blockedThreads());

PlainActionFuture<Void> fut2 = new PlainActionFuture<>();
Thread thread = new Thread(() -> {
mua.updateMappingOnMaster(null, "test", null, fut2); // blocked
});
thread.start();
assertBusy(() -> assertEquals(1, mua.blockedThreads()));

assertEquals(1, inFlightListeners.size());
assertFalse(fut1.isDone());
inFlightListeners.remove(0).onResponse(null);
assertTrue(fut1.isDone());

thread.join();
assertEquals(0, mua.blockedThreads());
assertEquals(1, inFlightListeners.size());
assertFalse(fut2.isDone());
inFlightListeners.remove(0).onResponse(null);
assertTrue(fut2.isDone());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ private Environment createEnvironment(String nodeName) {
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY))
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ private static Settings getRandomNodeSettings(long seed) {
if (random.nextBoolean()) {
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(),
timeValueSeconds(RandomNumbers.randomIntBetween(random, 10, 30)).getStringRep());
builder.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 1, 10));
}

// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
Expand Down

0 comments on commit 7b98184

Please sign in to comment.