From e038f2f94c2a88b9f27a1fa8800ef6df3db1d408 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 12:04:35 +0800 Subject: [PATCH 1/7] GetCatalogs supports DSv2 --- .../kyuubi/engine/spark/operation/GetCatalogs.scala | 12 ++++++++---- .../engine/spark/operation/GetTableTypes.scala | 3 +-- .../engine/spark/SparkSQLEngineListenerSuite.scala | 2 +- .../kyuubi/ha/client/ServiceDiscoverySuite.scala | 1 + .../kyuubi/ha/server/EmbeddedZkServerSuite.scala | 5 ++--- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala index 3485f54d757..a620163251e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.operation.OperationType @@ -33,8 +33,12 @@ class GetCatalogs(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = Seq( - Row(spark.sessionState.catalogManager.currentCatalog.name()) - ).toList.iterator + try { + iter = try { + spark.sql("SELECT CURRENT_CATALOG()").collect().toList.toIterator + } catch { + case _: AnalysisException => Seq(Row("spark_catalog")).toIterator + } + } catch onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala index 40ea77b7272..46cef7f323c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala @@ -18,7 +18,6 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.types.StructType import org.apache.kyuubi.operation.OperationType @@ -33,6 +32,6 @@ class GetTableTypes(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = CatalogTableType.tableTypes.map(t => Row(t.name)).toList.iterator + iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala index d65734e76ac..ba4ecf39dbb 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala @@ -36,7 +36,7 @@ class SparkSQLEngineListenerSuite extends KyuubiFunSuite { .builder().master("local").config("spark.ui.port", "0").getOrCreate() val engine = new SparkSQLEngine(spark) - engine.initialize(KyuubiConf()) + engine.initialize(KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)) engine.start() assert(engine.getServiceState === ServiceState.STARTED) spark.stop() diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala index d68c08d6026..f3505d9f856 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala @@ -59,6 +59,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { .unset(KyuubiConf.SERVER_PRINCIPAL) .set(HA_ZK_QUORUM, zkServer.getConnectString) .set(HA_ZK_NAMESPACE, namespace) + .set(KyuubiConf.FRONTEND_BIND_PORT, 0) val server: Serverable = new NoopServer() server.initialize(conf) diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala index fb1121179b3..825ee9f792e 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala @@ -33,15 +33,14 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite { assert(zkServer.getName === zkServer.getClass.getSimpleName) assert(zkServer.getServiceState === LATENT) val conf = KyuubiConf() + conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0) zkServer.stop() // only for test coverage zkServer.initialize(conf) assert(zkServer.getConf === conf) assert(zkServer.getServiceState === INITIALIZED) - assert(zkServer.getConnectString.endsWith("2181")) assert(zkServer.getStartTime === 0) zkServer.start() assert(zkServer.getServiceState === STARTED) - assert(zkServer.getConnectString.endsWith("2181")) assert(zkServer.getStartTime !== 0) zkServer.stop() assert(zkServer.getServiceState === STOPPED) @@ -50,7 +49,7 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite { test("connect test with embedded zookeeper") { val zkServer = new EmbeddedZkServer() assert(zkServer.getConnectString === null) - zkServer.initialize(KyuubiConf()) + zkServer.initialize(KyuubiConf().set(KyuubiConf.EMBEDDED_ZK_PORT, 0)) zkServer.start() val zkClient = CuratorFrameworkFactory.builder() From d068dc9ac36db46ae83b33e1462ec61e357f28ec Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 12:36:00 +0800 Subject: [PATCH 2/7] pr template --- .github/PULL_REQUEST_TEMPLATE | 28 +++++++++++++++++++--------- .github/pr-badge.yml | 4 ++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index b8ef7e06165..2299000a1ae 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -1,11 +1,19 @@ + +### _Which issue are you going to fix_? + -### Please add issue ID here? - -Fixes #${issue ID} +Fixes #${ID} ### Why are the changes needed? -### Test Plan: -- Add some test cases that check the changes thoroughly including negative and positive cases if possible -- Add screenshots for manual tests if appropriate -- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request +### How was this patch tested: +- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible + +- [ ] Add screenshots for manual tests if appropriate + +- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request diff --git a/.github/pr-badge.yml b/.github/pr-badge.yml index d211fcc8281..dc1b7def219 100644 --- a/.github/pr-badge.yml +++ b/.github/pr-badge.yml @@ -3,7 +3,7 @@ color: "green" - label: "Preview" - message: "PR $prNumber" + message: "Fixes $prNumber" color: "blue" url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber" @@ -15,7 +15,7 @@ - label: "Missing" message: "Test Plan" color: "#ff0000" - when: "$payload.pull_request.body.includes('## Test Plan') === false" + when: "$payload.pull_request.body.includes('- [x]') === false" - label: "Label" message: "Feature" From 8ed3b9ce2a623f85f24bbb0870ed6cb5674a9b08 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 12:55:35 +0800 Subject: [PATCH 3/7] nit --- .github/pr-badge.yml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/.github/pr-badge.yml b/.github/pr-badge.yml index dc1b7def219..bac3441b824 100644 --- a/.github/pr-badge.yml +++ b/.github/pr-badge.yml @@ -3,10 +3,22 @@ color: "green" - label: "Preview" - message: "Fixes $prNumber" + message: "Closes%20#$prNumber" color: "blue" url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber" +- label: "+" + message: "$additions" + color: "red" + +- label: "-" + message: "$deletions" + color: "green" + +- label: "commits" + message: "$commits" + color: "yellow" + - label: "Missing" message: "Target Issue" color: "#ff0000" From 21317a4f80e0827acc4122e607cf4e943bb71b96 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 12:57:37 +0800 Subject: [PATCH 4/7] nit --- .github/PULL_REQUEST_TEMPLATE | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index 2299000a1ae..6ba9b7da169 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -6,7 +6,7 @@ Here are some tips for you: https://kyuubi.readthedocs.io/en/latest/community/contributions.html --> -### _Which issue are you going to fix_? +### _Which issue are you going to fix?_ -### How was this patch tested: + +### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate From 59c9d0c603c161725b065506618389e42e781524 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 16:18:22 +0800 Subject: [PATCH 5/7] shim --- .../engine/spark/operation/GetCatalogs.scala | 11 +-- .../kyuubi/engine/spark/shim/Shim_v2_4.scala | 26 +++++++ .../kyuubi/engine/spark/shim/Shim_v3_0.scala | 35 +++++++++ .../kyuubi/engine/spark/shim/SparkShim.scala | 76 +++++++++++++++++++ 4 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala index a620163251e..eec959e0859 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala @@ -17,9 +17,10 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.shim.SparkShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT import org.apache.kyuubi.session.Session @@ -33,12 +34,8 @@ class GetCatalogs(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - try { - iter = try { - spark.sql("SELECT CURRENT_CATALOG()").collect().toList.toIterator - } catch { - case _: AnalysisException => Seq(Row("spark_catalog")).toIterator - } + try { + iter = SparkShim().getCatalogs(spark).toIterator } catch onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala new file mode 100644 index 00000000000..2a841b03517 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +class Shim_v2_4 extends SparkShim { + override def getCatalogs(ss: SparkSession): Seq[Row] = { + Seq(Row("spark_catalog")) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala new file mode 100644 index 00000000000..3e1e3ede690 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +class Shim_v3_0 extends Shim_v2_4 { + + override def getCatalogs(ss: SparkSession): Seq[Row] = { + val sessionState = getSessionState(ss) + val catalogMgr = invoke(sessionState, "catalogManager") + // get the custom v2 session catalog or default spark_catalog + val currentCatalog = invoke(catalogMgr, "v2SessionCatalog") + val currentCatalogName = invoke(currentCatalog, "name") + .asInstanceOf[String] + val catalogs = getField(catalogMgr, "catalogs") + .asInstanceOf[scala.collection.Map[String, _]] + (catalogs.keys ++: Seq(currentCatalogName)).distinct.map(Row(_)) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala new file mode 100644 index 00000000000..c4a8056d5ec --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +import org.apache.kyuubi.Utils + +/** + * A shim that defines the interface interact with Spark's catalogs + */ +trait SparkShim { + + /** + * Get all register catalogs in Spark's `CatalogManager` + */ + def getCatalogs(ss: SparkSession): Seq[Row] + + protected def getSessionState(ss: SparkSession): Any = { + invoke(classOf[SparkSession], ss, "sessionState") + } + + protected def invoke( + obj: Any, + methodName: String, + args: (Class[_], AnyRef)*): Any = { + val (types, values) = args.unzip + val method = obj.getClass.getDeclaredMethod(methodName, types: _*) + method.setAccessible(true) + method.invoke(obj, values.toSeq: _*) + } + + protected def invoke( + clazz: Class[_], + obj: AnyRef, + methodName: String, + args: (Class[_], AnyRef)*): AnyRef = { + val (types, values) = args.unzip + val method = clazz.getDeclaredMethod(methodName, types: _*) + method.setAccessible(true) + method.invoke(obj, values.toSeq: _*) + } + + protected def getField(o: Any, fieldName: String): Any = { + val field = o.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(o) + } +} + +object SparkShim { + def apply(): SparkShim = { + val runtimeSparkVer = org.apache.spark.SPARK_VERSION + val (major, minor) = Utils.majorMinorVersion(runtimeSparkVer) + (major, minor) match { + case (3, _) => new Shim_v3_0 + case (2, _) => new Shim_v2_4 + case _ => throw new IllegalArgumentException(s"Not Support spark version $runtimeSparkVer") + } + } +} From 5d6aa0b5547db6fbea73c08a05a60575664ecbbc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Jan 2021 23:35:56 +0800 Subject: [PATCH 6/7] add iceberg tests --- .../kyuubi/engine/spark/shim/Shim_v3_0.scala | 12 ++-- .../kyuubi/engine/spark/shim/SparkShim.scala | 4 +- .../org/apache/kyuubi/session/package.scala | 6 ++ kyuubi-main/pom.xml | 5 ++ .../kyuubi/session/KyuubiSessionImpl.scala | 17 +++++- .../datalake/IcebergOperationSuite.scala | 60 +++++++++++++++++++ pom.xml | 10 ++++ 7 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala index 3e1e3ede690..39cae2f4d59 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala @@ -23,13 +23,17 @@ class Shim_v3_0 extends Shim_v2_4 { override def getCatalogs(ss: SparkSession): Seq[Row] = { val sessionState = getSessionState(ss) + + // A [[CatalogManager]] is session unique val catalogMgr = invoke(sessionState, "catalogManager") // get the custom v2 session catalog or default spark_catalog - val currentCatalog = invoke(catalogMgr, "v2SessionCatalog") - val currentCatalogName = invoke(currentCatalog, "name") - .asInstanceOf[String] + val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog") + val defaultCatalog = invoke(catalogMgr, "currentCatalog") + + val defaults = Seq(sessionCatalog, defaultCatalog).distinct + .map(invoke(_, "name").asInstanceOf[String]) val catalogs = getField(catalogMgr, "catalogs") .asInstanceOf[scala.collection.Map[String, _]] - (catalogs.keys ++: Seq(currentCatalogName)).distinct.map(Row(_)) + (catalogs.keys ++: defaults).distinct.map(Row(_)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala index c4a8056d5ec..51f75d55d6d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala @@ -19,12 +19,12 @@ package org.apache.kyuubi.engine.spark.shim import org.apache.spark.sql.{Row, SparkSession} -import org.apache.kyuubi.Utils +import org.apache.kyuubi.{Logging, Utils} /** * A shim that defines the interface interact with Spark's catalogs */ -trait SparkShim { +trait SparkShim extends Logging { /** * Get all register catalogs in Spark's `CatalogManager` diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala index 421c0d05209..ab8fa7cc9ff 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala @@ -24,4 +24,10 @@ package object session { val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r val HIVE_CONF_PREFIX: Regex = """set:hiveconf:([^=]+)""".r + val ENV_PREFIX = "env:" + val SYSTEM_PREFIX = "system:" + val HIVECONF_PREFIX = "hiveconf:" + val HIVEVAR_PREFIX = "hivevar:" + val METACONF_PREFIX = "metaconf:" + } diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml index 4318fa4abc1..90dbaadfbe0 100644 --- a/kyuubi-main/pom.xml +++ b/kyuubi-main/pom.xml @@ -100,6 +100,11 @@ netty test + + + org.apache.iceberg + ${iceberg.name} + diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index d6267f42da8..68d7192261b 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -51,8 +51,21 @@ class KyuubiSessionImpl( private def mergeConf(): Unit = { conf.foreach { - case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value) - case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value) + case (k, v) if k.startsWith("set:") => + val newKey = k.substring(4) + if (newKey.startsWith(SYSTEM_PREFIX)) { + sessionConf.set(newKey.substring(SYSTEM_PREFIX.length), v) + } else if (newKey.startsWith(HIVECONF_PREFIX)) { + sessionConf.set(newKey.substring(HIVECONF_PREFIX.length), v) + } else if (newKey.startsWith(HIVEVAR_PREFIX)) { + sessionConf.set(newKey.substring(HIVEVAR_PREFIX.length), v) + } else if (newKey.startsWith(METACONF_PREFIX)) { + sessionConf.set(newKey.substring(METACONF_PREFIX.length), v) + } else if (newKey.startsWith(SYSTEM_PREFIX)) { + // do nothing + } else { + sessionConf.set(k, v) + } case ("use:database", _) => case (key, value) => sessionConf.set(key, value) } diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala new file mode 100644 index 00000000000..1e633349ed9 --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation.datalake + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer} +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT + +class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils { + + protected def catalog: String = "hadoop_prod" + + private val iceberg: String = { + System.getProperty("java.class.path") + .split(":") + .filter(_.contains("iceberg-spark")).head + } + + private val warehouse = Utils.createTempDir() + + override def jdbcUrl: String = getJdbcUrl + + "#" + + s"spark.sql.defaultCatalog=$catalog;" + + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" + + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" + + "spark.sql.catalog.spark_catalog.type=hive;" + + s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" + + s"spark.sql.catalog.$catalog.type=hadoop;" + + s"spark.sql.catalog.$catalog.warehouse=$warehouse;" + + s"spark.jars=$iceberg;" + + override protected val conf: KyuubiConf = KyuubiConf() + + test("get catalogs") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + val catalogs = metaData.getCatalogs + catalogs.next() + assert(catalogs.getString(TABLE_CAT) === "spark_catalog") + catalogs.next() + assert(catalogs.getString(TABLE_CAT) === catalog) + } + } +} diff --git a/pom.xml b/pom.xml index 6f54c26e700..26f41e8f41a 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,8 @@ 3.4.14 3.0.3 + iceberg-spark3-runtime + 0.11.0 UTF-8 ${project.build.directory}/scala-${scala.binary.version}/jars @@ -507,6 +509,13 @@ netty ${netty3.version} + + + org.apache.iceberg + ${iceberg.name} + ${iceberg.version} + test + @@ -815,6 +824,7 @@ 2.4.7 2.11.12 2.11 + iceberg-spark-runtime From 185ec526f4bc43c9b5f95ede0b63ca8e2b72dc79 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 29 Jan 2021 09:59:40 +0800 Subject: [PATCH 7/7] nit --- kyuubi-main/pom.xml | 1 + pom.xml | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml index 90dbaadfbe0..b265a524e99 100644 --- a/kyuubi-main/pom.xml +++ b/kyuubi-main/pom.xml @@ -104,6 +104,7 @@ org.apache.iceberg ${iceberg.name} + test diff --git a/pom.xml b/pom.xml index 26f41e8f41a..cbb22351066 100644 --- a/pom.xml +++ b/pom.xml @@ -514,7 +514,6 @@ org.apache.iceberg ${iceberg.name} ${iceberg.version} - test