[KYUUBI #3133] Always run Flink statement in sync mode
### _Why are the changes needed?_

Currently, the Flink SQL engine does not work with Flink 1.15; it fails with:

```
- set kyuubi conf into flink conf *** FAILED ***
  org.apache.kyuubi.jdbc.hive.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: java.lang.NoClassDefFoundError: org/apache/calcite/rel/metadata/RelMetadataQueryBase
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.org$apache$kyuubi$engine$flink$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:102)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:76)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
	at org.apache.kyuubi.engine.flink.operation.FlinkOperation$$anonfun$onError$1.applyOrElse(FlinkOperation.scala:120)
	at org.apache.kyuubi.engine.flink.operation.FlinkOperation$$anonfun$onError$1.applyOrElse(FlinkOperation.scala:105)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:34)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.org$apache$kyuubi$engine$flink$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:121)
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:76)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: org/apache/calcite/rel/metadata/RelMetadataQueryBase
	at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.org$apache$kyuubi$engine$flink$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:102)
	... 6 more

	at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
	at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:119)
	at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:154)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

The `NoClassDefFoundError` occurs because FLINK-24427 sealed the Calcite classes, so they can no longer be accessed from the current classloader.

https://github.com/apache/incubator-kyuubi/blob/64090f502689fe583c18fa330cd7f28c03379ba3/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala#L102-L103

The code at those lines sets the thread-local value that is required for async execution.
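As a rough illustration of why async execution needs that thread-local set inside the worker, here is a minimal sketch using only the JDK. It assumes `THREAD_PROVIDERS` behaves like a plain `java.lang.ThreadLocal`; the `ThreadLocalDemo` object, its `provider` field, and the string values are hypothetical stand-ins, not Kyuubi or Calcite code.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadLocalDemo {
  // Hypothetical stand-in for a per-thread slot such as THREAD_PROVIDERS.
  val provider: ThreadLocal[String] = new ThreadLocal[String]

  /** Sets the value on the calling thread, then reads it from a pool thread. */
  def readFromWorker(setInsideWorker: Boolean): Option[String] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      // Visible only to the calling thread, not to the pool thread.
      provider.set("caller-provider")
      val task = new Callable[Option[String]] {
        override def call(): Option[String] = {
          // Async code has to populate the slot on the worker thread itself.
          if (setInsideWorker) provider.set("worker-provider")
          // A never-set ThreadLocal returns null, which Option turns into None.
          Option(provider.get())
        }
      }
      pool.submit(task).get()
    } finally {
      provider.remove()
      pool.shutdown()
    }
  }
}
```

With `setInsideWorker = false` the worker sees no value at all, which is why the async path had to touch the Calcite classes from inside `executeStatement()`. Running synchronously on the calling thread avoids the need for that extra step.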

I think there is little benefit to running statements in async mode, so this change simply ignores `runAsync` and always runs in sync mode.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3133 from pan3793/flink-sync.

Closes #3133

d603777 [Cheng Pan] Always run Flink statement in sync mode

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
pan3793 committed Jul 26, 2022
1 parent 0e91019 commit 976af3d
Showing 2 changed files with 6 additions and 32 deletions.
@@ -19,24 +19,21 @@ package org.apache.kyuubi.engine.flink.operation

 import java.time.LocalDate
 import java.util
-import java.util.concurrent.RejectedExecutionException

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

-import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
 import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
 import org.apache.flink.table.operations.{Operation, QueryOperation}
 import org.apache.flink.table.operations.command._
-import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
 import org.apache.flink.types.Row

-import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.flink.result.ResultSet
 import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
 import org.apache.kyuubi.operation.OperationState
@@ -69,38 +66,12 @@ class ExecuteStatement(

   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
-    if (shouldRunAsync) {
-      val asyncOperation = new Runnable {
-        override def run(): Unit = {
-          OperationLog.setCurrentOperationLog(operationLog)
-          executeStatement()
-        }
-      }
-
-      try {
-        val flinkSQLSessionManager = session.sessionManager
-        val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
-        setBackgroundHandle(backgroundHandle)
-      } catch {
-        case rejected: RejectedExecutionException =>
-          setState(OperationState.ERROR)
-          val ke =
-            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
-          setOperationException(ke)
-          throw ke
-      }
-    } else {
-      executeStatement()
-    }
+    executeStatement()
   }

   private def executeStatement(): Unit = {
     try {
       setState(OperationState.RUNNING)
-
-      // set the thread variable THREAD_PROVIDERS
-      RelMetadataQueryBase.THREAD_PROVIDERS.set(
-        JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
       val operation = executor.parseStatement(sessionId, statement)
       operation match {
         case queryOperation: QueryOperation => runQueryOperation(queryOperation)
@@ -63,7 +63,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       resultMaxRowsDefault.toString).toInt
     val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
       case NONE =>
-        new ExecuteStatement(session, statement, runAsync, queryTimeout, resultMaxRows)
+        // FLINK-24427 seals calcite classes which required to access in async mode, considering
+        // there is no much benefit in async mode, here we just ignore `runAsync` and always run
+        // statement in sync mode as a workaround
+        new ExecuteStatement(session, statement, false, queryTimeout, resultMaxRows)
       case mode =>
         new PlanOnlyStatement(session, statement, mode)
     }
