diff --git a/src/main/java/graphql/execution/AsyncExecutionStrategy.java b/src/main/java/graphql/execution/AsyncExecutionStrategy.java index d2e196ca15..20ec1daa4f 100644 --- a/src/main/java/graphql/execution/AsyncExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncExecutionStrategy.java @@ -1,9 +1,6 @@ package graphql.execution; import graphql.ExecutionResult; -import graphql.execution.defer.DeferSupport; -import graphql.execution.defer.DeferredCall; -import graphql.execution.defer.DeferredErrorSupport; import graphql.execution.instrumentation.DeferredFieldInstrumentationContext; import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext; import graphql.execution.instrumentation.Instrumentation; @@ -13,16 +10,12 @@ import graphql.schema.GraphQLObjectType; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; -import static graphql.execution.MergedSelectionSet.newMergedSelectionSet; - /** * The standard graphql execution strategy that runs fields asynchronously non-blocking. */ @@ -65,14 +58,7 @@ public CompletableFuture execute(ExecutionContext executionCont .transform(builder -> builder.field(currentField).path(fieldPath).parent(parameters)); resolvedFields.add(fieldName); - CompletableFuture future; - - if (isDeferred(executionContext, newParameters, currentField)) { - executionStrategyCtx.onDeferredField(currentField); - future = resolveFieldWithInfoToNull(executionContext, newParameters); - } else { - future = resolveFieldWithInfo(executionContext, newParameters); - } + CompletableFuture future = resolveFieldWithInfo(executionContext, newParameters); futures.add(future); } CompletableFuture overallResult = new CompletableFuture<>(); @@ -99,33 +85,6 @@ public CompletableFuture execute(ExecutionContext executionCont return overallResult; } - private boolean isDeferred(ExecutionContext executionContext, ExecutionStrategyParameters parameters, MergedField currentField) { - DeferSupport deferSupport = executionContext.getDeferSupport(); - if (deferSupport.checkForDeferDirective(currentField, executionContext.getVariables())) { - DeferredErrorSupport errorSupport = new DeferredErrorSupport(); - - // with a deferred field we are really resetting where we execute from, that is from this current field onwards - Map fields = new LinkedHashMap<>(); - fields.put(currentField.getName(), currentField); - - ExecutionStrategyParameters callParameters = parameters.transform(builder -> - { - MergedSelectionSet mergedSelectionSet = newMergedSelectionSet().subFields(fields).build(); - builder.deferredErrorSupport(errorSupport) - .field(currentField) - .fields(mergedSelectionSet) - .parent(null) // this is a break in the parent -> child chain - its a new start effectively - .listSize(0) - .currentListIndex(0); - } - ); - - DeferredCall call = new DeferredCall(parameters.getPath(), deferredExecutionResult(executionContext, callParameters), errorSupport); - deferSupport.enqueue(call); - return true; - } - return false; - } @SuppressWarnings("FutureReturnValueIgnored") private Supplier> deferredExecutionResult(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { diff --git a/src/main/java/graphql/execution/Execution.java b/src/main/java/graphql/execution/Execution.java index fa7933cedb..0d997ba42f 100644 --- a/src/main/java/graphql/execution/Execution.java +++ b/src/main/java/graphql/execution/Execution.java @@ -1,14 +1,11 @@ package graphql.execution; -import graphql.DeferredExecutionResult; import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.ExecutionResultImpl; -import graphql.GraphQL; import graphql.GraphQLError; import graphql.Internal; -import graphql.execution.defer.DeferSupport; import graphql.execution.instrumentation.Instrumentation; import graphql.execution.instrumentation.InstrumentationContext; import graphql.execution.instrumentation.InstrumentationState; @@ -22,7 +19,6 @@ import graphql.schema.GraphQLObjectType; import graphql.schema.GraphQLSchema; import graphql.util.LogKit; -import org.reactivestreams.Publisher; import org.slf4j.Logger; import java.util.Collections; @@ -183,27 +179,9 @@ private CompletableFuture executeOperation(ExecutionContext exe result = result.whenComplete(executeOperationCtx::onCompleted); - return deferSupport(executionContext, result); + return result; } - /* - * Adds the deferred publisher if its needed at the end of the query. This is also a good time for the deferred code to start running - */ - private CompletableFuture deferSupport(ExecutionContext executionContext, CompletableFuture result) { - return result.thenApply(er -> { - DeferSupport deferSupport = executionContext.getDeferSupport(); - if (deferSupport.isDeferDetected()) { - // we start the rest of the query now to maximize throughput. We have the initial important results - // and now we can start the rest of the calls as early as possible (even before some one subscribes) - Publisher publisher = deferSupport.startDeferredCalls(); - return ExecutionResultImpl.newExecutionResult().from(er) - .addExtension(GraphQL.DEFERRED_RESULTS, publisher) - .build(); - } - return er; - }); - - } private GraphQLObjectType getOperationRootType(GraphQLSchema graphQLSchema, OperationDefinition operationDefinition) { OperationDefinition.Operation operation = operationDefinition.getOperation(); diff --git a/src/main/java/graphql/execution/ExecutionContext.java b/src/main/java/graphql/execution/ExecutionContext.java index 7001e8145e..3c1f137df9 100644 --- a/src/main/java/graphql/execution/ExecutionContext.java +++ b/src/main/java/graphql/execution/ExecutionContext.java @@ -3,10 +3,8 @@ import graphql.ExecutionInput; import graphql.GraphQLError; -import graphql.Internal; import graphql.PublicApi; import graphql.cachecontrol.CacheControl; -import graphql.execution.defer.DeferSupport; import graphql.execution.instrumentation.Instrumentation; import graphql.execution.instrumentation.InstrumentationState; import graphql.language.Document; @@ -47,7 +45,6 @@ public class ExecutionContext { private final DataLoaderRegistry dataLoaderRegistry; private final CacheControl cacheControl; private final Locale locale; - private final DeferSupport deferSupport = new DeferSupport(); private final ValueUnboxer valueUnboxer; private final ExecutionInput executionInput; @@ -198,10 +195,6 @@ public ExecutionStrategy getSubscriptionStrategy() { return subscriptionStrategy; } - public DeferSupport getDeferSupport() { - return deferSupport; - } - /** * This helps you transform the current ExecutionContext object into another one by starting a builder with all * the current values and allows you to transform it how you want. diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index bd74118ce4..46eec2aaa7 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -345,7 +345,6 @@ private void handleFetchingException(ExecutionContext executionContext, DataFetcherExceptionHandlerResult handlerResult = dataFetcherExceptionHandler.onException(handlerParameters); handlerResult.getErrors().forEach(executionContext::addError); - parameters.deferredErrorSupport().onFetchingException(parameters, e); } /** @@ -461,7 +460,6 @@ private void handleUnresolvedTypeProblem(ExecutionContext context, ExecutionStra logNotSafe.warn(error.getMessage(), e); context.addError(error); - parameters.deferredErrorSupport().onError(error); } private CompletableFuture completeValueForNull(ExecutionStrategyParameters parameters) { @@ -669,7 +667,6 @@ private Object handleCoercionProblem(ExecutionContext context, ExecutionStrategy logNotSafe.warn(error.getMessage(), e); context.addError(error); - parameters.deferredErrorSupport().onError(error); return null; } @@ -708,7 +705,6 @@ private void handleTypeMismatchProblem(ExecutionContext context, ExecutionStrate logNotSafe.warn("{} got {}", error.getMessage(), result.getClass()); context.addError(error); - parameters.deferredErrorSupport().onError(error); } diff --git a/src/main/java/graphql/execution/ExecutionStrategyParameters.java b/src/main/java/graphql/execution/ExecutionStrategyParameters.java index edcf95db39..0b2e3d7ef7 100644 --- a/src/main/java/graphql/execution/ExecutionStrategyParameters.java +++ b/src/main/java/graphql/execution/ExecutionStrategyParameters.java @@ -2,7 +2,6 @@ import graphql.Assert; import graphql.PublicApi; -import graphql.execution.defer.DeferredErrorSupport; import java.util.Map; import java.util.function.Consumer; @@ -25,7 +24,6 @@ public class ExecutionStrategyParameters { private final int listSize; private final int currentListIndex; private final ExecutionStrategyParameters parent; - private final DeferredErrorSupport deferredErrorSupport; private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, Object source, @@ -37,8 +35,7 @@ private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, MergedField currentField, int listSize, int currentListIndex, - ExecutionStrategyParameters parent, - DeferredErrorSupport deferredErrorSupport) { + ExecutionStrategyParameters parent) { this.executionStepInfo = assertNotNull(executionStepInfo, () -> "executionStepInfo is null"); this.localContext = localContext; @@ -51,7 +48,6 @@ private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, this.listSize = listSize; this.currentListIndex = currentListIndex; this.parent = parent; - this.deferredErrorSupport = deferredErrorSupport; } public ExecutionStepInfo getExecutionStepInfo() { @@ -94,10 +90,6 @@ public ExecutionStrategyParameters getParent() { return parent; } - public DeferredErrorSupport deferredErrorSupport() { - return deferredErrorSupport; - } - /** * This returns the current field in its query representations. * @@ -139,7 +131,6 @@ public static class Builder { int listSize; int currentListIndex; ExecutionStrategyParameters parent; - DeferredErrorSupport deferredErrorSupport = new DeferredErrorSupport(); /** * @see ExecutionStrategyParameters#newParameters() @@ -158,7 +149,6 @@ private Builder(ExecutionStrategyParameters oldParameters) { this.arguments = oldParameters.arguments; this.nonNullableFieldValidator = oldParameters.nonNullableFieldValidator; this.currentField = oldParameters.currentField; - this.deferredErrorSupport = oldParameters.deferredErrorSupport; this.path = oldParameters.path; this.parent = oldParameters.parent; this.listSize = oldParameters.listSize; @@ -225,13 +215,9 @@ public Builder parent(ExecutionStrategyParameters parent) { return this; } - public Builder deferredErrorSupport(DeferredErrorSupport deferredErrorSupport) { - this.deferredErrorSupport = deferredErrorSupport; - return this; - } public ExecutionStrategyParameters build() { - return new ExecutionStrategyParameters(executionStepInfo, source, localContext, fields, arguments, nonNullableFieldValidator, path, currentField, listSize, currentListIndex, parent, deferredErrorSupport); + return new ExecutionStrategyParameters(executionStepInfo, source, localContext, fields, arguments, nonNullableFieldValidator, path, currentField, listSize, currentListIndex, parent); } } } diff --git a/src/main/java/graphql/execution/defer/DeferSupport.java b/src/main/java/graphql/execution/defer/DeferSupport.java deleted file mode 100644 index 7e6fb97429..0000000000 --- a/src/main/java/graphql/execution/defer/DeferSupport.java +++ /dev/null @@ -1,82 +0,0 @@ -package graphql.execution.defer; - -import graphql.DeferredExecutionResult; -import graphql.Directives; -import graphql.ExecutionResult; -import graphql.Internal; -import graphql.execution.MergedField; -import graphql.execution.ValuesResolver; -import graphql.execution.reactive.SingleSubscriberPublisher; -import graphql.language.Directive; -import graphql.language.Field; -import org.reactivestreams.Publisher; - -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicBoolean; - -import static graphql.Directives.*; - -/** - * This provides support for @defer directives on fields that mean that results will be sent AFTER - * the main result is sent via a Publisher stream. - */ -@Internal -public class DeferSupport { - - private final AtomicBoolean deferDetected = new AtomicBoolean(false); - private final Deque deferredCalls = new ConcurrentLinkedDeque<>(); - private final SingleSubscriberPublisher publisher = new SingleSubscriberPublisher<>(); - private final ValuesResolver valuesResolver = new ValuesResolver(); - - public boolean checkForDeferDirective(MergedField currentField, Map variables) { - for (Field field : currentField.getFields()) { - Directive directive = field.getDirective(DeferDirective.getName()); - if (directive != null) { - Map argumentValues = valuesResolver.getArgumentValues(DeferDirective.getArguments(), directive.getArguments(), variables); - return (Boolean) argumentValues.get("if"); - } - } - return false; - } - - @SuppressWarnings("FutureReturnValueIgnored") - private void drainDeferredCalls() { - if (deferredCalls.isEmpty()) { - publisher.noMoreData(); - return; - } - DeferredCall deferredCall = deferredCalls.pop(); - CompletableFuture future = deferredCall.invoke(); - future.whenComplete((executionResult, exception) -> { - if (exception != null) { - publisher.offerError(exception); - return; - } - publisher.offer(executionResult); - drainDeferredCalls(); - }); - } - - public void enqueue(DeferredCall deferredCall) { - deferDetected.set(true); - deferredCalls.offer(deferredCall); - } - - public boolean isDeferDetected() { - return deferDetected.get(); - } - - /** - * When this is called the deferred execution will begin - * - * @return the publisher of deferred results - */ - public Publisher startDeferredCalls() { - drainDeferredCalls(); - return publisher; - } -} diff --git a/src/main/java/graphql/execution/defer/DeferredCall.java b/src/main/java/graphql/execution/defer/DeferredCall.java deleted file mode 100644 index f1da0c8683..0000000000 --- a/src/main/java/graphql/execution/defer/DeferredCall.java +++ /dev/null @@ -1,43 +0,0 @@ -package graphql.execution.defer; - -import graphql.DeferredExecutionResult; -import graphql.DeferredExecutionResultImpl; -import graphql.ExecutionResult; -import graphql.GraphQLError; -import graphql.Internal; -import graphql.execution.ExecutionPath; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -/** - * This represents a deferred call (aka @defer) to get an execution result sometime after - * the initial query has returned - */ -@Internal -public class DeferredCall { - private final ExecutionPath path; - private final Supplier> call; - private final DeferredErrorSupport errorSupport; - - public DeferredCall(ExecutionPath path, Supplier> call, DeferredErrorSupport deferredErrorSupport) { - this.path = path; - this.call = call; - this.errorSupport = deferredErrorSupport; - } - - CompletableFuture invoke() { - CompletableFuture future = call.get(); - return future.thenApply(this::transformToDeferredResult); - } - - private DeferredExecutionResult transformToDeferredResult(ExecutionResult executionResult) { - List errorsEncountered = errorSupport.getErrors(); - DeferredExecutionResultImpl.Builder builder = DeferredExecutionResultImpl.newDeferredExecutionResult().from(executionResult); - return builder - .addErrors(errorsEncountered) - .path(path) - .build(); - } -} diff --git a/src/main/java/graphql/execution/defer/DeferredErrorSupport.java b/src/main/java/graphql/execution/defer/DeferredErrorSupport.java deleted file mode 100644 index 3ef4f34c5d..0000000000 --- a/src/main/java/graphql/execution/defer/DeferredErrorSupport.java +++ /dev/null @@ -1,31 +0,0 @@ -package graphql.execution.defer; - -import graphql.ExceptionWhileDataFetching; -import graphql.GraphQLError; -import graphql.Internal; -import graphql.execution.ExecutionStrategyParameters; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * This captures errors that occur while a deferred call is being made - */ -@Internal -public class DeferredErrorSupport { - - private final List errors = new CopyOnWriteArrayList<>(); - - public void onFetchingException(ExecutionStrategyParameters parameters, Throwable e) { - ExceptionWhileDataFetching error = new ExceptionWhileDataFetching(parameters.getPath(), e, parameters.getField().getSingleField().getSourceLocation()); - onError(error); - } - - public void onError(GraphQLError gError) { - errors.add(gError); - } - - public List getErrors() { - return errors; - } -} diff --git a/src/test/groovy/graphql/execution/defer/BasicSubscriber.groovy b/src/test/groovy/graphql/execution/defer/BasicSubscriber.groovy deleted file mode 100644 index e1f9be0cc5..0000000000 --- a/src/test/groovy/graphql/execution/defer/BasicSubscriber.groovy +++ /dev/null @@ -1,35 +0,0 @@ -package graphql.execution.defer - -import graphql.DeferredExecutionResult -import graphql.ExecutionResult -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription - -import java.util.concurrent.atomic.AtomicBoolean - -class BasicSubscriber implements Subscriber { - Subscription subscription - AtomicBoolean finished = new AtomicBoolean() - Throwable throwable - - @Override - void onSubscribe(Subscription s) { - assert s != null, "subscription must not be null" - this.subscription = s - s.request(1) - } - - @Override - void onNext(DeferredExecutionResult executionResult) { - } - - @Override - void onError(Throwable t) { - finished.set(true) - } - - @Override - void onComplete() { - finished.set(true) - } -} diff --git a/src/test/groovy/graphql/execution/defer/CapturingSubscriber.groovy b/src/test/groovy/graphql/execution/defer/CapturingSubscriber.groovy deleted file mode 100644 index 7245f24ee9..0000000000 --- a/src/test/groovy/graphql/execution/defer/CapturingSubscriber.groovy +++ /dev/null @@ -1,45 +0,0 @@ -package graphql.execution.defer - -import graphql.DeferredExecutionResult -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription - -import java.util.concurrent.atomic.AtomicBoolean - -class CapturingSubscriber implements Subscriber { - Subscription subscription - AtomicBoolean finished = new AtomicBoolean() - Throwable throwable - List executionResults = [] - List executionResultData = [] - - AtomicBoolean subscribeTo(Publisher publisher) { - publisher.subscribe(this) - return finished - } - - @Override - void onSubscribe(Subscription s) { - assert s != null, "subscription must not be null" - this.subscription = s - s.request(1) - } - - @Override - void onNext(DeferredExecutionResult executionResult) { - executionResults.add(executionResult) - executionResultData.add(executionResult.getData()) - subscription.request(1) - } - - @Override - void onError(Throwable t) { - finished.set(true) - } - - @Override - void onComplete() { - finished.set(true) - } -} diff --git a/src/test/groovy/graphql/execution/defer/DeferSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/defer/DeferSupportIntegrationTest.groovy deleted file mode 100644 index 6a1a6b8d70..0000000000 --- a/src/test/groovy/graphql/execution/defer/DeferSupportIntegrationTest.groovy +++ /dev/null @@ -1,328 +0,0 @@ -package graphql.execution.defer - -import graphql.DeferredExecutionResult -import graphql.ErrorType -import graphql.Directives -import graphql.ExecutionInput -import graphql.ExecutionResult -import graphql.GraphQL -import graphql.TestUtil -import graphql.schema.DataFetcher -import graphql.schema.DataFetchingEnvironment -import graphql.schema.idl.RuntimeWiring -import graphql.validation.ValidationError -import graphql.validation.ValidationErrorType -import org.awaitility.Awaitility -import org.reactivestreams.Publisher -import spock.lang.Specification - -import java.time.Duration -import java.util.concurrent.CompletableFuture - -import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring - -class DeferSupportIntegrationTest extends Specification { - def then = 0 - - def sentAt() { - def seconds = Duration.ofMillis(System.currentTimeMillis() - then).toMillis() - "T+" + seconds - } - - def sleepSome(DataFetchingEnvironment env) { - Integer sleepTime = env.getArgument("sleepTime") - sleepTime = Optional.ofNullable(sleepTime).orElse(0) - Thread.sleep(sleepTime) - } - - def schemaSpec = ''' - type Query { - post : Post - mandatoryReviews : [Review]! - } - - type Mutation { - mutate(arg : String) : String - } - - type Post { - postText : String - sentAt : String - echo(text : String = "echo") : String - comments(sleepTime : Int, prefix :String) : [Comment] - reviews(sleepTime : Int) : [Review] - } - - type Comment { - commentText : String - sentAt : String - comments(sleepTime : Int, prefix :String) : [Comment] - goes : Bang - } - - type Review { - reviewText : String - sentAt : String - comments(sleepTime : Int, prefix :String) : [Comment] - goes : Bang - } - - type Bang { - bang : String - } - ''' - - DataFetcher postFetcher = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment environment) { - return CompletableFuture.supplyAsync({ - [postText: "post_data", sentAt: sentAt()] - }) - } - } - DataFetcher commentsFetcher = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment env) { - return CompletableFuture.supplyAsync({ - sleepSome(env) - - def prefix = env.getArgument("prefix") - prefix = prefix == null ? "" : prefix - - def result = [] - for (int i = 0; i < 3; i++) { - result.add([commentText: prefix + "comment" + i, sentAt: sentAt(), goes: "goes"]) - } - return result - }) - } - - } - DataFetcher reviewsFetcher = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment env) { - return CompletableFuture.supplyAsync({ - sleepSome(env) - def result = [] - for (int i = 0; i < 3; i++) { - result.add([reviewText: "review" + i, sentAt: sentAt(), goes: "goes"]) - } - return result - }) - } - } - - DataFetcher bangDataFetcher = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment environment) { - throw new RuntimeException("Bang!") - } - } - DataFetcher echoDataFetcher = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment environment) { - return environment.getArgument("text") - } - } - - GraphQL graphQL = null - - void setup() { - then = System.currentTimeMillis() - - def runtimeWiring = RuntimeWiring.newRuntimeWiring() - .type(newTypeWiring("Query").dataFetcher("post", postFetcher)) - .type(newTypeWiring("Post").dataFetcher("comments", commentsFetcher)) - .type(newTypeWiring("Post").dataFetcher("echo", echoDataFetcher)) - .type(newTypeWiring("Post").dataFetcher("reviews", reviewsFetcher)) - .type(newTypeWiring("Bang").dataFetcher("bang", bangDataFetcher)) - - .type(newTypeWiring("Comment").dataFetcher("comments", commentsFetcher)) - .type(newTypeWiring("Review").dataFetcher("comments", commentsFetcher)) - .build() - - def schema = TestUtil.schema(schemaSpec, runtimeWiring) - .transform({ builder -> builder.additionalDirective(Directives.DeferDirective) }) - this.graphQL = GraphQL.newGraphQL(schema).build() - } - - def "test defer support end to end"() { - - def query = ''' - query { - post { - postText - - a :comments(sleepTime:200) @defer { - commentText - } - - b : reviews(sleepTime:100) @defer { - reviewText - comments(prefix : "b_") @defer { - commentText - } - } - - c: reviews @defer { - goes { - bang - } - } - } - } - ''' - - when: - def initialResult = graphQL.execute(ExecutionInput.newExecutionInput().query(query).build()) - - then: - initialResult.errors.isEmpty() - initialResult.data == ["post": ["postText": "post_data", a: null, b: null, c: null]] - - when: - - Publisher deferredResultStream = initialResult.extensions[GraphQL.DEFERRED_RESULTS] as Publisher - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream) - Awaitility.await().untilTrue(subscriber.finished) - - List resultList = subscriber.executionResults - - then: - - assertDeferredData(resultList) - } - - def "test defer support keeps the fields named correctly when interspersed in the query"() { - - def query = ''' - query { - post { - interspersedA: echo(text:"before a:") - - a: comments(sleepTime:200) @defer { - commentText - } - - interspersedB: echo(text:"before b:") - - b : reviews(sleepTime:100) @defer { - reviewText - comments(prefix : "b_") @defer { - commentText - } - } - - interspersedC: echo(text:"before c:") - - c: reviews @defer { - goes { - bang - } - } - - interspersedD: echo(text:"after c:") - } - } - ''' - - when: - def initialResult = graphQL.execute(ExecutionInput.newExecutionInput().query(query).build()) - - then: - initialResult.errors.isEmpty() - initialResult.data == ["post": [ - "interspersedA": "before a:", - "a" : null, - "interspersedB": "before b:", - "b" : null, - "interspersedC": "before c:", - "c" : null, - "interspersedD": "after c:", - ]] - - when: - - Publisher deferredResultStream = initialResult.extensions[GraphQL.DEFERRED_RESULTS] as Publisher - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream); - Awaitility.await().untilTrue(subscriber.finished) - - List resultList = subscriber.executionResults - - then: - - assertDeferredData(resultList) - } - - def assertDeferredData(ArrayList resultList) { - resultList.size() == 6 - - assert resultList[0].data == [[commentText: "comment0"], [commentText: "comment1"], [commentText: "comment2"]] - assert resultList[0].errors == [] - assert resultList[0].path == ["post", "a"] - - assert resultList[1].data == [[reviewText: "review0", comments: null], [reviewText: "review1", comments: null], [reviewText: "review2", comments: null]] - assert resultList[1].errors == [] - assert resultList[1].path == ["post", "b"] - - // exceptions in here - assert resultList[2].errors.size() == 3 - assert resultList[2].errors[0].getMessage() == "Exception while fetching data (/post/c[0]/goes/bang) : Bang!" - assert resultList[2].errors[1].getMessage() == "Exception while fetching data (/post/c[1]/goes/bang) : Bang!" - assert resultList[2].errors[2].getMessage() == "Exception while fetching data (/post/c[2]/goes/bang) : Bang!" - assert resultList[2].path == ["post", "c"] - - // sub defers are sent in encountered order - assert resultList[3].data == [[commentText: "b_comment0"], [commentText: "b_comment1"], [commentText: "b_comment2"]] - assert resultList[3].errors == [] - assert resultList[3].path == ["post", "b", 0, "comments"] - - assert resultList[4].data == [[commentText: "b_comment0"], [commentText: "b_comment1"], [commentText: "b_comment2"]] - assert resultList[4].errors == [] - assert resultList[4].path == ["post", "b", 1, "comments"] - - assert resultList[5].data == [[commentText: "b_comment0"], [commentText: "b_comment1"], [commentText: "b_comment2"]] - assert resultList[5].errors == [] - assert resultList[5].path == ["post", "b", 2, "comments"] - - true - } - - def "nonNull types are not allowed"() { - - def query = ''' - { - mandatoryReviews @defer # nulls are not allowed - { - reviewText - } - } - ''' - when: - def initialResult = graphQL.execute(ExecutionInput.newExecutionInput().query(query).build()) - then: - initialResult.errors.size() == 1 - initialResult.errors[0].errorType == ErrorType.ValidationError - (initialResult.errors[0] as ValidationError).validationErrorType == ValidationErrorType.DeferDirectiveOnNonNullField - - } - - def "mutations cant have defers"() { - - def query = ''' - mutation { - mutate(arg : "go") @defer - } - ''' - when: - def initialResult = graphQL.execute(ExecutionInput.newExecutionInput().query(query).build()) - then: - initialResult.errors.size() == 1 - initialResult.errors[0].errorType == ErrorType.ValidationError - (initialResult.errors[0] as ValidationError).validationErrorType == ValidationErrorType.DeferDirectiveNotOnQueryOperation - } -} diff --git a/src/test/groovy/graphql/execution/defer/DeferSupportTest.groovy b/src/test/groovy/graphql/execution/defer/DeferSupportTest.groovy deleted file mode 100644 index 7f2c41e364..0000000000 --- a/src/test/groovy/graphql/execution/defer/DeferSupportTest.groovy +++ /dev/null @@ -1,272 +0,0 @@ -package graphql.execution.defer - -import graphql.DeferredExecutionResult -import graphql.ExecutionResult -import graphql.ExecutionResultImpl -import graphql.execution.ExecutionPath -import graphql.language.Argument -import graphql.language.BooleanValue -import graphql.language.Directive -import graphql.language.Field -import graphql.language.VariableReference -import org.awaitility.Awaitility -import spock.lang.Specification - -import java.util.concurrent.CompletableFuture - -import static graphql.TestUtil.mergedField - -class DeferSupportTest extends Specification { - - - def "emits N deferred calls with order preserved"() { - - given: - def deferSupport = new DeferSupport() - deferSupport.enqueue(offThread("A", 100, "/field/path")) // <-- will finish last - deferSupport.enqueue(offThread("B", 50, "/field/path")) // <-- will finish second - deferSupport.enqueue(offThread("C", 10, "/field/path")) // <-- will finish first - - when: - List results = [] - def subscriber = new BasicSubscriber() { - @Override - void onNext(DeferredExecutionResult executionResult) { - results.add(executionResult) - subscription.request(1) - } - } - deferSupport.startDeferredCalls().subscribe(subscriber) - Awaitility.await().untilTrue(subscriber.finished) - then: - - results.size() == 3 - results[0].data == "A" - results[1].data == "B" - results[2].data == "C" - } - - def "calls within calls are enqueued correctly"() { - given: - def deferSupport = new DeferSupport() - deferSupport.enqueue(offThreadCallWithinCall(deferSupport, "A", "a", 100, "/a")) - deferSupport.enqueue(offThreadCallWithinCall(deferSupport, "B", "b", 50, "/b")) - deferSupport.enqueue(offThreadCallWithinCall(deferSupport, "C", "c", 10, "/c")) - - when: - List results = [] - BasicSubscriber subscriber = new BasicSubscriber() { - @Override - void onNext(DeferredExecutionResult executionResult) { - results.add(executionResult) - subscription.request(1) - } - } - deferSupport.startDeferredCalls().subscribe(subscriber) - - Awaitility.await().untilTrue(subscriber.finished) - then: - - results.size() == 6 - results[0].data == "A" - results[1].data == "B" - results[2].data == "C" - results[3].data == "a" - results[4].data == "b" - results[5].data == "c" - } - - def "stops at first exception encountered"() { - given: - def deferSupport = new DeferSupport() - deferSupport.enqueue(offThread("A", 100, "/field/path")) - deferSupport.enqueue(offThread("Bang", 50, "/field/path")) // <-- will throw exception - deferSupport.enqueue(offThread("C", 10, "/field/path")) - - when: - List results = [] - Throwable thrown = null - def subscriber = new BasicSubscriber() { - @Override - void onNext(DeferredExecutionResult executionResult) { - results.add(executionResult) - subscription.request(1) - } - - @Override - void onError(Throwable t) { - thrown = t - finished.set(true) - } - - @Override - void onComplete() { - assert false, "This should not be called!" - } - } - deferSupport.startDeferredCalls().subscribe(subscriber) - - Awaitility.await().untilTrue(subscriber.finished) - then: - - thrown.message == "java.lang.RuntimeException: Bang" - } - - def "you can cancel the subscription"() { - given: - def deferSupport = new DeferSupport() - deferSupport.enqueue(offThread("A", 100, "/field/path")) // <-- will finish last - deferSupport.enqueue(offThread("B", 50, "/field/path")) // <-- will finish second - deferSupport.enqueue(offThread("C", 10, "/field/path")) // <-- will finish first - - when: - List results = [] - def subscriber = new BasicSubscriber() { - @Override - void onNext(DeferredExecutionResult executionResult) { - results.add(executionResult) - subscription.cancel() - finished.set(true) - } - } - deferSupport.startDeferredCalls().subscribe(subscriber) - - Awaitility.await().untilTrue(subscriber.finished) - then: - - results.size() == 1 - results[0].data == "A" - - } - - def "you cant subscribe twice"() { - given: - def deferSupport = new DeferSupport() - deferSupport.enqueue(offThread("A", 100, "/field/path")) - deferSupport.enqueue(offThread("Bang", 50, "/field/path")) // <-- will finish second - deferSupport.enqueue(offThread("C", 10, "/field/path")) // <-- will finish first - - when: - Throwable expectedThrowble - deferSupport.startDeferredCalls().subscribe(new BasicSubscriber()) - deferSupport.startDeferredCalls().subscribe(new BasicSubscriber() { - @Override - void onError(Throwable t) { - expectedThrowble = t - } - }) - then: - expectedThrowble != null - } - - def "indicates of there any defers present"() { - given: - def deferSupport = new DeferSupport() - - when: - def deferPresent1 = deferSupport.isDeferDetected() - - then: - !deferPresent1 - - when: - deferSupport.enqueue(offThread("A", 100, "/field/path")) - def deferPresent2 = deferSupport.isDeferDetected() - - then: - deferPresent2 - } - - def "detects @defer directive"() { - given: - def deferSupport = new DeferSupport() - - when: - def noDirectivePresent = deferSupport.checkForDeferDirective(mergedField([ - new Field("a"), - new Field("b") - ]), [:]) - - then: - !noDirectivePresent - - when: - def directivePresent = deferSupport.checkForDeferDirective(mergedField([ - Field.newField("a").directives([new Directive("defer")]).build(), - new Field("b") - ]), [:]) - - then: - directivePresent - } - - def "detects @defer directive can be controlled via if"() { - given: - def deferSupport = new DeferSupport() - - when: - def ifArg = new Argument("if", new BooleanValue(false)) - def directivePresent = deferSupport.checkForDeferDirective(mergedField([ - Field.newField("a").directives([new Directive("defer", [ifArg])]).build(), - new Field("b") - ]), [:]) - - then: - !directivePresent - - when: - ifArg = new Argument("if", new BooleanValue(true)) - directivePresent = deferSupport.checkForDeferDirective(mergedField([ - Field.newField("a").directives([new Directive("defer", [ifArg])]).build(), - new Field("b") - ]), [:]) - - then: - directivePresent - - when: - ifArg = new Argument("if", new VariableReference("varRef")) - directivePresent = deferSupport.checkForDeferDirective(mergedField([ - Field.newField("a").directives([new Directive("defer", [ifArg])]).build(), - new Field("b") - ]), [varRef: false]) - - then: - !directivePresent - - when: - ifArg = new Argument("if", new VariableReference("varRef")) - directivePresent = deferSupport.checkForDeferDirective(mergedField([ - Field.newField("a").directives([new Directive("defer", [ifArg])]).build(), - new Field("b") - ]), [varRef: true]) - - then: - directivePresent - } - - private static DeferredCall offThread(String data, int sleepTime, String path) { - def callSupplier = { - CompletableFuture.supplyAsync({ - Thread.sleep(sleepTime) - if (data == "Bang") { - throw new RuntimeException(data) - } - new ExecutionResultImpl(data, []) - }) - } - return new DeferredCall(ExecutionPath.parse(path), callSupplier, new DeferredErrorSupport()) - } - - private - static DeferredCall offThreadCallWithinCall(DeferSupport deferSupport, String dataParent, String dataChild, int sleepTime, String path) { - def callSupplier = { - CompletableFuture.supplyAsync({ - Thread.sleep(sleepTime) - deferSupport.enqueue(offThread(dataChild, sleepTime, path)) - new ExecutionResultImpl(dataParent, []) - }) - } - return new DeferredCall(ExecutionPath.parse("/field/path"), callSupplier, new DeferredErrorSupport()) - } -} diff --git a/src/test/groovy/graphql/execution/defer/DeferredCallTest.groovy b/src/test/groovy/graphql/execution/defer/DeferredCallTest.groovy deleted file mode 100644 index 2e703bc875..0000000000 --- a/src/test/groovy/graphql/execution/defer/DeferredCallTest.groovy +++ /dev/null @@ -1,48 +0,0 @@ -package graphql.execution.defer - - -import graphql.ExecutionResultImpl -import graphql.validation.ValidationError -import graphql.validation.ValidationErrorType -import spock.lang.Specification - -import static graphql.execution.ExecutionPath.parse -import static java.util.concurrent.CompletableFuture.completedFuture - -class DeferredCallTest extends Specification { - - def "test call capture gives a CF"() { - given: - DeferredCall call = new DeferredCall(parse("/path"), { - completedFuture(new ExecutionResultImpl("some data", Collections.emptyList())) - }, new DeferredErrorSupport()) - - when: - def future = call.invoke() - then: - future.join().data == "some data" - future.join().path == ["path"] - } - - def "test error capture happens via CF"() { - given: - def errorSupport = new DeferredErrorSupport() - errorSupport.onError(new ValidationError(ValidationErrorType.MissingFieldArgument)) - errorSupport.onError(new ValidationError(ValidationErrorType.FieldsConflict)) - - DeferredCall call = new DeferredCall(parse("/path"), { - completedFuture(new ExecutionResultImpl("some data", [new ValidationError(ValidationErrorType.FieldUndefined)])) - }, errorSupport) - - when: - def future = call.invoke() - def er = future.join() - - then: - er.errors.size() == 3 - er.errors[0].message.contains("Validation error of type FieldUndefined") - er.errors[1].message.contains("Validation error of type MissingFieldArgument") - er.errors[2].message.contains("Validation error of type FieldsConflict") - er.path == ["path"] - } -} diff --git a/src/test/groovy/graphql/execution/defer/DeferredErrorSupportTest.groovy b/src/test/groovy/graphql/execution/defer/DeferredErrorSupportTest.groovy deleted file mode 100644 index 3fc8e8e1a8..0000000000 --- a/src/test/groovy/graphql/execution/defer/DeferredErrorSupportTest.groovy +++ /dev/null @@ -1,77 +0,0 @@ -package graphql.execution.defer - -import graphql.DeferredExecutionResult -import graphql.Directives -import graphql.ExecutionResult -import graphql.GraphQL -import graphql.schema.DataFetcher -import graphql.schema.DataFetchingEnvironment -import org.reactivestreams.Publisher -import spock.lang.Specification - -import static graphql.TestUtil.schema -import static graphql.schema.idl.RuntimeWiring.newRuntimeWiring -import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring -import static org.awaitility.Awaitility.await - -class DeferredErrorSupportTest extends Specification { - - def "#1040 errors in stage one do not affect deferred stages"() { - - def spec = ''' - type Query { - stage1 : String - stage2 : String - } - ''' - - def bangDF = new DataFetcher() { - @Override - Object get(DataFetchingEnvironment environment) { - throw new RuntimeException("bang-" + environment.getField().getName()) - } - } - - def runtimeWiring = newRuntimeWiring().type( - newTypeWiring("Query") - .dataFetchers([ - stage1: bangDF, - stage2: bangDF, - ]) - ).build() - - def schema = schema(spec, runtimeWiring).transform({ b -> b.additionalDirective(Directives.DeferDirective) }) - def graphql = GraphQL.newGraphQL(schema).build() - - when: - def executionResult = graphql.execute(''' - { - stage1, - stage2 @defer - } - ''') - - then: - executionResult.errors.size() == 1 - executionResult.errors[0].getMessage().contains("bang-stage1") - - when: - def executionResultDeferred = null - def subscriber = new BasicSubscriber() { - @Override - void onNext(DeferredExecutionResult executionResultStreamed) { - executionResultDeferred = executionResultStreamed - subscription.request(1) - } - } - Publisher deferredResultStream = executionResult.extensions[GraphQL.DEFERRED_RESULTS] as Publisher - deferredResultStream.subscribe(subscriber) - - await().untilTrue(subscriber.finished) - - then: - executionResultDeferred.errors.size() == 1 - executionResultDeferred.errors[0].getMessage().contains("bang-stage2") - - } -} diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy index 0df6c7a5ca..4565db0615 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy @@ -1,23 +1,14 @@ package graphql.execution.instrumentation.dataloader -import graphql.DeferredExecutionResult + import graphql.ExecutionInput import graphql.GraphQL -import graphql.execution.defer.CapturingSubscriber import graphql.execution.instrumentation.Instrumentation -import org.awaitility.Awaitility import org.dataloader.DataLoaderRegistry -import org.reactivestreams.Publisher import spock.lang.Specification -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedInitialDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedInitialExpensiveDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getDeferredQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedData import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedExpensiveData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedExpensiveDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedListOfDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpensiveDeferredQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpensiveQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getQuery @@ -96,57 +87,4 @@ class DataLoaderPerformanceTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2 } - def "data loader will work with deferred queries"() { - - when: - - ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(deferredQuery).dataLoaderRegistry(dataLoaderRegistry).build() - def result = graphQL.execute(executionInput) - - Map extensions = result.getExtensions() - Publisher deferredResultStream = (Publisher) extensions.get(GraphQL.DEFERRED_RESULTS) - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream) - Awaitility.await().untilTrue(subscriber.finished) - - - then: - - result.data == expectedInitialDeferredData - - subscriber.executionResultData == expectedListOfDeferredData - - // - // with deferred results, we don't achieve the same efficiency - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 3 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 - } - - def "data loader will work with deferred queries on multiple levels deep"() { - - when: - - ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(expensiveDeferredQuery).dataLoaderRegistry(dataLoaderRegistry).build() - def result = graphQL.execute(executionInput) - - Map extensions = result.getExtensions() - Publisher deferredResultStream = (Publisher) extensions.get(GraphQL.DEFERRED_RESULTS) - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream) - Awaitility.await().untilTrue(subscriber.finished) - - - then: - - result.data == expectedInitialExpensiveDeferredData - - subscriber.executionResultData == expectedExpensiveDeferredData - - // - // with deferred results, we don't achieve the same efficiency - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 3 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 - } } diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy index 7d15890a82..7b1bd96d54 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy @@ -1,25 +1,16 @@ package graphql.execution.instrumentation.dataloader -import graphql.DeferredExecutionResult + import graphql.ExecutionInput import graphql.GraphQL -import graphql.execution.defer.CapturingSubscriber import graphql.execution.instrumentation.ChainedInstrumentation import graphql.execution.instrumentation.Instrumentation -import org.awaitility.Awaitility import org.dataloader.DataLoaderRegistry -import org.reactivestreams.Publisher import spock.lang.Ignore import spock.lang.Specification -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedInitialDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedInitialExpensiveDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getDeferredQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedData import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedExpensiveData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedExpensiveDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpectedListOfDeferredData -import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpensiveDeferredQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getExpensiveQuery import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.getQuery @@ -105,57 +96,5 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2 } - def "chainedInstrumentation: data loader will work with deferred queries"() { - - when: - - ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(deferredQuery).dataLoaderRegistry(dataLoaderRegistry).build() - def result = graphQL.execute(executionInput) - - Map extensions = result.getExtensions() - Publisher deferredResultStream = (Publisher) extensions.get(GraphQL.DEFERRED_RESULTS) - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream) - Awaitility.await().untilTrue(subscriber.finished) - - - then: - - result.data == expectedInitialDeferredData - - subscriber.executionResultData == expectedListOfDeferredData - - // - // with deferred results, we don't achieve the same efficiency - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 3 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 - } - def "chainedInstrumentation: data loader will work with deferred queries on multiple levels deep"() { - - when: - - ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(expensiveDeferredQuery).dataLoaderRegistry(dataLoaderRegistry).build() - def result = graphQL.execute(executionInput) - - Map extensions = result.getExtensions() - Publisher deferredResultStream = (Publisher) extensions.get(GraphQL.DEFERRED_RESULTS) - - def subscriber = new CapturingSubscriber() - subscriber.subscribeTo(deferredResultStream) - Awaitility.await().untilTrue(subscriber.finished) - - - then: - - result.data == expectedInitialExpensiveDeferredData - - subscriber.executionResultData == expectedExpensiveDeferredData - - // - // with deferred results, we don't achieve the same efficiency - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 3 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 - } }