Skip to content

Commit

Permalink
Recovery: add total operations to the _recovery API
Browse files Browse the repository at this point in the history
This commit adds the current total number of translog operations to the recovery reporting API.  We also expose the recovered / total percentage:

```
"translog": {
     "recovered": 536,
     "total": 986,
     "percent": "54.3%",
     "total_time": "2ms",
     "total_time_in_millis": 2
},
```

Closes elastic#9368
  • Loading branch information
bleskes committed Mar 9, 2015
1 parent 61e07ad commit ebbd35c
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 43 deletions.
2 changes: 2 additions & 0 deletions docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ coming[1.5.0, this syntax was change to fix inconsistencies with other API]
},
"translog" : {
"recovered" : 0,
"total" : 0,
"percent" : "100.0%",
"total_time" : "0s",
"total_time_in_millis" : 0
},
Expand Down
3 changes: 3 additions & 0 deletions rest-api-spec/test/cat.recovery/10_basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
\d+\.\d+% \s+ # bytes_percent
\d+ \s+ # total_files
\d+ \s+ # total_bytes
\d+ \s+ # translog
-?\d+\.\d+% \s+ # translog_percent
-?\d+ \s+ # total_translog
\n
)+
$/
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected ShardStatus shardOperation(IndexShardStatusRequest request) throws Ela
recoveryState.getTimer().time(),
index.totalBytes(),
index.reusedBytes(),
index.recoveredBytes(), recoveryState.getTranslog().currentTranslogOperations());
index.recoveredBytes(), recoveryState.getTranslog().recoveredOperations());
} else if (recoveryState.getType() == RecoveryState.Type.GATEWAY) {
GatewayRecoveryStatus.Stage stage;
switch (recoveryState.getStage()) {
Expand All @@ -222,7 +222,7 @@ protected ShardStatus shardOperation(IndexShardStatusRequest request) throws Ela
stage = GatewayRecoveryStatus.Stage.INIT;
}
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, recoveryState.getTimer().startTime(), recoveryState.getTimer().time(),
index.totalBytes(), index.reusedBytes(), index.recoveredBytes(), recoveryState.getTranslog().currentTranslogOperations());
index.totalBytes(), index.reusedBytes(), index.recoveredBytes(), recoveryState.getTranslog().recoveredOperations());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void run() {
.append(new ByteSizeValue(index.reusedBytes())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations())
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
.append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -229,6 +228,7 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) {
// no translog to recovery from, start and bail
// no translog files, bail
recoveryState.getTranslog().totalOperations(0);
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
Expand Down Expand Up @@ -273,7 +273,7 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
typesToUpdate.add(potentialIndexOperation.docMapper().type());
}
}
recoveryState.getTranslog().addTranslogOperations(1);
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void restore(final RecoveryState recoveryState) {
logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId);
}
try {
recoveryState.getTranslog().totalOperations(0);
indexShard.prepareForIndexRecovery();
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
ShardId snapshotShardId = shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ class RecoveryCleanFilesRequest extends TransportRequest {
private ShardId shardId;
private Set<String> legacySnapshotFiles; // legacy - we moved to a real snapshot in 1.5
private Store.MetadataSnapshot snapshotFiles;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

RecoveryCleanFilesRequest() {
}

RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles) {
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.snapshotFiles = snapshotFiles;
this.totalTranslogOps = totalTranslogOps;
}

public long recoveryId() {
Expand All @@ -65,6 +67,7 @@ public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
if (in.getVersion().onOrAfter(Version.V_1_5_0)) {
snapshotFiles = Store.MetadataSnapshot.read(in);
totalTranslogOps = in.readVInt();
} else {
int size = in.readVInt();
legacySnapshotFiles = Sets.newHashSetWithExpectedSize(size);
Expand All @@ -82,6 +85,7 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_1_5_0)) {
snapshotFiles.writeTo(out);
out.writeVInt(totalTranslogOps);
} else {
out.writeVInt(snapshotFiles.size());
for (StoreFileMetaData snapshotFile : snapshotFiles) {
Expand All @@ -98,4 +102,8 @@ public Store.MetadataSnapshot sourceMetaSnapshot() {
public Set<String> legacySnapshotFiles() {
return legacySnapshotFiles;
}

public int totalTranslogOps() {
return totalTranslogOps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,20 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
private BytesReference content;
private StoreFileMetaData metaData;

private int totalTranslogOps;

RecoveryFileChunkRequest() {
}

public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content, boolean lastChunk) {
public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.metaData = metaData;
this.position = position;
this.content = content;
this.lastChunk = lastChunk;
this.totalTranslogOps = totalTranslogOps;
}

public long recoveryId() {
Expand Down Expand Up @@ -83,6 +87,10 @@ public BytesReference content() {
return content;
}

public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -104,6 +112,12 @@ public void readFrom(StreamInput in) throws IOException {
} else {
lastChunk = false;
}

if (in.getVersion().onOrAfter(org.elasticsearch.Version.V_1_5_0)) {
totalTranslogOps = in.readVInt();
} else {
totalTranslogOps = RecoveryState.Translog.UNKNOWN;
}
}

@Override
Expand All @@ -122,6 +136,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_1_4_0_Beta1)) {
out.writeBoolean(lastChunk);
}
if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_1_5_0)) {
out.writeVInt(totalTranslogOps);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class RecoveryFilesInfoRequest extends TransportRequest {
List<String> phase1ExistingFileNames;
List<Long> phase1ExistingFileSizes;

int totalTranslogOps;

@Deprecated
long phase1TotalSize;

Expand All @@ -52,7 +54,7 @@ class RecoveryFilesInfoRequest extends TransportRequest {
}

RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps,
// needed for BWC only
@Deprecated long phase1TotalSize, @Deprecated long phase1ExistingTotalSize) {
this.recoveryId = recoveryId;
Expand All @@ -61,6 +63,7 @@ class RecoveryFilesInfoRequest extends TransportRequest {
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.totalTranslogOps = totalTranslogOps;
this.phase1TotalSize = phase1TotalSize;
this.phase1ExistingTotalSize = phase1ExistingTotalSize;
}
Expand Down Expand Up @@ -107,6 +110,9 @@ public void readFrom(StreamInput in) throws IOException {
in.readVLong();
//phase1ExistingTotalSize
in.readVLong();
totalTranslogOps = RecoveryState.Translog.UNKNOWN;
} else {
totalTranslogOps = in.readVInt();
}
}

Expand Down Expand Up @@ -139,6 +145,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_1_5_0)) {
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
} else {
out.writeVInt(totalTranslogOps);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -33,13 +34,15 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

RecoveryPrepareForTranslogOperationsRequest() {
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
}

public long recoveryId() {
Expand All @@ -50,17 +53,28 @@ public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
if (in.getVersion().onOrAfter(Version.V_1_5_0)) {
totalTranslogOps = in.readVInt();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_1_5_0)) {
out.writeVInt(totalTranslogOps);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchExcep
public void run() throws InterruptedException {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
shard.translog().estimatedNumberOfOperations(),
response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
Expand Down Expand Up @@ -288,7 +289,8 @@ protected void doRun() {
public void run() throws InterruptedException {
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk),
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content,
lastChunk, shard.translog().estimatedNumberOfOperations()),
requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
Expand Down Expand Up @@ -350,7 +352,7 @@ public void run() throws InterruptedException {
// are deleted
try {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, shard.translog().estimatedNumberOfOperations()),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) {
Expand Down Expand Up @@ -427,7 +429,7 @@ public void run() throws InterruptedException {
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), shard.translog().estimatedNumberOfOperations()),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
Expand Down Expand Up @@ -616,7 +618,8 @@ protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchExcep
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand All @@ -633,7 +636,8 @@ public void run() throws InterruptedException {
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down

0 comments on commit ebbd35c

Please sign in to comment.