Skip to content

Commit

Permalink
add ability to iterate over assigned shards, and add an extension poi…
Browse files Browse the repository at this point in the history
…nt to control shard routing iteration in the broadcast based action support
  • Loading branch information
kimchy committed Aug 17, 2010
1 parent 7833cb1 commit 96fc16d
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ protected TransportBroadcastOperationAction(Settings settings, ThreadPool thread

protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);

/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link ShardsIterator#nextActiveOrNull()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardsIterator)}.
*/
protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
return shardIt.nextActiveOrNull();
}

/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link ShardsIterator#hasNextActive()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardsIterator)}.
*/
protected boolean hasNextShard(ShardsIterator shardIt) {
return shardIt.hasNextActive();
}

protected boolean accumulateExceptions() {
return true;
}
Expand Down Expand Up @@ -153,7 +173,7 @@ public void start() {
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.nextActiveOrNull();
final ShardRouting shard = nextShardOrNull(shardIt);
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
Expand All @@ -173,7 +193,7 @@ public void start() {
threadPool.execute(new Runnable() {
@Override public void run() {
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
final ShardRouting shard = nextShardOrNull(shardIt.reset());
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), false);
Expand All @@ -188,7 +208,7 @@ public void start() {
request.beforeLocalFork();
}
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
final ShardRouting shard = nextShardOrNull(shardIt.reset());
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), localAsync);
Expand All @@ -200,7 +220,7 @@ public void start() {
}

private void performOperation(final ShardsIterator shardIt, boolean localAsync) {
final ShardRouting shard = shardIt.nextActiveOrNull();
final ShardRouting shard = nextShardOrNull(shardIt);
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(shard, shardIt, null, false);
Expand Down Expand Up @@ -263,7 +283,7 @@ private void onOperation(ShardRouting shard, ShardResponse response, boolean alr

@SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) {
if (!shardIt.hasNextActive()) {
if (!hasNextShard(shardIt)) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,24 @@ private IndexShardsIterator(int index) {
return shardModulo(index++);
}

@Override public void remove() {
throw new UnsupportedOperationException();
}

@Override public int size() {
return IndexShardRoutingTable.this.size();
}

@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) {
shardsActive++;
}
}
return shardsActive;
}

@Override public boolean hasNextActive() {
int counter = this.counter;
int index = this.index;
Expand Down Expand Up @@ -200,22 +218,50 @@ private IndexShardsIterator(int index) {
return null;
}

@Override public void remove() {
throw new UnsupportedOperationException();
@Override public int sizeAssigned() {
int shardsAssigned = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.assignedToNode()) {
shardsAssigned++;
}
}
return shardsAssigned;
}

@Override public int size() {
return IndexShardRoutingTable.this.size();
@Override public boolean hasNextAssigned() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return true;
}
}
return false;
}

@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) {
shardsActive++;
@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}

@Override public ShardRouting nextAssignedOrNull() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
return shardsActive;
this.counter = counter;
this.index = index;
return null;
}

@Override public ShardId shardId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,6 @@ public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
return shards.size();
}

@Override public int sizeActive() {
int sizeActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
}
}
return sizeActive;
}

@Override public ShardId shardId() {
return this.shardId;
}
Expand All @@ -79,6 +69,16 @@ public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
return shards.get(counter++);
}

@Override public int sizeActive() {
int sizeActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
}
}
return sizeActive;
}

@Override public boolean hasNextActive() {
int counter = this.counter;
while (counter < shards.size()) {
Expand Down Expand Up @@ -107,6 +107,44 @@ public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
return null;
}

@Override public int sizeAssigned() {
int sizeAssigned = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.assignedToNode()) {
sizeAssigned++;
}
}
return sizeAssigned;
}

@Override public boolean hasNextAssigned() {
int counter = this.counter;
while (counter < shards.size()) {
if (shards.get(counter++).assignedToNode()) {
return true;
}
}
return false;
}

@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}

@Override public ShardRouting nextAssignedOrNull() {
while (counter < shards.size()) {
ShardRouting shardRouting = shards.get(counter++);
if (shardRouting.assignedToNode()) {
return shardRouting;
}
}
return null;
}

@Override public void remove() {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,80 @@
import java.util.NoSuchElementException;

/**
* @author kimchy (Shay Banon)
* Allows to iterate over a set of shard instances (routing) within a shard id group.
*
* @author kimchy (shay.banon)
*/
public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> {

/**
* The shard id this group relates to.
*/
ShardId shardId();

/**
* Resets the iterator.
*/
ShardsIterator reset();

/**
* The number of shard routing instances.
*/
int size();

/**
* The number of active shard routing instances.
*
* @see ShardRouting#active()
*/
int sizeActive();

ShardId shardId();

/**
* Is there an active shard we can iterate to.
*
* @see ShardRouting#active()
*/
boolean hasNextActive();

/**
* Returns the next active shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#active()
*/
ShardRouting nextActive() throws NoSuchElementException;

/**
* Returns the next active shard, or <tt>null</tt>.
*
* @see ShardRouting#active()
*/
ShardRouting nextActiveOrNull();

/**
* The number of assigned shard routing instances.
*
* @see ShardRouting#assignedToNode()
*/
int sizeAssigned();

/**
* Is there an assigned shard we can iterate to.
*
* @see ShardRouting#assignedToNode()
*/
boolean hasNextAssigned();

/**
* Returns the next assigned shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssigned() throws NoSuchElementException;

/**
* Returns the next assigned shard, or <tt>null</tt>.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssignedOrNull();
}

0 comments on commit 96fc16d

Please sign in to comment.