From d3e859a11c33d89a5d048c0172c6f2e5e91e97d3 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 25 Aug 2023 19:39:01 +0530 Subject: [PATCH] Rate Limiter integration for remote transfer (#9448) * Rate Limiter integration for remote transfer, introduces repository settings to rate limit remote store uploads and downloads Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 1 + .../opensearch/remotestore/RemoteStoreIT.java | 2 +- .../remotestore/RemoteStoreRestoreIT.java | 41 +++++ .../multipart/RemoteStoreMultipartIT.java | 46 ++++++ .../org/opensearch/common/StreamLimiter.java | 56 +++++++ .../RateLimitingOffsetRangeInputStream.java | 83 +++++++++++ .../blobstore/RateLimitingInputStream.java | 39 +---- .../index/store/RemoteDirectory.java | 141 +++++++++++++++++- .../store/RemoteSegmentStoreDirectory.java | 128 ++-------------- .../RemoteSegmentStoreDirectoryFactory.java | 20 ++- .../repositories/FilterRepository.java | 10 ++ .../opensearch/repositories/Repository.java | 10 ++ .../blobstore/BlobStoreRepository.java | 105 ++++++++++++- ...teLimitingOffsetRangeInputStreamTests.java | 46 ++++++ .../index/store/RemoteDirectoryTests.java | 89 +++++++++++ .../RemoteSegmentStoreDirectoryTests.java | 12 +- .../RepositoriesServiceTests.java | 10 ++ .../index/shard/RestoreOnlyRepository.java | 10 ++ 18 files changed, 686 insertions(+), 163 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/StreamLimiter.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e2b97998159cb..44569a87f992a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,6 +156,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466) - Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528))) - Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469)) +- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 87ec515ffe740..9a2948861e967 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -38,7 +38,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { - private static final String INDEX_NAME = "remote-store-test-idx-1"; + protected final String INDEX_NAME = "remote-store-test-idx-1"; @Override protected Collection> nodePlugins() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index e9d8933961073..60d7eefbb6d9b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -14,7 +14,10 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -26,9 +29,11 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.greaterThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { @@ -450,5 +455,41 @@ public void testRTSRestoreDataOnlyInTranslog() throws IOException { testRestoreFlow(0, true, randomIntBetween(1, 5)); } + public void testRateLimitedRemoteDownloads() throws Exception { + assertAcked( + client().admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType("fs") + .setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("max_remote_download_bytes_per_sec", "2kb") + .put("chunk_size", 200, ByteSizeUnit.BYTES) + + ) + ); + int shardCount = randomIntBetween(1, 3); + prepareCluster(0, 3, INDEX_NAME, 0, shardCount); + Map indexStats = indexData(5, false, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureRed(INDEX_NAME); + restore(INDEX_NAME); + assertBusy(() -> { + long downloadPauseTime = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + downloadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteDownloadThrottleTimeInNanos(); + } + assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos())); + }, 30, TimeUnit.SECONDS); + ensureGreen(INDEX_NAME); + // This is required to get updated number from already active shards which were not restored + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + assertEquals(0, getNumShards(INDEX_NAME).numReplicas); + verifyRestoredData(indexStats, INDEX_NAME); + } + // TODO: Restore flow - index aliases } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index a523d5c0f5470..842a576a92a38 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -8,17 +8,24 @@ package org.opensearch.remotestore.multipart; +import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; import org.opensearch.remotestore.RemoteStoreIT; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.repositories.RepositoriesService; import java.nio.file.Path; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class RemoteStoreMultipartIT extends RemoteStoreIT { @@ -35,4 +42,43 @@ protected void putRepository(Path path) { .setSettings(Settings.builder().put("location", path)) ); } + + public void testRateLimitedRemoteUploads() throws Exception { + internalCluster().startDataOnlyNodes(1); + Client client = client(); + logger.info("--> updating repository"); + Path repositoryLocation = randomRepoPath(); + assertAcked( + client.admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings( + Settings.builder() + .put("location", repositoryLocation) + .put("compress", randomBoolean()) + .put("max_remote_upload_bytes_per_sec", "1kb") + .put("chunk_size", 100, ByteSizeUnit.BYTES) + ) + ); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 10; i++) { + index(INDEX_NAME, "_doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + // check if throttling is active + assertBusy(() -> { + long uploadPauseTime = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteUploadThrottleTimeInNanos(); + } + assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos())); + }, 30, TimeUnit.SECONDS); + + assertThat(client.prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value, equalTo(10L)); + } } diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java new file mode 100644 index 0000000000000..ec203a1c30868 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import org.apache.lucene.store.RateLimiter; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * The stream limiter that limits the transfer of bytes + * + * @opensearch.internal + */ +public class StreamLimiter { + + private final Supplier rateLimiterSupplier; + + private final StreamLimiter.Listener listener; + + private int bytesSinceLastRateLimit; + + public StreamLimiter(Supplier rateLimiterSupplier, Listener listener) { + this.rateLimiterSupplier = rateLimiterSupplier; + this.listener = listener; + } + + public void maybePause(int bytes) throws IOException { + bytesSinceLastRateLimit += bytes; + final RateLimiter rateLimiter = rateLimiterSupplier.get(); + if (rateLimiter != null) { + if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { + long pause = rateLimiter.pause(bytesSinceLastRateLimit); + bytesSinceLastRateLimit = 0; + if (pause > 0) { + listener.onPause(pause); + } + } + } + } + + /** + * Internal listener + * + * @opensearch.internal + */ + public interface Listener { + void onPause(long nanos); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java new file mode 100644 index 0000000000000..b455999bbed0c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.RateLimiter; +import org.opensearch.common.StreamLimiter; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * Rate Limits an {@link OffsetRangeInputStream} + * + * @opensearch.internal + */ +public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream { + + private final StreamLimiter streamLimiter; + + private final OffsetRangeInputStream delegate; + + /** + * The ctor for RateLimitingOffsetRangeInputStream + * @param delegate the underlying {@link OffsetRangeInputStream} + * @param rateLimiterSupplier the supplier for {@link RateLimiter} + * @param listener the listener to be invoked on rate limits + */ + public RateLimitingOffsetRangeInputStream( + OffsetRangeInputStream delegate, + Supplier rateLimiterSupplier, + StreamLimiter.Listener listener + ) { + this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener); + this.delegate = delegate; + } + + @Override + public int read() throws IOException { + int b = delegate.read(); + streamLimiter.maybePause(1); + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int n = delegate.read(b, off, len); + if (n > 0) { + streamLimiter.maybePause(n); + } + return n; + } + + @Override + public synchronized void mark(int readlimit) { + delegate.mark(readlimit); + } + + @Override + public boolean markSupported() { + return delegate.markSupported(); + } + + @Override + public long getFilePointer() throws IOException { + return delegate.getFilePointer(); + } + + @Override + public synchronized void reset() throws IOException { + delegate.reset(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } +} diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java index 86ecef1173e48..ee601f96ecee1 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -33,6 +33,7 @@ package org.opensearch.index.snapshots.blobstore; import org.apache.lucene.store.RateLimiter; +import org.opensearch.common.StreamLimiter; import java.io.FilterInputStream; import java.io.IOException; @@ -46,45 +47,17 @@ */ public class RateLimitingInputStream extends FilterInputStream { - private final Supplier rateLimiterSupplier; + private final StreamLimiter streamLimiter; - private final Listener listener; - - private long bytesSinceLastRateLimit; - - /** - * Internal listener - * - * @opensearch.internal - */ - public interface Listener { - void onPause(long nanos); - } - - public RateLimitingInputStream(InputStream delegate, Supplier rateLimiterSupplier, Listener listener) { + public RateLimitingInputStream(InputStream delegate, Supplier rateLimiterSupplier, StreamLimiter.Listener listener) { super(delegate); - this.rateLimiterSupplier = rateLimiterSupplier; - this.listener = listener; - } - - private void maybePause(int bytes) throws IOException { - bytesSinceLastRateLimit += bytes; - final RateLimiter rateLimiter = rateLimiterSupplier.get(); - if (rateLimiter != null) { - if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { - long pause = rateLimiter.pause(bytesSinceLastRateLimit); - bytesSinceLastRateLimit = 0; - if (pause > 0) { - listener.onPause(pause); - } - } - } + this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener); } @Override public int read() throws IOException { int b = super.read(); - maybePause(1); + streamLimiter.maybePause(1); return b; } @@ -92,7 +65,7 @@ public int read() throws IOException { public int read(byte[] b, int off, int len) throws IOException { int n = super.read(b, off, len); if (n > 0) { - maybePause(n); + streamLimiter.maybePause(n); } return n; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index a5e02a5baed69..04b5d7eb7c6bd 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -8,15 +8,30 @@ package org.opensearch.index.store; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.exception.CorruptFileException; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.index.store.exception.ChecksumCombinationException; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,7 +45,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.zip.CRC32; + +import com.jcraft.jzlib.JZlib; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -45,12 +64,33 @@ public class RemoteDirectory extends Directory { protected final BlobContainer blobContainer; + protected final UnaryOperator uploadRateLimiter; + + protected final UnaryOperator downloadRateLimiter; + + /** + * Number of bytes in the segment file to store checksum + */ + private static final int SEGMENT_CHECKSUM_BYTES = 8; + + private static final Logger logger = LogManager.getLogger(RemoteDirectory.class); + public BlobContainer getBlobContainer() { return blobContainer; } public RemoteDirectory(BlobContainer blobContainer) { + this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity()); + } + + public RemoteDirectory( + BlobContainer blobContainer, + UnaryOperator uploadRateLimiter, + UnaryOperator downloadRateLimiter + ) { this.blobContainer = blobContainer; + this.uploadRateLimiter = uploadRateLimiter; + this.downloadRateLimiter = downloadRateLimiter; } /** @@ -149,7 +189,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { InputStream inputStream = null; try { inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, inputStream, fileLength(name)); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength(name)); } catch (Exception e) { // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) inputStream.close(); @@ -259,4 +299,103 @@ public Lock obtainLock(String name) throws IOException { public void delete() throws IOException { blobContainer.delete(); } + + public boolean copyFrom( + Directory from, + String src, + String remoteFileName, + IOContext context, + Runnable postUploadRunner, + ActionListener listener + ) { + if (blobContainer instanceof VerifyingMultiStreamBlobContainer) { + try { + uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener); + } catch (Exception e) { + listener.onFailure(e); + } + return true; + } + return false; + } + + private void uploadBlob( + Directory from, + String src, + String remoteFileName, + IOContext ioContext, + Runnable postUploadRunner, + ActionListener listener + ) throws Exception { + long expectedChecksum = calculateChecksumOfChecksum(from, src); + long contentLength; + try (IndexInput indexInput = from.openInput(src, ioContext)) { + contentLength = indexInput.length(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + WritePriority.NORMAL, + (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), + expectedChecksum, + this.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer + ); + ActionListener completionListener = ActionListener.wrap(resp -> { + try { + postUploadRunner.run(); + listener.onResponse(null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + listener.onFailure(e); + } + }, ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); + if (corruptIndexException != null) { + listener.onFailure(corruptIndexException); + return; + } + Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); + if (throwable != null) { + CorruptFileException corruptFileException = (CorruptFileException) throwable; + listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); + return; + } + listener.onFailure(ex); + }); + + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((VerifyingMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + } + + private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { + try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + try { + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum in segment file: " + + file + + ", directory: " + + directory, + file, + e + ); + } + } + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index b1077bef5b492..0f6ca2a61b67d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; @@ -24,20 +23,11 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; -import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; -import org.opensearch.common.blobstore.exception.CorruptFileException; -import org.opensearch.common.blobstore.stream.write.WriteContext; -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -60,9 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.zip.CRC32; - -import com.jcraft.jzlib.JZlib; /** * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. @@ -83,11 +70,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; - /** - * Number of bytes in the segment file to store checksum - */ - private static final int SEGMENT_CHECKSUM_BYTES = 8; - /** * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data */ @@ -433,77 +415,25 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * @param listener Listener to handle upload callback events */ public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { - if (remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) { - try { - String remoteFilename = getNewRemoteSegmentFilename(src); - uploadBlob(from, src, remoteFilename, context, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } else { - try { + try { + final String remoteFileName = getNewRemoteSegmentFilename(src); + boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + } catch (IOException e) { + throw new RuntimeException("Exception in segment postUpload for file " + src, e); + } + }, listener); + if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); - listener.onFailure(e); } + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); + listener.onFailure(e); } } - private void uploadBlob(Directory from, String src, String remoteFileName, IOContext ioContext, ActionListener listener) - throws Exception { - long expectedChecksum = calculateChecksumOfChecksum(from, src); - long contentLength; - try (IndexInput indexInput = from.openInput(src, ioContext)) { - contentLength = indexInput.length(); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - src, - remoteFileName, - contentLength, - true, - WritePriority.NORMAL, - (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), - expectedChecksum, - remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer - ); - ActionListener completionListener = ActionListener.wrap(resp -> { - try { - postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); - listener.onResponse(null); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); - listener.onFailure(e); - } - }, ex -> { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); - IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); - if (corruptIndexException != null) { - listener.onFailure(corruptIndexException); - return; - } - Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); - if (throwable != null) { - CorruptFileException corruptFileException = (CorruptFileException) throwable; - listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); - return; - } - listener.onFailure(ex); - }); - - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); - } - }); - - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).asyncBlobUpload(writeContext, completionListener); - } - /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * @@ -579,13 +509,6 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce return metadataFiles.get(0); } - public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum) throws IOException { - String remoteFilename; - remoteFilename = getNewRemoteSegmentFilename(dest); - remoteDataDirectory.copyFrom(from, src, remoteFilename, context); - postUpload(from, src, remoteFilename, checksum); - } - private void postUpload(Directory from, String src, String remoteFilename, String checksum) throws IOException { UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); @@ -597,7 +520,9 @@ private void postUpload(Directory from, String src, String remoteFilename, Strin */ @Override public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { - copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src)); + String remoteFilename = getNewRemoteSegmentFilename(dest); + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename, getChecksumOfLocalFile(from, src)); } /** @@ -731,27 +656,6 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I } } - private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { - try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { - long storedChecksum = CodecUtil.retrieveChecksum(indexInput); - CRC32 checksumOfChecksum = new CRC32(); - checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); - try { - return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); - } catch (Exception e) { - throw new ChecksumCombinationException( - "Potentially corrupted file: Checksum combination failed while combining stored checksum " - + "and calculated checksum of stored checksum in segment file: " - + file - + ", directory: " - + directory, - file, - e - ); - } - } - } - private String getExistingRemoteFilename(String localFilename) { if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 3de7a706c0688..31b49f6813ad2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -9,7 +9,6 @@ package org.opensearch.index.store; import org.apache.lucene.store.Directory; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; @@ -54,11 +53,18 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException { try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); + BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); + BlobPath commonBlobPath = blobStoreRepository.basePath(); commonBlobPath = commonBlobPath.add(indexUUID).add(shardId).add(SEGMENTS); - RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); - RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); + RemoteDirectory dataDirectory = new RemoteDirectory( + blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("data")), + blobStoreRepository::maybeRateLimitRemoteUploadTransfers, + blobStoreRepository::maybeRateLimitRemoteDownloadTransfers + ); + RemoteDirectory metadataDirectory = new RemoteDirectory( + blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("metadata")) + ); RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( repositoriesService.get(), repositoryName, @@ -72,9 +78,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh } } - private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extention) { - BlobPath extendedPath = commonBlobPath.add(extention); - BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); - return new RemoteDirectory(dataBlobContainer); + private RemoteDirectory createRemoteDirectory(BlobStoreRepository repository, BlobPath commonBlobPath, String extension) { + return new RemoteDirectory(repository.blobStore().blobContainer(commonBlobPath.add(extension))); } } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 1aba9e25a0dc2..08f8bcb467d03 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -137,6 +137,16 @@ public long getRestoreThrottleTimeInNanos() { return in.getRestoreThrottleTimeInNanos(); } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return in.getRemoteUploadThrottleTimeInNanos(); + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return in.getRemoteDownloadThrottleTimeInNanos(); + } + @Override public String startVerification() { return in.startVerification(); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 862a8de1e3218..76a3b65c9ea55 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -198,6 +198,16 @@ default void deleteSnapshotsAndReleaseLockFiles( */ long getRestoreThrottleTimeInNanos(); + /** + * Returns restore throttle time in nanoseconds + */ + long getRemoteUploadThrottleTimeInNanos(); + + /** + * Returns restore throttle time in nanoseconds + */ + long getRemoteDownloadThrottleTimeInNanos(); + /** * Returns stats on the repository usage */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 693022a60cc09..108a022a2612b 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -73,6 +73,8 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream; import org.opensearch.common.collect.Tuple; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; @@ -295,10 +297,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final RateLimiter restoreRateLimiter; + private final RateLimiter remoteUploadRateLimiter; + + private final RateLimiter remoteDownloadRateLimiter; + private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric(); + private final CounterMetric remoteDownloadRateLimitingTimeInNanos = new CounterMetric(); + + private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric(); + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "metadata", METADATA_NAME_FORMAT, @@ -398,6 +408,8 @@ protected BlobStoreRepository( this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); + remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); + remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); readOnly = READONLY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); @@ -1778,6 +1790,16 @@ public long getRestoreThrottleTimeInNanos() { return restoreRateLimitingTimeInNanos.count(); } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return remoteUploadRateLimitingTimeInNanos.count(); + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return remoteDownloadRateLimitingTimeInNanos.count(); + } + protected void assertSnapshotOrGenericThread() { assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" @@ -3005,20 +3027,75 @@ private static ActionListener fileQueueListener( }); } - private static InputStream maybeRateLimit(InputStream stream, Supplier rateLimiterSupplier, CounterMetric metric) { - return new RateLimitingInputStream(stream, rateLimiterSupplier, metric::inc); + private static void mayBeLogRateLimits(BlobStoreTransferContext context, RateLimiter rateLimiter, long time) { + logger.debug( + () -> new ParameterizedMessage( + "Rate limited blob store transfer, context [{}], for duration [{} ms] for configured rate [{} MBps]", + context, + TimeValue.timeValueNanos(time).millis(), + rateLimiter.getMBPerSec() + ) + ); + } + + private static InputStream maybeRateLimit( + InputStream stream, + Supplier rateLimiterSupplier, + CounterMetric metric, + BlobStoreTransferContext context + ) { + return new RateLimitingInputStream(stream, rateLimiterSupplier, (t) -> { + mayBeLogRateLimits(context, rateLimiterSupplier.get(), t); + metric.inc(t); + }); + } + + private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( + OffsetRangeInputStream offsetRangeInputStream, + Supplier rateLimiterSupplier, + CounterMetric metric, + BlobStoreTransferContext context + ) { + return new RateLimitingOffsetRangeInputStream(offsetRangeInputStream, rateLimiterSupplier, (t) -> { + mayBeLogRateLimits(context, rateLimiterSupplier.get(), t); + metric.inc(t); + }); } public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( - maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos), + maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), recoverySettings::rateLimiter, - restoreRateLimitingTimeInNanos + restoreRateLimitingTimeInNanos, + BlobStoreTransferContext.SNAPSHOT_RESTORE + ); + } + + public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) { + return maybeRateLimitRemoteTransfers( + offsetRangeInputStream, + () -> remoteUploadRateLimiter, + remoteUploadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_UPLOAD + ); + } + + public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) { + return maybeRateLimit( + maybeRateLimit( + inputStream, + () -> remoteDownloadRateLimiter, + remoteDownloadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_DOWNLOAD + ), + recoverySettings::rateLimiter, + remoteDownloadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_DOWNLOAD ); } public InputStream maybeRateLimitSnapshots(InputStream stream) { - return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); + return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT); } @Override @@ -3379,4 +3456,22 @@ private static final class ShardSnapshotMetaDeleteResult { this.blobsToDelete = blobsToDelete; } } + + enum BlobStoreTransferContext { + REMOTE_UPLOAD("remote_upload"), + REMOTE_DOWNLOAD("remote_download"), + SNAPSHOT("snapshot"), + SNAPSHOT_RESTORE("snapshot_restore"); + + private final String name; + + BlobStoreTransferContext(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } } diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java new file mode 100644 index 0000000000000..fc2eba4c35e2a --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.store.RateLimiter; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +public class RateLimitingOffsetRangeInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + private Directory directory; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + directory = new NIOFSDirectory(testFile.getParent()); + } + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new RateLimitingOffsetRangeInputStream( + new OffsetRangeIndexInputStream(directory.openInput(testFile.getFileName().toString(), IOContext.DEFAULT), size, position), + () -> new RateLimiter.SimpleRateLimiter(randomIntBetween(10, 20)), + (t) -> {} + ); + } + + @Override + @After + public void tearDown() throws Exception { + directory.close(); + super.tearDown(); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index b220b0891f11d..7655690685889 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -8,12 +8,17 @@ package org.opensearch.index.store; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.core.action.ActionListener; import org.opensearch.test.OpenSearchTestCase; @@ -28,9 +33,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.mockito.Mockito; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -58,6 +68,85 @@ public void testListAllEmpty() throws IOException { assertArrayEquals(expectedFileName, actualFileNames); } + public void testCopyFrom() throws IOException, InterruptedException { + AtomicReference postUploadInvoked = new AtomicReference<>(false); + String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer); + remoteDirectory.copyFrom( + storeDirectory, + filename, + filename, + IOContext.READ, + () -> postUploadInvoked.set(true), + new ActionListener<>() { + @Override + public void onResponse(Void t) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Listener responded with exception" + e); + } + } + ); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertTrue(postUploadInvoked.get()); + storeDirectory.close(); + } + + public void testCopyFromWithException() throws IOException, InterruptedException { + AtomicReference postUploadInvoked = new AtomicReference<>(false); + String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer); + remoteDirectory.copyFrom( + storeDirectory, + filename, + filename, + IOContext.READ, + () -> postUploadInvoked.set(true), + new ActionListener<>() { + @Override + public void onResponse(Void t) { + fail("Listener responded with success"); + } + + @Override + public void onFailure(Exception e) { + countDownLatch.countDown(); + } + } + ); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertFalse(postUploadInvoked.get()); + storeDirectory.close(); + } + public void testListAll() throws IOException { Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 91154e5b77641..44dfb44eb9a15 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -517,6 +517,15 @@ public void onFailure(Exception e) {} public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + remoteDataDirectory = new RemoteDirectory(blobContainer); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); + populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -528,9 +537,6 @@ public void testCopyFilesFromMultipartIOException() throws Exception { storeDirectory.sync(List.of(filename)); assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); - - VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); Mockito.doAnswer(invocation -> { ActionListener completionListener = invocation.getArgument(1); completionListener.onFailure(new Exception("Test exception")); diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 33d79eb38467a..a79c0e5b1f1ba 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -283,6 +283,16 @@ public long getRestoreThrottleTimeInNanos() { return 0; } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return 0; + } + @Override public String startVerification() { return null; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 38520e9292206..fbee13ab3b551 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -150,6 +150,16 @@ public long getRestoreThrottleTimeInNanos() { return 0; } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return 0; + } + @Override public String startVerification() { return null;