Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to murmurhash3 to route documents to shards. #7954

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/reference/migration/migrate_2_0.asciidoc
Expand Up @@ -9,4 +9,26 @@ your application to Elasticsearch 2.0.
The <<alias-retrieving, get alias api>> will, by default produce an error response
if a requested index does not exist. This change brings the defaults for this API in
line with the other Indices APIs. The <<multi-index>> options can be used on a request
to change this behavior
to change this behavior

=== Routing

The default hash function that is used for routing has been changed from djb2 to
murmur3. This change should be transparent unless you relied on very specific
properties of djb2. This will help ensure a better balance of the document counts
between shards.

In addition, the following node settings related to routing have been deprecated:

[horizontal]

`cluster.routing.operation.hash.type`::

This was an undocumented setting that allowed to configure which hash function
to use for routing. `murmur3` is now enforced on new indices.

`cluster.routing.operation.use_type`::

This was an undocumented setting that allowed to take the `_type` of the
document into account when computing its shard (default: `false`). `false` is
now enforced on new indices.
13 changes: 8 additions & 5 deletions rest-api-spec/test/delete/50_refresh.yaml
Expand Up @@ -21,11 +21,14 @@
body: { foo: bar }
refresh: 1

# If you wonder why this document get 3 as an id instead of 2, it is because the
# current routing algorithm would route 1 and 2 to the same shard while we need
# them to be different for this test to pass
- do:
index:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will make our tests go wild for rest compat so be prepared

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm maybe not if we go with 2.x only change since REST tests only run against major version and if not we should fix it.

index: test_1
type: test
id: 2
id: 3
body: { foo: bar }
refresh: 1

Expand All @@ -34,7 +37,7 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 2 }

Expand All @@ -49,15 +52,15 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 2 }

- do:
delete:
index: test_1
type: test
id: 2
id: 3
refresh: 1

# If a replica shard where doc 1 is located gets initialized at this point, doc 1
Expand All @@ -69,6 +72,6 @@
index: test_1
type: test
body:
query: { terms: { _id: [1,2] }}
query: { terms: { _id: [1,3] }}

- match: { hits.total: 1 }
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.murmur3.Murmur3HashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -163,8 +165,14 @@ public static State fromString(String state) {
public static final String SETTING_VERSION_CREATED = "index.version.created";
public static final String SETTING_CREATION_DATE = "index.creation_date";
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
public static final String INDEX_UUID_NA_VALUE = "_na_";

// hard-coded hash function as of 2.0
// older indices will read which hash function to use in their index settings
private static final HashFunction MURMUR3_HASH_FUNCTION = new Murmur3HashFunction();

private final String index;
private final long version;

Expand All @@ -184,6 +192,10 @@ public static State fromString(String state) {
private final DiscoveryNodeFilters includeFilters;
private final DiscoveryNodeFilters excludeFilters;

private final Version indexCreatedVersion;
private final HashFunction routingHashFunction;
private final boolean useTypeForRouting;

private IndexMetaData(String index, long version, State state, Settings settings, ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases, ImmutableOpenMap<String, Custom> customs) {
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null) != null, "must specify numberOfShards for index [" + index + "]");
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null) != null, "must specify numberOfReplicas for index [" + index + "]");
Expand Down Expand Up @@ -214,10 +226,20 @@ private IndexMetaData(String index, long version, State state, Settings settings
} else {
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
}
indexCreatedVersion = Version.indexCreated(settings);
final Class<? extends HashFunction> hashFunctionClass = settings.getAsClass(SETTING_LEGACY_ROUTING_HASH_FUNCTION, null);
if (hashFunctionClass == null) {
routingHashFunction = MURMUR3_HASH_FUNCTION;
} else {
try {
routingHashFunction = hashFunctionClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ElasticsearchIllegalStateException("Cannot instantiate hash function", e);
}
}
useTypeForRouting = settings.getAsBoolean(SETTING_LEGACY_ROUTING_USE_TYPE, false);
}



public String index() {
return index;
}
Expand Down Expand Up @@ -254,6 +276,41 @@ public long getVersion() {
return this.version;
}

/**
* Return the {@link Version} on which this index has been created. This
* information is typically useful for backward compatibility.
*/
public Version creationVersion() {
return indexCreatedVersion;
}

