Skip to content

Commit

Permalink
IGNITE-20454 Exception propagation test.
Browse files Browse the repository at this point in the history
  • Loading branch information
xtern committed Oct 17, 2023
1 parent 6ded3ae commit fd4f3eb
Showing 1 changed file with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -587,13 +584,13 @@ public void exceptionArrivingBeforeRootFragmentExecutesDoesNotLeaveQueryHanging(
}

/**
* Test checks the ability to run script with multiple statements using {@link QueryPrefetchCallback}.
* Each next statement starts executing after the first page for the previous statement has been prefetched.
* Tests the ability to run multiple statements using {@link QueryPrefetchCallback}. Each subsequent
* statement begins execution after the prefetching for the previous statement is completed.
*
* @throws Exception If failed.
*/
@Test
public void testPrefetchCallback() throws Exception {
public void testPrefetchCallbackInvocation() throws Exception {
String query = "SELECT * FROM test_tbl";
int totalStatements = 20;
Collection<AsyncCursor<List<Object>>> resultCursors = new ArrayBlockingQueue<>(totalStatements);
Expand Down Expand Up @@ -652,6 +649,49 @@ public void onPrefetchComplete(@Nullable Throwable err) {
resultCursors.forEach(AsyncCursor::closeAsync);
}

/**
* Test ensures that an exception during data prefetching is propagated to the callback.
*/
@Test
public void testErrorIsPropagatedToPrefetchCallback() {
ExecutionService execService = executionServices.get(0);
CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
IgniteInternalException expectedException = new IgniteInternalException(Common.INTERNAL_ERR, "Expected exception");

BaseQueryContext ctx = BaseQueryContext.builder()
.cancel(new QueryCancel())
.prefetchCallback(prefetchFut::completeExceptionally)
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(wrap(schema))
.build()
)
.logger(log)
.build();

testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, original) -> {
if (msg instanceof QueryStartRequest) {
testCluster.node(nodeNames.get(2)).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
.queryId(((QueryStartRequest) msg).queryId())
.fragmentId(((QueryStartRequest) msg).fragmentId())
.error(expectedException)
.build()
);
} else {
original.onMessage(nodeName, msg);
}

return CompletableFuture.completedFuture(null);
});

QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
AsyncCursor<List<Object>> cursor = execService.executePlan(new NoOpTransaction(nodeNames.get(0)), plan, ctx);

assertThat(prefetchFut, willThrow(equalTo(expectedException)));

cursor.closeAsync();
}

/** Creates an execution service instance for the node with given consistent id. */
public ExecutionServiceImpl<Object[]> create(String nodeName) {
if (!nodeNames.contains(nodeName)) {
Expand Down

0 comments on commit fd4f3eb

Please sign in to comment.