Skip to content

Commit

Permalink
retry to retrieve routing on missing shards
Browse files Browse the repository at this point in the history
and use ensureYellow isntead of ensureGreen in tests
  • Loading branch information
mfussenegger committed Feb 4, 2015
1 parent a9106f7 commit 1cc58ef
Show file tree
Hide file tree
Showing 25 changed files with 420 additions and 357 deletions.
5 changes: 5 additions & 0 deletions sql/src/main/java/io/crate/metadata/OutputName.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public OutputName(String name) {
public String outputName() {
return name;
}

@Override
public String toString() {
return name;
}
}
79 changes: 72 additions & 7 deletions sql/src/main/java/io/crate/metadata/doc/DocTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

package io.crate.metadata.doc;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.analyze.AlterPartitionedTableParameterInfo;
import io.crate.analyze.TableParameterInfo;
import io.crate.analyze.WhereClause;
Expand All @@ -34,13 +36,20 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;

import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


public class DocTableInfo extends AbstractDynamicTableInfo {
Expand All @@ -58,6 +67,7 @@ public class DocTableInfo extends AbstractDynamicTableInfo {
private final BytesRef numberOfReplicas;
private final ClusterService clusterService;
private final TableParameterInfo tableParameterInfo;
private final ESLogger logger = Loggers.getLogger(getClass());

private final String[] indices;
private final List<PartitionName> partitions;
Expand Down Expand Up @@ -158,11 +168,23 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location
shards.add(shardRouting.id());
}


/**
* retrieves the routing
*
* In case some shards are still unassigned or initializing this method might block up to
* 1 second and wait for the shards to become ready.
*/
@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
ClusterState clusterState = clusterService.state();
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
ClusterStateObserver observer = new ClusterStateObserver(
clusterService, new TimeValue(1, TimeUnit.SECONDS), logger);
return getRouting(observer, whereClause, preference);
}

