From f9f7cac2b6aa9419b604ab031c17d8993670fd03 Mon Sep 17 00:00:00 2001 From: Shengkai <33114724+fsk119@users.noreply.github.com> Date: Fri, 3 Mar 2023 20:26:48 +0800 Subject: [PATCH] [FLINK-30948][sql-client] Fix ExecutorImpl#testInterruptException hangs (#22055) --- .../table/client/gateway/ExecutorImpl.java | 58 +++++++++++-------- .../client/gateway/ExecutorImplITCase.java | 19 ++++-- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index 6d9a5206f2b25..08b68319860f5 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -205,32 +205,40 @@ public StatementResult executeStatement(String statement) { ExecuteStatementHeaders.getInstance(), new SessionMessageParameters(sessionHandle), request); + // It's possible that the execution is canceled during the submission. // Close the Operation in background to make sure the execution can continue. - getResponse( - executeStatementResponse, - e -> { - executorService.submit( - () -> { - try { - ExecuteStatementResponseBody executeStatementResponseBody = - executeStatementResponse.get(); - // close operation in background to make sure users can not - // interrupt the execution. - closeOperationAsync( - getOperationHandle( - executeStatementResponseBody - ::getOperationHandle)); - } catch (Exception newException) { - // ignore - } - }); - return new SqlExecutionException("Interrupted to get response.", e); - }); - OperationHandle operationHandle = getOperationHandle( - () -> getResponse(executeStatementResponse).getOperationHandle()); + () -> + getResponse( + executeStatementResponse, + e -> { + executorService.submit( + () -> { + try { + ExecuteStatementResponseBody + executeStatementResponseBody = + executeStatementResponse + .get(); + // close operation in background + // to make sure users can not + // interrupt the execution. + closeOperationAsync( + getOperationHandle( + executeStatementResponseBody + ::getOperationHandle)); + } catch (Exception newException) { + e.addSuppressed(newException); + LOG.error( + "Failed to cancel the interrupted exception.", + e); + } + }); + return new SqlExecutionException( + "Interrupted to get response.", e); + }) + .getOperationHandle()); FetchResultsResponseBody fetchResultsResponse = fetchUtilResultsReady(operationHandle); ResultInfo firstResult = fetchResultsResponse.getResults(); @@ -316,7 +324,7 @@ private FetchResultsResponseBody fetchResults(OperationHandle operationHandle, l return getFetchResultResponse( operationHandle, token, - true, + false, e -> { sendRequest( CancelOperationHeaders.getInstance(), @@ -368,7 +376,7 @@ private FetchResultsResponseBody fetchUtilResultsReady(OperationHandle operation getFetchResultResponse( operationHandle, 0L, - false, + true, e -> { // CliClient will not close the results. Try best to close it. closeOperationAsync(operationHandle); @@ -385,7 +393,7 @@ private FetchResultsResponseBody getFetchResultResponse( boolean fetchResultWithInterval, Function interruptedExceptionHandler) { try { - if (!fetchResultWithInterval) { + if (fetchResultWithInterval) { Thread.sleep(100); } return sendRequest( diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java index 884472642468b..8b4c7d24be998 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java @@ -518,7 +518,7 @@ void testInterruptFetching() throws Exception { testInterrupting( executor -> { try (StatementResult result = - executor.executeStatement(BlockPhase.EXECUTION.name())) { + executor.executeStatement(BlockPhase.FETCHING.name())) { // trigger to fetch again result.hasNext(); } @@ -610,18 +610,25 @@ private void testNegotiateVersion( private void testInterrupting(Consumer task) throws Exception { try (Executor executor = createTestServiceExecutor()) { - Thread t = threadFactory.newThread(() -> task.accept(executor)); - t.start(); - TestSqlGatewayService service = (TestSqlGatewayService) TEST_SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayService(); + Thread t = + threadFactory.newThread( + () -> { + try { + task.accept(executor); + } finally { + // notify server to return results until the executor finishes + // exception processing. + service.latch.countDown(); + } + }); + t.start(); CommonTestUtils.waitUntilCondition(() -> service.isBlocking, 100L); // interrupt the submission t.interrupt(); - // notify service return handle - service.latch.countDown(); CommonTestUtils.waitUntilCondition(() -> service.isClosed, 100L); }