Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,32 +205,40 @@ public StatementResult executeStatement(String statement) {
ExecuteStatementHeaders.getInstance(),
new SessionMessageParameters(sessionHandle),
request);

// It's possible that the execution is canceled during the submission.
// Close the Operation in background to make sure the execution can continue.
getResponse(
executeStatementResponse,
e -> {
executorService.submit(
() -> {
try {
ExecuteStatementResponseBody executeStatementResponseBody =
executeStatementResponse.get();
// close operation in background to make sure users can not
// interrupt the execution.
closeOperationAsync(
getOperationHandle(
executeStatementResponseBody
::getOperationHandle));
} catch (Exception newException) {
// ignore
}
});
return new SqlExecutionException("Interrupted to get response.", e);
});

OperationHandle operationHandle =
getOperationHandle(
() -> getResponse(executeStatementResponse).getOperationHandle());
() ->
getResponse(
executeStatementResponse,
e -> {
executorService.submit(
() -> {
try {
ExecuteStatementResponseBody
executeStatementResponseBody =
executeStatementResponse
.get();
// close operation in background
// to make sure users can not
// interrupt the execution.
closeOperationAsync(
getOperationHandle(
executeStatementResponseBody
::getOperationHandle));
} catch (Exception newException) {
e.addSuppressed(newException);
LOG.error(
"Failed to cancel the interrupted exception.",
e);
}
});
return new SqlExecutionException(
"Interrupted to get response.", e);
})
.getOperationHandle());
FetchResultsResponseBody fetchResultsResponse = fetchUtilResultsReady(operationHandle);
ResultInfo firstResult = fetchResultsResponse.getResults();

Expand Down Expand Up @@ -316,7 +324,7 @@ private FetchResultsResponseBody fetchResults(OperationHandle operationHandle, l
return getFetchResultResponse(
operationHandle,
token,
true,
false,
e -> {
sendRequest(
CancelOperationHeaders.getInstance(),
Expand Down Expand Up @@ -368,7 +376,7 @@ private FetchResultsResponseBody fetchUtilResultsReady(OperationHandle operation
getFetchResultResponse(
operationHandle,
0L,
false,
true,
e -> {
// CliClient will not close the results. Try best to close it.
closeOperationAsync(operationHandle);
Expand All @@ -385,7 +393,7 @@ private FetchResultsResponseBody getFetchResultResponse(
boolean fetchResultWithInterval,
Function<InterruptedException, SqlExecutionException> interruptedExceptionHandler) {
try {
if (!fetchResultWithInterval) {
if (fetchResultWithInterval) {
Thread.sleep(100);
}
return sendRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ void testInterruptFetching() throws Exception {
testInterrupting(
executor -> {
try (StatementResult result =
executor.executeStatement(BlockPhase.EXECUTION.name())) {
executor.executeStatement(BlockPhase.FETCHING.name())) {
// trigger to fetch again
result.hasNext();
}
Expand Down Expand Up @@ -610,18 +610,25 @@ private void testNegotiateVersion(

private void testInterrupting(Consumer<Executor> task) throws Exception {
try (Executor executor = createTestServiceExecutor()) {
Thread t = threadFactory.newThread(() -> task.accept(executor));
t.start();

TestSqlGatewayService service =
(TestSqlGatewayService)
TEST_SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayService();
Thread t =
threadFactory.newThread(
() -> {
try {
task.accept(executor);
} finally {
// notify server to return results until the executor finishes
// exception processing.
service.latch.countDown();
}
});
t.start();
CommonTestUtils.waitUntilCondition(() -> service.isBlocking, 100L);

// interrupt the submission
t.interrupt();
// notify service return handle
service.latch.countDown();

CommonTestUtils.waitUntilCondition(() -> service.isClosed, 100L);
}
Expand Down