Skip to content

Commit

Permalink
[INDEX] Ensure index.version.created is consistent
Browse files Browse the repository at this point in the history
Today `index.version.created` depends on the version of the master
node in the cluster. This is potentially causing new features to be
expected on shards that didn't exist when the index was created.
There is no notion of `where was the shard allocated first` such that
`index.version.created` can't be reliably used as a feature flag.

With this change the `index.version.created` can be reliably used to
determin the smallest nodes version at the point in time when the index
was created. This means we can safely use certain features that would
for instance require reindeing and / or would not work if not the
entire index (all shards and segments) have been created with a certain
version or newer.

Closes #6660
  • Loading branch information
s1monw committed Jul 1, 2014
1 parent f14edef commit c9b7bec
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 31 deletions.
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -319,7 +320,9 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
indexSettingsBuilder.put(SETTING_VERSION_CREATED, version);
DiscoveryNodes nodes = currentState.nodes();
final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
}
indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());

Expand Down
29 changes: 22 additions & 7 deletions src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
Expand Up @@ -55,13 +55,17 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {

private final String masterNodeId;
private final String localNodeId;
private final Version minNodeVersion;
private final Version minNonClientNodeVersion;

private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes, ImmutableOpenMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId) {
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes, ImmutableOpenMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId, Version minNodeVersion, Version minNonClientNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNodeVersion = minNodeVersion;
this.minNonClientNodeVersion = minNonClientNodeVersion;
}

@Override
Expand Down Expand Up @@ -290,11 +294,16 @@ public boolean isAllNodes(String... nodesIds) {
* @return the oldest version in the cluster
*/
public Version smallestVersion() {
Version version = Version.CURRENT;
for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
version = Version.smallest(version, cursor.value.version());
}
return version;
return minNodeVersion;
}

/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
* @return the oldest version in the cluster
*/
public Version smallestNonClientNodeVersion() {
return minNonClientNodeVersion;
}

/**
Expand Down Expand Up @@ -606,15 +615,21 @@ public Builder localNodeId(String localNodeId) {
public DiscoveryNodes build() {
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.dataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.version());
}
if (nodeEntry.value.masterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.version());
}
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.version());
}
return new DiscoveryNodes(nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId);

return new DiscoveryNodes(nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId, minNodeVersion, minNonClientNodeVersion);
}

public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -106,9 +107,13 @@ public FieldNamesFieldMapper build(BuilderContext context) {
public static class TypeParser implements Mapper.TypeParser {
@Override
public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
FieldNamesFieldMapper.Builder builder = fieldNames();
parseField(builder, builder.name, node, parserContext);
return builder;
if (parserContext.indexVersionCreated().onOrAfter(Version.V_1_3_0)) {
FieldNamesFieldMapper.Builder builder = fieldNames();
parseField(builder, builder.name, node, parserContext);
return builder;
} else {
throw new ElasticsearchIllegalArgumentException("type="+CONTENT_TYPE+" is not supported on indices created before version 1.3.0 is your cluster running multiple datanode versions?");
}
}
}

Expand Down
Expand Up @@ -20,30 +20,46 @@

import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.util.English;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.internal.FieldNamesFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.existsFilter;
import static org.elasticsearch.index.query.FilterBuilders.missingFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

/**
*/
Expand Down Expand Up @@ -91,6 +107,8 @@ public void testInternalVersion() throws Exception {
assertThat("Document with ID " +id + " should exist but doesn't", get.isExists(), is(true));
assertThat(get.getVersion(), equalTo(2l));
}

assertVersionCreated(compatibilityVersion(), "test");
}

