From 7b5041c5cd7b3fe21d4917bafd8e5af01ce90791 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sat, 18 Oct 2025 15:36:53 +1000 Subject: [PATCH 01/10] remove any Reentrant locks in favor of a CAS linked list --- src/main/java/org/dataloader/CacheMap.java | 2 +- src/main/java/org/dataloader/DataLoader.java | 53 ++-- .../java/org/dataloader/DataLoaderHelper.java | 229 ++++++++++-------- .../org/dataloader/impl/DefaultCacheMap.java | 12 +- src/test/java/ReadmeExamples.java | 2 +- .../java/org/dataloader/DataLoaderTest.java | 4 +- .../dataloader/fixtures/CustomCacheMap.java | 12 +- 7 files changed, 161 insertions(+), 153 deletions(-) diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index 54b1b49..36913e6 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -90,7 +90,7 @@ static CacheMap simpleMap() { * * @return the cache map for fluent coding */ - CacheMap set(K key, CompletableFuture value); + CompletableFuture setIfAbsent(K key, CompletableFuture value); /** * Deletes the entry with the specified key from the cache map, if it exists. diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 68e699b..4077e55 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -35,8 +35,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -65,6 +63,7 @@ * * @param type parameter indicating the type of the data load keys * @param type parameter indicating the type of the data that is returned + * * @author Arnold Schrijver * @author Brad Baker */ @@ -79,7 +78,6 @@ public class DataLoader { private final ValueCache valueCache; private final DataLoaderOptions options; private final Object batchLoadFunction; - final Lock lock; @VisibleForTesting DataLoader(@Nullable String name, Object batchLoadFunction, @Nullable DataLoaderOptions options) { @@ -96,7 +94,6 @@ public class DataLoader { this.batchLoadFunction = nonNull(batchLoadFunction); this.options = loaderOptions; this.name = name; - this.lock = new ReentrantLock(); this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.valueCache, this.stats, clock); } @@ -136,6 +133,7 @@ public Object getBatchLoadFunction() { * This allows you to change the current {@link DataLoader} and turn it into a new one * * @param builderConsumer the {@link DataLoaderFactory.Builder} consumer for changing the {@link DataLoader} + * * @return a newly built {@link DataLoader} instance */ public DataLoader transform(Consumer> builderConsumer) { @@ -170,6 +168,7 @@ public Duration getTimeSinceDispatch() { * and returned from cache). * * @param key the key to load + * * @return the future of the value */ public CompletableFuture load(K key) { @@ -187,6 +186,7 @@ public CompletableFuture load(K key) { * NOTE : This will NOT cause a data load to happen. You must call {@link #load(Object)} for that to happen. * * @param key the key to check + * * @return an Optional to the future of the value */ public Optional> getIfPresent(K key) { @@ -205,6 +205,7 @@ public Optional> getIfPresent(K key) { * NOTE : This will NOT cause a data load to happen. You must call {@link #load(Object)} for that to happen. * * @param key the key to check + * * @return an Optional to the future of the value */ public Optional> getIfCompleted(K key) { @@ -224,6 +225,7 @@ public Optional> getIfCompleted(K key) { * * @param key the key to load * @param keyContext a context object that is specific to this key + * * @return the future of the value */ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { @@ -239,6 +241,7 @@ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { * and returned from cache). * * @param keys the list of keys to load + * * @return the composite future of the list of values */ public CompletableFuture> loadMany(List keys) { @@ -258,6 +261,7 @@ public CompletableFuture> loadMany(List keys) { * * @param keys the list of keys to load * @param keyContexts the list of key calling context objects + * * @return the composite future of the list of values */ public CompletableFuture> loadMany(List keys, List keyContexts) { @@ -288,6 +292,7 @@ public CompletableFuture> loadMany(List keys, List keyContext * {@link org.dataloader.MappedBatchLoaderWithContext} to help retrieve data. * * @param keysAndContexts the map of keys to their respective contexts + * * @return the composite future of the map of keys and values */ public CompletableFuture> loadMany(Map keysAndContexts) { @@ -358,6 +363,7 @@ public int dispatchDepth() { * on the next load request. * * @param key the key to remove + * * @return the data loader for fluent coding */ public DataLoader clear(K key) { @@ -371,17 +377,13 @@ public DataLoader clear(K key) { * * @param key the key to remove * @param handler a handler that will be called after the async remote clear completes + * * @return the data loader for fluent coding */ public DataLoader clear(K key, BiConsumer handler) { Object cacheKey = getCacheKey(key); - try { - lock.lock(); - futureCache.delete(cacheKey); - valueCache.delete(key).whenComplete(handler); - } finally { - lock.unlock(); - } + futureCache.delete(cacheKey); + valueCache.delete(key).whenComplete(handler); return this; } @@ -399,16 +401,12 @@ public DataLoader clearAll() { * Clears the entire cache map of the loader, and of the cached value store. * * @param handler a handler that will be called after the async remote clear all completes + * * @return the data loader for fluent coding */ public DataLoader clearAll(BiConsumer handler) { - try { - lock.lock(); - futureCache.clear(); - valueCache.clear().whenComplete(handler); - } finally { - lock.unlock(); - } + futureCache.clear(); + valueCache.clear().whenComplete(handler); return this; } @@ -419,6 +417,7 @@ public DataLoader clearAll(BiConsumer handler) { * * @param key the key * @param value the value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, V value) { @@ -430,6 +429,7 @@ public DataLoader prime(K key, V value) { * * @param key the key * @param error the exception to prime instead of a value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, Exception error) { @@ -443,18 +443,12 @@ public DataLoader prime(K key, Exception error) { * * @param key the key * @param value the value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, CompletableFuture value) { Object cacheKey = getCacheKey(key); - try { - lock.lock(); - if (!futureCache.containsKey(cacheKey)) { - futureCache.set(cacheKey, value); - } - } finally { - lock.unlock(); - } + futureCache.setIfAbsent(cacheKey, value); return this; } @@ -465,6 +459,7 @@ public DataLoader prime(K key, CompletableFuture value) { * If no cache key function is present in {@link DataLoaderOptions}, then the returned value equals the input key. * * @param key the input key + * * @return the cache key after the input is transformed with the cache key function */ public Object getCacheKey(K key) { @@ -503,8 +498,8 @@ public ValueCache getValueCache() { @Override public String toString() { return "DataLoader{" + - "name='" + name + '\'' + - ", stats=" + stats + - '}'; + "name='" + name + '\'' + + ", stats=" + stats + + '}'; } } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 83389e4..eeb6808 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -13,12 +13,14 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Subscriber; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -28,7 +30,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -52,20 +53,22 @@ class DataLoaderHelper { static class LoaderQueueEntry { final K key; - final V value; + final CompletableFuture value; final Object callContext; + final LoaderQueueEntry prev; - public LoaderQueueEntry(K key, V value, Object callContext) { + public LoaderQueueEntry(K key, CompletableFuture value, Object callContext, LoaderQueueEntry prev) { this.key = key; this.value = value; this.callContext = callContext; + this.prev = prev; } K getKey() { return key; } - V getValue() { + CompletableFuture getValue() { return value; } @@ -79,11 +82,11 @@ Object getCallContext() { private final DataLoaderOptions loaderOptions; private final CacheMap futureCache; private final ValueCache valueCache; - private final List>> loaderQueue; + // private final List>> loaderQueue; + private final AtomicReference<@Nullable LoaderQueueEntry> loaderQueue = new AtomicReference<>(); private final StatisticsCollector stats; private final Clock clock; private final AtomicReference lastDispatchTime; - private final Lock lock; DataLoaderHelper(DataLoader dataLoader, Object batchLoadFunction, @@ -97,8 +100,6 @@ Object getCallContext() { this.loaderOptions = loaderOptions; this.futureCache = futureCache; this.valueCache = valueCache; - this.lock = dataLoader.lock; - this.loaderQueue = new ArrayList<>(); this.stats = stats; this.clock = clock; this.lastDispatchTime = new AtomicReference<>(); @@ -114,39 +115,28 @@ public Instant getLastDispatchTime() { } Optional> getIfPresent(K key) { - try { - lock.lock(); - boolean cachingEnabled = loaderOptions.cachingEnabled(); - if (cachingEnabled) { - Object cacheKey = getCacheKey(nonNull(key)); - try { - CompletableFuture cacheValue = futureCache.get(cacheKey); - if (cacheValue != null) { - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); - return Optional.of(cacheValue); - } - } catch (Exception ignored) { + boolean cachingEnabled = loaderOptions.cachingEnabled(); + if (cachingEnabled) { + Object cacheKey = getCacheKey(nonNull(key)); + try { + CompletableFuture cacheValue = futureCache.get(cacheKey); + if (cacheValue != null) { + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); + return Optional.of(cacheValue); } + } catch (Exception ignored) { } - } finally { - lock.unlock(); } return Optional.empty(); } Optional> getIfCompleted(K key) { - try { - lock.lock(); - - Optional> cachedPromise = getIfPresent(key); - if (cachedPromise.isPresent()) { - CompletableFuture promise = cachedPromise.get(); - if (promise.isDone()) { - return cachedPromise; - } + Optional> cachedPromise = getIfPresent(key); + if (cachedPromise.isPresent()) { + CompletableFuture promise = cachedPromise.get(); + if (promise.isDone()) { + return cachedPromise; } - } finally { - lock.unlock(); } return Optional.empty(); } @@ -154,25 +144,70 @@ Optional> getIfCompleted(K key) { @GuardedBy("lock") CompletableFuture load(K key, Object loadContext) { - try { - lock.lock(); + boolean batchingEnabled = loaderOptions.batchingEnabled(); + boolean futureCachingEnabled = loaderOptions.cachingEnabled(); + + stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext)); + DataLoaderInstrumentationContext ctx = ctxOrNoopCtx(instrumentation().beginLoad(dataLoader, key, loadContext)); + Object cacheKey = null; + if (futureCachingEnabled) { + cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); + try { + CompletableFuture cachedFuture = futureCache.get(cacheKey); + if (cachedFuture != null) { + // We already have a promise for this key, no need to check value cache or queue up load + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + ctx.onDispatched(); + cachedFuture.whenComplete(ctx::onCompleted); + return cachedFuture; + } + } catch (Exception ignored) { + } + } + CompletableFuture loadCallFuture; + if (batchingEnabled) { + loadCallFuture = new CompletableFuture<>(); + if (futureCachingEnabled) { + CompletableFuture cachedFuture = futureCache.setIfAbsent(cacheKey, loadCallFuture); + if (cachedFuture != null) { + // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + ctx.onDispatched(); + cachedFuture.whenComplete(ctx::onCompleted); + return cachedFuture; + } + } + addEntryToLoaderQueue(key, loadCallFuture, loadContext); + } else { + stats.incrementBatchLoadCountBy(1, new IncrementBatchLoadCountByStatisticsContext<>(key, loadContext)); + // immediate execution of batch function + loadCallFuture = invokeLoaderImmediately(key, loadContext, true); + if (futureCachingEnabled) { + CompletableFuture cachedFuture = futureCache.setIfAbsent(cacheKey, loadCallFuture); + if (cachedFuture != null) { + // another thread was faster and the loader was invoked twice with the same key + // we are disregarding the resul of our dispatch call and use the already cached value + // meaning this is a cache hit and we are done + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + ctx.onDispatched(); + cachedFuture.whenComplete(ctx::onCompleted); + return cachedFuture; + } + } + } - boolean batchingEnabled = loaderOptions.batchingEnabled(); - boolean cachingEnabled = loaderOptions.cachingEnabled(); + ctx.onDispatched(); + loadCallFuture.whenComplete(ctx::onCompleted); + return loadCallFuture; + } - stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext)); - DataLoaderInstrumentationContext ctx = ctxOrNoopCtx(instrumentation().beginLoad(dataLoader, key,loadContext)); - CompletableFuture cf; - if (cachingEnabled) { - cf = loadFromCache(key, loadContext, batchingEnabled); - } else { - cf = queueOrInvokeLoader(key, loadContext, batchingEnabled, false); + private void addEntryToLoaderQueue(K key, CompletableFuture future, Object loadContext) { + while (true) { + LoaderQueueEntry prev = loaderQueue.get(); + LoaderQueueEntry curr = new LoaderQueueEntry<>(key, future, loadContext, prev); + if (loaderQueue.compareAndSet(prev, curr)) { + return; } - ctx.onDispatched(); - cf.whenComplete(ctx::onCompleted); - return cf; - } finally { - lock.unlock(); } } @@ -196,35 +231,42 @@ DispatchResult dispatch() { final List keys; final List callContexts; final List> queuedFutures; - try { - lock.lock(); - int queueSize = loaderQueue.size(); - if (queueSize == 0) { - lastDispatchTime.set(now()); - instrCtx.onDispatched(); - return endDispatchCtx(instrCtx, emptyDispatchResult()); + LoaderQueueEntry loaderQueueEntryHead; + while (true) { + loaderQueueEntryHead = loaderQueue.get(); + if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) { + break; } - - // we copy the pre-loaded set of futures ready for dispatch - keys = new ArrayList<>(queueSize); - callContexts = new ArrayList<>(queueSize); - queuedFutures = new ArrayList<>(queueSize); - - loaderQueue.forEach(entry -> { - keys.add(entry.getKey()); - queuedFutures.add(entry.getValue()); - callContexts.add(entry.getCallContext()); - }); - loaderQueue.clear(); + } + if (loaderQueueEntryHead == null) { lastDispatchTime.set(now()); - } finally { - lock.unlock(); + instrCtx.onDispatched(); + return endDispatchCtx(instrCtx, emptyDispatchResult()); } + int queueSize = calcQueueDepth(loaderQueueEntryHead); + // we copy the pre-loaded set of futures ready for dispatch + keys = new ArrayList<>(queueSize); + callContexts = new ArrayList<>(queueSize); + queuedFutures = new ArrayList<>(queueSize); + + while (loaderQueueEntryHead != null) { + keys.add(loaderQueueEntryHead.getKey()); + queuedFutures.add(loaderQueueEntryHead.getValue()); + callContexts.add(loaderQueueEntryHead.getCallContext()); + loaderQueueEntryHead = loaderQueueEntryHead.prev; + } + //TODO: to many test depend on the previous order, therefore we reverse the lists here + // but this should not matter and we should change the tests + Collections.reverse(keys); + Collections.reverse(callContexts); + Collections.reverse(queuedFutures); + lastDispatchTime.set(now()); if (!batchingEnabled) { instrCtx.onDispatched(); return endDispatchCtx(instrCtx, emptyDispatchResult()); } + final int totalEntriesHandled = keys.size(); // // order of keys -> values matter in data loader hence the use of linked hash map @@ -354,38 +396,6 @@ private void possiblyClearCacheEntriesOnExceptions(List keys) { } } - @GuardedBy("lock") - private CompletableFuture loadFromCache(K key, Object loadContext, boolean batchingEnabled) { - final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); - - try { - CompletableFuture cacheValue = futureCache.get(cacheKey); - if (cacheValue != null) { - // We already have a promise for this key, no need to check value cache or queue up load - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - return cacheValue; - } - } catch (Exception ignored) { - } - - CompletableFuture loadCallFuture = queueOrInvokeLoader(key, loadContext, batchingEnabled, true); - futureCache.set(cacheKey, loadCallFuture); - return loadCallFuture; - } - - @GuardedBy("lock") - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) { - if (batchingEnabled) { - CompletableFuture loadCallFuture = new CompletableFuture<>(); - loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); - return loadCallFuture; - } else { - stats.incrementBatchLoadCountBy(1, new IncrementBatchLoadCountByStatisticsContext<>(key, loadContext)); - // immediate execution of batch function - return invokeLoaderImmediately(key, loadContext, cachingEnabled); - } - } - CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); @@ -626,14 +636,19 @@ private DataLoaderInstrumentation instrumentation() { } int dispatchDepth() { - try { - lock.lock(); - return loaderQueue.size(); - } finally { - lock.unlock(); + return calcQueueDepth(loaderQueue.get()); + } + + private int calcQueueDepth(LoaderQueueEntry head) { + int count = 0; + while (head != null) { + count++; + head = head.prev; } + return count; } + private final List> NOT_SUPPORTED_LIST = emptyList(); private final CompletableFuture>> NOT_SUPPORTED = CompletableFuture.completedFuture(NOT_SUPPORTED_LIST); private final Try ALWAYS_FAILED = Try.alwaysFailed(); diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index fa89bb0..c8e1ae2 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -20,9 +20,8 @@ import org.dataloader.annotations.Internal; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /** * Default implementation of {@link CacheMap} that is based on a regular {@link java.util.HashMap}. @@ -35,13 +34,13 @@ @Internal public class DefaultCacheMap implements CacheMap { - private final Map> cache; + private final ConcurrentHashMap> cache; /** * Default constructor */ public DefaultCacheMap() { - cache = new HashMap<>(); + cache = new ConcurrentHashMap<>(); } /** @@ -73,9 +72,8 @@ public Collection> getAll() { * {@inheritDoc} */ @Override - public CacheMap set(K key, CompletableFuture value) { - cache.put(key, value); - return this; + public CompletableFuture setIfAbsent(K key, CompletableFuture value) { + return cache.putIfAbsent(key, value); } /** diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index f391b80..8cf5c70 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -265,7 +265,7 @@ public Collection> getAll() { } @Override - public CacheMap set(Object key, CompletableFuture value) { + public CompletableFuture setIfAbsent(Object key, CompletableFuture value) { return null; } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index 37ae030..6ec548a 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -61,6 +61,7 @@ import static org.dataloader.DataLoaderOptions.newDefaultOptions; import static org.dataloader.DataLoaderOptions.newOptions; import static org.dataloader.fixtures.TestKit.areAllDone; +import static org.dataloader.fixtures.TestKit.asSet; import static org.dataloader.fixtures.TestKit.listFrom; import static org.dataloader.impl.CompletableFutureKit.cause; import static org.hamcrest.MatcherAssert.assertThat; @@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; /** @@ -980,7 +982,7 @@ public void should_Accept_a_custom_cache_map_implementation(TestDataLoaderFactor assertThat(future2b.get(), equalTo("b")); assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c"), singletonList("b")))); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "c", "b").toArray()); + assertEquals(customMap.stash.keySet(), asSet("a", "c", "b")); // Supports clear all diff --git a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java index 695da5e..0a32ba2 100644 --- a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java +++ b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java @@ -3,16 +3,15 @@ import org.dataloader.CacheMap; import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class CustomCacheMap implements CacheMap { - public Map> stash; + public ConcurrentHashMap> stash; public CustomCacheMap() { - stash = new LinkedHashMap<>(); + stash = new ConcurrentHashMap<>(); } @Override @@ -31,9 +30,8 @@ public Collection> getAll() { } @Override - public CacheMap set(String key, CompletableFuture value) { - stash.put(key, value); - return this; + public CompletableFuture setIfAbsent(String key, CompletableFuture value) { + return stash.putIfAbsent(key, value); } @Override From 65a496ec4738b261ab72faa31e88823a4f8db3e7 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sat, 18 Oct 2025 20:20:44 +1000 Subject: [PATCH 02/10] add jcstress tests --- .github/workflows/master.yml | 2 +- .github/workflows/pull_request.yml | 2 +- build.gradle | 7 +- .../DataLoaderDispatchJCStress.java | 100 ++++++++++++++++++ 4 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index e6aa3a2..d076296 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -28,6 +28,6 @@ jobs: - name: Setup Gradle uses: gradle/actions/setup-gradle@v5 - name: build test and publish - run: ./gradlew assemble && ./gradlew check --info && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace + run: ./gradlew assemble && ./gradlew check --info && && ./gradlew jcstress && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace env: CI: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 5258ad2..41a1e88 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -26,6 +26,6 @@ jobs: - name: Setup Gradle uses: gradle/actions/setup-gradle@v5 - name: build and test - run: ./gradlew assemble && ./gradlew check --info --stacktrace + run: ./gradlew assemble && ./gradlew check --info --stacktrace && ./gradlew jcstress env: CI: true diff --git a/build.gradle b/build.gradle index 7b5de5c..f2a0637 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,7 @@ +import net.ltgt.gradle.errorprone.CheckSeverity import org.jetbrains.kotlin.gradle.dsl.JvmTarget import org.jetbrains.kotlin.gradle.dsl.KotlinVersion -import net.ltgt.gradle.errorprone.CheckSeverity + import java.text.SimpleDateFormat plugins { @@ -15,6 +16,7 @@ plugins { id 'com.github.ben-manes.versions' version '0.53.0' id "me.champeau.jmh" version "0.7.3" id "net.ltgt.errorprone" version '4.3.0' + id "io.github.reyerizo.gradle.jcstress" version "0.8.15" // Kotlin just for tests - not production code id 'org.jetbrains.kotlin.jvm' version '2.2.20' @@ -229,7 +231,8 @@ nexusPublishing { // https://central.sonatype.org/publish/publish-portal-ossrh-staging-api/#configuration nexusUrl.set(uri("https://ossrh-staging-api.central.sonatype.com/service/local/")) // GraphQL Java does not publish snapshots, but adding this URL for completeness - snapshotRepositoryUrl.set(uri("https://central.sonatype.com/repository/maven-snapshots/")) } + snapshotRepositoryUrl.set(uri("https://central.sonatype.com/repository/maven-snapshots/")) + } } } diff --git a/src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java b/src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java new file mode 100644 index 0000000..0226c0a --- /dev/null +++ b/src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java @@ -0,0 +1,100 @@ +package org.dataloader; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +@JCStressTest +@State +@Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "accepted") +public class DataLoaderDispatchJCStress { + + + AtomicInteger counter = new AtomicInteger(); + AtomicInteger batchLoaderCount = new AtomicInteger(); + volatile boolean finished1; + volatile boolean finished2; + + + BatchLoader batchLoader = keys -> { + return CompletableFuture.supplyAsync(() -> { + batchLoaderCount.getAndAdd(keys.size()); + return keys; + }); + }; + DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader); + + public DataLoaderDispatchJCStress() { + + } + + @Actor + public void load1() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + finished1 = true; + } + + @Actor + public void load2() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-2-" + i); + } + finished2 = true; + } + + + @Actor + public void dispatch1() { + while (!finished1 || !finished2) { + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Actor + public void dispatch2() { + while (!finished1 || !finished2) { + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = counter.get(); + r.r2 = batchLoaderCount.get(); + } + + +} From ee8d5685fe47b69a865957205590cb8b56ca137e Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sun, 19 Oct 2025 06:42:51 +1000 Subject: [PATCH 03/10] add jcstress tests --- build.gradle | 4 ++++ ...ataLoaderBatchingAndCachingDispatchJCStress.java} | 12 ++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) rename src/jcstress/java/org/dataloader/{DataLoaderDispatchJCStress.java => DataLoaderBatchingAndCachingDispatchJCStress.java} (87%) diff --git a/build.gradle b/build.gradle index f2a0637..e49a001 100644 --- a/build.gradle +++ b/build.gradle @@ -261,3 +261,7 @@ tasks.named("dependencyUpdates").configure { isNonStable(it.candidate.version) } } + +jcstress { + verbose = true +} \ No newline at end of file diff --git a/src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java b/src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java similarity index 87% rename from src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java rename to src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java index 0226c0a..c2df70f 100644 --- a/src/jcstress/java/org/dataloader/DataLoaderDispatchJCStress.java +++ b/src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java @@ -16,7 +16,7 @@ @JCStressTest @State @Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "accepted") -public class DataLoaderDispatchJCStress { +public class DataLoaderBatchingAndCachingDispatchJCStress { AtomicInteger counter = new AtomicInteger(); @@ -33,12 +33,16 @@ public class DataLoaderDispatchJCStress { }; DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader); - public DataLoaderDispatchJCStress() { + public DataLoaderBatchingAndCachingDispatchJCStress() { } @Actor public void load1() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + // we load the same keys again for (int i = 0; i < 1000; i++) { dataLoader.load("load-1-" + i); } @@ -50,6 +54,10 @@ public void load2() { for (int i = 0; i < 1000; i++) { dataLoader.load("load-2-" + i); } + // we load the same keys again + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } finished2 = true; } From bcf7ef6e4e486e2261b2026f7a9dc9e32a24e949 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sun, 19 Oct 2025 07:19:11 +1000 Subject: [PATCH 04/10] add another jcstress test --- build.gradle | 2 +- ...DataLoader_Batching_Caching_JCStress.java} | 4 +- ...ataLoader_NoBatching_Caching_JCStress.java | 66 +++++++++++++++++++ src/main/java/org/dataloader/CacheMap.java | 12 +++- .../org/dataloader/impl/DefaultCacheMap.java | 5 ++ src/test/java/ReadmeExamples.java | 5 ++ .../dataloader/fixtures/CustomCacheMap.java | 5 ++ 7 files changed, 95 insertions(+), 4 deletions(-) rename src/jcstress/java/org/dataloader/{DataLoaderBatchingAndCachingDispatchJCStress.java => DataLoader_Batching_Caching_JCStress.java} (96%) create mode 100644 src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java diff --git a/build.gradle b/build.gradle index e49a001..2384afd 100644 --- a/build.gradle +++ b/build.gradle @@ -263,5 +263,5 @@ tasks.named("dependencyUpdates").configure { } jcstress { - verbose = true +// verbose = true } \ No newline at end of file diff --git a/src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java similarity index 96% rename from src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java rename to src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java index c2df70f..58f89c6 100644 --- a/src/jcstress/java/org/dataloader/DataLoaderBatchingAndCachingDispatchJCStress.java +++ b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java @@ -16,7 +16,7 @@ @JCStressTest @State @Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "accepted") -public class DataLoaderBatchingAndCachingDispatchJCStress { +public class DataLoader_Batching_Caching_JCStress { AtomicInteger counter = new AtomicInteger(); @@ -33,7 +33,7 @@ public class DataLoaderBatchingAndCachingDispatchJCStress { }; DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader); - public DataLoaderBatchingAndCachingDispatchJCStress() { + public DataLoader_Batching_Caching_JCStress() { } diff --git a/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java new file mode 100644 index 0000000..c7f9801 --- /dev/null +++ b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java @@ -0,0 +1,66 @@ +package org.dataloader; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE_INTERESTING; + +@JCStressTest +@State +@Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "No keys loaded twice") +@Outcome(id = "2.*, 2000", expect = ACCEPTABLE_INTERESTING, desc = "Some keys loaded twice") +public class DataLoader_NoBatching_Caching_JCStress { + + + AtomicInteger batchLoaderCount = new AtomicInteger(); + volatile boolean finished1; + volatile boolean finished2; + + + BatchLoader batchLoader = keys -> { + batchLoaderCount.getAndAdd(keys.size()); + return CompletableFuture.completedFuture(keys); + }; + DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader, DataLoaderOptions.newOptions().setBatchingEnabled(false).build()); + + + @Actor + public void load1() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + // we load the same keys again + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + finished1 = true; + } + + @Actor + public void load2() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-2-" + i); + } + // we load the same keys again + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + finished2 = true; + } + + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = batchLoaderCount.get(); + r.r2 = dataLoader.getCacheMap().size(); + } + +} diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index 36913e6..dc60d97 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -74,10 +74,11 @@ static CacheMap simpleMap() { * * @return the cached value, or {@code null} if not found (depends on cache implementation) */ - @Nullable CompletableFuture get(K key); + @Nullable CompletableFuture get(K key); /** * Gets a collection of CompletableFutures from the cache map. + * * @return the collection of cached values */ Collection> getAll(); @@ -107,4 +108,13 @@ static CacheMap simpleMap() { * @return the cache map for fluent coding */ CacheMap clear(); + + /** + * Returns the current size of the cache. This is not used by DataLoader directly + * and intended for testing and debugging. + * If a cache doesn't support it, it can throw an Exception. + * + * @return + */ + int size(); } diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index c8e1ae2..10c1a5f 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -93,4 +93,9 @@ public CacheMap clear() { cache.clear(); return this; } + + @Override + public int size() { + return cache.size(); + } } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 8cf5c70..05d1670 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -278,6 +278,11 @@ public CacheMap delete(Object key) { public CacheMap clear() { return null; } + + @Override + public int size() { + return 0; + } } private void customCache() { diff --git a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java index 0a32ba2..c22ce1a 100644 --- a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java +++ b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java @@ -45,4 +45,9 @@ public CacheMap clear() { stash.clear(); return this; } + + @Override + public int size() { + return stash.size(); + } } From 5ba70ddaea86995de199e159435e1a291bf2e722 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sun, 19 Oct 2025 07:25:14 +1000 Subject: [PATCH 05/10] jcstress tests --- .../DataLoader_Batching_Caching_JCStress.java | 2 +- ...ataLoader_NoBatching_Caching_JCStress.java | 20 ++++--------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java index 58f89c6..f0d8263 100644 --- a/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java +++ b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java @@ -56,7 +56,7 @@ public void load2() { } // we load the same keys again for (int i = 0; i < 1000; i++) { - dataLoader.load("load-1-" + i); + dataLoader.load("load-2-" + i); } finished2 = true; } diff --git a/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java index c7f9801..6b46ed1 100644 --- a/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java +++ b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java @@ -15,45 +15,33 @@ @JCStressTest @State -@Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "No keys loaded twice") -@Outcome(id = "2.*, 2000", expect = ACCEPTABLE_INTERESTING, desc = "Some keys loaded twice") +@Outcome(id = "1000, 1000", expect = ACCEPTABLE, desc = "No keys loaded twice") +@Outcome(id = "1.*, 1000", expect = ACCEPTABLE_INTERESTING, desc = "Some keys loaded twice") public class DataLoader_NoBatching_Caching_JCStress { AtomicInteger batchLoaderCount = new AtomicInteger(); - volatile boolean finished1; - volatile boolean finished2; - BatchLoader batchLoader = keys -> { batchLoaderCount.getAndAdd(keys.size()); return CompletableFuture.completedFuture(keys); }; - DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader, DataLoaderOptions.newOptions().setBatchingEnabled(false).build()); + DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader, DataLoaderOptions.newOptions().setBatchingEnabled(false).build()); + @Actor public void load1() { for (int i = 0; i < 1000; i++) { dataLoader.load("load-1-" + i); } - // we load the same keys again - for (int i = 0; i < 1000; i++) { - dataLoader.load("load-1-" + i); - } - finished1 = true; } @Actor public void load2() { - for (int i = 0; i < 1000; i++) { - dataLoader.load("load-2-" + i); - } - // we load the same keys again for (int i = 0; i < 1000; i++) { dataLoader.load("load-1-" + i); } - finished2 = true; } From 002f872c7773b52397ca06788ed608df82a87614 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Tue, 21 Oct 2025 12:49:13 +1000 Subject: [PATCH 06/10] dispatch performance test --- .../performance/DataLoaderDispatchPerformance.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/jmh/java/performance/DataLoaderDispatchPerformance.java b/src/jmh/java/performance/DataLoaderDispatchPerformance.java index 0b4696d..ad2060c 100644 --- a/src/jmh/java/performance/DataLoaderDispatchPerformance.java +++ b/src/jmh/java/performance/DataLoaderDispatchPerformance.java @@ -12,6 +12,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; @@ -280,15 +281,20 @@ public void setup() { } + DataLoader ownerDL = DataLoaderFactory.newDataLoader(ownerBatchLoader); + DataLoader petDL = DataLoaderFactory.newDataLoader(petBatchLoader); + + } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) + @Threads(Threads.MAX) public void loadAndDispatch(MyState myState, Blackhole blackhole) { - DataLoader ownerDL = DataLoaderFactory.newDataLoader(ownerBatchLoader); - DataLoader petDL = DataLoaderFactory.newDataLoader(petBatchLoader); + DataLoader ownerDL = myState.ownerDL; + DataLoader petDL = myState.petDL; for (Owner owner : owners.values()) { ownerDL.load(owner.id); From 16cf34051e4d7081345759f0a698a7978e5a5771 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 22 Oct 2025 08:42:48 +1000 Subject: [PATCH 07/10] keep order of keys without reversing --- .../java/org/dataloader/DataLoaderHelper.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index eeb6808..4c4ced3 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -19,8 +19,8 @@ import java.time.Clock; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -228,9 +228,6 @@ DispatchResult dispatch() { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader)); boolean batchingEnabled = loaderOptions.batchingEnabled(); - final List keys; - final List callContexts; - final List> queuedFutures; LoaderQueueEntry loaderQueueEntryHead; while (true) { @@ -246,21 +243,21 @@ DispatchResult dispatch() { } int queueSize = calcQueueDepth(loaderQueueEntryHead); // we copy the pre-loaded set of futures ready for dispatch - keys = new ArrayList<>(queueSize); - callContexts = new ArrayList<>(queueSize); - queuedFutures = new ArrayList<>(queueSize); - + Object[] keysArray = new Object[queueSize]; + CompletableFuture[] queuedFuturesArray = new CompletableFuture[queueSize]; + Object[] callContextsArray = new Object[queueSize]; + int index = queueSize - 1; while (loaderQueueEntryHead != null) { - keys.add(loaderQueueEntryHead.getKey()); - queuedFutures.add(loaderQueueEntryHead.getValue()); - callContexts.add(loaderQueueEntryHead.getCallContext()); + keysArray[index] = loaderQueueEntryHead.getKey(); + queuedFuturesArray[index] = loaderQueueEntryHead.getValue(); + callContextsArray[index] = loaderQueueEntryHead.getCallContext(); loaderQueueEntryHead = loaderQueueEntryHead.prev; + index--; } - //TODO: to many test depend on the previous order, therefore we reverse the lists here - // but this should not matter and we should change the tests - Collections.reverse(keys); - Collections.reverse(callContexts); - Collections.reverse(queuedFutures); + final List keys = (List) Arrays.asList(keysArray); + final List> queuedFutures = Arrays.asList(queuedFuturesArray); + final List callContexts = Arrays.asList(callContextsArray); + lastDispatchTime.set(now()); if (!batchingEnabled) { instrCtx.onDispatched(); From 9a9105ce322f23c0939d67a938f13b80e37e2e56 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 22 Oct 2025 12:55:39 +1000 Subject: [PATCH 08/10] don't calculate queue lenghth --- .../java/org/dataloader/DataLoaderHelper.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 4c4ced3..a0807b2 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -56,12 +56,14 @@ static class LoaderQueueEntry { final CompletableFuture value; final Object callContext; final LoaderQueueEntry prev; + final int queueSize; - public LoaderQueueEntry(K key, CompletableFuture value, Object callContext, LoaderQueueEntry prev) { + public LoaderQueueEntry(K key, CompletableFuture value, Object callContext, LoaderQueueEntry prev, int queueSize) { this.key = key; this.value = value; this.callContext = callContext; this.prev = prev; + this.queueSize = queueSize; } K getKey() { @@ -204,7 +206,7 @@ CompletableFuture load(K key, Object loadContext) { private void addEntryToLoaderQueue(K key, CompletableFuture future, Object loadContext) { while (true) { LoaderQueueEntry prev = loaderQueue.get(); - LoaderQueueEntry curr = new LoaderQueueEntry<>(key, future, loadContext, prev); + LoaderQueueEntry curr = new LoaderQueueEntry<>(key, future, loadContext, prev, prev != null ? prev.queueSize + 1 : 1); if (loaderQueue.compareAndSet(prev, curr)) { return; } @@ -241,7 +243,7 @@ DispatchResult dispatch() { instrCtx.onDispatched(); return endDispatchCtx(instrCtx, emptyDispatchResult()); } - int queueSize = calcQueueDepth(loaderQueueEntryHead); + int queueSize = loaderQueueEntryHead.queueSize; // we copy the pre-loaded set of futures ready for dispatch Object[] keysArray = new Object[queueSize]; CompletableFuture[] queuedFuturesArray = new CompletableFuture[queueSize]; @@ -633,16 +635,12 @@ private DataLoaderInstrumentation instrumentation() { } int dispatchDepth() { - return calcQueueDepth(loaderQueue.get()); - } - - private int calcQueueDepth(LoaderQueueEntry head) { - int count = 0; - while (head != null) { - count++; - head = head.prev; + LoaderQueueEntry loaderQueueEntry = loaderQueue.get(); + if (loaderQueueEntry != null) { + return loaderQueueEntry.queueSize; + } else { + return 0; } - return count; } From c026da61dca774293272040d8b7a6a9e02dce936 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 22 Oct 2025 12:56:25 +1000 Subject: [PATCH 09/10] remove guarded by --- src/main/java/org/dataloader/DataLoaderHelper.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index a0807b2..51ac634 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -1,6 +1,5 @@ package org.dataloader; -import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.instrumentation.DataLoaderInstrumentation; @@ -144,7 +143,6 @@ Optional> getIfCompleted(K key) { } - @GuardedBy("lock") CompletableFuture load(K key, Object loadContext) { boolean batchingEnabled = loaderOptions.batchingEnabled(); boolean futureCachingEnabled = loaderOptions.cachingEnabled(); @@ -225,7 +223,6 @@ Object getCacheKeyWithContext(K key, Object context) { loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; } - @GuardedBy("lock") DispatchResult dispatch() { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader)); From 54197a08cabc80e9f0a95e70c15b5963c56d8a1b Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 23 Oct 2025 10:03:18 +1000 Subject: [PATCH 10/10] name changing to putIfAbsentAtomically --- src/main/java/org/dataloader/CacheMap.java | 2 +- src/main/java/org/dataloader/DataLoader.java | 2 +- src/main/java/org/dataloader/DataLoaderHelper.java | 4 ++-- src/main/java/org/dataloader/impl/DefaultCacheMap.java | 2 +- src/test/java/ReadmeExamples.java | 2 +- src/test/java/org/dataloader/fixtures/CustomCacheMap.java | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index dc60d97..b3929af 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -91,7 +91,7 @@ static CacheMap simpleMap() { * * @return the cache map for fluent coding */ - CompletableFuture setIfAbsent(K key, CompletableFuture value); + CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value); /** * Deletes the entry with the specified key from the cache map, if it exists. diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 4077e55..d80823f 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -448,7 +448,7 @@ public DataLoader prime(K key, Exception error) { */ public DataLoader prime(K key, CompletableFuture value) { Object cacheKey = getCacheKey(key); - futureCache.setIfAbsent(cacheKey, value); + futureCache.putIfAbsentAtomically(cacheKey, value); return this; } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 51ac634..249c1f2 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -168,7 +168,7 @@ CompletableFuture load(K key, Object loadContext) { if (batchingEnabled) { loadCallFuture = new CompletableFuture<>(); if (futureCachingEnabled) { - CompletableFuture cachedFuture = futureCache.setIfAbsent(cacheKey, loadCallFuture); + CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); @@ -183,7 +183,7 @@ CompletableFuture load(K key, Object loadContext) { // immediate execution of batch function loadCallFuture = invokeLoaderImmediately(key, loadContext, true); if (futureCachingEnabled) { - CompletableFuture cachedFuture = futureCache.setIfAbsent(cacheKey, loadCallFuture); + CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { // another thread was faster and the loader was invoked twice with the same key // we are disregarding the resul of our dispatch call and use the already cached value diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index 10c1a5f..b6c811f 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -72,7 +72,7 @@ public Collection> getAll() { * {@inheritDoc} */ @Override - public CompletableFuture setIfAbsent(K key, CompletableFuture value) { + public CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value) { return cache.putIfAbsent(key, value); } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 05d1670..6705cb8 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -265,7 +265,7 @@ public Collection> getAll() { } @Override - public CompletableFuture setIfAbsent(Object key, CompletableFuture value) { + public CompletableFuture putIfAbsentAtomically(Object key, CompletableFuture value) { return null; } diff --git a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java index c22ce1a..6e20f68 100644 --- a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java +++ b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java @@ -30,7 +30,7 @@ public Collection> getAll() { } @Override - public CompletableFuture setIfAbsent(String key, CompletableFuture value) { + public CompletableFuture putIfAbsentAtomically(String key, CompletableFuture value) { return stash.putIfAbsent(key, value); }