From 780285a1bc171fee585dc2b4a6f455b42ba04d92 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 11:36:27 +0100 Subject: [PATCH 1/6] Strengthen locking in `FsBlobContainer` register impl Expands the JVM-wide mutex to prevent all concurrent operations on file-based registers, but then introduces an artificial mechanism for emulating write/write contention within a single JVM. --- .../common/blobstore/fs/FsBlobContainer.java | 67 ++++++++++++++----- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index aea24b7020a02..19df9f225e63e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Strings; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import static java.util.Collections.unmodifiableMap; @@ -392,13 +394,38 @@ private static OutputStream blobOutputStream(Path file) throws IOException { } @Override - @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") + public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { + // no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories + doCompareAndExchangeRegisterUnderLock(key, BytesArray.EMPTY, BytesArray.EMPTY, listener); + } + + private static final KeyedLock writeMutexes = new KeyedLock<>(); + + @Override public void compareAndExchangeRegister( OperationPurpose purpose, String key, BytesReference expected, BytesReference updated, ActionListener listener + ) { + // Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this + // JVM (e.g. for an ESIntegTestCase). + try (var mutex = writeMutexes.tryAcquire(path.resolve(key))) { + if (mutex == null) { + listener.onResponse(OptionalBytesReference.MISSING); + } else { + doCompareAndExchangeRegisterUnderLock(key, expected, updated, ActionListener.runBefore(listener, mutex::close)); + } + } + } + + @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") + private void doCompareAndExchangeRegisterUnderLock( + String key, + BytesReference expected, + BytesReference updated, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { BlobContainerUtils.ensureValidRegisterContent(updated); @@ -433,6 +460,7 @@ public void compareAndExchangeRegister( } return OptionalBytesReference.of(found); } catch (OverlappingFileLockException e) { + assert false : e; // should be impossible, we protect against all concurrent operations within this JVM return OptionalBytesReference.MISSING; } }); @@ -442,24 +470,29 @@ private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) im // Avoid concurrently opening/closing locked files, because this can trip an assertion within the JDK (see #93955 for details). // Perhaps it would work with finer-grained locks too, but we don't currently need to be fancy here. - private static final Object mutex = new Object(); + // + // Also, avoid concurrent operations on FsBlobContainer registers within a single JVM with a simple blocking lock, to avoid + // OverlappingFileLockException. FileChannel#lock blocks on concurrent operations on the file in a different process. This emulates + // the lack of read/read and read/write contention that can happen on a cloud repository register. + private static final ReentrantLock mutex = new ReentrantLock(); static LockedFileChannel open(Path path) throws IOException { - synchronized (mutex) { - List resources = new ArrayList<>(2); - try { - final FileChannel fileChannel = openOrCreateAtomic(path); - resources.add(fileChannel); + List resources = new ArrayList<>(3); + try { + mutex.lock(); + resources.add(mutex::unlock); - final Closeable fileLock = fileChannel.lock()::close; - resources.add(fileLock); + final FileChannel fileChannel = openOrCreateAtomic(path); + resources.add(fileChannel); - final var result = new LockedFileChannel(fileChannel, fileLock); - resources.clear(); - return result; - } finally { - IOUtils.closeWhileHandlingException(resources); - } + final Closeable fileLock = fileChannel.lock()::close; + resources.add(fileLock); + + final var result = new LockedFileChannel(fileChannel, fileLock); + resources.clear(); + return result; + } finally { + IOUtils.closeWhileHandlingException(resources); } } @@ -476,9 +509,7 @@ private static FileChannel openOrCreateAtomic(Path path) throws IOException { @Override public void close() throws IOException { - synchronized (mutex) { - IOUtils.close(fileLock, fileChannel); - } + IOUtils.close(fileLock, fileChannel, mutex::unlock); } } } From 481bf67bd1538bf17be4c2dc126e15b9b4c3c252 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 12:06:56 +0100 Subject: [PATCH 2/6] static --- .../common/blobstore/fs/FsBlobContainer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 19df9f225e63e..a896bea828697 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -396,7 +396,7 @@ private static OutputStream blobOutputStream(Path file) throws IOException { @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { // no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories - doCompareAndExchangeRegisterUnderLock(key, BytesArray.EMPTY, BytesArray.EMPTY, listener); + doCompareAndExchangeRegisterUnderLock(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY, listener); } private static final KeyedLock writeMutexes = new KeyedLock<>(); @@ -411,25 +411,26 @@ public void compareAndExchangeRegister( ) { // Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this // JVM (e.g. for an ESIntegTestCase). - try (var mutex = writeMutexes.tryAcquire(path.resolve(key))) { + final var registerPath = path.resolve(key); + try (var mutex = writeMutexes.tryAcquire(registerPath)) { if (mutex == null) { listener.onResponse(OptionalBytesReference.MISSING); } else { - doCompareAndExchangeRegisterUnderLock(key, expected, updated, ActionListener.runBefore(listener, mutex::close)); + doCompareAndExchangeRegisterUnderLock(registerPath, expected, updated, ActionListener.runBefore(listener, mutex::close)); } } } @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") - private void doCompareAndExchangeRegisterUnderLock( - String key, + private static void doCompareAndExchangeRegisterUnderLock( + Path registerPath, BytesReference expected, BytesReference updated, ActionListener listener ) { ActionListener.completeWith(listener, () -> { BlobContainerUtils.ensureValidRegisterContent(updated); - try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) { + try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) { final FileChannel fileChannel = lockedFileChannel.fileChannel(); final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH); while (readBuf.remaining() > 0) { From c2bac7c99be3cd01be62405834887568d6b864b8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 12:15:12 +0100 Subject: [PATCH 3/6] Iter --- .../common/blobstore/fs/FsBlobContainer.java | 92 ++++++++++--------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index a896bea828697..402725e2bf39c 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -396,11 +396,12 @@ private static OutputStream blobOutputStream(Path file) throws IOException { @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { // no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories - doCompareAndExchangeRegisterUnderLock(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY, listener); + ActionListener.completeWith( + listener, + () -> doUncontendedCompareAndExchangeRegister(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY) + ); } - private static final KeyedLock writeMutexes = new KeyedLock<>(); - @Override public void compareAndExchangeRegister( OperationPurpose purpose, @@ -409,62 +410,69 @@ public void compareAndExchangeRegister( BytesReference updated, ActionListener listener ) { + ActionListener.completeWith(listener, () -> doCompareAndExchangeRegister(path.resolve(key), expected, updated)); + } + + private static final KeyedLock writeMutexes = new KeyedLock<>(); + + private static OptionalBytesReference doCompareAndExchangeRegister(Path registerPath, BytesReference expected, BytesReference updated) + throws IOException { // Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this // JVM (e.g. for an ESIntegTestCase). - final var registerPath = path.resolve(key); try (var mutex = writeMutexes.tryAcquire(registerPath)) { if (mutex == null) { - listener.onResponse(OptionalBytesReference.MISSING); + return OptionalBytesReference.MISSING; } else { - doCompareAndExchangeRegisterUnderLock(registerPath, expected, updated, ActionListener.runBefore(listener, mutex::close)); + try { + return doUncontendedCompareAndExchangeRegister(registerPath, expected, updated); + } finally { + mutex.close(); + } } } } @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") - private static void doCompareAndExchangeRegisterUnderLock( + private static OptionalBytesReference doUncontendedCompareAndExchangeRegister( Path registerPath, BytesReference expected, - BytesReference updated, - ActionListener listener - ) { - ActionListener.completeWith(listener, () -> { - BlobContainerUtils.ensureValidRegisterContent(updated); - try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) { - final FileChannel fileChannel = lockedFileChannel.fileChannel(); - final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH); - while (readBuf.remaining() > 0) { - if (fileChannel.read(readBuf) == -1) { - break; - } - } - final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position()); - readBuf.clear(); - if (fileChannel.read(readBuf) != -1) { - throw new IllegalStateException( - "register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes" - ); + BytesReference updated + ) throws IOException { + BlobContainerUtils.ensureValidRegisterContent(updated); + try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) { + final FileChannel fileChannel = lockedFileChannel.fileChannel(); + final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH); + while (readBuf.remaining() > 0) { + if (fileChannel.read(readBuf) == -1) { + break; } + } + final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position()); + readBuf.clear(); + if (fileChannel.read(readBuf) != -1) { + throw new IllegalStateException( + "register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes" + ); + } - if (expected.equals(found)) { - var pageStart = 0L; - final var iterator = updated.iterator(); - BytesRef page; - while ((page = iterator.next()) != null) { - final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length); - while (writeBuf.remaining() > 0) { - fileChannel.write(writeBuf, pageStart + writeBuf.position()); - } - pageStart += page.length; + if (expected.equals(found)) { + var pageStart = 0L; + final var iterator = updated.iterator(); + BytesRef page; + while ((page = iterator.next()) != null) { + final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length); + while (writeBuf.remaining() > 0) { + fileChannel.write(writeBuf, pageStart + writeBuf.position()); } - fileChannel.force(true); + pageStart += page.length; } - return OptionalBytesReference.of(found); - } catch (OverlappingFileLockException e) { - assert false : e; // should be impossible, we protect against all concurrent operations within this JVM - return OptionalBytesReference.MISSING; + fileChannel.force(true); } - }); + return OptionalBytesReference.of(found); + } catch (OverlappingFileLockException e) { + assert false : e; // should be impossible, we protect against all concurrent operations within this JVM + return OptionalBytesReference.MISSING; + } } private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) implements Closeable { From 9d83d386ea1533f221794ccf50fae09917052d28 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 12:44:01 +0100 Subject: [PATCH 4/6] Contention tests --- .../common/blobstore/BlobContainer.java | 2 +- .../blobstore/fs/FsBlobContainerTests.java | 76 +++++++++++++++++++ .../ContendedRegisterAnalyzeAction.java | 8 +- 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index ae48de05a620b..4bcf3a9855a3e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -259,7 +259,7 @@ default void compareAndSetRegister( * @param purpose The purpose of the operation * @param key key of the value to get * @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value - * could not be read due to concurrent activity. + * could not be read due to concurrent activity (which should not happen). */ default void getRegister(OperationPurpose purpose, String key, ActionListener listener) { compareAndExchangeRegister(purpose, key, BytesArray.EMPTY, BytesArray.EMPTY, listener); diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index 16213f1f761e4..b4ddc02aeb2d2 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -42,7 +42,9 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -52,6 +54,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.startsWith; @LuceneTestCase.SuppressFileSystems("*") // we do our own mocking @@ -238,6 +241,79 @@ public void testCompareAndExchange() throws Exception { ); } + public void testRegisterContention() throws Exception { + final Path path = PathUtils.get(createTempDir().toString()); + final FsBlobContainer container = new FsBlobContainer( + new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), + BlobPath.EMPTY, + path + ); + + final String contendedKey = randomAlphaOfLength(10); + final String uncontendedKey = randomAlphaOfLength(10); + + final var startValue = new BytesArray(randomByteArrayOfLength(8)); + final var finalValue = randomValueOtherThan(startValue, () -> new BytesArray(randomByteArrayOfLength(8))); + + final var p = randomPurpose(); + assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, contendedKey, BytesArray.EMPTY, startValue, l))); + assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, uncontendedKey, BytesArray.EMPTY, startValue, l))); + + final var threads = new Thread[between(2, 5)]; + final var startBarrier = new CyclicBarrier(threads.length + 1); + final var casSucceeded = new AtomicBoolean(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread( + i == 0 + // first thread does an uncontended write, which must succeed + ? () -> { + safeAwait(startBarrier); + final OptionalBytesReference result = PlainActionFuture.get( + l -> container.compareAndExchangeRegister(p, uncontendedKey, startValue, finalValue, l) + ); + // NB calling .bytesReference() asserts that the result is present, there was no contention + assertEquals(startValue, result.bytesReference()); + } + // other threads try and do contended writes, which may fail and need retrying + : () -> { + safeAwait(startBarrier); + while (casSucceeded.get() == false) { + final OptionalBytesReference result = PlainActionFuture.get( + l -> container.compareAndExchangeRegister(p, contendedKey, startValue, finalValue, l) + ); + if (result.isPresent() && result.bytesReference().equals(startValue)) { + casSucceeded.set(true); + } + } + }, + "write-thread-" + i + ); + threads[i].start(); + } + + safeAwait(startBarrier); + while (casSucceeded.get() == false) { + for (var key : new String[] { contendedKey, uncontendedKey }) { + // NB calling .bytesReference() asserts that the read did not experience contention + assertThat( + PlainActionFuture.get(l -> container.getRegister(p, key, l)).bytesReference(), + oneOf(startValue, finalValue) + ); + } + } + + for (Thread thread : threads) { + thread.join(); + } + + for (var key : new String[] { contendedKey, uncontendedKey }) { + assertEquals( + finalValue, + PlainActionFuture.get(l -> container.getRegister(p, key, l)).bytesReference() + ); + } + } + public void testAtomicWriteMetadataWithoutAtomicOverwrite() throws IOException { this.fileSystem = new FilterFileSystemProvider("nooverwritefs://", fileSystem) { @Override diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java index 8058b270d310e..40cb4a45a0339 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/ContendedRegisterAnalyzeAction.java @@ -156,7 +156,13 @@ public void onFailure(Exception e) { }; if (request.getInitialRead() > request.getRequestCount()) { - blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener); + blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> { + if (r.isPresent()) { + l.onResponse(r); + } else { + l.onFailure(new IllegalStateException("register read failed due to contention")); + } + })); } else { blobContainer.compareAndExchangeRegister( OperationPurpose.REPOSITORY_ANALYSIS, From 36eb5fec3d4e6e5746f81c324d86ba40e530e9e5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 13:15:21 +0100 Subject: [PATCH 5/6] Fix read contention in RepositoryAnalysisFailureIT --- .../testkit/RepositoryAnalysisFailureIT.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index 025516f8529ae..7715b9e8d42b8 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -712,6 +712,17 @@ public Map listBlobsByPrefix(OperationPurpose purpose, Str return blobMetadataByName; } + @Override + public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { + assertPurpose(purpose); + final var register = registers.get(key); + if (register == null) { + listener.onResponse(OptionalBytesReference.EMPTY); + } else { + listener.onResponse(OptionalBytesReference.of(register.get())); + } + } + @Override public void compareAndExchangeRegister( OperationPurpose purpose, From b6cf2aa4388343197706b482c0b3e7b9d4c7628a Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 13:23:20 +0100 Subject: [PATCH 6/6] Fix double-close --- .../common/blobstore/fs/FsBlobContainer.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 402725e2bf39c..453cc6e3e1993 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -420,15 +420,9 @@ private static OptionalBytesReference doCompareAndExchangeRegister(Path register // Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this // JVM (e.g. for an ESIntegTestCase). try (var mutex = writeMutexes.tryAcquire(registerPath)) { - if (mutex == null) { - return OptionalBytesReference.MISSING; - } else { - try { - return doUncontendedCompareAndExchangeRegister(registerPath, expected, updated); - } finally { - mutex.close(); - } - } + return mutex == null + ? OptionalBytesReference.MISSING + : doUncontendedCompareAndExchangeRegister(registerPath, expected, updated); } }