
[BUG] [kyuubi-flink-sql-engine] FlinkSQL returns duplicate results when executing query statement #4083

@bobo495

Description

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

The runQueryOperation method re-fetches the snapshotResult every 50ms and appends it to rows. As a result, the query returns nothing until rows accumulates past the kyuubi.session.engine.flink.max.rows value or the Flink SQL job finishes.

It is not reasonable to repeatedly append snapshotResult to rows in runQueryOperation. Should the result be returned directly once snapshotResult has been fetched?

These are the two changes I suggest. Are they appropriate?

  1. When snapshotResult returns a non-empty result, break out of the loop:

     if (rows.size != 0) {
       loop = false
     }

  2. Give the loop a maximum timeout, to avoid being unable to exit the loop when the result stays empty.
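Combined, the two suggestions could look like the following sketch. This is a hypothetical, standalone simulation of the polling loop, not the actual runQueryOperation code: fetchSnapshot stands in for the real snapshotResult call, and the 5s timeout value is illustrative.

```scala
import scala.collection.mutable.ListBuffer

object PollingLoopSketch {
  // Hypothetical stand-in for the engine's snapshotResult fetch:
  // returns nothing for the first few polls, then a batch of rows.
  def fetchSnapshot(attempt: Int): Seq[Long] =
    if (attempt < 3) Seq.empty else Seq(1L, 2L, 3L, 4L, 5L)

  def poll(timeoutMs: Long): Seq[Long] = {
    val rows = ListBuffer.empty[Long]
    val deadline = System.currentTimeMillis() + timeoutMs
    var loop = true
    var attempt = 0
    while (loop) {
      rows ++= fetchSnapshot(attempt)
      attempt += 1
      if (rows.nonEmpty) {
        // Suggestion 1: return as soon as a snapshot yields rows,
        // instead of re-appending the same snapshot on every poll.
        loop = false
      } else if (System.currentTimeMillis() >= deadline) {
        // Suggestion 2: bail out after a maximum timeout so an
        // empty result cannot keep the loop spinning forever.
        loop = false
      } else {
        Thread.sleep(50) // same 50ms poll interval as runQueryOperation
      }
    }
    rows.toSeq
  }

  def main(args: Array[String]): Unit =
    println(poll(timeoutMs = 5000L).mkString(","))
}
```

With this shape, the bounded datagen source in the reference UT would yield each row at most once, and an empty unbounded source would still unblock the caller once the timeout elapses.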

Environment

  • Flink 1.14.0
  • Kyuubi 1.7.0-SNAPSHOT(Compile from master)

Reference UT

  test("data is repeatedly added to the resultSet") {
    withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "20"))(Map.empty) {
      withJdbcStatement() { statement =>
        statement.execute(
          """
            |create table tbl_src (
            |           a bigint
            |           ) with (
            |           'connector' = 'datagen',
            |          'rows-per-second'='1',
            |          'fields.a.kind'='sequence',
            |          'fields.a.start'='1',
            |          'fields.a.end'='5'
            |          )
            |""".stripMargin)
        val resultSet = statement.executeQuery(s"select a from tbl_src")
        var rows = List[Long]()
        while (resultSet.next()) {
          rows :+= resultSet.getLong("a")
        }
        // with the bug, more rows come back than the 5 input records
        assert(rows.size <= 5)
      }
    }
  }


Affects Version(s)

master

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

kyuubi.engine.type FLINK_SQL

Kyuubi Engine Configurations

No response

Additional context

Test Case

Case1

kyuubi-defaults.conf uses the default configuration, and an unbounded source is generated through datagen. In this case, Kyuubi produces no result output for a long time, and the Flink SQL job never finishes.

Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1'
. . . . . . . . . . . . . . . . . >           );
...
query[7ca29418-03ff-40d9-bdba-b46eedb560b3]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.001 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.066 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;

Case2

kyuubi-defaults.conf uses the default configuration, and a bounded source is generated through datagen. In this case, duplicate results are returned after the Flink SQL job status is FINISHED.

Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
. . . . . . . . . . . . . . . . . >           );
...
query[fb4fbfe3-7e7c-45d0-97a2-f4b5438714ee]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.058 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:06:20.336 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ca16d67ca30cfe4ea6e3fc0f93c65d21)
	at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
	at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
	at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

End of exception on server side>]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_211]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
query[e1581803-04c4-4fb0-a09b-f2e8ffd0b157]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.02 seconds
+----+
| a  |
+----+
| 1  |
| 2  |
| 3  |
| 4  |
| 1  |
| 2  |
| 3  |
| 4  |
...
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
+----+
270 rows selected (6.675 seconds)

Case3

kyuubi-defaults.conf sets kyuubi.session.engine.flink.max.rows to 100, and a bounded source is generated through datagen. In this case, 100 duplicate rows are returned and the Flink SQL job state becomes CANCELED.

Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
. . . . . . . . . . . . . . . . . >           );
...
query[31a19428-0f34-4b2e-9f21-1acf8e5e1885]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.031 seconds
+---------+
| result  |
+---------+
| OK      |
+---------+
1 row selected (0.315 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:14:14.304 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: Interrupted when sleeping before a retry
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
2023-01-04 10:14:14.315 INFO org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing anonymous's query[063b61fe-8e20-4325-ac25-762f65b3ccf2]: RUNNING_STATE -> FINISHED_STATE, time taken: 4.648 seconds
2023-01-04 10:14:14.321 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[4e003511-c4c9-4661-bd48-d77822f7352c] in FINISHED_STATE
2023-01-04 10:14:14.322 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's query[4e003511-c4c9-4661-bd48-d77822f7352c]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+----+
| a  |
+----+
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 2  |
| 1  |
| 2  |
...
| 1  |
| 2  |
| 3  |
| 1  |
| 2  |
+----+
100 rows selected (4.702 seconds)

Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.