/**
Expand All @@ -110,6 +128,7 @@ public void testIndexAndSearch() throws Exception {
String id = Integer.toString(i);
assertHitCount(client().prepareSearch().setQuery(QueryBuilders.termQuery("the_id", id)).get(), 1);
}
assertVersionCreated(compatibilityVersion(), "test");
}

@Test
Expand All @@ -132,6 +151,7 @@ public void testRecoverFromPreviousVersion() throws ExecutionException, Interrup
countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
}
assertVersionCreated(compatibilityVersion(), "test");
}

/**
Expand Down Expand Up @@ -166,6 +186,7 @@ public void testNoRecoveryFromNewNodes() throws ExecutionException, InterruptedE
countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
}
assertVersionCreated(compatibilityVersion(), "test");
}

public void assertAllShardsOnNodes(String index, String pattern) {
Expand Down Expand Up @@ -214,6 +235,7 @@ public void testIndexUpgradeSingleNode() throws Exception {
countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
}
assertVersionCreated(compatibilityVersion(), "test");
}

/**
Expand All @@ -229,7 +251,6 @@ public void testIndexRollingUpgrade() throws Exception {
assertAcked(prepareCreate(indices[i]).setSettings(ImmutableSettings.builder().put("index.routing.allocation.exclude._name", backwardsCluster().newNodePattern()).put(indexSettings())));
}


int numDocs = randomIntBetween(100, 150);
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
String[] indexForDoc = new String[docs.length];
Expand Down Expand Up @@ -260,7 +281,126 @@ public void testIndexRollingUpgrade() throws Exception {
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
String[] newIndices = new String[randomIntBetween(1,3)];

for (int i = 0; i < newIndices.length; i++) {
newIndices[i] = "new_index" + i;
createIndex(newIndices[i]);
}
assertVersionCreated(Version.CURRENT, newIndices); // new indices are all created with the new version
assertVersionCreated(compatibilityVersion(), indices);
}

public void assertVersionCreated(Version version, String... index) {
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings(index).get();
ImmutableOpenMap<String,Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
for (int i = 0; i < index.length; i++) {
Settings settings = indexToSettings.get(index[i]);
assertThat(settings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null), notNullValue());
assertThat(settings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null), equalTo(version));
}
}

@Test
public void testUnsupportedFeatures() throws IOException {
if (compatibilityVersion().before(Version.V_1_3_0)) {
XContentBuilder mapping = XContentBuilder.builder(JsonXContent.jsonXContent)
.startObject()
.startObject("type")
.startObject(FieldNamesFieldMapper.NAME)
// by setting randomly index to no we also test the pre-1.3 behavior
.field("index", randomFrom("no", "not_analyzed"))
.field("store", randomFrom("no", "yes"))
.endObject()
.endObject()
.endObject();

try {
assertAcked(prepareCreate("test").
setSettings(ImmutableSettings.builder().put("index.routing.allocation.exclude._name", backwardsCluster().newNodePattern()).put(indexSettings()))
.addMapping("type", mapping));
} catch (MapperParsingException ex) {
if (getMasterVersion().onOrAfter(Version.V_1_3_0)) {
assertThat(ex.getCause(), instanceOf(ElasticsearchIllegalArgumentException.class));
assertThat(ex.getCause().getMessage(), equalTo("type=_field_names is not supported on indices created before version 1.3.0 is your cluster running multiple datanode versions?"));
} else {
assertThat(ex.getCause(), instanceOf(MapperParsingException.class));
assertThat(ex.getCause().getMessage(), startsWith("Root type mapping not empty after parsing!"));
}
}
}

}

/**
* This filter had a major upgrade in 1.3 where we started to index the field names. Lets see if they still work as expected...
* this test is basically copied from SimpleQueryTests...
*/
public void testExistsFilter() throws IOException, ExecutionException, InterruptedException {
for (;;) {
createIndex("test");
indexRandom(true,
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().startObject("obj1").field("obj1_val", "1").endObject().field("x1", "x_1").field("field1", "value1_1").field("field2", "value2_1").endObject()),
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().startObject("obj1").field("obj1_val", "1").endObject().field("x2", "x_2").field("field1", "value1_2").endObject()),
client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().startObject("obj2").field("obj2_val", "1").endObject().field("y1", "y_1").field("field2", "value2_3").endObject()),
client().prepareIndex("test", "type1", "4").setSource(jsonBuilder().startObject().startObject("obj2").field("obj2_val", "1").endObject().field("y2", "y_2").field("field3", "value3_4").endObject()));

CountResponse countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), existsFilter("field1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(constantScoreQuery(existsFilter("field1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(queryString("_exists_:field1")).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), existsFilter("field2"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), existsFilter("field3"))).get();
assertHitCount(countResponse, 1l);

// wildcard check
countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), existsFilter("x*"))).get();
assertHitCount(countResponse, 2l);

// object check
countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), existsFilter("obj1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), missingFilter("field1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), missingFilter("field1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(constantScoreQuery(missingFilter("field1"))).get();
assertHitCount(countResponse, 2l);

countResponse = client().prepareCount().setQuery(queryString("_missing_:field1")).get();
assertHitCount(countResponse, 2l);

// wildcard check
countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), missingFilter("x*"))).get();
assertHitCount(countResponse, 2l);

// object check
countResponse = client().prepareCount().setQuery(filteredQuery(matchAllQuery(), missingFilter("obj1"))).get();
assertHitCount(countResponse, 2l);
if (!backwardsCluster().upgradeOneNode()) {
break;
}
ensureYellow();
assertVersionCreated(compatibilityVersion(), "test"); // we had an old node in the cluster so we have to be on the compat version
assertAcked(client().admin().indices().prepareDelete("test"));
}

assertVersionCreated(Version.CURRENT, "test"); // after upgrade we have current version

}


public Version getMasterVersion() {
return client().admin().cluster().prepareState().get().getState().nodes().masterNode().getVersion();
}
}
Expand Up @@ -39,10 +39,8 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
*
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
@ElasticsearchIntegrationTest.CompatibilityVersion(version = Version.V_1_2_0_ID) // we throw an exception if we create an index with _field_names that is 1.3
public class PreBuiltAnalyzerIntegrationTests extends ElasticsearchIntegrationTest {

@Override
Expand Down
Expand Up @@ -453,6 +453,8 @@ public void testCommonTermsQueryStackedTokens() throws Exception {

@Test
public void testOmitTermFreqsAndPositions() throws Exception {
cluster().wipeTemplates(); // no randomized template for this test -- we are testing bwc compat and set version explicitly this might cause failures if an unsupported feature
// is added randomly via an index template.
Version version = Version.CURRENT;
int iters = scaledRandomIntBetween(10, 20);
for (int i = 0; i < iters; i++) {
Expand Down

0 comments on commit c9b7bec

Please sign in to comment.