Skip to content

Commit

Permalink
Fixes #357: FDBRecordStore.IndexUniquenessCheck.check needs to be async
Browse files Browse the repository at this point in the history
  • Loading branch information
MMcM committed Feb 14, 2019
1 parent 2c3b2db commit be87f45
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 84 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -45,7 +45,7 @@ In order to simplify typed record stores, the `FDBRecordStoreBase` class was tur
* **Bug fix** ChainedCursor does not obey byte/record/time scan limits [(Issue #358)](https://github.com/FoundationDB/fdb-record-layer/issues/358)
* **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** `FDBRecordStore.IndexUniquenessCheck.check` needs to be async [(Issue #357)](https://github.com/FoundationDB/fdb-record-layer/issues/357)
* **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Expand Up @@ -24,13 +24,14 @@
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.SpotBugsSuppressWarnings;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.provider.common.StoreTimer;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.record.SpotBugsSuppressWarnings;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +40,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
Expand All @@ -49,6 +51,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* An open transaction against FDB.
Expand Down Expand Up @@ -85,7 +88,7 @@ public class FDBRecordContext extends FDBTransactionContext implements AutoClose
@Nullable
private Consumer<FDBStoreTimer.Wait> hookForAsyncToSync = null;
@Nonnull
private final Queue<CommitCheck> commitChecks = new ArrayDeque<>();
private final Queue<CommitCheckAsync> commitChecks = new ArrayDeque<>();
@Nonnull
private final Queue<AfterCommit> afterCommits = new ArrayDeque<>();

Expand Down Expand Up @@ -148,12 +151,15 @@ public void commit() {
public CompletableFuture<Void> commitAsync() {
long startTimeNanos = System.nanoTime();
ensureActive();
CompletableFuture<Void> checks = runCommitChecks();
versionMutationCache.forEach((key, valuePair) ->
transaction.mutate(valuePair.getLeft(), key, valuePair.getRight()));
runCommitChecks();
CompletableFuture<byte[]> versionFuture = transaction.getVersionstamp();
long beforeCommitTimeMillis = System.currentTimeMillis();
return transaction.commit().whenComplete((v, ex) -> {
CompletableFuture<Void> commit = checks.isDone() && !checks.isCompletedExceptionally() ?
transaction.commit() :
checks.thenCompose(vignore -> transaction.commit());
return commit.whenComplete((v, ex) -> {
StoreTimer.Event event = FDBStoreTimer.Events.COMMIT;
try {
if (ex != null) {
Expand Down Expand Up @@ -221,28 +227,115 @@ public long getTransactionCreateTime() {

/**
* A consistency check, such as uniqueness, that can execute asynchronously and is finally checked at or before commit time.
* @see #addCommitCheck(CommitCheckAsync)
*/
public interface CommitCheckAsync {
/**
* Get whether the check is ready to be tested.
* @return {@code true} if the check is complete
*/
default boolean isReady() {
return false;
}

/**
* Complete the check.
*
* This is always called once before {@link #commit} finishes. If {@link #isReady} returns {@code true} earlier,
* it can be called while processing the transaction.
* @return a future that will be complete (exceptionally if the check fails) when the check has been performed
*/
@Nonnull
CompletableFuture<Void> checkAsync();
}

/**
* A synchronous {@link CommitCheckAsync}.
*
* At some point, this class will be deprecated.
* Please implement {@link CommitCheckAsync} directly or call {@link #addCommitCheck(CompletableFuture)} instead.
*/
public interface CommitCheck {
boolean isReady();
public interface CommitCheck extends CommitCheckAsync {
@Override
@Nonnull
default CompletableFuture<Void> checkAsync() {
check();
return AsyncUtil.DONE;
}

/**
* Complete the check.
*
* This is always called once before {@link #commit} finishes. If {@link #isReady} returns {@code true} earlier,
* it can be called while processing the transaction.
*
* <p>
* This method should not block or {@link #commitAsync} will block. It is therefore much
* better to always implement {@link CommitCheckAsync} or call {@link #addCommitCheck(CompletableFuture)} instead.
*/
void check();
}

public synchronized void addCommitCheck(@Nonnull CommitCheck check) {
/**
* Add a {@link CommitCheckAsync} to be performed before {@link #commit} finishes.
*
* This method is suitable for checks that cannot be started until just before commit.
* For checks that can be started before {@code addCommitCheck} time, {@link #addCommitCheck(CompletableFuture)}
* may be more convenient.
* <p>
* It is possible for this method to throw an exception caused by an earlier unsuccessful check that has become ready in the meantime.
* @param check the check to be performed
*/
public synchronized void addCommitCheck(@Nonnull CommitCheckAsync check) {
while (!commitChecks.isEmpty()) {
if (commitChecks.peek().isReady()) {
commitChecks.remove().check();
asyncToSync(FDBStoreTimer.Waits.WAIT_ERROR_CHECK, commitChecks.remove().checkAsync());
} else {
break;
}
}
commitChecks.add(check);
}

public synchronized void runCommitChecks() {
while (!commitChecks.isEmpty()) {
commitChecks.remove().check();
/**
* Add a check to be completed before {@link #commit} finishes.
*
* {@link #commit} will wait for the future to be completed (exceptionally if the check fails)
* before committing the underlying transaction.
* <p>
* It is possible for this method to throw an exception caused by an earlier unsuccessful check that has become ready in the meantime.
* @param check the check to be performed
*/
public synchronized void addCommitCheck(@Nonnull CompletableFuture<Void> check) {
addCommitCheck(new CommitCheckAsync() {
@Override
public boolean isReady() {
return check.isDone();
}

@Nonnull
@Override
public CompletableFuture<Void> checkAsync() {
return check;
}
});
}

/**
* Run any {@link CommitCheckAsync}s that are still outstanding.
* @return a future that is complete when all checks have been performed
*/
@Nonnull
public CompletableFuture<Void> runCommitChecks() {
List<CompletableFuture<Void>> futures;
synchronized (this) {
if (commitChecks.isEmpty()) {
return AsyncUtil.DONE;
} else {
futures = commitChecks.stream().map(CommitCheckAsync::checkAsync).collect(Collectors.toList());
}
}
return AsyncUtil.whenAll(futures);
}

/**
Expand Down Expand Up @@ -338,26 +431,15 @@ public <T> T get(CompletableFuture<T> future) throws InterruptedException, Execu

public void timeReadSampleKey(byte[] key) {
if (timer != null) {
CompletableFuture<byte[]> future = instrument(FDBStoreTimer.Events.READ_SAMPLE_KEY,
ensureActive().get(key));
addCommitCheck(new CommitCheck() {
@Override
public boolean isReady() {
return future.isDone();
}

@Override
public void check() {
try {
future.get();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
LOGGER.warn(KeyValueLogMessage.of("error reading sample key", "key", ByteArrayUtil2.loggable(key)),
CompletableFuture<Void> future = instrument(FDBStoreTimer.Events.READ_SAMPLE_KEY, ensureActive().get(key))
.handle((bytes, ex) -> {
if (ex != null) {
LOGGER.warn(KeyValueLogMessage.of("error reading sample key", "key", ByteArrayUtil2.loggable(key)),
ex);
}
}
});
}
return null;
});
addCommitCheck(future);
}
}

Expand Down
Expand Up @@ -629,61 +629,22 @@ public void addUniquenessCheck(@Nonnull AsyncIterable<KeyValue> kvs,
@Nonnull Index index,
@Nonnull IndexEntry indexEntry,
@Nonnull Tuple primaryKey) {
getRecordContext().addCommitCheck(new IndexUniquenessCheck(kvs, index, indexEntry, primaryKey));
}

class IndexUniquenessCheck implements FDBRecordContext.CommitCheck {
@Nonnull
private final AsyncIterator<KeyValue> iter;
@Nonnull
private final Index index;
@Nonnull
private final IndexEntry indexEntry;
@Nonnull
private final Tuple primaryKey;
@Nonnull
private CompletableFuture<Boolean> onHasNext;
@Nonnull
private IndexMaintainer indexMaintainer;

public IndexUniquenessCheck(@Nonnull AsyncIterable<KeyValue> iter,
@Nonnull Index index,
@Nonnull IndexEntry indexEntry,
@Nonnull Tuple primaryKey) {
this.iter = iter.iterator();
this.index = index;
this.indexEntry = indexEntry;
this.primaryKey = primaryKey;
this.indexMaintainer = getIndexMaintainer(index);

onHasNext = this.iter.onHasNext();
onHasNext = context.instrument(FDBStoreTimer.Events.CHECK_INDEX_UNIQUENESS, onHasNext);
}

@Override
public boolean isReady() {
return onHasNext.isDone();
}

@Override
public void check() {
Tuple valueKey = null;
while (iter.hasNext()) {
Tuple existingEntry = SplitHelper.unpackKey(indexMaintainer.getIndexSubspace(), iter.next());
Tuple existingKey = FDBRecordStoreBase.indexEntryPrimaryKey(index, existingEntry);
if (!primaryKey.equals(existingKey)) {
if (isIndexWriteOnly(index)) {
if (valueKey == null) {
valueKey = indexEntry.getKey();
final IndexMaintainer indexMaintainer = getIndexMaintainer(index);
final CompletableFuture<Void> checker = context.instrument(FDBStoreTimer.Events.CHECK_INDEX_UNIQUENESS,
AsyncUtil.forEach(kvs, kv -> {
Tuple existingEntry = SplitHelper.unpackKey(indexMaintainer.getIndexSubspace(), kv);
Tuple existingKey = FDBRecordStoreBase.indexEntryPrimaryKey(index, existingEntry);
if (!TupleHelpers.equals(primaryKey, existingKey)) {
if (isIndexWriteOnly(index)) {
Tuple valueKey = indexEntry.getKey();
indexMaintainer.updateUniquenessViolations(valueKey, primaryKey, existingKey, false);
indexMaintainer.updateUniquenessViolations(valueKey, existingKey, primaryKey, false);
} else {
throw new RecordIndexUniquenessViolation(index, indexEntry, primaryKey, existingKey);
}
indexMaintainer.updateUniquenessViolations(valueKey, primaryKey, existingKey, false);
indexMaintainer.updateUniquenessViolations(valueKey, existingKey, primaryKey, false);
} else {
throw new RecordIndexUniquenessViolation(index, indexEntry, primaryKey, existingKey);
}
}
}
}
}, getExecutor()));
getRecordContext().addCommitCheck(checker);
}

public CompletableFuture<IndexOperationResult> performIndexOperationAsync(@Nonnull String indexName,
Expand Down
Expand Up @@ -2554,4 +2554,56 @@ public void invalidMetaData() throws Exception {
assertThrows(KeyExpression.InvalidExpressionException.class, () -> openSimpleRecordStore(context, invalid));
}
}

@Test
public void commitChecks() throws Exception {
// Start check now; fails even if added.
try (FDBRecordContext context = openContext()) {
openSimpleRecordStore(context);

context.addCommitCheck(checkRec1Exists());

TestRecords1Proto.MySimpleRecord rec = TestRecords1Proto.MySimpleRecord.newBuilder()
.setRecNo(1L)
.build();
recordStore.saveRecord(rec);
assertThrows(RecordDoesNotExistException.class, () -> commit(context));
}
// Deferred check; fails if not added.
try (FDBRecordContext context = openContext()) {
openSimpleRecordStore(context);

context.addCommitCheck(this::checkRec1Exists);

assertThrows(RecordDoesNotExistException.class, () -> commit(context));
}
// Succeeds if added.
try (FDBRecordContext context = openContext()) {
openSimpleRecordStore(context);

context.addCommitCheck(this::checkRec1Exists);

TestRecords1Proto.MySimpleRecord rec = TestRecords1Proto.MySimpleRecord.newBuilder()
.setRecNo(1L)
.build();
recordStore.saveRecord(rec);
commit(context);
}
// Immediate succeeds too now.
try (FDBRecordContext context = openContext()) {
openSimpleRecordStore(context);

context.addCommitCheck(checkRec1Exists());
commit(context);
}
}

private CompletableFuture<Void> checkRec1Exists() {
return recordStore.recordExistsAsync(Tuple.from(1)).thenAccept(exists -> {
if (!exists) {
throw new RecordDoesNotExistException("required record does not exist");
}
});
}

}

0 comments on commit be87f45

Please sign in to comment.