Skip to content

Commit

Permalink
Allow to update the _source mapping exclude/include dynamically when …
Browse files Browse the repository at this point in the history
…we merge mappings.

Closes #3491
  • Loading branch information
bleskes committed Aug 13, 2013
1 parent 248042d commit cca12a8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 8 deletions.
Expand Up @@ -372,8 +372,8 @@ public void run() {
}

// update the current cluster state
logger.debug("Updating cluster state version {}", newClusterState.version());
clusterState = newClusterState;
logger.debug("Set cluster state to version {}. Broadcasting to listeners.", newClusterState.version());

for (ClusterStateListener listener : priorityClusterStateListeners) {
listener.clusterChanged(clusterChangedEvent);
Expand Down Expand Up @@ -401,7 +401,7 @@ public void run() {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}

logger.debug("processing [{}]: done applying updated cluster_state", source);
logger.debug("processing [{}]: done applying updated cluster_state (version: {})", source, newClusterState.version());
} catch (Exception e) {
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
Expand Down
Expand Up @@ -182,11 +182,9 @@ public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext
private final boolean enabled;

private Boolean compress;

private long compressThreshold;

private String[] includes;

private String[] excludes;

private String format;
Expand Down Expand Up @@ -406,6 +404,12 @@ public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappi
if (sourceMergeWith.compressThreshold != -1) {
this.compressThreshold = sourceMergeWith.compressThreshold;
}
if (sourceMergeWith.includes != null) {
this.includes = sourceMergeWith.includes;
}
if (sourceMergeWith.excludes != null) {
this.excludes = sourceMergeWith.excludes;
}
}
}
}
Expand Up @@ -31,7 +31,9 @@
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.client.AdminClient;
Expand Down Expand Up @@ -315,8 +317,20 @@ protected int numberOfNodes() {
}

// utils
protected void index(String index, String type, XContentBuilder source) {
client().prepareIndex(index, type).setSource(source).execute().actionGet();
protected GetResponse get(String index, String type, String id) {
return client().prepareGet(index, type, id).execute().actionGet();
}

protected IndexResponse index(String index, String type, XContentBuilder source) {
return client().prepareIndex(index, type).setSource(source).execute().actionGet();
}

protected IndexResponse index(String index, String type, String id, XContentBuilder source) {
return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
}

protected IndexResponse index(String index, String type, String id, String field, Object value) {
return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet();
}

protected RefreshResponse refresh() {
Expand Down Expand Up @@ -381,9 +395,9 @@ protected <Res extends BroadcastOperationResponse> Res run(BroadcastOperationReq
assertNoFailures(actionGet);
return actionGet;
}

// TODO move this into a base class for integration tests
public void indexRandom(String index, boolean forceRefresh, IndexRequestBuilder...builders) throws InterruptedException, ExecutionException {
public void indexRandom(String index, boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException, ExecutionException {
Random random = getRandom();
List<IndexRequestBuilder> list = Arrays.asList(builders);
Collections.shuffle(list, random);
Expand Down
Expand Up @@ -4,6 +4,8 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -109,6 +111,68 @@ public void updateMappingNoChanges() throws Exception {
}


@SuppressWarnings("unchecked")
@Test
public void updateIncludeExclude() throws Exception {
createIndexMapped("test", "type", "normal", "long", "exclude", "long", "include", "long");

logger.info("Index doc 1");
index("test", "type", "1", JsonXContent.contentBuilder().startObject()
.field("normal", 1).field("exclude", 1).field("include", 1)
.endObject()
);
refresh(); // commit it for later testing.


logger.info("Adding exclude settings");
PutMappingResponse putResponse = client().admin().indices().preparePutMapping("test").setType("type").setSource(
JsonXContent.contentBuilder().startObject().startObject("type")
.startObject("_source")
.startArray("excludes").value("exclude").endArray()
.endObject().endObject()
).get();

assertTrue(putResponse.isAcknowledged());

logger.info("Index doc 2");
index("test", "type", "2", JsonXContent.contentBuilder().startObject()
.field("normal", 2).field("exclude", 1).field("include", 2)
.endObject()
);

GetResponse getResponse = get("test", "type", "2");
assertThat(getResponse.getSource(), hasKey("normal"));
assertThat(getResponse.getSource(), not(hasKey("exclude")));
assertThat(getResponse.getSource(), hasKey("include"));


putResponse = client().admin().indices().preparePutMapping("test").setType("type").setSource(
JsonXContent.contentBuilder().startObject().startObject("type")
.startObject("_source")
.startArray("excludes").endArray()
.startArray("includes").value("include").endArray()
.endObject().endObject()
).get();
assertTrue(putResponse.isAcknowledged());

ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
MappingMetaData typeMapping = clusterStateResponse.getState().metaData().index("test").getMappings().get("type");
assertThat((Map<String, Object>) typeMapping.getSourceAsMap().get("_source"), hasKey("includes"));
assertThat((Map<String, Object>) typeMapping.getSourceAsMap().get("_source"), not(hasKey("excludes")));


index("test", "type", "3", JsonXContent.contentBuilder().startObject()
.field("normal", 3).field("exclude", 3).field("include", 3)
.endObject()
);

getResponse = get("test", "type", "3");
assertThat(getResponse.getSource(), not(hasKey("normal")));
assertThat(getResponse.getSource(), not(hasKey("exclude")));
assertThat(getResponse.getSource(), hasKey("include"));

}

@SuppressWarnings("unchecked")
@Test
public void updateDefaultMappingSettings() throws Exception {
Expand Down

0 comments on commit cca12a8

Please sign in to comment.