public Version getCreationVersion() {
return creationVersion();
}

/**
* Return the {@link HashFunction} that should be used for routing.
*/
public HashFunction routingHashFunction() {
return routingHashFunction;
}

public HashFunction getRoutingHashFunction() {
return routingHashFunction();
}

/**
* Return whether routing should use the _type in addition to the _id in
* order to decide which shard a document should go to.
*/
public boolean routingUseType() {
return useTypeForRouting;
}

public boolean getRoutingUseType() {
return routingUseType();
}

public long creationDate() {
return settings.getAsLong(SETTING_CREATION_DATE, -1l);
}
Expand Down
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.cluster.routing.operation;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should drop this module entirely and a make it hardcoded wherever it's used? Or at least not changeable? I don't think anybody should change this module or have it's own operation routing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kimchy what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be delayed to another PR? I am slightly concerned this PR has already been awaiting merge for a long time?

import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRoutingModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
Expand All @@ -48,6 +46,5 @@ public Iterable<? extends Module> spawnModules() {

@Override
protected void configure() {
bind(HashFunction.class).to(settings.getAsClass("cluster.routing.operation.hash.type", DjbHashFunction.class, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction")).asEagerSingleton();
}
}
Expand Up @@ -37,5 +37,6 @@ public interface HashFunction {
* @param routing String to calculate the hash value from
* @return hash value of the given type and routing string
*/
@Deprecated
int hash(String type, String id);
}
@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.operation.hash.murmur3;

import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;

/**
* Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
*/
public class Murmur3HashFunction implements HashFunction {

@Override
public int hash(String routing) {
final byte[] bytesToHash = new byte[routing.length() * 2];
for (int i = 0; i < routing.length(); ++i) {
final char c = routing.charAt(i);
final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
bytesToHash[i * 2] = b1;
bytesToHash[i * 2 + 1] = b2;
}
return StringHelper.murmurhash3_x86_32(bytesToHash, 0, bytesToHash.length, 0);
}

@Override
public int hash(String type, String id) {
throw new UnsupportedOperationException();
}

}
Expand Up @@ -20,8 +20,8 @@
package org.elasticsearch.cluster.routing.operation.plain;

import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -37,30 +37,30 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;

import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
*
*/
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {

private final HashFunction hashFunction;

private final boolean useType;

private final AwarenessAllocationDecider awarenessAllocationDecider;

@Inject
public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction, AwarenessAllocationDecider awarenessAllocationDecider) {
super(indexSettings);
this.hashFunction = hashFunction;
this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
public PlainOperationRouting(Settings settings, AwarenessAllocationDecider awarenessAllocationDecider) {
super(settings);
this.awarenessAllocationDecider = awarenessAllocationDecider;
}

Expand Down Expand Up @@ -262,22 +262,35 @@ protected IndexShardRoutingTable shards(ClusterState clusterState, String index,
return indexShard;
}

private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
private int shardId(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
final IndexMetaData indexMetaData = indexMetaData(clusterState, index);
final Version createdVersion = indexMetaData.getCreationVersion();
final HashFunction hashFunction = indexMetaData.getRoutingHashFunction();
final boolean useType = indexMetaData.getRoutingUseType();

final int hash;
if (routing == null) {
if (!useType) {
return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
hash = hash(hashFunction, id);
} else {
return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
hash = hash(hashFunction, type, id);
}
} else {
hash = hash(hashFunction, routing);
}
if (createdVersion.onOrAfter(Version.V_2_0_0)) {
return MathUtils.mod(hash, indexMetaData.numberOfShards());
} else {
return Math.abs(hash % indexMetaData.numberOfShards());
}
return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
}

protected int hash(String routing) {
protected int hash(HashFunction hashFunction, String routing) {
return hashFunction.hash(routing);
}

protected int hash(String type, String id) {
@Deprecated
protected int hash(HashFunction hashFunction, String type, String id) {
if (type == null || "_all".equals(type)) {
throw new ElasticsearchIllegalArgumentException("Can't route an operation with no type and having type part of the routing (for backward comp)");
}
Expand All @@ -289,4 +302,5 @@ private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) {
throw new ElasticsearchIllegalArgumentException("No data node with id[" + nodeId + "] found");
}
}

}