Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
V value = entry.getValue();

Object callContext = callContextByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());

onNextValue(key, value, callContext, futures);

Expand Down
190 changes: 162 additions & 28 deletions src/test/java/org/dataloader/DataLoaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package org.dataloader;

import org.awaitility.Duration;
import org.dataloader.fixtures.CustomCacheMap;
import org.dataloader.fixtures.JsonObject;
import org.dataloader.fixtures.TestKit;
import org.dataloader.fixtures.User;
import org.dataloader.fixtures.UserManager;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.impl.DataLoaderAssertionException;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -35,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
Expand All @@ -47,6 +50,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.*;
import static org.awaitility.Awaitility.await;
import static org.dataloader.DataLoaderFactory.newDataLoader;
import static org.dataloader.DataLoaderFactory.newMappedDataLoader;
Expand Down Expand Up @@ -104,7 +108,7 @@ public void basic_map_batch_loading() {
mapOfResults.put(k, k);
}
});
return CompletableFuture.completedFuture(mapOfResults);
return completedFuture(mapOfResults);
};
DataLoader<String, String> loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader);

Expand Down Expand Up @@ -424,7 +428,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoader<String, String> identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls);

DataLoader<String, String> dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A"));
DataLoader<String, String> dlFluency = identityLoader.prime("A", completedFuture("A"));
assertThat(dlFluency, equalTo(identityLoader));

CompletableFuture<String> future1 = identityLoader.load("A");
Expand Down Expand Up @@ -992,7 +996,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory

identityLoader.dispatch();

CompletableFuture.allOf(f1, f2, f3).join();
allOf(f1, f2, f3).join();

assertThat(f1.join(), equalTo(1));
assertThat(f2.join(), equalTo(2));
Expand Down Expand Up @@ -1035,13 +1039,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa

AtomicBoolean v4Called = new AtomicBoolean();

CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> {
supplyAsync(nullValue).thenAccept(v1 -> {
identityLoader.load("a");
CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> {
supplyAsync(nullValue).thenAccept(v2 -> {
identityLoader.load("b");
CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> {
supplyAsync(nullValue).thenAccept(v3 -> {
identityLoader.load("c");
CompletableFuture.supplyAsync(nullValue).thenAccept(
supplyAsync(nullValue).thenAccept(
v4 -> {
identityLoader.load("d");
v4Called.set(true);
Expand All @@ -1058,12 +1062,68 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
singletonList(asList("a", "b", "c", "d"))));
}

@ParameterizedTest
@MethodSource("dataLoaderFactories")
public void should_blowup_after_N_keys(TestDataLoaderFactory factory) {
if (!(factory instanceof TestReactiveDataLoaderFactory)) {
return;
}
//
// if we blow up after emitting N keys, the N keys should work but the rest of the keys
// should be exceptional
DataLoader<Integer, Integer> identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>());
CompletableFuture<Integer> cf1 = identityLoader.load(1);
CompletableFuture<Integer> cf2 = identityLoader.load(2);
CompletableFuture<Integer> cf3 = identityLoader.load(3);
CompletableFuture<Integer> cf4 = identityLoader.load(4);
CompletableFuture<Integer> cf5 = identityLoader.load(5);
identityLoader.dispatch();
await().until(cf5::isDone);

assertThat(cf1.join(), equalTo(1));
assertThat(cf2.join(), equalTo(2));
assertThat(cf3.join(), equalTo(3));
assertThat(cf4.isCompletedExceptionally(), is(true));
assertThat(cf5.isCompletedExceptionally(), is(true));

}

@ParameterizedTest
@MethodSource("dataLoaderFactories")
public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) {
//
// what happens if we want 4 values but are only given 2 back say
//
DataLoader<String, String> identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>());
CompletableFuture<String> cf1 = identityLoader.load("A");
CompletableFuture<String> cf2 = identityLoader.load("B");
CompletableFuture<String> cf3 = identityLoader.load("C");
CompletableFuture<String> cf4 = identityLoader.load("D");
identityLoader.dispatch();

await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone());

if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) {
assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
} else {
// with the maps it's ok to have fewer results
assertThat(cf1.join(), equalTo("A"));
assertThat(cf2.join(), equalTo("B"));
assertThat(cf3.join(), equalTo(null));
assertThat(cf4.join(), equalTo(null));
}

}

@Test
public void can_call_a_loader_from_a_loader() throws Exception {
List<Collection<String>> deepLoadCalls = new ArrayList<>();
DataLoader<String, String> deepLoader = newDataLoader(keys -> {
deepLoadCalls.add(keys);
return CompletableFuture.completedFuture(keys);
return completedFuture(keys);
});

List<Collection<String>> aLoadCalls = new ArrayList<>();
Expand All @@ -1083,7 +1143,7 @@ public void can_call_a_loader_from_a_loader() throws Exception {
CompletableFuture<String> b1 = bLoader.load("B1");
CompletableFuture<String> b2 = bLoader.load("B2");

CompletableFuture.allOf(
allOf(
aLoader.dispatch(),
deepLoader.dispatch(),
bLoader.dispatch(),
Expand All @@ -1109,11 +1169,10 @@ public void can_call_a_loader_from_a_loader() throws Exception {
public void should_allow_composition_of_data_loader_calls() {
UserManager userManager = new UserManager();

BatchLoader<Long, User> userBatchLoader = userIds -> CompletableFuture
.supplyAsync(() -> userIds
.stream()
.map(userManager::loadUserById)
.collect(Collectors.toList()));
BatchLoader<Long, User> userBatchLoader = userIds -> supplyAsync(() -> userIds
.stream()
.map(userManager::loadUserById)
.collect(Collectors.toList()));
DataLoader<Long, User> userLoader = newDataLoader(userBatchLoader);

AtomicBoolean gandalfCalled = new AtomicBoolean(false);
Expand Down Expand Up @@ -1160,17 +1219,26 @@ private static Stream<Arguments> dataLoaderFactories() {

public interface TestDataLoaderFactory {
<K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);

<K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);

<K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);

DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);

DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
}

public interface TestReactiveDataLoaderFactory {
<K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
}

private static class ListDataLoaderFactory implements TestDataLoaderFactory {
@Override
public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));
return CompletableFuture.completedFuture(keys);
return completedFuture(keys);
}, options);
}

Expand All @@ -1189,7 +1257,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options
loadCalls.add(new ArrayList<>(keys));

List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
return CompletableFuture.completedFuture(errors);
return completedFuture(errors);
}, options);
}

