Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.metadata.MetaDataException;
import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;
import org.slf4j.Logger;
Expand Down Expand Up @@ -111,24 +110,17 @@ static IndexBuildProto.IndexBuildIndexingStamp compileIndexingTypeStamp() {
@Override
CompletableFuture<Void> buildIndexInternalAsync() {
return getRunner().runAsync(context ->
context.getReadVersionAsync()
.thenCompose(vignore -> {
SubspaceProvider subspaceProvider = common.getRecordStoreBuilder().getSubspaceProvider();
return subspaceProvider.getSubspaceAsync(context)
.thenCompose(subspace ->
indexScrub(subspaceProvider, subspace));
}),
context.getReadVersionAsync().thenCompose(ignore -> indexScrub()),
common.indexLogMessageKeyValues("IndexScrubbing::buildIndexInternalAsync"));
}

@Nonnull
private CompletableFuture<Void> indexScrub(@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Void> indexScrub() {

final List<Object> additionalLogMessageKeyValues = Arrays.asList(LogMessageKeys.CALLING_METHOD, "indexScrub");

return iterateAllRanges(additionalLogMessageKeyValues,
this::indexScrubRangeOnly,
subspaceProvider, subspace);
this::indexScrubRangeOnly);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordFromStoredRecordPlan;
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;
import org.slf4j.Logger;
Expand Down Expand Up @@ -500,7 +499,7 @@ private RecordCoreException newPartlyBuiltException(IndexBuildProto.IndexBuildIn
}

// Helpers for implementing modules. Some of them are public to support unit-testing.
protected CompletableFuture<Boolean> doneOrThrottleDelayAndMaybeLogProgress(boolean done, SubspaceProvider subspaceProvider, List<Object> additionalLogMessageKeyValues) {
protected CompletableFuture<Boolean> doneOrThrottleDelayAndMaybeLogProgress(boolean done, List<Object> additionalLogMessageKeyValues) {
if (done) {
return AsyncUtil.READY_FALSE;
}
Expand All @@ -514,8 +513,7 @@ protected CompletableFuture<Boolean> doneOrThrottleDelayAndMaybeLogProgress(bool
lastProgressSnapshot = StoreTimerSnapshot.from(timer);
}
LOGGER.info(KeyValueLogMessage.build("Indexer: Built Range",
subspaceProvider.logKey(), subspaceProvider,
LogMessageKeys.DELAY, toWait)
LogMessageKeys.DELAY, toWait)
.addKeysAndValues(additionalLogMessageKeyValues != null ? additionalLogMessageKeyValues : Collections.emptyList())
.addKeysAndValues(indexingLogMessageKeyValues())
.addKeysAndValues(common.indexLogMessageKeyValues())
Expand Down Expand Up @@ -907,14 +905,12 @@ private CompletableFuture<Void> updateMaintainerBuilder(@Nonnull FDBRecordStore
}

protected CompletableFuture<Void> iterateAllRanges(List<Object> additionalLogMessageKeyValues,
BiFunction<FDBRecordStore, AtomicLong, CompletableFuture<Boolean>> iterateRange,
@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
return iterateAllRanges(additionalLogMessageKeyValues, iterateRange, subspaceProvider, subspace, null);
BiFunction<FDBRecordStore, AtomicLong, CompletableFuture<Boolean>> iterateRange) {
return iterateAllRanges(additionalLogMessageKeyValues, iterateRange, null);
}

protected CompletableFuture<Void> iterateAllRanges(List<Object> additionalLogMessageKeyValues,
BiFunction<FDBRecordStore, AtomicLong, CompletableFuture<Boolean>> iterateRange,
@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace,
@Nullable Function<FDBException, Optional<Boolean>> shouldReturnQuietly) {

return AsyncUtil.whileTrue(() ->
Expand All @@ -923,28 +919,29 @@ protected CompletableFuture<Void> iterateAllRanges(List<Object> additionalLogMes
if (ex == null) {
final Set<Index> indexSet = throttle.getAndResetMergeRequiredIndexes();
if (indexSet != null && !indexSet.isEmpty()) {
return mergeIndexes(indexSet, subspaceProvider)
.thenCompose(ignore -> doneOrThrottleDelayAndMaybeLogProgress(!hasMore, subspaceProvider, additionalLogMessageKeyValues));
return mergeIndexes(indexSet)
.thenCompose(ignore -> doneOrThrottleDelayAndMaybeLogProgress(!hasMore, additionalLogMessageKeyValues));
}
return doneOrThrottleDelayAndMaybeLogProgress(!hasMore, subspaceProvider, additionalLogMessageKeyValues);
return doneOrThrottleDelayAndMaybeLogProgress(!hasMore, additionalLogMessageKeyValues);
}
final RuntimeException unwrappedEx = getRunner().getDatabase().mapAsyncToSyncException(ex);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(KeyValueLogMessage.of("possibly non-fatal error encountered building range",
LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack())), ex);
LOGGER.info(KeyValueLogMessage.build("possibly non-fatal error encountered building range")
.addKeysAndValues(common.indexLogMessageKeyValues())
.toString(), ex);
}
throw unwrappedEx;
}).thenCompose(Function.identity()),
getRunner().getExecutor());
}

public CompletableFuture<Void> mergeIndexes() {
return mergeIndexes(new HashSet<>(common.getTargetIndexes()), common.getRecordStoreBuilder().subspaceProvider);
return mergeIndexes(new HashSet<>(common.getTargetIndexes()));
}

private CompletableFuture<Void> mergeIndexes(Set<Index> indexSet, @Nullable SubspaceProvider subspaceProvider) {
private CompletableFuture<Void> mergeIndexes(Set<Index> indexSet) {
return AsyncUtil.whenAll(indexSet.stream()
.map(index -> getIndexingMerger(index).mergeIndex(subspaceProvider)
.map(index -> getIndexingMerger(index).mergeIndex()
).collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.apple.foundationdb.record.metadata.IndexTypes;
import com.apple.foundationdb.record.metadata.RecordType;
import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;
import com.google.protobuf.ZeroCopyByteString;
Expand Down Expand Up @@ -114,20 +113,15 @@ CompletableFuture<Void> buildIndexInternalAsync() {
validateSourceAndTargetIndexes(store);
// all valid; back to the future. Note that for practical reasons, readability and idempotency will be validated later
return context.getReadVersionAsync()
.thenCompose(vignore -> {
SubspaceProvider subspaceProvider = common.getRecordStoreBuilder().getSubspaceProvider();
return subspaceProvider.getSubspaceAsync(context)
.thenCompose(subspace -> buildIndexFromIndex(subspaceProvider, subspace));
});
.thenCompose(ignore -> buildIndexFromIndex());
}), common.indexLogMessageKeyValues("IndexingByIndex::buildIndexInternalAsync"));
}

@Nonnull
private CompletableFuture<Void> buildIndexFromIndex(@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Void> buildIndexFromIndex() {
final List<Object> additionalLogMessageKeyValues = Arrays.asList(LogMessageKeys.CALLING_METHOD, "buildIndexFromIndex");
return iterateAllRanges(additionalLogMessageKeyValues,
(store, recordsScanned) -> buildRangeOnly(store, recordsScanned),
subspaceProvider, subspace);
this::buildRangeOnly);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ LogMessageKeys.TARGET_INDEX_NAME, getTargetIndexesNames(),
LogMessageKeys.RECORDS_SCANNED, totalRecordsScanned.get(),
LogMessageKeys.INDEXER_ID, indexerId);

SubspaceProvider subspaceProvider = getRecordStoreBuilder().getSubspaceProvider();
if (subspaceProvider != null) {
keyValues.add(subspaceProvider.logKey());
keyValues.add(subspaceProvider);
}

if (moreKeyValues != null && !moreKeyValues.isEmpty()) {
keyValues.addAll(moreKeyValues);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -52,7 +50,6 @@ public class IndexingMerger {
private int mergeSuccesses = 0;
private long timeQuotaMillis = 0;
private final IndexingCommon common;
private SubspaceProvider subspaceProvider = null;
private int repartitionDocumentCount = 0;
private int repartitionSecondChances = 0;

Expand All @@ -62,13 +59,12 @@ public IndexingMerger(final Index index, IndexingCommon common, long initialMer
this.mergesLimit = initialMergesCountLimit;
}

private CompletableFuture<FDBRecordStore> openRecordStore(@Nonnull FDBRecordContext context) {
private CompletableFuture<FDBRecordStore> openRecordStore(FDBRecordContext context) {
return common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync();
}

@SuppressWarnings("squid:S3776") // cognitive complexity is high, candidate for refactoring
CompletableFuture<Void> mergeIndex(@Nullable SubspaceProvider subspaceProvider) {
this.subspaceProvider = subspaceProvider; // for logs only
CompletableFuture<Void> mergeIndex() {
final AtomicInteger failureCountLimit = new AtomicInteger(1000);
AtomicReference<IndexDeferredMaintenanceControl> mergeControlRef = new AtomicReference<>();
final FDBStoreTimer timer = common.getRunner().getTimer();
Expand Down Expand Up @@ -249,6 +245,7 @@ List<Object> mergerKeysAndValues(final IndexDeferredMaintenanceControl mergeCont
String mergerLogMessage(String ttl, final IndexDeferredMaintenanceControl mergeControl) {
final KeyValueLogMessage msg = KeyValueLogMessage.build(ttl);
msg.addKeysAndValues(mergerKeysAndValues(mergeControl));
SubspaceProvider subspaceProvider = common.getRecordStoreBuilder().getSubspaceProvider();
if (subspaceProvider != null) {
msg.addKeyAndValue(subspaceProvider.logKey(), subspaceProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -112,17 +111,12 @@ List<Object> indexingLogMessageKeyValues() {
CompletableFuture<Void> buildIndexInternalAsync() {
return getRunner().runAsync(context -> openRecordStore(context)
.thenCompose( store -> context.getReadVersionAsync()
.thenCompose(vignore -> {
SubspaceProvider subspaceProvider = common.getRecordStoreBuilder().getSubspaceProvider();
// validation checks, if any, will be performed here
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this comment intended to be that:

buildMultiTargetIndex will do any necessary validation checks

Or, the more problematic for this change:

getSubspaceAsync will do any necessary validation checks

? I'm pretty sure the answer is that buildMultiTargetIndex is doing the validation checks, but wanted to confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment appeared in the indexers as a hint/reminder where to add validations. I think that appears because IndexingByIndex had a source/target indexes validations at the post context.getReadVersionAsync lambda, and then its structure was copied by the other indexers. I did not think that this comment is needed anymore.

return subspaceProvider.getSubspaceAsync(context)
.thenCompose(subspace -> buildMultiTargetIndex(subspaceProvider, subspace));
})
), common.indexLogMessageKeyValues("IndexingMultiTargetByRecords::buildIndexInternalAsync"));
.thenCompose(ignore -> buildMultiTargetIndex())),
common.indexLogMessageKeyValues("IndexingMultiTargetByRecords::buildIndexInternalAsync"));
}

@Nonnull
private CompletableFuture<Void> buildMultiTargetIndex(@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Void> buildMultiTargetIndex() {
final TupleRange tupleRange = common.computeRecordsRange();
final byte[] rangeStart;
final byte[] rangeEnd;
Expand Down Expand Up @@ -157,7 +151,7 @@ private CompletableFuture<Void> buildMultiTargetIndex(@Nonnull SubspaceProvider
LogMessageKeys.RANGE_END, rangeEnd);

return maybePresetRangeFuture.thenCompose(ignore ->
iterateAllRanges(additionalLogMessageKeyValues, this::buildRangeOnly, subspaceProvider, subspace));
iterateAllRanges(additionalLogMessageKeyValues, this::buildRangeOnly));
}

@Nonnull
Expand Down Expand Up @@ -218,7 +212,6 @@ private static CompletableFuture<Void> insertRanges(List<IndexingRangeSet> range
return AsyncUtil.whenAll(rangeSets.stream().map(set -> set.insertRangeAsync(start, end, true)).collect(Collectors.toList()));
}

@Nullable
@SuppressWarnings("unused")
private CompletableFuture<FDBStoredRecord<Message>> getRecordIfTypeMatch(FDBRecordStore store, @Nonnull RecordCursorResult<FDBStoredRecord<Message>> cursorResult) {
// No need to "translate" rec, so store is unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -277,21 +276,18 @@ private void fragmentIterationTypePlusPlus() {
CompletableFuture<Void> buildIndexInternalAsync() {
return getRunner().runAsync(context -> openRecordStore(context)
.thenCompose( store -> context.getReadVersionAsync()
.thenCompose(vignore -> {
.thenCompose(ignore -> {
// init fragment data
setFragmentationData(store);
SubspaceProvider subspaceProvider = common.getRecordStoreBuilder().getSubspaceProvider();
// validation checks, if any, will be performed here
return subspaceProvider.getSubspaceAsync(context)
.thenCompose(subspace -> buildMultiTargetIndex(subspaceProvider, subspace));
return buildMultiTargetIndex();
})
),
common.indexLogMessageKeyValues("IndexingMutuallyByRecords::buildIndexInternalAsync",
fragmentLogMessageKeyValues()));
}

@Nonnull
private CompletableFuture<Void> buildMultiTargetIndex(@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Void> buildMultiTargetIndex() {
final TupleRange tupleRange = common.computeRecordsRange();
final byte[] rangeStart;
final byte[] rangeEnd;
Expand Down Expand Up @@ -325,13 +321,11 @@ private CompletableFuture<Void> buildMultiTargetIndex(@Nonnull SubspaceProvider

return maybePresetRangeFuture.thenCompose(ignore ->
iterateAllRanges(additionalLogMessageKeyValues,
(store, recordsScanned) -> buildRangeOnly(store, subspaceProvider, subspace),
subspaceProvider, subspace));
(store, recordsScanned) -> buildRangeOnly(store)));
}

@Nonnull
private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store) {
// return false when done
/* Mutual indexing:
* 1. detects missing ranges
Expand All @@ -348,11 +342,10 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
validateSameMetadataOrThrow(store);
IndexingRangeSet rangeSet = IndexingRangeSet.forIndexBuild(store, common.getPrimaryIndex());
return rangeSet.listMissingRangesAsync().thenCompose(missingRanges ->
buildNextRangeOnly(sortAndSquash(missingRanges), subspaceProvider, subspace));
buildNextRangeOnly(sortAndSquash(missingRanges)));
}

private CompletableFuture<Boolean> buildNextRangeOnly(List<Range> missingRanges,
@Nonnull SubspaceProvider subspaceProvider, @Nonnull Subspace subspace) {
private CompletableFuture<Boolean> buildNextRangeOnly(List<Range> missingRanges) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(KeyValueLogMessage.of("buildNextRangeOnly",
LogMessageKeys.MISSING_RANGES, missingRanges));
Expand Down Expand Up @@ -396,7 +389,6 @@ private CompletableFuture<Boolean> buildNextRangeOnly(List<Range> missingRanges,
additionalLogMessageKeyValues.addAll(fragmentLogMessageKeyValues());
return iterateAllRanges(additionalLogMessageKeyValues,
(store, recordsScanned) -> buildThisRangeOnly(store, recordsScanned, rangeToBuild),
subspaceProvider, subspace,
anyJumperCallback(rangeToBuild)
).thenCompose(ignore -> AsyncUtil.READY_TRUE);
}
Expand Down
Loading