Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5531][TEST] Fix flaky FlinkOperationOnYarnSuite by enlarging the max rows setting #5549

Closed
wants to merge 24 commits into from

Conversation

davidyuan1223
Copy link
Contributor

@davidyuan1223 davidyuan1223 commented Oct 27, 2023

Why are the changes needed?

1. know about this pr

When we execute flink(1.17+) test case, it may throw exception when the test case is show/stop job, the exception desc like this

- execute statement - show/stop jobs *** FAILED ***
  org.apache.kyuubi.jdbc.hive.KyuubiSQLException: Error operating ExecuteStatement: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 4dece26857fab91d63fad1abd8c6bdd0 with savepoint for operation 9ed8247a-b7bd-4004-875b-61ba654ab3dd.
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:628)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:716)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:601)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:434)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:64)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:56)
	at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
	at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:101)
	at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:131)
	at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:82)
	at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:128)
	at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:67)
	at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:252)
	at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
	at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
	at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
	at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1[149](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:150))
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:617)
	... 22 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1298)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
	at akka.dispatch.OnComplete.internal(Future.scala:299)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
  at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
  ...
  Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
  at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
  at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
  at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
  at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  ...
  Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
  at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
  at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:[160](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:161)4)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
  at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:[168](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:169))
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
  ...

2. what make the test case failed?