Expand All @@ -1206,7 +1274,15 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions o
errors.add(new IllegalStateException("Error"));
}
}
return CompletableFuture.completedFuture(errors);
return completedFuture(errors);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));
return completedFuture(keys.subList(0, N));
}, options);
}
}
Expand All @@ -1220,7 +1296,7 @@ public <K> DataLoader<K, K> idLoader(
loadCalls.add(new ArrayList<>(keys));
Map<K, K> map = new HashMap<>();
keys.forEach(k -> map.put(k, k));
return CompletableFuture.completedFuture(map);
return completedFuture(map);
}, options);
}

Expand All @@ -1239,7 +1315,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
loadCalls.add(new ArrayList<>(keys));
Map<K, Object> errorByKey = new HashMap<>();
keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
return CompletableFuture.completedFuture(errorByKey);
return completedFuture(errorByKey);
}, options);
}

Expand All @@ -1257,16 +1333,28 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
errorByKey.put(key, new IllegalStateException("Error"));
}
}
return CompletableFuture.completedFuture(errorByKey);
return completedFuture(errorByKey);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newMappedDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));

Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
k -> k, v -> v
));
return completedFuture(collect);
}, options);
}
}

private static class PublisherDataLoaderFactory implements TestDataLoaderFactory {
private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {

@Override
public <K> DataLoader<K, K> idLoader(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Flux.fromIterable(keys).subscribe(subscriber);
Expand All @@ -1283,7 +1371,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col

@Override
public <K> DataLoader<K, Object> idLoaderAllExceptions(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
Expand All @@ -1293,7 +1381,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(

@Override
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

Expand All @@ -1308,13 +1396,36 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
Flux.fromIterable(errors).subscribe(subscriber);
}, options);
}

@Override
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<K> nKeys = keys.subList(0, N);
Flux<K> subFlux = Flux.fromIterable(nKeys);
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
.subscribe(subscriber);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<String> nKeys = keys.subList(0, N);
Flux.fromIterable(nKeys)
.subscribe(subscriber);
}, options);
}
}

private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory {
private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {

@Override
public <K> DataLoader<K, K> idLoader(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Map<K, K> map = new HashMap<>();
Expand All @@ -1333,7 +1444,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col

@Override
public <K> DataLoader<K, Object> idLoaderAllExceptions(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
Expand All @@ -1343,7 +1454,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(

@Override
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

Expand All @@ -1358,6 +1469,29 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
}, options);
}

@Override
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<K> nKeys = keys.subList(0, N);
Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
.subscribe(subscriber);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<String> nKeys = keys.subList(0, N);
Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
.subscribe(subscriber);
}, options);
}
}

private static class ThrowingCacheMap extends CustomCacheMap {
Expand Down