From 23c0421ee0feee0bd2c5768793c534ba700d5422 Mon Sep 17 00:00:00 2001 From: ohad Date: Tue, 11 Nov 2025 14:08:18 -0500 Subject: [PATCH 1/5] Add support for MDC context passing into the throttled iterator --- .../recordrepair/RecordRepair.java | 4 +- .../throttled/ThrottledRetryingIterator.java | 8 +++ .../throttled/ThrottledIteratorTest.java | 63 +++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java index 7ca244d8c2..8d507b578e 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.util.CloseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -100,7 +101,8 @@ protected RecordRepair(@Nonnull final Builder config) { this.database = config.database; this.storeBuilder = config.getStoreBuilder(); this.validationKind = config.getValidationKind(); - ThrottledRetryingIterator.Builder iteratorBuilder = ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem); + ThrottledRetryingIterator.Builder iteratorBuilder = + ThrottledRetryingIterator.builder(database, cursorFactory(), MDC.getCopyOfContextMap(), this::handleOneItem); throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build(); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index b47e2ba554..fa0e1a6c96 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -403,6 +404,13 @@ public static Builder builder(FDBDatabase database, return new Builder<>(database, FDBRecordContextConfig.newBuilder(), cursorCreator, singleItemHandler); } + public static Builder builder(FDBDatabase database, + CursorFactory cursorCreator, + @Nullable Map mdcContext, + ItemHandler singleItemHandler) { + return new Builder<>(database, FDBRecordContextConfig.newBuilder().setMdcContext(mdcContext), cursorCreator, singleItemHandler); + } + /** * A builder class for the iterator. * diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index aae40a62f0..cf12e0837e 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -36,11 +36,16 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.MDC; import javax.annotation.Nonnull; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; @@ -51,6 +56,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -718,6 +724,63 @@ void testIteratorClosesOnNextCloses() throws Exception { Assertions.assertThat(cursor.get().isClosed()).isTrue(); } + @CsvSource({ + "test-request-123,20,false", // Basic MDC propagation + ",10,false", // Null context + "trace-xyz-789,30,true" // Multiple transactions + }) + private static Stream mdcParams() { + return Stream.of( + Arguments.of("value", 10, 0, 1), // have MDC value, one transaction + Arguments.of(null, 10, 0, 1), // null MDC, one transaction + Arguments.of("value", 10, 6, 2) // have MDC, 2 transactions + ); + } + + @ParameterizedTest + @MethodSource("mdcParams") + void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTransaction, int expectedTransactions) throws Exception { + String mdcKey = "mdckey"; + final AtomicInteger transactionCount = new AtomicInteger(0); + + // Set MDC context if provided + if (mdcValue != null) { + MDC.put(mdcKey, mdcValue); + } + + try { + final Consumer transactionStart = + quotaManager -> transactionCount.incrementAndGet(); + final ItemHandler itemHandler = (store, item, quotaManager) -> { + assertThat(MDC.get(mdcKey)).isEqualTo(mdcValue); // covers null case + quotaManager.deleteCountInc(); + return AsyncUtil.DONE; + }; + + final FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + storeBuilder = recordStore.asBuilder(); + commit(context); + } + + Map mdcContext = (mdcValue == null) ? null : MDC.getCopyOfContextMap(); + ThrottledRetryingIterator.Builder builder = + ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, null), mdcContext, itemHandler); + + builder.withMaxRecordsDeletesPerTransaction(deletesPerTransaction) + .withTransactionInitNotification(transactionStart); + + try (ThrottledRetryingIterator throttledIterator = builder.build()) { + throttledIterator.iterateAll(storeBuilder).join(); + } + + assertThat(transactionCount.get()).isEqualTo(expectedTransactions); + } finally { + MDC.clear(); + } + } + private ThrottledRetryingIterator.Builder iteratorBuilder(final int numRecords, final ItemHandler itemHandler, final Consumer initNotification, From 50457199731242af5d408927c50313fef2d9e698 Mon Sep 17 00:00:00 2001 From: ohad Date: Tue, 11 Nov 2025 14:22:45 -0500 Subject: [PATCH 2/5] Cleanup --- .../throttled/ThrottledIteratorTest.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index cf12e0837e..b9c2bf8269 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -43,7 +43,6 @@ import javax.annotation.Nonnull; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -724,11 +723,6 @@ void testIteratorClosesOnNextCloses() throws Exception { Assertions.assertThat(cursor.get().isClosed()).isTrue(); } - @CsvSource({ - "test-request-123,20,false", // Basic MDC propagation - ",10,false", // Null context - "trace-xyz-789,30,true" // Multiple transactions - }) private static Stream mdcParams() { return Stream.of( Arguments.of("value", 10, 0, 1), // have MDC value, one transaction @@ -742,13 +736,14 @@ private static Stream mdcParams() { void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTransaction, int expectedTransactions) throws Exception { String mdcKey = "mdckey"; final AtomicInteger transactionCount = new AtomicInteger(0); - - // Set MDC context if provided - if (mdcValue != null) { - MDC.put(mdcKey, mdcValue); - } + final Map original = MDC.getCopyOfContextMap(); try { + // Set MDC context if provided + if (mdcValue != null) { + MDC.clear(); + MDC.put(mdcKey, mdcValue); + } final Consumer transactionStart = quotaManager -> transactionCount.incrementAndGet(); final ItemHandler itemHandler = (store, item, quotaManager) -> { @@ -777,7 +772,7 @@ void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTr assertThat(transactionCount.get()).isEqualTo(expectedTransactions); } finally { - MDC.clear(); + MDC.setContextMap(original); } } From f9fc82318145a5979df232ee19383099a501e305 Mon Sep 17 00:00:00 2001 From: ohad Date: Wed, 12 Nov 2025 12:10:03 -0500 Subject: [PATCH 3/5] PR comments. Added setter for MdcContext, removed Builder constructor and placed configuration insode build() --- .../throttled/ThrottledRetryingIterator.java | 57 +++++++++++-------- .../throttled/ThrottledIteratorTest.java | 5 +- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index fa0e1a6c96..593f1a94b4 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -390,36 +390,25 @@ void init() { } } - public static Builder builder(TransactionalRunner runner, - Executor executor, - ScheduledExecutorService scheduledExecutor, - CursorFactory cursorCreator, - ItemHandler singleItemHandler) { - return new Builder<>(runner, executor, scheduledExecutor, cursorCreator, singleItemHandler); - } - public static Builder builder(FDBDatabase database, CursorFactory cursorCreator, ItemHandler singleItemHandler) { return new Builder<>(database, FDBRecordContextConfig.newBuilder(), cursorCreator, singleItemHandler); } - public static Builder builder(FDBDatabase database, - CursorFactory cursorCreator, - @Nullable Map mdcContext, - ItemHandler singleItemHandler) { - return new Builder<>(database, FDBRecordContextConfig.newBuilder().setMdcContext(mdcContext), cursorCreator, singleItemHandler); - } - /** * A builder class for the iterator. * * @param the item type being iterated on. */ public static class Builder { - private final TransactionalRunner transactionalRunner; - private final Executor executor; - private final ScheduledExecutorService scheduledExecutor; + // Fields constructed during build() + private TransactionalRunner transactionalRunner; + private Executor executor; + private ScheduledExecutorService scheduledExecutor; + // Fields initialized by setters/constructor + private FDBDatabase database; + private FDBRecordContextConfig.Builder contextConfigBuilder; private final CursorFactory cursorCreator; private final ItemHandler singleItemHandler; private Consumer transactionSuccessNotification; @@ -452,11 +441,17 @@ private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutor } private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { - this(new TransactionalRunner(database, contextConfigBuilder), - database.newContextExecutor(contextConfigBuilder.getMdcContext()), - database.getScheduledExecutor(), - cursorCreator, - singleItemHandler); + // Mandatory fields are set in the constructor. Everything else is optional. + this.database = database; + this.contextConfigBuilder = contextConfigBuilder; + this.cursorCreator = cursorCreator; + this.singleItemHandler = singleItemHandler; + // set defaults + this.transactionTimeQuotaMillis = (int)TimeUnit.SECONDS.toMillis(4); + this.maxRecordDeletesPerTransaction = 0; + this.maxRecordScannedPerSec = 0; + this.maxRecordDeletesPerSec = 0; + this.numOfRetries = NUMBER_OF_RETRIES; } /** @@ -546,11 +541,27 @@ public Builder withNumOfRetries(int numOfRetries) { return this; } + /** + * Set the MDC context for the runner/executor. + * This MDC context will be carried out into the runner and executor and will allow them to pass that down to + * LOGGER calls used by the item handlers. + * Defaults to empty context. + * @param mdcContext the MDC context to use + * @return this builder + */ + public Builder withMdcContext(Map mdcContext) { + this.contextConfigBuilder.setMdcContext(mdcContext); + return this; + } + /** * Create the iterator. * @return the newly minted iterator */ public ThrottledRetryingIterator build() { + this.transactionalRunner = new TransactionalRunner(database, contextConfigBuilder); + this.executor = database.newContextExecutor(contextConfigBuilder.getMdcContext()); + this.scheduledExecutor = database.getScheduledExecutor(); return new ThrottledRetryingIterator<>(this); } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index b9c2bf8269..ab913268f0 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -735,7 +735,6 @@ private static Stream mdcParams() { @MethodSource("mdcParams") void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTransaction, int expectedTransactions) throws Exception { String mdcKey = "mdckey"; - final AtomicInteger transactionCount = new AtomicInteger(0); final Map original = MDC.getCopyOfContextMap(); try { @@ -744,6 +743,7 @@ void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTr MDC.clear(); MDC.put(mdcKey, mdcValue); } + final AtomicInteger transactionCount = new AtomicInteger(0); final Consumer transactionStart = quotaManager -> transactionCount.incrementAndGet(); final ItemHandler itemHandler = (store, item, quotaManager) -> { @@ -761,7 +761,8 @@ void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTr Map mdcContext = (mdcValue == null) ? null : MDC.getCopyOfContextMap(); ThrottledRetryingIterator.Builder builder = - ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, null), mdcContext, itemHandler); + ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, null), itemHandler) + .withMdcContext(mdcContext); builder.withMaxRecordsDeletesPerTransaction(deletesPerTransaction) .withTransactionInitNotification(transactionStart); From a8ae709260b5767b6b788321a0ffe50d2961a36e Mon Sep 17 00:00:00 2001 From: ohad Date: Wed, 12 Nov 2025 12:27:21 -0500 Subject: [PATCH 4/5] PR comments. Added setter for MdcContext, removed Builder constructor and placed configuration insode build() --- .../provider/foundationdb/recordrepair/RecordRepair.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java index 8d507b578e..3e114200fd 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java @@ -102,7 +102,8 @@ protected RecordRepair(@Nonnull final Builder config) { this.storeBuilder = config.getStoreBuilder(); this.validationKind = config.getValidationKind(); ThrottledRetryingIterator.Builder iteratorBuilder = - ThrottledRetryingIterator.builder(database, cursorFactory(), MDC.getCopyOfContextMap(), this::handleOneItem); + ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem) + .withMdcContext(MDC.getCopyOfContextMap()); throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build(); } From 533334ceafed6afc3a8e9a4f2199034b7abe4184 Mon Sep 17 00:00:00 2001 From: ohad Date: Tue, 18 Nov 2025 15:22:01 +0200 Subject: [PATCH 5/5] PR comments --- .../throttled/ThrottledRetryingIterator.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index 593f1a94b4..66ed060a4d 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -419,27 +419,6 @@ public static class Builder { private int maxRecordDeletesPerSec; private int numOfRetries; - /** - * Constructor. - * @param runner the FDB runner to use when creating transactions - * @param cursorCreator the factory to use when creating the inner cursor - * @param singleItemHandler the handler of a single item while iterating - */ - private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory cursorCreator, ItemHandler singleItemHandler) { - // Mandatory fields are set in the constructor. Everything else is optional. - this.transactionalRunner = runner; - this.executor = executor; - this.scheduledExecutor = scheduledExecutor; - this.cursorCreator = cursorCreator; - this.singleItemHandler = singleItemHandler; - // set defaults - this.transactionTimeQuotaMillis = (int)TimeUnit.SECONDS.toMillis(4); - this.maxRecordDeletesPerTransaction = 0; - this.maxRecordScannedPerSec = 0; - this.maxRecordDeletesPerSec = 0; - this.numOfRetries = NUMBER_OF_RETRIES; - } - private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { // Mandatory fields are set in the constructor. Everything else is optional. this.database = database;