Skip to content

Commit

Permalink
Update clusterstate if mapping service has local changes
Browse files Browse the repository at this point in the history
If the during percolating a new field was introduced
in the local mapping service, then those changes should
be updated in cluster state of the master as well.

Closes #5776
  • Loading branch information
martijnvg authored and s1monw committed Apr 15, 2014
1 parent 56e4b8b commit 118a404
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 20 deletions.
Expand Up @@ -33,12 +33,15 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
Expand Down Expand Up @@ -114,11 +117,13 @@ public class PercolatorService extends AbstractComponent {
private final AggregationPhase aggregationPhase;
private final SortParseElement sortParseElement;
private final ScriptService scriptService;
private final MappingUpdatedAction mappingUpdatedAction;

@Inject
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler,
HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase,
AggregationPhase aggregationPhase, ScriptService scriptService) {
AggregationPhase aggregationPhase, ScriptService scriptService,
MappingUpdatedAction mappingUpdatedAction) {
super(settings);
this.indicesService = indicesService;
this.cacheRecycler = cacheRecycler;
Expand All @@ -128,6 +133,7 @@ public PercolatorService(Settings settings, IndicesService indicesService, Cache
this.facetPhase = facetPhase;
this.aggregationPhase = aggregationPhase;
this.scriptService = scriptService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.sortParseElement = new SortParseElement();

final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
Expand Down Expand Up @@ -272,6 +278,9 @@ private ParsedDocument parseRequest(IndexService documentIndexService, Percolate
MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
if (doc.mappingsModified()) {
updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID());
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();
}
Expand Down Expand Up @@ -830,6 +839,31 @@ public InternalAggregations reducedAggregations() {
}
}

// TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now.
private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}

@Override
public void onFailure(Throwable e) {
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
}

private InternalFacets reduceFacets(List<PercolateShardResponse> shardResults) {
if (shardResults.get(0).facets() == null) {
return null;
Expand Down
132 changes: 113 additions & 19 deletions src/test/java/org/elasticsearch/percolator/PercolatorTests.java
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
Expand Down Expand Up @@ -145,19 +147,7 @@ public void testSimple1() throws Exception {

@Test
public void testSimple2() throws Exception {
client().admin().indices().prepareCreate("index").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
client().admin().indices().prepareCreate("test").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
ensureGreen();
createIndex("index", "test");

// introduce the doc
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc")
Expand Down Expand Up @@ -1604,13 +1594,18 @@ public void testDeletePercolatorType() throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (hasPercolatorType) {
return getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME);
} else {
return false;
for (Client client : cluster()) {
GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (!hasPercolatorType) {
return false;
}

if (!getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME)) {
return false;
}
}
return true;
}
});

Expand Down Expand Up @@ -1667,4 +1662,103 @@ public void percolateNonMatchingConstantScoreQuery() throws Exception {
assertMatchCount(percolate, 0l);
}

@Test
public void testPercolationWithDynamicTemplates() throws Exception {
assertAcked(prepareCreate("idx").addMapping("type", jsonBuilder().startObject().startObject("type")
.field("dynamic", false)
.startObject("properties")
.startObject("custom")
.field("dynamic", true)
.field("type", "object")
.field("incude_in_all", false)
.endObject()
.endObject()
.startArray("dynamic_template")
.startObject()
.startObject("custom_fields")
.field("path_match", "custom.*")
.startObject("mapping")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject()
.endArray()
.endObject().endObject()));

client().prepareIndex("idx", PercolatorService.TYPE_NAME, "1")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:red")).endObject())
.get();
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();

PercolateResponse percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();

assertMatchCount(percolateResponse, 0l);
assertThat(percolateResponse.getMatches(), arrayWithSize(0));

// wait until the mapping change has propagated from the percolate request
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();

// The previous percolate request introduced the custom.color field, so now we register the query again
// and the field name `color` will be resolved to `custom.color` field in mapping via smart field mapping resolving.
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();

// The second request will yield a match, since the query during the proper field during parsing.
percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();

assertMatchCount(percolateResponse, 1l);
assertThat(percolateResponse.getMatches()[0].getId().string(), equalTo("2"));
}

@Test
public void testUpdateMappingDynamicallyWhilePercolating() throws Exception {
createIndex("test");

// percolation source
XContentBuilder percolateDocumentSource = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", 1)
.field("field2", "value")
.endObject().endObject();

PercolateResponse response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(percolateDocumentSource).execute().actionGet();
assertMatchCount(response, 0l);
assertThat(response.getMatches(), arrayWithSize(0));

// wait until the mapping change has propagated
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();

GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertThat(mappingsResponse.getMappings().get("test"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().isEmpty(), is(false));
Map<String, Object> properties = (Map<String, Object>) mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().get("properties");
assertThat(((Map<String, String>) properties.get("field1")).get("type"), equalTo("long"));
assertThat(((Map<String, String>) properties.get("field2")).get("type"), equalTo("string"));
}

}

0 comments on commit 118a404

Please sign in to comment.