Skip to content

Commit

Permalink
Better logic on sending mapping updates when a new type is introduced
Browse files Browse the repository at this point in the history
When an indexing request introduces a new mapping, today we rely on the parsing logic to mark it as modified during the first parsing phase. This can cause mapping updates to be sent to the master even when the mapping was already introduced via the create-index / put-mapping path — in other words, mapping updates can be sent when none are needed.
This bubbled up in the disabled field-data format test, where we explicitly define mappings precisely so that no mapping-update behavior occurs, yet it still happens because of the current logic; and because our test randomly delays the application of mapping updates, a stale update can arrive late and override newer ones.
closes #6669
  • Loading branch information
kimchy committed Jul 2, 2014
1 parent 2ef21f7 commit 7dc6a07
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 54 deletions.
Expand Up @@ -281,8 +281,6 @@ protected ParseContext initialValue() {

private final Object mappersMutex = new Object();

private boolean initMappersAdded = true;

public DocumentMapper(String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser,
RootObjectMapper rootObjectMapper,
ImmutableMap<String, Object> meta,
Expand Down Expand Up @@ -482,11 +480,6 @@ public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listen
parser = XContentHelper.createParser(source.source());
}
context.reset(parser, new ParseContext.Document(), source, listener);
// on a newly created instance of document mapper, we always consider it as new mappers that have been added
if (initMappersAdded) {
context.setMappingsModified();
initMappersAdded = false;
}

// will result in START_OBJECT
int countDownTokens = 0;
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/org/elasticsearch/index/mapper/MapperService.java
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -399,10 +400,14 @@ public DocumentMapper documentMapper(String type) {
return mappers.get(type);
}

public DocumentMapper documentMapperWithAutoCreate(String type) {
/**
* Returns the document mapper created, including if the document mapper ended up
* being actually created or not in the second tuple value.
*/
public Tuple<DocumentMapper, Boolean> documentMapperWithAutoCreate(String type) {
DocumentMapper mapper = mappers.get(type);
if (mapper != null) {
return mapper;
return Tuple.tuple(mapper, Boolean.FALSE);
}
if (!dynamic) {
throw new TypeMissingException(index, type, "trying to auto create mapping, but dynamic mapping is disabled");
Expand All @@ -411,10 +416,10 @@ public DocumentMapper documentMapperWithAutoCreate(String type) {
synchronized (typeMutex) {
mapper = mappers.get(type);
if (mapper != null) {
return mapper;
return Tuple.tuple(mapper, Boolean.FALSE);
}
merge(type, null, true);
return mappers.get(type);
return Tuple.tuple(mappers.get(type), Boolean.TRUE);
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.ParseContext.Document;

import java.util.List;
Expand Down Expand Up @@ -131,6 +132,24 @@ public boolean mappingsModified() {
return mappingsModified;
}

/**
 * Latches this document's mappings as modified. Once set, the flag is never
 * cleared for this instance; callers read it (via {@code mappingsModified()})
 * to decide whether a mapping update must be sent to the master.
 */
public void setMappingsModified() {
    this.mappingsModified = true;
}

/**
 * Latches the modified flag based on a {@code documentMapperWithAutoCreate} result:
 * when the second tuple value indicates the mapper was just created, the mappings
 * are marked as modified. Returns this instance to allow call chaining.
 */
public ParsedDocument setMappingsModified(Tuple<DocumentMapper, Boolean> docMapper) {
    // the second tuple value is true only when the mapper was newly created
    boolean mapperCreated = docMapper.v2();
    if (mapperCreated) {
        setMappingsModified();
    }
    return this;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -369,9 +370,9 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
@Override
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
    long startTime = System.nanoTime();
    // resolve (or dynamically create) the type's mapper; the tuple's second value says
    // whether the type was just introduced, which latches mappingsModified on the doc
    Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
    ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
    return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
}

@Override
Expand All @@ -390,9 +391,9 @@ public ParsedDocument create(Engine.Create create) throws ElasticsearchException
@Override
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
    long startTime = System.nanoTime();
    // resolve (or dynamically create) the type's mapper; the tuple's second value says
    // whether the type was just introduced, which latches mappingsModified on the doc
    Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
    ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
    return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
}

@Override
Expand All @@ -416,7 +417,7 @@ public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
@Override
public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
    long startTime = System.nanoTime();
    // only the mapper itself is needed for a delete; whether it was newly created is irrelevant here
    DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type).v1();
    return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id), version, versionType, origin, startTime, false);
}

Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -276,10 +277,10 @@ private ParsedDocument parseRequest(IndexService documentIndexService, Percolate
}

MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.v1().parse(source(parser).type(request.documentType()).flyweight(true)).setMappingsModified(docMapper);
if (doc.mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), docMapper, documentIndexService.indexUUID());
mappingUpdatedAction.updateMappingOnMaster(request.index(), docMapper.v1(), documentIndexService.indexUUID());
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -386,8 +387,8 @@ private ParsedDocument parseFetchedDoc(PercolateContext context, BytesReference
try {
parser = XContentFactory.xContent(fetchedDoc).createParser(fetchedDoc);
MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
doc = docMapper.parse(source(parser).type(type).flyweight(true));
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(type);
doc = docMapper.v1().parse(source(parser).type(type).flyweight(true));

if (context.highlight() != null) {
doc.setSource(fetchedDoc);
Expand Down
Expand Up @@ -19,31 +19,37 @@

package org.elasticsearch.index.fielddata;

import com.google.common.base.Predicate;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService.SmartNameFieldMappers;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;

import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

@ClusterScope(randomDynamicTemplates = false)
public class DisabledFieldDataFormatTests extends ElasticsearchIntegrationTest {

// NOTE(review): zero replicas presumably keeps each shard on a single node so the
// field-data format under test applies deterministically — confirm against the test's intent.
@Override
protected int numberOfReplicas() {
    return 0;
}

public void test() throws Exception {
createIndex("test");
prepareCreate("test").addMapping("type", "s", "type=string").execute().actionGet();
ensureGreen();
logger.info("indexing data start");
for (int i = 0; i < 10; ++i) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("s", "value" + i).execute().actionGet();
}
logger.info("indexing data end");

final int searchCycles = 20;

refresh();

Expand All @@ -53,55 +59,99 @@ public void test() throws Exception {
SubAggCollectionMode aggCollectionMode = randomFrom(SubAggCollectionMode.values());
SearchResponse resp = null;
// try to run something that relies on field data and make sure that it fails
try {
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertFailures(resp);
} catch (SearchPhaseExecutionException e) {
// expected
for (int i = 0; i < searchCycles; i++) {
try {
resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertFailures(resp);
} catch (SearchPhaseExecutionException e) {
// expected
}
}

// enable it again
updateFormat("paged_bytes");

// try to run something that relies on field data and make sure that it works
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
for (int i = 0; i < searchCycles; i++) {
resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
}

// disable it again
updateFormat("disabled");

// this time, it should work because segments are already loaded
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
for (int i = 0; i < searchCycles; i++) {
resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
}

// but add more docs and the new segment won't be loaded
client().prepareIndex("test", "type", "-1").setSource("s", "value").execute().actionGet();
refresh();
try {
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertFailures(resp);
} catch (SearchPhaseExecutionException e) {
// expected
for (int i = 0; i < searchCycles; i++) {
try {
resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
.collectMode(aggCollectionMode)).execute().actionGet();
assertFailures(resp);
} catch (SearchPhaseExecutionException e) {
// expected
}
}
}

private void updateFormat(String format) throws Exception {
private void updateFormat(final String format) throws Exception {
logger.info(">> put mapping start {}", format);
client().admin().indices().preparePutMapping("test").setType("type").setSource(
XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties")
.startObject("s")
.field("type", "string")
.startObject("properties")
.startObject("s")
.field("type", "string")
.startObject("fielddata")
.field("format", format)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()).execute().actionGet();
.endObject()
.endObject()).execute().actionGet();
logger.info(">> put mapping end {}", format);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
try {
Set<String> nodes = internalCluster().nodesInclude("test");
if (nodes.isEmpty()) { // we expect at least one node to hold an index, so wait if not allocated yet
return false;
}
for (String node : nodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService("test");
if (indexService == null) {
return false;
}
final SmartNameFieldMappers mappers = indexService.mapperService().smartName("s");
if (mappers == null || !mappers.hasMapper()) {
return false;
}
final String currentFormat = mappers.mapper().fieldDataType().getFormat(ImmutableSettings.EMPTY);
if (!format.equals(currentFormat)) {
return false;
}
}
} catch (Exception e) {
logger.info("got exception waiting for concrete mappings", e);
return false;
}
return true;
}
});
logger.info(">> put mapping verified {}, applies {}", format, applied);
if (!applied) {
fail();
}
}

}
Expand Up @@ -176,7 +176,7 @@ public void testDefaultMappingAndNoMappingWithMapperService() throws Exception {
MapperService mapperService = MapperTestUtils.newMapperService();
mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(defaultMapping), true);

DocumentMapper mapper = mapperService.documentMapperWithAutoCreate("my_type");
DocumentMapper mapper = mapperService.documentMapperWithAutoCreate("my_type").v1();
assertThat(mapper.type(), equalTo("my_type"));
assertThat(mapper.sourceMapper().enabled(), equalTo(false));
}
Expand Down

0 comments on commit 7dc6a07

Please sign in to comment.