Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new dynamic until_limit option #96233

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/96233.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 96233
summary: Add new dynamic `until_limit` option
area: Mapping
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -39,10 +41,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,18 +94,36 @@ public void testConflictingDynamicMappingsBulk() {
assertTrue(bulkResponse.hasFailures());
}

private static void assertMappingsHaveField(GetMappingsResponse mappings, String index, String field) throws IOException {
MappingMetadata indexMappings = mappings.getMappings().get("index");
assertNotNull(indexMappings);
Map<String, Object> typeMappingsMap = indexMappings.getSourceAsMap();
@SuppressWarnings("unchecked")
Map<String, Object> properties = (Map<String, Object>) typeMappingsMap.get("properties");
assertTrue("Could not find [" + field + "] in " + typeMappingsMap.toString(), properties.containsKey(field));
public void testConcurrentDynamicUpdates() throws Throwable {
int numberOfFieldsToCreate = 32;
Map<String, Object> properties = indexConcurrently(numberOfFieldsToCreate, "true", Settings.builder());
assertThat(properties.size(), equalTo(numberOfFieldsToCreate));
for (int i = 0; i < numberOfFieldsToCreate; i++) {
assertTrue("Could not find [field" + i + "] in " + properties, properties.containsKey("field" + i));
}
}

public void testConcurrentDynamicUpdates() throws Throwable {
createIndex("index");
final Thread[] indexThreads = new Thread[32];
public void testConcurrentDynamicUntilLimitUpdates() throws Throwable {
int numberOfFieldsToCreate = 32;
Map<String, Object> properties = indexConcurrently(
numberOfFieldsToCreate,
"until_limit",
Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfFieldsToCreate)
);
assertThat(properties.size(), equalTo(numberOfFieldsToCreate / 2));
SearchResponse response = client().prepareSearch("index")
.setQuery(new MatchAllQueryBuilder())
.setSize(numberOfFieldsToCreate)
.addFetchField("*")
.get();
long ignoredFields = Arrays.stream(response.getHits().getHits()).filter(hit -> hit.field("_ignored") != null).count();
assertEquals(16, ignoredFields);
}

private Map<String, Object> indexConcurrently(int numberOfFieldsToCreate, String dynamic, Settings.Builder settings) throws Throwable {
client().admin().indices().prepareCreate("index").setSettings(settings).setMapping(Map.of("dynamic", dynamic)).get();
ensureGreen("index");
final Thread[] indexThreads = new Thread[numberOfFieldsToCreate];
final CountDownLatch startLatch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
for (int i = 0; i < indexThreads.length; ++i) {
Expand Down Expand Up @@ -128,14 +151,17 @@ public void run() {
if (error.get() != null) {
throw error.get();
}
Thread.sleep(2000);
GetMappingsResponse mappings = indicesAdmin().prepareGetMappings("index").get();
for (int i = 0; i < indexThreads.length; ++i) {
assertMappingsHaveField(mappings, "index", "field" + i);
}
for (int i = 0; i < indexThreads.length; ++i) {
client().admin().indices().prepareRefresh("index").get();
for (int i = 0; i < numberOfFieldsToCreate; ++i) {
assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists());
}
GetMappingsResponse mappings = indicesAdmin().prepareGetMappings("index").get();
MappingMetadata indexMappings = mappings.getMappings().get("index");
assertNotNull(indexMappings);
Map<String, Object> typeMappingsMap = indexMappings.getSourceAsMap();
@SuppressWarnings("unchecked")
Map<String, Object> properties = (Map<String, Object>) typeMappingsMap.get("properties");
return properties;
}

public void testPreflightCheckAvoidsMaster() throws InterruptedException, IOException {
Expand Down Expand Up @@ -226,15 +252,66 @@ public void onFailure(Exception e) {
Exception e = expectThrows(DocumentParsingException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10)));
assertThat(e.getMessage(), Matchers.containsString("failed to parse"));
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(
e.getCause().getMessage(),
Matchers.containsString("Limit of total fields [2] has been exceeded while adding new fields [1]")
);
assertThat(e.getCause().getMessage(), Matchers.containsString("Limit of total fields [2] has been exceeded"));
} finally {
indexingCompletedLatch.countDown();
}
}

public void testDynamicUntilLimitMultiField() throws Exception {
var fields = indexUntilLimit(2, orderedMap("field1", 1, "field2", "text")).getFields();
assertThat(fields.keySet(), equalTo(Set.of("field1", "_ignored")));
assertThat(fields.get("field1").getValues(), equalTo(List.of(1L)));
assertThat(fields.get("_ignored").getValues(), equalTo(List.of("field2")));
}

public void testDynamicUntilLimitObjectField() throws Exception {
var fields = indexUntilLimit(3, orderedMap("a.b", 1, "a.c", 2, "a.d", 3)).getFields();
assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.c", "_ignored")));
assertThat(fields.get("a.b").getValues(), equalTo(List.of(1L)));
assertThat(fields.get("a.c").getValues(), equalTo(List.of(2L)));
assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d")));
}

public void testDynamicUntilLimitDottedObjectMultiField() throws Exception {
var fields = indexUntilLimit(4, orderedMap("a.b", "foo", "a.c", 2, "a.d", 3)).getFields();
assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.b.keyword", "a.c", "_ignored")));
assertThat(fields.get("a.b").getValues(), equalTo(List.of("foo")));
assertThat(fields.get("a.b.keyword").getValues(), equalTo(List.of("foo")));
assertThat(fields.get("a.c").getValues(), equalTo(List.of(2L)));
assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d")));
}

public void testDynamicUntilLimitObjectMultiField() throws Exception {
var fields = indexUntilLimit(5, orderedMap("a", orderedMap("b", "foo", "c", "bar", "d", 3))).getFields();
assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.b.keyword", "a.c", "a.c.keyword", "_ignored")));
assertThat(fields.get("a.b").getValues(), equalTo(List.of("foo")));
assertThat(fields.get("a.b.keyword").getValues(), equalTo(List.of("foo")));
assertThat(fields.get("a.c").getValues(), equalTo(List.of("bar")));
assertThat(fields.get("a.c.keyword").getValues(), equalTo(List.of("bar")));
assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d")));
}

private LinkedHashMap<String, Object> orderedMap(Object... entries) {
var map = new LinkedHashMap<String, Object>();
for (int i = 0; i < entries.length; i += 2) {
map.put((String) entries[i], entries[i + 1]);
}
return map;
}

private SearchHit indexUntilLimit(int fieldLimit, Map<String, Object> source) throws Exception {
client().admin()
.indices()
.prepareCreate("index")
.setSettings(Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), fieldLimit).build())
.setMapping(Map.of("dynamic", "until_limit"))
.get();
ensureGreen("index");
client().prepareIndex("index").setId("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).setSource(source).get();
return client().prepareSearch("index").setQuery(new MatchAllQueryBuilder()).addFetchField("*").get().getHits().getHits()[0];
}

