Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strengthen locking in FsBlobContainer register impl #107830

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ default void compareAndSetRegister(
* @param purpose The purpose of the operation
* @param key key of the value to get
* @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value
* could not be read due to concurrent activity.
* could not be read due to concurrent activity (which should not happen).
*/
default void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
compareAndExchangeRegister(purpose, key, BytesArray.EMPTY, BytesArray.EMPTY, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Strings;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -392,74 +394,114 @@ private static OutputStream blobOutputStream(Path file) throws IOException {
}

@Override
@SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
// no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories
ActionListener.completeWith(
listener,
() -> doUncontendedCompareAndExchangeRegister(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY)
);
}

@Override
public void compareAndExchangeRegister(
OperationPurpose purpose,
String key,
BytesReference expected,
BytesReference updated,
ActionListener<OptionalBytesReference> listener
) {
ActionListener.completeWith(listener, () -> {
BlobContainerUtils.ensureValidRegisterContent(updated);
try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) {
final FileChannel fileChannel = lockedFileChannel.fileChannel();
final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH);
while (readBuf.remaining() > 0) {
if (fileChannel.read(readBuf) == -1) {
break;
}
ActionListener.completeWith(listener, () -> doCompareAndExchangeRegister(path.resolve(key), expected, updated));
}

private static final KeyedLock<Path> writeMutexes = new KeyedLock<>();

private static OptionalBytesReference doCompareAndExchangeRegister(Path registerPath, BytesReference expected, BytesReference updated)
throws IOException {
// Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this
// JVM (e.g. for an ESIntegTestCase).
try (var mutex = writeMutexes.tryAcquire(registerPath)) {
if (mutex == null) {
return OptionalBytesReference.MISSING;
} else {
try {
return doUncontendedCompareAndExchangeRegister(registerPath, expected, updated);
} finally {
mutex.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not handled by the try-with-resource above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d'oh yes it is

}
final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position());
readBuf.clear();
if (fileChannel.read(readBuf) != -1) {
throw new IllegalStateException(
"register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes"
);
}
}
}

@SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
private static OptionalBytesReference doUncontendedCompareAndExchangeRegister(
Path registerPath,
BytesReference expected,
BytesReference updated
) throws IOException {
BlobContainerUtils.ensureValidRegisterContent(updated);
try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) {
final FileChannel fileChannel = lockedFileChannel.fileChannel();
final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH);
while (readBuf.remaining() > 0) {
if (fileChannel.read(readBuf) == -1) {
break;
}
}
final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position());
readBuf.clear();
if (fileChannel.read(readBuf) != -1) {
throw new IllegalStateException(
"register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes"
);
}

if (expected.equals(found)) {
var pageStart = 0L;
final var iterator = updated.iterator();
BytesRef page;
while ((page = iterator.next()) != null) {
final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length);
while (writeBuf.remaining() > 0) {
fileChannel.write(writeBuf, pageStart + writeBuf.position());
}
pageStart += page.length;
if (expected.equals(found)) {
var pageStart = 0L;
final var iterator = updated.iterator();
BytesRef page;
while ((page = iterator.next()) != null) {
final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length);
while (writeBuf.remaining() > 0) {
fileChannel.write(writeBuf, pageStart + writeBuf.position());
}
fileChannel.force(true);
pageStart += page.length;
}
return OptionalBytesReference.of(found);
} catch (OverlappingFileLockException e) {
return OptionalBytesReference.MISSING;
fileChannel.force(true);
}
});
return OptionalBytesReference.of(found);
} catch (OverlappingFileLockException e) {
assert false : e; // should be impossible, we protect against all concurrent operations within this JVM
return OptionalBytesReference.MISSING;
}
}

