Skip to content

Commit

Permalink
Snapshot restore operations throttle more than specified
Browse files Browse the repository at this point in the history
Lucene's RateLimiter can do too much sleeping on small values (see also #6018).
The issue here is that calls to "pause" are not properly guarded in "restoreFile".

Instead of simply adding the guard, this commit uses the RateLimitingInputStream similar as for "snapshotFile".

Closes #13828
  • Loading branch information
Yannick Welsch committed Oct 3, 2015
1 parent 2b44dab commit e96e447
Showing 1 changed file with 17 additions and 4 deletions.
Expand Up @@ -30,6 +30,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements

private RateLimitingInputStream.Listener snapshotThrottleListener;

private RateLimitingInputStream.Listener restoreThrottleListener;

private boolean compress;

private final ParseFieldMatcher parseFieldMatcher;
Expand Down Expand Up @@ -164,6 +167,12 @@ public void onPause(long nanos) {
rateLimiterListener.onSnapshotPause(nanos);
}
};
this.restoreThrottleListener = new RateLimitingInputStream.Listener() {
@Override
public void onPause(long nanos) {
rateLimiterListener.onRestorePause(nanos);
}
};
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
Expand Down Expand Up @@ -907,16 +916,20 @@ public void restore() throws IOException {
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {

try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
final InputStream stream;
if (restoreRateLimiter == null) {
stream = partSliceStream;
} else {
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
}
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
}
}
Store.verify(indexOutput);
indexOutput.close();
Expand Down

0 comments on commit e96e447

Please sign in to comment.