diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index ae48de05a620b..4bcf3a9855a3e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -259,7 +259,7 @@ default void compareAndSetRegister( * @param purpose The purpose of the operation * @param key key of the value to get * @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value - * could not be read due to concurrent activity. + * could not be read due to concurrent activity (which should not happen). */ default void getRegister(OperationPurpose purpose, String key, ActionListener listener) { compareAndExchangeRegister(purpose, key, BytesArray.EMPTY, BytesArray.EMPTY, listener); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index aea24b7020a02..453cc6e3e1993 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Strings; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import static java.util.Collections.unmodifiableMap; @@ -392,7 +394,15 @@ private static OutputStream blobOutputStream(Path file) throws IOException { } @Override - @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") + public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { + // no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories + ActionListener.completeWith( + listener, + () -> doUncontendedCompareAndExchangeRegister(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY) + ); + } + + @Override public void compareAndExchangeRegister( OperationPurpose purpose, String key, @@ -400,66 +410,92 @@ public void compareAndExchangeRegister( BytesReference updated, ActionListener listener ) { - ActionListener.completeWith(listener, () -> { - BlobContainerUtils.ensureValidRegisterContent(updated); - try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) { - final FileChannel fileChannel = lockedFileChannel.fileChannel(); - final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH); - while (readBuf.remaining() > 0) { - if (fileChannel.read(readBuf) == -1) { - break; - } - } - final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position()); - readBuf.clear(); - if (fileChannel.read(readBuf) != -1) { - throw new IllegalStateException( - "register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes" - ); + ActionListener.completeWith(listener, () -> doCompareAndExchangeRegister(path.resolve(key), expected, updated)); + } + + private static final KeyedLock writeMutexes = new KeyedLock<>(); + + private static OptionalBytesReference doCompareAndExchangeRegister(Path registerPath, BytesReference expected, BytesReference updated) + throws IOException { + // Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this + // JVM (e.g. for an ESIntegTestCase). + try (var mutex = writeMutexes.tryAcquire(registerPath)) { + return mutex == null + ? OptionalBytesReference.MISSING + : doUncontendedCompareAndExchangeRegister(registerPath, expected, updated); + } + } + + @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") + private static OptionalBytesReference doUncontendedCompareAndExchangeRegister( + Path registerPath, + BytesReference expected, + BytesReference updated + ) throws IOException { + BlobContainerUtils.ensureValidRegisterContent(updated); + try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) { + final FileChannel fileChannel = lockedFileChannel.fileChannel(); + final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH); + while (readBuf.remaining() > 0) { + if (fileChannel.read(readBuf) == -1) { + break; } + } + final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position()); + readBuf.clear(); + if (fileChannel.read(readBuf) != -1) { + throw new IllegalStateException( + "register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes" + ); + } - if (expected.equals(found)) { - var pageStart = 0L; - final var iterator = updated.iterator(); - BytesRef page; - while ((page = iterator.next()) != null) { - final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length); - while (writeBuf.remaining() > 0) { - fileChannel.write(writeBuf, pageStart + writeBuf.position()); - } - pageStart += page.length; + if (expected.equals(found)) { + var pageStart = 0L; + final var iterator = updated.iterator(); + BytesRef page; + while ((page = iterator.next()) != null) { + final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length); + while (writeBuf.remaining() > 0) { + fileChannel.write(writeBuf, pageStart + writeBuf.position()); } - fileChannel.force(true); + pageStart += page.length; } - return OptionalBytesReference.of(found); - } catch (OverlappingFileLockException e) { - return OptionalBytesReference.MISSING; + fileChannel.force(true); } - }); + return OptionalBytesReference.of(found); + } catch (OverlappingFileLockException e) { + assert false : e; // should be impossible, we protect against all concurrent operations within this JVM + return OptionalBytesReference.MISSING; + } } private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) implements Closeable { // Avoid concurrently opening/closing locked files, because this can trip an assertion within the JDK (see #93955 for details). // Perhaps it would work with finer-grained locks too, but we don't currently need to be fancy here. - private static final Object mutex = new Object(); + // + // Also, avoid concurrent operations on FsBlobContainer registers within a single JVM with a simple blocking lock, to avoid + // OverlappingFileLockException. FileChannel#lock blocks on concurrent operations on the file in a different process. This emulates + // the lack of read/read and read/write contention that can happen on a cloud repository register. + private static final ReentrantLock mutex = new ReentrantLock(); static LockedFileChannel open(Path path) throws IOException { - synchronized (mutex) { - List resources = new ArrayList<>(2); - try { - final FileChannel fileChannel = openOrCreateAtomic(path); - resources.add(fileChannel); + List resources = new ArrayList<>(3); + try { + mutex.lock(); + resources.add(mutex::unlock); - final Closeable fileLock = fileChannel.lock()::close; - resources.add(fileLock); + final FileChannel fileChannel = openOrCreateAtomic(path); + resources.add(fileChannel); - final var result = new LockedFileChannel(fileChannel, fileLock); - resources.clear(); - return result; - } finally { - IOUtils.closeWhileHandlingException(resources); - } + final Closeable fileLock = fileChannel.lock()::close; + resources.add(fileLock); + + final var result = new LockedFileChannel(fileChannel, fileLock); + resources.clear(); + return result; + } finally { + IOUtils.closeWhileHandlingException(resources); } } @@ -476,9 +512,7 @@ private static FileChannel openOrCreateAtomic(Path path) throws IOException { @Override public void close() throws IOException { - synchronized (mutex) { - IOUtils.close(fileLock, fileChannel); - } + IOUtils.close(fileLock, fileChannel, mutex::unlock); } } } diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index 16213f1f761e4..b4ddc02aeb2d2 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -42,7 +42,9 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -52,6 +54,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.startsWith; @LuceneTestCase.SuppressFileSystems("*") // we do our own mocking @@ -238,6 +241,79 @@ public void testCompareAndExchange() throws Exception { ); } + public void testRegisterContention() throws Exception { + final Path path = PathUtils.get(createTempDir().toString()); + final FsBlobContainer container = new FsBlobContainer( + new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), + BlobPath.EMPTY, + path + ); + + final String contendedKey = randomAlphaOfLength(10); + final String uncontendedKey = randomAlphaOfLength(10); + + final var startValue = new BytesArray(randomByteArrayOfLength(8)); + final var finalValue = randomValueOtherThan(startValue, () -> new BytesArray(randomByteArrayOfLength(8))); + + final var p = randomPurpose(); + assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, contendedKey, BytesArray.EMPTY, startValue, l))); + assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, uncontendedKey, BytesArray.EMPTY, startValue, l))); + + final var threads = new Thread[between(2, 5)]; + final var startBarrier = new CyclicBarrier(threads.length + 1); + final var casSucceeded = new AtomicBoolean(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread( + i == 0 + // first thread does an uncontended write, which must succeed + ? () -> { + safeAwait(startBarrier); + final OptionalBytesReference result = PlainActionFuture.get( + l -> container.compareAndExchangeRegister(p, uncontendedKey, startValue, finalValue, l) + ); + // NB calling .bytesReference() asserts that the result is present, there was no contention + assertEquals(startValue, result.bytesReference()); + } + // other threads try and do contended writes, which may fail and need retrying + : () -> { + safeAwait(startBarrier); + while (casSucceeded.get() == false) { + final OptionalBytesReference result = PlainActionFuture.get( + l -> container.compareAndExchangeRegister(p, contendedKey, startValue, finalValue, l) + ); + if (result.isPresent() && result.bytesReference().equals(startValue)) { + casSucceeded.set(true); + } + } + }, + "write-thread-" + i + ); + threads[i].start(); + } + + safeAwait(startBarrier); + while (casSucceeded.get() == false) { + for (var key : new String[] { contendedKey, uncontendedKey }) { + // NB calling .bytesReference() asserts that the read did not experience contention + assertThat( + PlainActionFuture.get(l -> container.getRegister(p, key, l)).bytesReference(), + oneOf(startValue, finalValue) + ); + } + } + + for (Thread thread : threads) { + thread.join(); + } + + for (var key : new String[] { contendedKey, uncontendedKey }) { + assertEquals( + finalValue, + PlainActionFuture.get(l -> container.getRegister(p, key, l)).bytesReference() + ); + } + } + public void testAtomicWriteMetadataWithoutAtomicOverwrite() throws IOException { this.fileSystem = new FilterFileSystemProvider("nooverwritefs://", fileSystem) { @Override diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index 025516f8529ae..7715b9e8d42b8 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -712,6 +712,17 @@ public Map listBlobsByPrefix(OperationPurpose purpose, Str return blobMetadataByName; } + @Override + public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { + assertPurpose(purpose); + final var register = registers.get(key); + if (register == null) { + listener.onResponse(OptionalBytesReference.EMPTY); + } else { + listener.onResponse(OptionalBytesReference.of(register.get())); + } + } + @Override public void compareAndExchangeRegister( OperationPurpose purpose, diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java index 8058b270d310e..40cb4a45a0339 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java @@ -156,7 +156,13 @@ public void onFailure(Exception e) { }; if (request.getInitialRead() > request.getRequestCount()) { - blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener); + blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> { + if (r.isPresent()) { + l.onResponse(r); + } else { + l.onFailure(new IllegalStateException("register read failed due to contention")); + } + })); } else { blobContainer.compareAndExchangeRegister( OperationPurpose.REPOSITORY_ANALYSIS,