private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) implements Closeable {

// Avoid concurrently opening/closing locked files, because this can trip an assertion within the JDK (see #93955 for details).
// Perhaps it would work with finer-grained locks too, but we don't currently need to be fancy here.
private static final Object mutex = new Object();
//
// Also, avoid concurrent operations on FsBlobContainer registers within a single JVM with a simple blocking lock, to avoid
// OverlappingFileLockException. FileChannel#lock blocks on concurrent operations on the file in a different process. This emulates
// the lack of read/read and read/write contention that can happen on a cloud repository register.
private static final ReentrantLock mutex = new ReentrantLock();

static LockedFileChannel open(Path path) throws IOException {
synchronized (mutex) {
List<Closeable> resources = new ArrayList<>(2);
try {
final FileChannel fileChannel = openOrCreateAtomic(path);
resources.add(fileChannel);
List<Closeable> resources = new ArrayList<>(3);
try {
mutex.lock();
resources.add(mutex::unlock);

final Closeable fileLock = fileChannel.lock()::close;
resources.add(fileLock);
final FileChannel fileChannel = openOrCreateAtomic(path);
resources.add(fileChannel);

final var result = new LockedFileChannel(fileChannel, fileLock);
resources.clear();
return result;
} finally {
IOUtils.closeWhileHandlingException(resources);
}
final Closeable fileLock = fileChannel.lock()::close;
resources.add(fileLock);

final var result = new LockedFileChannel(fileChannel, fileLock);
resources.clear();
return result;
} finally {
IOUtils.closeWhileHandlingException(resources);
}
}

Expand All @@ -476,9 +518,7 @@ private static FileChannel openOrCreateAtomic(Path path) throws IOException {

@Override
public void close() throws IOException {
synchronized (mutex) {
IOUtils.close(fileLock, fileChannel);
}
IOUtils.close(fileLock, fileChannel, mutex::unlock);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -52,6 +54,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.startsWith;

@LuceneTestCase.SuppressFileSystems("*") // we do our own mocking
Expand Down Expand Up @@ -238,6 +241,79 @@ public void testCompareAndExchange() throws Exception {
);
}

public void testRegisterContention() throws Exception {
final Path path = PathUtils.get(createTempDir().toString());
final FsBlobContainer container = new FsBlobContainer(
new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false),
BlobPath.EMPTY,
path
);

final String contendedKey = randomAlphaOfLength(10);
final String uncontendedKey = randomAlphaOfLength(10);

final var startValue = new BytesArray(randomByteArrayOfLength(8));
final var finalValue = randomValueOtherThan(startValue, () -> new BytesArray(randomByteArrayOfLength(8)));

final var p = randomPurpose();
assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, contendedKey, BytesArray.EMPTY, startValue, l)));
assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, uncontendedKey, BytesArray.EMPTY, startValue, l)));

final var threads = new Thread[between(2, 5)];
final var startBarrier = new CyclicBarrier(threads.length + 1);
final var casSucceeded = new AtomicBoolean();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(
i == 0
// first thread does an uncontended write, which must succeed
? () -> {
safeAwait(startBarrier);
final OptionalBytesReference result = PlainActionFuture.get(
l -> container.compareAndExchangeRegister(p, uncontendedKey, startValue, finalValue, l)
);
// NB calling .bytesReference() asserts that the result is present, there was no contention
assertEquals(startValue, result.bytesReference());
}
// other threads try and do contended writes, which may fail and need retrying
: () -> {
safeAwait(startBarrier);
while (casSucceeded.get() == false) {
final OptionalBytesReference result = PlainActionFuture.get(
l -> container.compareAndExchangeRegister(p, contendedKey, startValue, finalValue, l)
);
if (result.isPresent() && result.bytesReference().equals(startValue)) {
casSucceeded.set(true);
}
}
},
"write-thread-" + i
);
threads[i].start();
}

safeAwait(startBarrier);
while (casSucceeded.get() == false) {
for (var key : new String[] { contendedKey, uncontendedKey }) {
// NB calling .bytesReference() asserts that the read did not experience contention
assertThat(
PlainActionFuture.<OptionalBytesReference, RuntimeException>get(l -> container.getRegister(p, key, l)).bytesReference(),
oneOf(startValue, finalValue)
);
}
}

for (Thread thread : threads) {
thread.join();
}

for (var key : new String[] { contendedKey, uncontendedKey }) {
assertEquals(
finalValue,
PlainActionFuture.<OptionalBytesReference, RuntimeException>get(l -> container.getRegister(p, key, l)).bytesReference()
);
}
}

public void testAtomicWriteMetadataWithoutAtomicOverwrite() throws IOException {
this.fileSystem = new FilterFileSystemProvider("nooverwritefs://", fileSystem) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,17 @@ public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, Str
return blobMetadataByName;
}

@Override
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
assertPurpose(purpose);
final var register = registers.get(key);
if (register == null) {
listener.onResponse(OptionalBytesReference.EMPTY);
} else {
listener.onResponse(OptionalBytesReference.of(register.get()));
}
}

@Override
public void compareAndExchangeRegister(
OperationPurpose purpose,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,13 @@ public void onFailure(Exception e) {
};

if (request.getInitialRead() > request.getRequestCount()) {
blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener);
blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> {
if (r.isPresent()) {
l.onResponse(r);
} else {
l.onFailure(new IllegalStateException("register read failed due to contention"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner raising your attention that it seems to fail AzureSnapshotRepoTestKitIT/testRepositoryAnalysis (see #108504)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the ping, clearly a bug: #108900

}
}));
} else {
blobContainer.compareAndExchangeRegister(
OperationPurpose.REPOSITORY_ANALYSIS,
Expand Down