public void testTotalFieldsLimitWithRuntimeFields() {
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ public <T extends DocWriteRequest<T>> T getRequestToExecute() {
return (T) requestToExecute;
}

/** indicates that the optional mapping updated did not succeed and prepares for a new execution */
public void onOptionalMappingUpdateFailed() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
assert assertInvariants(ItemProcessingState.INITIAL);
}

/** indicates that the current operation can not be completed and needs to wait for a new mapping from the master */
public void markAsRequiringMappingUpdate() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,23 +361,27 @@ static boolean executeBulkItemRequest(
request.isRetry()
);
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) {

try {
primary.mapperService()
.merge(
MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate()),
new CompressedXContent(result.getMappingUpdate()),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT
);
} catch (Exception e) {
logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
assert result.getId() != null;
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
if (result.isMappingUpdateOptional()) {
context.onOptionalMappingUpdateFailed();
} else {
logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
assert result.getId() != null;
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
}
return true;
}

mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), new ActionListener<>() {
mappingUpdater.updateMappings(result.getMappingUpdate(), primary.shardId(), new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
Expand All @@ -397,9 +401,13 @@ public void onFailure(Exception e) {

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
if (result.isMappingUpdateOptional()) {
context.onOptionalMappingUpdateFailed();
} else {
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
assert context.isInitial();
}
itemDoneListener.onResponse(null);
}
});
Expand Down Expand Up @@ -642,7 +650,7 @@ private static Engine.Result performOpOnReplica(
throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
}
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) {
// Even though the primary waits on all nodes to ack the mapping changes to the master
// (see MappingUpdatedAction.updateMappingOnMaster) we still need to protect against missing mappings
// and wait for them. The reason is concurrent requests. Request r1 which has new field f triggers a
Expand All @@ -653,7 +661,7 @@ private static Engine.Result performOpOnReplica(
// applied the new mapping, so there is no other option than to wait.
throw new TransportReplicationAction.RetryOnReplicaException(
replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate()
"Mappings are not available on the replica yet, triggered update: " + result.getMappingUpdate()
);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
for (Translog.Operation operation : request.getOperations()) {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE) {
throw new TransportReplicationAction.RetryOnReplicaException(
replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate()
"Mappings are not available on the replica yet, triggered update: " + operationResult.getMappingUpdate()
);
}
location = syncOperationResultOrThrow(operationResult, location);
Expand Down
35 changes: 19 additions & 16 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ public abstract static class Result {
private final long seqNo;
private final Exception failure;
private final SetOnce<Boolean> freeze = new SetOnce<>();
private final Mapping requiredMappingUpdate;
private final Mapping mappingUpdate;
private final boolean mappingUpdateOptional;
private final String id;
private Translog.Location translogLocation;
private long took;
Expand All @@ -397,7 +398,8 @@ protected Result(Operation.TYPE operationType, Exception failure, long version,
this.version = version;
this.term = term;
this.seqNo = seqNo;
this.requiredMappingUpdate = null;
this.mappingUpdate = null;
this.mappingUpdateOptional = false;
this.resultType = Type.FAILURE;
this.id = id;
}
Expand All @@ -408,19 +410,21 @@ protected Result(Operation.TYPE operationType, long version, long term, long seq
this.seqNo = seqNo;
this.term = term;
this.failure = null;
this.requiredMappingUpdate = null;
this.mappingUpdate = null;
this.mappingUpdateOptional = false;
this.resultType = Type.SUCCESS;
this.id = id;
}

protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate, String id) {
protected Result(Operation.TYPE operationType, Mapping mappingUpdate, boolean mappingUpdateOptional, String id) {
this.operationType = operationType;
this.version = Versions.NOT_FOUND;
this.seqNo = UNASSIGNED_SEQ_NO;
this.term = UNASSIGNED_PRIMARY_TERM;
this.failure = null;
this.requiredMappingUpdate = requiredMappingUpdate;
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
this.mappingUpdate = mappingUpdate;
this.mappingUpdateOptional = mappingUpdateOptional;
this.resultType = Type.MAPPING_UPDATE;
this.id = id;
}

Expand Down Expand Up @@ -451,8 +455,12 @@ public long getTerm() {
* If the operation was aborted due to missing mappings, this method will return the mappings
* that are required to complete the operation.
*/
public Mapping getRequiredMappingUpdate() {
return requiredMappingUpdate;
public Mapping getMappingUpdate() {
return mappingUpdate;
}

public boolean isMappingUpdateOptional() {
return mappingUpdateOptional;
}

/** get the translog location after executing the operation */
Expand Down Expand Up @@ -501,7 +509,7 @@ void freeze() {
public enum Type {
SUCCESS,
FAILURE,
MAPPING_UPDATE_REQUIRED
MAPPING_UPDATE
}
}

Expand All @@ -526,8 +534,8 @@ public IndexResult(Exception failure, long version, long term, long seqNo, Strin
this.created = false;
}

public IndexResult(Mapping requiredMappingUpdate, String id) {
super(Operation.TYPE.INDEX, requiredMappingUpdate, id);
public IndexResult(Mapping mappingUpdate, boolean mappingUpdateOptional, String id) {
super(Operation.TYPE.INDEX, mappingUpdate, mappingUpdateOptional, id);
this.created = false;
}

Expand Down Expand Up @@ -557,11 +565,6 @@ public DeleteResult(Exception failure, long version, long term, long seqNo, bool
this.found = found;
}

public DeleteResult(Mapping requiredMappingUpdate, String id) {
super(Operation.TYPE.DELETE, requiredMappingUpdate, id);
this.found = false;
}

public boolean isFound() {
return found;
}
Expand Down
Loading