Skip to content

Commit

Permalink
fixup! fixup! be more forgiving on trying to retrieve routing/shards …
Browse files Browse the repository at this point in the history
…for partitioned tables
  • Loading branch information
mfussenegger committed May 11, 2015
1 parent e55b802 commit 92993a3
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 74 deletions.
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/metadata/ReferenceInfos.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ReferenceInfos(Map<String, SchemaInfo> builtInSchemas,
this.builtInSchemas = builtInSchemas;
this.clusterService = clusterService;
this.transportPutIndexTemplateAction = transportPutIndexTemplateAction;
this.executorService = (ExecutorService) threadPool.executor(ThreadPool.Names.GENERIC);
this.executorService = (ExecutorService) threadPool.executor(ThreadPool.Names.SUGGEST);
schemas.putAll(builtInSchemas);
schemas.putAll(resolveCustomSchemas(clusterService.state().metaData()));
clusterService.add(this);
Expand Down
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/metadata/doc/DocSchemaInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public DocTableInfo load(@Nonnull String key) throws Exception {
public DocSchemaInfo(ClusterService clusterService,
ThreadPool threadPool,
TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
executorService = (ExecutorService) threadPool.executor(ThreadPool.Names.GENERIC);
executorService = (ExecutorService) threadPool.executor(ThreadPool.Names.SUGGEST);
schemaName = ReferenceInfos.DEFAULT_SCHEMA_NAME;
this.clusterService = clusterService;
clusterService.add(this);
Expand Down
175 changes: 104 additions & 71 deletions sql/src/main/java/io/crate/metadata/doc/DocTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.crate.metadata.table.ColumnPolicy;
import io.crate.planner.RowGranularity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -174,14 +175,9 @@ public Routing getRouting(WhereClause whereClause, @Nullable String preference)
return getRouting(observer, whereClause, preference, 0, true);
}

private Routing getRouting(final ClusterStateObserver observer,
final WhereClause whereClause,
@Nullable final String preference,
final int currentRetry,
final boolean retry) {
ClusterState clusterState = observer.observedState();
final Map<String, Map<String, List<Integer>>> locations = new TreeMap<>();

private GroupShardsIterator getShardIterators(WhereClause whereClause,
@Nullable String preference,
ClusterState clusterState) throws IndexMissingException {
String[] routingIndices = concreteIndices;
if (whereClause.partitions().size() > 0) {
routingIndices = whereClause.partitions().toArray(new String[whereClause.partitions().size()]);
Expand All @@ -192,20 +188,57 @@ private Routing getRouting(final ClusterStateObserver observer,
routingMap = clusterState.metaData().resolveSearchRouting(
whereClause.routingValues(), routingIndices);
}
return clusterService.operationRouting().searchShards(
clusterState,
indices,
routingIndices,
routingMap,
preference
);
}

private Routing getRouting(final ClusterStateObserver observer,
final WhereClause whereClause,
@Nullable final String preference,
final int currentRetry,
final boolean retry) {
ClusterState clusterState = observer.observedState();
final Map<String, Map<String, List<Integer>>> locations = new TreeMap<>();

GroupShardsIterator shardIterators;
try {
shardIterators = clusterService.operationRouting().searchShards(
clusterState,
indices,
routingIndices,
routingMap,
preference
);
shardIterators = getShardIterators(whereClause, preference, clusterState);
} catch (IndexMissingException e) {
return new Routing();
}

final List<ShardId> missingShards = new ArrayList<>(0);
fillLocationsFromShardIterators(locations, shardIterators, missingShards);

if (missingShards.isEmpty()) {
return new Routing(locations);
} else {
if (!retry || currentRetry > MAX_ROUTING_RETRIES) {
throw new UnavailableShardsException(missingShards.get(0));
}

final SettableFuture<Routing> futureRouting = SettableFuture.create();
observer.waitForNextChange(
new FetchRoutingListener(futureRouting, observer, whereClause, preference, currentRetry));

try {
return futureRouting.get();
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

private void fillLocationsFromShardIterators(Map<String, Map<String, List<Integer>>> locations,
GroupShardsIterator shardIterators,
List<ShardId> missingShards) {
ShardRouting shardRouting;
for (ShardIterator shardIterator : shardIterators) {
shardRouting = shardIterator.nextOrNull();
Expand All @@ -224,62 +257,6 @@ private Routing getRouting(final ClusterStateObserver observer,
}
}
}
if (missingShards.isEmpty()) {
return new Routing(locations);
} else {
if (!retry) {
throw new UnavailableShardsException(missingShards.get(0));
} else if (currentRetry > MAX_ROUTING_RETRIES) {
throw new UnavailableShardsException(missingShards.get(0));
}
final SettableFuture<Routing> futureRouting = SettableFuture.create();
observer.waitForNextChange(new ClusterStateObserver.Listener() {

Future<?> innerTaskFuture;

@Override
public void onNewClusterState(ClusterState state) {
try {
innerTaskFuture = executorService.submit(new Runnable() {
@Override
public void run() {
try {
futureRouting.set(getRouting(observer, whereClause, preference, currentRetry + 1, true));
} catch (Throwable e) {
futureRouting.setException(e);
}
}
});
} catch (RejectedExecutionException e) {
futureRouting.setException(e);
}
}

@Override
public void onClusterServiceClose() {
futureRouting.setException(new IllegalStateException("ClusterService closed"));
if (innerTaskFuture != null) {
innerTaskFuture.cancel(true);
}
}

@Override
public void onTimeout(TimeValue timeout) {
// one last retry before giving up
futureRouting.set(getRouting(observer, whereClause, preference, currentRetry + 1, false));
if (innerTaskFuture != null) {
innerTaskFuture.cancel(true);
}
}
});
try {
return futureRouting.get();
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

public List<ColumnIdent> primaryKey() {
Expand Down Expand Up @@ -367,4 +344,60 @@ public TableParameterInfo tableParameterInfo () {
return tableParameterInfo;
}

private class FetchRoutingListener implements ClusterStateObserver.Listener {

private final SettableFuture<Routing> futureRouting;
private final ClusterStateObserver observer;
private final WhereClause whereClause;
private final String preference;
private final int currentRetry;
Future<?> innerTaskFuture;

public FetchRoutingListener(SettableFuture<Routing> futureRouting,
ClusterStateObserver observer,
WhereClause whereClause,
String preference,
int currentRetry) {
this.futureRouting = futureRouting;
this.observer = observer;
this.whereClause = whereClause;
this.preference = preference;
this.currentRetry = currentRetry;
}

@Override
public void onNewClusterState(ClusterState state) {
try {
innerTaskFuture = executorService.submit(new Runnable() {
@Override
public void run() {
try {
futureRouting.set(getRouting(observer, whereClause, preference, currentRetry + 1, true));
} catch (Throwable e) {
futureRouting.setException(e);
}
}
});
} catch (RejectedExecutionException e) {
futureRouting.setException(e);
}
}

@Override
public void onClusterServiceClose() {
futureRouting.setException(new IllegalStateException("ClusterService closed"));
if (innerTaskFuture != null) {
innerTaskFuture.cancel(true);
}
}

@Override
public void onTimeout(TimeValue timeout) {
// one last retry before giving up
futureRouting.set(getRouting(observer, whereClause, preference, currentRetry + 1, false));
if (innerTaskFuture != null) {
innerTaskFuture.cancel(true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ShardReferenceResolver(Index index,
tableIdent,
clusterService,
transportPutIndexTemplateAction,
(ExecutorService) threadPool.executor(ThreadPool.Names.GENERIC),
(ExecutorService) threadPool.executor(ThreadPool.Names.SUGGEST),
true).build();
assert info.isPartitioned();
int i = 0;
Expand Down

0 comments on commit 92993a3

Please sign in to comment.