If we want know the reason about the exception, we need to understand the process of flink executing stop job, the process line like this code space show(it's source is our bad test case, we can use this test case to solve similar problems)

1. sql
   1.1 create table tbl_a (a int) with ('connector' = 'datagen','rows-per-second'='10')
   1.2 create table tbl_b (a int) with ('connector' = 'blackhole')
   1.3 insert into tbl_b select * from tbl_a
2. start job: it will get 2 tasks abount source sink
3. show job: we can get job info 
4. stop job(the main error): 
   4.1 stop job need checkpoint
   4.2 start checkpoint, it need all task state is running
   4.3 checkpoint can not get all task state is running, then throw the exception

Actually, in a normal process, it should not throw the exception, if this happens to your job, please check your kyuubi conf kyuubi.session.engine.flink.max.rows, it's default value is 1000000. But in the test case, we only the the conf's value is 10.
It's the reason to make the error, this conf makes when we execute a stream query, it will cancel the when the limit is reached. Because flink's datagen is a streamconnector, so we can imagine, when we execute those sql, because our conf, it will make the sink task be canceled because the query reached 10. So when we execute stop job, flink checkpoint cannot get the tasks about this job is all in running state, then flink throw this exception.

3. how can we solve this problem?

When your job makes the same exception, please make sure your kyuubi conf kyuubi.session.engine.flink.max.rows's value can it meet your streaming query needs? Then changes the conf's value.

close #5531

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before make a pull request

Was this patch authored or co-authored using generative AI tooling?

No

davidyuan1223 and others added 22 commits January 13, 2023 16:40
1. "$@" is a array, we want use string to compare. so update "$@" => "$*"
2. `tty` mean execute the command, we can use $(tty) replace it
3. param $# is a number, compare number should use -gt/-lt,not >/<
1. "$@" is a array, we want use string to compare. so update "$@" => "$*"
2. `tty` mean execute the command, we can use $(tty) replace it
3. param $# is a number, compare number should use -gt/-lt,not >/<
4. not sure the /bin/kyuubi line 63 'exit -1' need modify? so the directory bin only have a shellcheck note in /bin/kyuubi
# Conflicts:
#	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
#	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@pan3793
Copy link
Member

pan3793 commented Oct 27, 2023

@davidyuan1223 thanks for your contribution, would you like to consider using git rebase to reduce your commit history?

https://dev.to/lydiahallie/cs-visualized-useful-git-commands-37p1

@pan3793 pan3793 requested a review from link3280 October 27, 2023 09:44
@davidyuan1223
Copy link
Contributor Author

@davidyuan1223 thanks for your contribution, would you like to consider using git rebase to reduce your commit history?

https://dev.to/lydiahallie/cs-visualized-useful-git-commands-37p1

ok, wait the ci success i will try to reduce the commit

Copy link
Contributor

@link3280 link3280 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a reasonable change.

FWIW, The root cause is that we cannot easily determine if all tasks of a Flink job are running. The tests in the Flink project would invoke a rest client to check the task status, but I think it's overkill for us.

Let's see how's it going with the new max rows conf.

@davidyuan1223
Copy link
Contributor Author

davidyuan1223 commented Oct 27, 2023

I think it's a reasonable change.

FWIW, The root cause is that we cannot easily determine if all tasks of a Flink job are running. The tests in the Flink project would invoke a rest client to check the task status, but I think it's overkill for us.

Let's see how's it going with the new max rows conf.

just like the comment #5531 (comment), actually, i also not sure the kyuubi's conf can solve the problem, maybe we also need to change the create sql about datagen's rows-per-second's value.
All changes i think we need wait ci complete then check

@pan3793
Copy link
Member

pan3793 commented Oct 27, 2023

could you change the PR title with more accurate words? "fix bug" is too generic which makes less sense for future explorers.

@davidyuan1223 davidyuan1223 changed the title [KYUUBI #5531] Fix flink test bug [KYUUBI #5531] Fix kyuubi-flink-engine's test case failed when the checkpoint found there have some tasks are not running Oct 27, 2023
@pan3793 pan3793 changed the title [KYUUBI #5531] Fix kyuubi-flink-engine's test case failed when the checkpoint found there have some tasks are not running [KYUUBI #5531][TEST] Fix flaky FlinkOperationOnYarnSuite by enlarging the max rows setting Oct 27, 2023
@codecov-commenter
Copy link

codecov-commenter commented Oct 27, 2023

Codecov Report

Merging #5549 (ce7fd79) into master (03d6223) will increase coverage by 61.40%.
Report is 42 commits behind head on master.
The diff coverage is 90.77%.

@@              Coverage Diff              @@
##             master    #5549       +/-   ##
=============================================
+ Coverage      0.00%   61.40%   +61.40%     
- Complexity        0       23       +23     
=============================================
  Files           588      600       +12     
  Lines         33480    34316      +836     
  Branches       4405     4501       +96     
=============================================
+ Hits              0    21072    +21072     
+ Misses        33480    11109    -22371     
- Partials          0     2135     +2135     
Files Coverage Δ
...ache/kyuubi/plugin/spark/authz/OperationType.scala 100.00% <100.00%> (+100.00%) ⬆️
.../kyuubi/plugin/spark/authz/PrivilegesBuilder.scala 90.69% <100.00%> (+90.69%) ⬆️
...rk/authz/ranger/RuleApplyPermanentViewMarker.scala 94.11% <100.00%> (+94.11%) ⬆️
...in/scala/org/apache/spark/kyuubi/StageStatus.scala 75.00% <100.00%> (+75.00%) ⬆️
...in/scala/org/apache/kyuubi/config/KyuubiConf.scala 97.25% <ø> (+97.25%) ⬆️
...ache/kyuubi/server/KyuubiRestFrontendService.scala 75.72% <100.00%> (+75.72%) ⬆️
.../org/apache/kyuubi/util/reflect/ReflectUtils.scala 78.12% <100.00%> (ø)
.../apache/kyuubi/plugin/spark/authz/ObjectType.scala 60.00% <50.00%> (+60.00%) ⬆️
...ubi/plugin/spark/authz/ranger/AccessResource.scala 86.95% <0.00%> (+86.95%) ⬆️
.../plugin/spark/authz/util/PermanentViewMarker.scala 85.71% <75.00%> (+85.71%) ⬆️
... and 7 more

... and 541 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@pan3793 pan3793 added this to the v1.8.0 milestone Oct 27, 2023
Copy link
Member

@pan3793 pan3793 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The analysis looks reasonable to me (Sorry I have no much knowledge about the internal of Flink).

One minor tip: use pure text as much as possible, because the picture is not searchable, it's hard to search engines for referencing and indexing.

@davidyuan1223
Copy link
Contributor Author

The analysis looks reasonable to me (Sorry I have no much knowledge about the internal of Flink).

One minor tip: use pure text as much as possible, because the picture is not searchable, it's hard to search engines for referencing and indexing.

ok, i will fix those questions in the future.

@pan3793
Copy link
Member

pan3793 commented Oct 27, 2023

Also, would you mind centralizing your analysis to the PR description? it will be included in the commit message, making it convenient for future explorers to understand the background of this change.

@davidyuan1223
Copy link
Contributor Author

davidyuan1223 commented Oct 27, 2023

The analysis looks reasonable to me (Sorry I have no much knowledge about the internal of Flink).

One minor tip: use pure text as much as possible, because the picture is not searchable, it's hard to search engines for referencing and indexing.

Also, would you mind centralizing your analysis to the PR description? it will be included in the commit message, making it convenient for future explorers to understand the background of this change.

of course, and i think we can add desc about kyuubi's flink max-rows conf. maybe there have someone makes the some problem like this bug, it can improve user to find the source problem quickly, what do you think?

(maybe i need more time to manage the commit history, i have same problem in other pr, Since I'm used to idea's git tool, the time it takes me to deal with this problem may not be solved until tomorrow...)

@pan3793
Copy link
Member

pan3793 commented Oct 27, 2023

@davidyuan1223 thanks for your patience, the clean commit history is just something nice to have, not a big deal. Your analysis of the issue is the most valuable thing. Even though this PR only contains one line change, I consider escalating the task level from easy to medium (if no objection from others) according to your deep dive of the root cause.

@pan3793
Copy link
Member

pan3793 commented Oct 27, 2023

... i think we can add desc about kyuubi's flink max-rows conf. ...

I am educated from your analysis left in #5531, I think it's good to inline those parts to the PR description

@davidyuan1223
Copy link
Contributor Author

... i think we can add desc about kyuubi's flink max-rows conf. ...

I am educated from your analysis left in #5531, I think it's good to inline those parts to the PR description

I have updated the content of PR, what do you think?

Copy link
Contributor

@bowenliang123 bowenliang123 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Thank you for your in-depth analysis and detailed explanation.
I am deeply moved by your dedication to technical excellence and attention to detail.

@davidyuan1223
Copy link
Contributor Author

LGTM. Thank you for your in-depth analysis and detailed explanation. I am deeply moved by your dedication to technical excellence and attention to detail.

Thank you for your compliment. I am still a newbie in the field of big data. I have a lot to learn from kyuubi and you.

…uubi/engine/flink/operation/FlinkOperationSuite.scala

Co-authored-by: Cheng Pan <pan3793@gmail.com>
@pan3793 pan3793 closed this in 26f614a Oct 30, 2023
pan3793 pushed a commit that referenced this pull request Oct 30, 2023
… the max rows setting

### _Why are the changes needed?_

#### 1. know about this pr
When we execute flink(1.17+) test case, it may throw exception when the test case is `show/stop job`, the exception desc like this
```
- execute statement - show/stop jobs *** FAILED ***
  org.apache.kyuubi.jdbc.hive.KyuubiSQLException: Error operating ExecuteStatement: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 4dece26857fab91d63fad1abd8c6bdd0 with savepoint for operation 9ed8247a-b7bd-4004-875b-61ba654ab3dd.
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:628)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:716)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:601)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:434)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:64)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:56)
	at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
	at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:101)
	at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:131)
	at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:82)
	at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:128)
	at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:67)
	at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:252)
	at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
	at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
	at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
	at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1[149](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:150))
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:617)
	... 22 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1298)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
	at akka.dispatch.OnComplete.internal(Future.scala:299)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
  at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
  ...
  Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
  at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
  at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
  at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
  at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  ...
  Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
  at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
  at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:[160](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:161)4)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
  at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:[168](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:169))
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
  ...
```
#### 2. what make the test case failed?
If we want know the reason about the exception, we need to understand the process of flink executing stop job, the process line like this code space show(it's source is our bad test case, we can use this test case to solve similar problems)
```
1. sql
   1.1 create table tbl_a (a int) with ('connector' = 'datagen','rows-per-second'='10')
   1.2 create table tbl_b (a int) with ('connector' = 'blackhole')
   1.3 insert into tbl_b select * from tbl_a
2. start job: it will get 2 tasks abount source sink
3. show job: we can get job info
4. stop job(the main error):
   4.1 stop job need checkpoint
   4.2 start checkpoint, it need all task state is running
   4.3 checkpoint can not get all task state is running, then throw the exception
```
Actually, in a normal process, it should not throw the exception, if this happens to your job, please check your kyuubi conf `kyuubi.session.engine.flink.max.rows`, it's default value is 1000000. But in the test case, we only the the conf's value is 10.
It's the reason to make the error, this conf makes when we execute a stream query, it will cancel the when the limit is reached. Because flink's datagen is a streamconnector, so we can imagine, when we execute those sql, because our conf, it will make the sink task be canceled because the query reached 10. So when we execute stop job, flink checkpoint cannot get the tasks about this job is all in running state, then flink throw this exception.
#### 3. how can we solve this problem?
When your job makes the same exception, please make sure your kyuubi conf `kyuubi.session.engine.flink.max.rows`'s value can it meet your streaming query needs? Then changes the conf's value.

close #5531

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5549 from davidyuan1223/fix_flink_test_bug.

Closes #5531

ce7fd79 [david yuan] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
dc3a4b9 [davidyuan] fix flink on yarn test bug
86a647a [davidyuan] fix flink on yarn test bug
cbd4c0c [davidyuan] fix flink on yarn test bug
8b51840 [davidyuan] add common method to get session level config
bcb0cf3 [davidyuan] Merge remote-tracking branch 'origin/master'
72e7aea [david yuan] Merge branch 'apache:master' into master
57ec746 [david yuan] Merge pull request #13 from davidyuan1223/fix
56b91a3 [yuanfuyuan] fix_4186
c8eb9a2 [david yuan] Merge branch 'apache:master' into master
2beccb6 [david yuan] Merge branch 'apache:master' into master
0925a4b [david yuan] Merge pull request #12 from davidyuan1223/revert-11-fix_4186
40e80d9 [david yuan] Revert "fix_4186"
c83836b [david yuan] Merge pull request #11 from davidyuan1223/fix_4186
360d183 [david yuan] Merge branch 'master' into fix_4186
b616044 [yuanfuyuan] fix_4186
e244029 [david yuan] Merge branch 'apache:master' into master
bfa6cbf [davidyuan1223] Merge branch 'apache:master' into master
16237c2 [davidyuan1223] Merge branch 'apache:master' into master
c48ad38 [yuanfuyuan] remove the used blank lines
55a0a43 [xiaoyuandajian] Merge pull request #10 from xiaoyuandajian/fix-#4057
cb11935 [yuan] Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057
86e4e1c [yuan] fix-#4057 info: modify the shellcheck errors file in ./bin 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< 4. not sure the /bin/kyuubi line 63 'exit -1' need modify? so the directory bin only have a shellcheck note in /bin/kyuubi
dd39efd [袁福元] fix-#4057 info: 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/<

Lead-authored-by: davidyuan <yuanfuyuan@mafengwo.com>
Co-authored-by: david yuan <51512358+davidyuan1223@users.noreply.github.com>
Co-authored-by: yuanfuyuan <1406957364@qq.com>
Co-authored-by: yuan <yuanfuyuan@mafengwo.com>
Co-authored-by: davidyuan1223 <51512358+davidyuan1223@users.noreply.github.com>
Co-authored-by: xiaoyuandajian <51512358+xiaoyuandajian@users.noreply.github.com>
Co-authored-by: 袁福元 <yuanfuyuan@mafengwo.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 26f614a)
Signed-off-by: Cheng Pan <chengpan@apache.org>
@pan3793
Copy link
Member

pan3793 commented Oct 30, 2023

Thanks, merged to master/1.8

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[TASK][MEDIUM] Fix flaky FlinkOperationOnYarnSuite
6 participants