Skip to content

Commit

Permalink
New iteration.
Browse files Browse the repository at this point in the history
 - index creation version as a first-class citizen of index metadata
 - 32-bits murmur3 instead of 128 (we only took 32 bits of it anyway)
 - more tests
  • Loading branch information
jpountz committed Oct 17, 2014
1 parent e8c89d5 commit 38a384d
Show file tree
Hide file tree
Showing 63 changed files with 667 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.murmur3.Murmur3HashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -163,8 +165,14 @@ public static State fromString(String state) {
public static final String SETTING_VERSION_CREATED = "index.version.created";
public static final String SETTING_CREATION_DATE = "index.creation_date";
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
public static final String INDEX_UUID_NA_VALUE = "_na_";

// hard-coded hash function as of 2.0
// older indices will read which hash function to use in their index settings
private static final HashFunction MURMUR3_HASH_FUNCTION = new Murmur3HashFunction();

private final String index;
private final long version;

Expand All @@ -184,6 +192,10 @@ public static State fromString(String state) {
private final DiscoveryNodeFilters includeFilters;
private final DiscoveryNodeFilters excludeFilters;

private final Version indexCreatedVersion;
private final HashFunction routingHashFunction;
private final boolean useTypeForRouting;

private IndexMetaData(String index, long version, State state, Settings settings, ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases, ImmutableOpenMap<String, Custom> customs) {
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null) != null, "must specify numberOfShards for index [" + index + "]");
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null) != null, "must specify numberOfReplicas for index [" + index + "]");
Expand Down Expand Up @@ -214,10 +226,20 @@ private IndexMetaData(String index, long version, State state, Settings settings
} else {
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
}
indexCreatedVersion = Version.indexCreated(settings);
final Class<? extends HashFunction> hashFunctionClass = settings.getAsClass(SETTING_LEGACY_ROUTING_HASH_FUNCTION, null);
if (hashFunctionClass == null) {
routingHashFunction = MURMUR3_HASH_FUNCTION;
} else {
try {
routingHashFunction = hashFunctionClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ElasticsearchIllegalStateException("Cannot instantiate hash function", e);
}
}
useTypeForRouting = settings.getAsBoolean(SETTING_LEGACY_ROUTING_USE_TYPE, false);
}



public String index() {
return index;
}
Expand Down Expand Up @@ -254,6 +276,41 @@ public long getVersion() {
return this.version;
}

/**
 * Returns the {@link Version} this index was created on. The value is the
 * one stored in the {@code indexCreatedVersion} field of this metadata
 * object and is mostly useful for backward-compatibility decisions.
 */
public Version creationVersion() {
    return this.indexCreatedVersion;
}

/**
 * Getter-style alias that delegates to {@link #creationVersion()}.
 */
public Version getCreationVersion() {
    return this.creationVersion();
}

/**
 * Returns the {@link HashFunction} that routing should use for this index,
 * as held in the {@code routingHashFunction} field.
 */
public HashFunction routingHashFunction() {
    return this.routingHashFunction;
}

/**
 * Getter-style alias that delegates to {@link #routingHashFunction()}.
 */
public HashFunction getRoutingHashFunction() {
    return this.routingHashFunction();
}

/**
 * Tells whether the {@code _type} takes part in routing, in addition to the
 * {@code _id}, when deciding which shard a document should go to.
 */
public boolean routingUseType() {
    return this.useTypeForRouting;
}

/**
 * Getter-style alias that delegates to {@link #routingUseType()}.
 */
public boolean getRoutingUseType() {
    return this.routingUseType();
}

/**
 * Returns the value of the {@link #SETTING_CREATION_DATE} index setting,
 * read as a long with {@code -1} passed as the default to the settings
 * lookup.
 */
