Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate percolate mapping changes to cluster state #5776

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,12 +31,15 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
Expand Down Expand Up @@ -115,14 +118,16 @@ public class PercolatorService extends AbstractComponent {
private final AggregationPhase aggregationPhase;
private final SortParseElement sortParseElement;
private final ScriptService scriptService;
private final MappingUpdatedAction mappingUpdatedAction;

private final CloseableThreadLocal<MemoryIndex> cache;

@Inject
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler,
PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase,
AggregationPhase aggregationPhase, ScriptService scriptService) {
AggregationPhase aggregationPhase, ScriptService scriptService,
MappingUpdatedAction mappingUpdatedAction) {
super(settings);
this.indicesService = indicesService;
this.cacheRecycler = cacheRecycler;
Expand All @@ -133,6 +138,7 @@ public PercolatorService(Settings settings, IndicesService indicesService, Cache
this.facetPhase = facetPhase;
this.aggregationPhase = aggregationPhase;
this.scriptService = scriptService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.sortParseElement = new SortParseElement();

final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
Expand Down Expand Up @@ -269,6 +275,9 @@ private ParsedDocument parseRequest(IndexService documentIndexService, Percolate
MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
if (doc.mappingsModified()) {
updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID());
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();
}
Expand Down Expand Up @@ -827,6 +836,31 @@ public InternalAggregations reducedAggregations() {
}
}

// TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now.
private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}

@Override
public void onFailure(Throwable e) {
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
}

private InternalFacets reduceFacets(List<PercolateShardResponse> shardResults) {
if (shardResults.get(0).facets() == null) {
return null;
Expand Down
135 changes: 113 additions & 22 deletions src/test/java/org/elasticsearch/percolator/PercolatorTests.java
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
Expand All @@ -32,8 +33,8 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -150,21 +151,7 @@ public void testSimple1() throws Exception {

@Test
public void testSimple2() throws Exception {

//TODO this test seems to have problems with more shards and/or 1 replica instead of 0
client().admin().indices().prepareCreate("index").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
client().admin().indices().prepareCreate("test").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
ensureGreen();
createIndex("index", "test");

// introduce the doc
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc")
Expand Down Expand Up @@ -1595,13 +1582,18 @@ public void testDeletePercolatorType() throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (hasPercolatorType) {
return getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME);
} else {
return false;
for (Client client : cluster()) {
GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (!hasPercolatorType) {
return false;
}

if (!getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME)) {
return false;
}
}
return true;
}
});

Expand Down Expand Up @@ -1681,6 +1673,105 @@ public void testNestedPercolationOnExistingDoc() throws IOException {
assertEquals(response.getMatches()[0].getId().string(), "Q");
}

@Test
public void testPercolationWithDynamicTemplates() throws Exception {
assertAcked(prepareCreate("idx").addMapping("type", jsonBuilder().startObject().startObject("type")
.field("dynamic", false)
.startObject("properties")
.startObject("custom")
.field("dynamic", true)
.field("type", "object")
.field("incude_in_all", false)
.endObject()
.endObject()
.startArray("dynamic_template")
.startObject()
.startObject("custom_fields")
.field("path_match", "custom.*")
.startObject("mapping")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject()
.endArray()
.endObject().endObject()));

client().prepareIndex("idx", PercolatorService.TYPE_NAME, "1")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:red")).endObject())
.get();
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();

PercolateResponse percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();

assertMatchCount(percolateResponse, 0l);
assertThat(percolateResponse.getMatches(), arrayWithSize(0));

// wait until the mapping change has propagated from the percolate request
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();

// The previous percolate request introduced the custom.color field, so now we register the query again
// and the field name `color` will be resolved to `custom.color` field in mapping via smart field mapping resolving.
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();

// The second request will yield a match, since the query during the proper field during parsing.
percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();

assertMatchCount(percolateResponse, 1l);
assertThat(percolateResponse.getMatches()[0].getId().string(), equalTo("2"));
}

@Test
public void testUpdateMappingDynamicallyWhilePercolating() throws Exception {
createIndex("test");

// percolation source
XContentBuilder percolateDocumentSource = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", 1)
.field("field2", "value")
.endObject().endObject();

PercolateResponse response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(percolateDocumentSource).execute().actionGet();
assertMatchCount(response, 0l);
assertThat(response.getMatches(), arrayWithSize(0));

// wait until the mapping change has propagated
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();

GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertThat(mappingsResponse.getMappings().get("test"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().isEmpty(), is(false));
Map<String, Object> properties = (Map<String, Object>) mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().get("properties");
assertThat(((Map<String, String>) properties.get("field1")).get("type"), equalTo("long"));
assertThat(((Map<String, String>) properties.get("field2")).get("type"), equalTo("string"));
}

void initNestedIndexAndPercolation() throws IOException {
XContentBuilder mapping = XContentFactory.jsonBuilder();
mapping.startObject().startObject("properties").startObject("companyname").field("type", "string").endObject()
Expand Down