Skip to content

Commit

Permalink
Switch to murmurhash3 to route documents to shards.
Browse files Browse the repository at this point in the history
We currently use the djb2 hash function in order to compute the shard a
document should go to. Unfortunately this hash function is not very
sophisticated and you can sometimes hit adversarial cases, such as numeric ids
on 33 shards.

Murmur3 generates hashes with a better distribution, which should avoid the
adversarial cases.

Here are some examples of how 100000 incremental ids are distributed to shards
using either djb2 or murmur3.

5 shards:
Murmur3: [19933, 19964, 19940, 20030, 20133]
DJB:     [20000, 20000, 20000, 20000, 20000]

3 shards:
Murmur3: [33185, 33347, 33468]
DJB:     [30100, 30000, 39900]

33 shards:
Murmur3: [2999, 3096, 2930, 2986, 3070, 3093, 3023, 3052, 3112, 2940, 3036, 2985, 3031, 3048, 3127, 2961, 2901, 3105, 3041, 3130, 3013, 3035, 3031, 3019, 3008, 3022, 3111, 3086, 3016, 2996, 3075, 2945, 2977]
DJB:     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 900, 900, 900, 900, 1000, 1000, 10000, 10000, 10000, 10000, 9100, 9100, 9100, 9100, 9000, 9000, 0, 0, 0, 0, 0, 0]

Even if djb2 looks ideal in some cases (5 shards), the fact that the
distribution of its hashes has some patterns can raise issues with some shard
counts (eg. 3, or even worse 33).

Some tests have been modified because they relied on implementation details of
the routing hash function.

Close #7954
  • Loading branch information
jpountz committed Nov 4, 2014
1 parent 8ef6e7e commit 9ea25df
Show file tree
Hide file tree
Showing 71 changed files with 971 additions and 554 deletions.
24 changes: 23 additions & 1 deletion docs/reference/migration/migrate_2_0.asciidoc
Expand Up @@ -18,4 +18,26 @@ Partial fields were deprecated since 1.0.0beta1 in favor of <<search-request-sou
=== More Like This Field

The More Like This Field query has been removed in favor of the <<query-dsl-mlt-query, More Like This Query>>
restrained set to a specific `field`.
restrained set to a specific `field`.

=== Routing

The default hash function that is used for routing has been changed from djb2 to
murmur3. This change should be transparent unless you relied on very specific
properties of djb2. This will help ensure a better balance of the document counts
between shards.

In addition, the following node settings related to routing have been deprecated:

[horizontal]

`cluster.routing.operation.hash.type`::

This was an undocumented setting that allowed to configure which hash function
to use for routing. `murmur3` is now enforced on new indices.

`cluster.routing.operation.use_type`::

This was an undocumented setting that allowed to take the `_type` of the
document into account when computing its shard (default: `false`). `false` is
now enforced on new indices.
13 changes: 8 additions & 5 deletions rest-api-spec/test/delete/50_refresh.yaml
Expand Up @@ -21,11 +21,14 @@
body: { foo: bar }
refresh: 1

# If you wonder why this document get 3 as an id instead of 2, it is because the
# current routing algorithm would route 1 and 2 to the same shard while we need
# them to be different for this test to pass
- do:
index:
index: test_1
type: test
id: 2
id: 3
body: { foo: bar }
refresh: 1

Expand All @@ -34,7 +37,7 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 2 }

Expand All @@ -49,15 +52,15 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 2 }

- do:
delete:
index: test_1
type: test
id: 2
id: 3
refresh: 1

# If a replica shard where doc 1 is located gets initialized at this point, doc 1
Expand All @@ -69,6 +72,6 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 1 }
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.murmur3.Murmur3HashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -163,8 +165,14 @@ public static State fromString(String state) {
public static final String SETTING_VERSION_CREATED = "index.version.created";
public static final String SETTING_CREATION_DATE = "index.creation_date";
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
public static final String INDEX_UUID_NA_VALUE = "_na_";

// hard-coded hash function as of 2.0
// older indices will read which hash function to use in their index settings
private static final HashFunction MURMUR3_HASH_FUNCTION = new Murmur3HashFunction();

private final String index;
private final long version;

Expand All @@ -184,6 +192,10 @@ public static State fromString(String state) {
private final DiscoveryNodeFilters includeFilters;
private final DiscoveryNodeFilters excludeFilters;

private final Version indexCreatedVersion;
private final HashFunction routingHashFunction;
private final boolean useTypeForRouting;

private IndexMetaData(String index, long version, State state, Settings settings, ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases, ImmutableOpenMap<String, Custom> customs) {
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null) != null, "must specify numberOfShards for index [" + index + "]");
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null) != null, "must specify numberOfReplicas for index [" + index + "]");
Expand Down Expand Up @@ -214,10 +226,20 @@ private IndexMetaData(String index, long version, State state, Settings settings
} else {
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
}
indexCreatedVersion = Version.indexCreated(settings);
final Class<? extends HashFunction> hashFunctionClass = settings.getAsClass(SETTING_LEGACY_ROUTING_HASH_FUNCTION, null);
if (hashFunctionClass == null) {
routingHashFunction = MURMUR3_HASH_FUNCTION;
} else {
try {
routingHashFunction = hashFunctionClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ElasticsearchIllegalStateException("Cannot instantiate hash function", e);
}
}
useTypeForRouting = settings.getAsBoolean(SETTING_LEGACY_ROUTING_USE_TYPE, false);
}