private Routing getRouting(
final ClusterStateObserver observer, final WhereClause whereClause, @Nullable final String preference) {
ClusterState clusterState = observer.observedState();
final Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();

String[] routingIndices = concreteIndices;
if (whereClause.partitions().size() > 0) {
Expand All @@ -174,7 +196,6 @@ public Routing getRouting(WhereClause whereClause, @Nullable String preference)
routingMap = clusterState.metaData().resolveSearchRouting(
ImmutableSet.of(whereClause.clusteredBy().get()), routingIndices);
}

GroupShardsIterator shardIterators;
try {
shardIterators = clusterService.operationRouting().searchShards(
Expand All @@ -187,16 +208,60 @@ public Routing getRouting(WhereClause whereClause, @Nullable String preference)
} catch (IndexMissingException e) {
return new Routing();
}

final List<ShardId> missingShards = new ArrayList<>(0);
ShardRouting shardRouting;
for (ShardIterator shardIterator : shardIterators) {
shardRouting = shardIterator.nextOrNull();
if (shardRouting != null && shardRouting.active()) {
processShardRouting(locations, shardRouting);
if (shardRouting != null) {
if (shardRouting.active()) {
processShardRouting(locations, shardRouting);
} else {
missingShards.add(shardIterator.shardId());
}
} else {
throw new UnavailableShardsException(shardIterator.shardId());
}
}
return new Routing(locations);
if (missingShards.isEmpty()) {
return new Routing(locations);
} else {
final SettableFuture<Routing> futureRouting = SettableFuture.create();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
futureRouting.set(getRouting(observer, whereClause, preference));
} catch (Throwable e) {
futureRouting.setException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}

@Override
public void onClusterServiceClose() {
futureRouting.setException(new IllegalStateException("ClusterService closed"));
}

@Override
public void onTimeout(TimeValue timeout) {
futureRouting.setException(new UnavailableShardsException(missingShards.get(0)));
}
});
try {
return futureRouting.get();
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

public List<ColumnIdent> primaryKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testMathFunction(String function) {
@Test
public void testSelectWhereArithmeticScalar() throws Exception {
execute("create table t (d double, i integer) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (d) values (?), (?), (?)", new Object[]{1.3d, 1.6d, 2.2d});
execute("refresh table t");

Expand Down Expand Up @@ -107,7 +107,7 @@ public void testSelectWhereArithmeticScalar() throws Exception {
@Test
public void testSelectOrderByScalar() throws Exception {
execute("create table t (d double, i integer, name string) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (d, name) values (?, ?)", new Object[][] {
new Object[] {1.3d, "Arthur" },
new Object[] {1.6d, null },
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testSelectOrderByScalar() throws Exception {
@Test
public void testSelectWhereArithmeticScalarTwoReferences() throws Exception {
execute("create table t (d double, i integer) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (d, i) values (?, ?), (?, ?), (?, ?)", new Object[]{
1.3d, 1,
1.6d, 2,
Expand All @@ -168,7 +168,7 @@ public void testSelectWhereArithmeticScalarTwoReferences() throws Exception {
@Test
public void testSelectWhereArithmeticScalarTwoReferenceArgs() throws Exception {
execute("create table t (x long, base long) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (x, base) values (?, ?), (?, ?), (?, ?)", new Object[]{
144L, 12L, // 2
65536L, 2L, // 16
Expand All @@ -190,7 +190,7 @@ public void testSelectWhereArithmeticScalarTwoReferenceArgs() throws Exception {
@Test
public void testScalarInOrderByAndSelect() throws Exception {
execute("create table t (i integer, l long, d double) clustered into 3 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (i, l, d) values (1, 2, 90.5), (-1, 4, 90.5), (193384, 31234594433, 99.0)");
execute("insert into t (i, l, d) values (1, 2, 99.0), (-1, 4, 99.0)");
refresh();
Expand All @@ -207,7 +207,7 @@ public void testScalarInOrderByAndSelect() throws Exception {
@Test
public void testNonIndexedColumnInRegexScalar() throws Exception {
execute("create table regex_noindex (i integer, s string INDEX OFF) clustered into 3 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into regex_noindex (i, s) values (?, ?)", new Object[][]{
new Object[]{1, "foo"},
new Object[]{2, "bar"},
Expand All @@ -229,7 +229,7 @@ public void testNonIndexedColumnInRegexScalar() throws Exception {
@Test
public void testFulltextColumnInRegexScalar() throws Exception {
execute("create table regex_fulltext (i integer, s string INDEX USING FULLTEXT) clustered into 3 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into regex_fulltext (i, s) values (?, ?)", new Object[][]{
new Object[]{1, "foo is first"},
new Object[]{2, "bar is second"},
Expand Down Expand Up @@ -276,7 +276,7 @@ public void testSelectRandomTwoTimes() throws Exception {
@Test
public void testSelectArithmeticOperatorInWhereClause() throws Exception {
execute("create table t (i integer, l long, d double) clustered into 3 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (i, l, d) values (1, 2, 90.5), (2, 5, 90.5), (193384, 31234594433, 99.0), (10, 21, 99.0), (-1, 4, 99.0)");
refresh();

Expand All @@ -303,7 +303,7 @@ public void testSelectArithmeticOperatorInWhereClause() throws Exception {
@Test
public void testSelectArithMetricOperatorInOrderBy() throws Exception {
execute("create table t (i integer, l long, d double) clustered into 3 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (i, l, d) values (1, 2, 90.5), (2, 5, 90.5), (193384, 31234594433, 99.0), (10, 21, 99.0), (-1, 4, 99.0)");
refresh();

Expand All @@ -323,7 +323,7 @@ public void testSelectFailingArithmeticScalar() throws Exception {
expectedException.expectMessage("log(x, b): given arguments would result in: 'NaN'");

execute("create table t (i integer, l long, d double) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (i, l, d) values (1, 2, 90.5)");
refresh();

Expand All @@ -336,7 +336,7 @@ public void testSelectGroupByFailingArithmeticScalar() throws Exception {
expectedException.expectMessage("log(x, b): given arguments would result in: 'NaN'");

execute("create table t (i integer, l long, d double) with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (i, l, d) values (1, 2, 90.5), (0, 4, 100)");
execute("refresh table t");

Expand All @@ -347,7 +347,7 @@ public void testSelectGroupByFailingArithmeticScalar() throws Exception {
public void testArithmeticScalarFunctionsOnAllTypes() throws Exception {
// this test validates that no exception is thrown
execute("create table t (b byte, s short, i integer, l long, f float, d double, t timestamp) with (number_of_replicas=0)");
ensureGreen();
ensureYellow();
execute("insert into t (b, s, i, l, f, d, t) values (1, 2, 3, 4, 5.7, 6.3, '2014-07-30')");
refresh();

Expand Down
Loading

0 comments on commit 1cc58ef

Please sign in to comment.