[KYUUBI #1829] Support executing statement async for Flink SQL engine

### _Why are the changes needed?_

Support executing statements asynchronously in the Flink SQL engine. With this change, `ExecuteStatement` submits its work to the session manager's background operation pool and records the returned handle, so a client can set `runAsync` on `ExecuteStatement` requests and a still-running statement can be cancelled through that handle.
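
The overall shape of the change, sketched in plain Scala for orientation (the class and `pool` below are illustrative, not Kyuubi API; the actual code is in the diff that follows): `runInternal` wraps `executeStatement()` in a `Runnable`, hands it to a background pool, and keeps the returned `Future` so `cleanup` can cancel it.

```scala
import java.util.concurrent.{ExecutorService, Future}

// Illustrative sketch only; the real change lives in ExecuteStatement.runInternal
// and FlinkOperation.cleanup (see the diff below).
class AsyncStatementSketch(pool: ExecutorService) {
  @volatile private var backgroundHandle: Future[_] = _

  def runInternal(executeStatement: () => Unit): Unit = {
    val asyncOperation = new Runnable {
      override def run(): Unit = executeStatement() // now runs on a pool thread
    }
    backgroundHandle = pool.submit(asyncOperation)  // keep the handle for cancel
  }

  def cleanup(): Unit =
    Option(backgroundHandle).foreach(_.cancel(true)) // interrupt if still running
}
```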

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1845 from SteNicholas/KYUUBI-1829.

Closes #1829

956739c [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine
4ccf4b4 [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine
616ddd2 [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: yanghua <yanghua1127@gmail.com>
SteNicholas authored and yanghua committed Jan 29, 2022
1 parent cebc03a commit 34043ae
Showing 4 changed files with 31 additions and 8 deletions.
ExecuteStatement.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
 import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.table.client.gateway.{Executor, TypedResult}
@@ -75,11 +76,11 @@ class ExecuteStatement(
     val asyncOperation = new Runnable {
       override def run(): Unit = {
         OperationLog.setCurrentOperationLog(operationLog)
+        executeStatement()
       }
     }
 
     try {
-      executeStatement()
       val flinkSQLSessionManager = session.sessionManager
       val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
       setBackgroundHandle(backgroundHandle)
@@ -100,6 +101,9 @@
     try {
       setState(OperationState.RUNNING)
 
+      // set the thread variable THREAD_PROVIDERS
+      RelMetadataQueryBase.THREAD_PROVIDERS.set(
+        JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE))
       val operation = executor.parseStatement(sessionId, statement)
       operation match {
         case queryOperation: QueryOperation => runQueryOperation(queryOperation)
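
The `THREAD_PROVIDERS` line above is needed because ordinary `ThreadLocal`s do not propagate to pool threads: once planning moves to a background thread, Calcite's thread-local metadata provider (and the current operation log) must be re-set inside the `Runnable` itself. A minimal, self-contained demonstration of the underlying pitfall:

```scala
import java.util.concurrent.Executors

object ThreadLocalDemo extends App {
  val holder = new ThreadLocal[String]
  holder.set("visible only on the caller thread")

  val pool = Executors.newSingleThreadExecutor()
  // Prints "null": the pool thread does not see the caller's ThreadLocal value,
  // which is why the diff re-initializes THREAD_PROVIDERS (and the operation
  // log) inside the submitted Runnable rather than before submission.
  pool.submit(new Runnable {
    override def run(): Unit = println(holder.get())
  }).get()
  pool.shutdown()
}
```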
FlinkOperation.scala
@@ -114,9 +114,7 @@ abstract class FlinkOperation(
   protected def cleanup(targetState: OperationState): Unit = state.synchronized {
     if (!isTerminalState(state)) {
       setState(targetState)
-      if (shouldRunAsync) {
-        Option(getBackgroundHandle).foreach(_.cancel(true))
-      }
+      Option(getBackgroundHandle).foreach(_.cancel(true))
     }
   }
 
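
Since the `ExecuteStatement` hunks above drop the direct `executeStatement()` call and always submit to the pool, `cleanup` no longer needs the `shouldRunAsync` guard: cancelling the background handle is safe in either mode. `Future.cancel(true)` interrupts the worker thread, as this small stand-alone sketch shows (the sleep is a stand-in for a long-running statement):

```scala
import java.util.concurrent.Executors

object CancelDemo extends App {
  val pool = Executors.newSingleThreadExecutor()
  val handle = pool.submit(new Runnable {
    override def run(): Unit =
      try Thread.sleep(60000) // stand-in for a long-running statement
      catch { case _: InterruptedException => println("statement interrupted") }
  })
  Thread.sleep(100) // let the task start
  handle.cancel(true) // what cleanup() now does unconditionally
  pool.shutdown()
}
```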
FlinkOperationSuite.scala
@@ -22,6 +22,7 @@ import java.sql.DatabaseMetaData
 import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
 import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
 import org.apache.flink.table.types.logical.LogicalTypeRoot
+import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -35,7 +36,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   override def withKyuubiConf: Map[String, String] = Map()
 
   override protected def jdbcUrl: String =
-    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/"
+    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
 
   ignore("release session if shared level is CONNECTION") {
     logger.info(s"jdbc url is $jdbcUrl")
@@ -734,4 +735,27 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       assert(resultSet.getString(1) === "a")
     }
   }
+
+  test("async execute statement - select column name with dots") {
+    withThriftClient { client =>
+      val tOpenSessionReq = new TOpenSessionReq()
+      tOpenSessionReq.setUsername("kentyao")
+      tOpenSessionReq.setPassword("anonymous")
+      val tOpenSessionResp = client.OpenSession(tOpenSessionReq)
+      val tExecuteStatementReq = new TExecuteStatementReq()
+      tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+      tExecuteStatementReq.setRunAsync(true)
+      tExecuteStatementReq.setStatement("select 'tmp.hello'")
+      val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
+      val operationHandle = tExecuteStatementResp.getOperationHandle
+      waitForOperationToComplete(client, operationHandle)
+      val tFetchResultsReq = new TFetchResultsReq()
+      tFetchResultsReq.setOperationHandle(operationHandle)
+      tFetchResultsReq.setFetchType(2)
+      tFetchResultsReq.setMaxRows(1000)
+      val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
+      assert(tFetchResultsResp.getResults.getColumns.get(0)
+        .getStringVal.getValues.get(0) === "tmp.hello")
+    }
+  }
 }
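
The new test drives the async path end to end: `setRunAsync(true)` makes `ExecuteStatement` return immediately with an operation handle, and `waitForOperationToComplete` (a test-helper method) polls until the operation reaches a terminal state. A hedged sketch of what such polling looks like against the Thrift API (object, helper name, and interval are illustrative; the suite's own helper may differ):

```scala
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetOperationStatusReq, TOperationHandle, TOperationState}

object OperationPolling {
  // Poll GetOperationStatus until the operation leaves the non-terminal states.
  def awaitTerminalState(
      client: TCLIService.Iface,
      handle: TOperationHandle): TOperationState = {
    var state = TOperationState.INITIALIZED_STATE
    while (state == TOperationState.INITIALIZED_STATE ||
      state == TOperationState.PENDING_STATE ||
      state == TOperationState.RUNNING_STATE) {
      Thread.sleep(100) // illustrative poll interval
      val resp = client.GetOperationStatus(new TGetOperationStatusReq(handle))
      state = resp.getOperationState
    }
    state // FINISHED_STATE, ERROR_STATE, CANCELED_STATE, or CLOSED_STATE
  }
}
```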
SparkOperationSuite.scala
@@ -323,9 +323,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
 
   test("basic open | execute | close") {
     withThriftClient { client =>
-      val operationManager =
-        engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager]
-
       val req = new TOpenSessionReq()
       req.setUsername("kentyao")
       req.setPassword("anonymous")
