Skip to content

Commit

Permalink
Snapshot restore operations throttle more than specified
Browse files Browse the repository at this point in the history
Lucene's RateLimiter can do too much sleeping on small values (see also elastic#6018).
The issue here is that calls to "pause" are not properly guarded in "restoreFile".

Instead of simply adding the guard, this commit uses the RateLimitingInputStream similar as for "snapshotFile".

Closes elastic#13828
  • Loading branch information
Yannick Welsch committed Oct 3, 2015
1 parent 148265b commit 03a4e22
Showing 1 changed file with 12 additions and 4 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -93,6 +94,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements

private RateLimitingInputStream.Listener snapshotThrottleListener;

private RateLimitingInputStream.Listener restoreThrottleListener;

private boolean compress;

private final ParseFieldMatcher parseFieldMatcher;
Expand Down Expand Up @@ -147,6 +150,7 @@ public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chu
this.restoreRateLimiter = restoreRateLimiter;
this.rateLimiterListener = rateLimiterListener;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
Expand Down Expand Up @@ -890,16 +894,20 @@ public void restore() throws IOException {
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {

try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
final InputStream stream;
if (restoreRateLimiter == null) {
stream = partSliceStream;
} else {
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
}
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
}
}
Store.verify(indexOutput);
indexOutput.close();
Expand Down

0 comments on commit 03a4e22

Please sign in to comment.