Skip to content

Commit

Permalink
add peer recovery status to the indices status API exposing both on g…
Browse files Browse the repository at this point in the history
…oing and summary when recovering from a peer shard
  • Loading branch information
kimchy committed Aug 17, 2010
1 parent 96fc16d commit 5fb80c3
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 139 deletions.
Expand Up @@ -111,26 +111,6 @@ public ByteSizeValue getStoreSize() {
return storeSize();
}

public ByteSizeValue estimatedFlushableMemorySize() {
long bytes = -1;
for (ShardStatus shard : shards()) {
if (shard.estimatedFlushableMemorySize() != null) {
if (bytes == -1) {
bytes = 0;
}
bytes += shard.estimatedFlushableMemorySize().bytes();
}
}
if (bytes == -1) {
return null;
}
return new ByteSizeValue(bytes);
}

public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}

public long translogOperations() {
long translogOperations = -1;
for (ShardStatus shard : shards()) {
Expand All @@ -148,7 +128,12 @@ public long getTranslogOperations() {
return translogOperations();
}

private transient Docs docs;

public Docs docs() {
if (docs != null) {
return docs;
}
Docs docs = new Docs();
for (ShardStatus shard : shards()) {
if (!shard.shardRouting().primary()) {
Expand All @@ -174,7 +159,12 @@ public Docs docs() {
docs.deletedDocs += shard.docs().deletedDocs();
}
}
return docs;
if (docs.numDocs == -1) {
this.docs = Docs.UNKNOWN;
} else {
this.docs = docs;
}
return this.docs;
}

public Docs getDocs() {
Expand Down
Expand Up @@ -139,26 +139,6 @@ public ByteSizeValue getStoreSize() {
return storeSize();
}

public ByteSizeValue estimatedFlushableMemorySize() {
long bytes = -1;
for (IndexShardStatus shard : this) {
if (shard.estimatedFlushableMemorySize() != null) {
if (bytes == -1) {
bytes = 0;
}
bytes += shard.estimatedFlushableMemorySize().bytes();
}
}
if (bytes == -1) {
return null;
}
return new ByteSizeValue(bytes);
}

public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}

public long translogOperations() {
long translogOperations = -1;
for (IndexShardStatus shard : this) {
Expand All @@ -176,7 +156,12 @@ public long getTranslogOperations() {
return translogOperations();
}

private transient Docs docs;

public Docs docs() {
if (docs != null) {
return docs;
}
Docs docs = new Docs();
for (IndexShardStatus shard : this) {
if (shard.docs().numDocs() != -1) {
Expand All @@ -198,6 +183,11 @@ public Docs docs() {
docs.deletedDocs += shard.docs().deletedDocs();
}
}
if (docs.numDocs == -1) {
this.docs = Docs.UNKNOWN;
} else {
this.docs = docs;
}
return docs;
}

Expand Down
Expand Up @@ -19,11 +19,13 @@

package org.elasticsearch.action.admin.indices.status;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShardState;

import java.io.IOException;
Expand Down Expand Up @@ -68,20 +70,161 @@ public int getDeletedDocs() {
}
}

public static class PeerRecoveryStatus {

public enum Stage {
INIT((byte) 0),
RETRY((byte) 1),
FILES((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);

private final byte value;

Stage(byte value) {
this.value = value;
}

public byte value() {
return value;
}

public static Stage fromValue(byte value) {
if (value == 0) {
return INIT;
} else if (value == 1) {
return RETRY;
} else if (value == 2) {
return FILES;
} else if (value == 3) {
return TRANSLOG;
} else if (value == 4) {
return FINALIZE;
} else if (value == 5) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
}
}

final Stage stage;

final long startTime;

final long took;

final long retryTime;

final long indexSize;

final long reusedIndexSize;

final long recoveredIndexSize;

final long recoveredTranslogOperations;

public PeerRecoveryStatus(Stage stage, long startTime, long took, long retryTime, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.took = took;
this.retryTime = retryTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
this.recoveredTranslogOperations = recoveredTranslogOperations;
}

public Stage stage() {
return this.stage;
}

public long startTime() {
return this.startTime;
}

public long getStartTime() {
return this.startTime;
}

public TimeValue took() {
return TimeValue.timeValueMillis(took);
}

public TimeValue getTook() {
return took();
}

public TimeValue retryTime() {
return TimeValue.timeValueMillis(retryTime);
}

public TimeValue getRetryTime() {
return retryTime();
}

public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}

public ByteSizeValue getIndexSize() {
return indexSize();
}

public ByteSizeValue reusedIndexSize() {
return new ByteSizeValue(reusedIndexSize);
}

public ByteSizeValue getReusedIndexSize() {
return reusedIndexSize();
}

public ByteSizeValue expectedRecoveredIndexSize() {
return new ByteSizeValue(indexSize - reusedIndexSize);
}

public ByteSizeValue getExpectedRecoveredIndexSize() {
return expectedRecoveredIndexSize();
}

/**
* How much of the index has been recovered.
*/
public ByteSizeValue recoveredIndexSize() {
return new ByteSizeValue(recoveredIndexSize);
}

/**
* How much of the index has been recovered.
*/
public ByteSizeValue getRecoveredIndexSize() {
return recoveredIndexSize();
}

public long recoveredTranslogOperations() {
return recoveredTranslogOperations;
}

public long getRecoveredTranslogOperations() {
return recoveredTranslogOperations();
}
}

private ShardRouting shardRouting;

IndexShardState state;

ByteSizeValue storeSize;

ByteSizeValue estimatedFlushableMemorySize;

long translogId = -1;

long translogOperations = -1;

Docs docs = Docs.UNKNOWN;

PeerRecoveryStatus peerRecoveryStatus;

ShardStatus() {
}

Expand Down Expand Up @@ -114,14 +257,6 @@ public ByteSizeValue getStoreSize() {
return storeSize();
}

public ByteSizeValue estimatedFlushableMemorySize() {
return estimatedFlushableMemorySize;
}

public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}

public long translogId() {
return translogId;
}
Expand All @@ -146,6 +281,14 @@ public Docs getDocs() {
return docs();
}

public PeerRecoveryStatus peerRecoveryStatus() {
return peerRecoveryStatus;
}

public PeerRecoveryStatus getPeerRecoveryStatus() {
return peerRecoveryStatus();
}

public static ShardStatus readIndexShardStatus(StreamInput in) throws IOException {
ShardStatus shardStatus = new ShardStatus();
shardStatus.readFrom(in);
Expand All @@ -162,17 +305,29 @@ public static ShardStatus readIndexShardStatus(StreamInput in) throws IOExceptio
out.writeBoolean(true);
storeSize.writeTo(out);
}
if (estimatedFlushableMemorySize == null) {
out.writeLong(translogId);
out.writeLong(translogOperations);
if (docs == Docs.UNKNOWN) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
estimatedFlushableMemorySize.writeTo(out);
out.writeInt(docs.numDocs());
out.writeInt(docs.maxDoc());
out.writeInt(docs.deletedDocs());
}
if (peerRecoveryStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(peerRecoveryStatus.stage.value);
out.writeVLong(peerRecoveryStatus.startTime);
out.writeVLong(peerRecoveryStatus.took);
out.writeVLong(peerRecoveryStatus.retryTime);
out.writeVLong(peerRecoveryStatus.indexSize);
out.writeVLong(peerRecoveryStatus.reusedIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredTranslogOperations);
}
out.writeLong(translogId);
out.writeLong(translogOperations);
out.writeInt(docs.numDocs());
out.writeInt(docs.maxDoc());
out.writeInt(docs.deletedDocs());
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand All @@ -182,14 +337,17 @@ public static ShardStatus readIndexShardStatus(StreamInput in) throws IOExceptio
if (in.readBoolean()) {
storeSize = readBytesSizeValue(in);
}
if (in.readBoolean()) {
estimatedFlushableMemorySize = readBytesSizeValue(in);
}
translogId = in.readLong();
translogOperations = in.readLong();
docs = new Docs();
docs.numDocs = in.readInt();
docs.maxDoc = in.readInt();
docs.deletedDocs = in.readInt();
if (in.readBoolean()) {
docs = new Docs();
docs.numDocs = in.readInt();
docs.maxDoc = in.readInt();
docs.deletedDocs = in.readInt();
}
if (in.readBoolean()) {
peerRecoveryStatus = new PeerRecoveryStatus(PeerRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
}
}

0 comments on commit 5fb80c3

Please sign in to comment.