public String index() {
return index;
}
Expand Down Expand Up @@ -254,6 +276,41 @@ public long getVersion() {
return this.version;
}

/**
* Return the {@link Version} on which this index has been created. This
* information is typically useful for backward compatibility.
*/
public Version creationVersion() {
return indexCreatedVersion;
}

public Version getCreationVersion() {
return creationVersion();
}

/**
* Return the {@link HashFunction} that should be used for routing.
*/
public HashFunction routingHashFunction() {
return routingHashFunction;
}

public HashFunction getRoutingHashFunction() {
return routingHashFunction();
}

/**
* Return whether routing should use the _type in addition to the _id in
* order to decide which shard a document should go to.
*/
public boolean routingUseType() {
return useTypeForRouting;
}

public boolean getRoutingUseType() {
return routingUseType();
}

public long creationDate() {
return settings.getAsLong(SETTING_CREATION_DATE, -1l);
}
Expand Down
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.cluster.routing.operation;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRoutingModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
Expand All @@ -48,6 +46,5 @@ public Iterable<? extends Module> spawnModules() {

@Override
protected void configure() {
bind(HashFunction.class).to(settings.getAsClass("cluster.routing.operation.hash.type", DjbHashFunction.class, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction")).asEagerSingleton();
}
}
Expand Up @@ -37,5 +37,6 @@ public interface HashFunction {
* @param routing String to calculate the hash value from
* @return hash value of the given type and routing string
*/
@Deprecated
int hash(String type, String id);
}
@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.operation.hash.murmur3;

import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;

/**
* Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
*/
public class Murmur3HashFunction implements HashFunction {

@Override
public int hash(String routing) {
final byte[] bytesToHash = new byte[routing.length() * 2];
for (int i = 0; i < routing.length(); ++i) {
final char c = routing.charAt(i);
final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
bytesToHash[i * 2] = b1;
bytesToHash[i * 2 + 1] = b2;
}
return StringHelper.murmurhash3_x86_32(bytesToHash, 0, bytesToHash.length, 0);
}

@Override
public int hash(String type, String id) {
throw new UnsupportedOperationException();
}

}
Expand Up @@ -20,8 +20,8 @@
package org.elasticsearch.cluster.routing.operation.plain;

import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -37,30 +37,30 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;

import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
*
*/
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {

private final HashFunction hashFunction;

private final boolean useType;

private final AwarenessAllocationDecider awarenessAllocationDecider;

@Inject
public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction, AwarenessAllocationDecider awarenessAllocationDecider) {
super(indexSettings);
this.hashFunction = hashFunction;
this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
public PlainOperationRouting(Settings settings, AwarenessAllocationDecider awarenessAllocationDecider) {
super(settings);
this.awarenessAllocationDecider = awarenessAllocationDecider;
}

Expand Down Expand Up @@ -262,22 +262,35 @@ protected IndexShardRoutingTable shards(ClusterState clusterState, String index,
return indexShard;
}

private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
private int shardId(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
final IndexMetaData indexMetaData = indexMetaData(clusterState, index);
final Version createdVersion = indexMetaData.getCreationVersion();
final HashFunction hashFunction = indexMetaData.getRoutingHashFunction();
final boolean useType = indexMetaData.getRoutingUseType();

final int hash;
if (routing == null) {
if (!useType) {
return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
hash = hash(hashFunction, id);
} else {
return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
hash = hash(hashFunction, type, id);
}
} else {
hash = hash(hashFunction, routing);
}
if (createdVersion.onOrAfter(Version.V_2_0_0)) {
return MathUtils.mod(hash, indexMetaData.numberOfShards());
} else {
return Math.abs(hash % indexMetaData.numberOfShards());
}
return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
}

protected int hash(String routing) {
protected int hash(HashFunction hashFunction, String routing) {
return hashFunction.hash(routing);
}

protected int hash(String type, String id) {
@Deprecated
protected int hash(HashFunction hashFunction, String type, String id) {
if (type == null || "_all".equals(type)) {
throw new ElasticsearchIllegalArgumentException("Can't route an operation with no type and having type part of the routing (for backward comp)");
}
Expand All @@ -289,4 +302,5 @@ private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) {
throw new ElasticsearchIllegalArgumentException("No data node with id[" + nodeId + "] found");
}
}

}

0 comments on commit 9ea25df

Please sign in to comment.