public long creationDate() {
    // upper-case suffix: "-1l" is easily misread as "-11"
    return settings.getAsLong(SETTING_CREATION_DATE, -1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.cluster.routing.operation.hash.murmur3;

import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.common.hash.MurmurHash3;

/**
* Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
Expand All @@ -32,13 +32,12 @@ public int hash(String routing) {
final byte[] bytesToHash = new byte[routing.length() * 2];
for (int i = 0; i < routing.length(); ++i) {
final char c = routing.charAt(i);
final byte b1 = (byte) (c >>> 8), b2 = (byte) c;
assert ((b1 & 0xFF) << 8 | (b2 & 0xFF)) == c; // no information loss
final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
bytesToHash[i * 2] = b1;
bytesToHash[i * 2 + 1] = b2;
}
final MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytesToHash, 0, bytesToHash.length, 0, new MurmurHash3.Hash128());
return (int) hash.h1;
return StringHelper.murmurhash3_x86_32(bytesToHash, 0, bytesToHash.length, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.hash.murmur3.Murmur3HashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
Expand All @@ -56,12 +54,7 @@
*/
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {

public static final String SETTING_LEGACY_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_USE_TYPE = "index.legacy.routing.use_type";

// hard-coded hash function as of 2.0
// older indices will read which hash function to use in their index settings
private static final HashFunction HASH_FUNCTION = new Murmur3HashFunction();

private final AwarenessAllocationDecider awarenessAllocationDecider;

Expand Down Expand Up @@ -271,29 +264,9 @@ protected IndexShardRoutingTable shards(ClusterState clusterState, String index,

private int shardId(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
final IndexMetaData indexMetaData = indexMetaData(clusterState, index);
final Version createdVersion = Version.indexCreated(indexMetaData.getSettings());
final HashFunction hashFunction;
final boolean useType;

final Class<? extends HashFunction> hashFunctionClass = indexMetaData.getSettings().getAsClass(SETTING_LEGACY_HASH_FUNCTION, null);
if (createdVersion.onOrAfter(Version.V_2_0_0)) {
if (hashFunctionClass != null) {
throw new ElasticsearchIllegalStateException("Index [" + index + "] has the `" + SETTING_LEGACY_HASH_FUNCTION + "` setting while it is not supposed to have it since it was created on " + createdVersion);
}
hashFunction = HASH_FUNCTION;
useType = false;
} else {
if (hashFunctionClass == null) {
throw new ElasticsearchIllegalStateException("Index [" + index + "] misses the `" + SETTING_LEGACY_HASH_FUNCTION + "` setting while recovery from gateway should have added it since it was created on " + createdVersion);
}
// TODO: is there a way to make Guice instantiate it instead?
try {
hashFunction = hashFunctionClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ElasticsearchIllegalStateException("Cannot instantiate hash function", e);
}
useType = indexMetaData.getSettings().getAsBoolean(SETTING_LEGACY_USE_TYPE, false);
}
final Version createdVersion = indexMetaData.getCreationVersion();
final HashFunction hashFunction = indexMetaData.getRoutingHashFunction();
final boolean useType = indexMetaData.getRoutingUseType();

final int hash;
if (routing == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -31,7 +32,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -46,7 +46,9 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -63,6 +65,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
static final Pattern GLOBAL_STATE_FILE_PATTERN = Pattern.compile(GLOBAL_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
static final Pattern INDEX_STATE_FILE_PATTERN = Pattern.compile(INDEX_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
private static final String GLOBAL_STATE_LOG_TYPE = "[_global]";
private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";

static enum AutoImportDangledState {
NO() {
Expand Down Expand Up @@ -523,27 +527,36 @@ private void pre019Upgrade() throws Exception {
/**
* Elasticsearch 2.0 deprecated custom routing hash functions. So what we do here is that for old indices, we
* move this old & deprecated node setting to an index setting so that we can keep things backward compatible.
* See {@link PlainOperationRouting}.
*/
private void pre20Upgrade() throws Exception {
final Class<? extends HashFunction> pre20HashFunction = settings.getAsClass("cluster.routing.operation.hash.type", DjbHashFunction.class, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction");
final boolean pre20UseType = settings.getAsBoolean("cluster.routing.operation.use_type", false);
final Class<? extends HashFunction> pre20HashFunction = settings.getAsClass(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, null, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction");
final Boolean pre20UseType = settings.getAsBoolean(DEPRECATED_SETTING_ROUTING_USE_TYPE, null);
MetaData metaData = loadMetaState();
for (IndexMetaData indexMetaData : metaData) {
if (indexMetaData.settings().get(PlainOperationRouting.SETTING_LEGACY_HASH_FUNCTION) == null
&& Version.indexCreated(indexMetaData.settings()).before(Version.V_2_0_0)) {
if (indexMetaData.settings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) == null
&& indexMetaData.getCreationVersion().before(Version.V_2_0_0)) {
// these settings need an upgrade
Settings indexSettings = ImmutableSettings.builder().put(indexMetaData.settings())
.put(PlainOperationRouting.SETTING_LEGACY_HASH_FUNCTION, pre20HashFunction)
.put(PlainOperationRouting.SETTING_LEGACY_USE_TYPE, pre20UseType)
.put(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION, pre20HashFunction == null ? DjbHashFunction.class : pre20HashFunction)
.put(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE, pre20UseType == null ? false : pre20UseType)
.build();
IndexMetaData newMetaData = IndexMetaData.builder(indexMetaData)
.version(indexMetaData.version())
.settings(indexSettings)
.build();
writeIndex("upgrade", newMetaData, null);
} else if (indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0)) {
if (indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) != null
|| indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE) != null) {
throw new ElasticsearchIllegalStateException("Indices created on or after 2.0 should NOT contain [" + IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION
+ "] + or [" + IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE + "] in their index settings");
}
}
}
if (pre20HashFunction != null || pre20UseType != null) {
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
}

class RemoveDanglingIndex implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.cluster.routing.operation.hash.murmur3.Murmur3HashFunction;
import org.elasticsearch.common.inject.internal.Join;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.IndexMissingException;
Expand Down Expand Up @@ -209,7 +207,7 @@ protected void createIndexBasedOnFieldSettings(String index, String alias, TestF
/**
* Generate test documents. The returned documents are already indexed.
*/
protected TestDoc[] generateTestDocs(int numberOfDocs, TestFieldSetting[] fieldSettings) {
protected TestDoc[] generateTestDocs(String index, TestFieldSetting[] fieldSettings) {
String[] fieldContentOptions = new String[]{"Generating a random permutation of a sequence (such as when shuffling cards).",
"Selecting a random sample of a population (important in statistical sampling).",
"Allocating experimental units via random assignment to a treatment or control condition.",
Expand All @@ -218,33 +216,26 @@ protected TestDoc[] generateTestDocs(int numberOfDocs, TestFieldSetting[] fieldS

String[] contentArray = new String[fieldSettings.length];
Map<String, Object> docSource = new HashMap<>();
TestDoc[] testDocs = new TestDoc[numberOfDocs];
int totalShards = getNumShards(index).numPrimaries;
TestDoc[] testDocs = new TestDoc[totalShards];
// this method wants to send one doc to each shard
for (int shardId = 0; shardId < numberOfDocs; shardId++) {
for (int i = 0; i < totalShards; i++) {
docSource.clear();
for (int i = 0; i < contentArray.length; i++) {
contentArray[i] = fieldContentOptions[randomInt(fieldContentOptions.length - 1)];
docSource.put(fieldSettings[i].name, contentArray[i]);
for (int j = 0; j < contentArray.length; j++) {
contentArray[j] = fieldContentOptions[randomInt(fieldContentOptions.length - 1)];
docSource.put(fieldSettings[j].name, contentArray[j]);
}
int id = 0;
while (shardId(numberOfDocs, Integer.toString(id)) != shardId) {
id += 1;
}
TestDoc doc = new TestDoc(Integer.toString(id), fieldSettings, contentArray.clone());
final String id = routingKeyForShard(index, "type", i);
TestDoc doc = new TestDoc(id, fieldSettings, contentArray.clone());
index(doc.index, doc.type, doc.id, docSource);
testDocs[shardId] = doc;
testDocs[i] = doc;
}

refresh();
return testDocs;

}

// TODO: this test should NOT depend on the routing algorithm!
// Hashes the id with a fresh Murmur3HashFunction and reduces the hash to a shard number with MathUtils.mod.
private static int shardId(int numberOfShards, String id) {
    final int idHash = new Murmur3HashFunction().hash(id);
    return MathUtils.mod(idHash, numberOfShards);
}

protected TestConfig[] generateTestConfigs(int numberOfTests, TestDoc[] testDocs, TestFieldSetting[] fieldSettings) {
ArrayList<TestConfig> configs = new ArrayList<>();
for (int i = 0; i < numberOfTests; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void testDuelESLucene() throws Exception {
TestFieldSetting[] testFieldSettings = getFieldSettings();
createIndexBasedOnFieldSettings("test", "alias", testFieldSettings);
// we generate as many docs as we have shards
TestDoc[] testDocs = generateTestDocs(getNumShards("test").numPrimaries, testFieldSettings);
TestDoc[] testDocs = generateTestDocs("test", testFieldSettings);

DirectoryReader directoryReader = indexDocsWithLucene(testDocs);
TestConfig[] testConfigs = generateTestConfigs(20, testDocs, testFieldSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testDuelESLucene() throws Exception {
AbstractTermVectorTests.TestFieldSetting[] testFieldSettings = getFieldSettings();
createIndexBasedOnFieldSettings("test", "alias", testFieldSettings);
// we generate as many docs as we have shards
TestDoc[] testDocs = generateTestDocs(getNumShards("test").numPrimaries, testFieldSettings);
TestDoc[] testDocs = generateTestDocs("test", testFieldSettings);

DirectoryReader directoryReader = indexDocsWithLucene(testDocs);
AbstractTermVectorTests.TestConfig[] testConfigs = generateTestConfigs(20, testDocs, testFieldSettings);
Expand Down
Loading

0 comments on commit 38a384d

Please sign in to comment.