From b37d62d44fa20a165901cc7c747635d27291e093 Mon Sep 17 00:00:00 2001 From: "kandy01.wang" Date: Wed, 25 Oct 2023 15:55:08 +0800 Subject: [PATCH 1/6] [KYUUBI #5507] [FLINK] Support Initialize SQL in Flink Engine --- .../flink/session/FlinkSessionImpl.scala | 19 ++++ .../flink/WithFlinkSQLEngineLocal.scala | 3 +- .../FlinkEngineInitializeSuite.scala | 105 ++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index b8d1f85692b..50096793651 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.service.session.{Session => FSession} import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.FlinkEngineUtils import org.apache.kyuubi.engine.flink.udf.KDFRegistry @@ -64,6 +65,24 @@ class FlinkSessionImpl( override def open(): Unit = { val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig)) + sessionManager.getConf.get(ENGINE_INITIALIZE_SQL).foreach { sql => + try { + executor.executeStatement(OperationHandle.create, sql) + } catch { + case NonFatal(e) => + throw KyuubiSQLException(s"execute ${ENGINE_INITIALIZE_SQL.key} $sql ", e) + } + } + + sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql => + try { + executor.executeStatement(OperationHandle.create, sql) + } catch { + case NonFatal(e) => + throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e) + } + } + val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) => Array(USE_CATALOG, USE_DATABASE).contains(k) } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala index 92c1bcd83fc..ccaefb496b0 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala @@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources private var zkServer: EmbeddedZookeeper = _ - protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf + protected val conf: KyuubiConf = new KyuubiConf(false) protected def engineRefId: String @@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources } } withKyuubiConf.foreach { case (k, v) => - System.setProperty(k, v) conf.set(k, v) } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala new file mode 100644 index 00000000000..c5d23bd7902 --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.operation + +import java.util.UUID + +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY +import org.apache.kyuubi.engine.ShareLevel +import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal} +import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE} +import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode} + +class FlinkEngineInitializeSuite extends HiveJDBCTestHelper + with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal { + + protected def jdbcUrl: String = getFlinkEngineServiceUrl + + protected val ENGINE_INITIALIZE_SQL_VALUE: String = + s""" + create catalog cat_a with ('type'='generic_in_memory'); + create table blackhole(i int) with ('connector'='blackhole') + """ + protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String = + s""" + create table datagen(i int) with ( + 'connector'='datagen', + 'fields.i.kind'='sequence', + 'fields.i.start'='1', + 'fields.i.end'='10') + """ + + override def withKyuubiConf: Map[String, String] = { + Map( + "flink.execution.target" -> "remote", + "flink.high-availability.cluster-id" -> "flink-mini-cluster", + "flink.app.name" -> "kyuubi_connection_flink_kandy", + HA_NAMESPACE.key -> namespace, + HA_ENGINE_REF_ID.key -> engineRefId, + ENGINE_TYPE.key -> "FLINK_SQL", + ENGINE_SHARE_LEVEL.key -> shareLevel, + OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name, + ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE, + ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE, + KYUUBI_SESSION_USER_KEY -> "kandy") + } + + override protected def engineRefId: String = UUID.randomUUID().toString + + def namespace: String = "/kyuubi/flink-local-engine-test" + + def shareLevel: String = ShareLevel.USER.toString + + def engineType: String = "flink" + + test("execute statement - kyuubi engine initialize") { + withJdbcStatement() { statement => + var resultSet = statement.executeQuery("show catalogs") + val expectedCatalogs = Set("default_catalog", "cat_a") + while (resultSet.next()) { + assert(expectedCatalogs.contains(resultSet.getString(1))) + } + assert(!resultSet.next()) + + resultSet = statement.executeQuery("show databases") + assert(resultSet.next()) + assert(resultSet.getString(1) === "default_database") + assert(!resultSet.next()) + + val expectedTables = Set("blackhole", "datagen") + resultSet = statement.executeQuery("show tables") + while (resultSet.next()) { + assert(expectedTables.contains(resultSet.getString(1))) + } + assert(!resultSet.next()) + + var dropResult = statement.executeQuery("drop catalog cat_a") + assert(dropResult.next()) + assert(dropResult.getString(1) === "OK") + + dropResult = statement.executeQuery("drop table blackhole") + assert(dropResult.next()) + assert(dropResult.getString(1) === "OK") + + dropResult = statement.executeQuery("drop table datagen") + assert(dropResult.next()) + assert(dropResult.getString(1) === "OK") + } + } +} From a29ac38609c0f4e1fc6680e0bb215b810f6ce038 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 2 Nov 2023 00:21:23 +0800 Subject: [PATCH 2/6] [KYUUBI-5507] Run engine initial SQL at Engine start --- .../kyuubi/engine/flink/FlinkSQLEngine.scala | 25 +++++++++++-------- .../flink/session/FlinkSessionImpl.scala | 9 ------- .../FlinkEngineInitializeSuite.scala | 6 +++-- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 8838799bc24..3d502f38f75 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY} import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.service.Serverable @@ -102,9 +103,7 @@ object FlinkSQLEngine extends Logging { startEngine(engineContext) info("Flink engine started") - if ("yarn-application".equalsIgnoreCase(executionTarget)) { - bootstrapFlinkApplicationExecutor() - } + bootstrap(executionTarget) // blocking main thread countDownLatch.await() @@ -129,15 +128,21 @@ object FlinkSQLEngine extends Logging { } } - private def bootstrapFlinkApplicationExecutor() = { - // trigger an execution to initiate EmbeddedExecutor with the default flink conf + private def bootstrap(executionTarget: String) = { val flinkConf = new Configuration() - flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql") - debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.") val tableEnv = TableEnvironment.create(flinkConf) - val res = tableEnv.executeSql("select 'kyuubi'") - res.await() - info("Bootstrap Flink SQL finished.") + + if ("yarn-application".equalsIgnoreCase(executionTarget)) { + // trigger an execution to initiate EmbeddedExecutor with the default flink conf + flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql") + debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.") + tableEnv.executeSql("select 'kyuubi'").await() + } + + kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach( + tableEnv.executeSql(_).await()) + + info("Bootstrap SQL finished.") } private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 50096793651..5f8f0b8c049 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -65,15 +65,6 @@ class FlinkSessionImpl( override def open(): Unit = { val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig)) - sessionManager.getConf.get(ENGINE_INITIALIZE_SQL).foreach { sql => - try { - executor.executeStatement(OperationHandle.create, sql) - } catch { - case NonFatal(e) => - throw KyuubiSQLException(s"execute ${ENGINE_INITIALIZE_SQL.key} $sql ", e) - } - } - sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql => try { executor.executeStatement(OperationHandle.create, sql) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala index c5d23bd7902..8e809a8ce3c 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -33,11 +33,13 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper protected val ENGINE_INITIALIZE_SQL_VALUE: String = s""" - create catalog cat_a with ('type'='generic_in_memory'); - create table blackhole(i int) with ('connector'='blackhole') + show databases; """ + protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String = s""" + create catalog cat_a with ('type'='generic_in_memory'); + create table blackhole(i int) with ('connector'='blackhole'); create table datagen(i int) with ( 'connector'='datagen', 'fields.i.kind'='sequence', From 2ce8031818d63131dc4435feda04a75e5adeefe8 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 2 Nov 2023 00:28:24 +0800 Subject: [PATCH 3/6] [KYUUBI-5507] Improve tests --- .../flink/operation/FlinkEngineInitializeSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala index 8e809a8ce3c..9388b0bc317 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -38,7 +38,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String = s""" - create catalog cat_a with ('type'='generic_in_memory'); + create catalog cat_b with ('type'='generic_in_memory'); create table blackhole(i int) with ('connector'='blackhole'); create table datagen(i int) with ( 'connector'='datagen', @@ -73,11 +73,12 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper test("execute statement - kyuubi engine initialize") { withJdbcStatement() { statement => var resultSet = statement.executeQuery("show catalogs") - val expectedCatalogs = Set("default_catalog", "cat_a") + val expectedCatalogs = Set("default_catalog", "cat_b") + var actualCatalogs = Set[String]() while (resultSet.next()) { - assert(expectedCatalogs.contains(resultSet.getString(1))) + actualCatalogs += resultSet.getString(1) } - assert(!resultSet.next()) + assert(expectedCatalogs.subsetOf(actualCatalogs)) resultSet = statement.executeQuery("show databases") assert(resultSet.next()) @@ -91,7 +92,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper } assert(!resultSet.next()) - var dropResult = statement.executeQuery("drop catalog cat_a") + var dropResult = statement.executeQuery("drop catalog cat_b") assert(dropResult.next()) assert(dropResult.getString(1) === "OK") From 13035f3e9336cfb7ed4162ebb87167a4c2686197 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 2 Nov 2023 12:43:10 +0800 Subject: [PATCH 4/6] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala Co-authored-by: Cheng Pan --- .../org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 3d502f38f75..db50c1c33f8 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -139,8 +139,9 @@ object FlinkSQLEngine extends Logging { tableEnv.executeSql("select 'kyuubi'").await() } - kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach( - tableEnv.executeSql(_).await()) + kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt => + tableEnv.executeSql(stmt).await() + } info("Bootstrap SQL finished.") } From 07827382465328aeaf79f0899efc93125ec33100 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 2 Nov 2023 12:48:24 +0800 Subject: [PATCH 5/6] [KYUUBI-5507] Improve codestyle --- .../flink/operation/FlinkEngineInitializeSuite.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala index 9388b0bc317..63a5179683a 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -32,20 +32,16 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper protected def jdbcUrl: String = getFlinkEngineServiceUrl protected val ENGINE_INITIALIZE_SQL_VALUE: String = - s""" - show databases; - """ + "show databases;" protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String = - s""" - create catalog cat_b with ('type'='generic_in_memory'); + """create catalog cat_b with ('type'='generic_in_memory'); create table blackhole(i int) with ('connector'='blackhole'); - create table datagen(i int) with ( + create table datagen(i int) with ( 'connector'='datagen', 'fields.i.kind'='sequence', 'fields.i.start'='1', - 'fields.i.end'='10') - """ + 'fields.i.end'='10')""".stripMargin override def withKyuubiConf: Map[String, String] = { Map( From b1720ecde24fc0ce43643718784c7b05eda45747 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 2 Nov 2023 18:17:11 +0800 Subject: [PATCH 6/6] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala --- .../flink/operation/FlinkEngineInitializeSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala index 63a5179683a..db174e50125 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -36,12 +36,12 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String = """create catalog cat_b with ('type'='generic_in_memory'); - create table blackhole(i int) with ('connector'='blackhole'); - create table datagen(i int) with ( - 'connector'='datagen', - 'fields.i.kind'='sequence', - 'fields.i.start'='1', - 'fields.i.end'='10')""".stripMargin + |create table blackhole(i int) with ('connector'='blackhole'); + |create table datagen(i int) with ( + |'connector'='datagen', + |'fields.i.kind'='sequence', + |'fields.i.start'='1', + |'fields.i.end'='10')""".stripMargin override def withKyuubiConf: Map[String, String] = { Map(