Skip to content

Commit a8a232e

Browse files
jiaoqingbopan3793
authored andcommitted
[KYUUBI #3504] Extend JDBC URL to support catalog
### _Why are the changes needed?_ fix #3504 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3516 from jiaoqingbo/kyuubi3504. Closes #3504 93fba69 [jiaoqingbo] remove outdated comment af3e6a1 [jiaoqingbo] code review 894877b [jiaoqingbo] code review 856b28f [jiaoqingbo] code review 60876a9 [jiaoqingbo] [KYUUBI #3504] Extend JDBC URL to support catalog Authored-by: jiaoqingbo <1178404354@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit f768344) Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 2c60c09 commit a8a232e

File tree

8 files changed

+92
-27
lines changed

8 files changed

+92
-27
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ class FlinkSessionImpl(
5454
override def open(): Unit = {
5555
executor.openSession(handle.identifier.toString)
5656
normalizedConf.foreach {
57+
case ("use:catalog", catalog) =>
58+
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
59+
try {
60+
tableEnv.useCatalog(catalog)
61+
} catch {
62+
case NonFatal(e) =>
63+
throw e
64+
}
5765
case ("use:database", database) =>
5866
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
5967
try {

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,20 @@ class SparkSessionImpl(
5151

5252
override def open(): Unit = {
5353
normalizedConf.foreach {
54+
case ("use:catalog", catalog) =>
55+
try {
56+
SparkCatalogShim().setCurrentCatalog(spark, catalog)
57+
} catch {
58+
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
59+
warn(e.getMessage())
60+
}
5461
case ("use:database", database) =>
5562
try {
5663
SparkCatalogShim().setCurrentDatabase(spark, database)
5764
} catch {
5865
case e
5966
if database == "default" && e.getMessage != null &&
6067
e.getMessage.contains("not found") =>
61-
// use:database is from hive so the catalog is always session catalog which must have
62-
// default namespace `default`. But as spark support v2 catalog, catalog may not have
63-
// default namespace. Here we do nothing for compatible both session and v2 catalog.
6468
}
6569
case (key, value) => setModifiableConfig(key, value)
6670
}

externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ package org.apache.kyuubi.engine.trino.session
1919

2020
import java.net.URI
2121
import java.time.ZoneId
22-
import java.util.Collections
23-
import java.util.Locale
24-
import java.util.Optional
22+
import java.util.{Collections, Locale, Optional}
2523
import java.util.concurrent.TimeUnit
2624

2725
import io.airlift.units.Duration
@@ -36,8 +34,7 @@ import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
3634
import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
3735
import org.apache.kyuubi.events.EventBus
3836
import org.apache.kyuubi.operation.{Operation, OperationHandle}
39-
import org.apache.kyuubi.session.AbstractSession
40-
import org.apache.kyuubi.session.SessionManager
37+
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
4138

4239
class TrinoSessionImpl(
4340
protocol: TProtocolVersion,
@@ -50,32 +47,38 @@ class TrinoSessionImpl(
5047

5148
var trinoContext: TrinoContext = _
5249
private var clientSession: ClientSession = _
50+
private var catalogName: String = null
51+
private var databaseName: String = null
5352

5453
private val sessionEvent = TrinoSessionEvent(this)
5554

5655
override def open(): Unit = {
5756
normalizedConf.foreach {
58-
case ("use:database", database) => clientSession = createClientSession(database)
57+
case ("use:catalog", catalog) => catalogName = catalog
58+
case ("use:database", database) => databaseName = database
5959
case _ => // do nothing
6060
}
6161

6262
val httpClient = new OkHttpClient.Builder().build()
6363

64-
if (clientSession == null) {
65-
clientSession = createClientSession()
66-
}
64+
clientSession = createClientSession()
6765
trinoContext = TrinoContext(httpClient, clientSession)
6866

6967
super.open()
7068
EventBus.post(sessionEvent)
7169
}
7270

73-
private def createClientSession(schema: String = null): ClientSession = {
71+
private def createClientSession(): ClientSession = {
7472
val sessionConf = sessionManager.getConf
7573
val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
7674
throw KyuubiSQLException("Trino server url can not be null!"))
77-
val catalog = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
78-
throw KyuubiSQLException("Trino default catalog can not be null!"))
75+
76+
if (catalogName == null) {
77+
catalogName = sessionConf.get(
78+
KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
79+
throw KyuubiSQLException("Trino default catalog can not be null!"))
80+
}
81+
7982
val user = sessionConf
8083
.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser)
8184
val clientRequestTimeout = sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT)
@@ -88,8 +91,8 @@ class TrinoSessionImpl(
8891
Optional.empty(),
8992
Collections.emptySet(),
9093
null,
91-
catalog,
92-
schema,
94+
catalogName,
95+
databaseName,
9396
null,
9497
ZoneId.systemDefault(),
9598
Locale.getDefault,

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public class JdbcConnectionParams {
2727
// Prefer using a shorter camelCase param name instead of using the same name as the
2828
// corresponding
2929
// HiveServer2 config.
30-
// For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
30+
// For a jdbc url:
31+
// jdbc:hive2://<host>:<port>/catalogName/dbName;sess_var_list?hive_conf_list#hive_var_list,
3132
// client side params are specified in sess_var_list
3233

3334
// Client param names:
@@ -115,6 +116,7 @@ public class JdbcConnectionParams {
115116
private String host = null;
116117
private int port = 0;
117118
private String jdbcUriString;
119+
private String catalogName;
118120
private String dbName = Utils.DEFAULT_DATABASE;
119121
private Map<String, String> hiveConfs = new LinkedHashMap<>();
120122
private Map<String, String> hiveVars = new LinkedHashMap<>();
@@ -130,6 +132,7 @@ public JdbcConnectionParams(JdbcConnectionParams params) {
130132
this.host = params.host;
131133
this.port = params.port;
132134
this.jdbcUriString = params.jdbcUriString;
135+
this.catalogName = params.catalogName;
133136
this.dbName = params.dbName;
134137
this.hiveConfs.putAll(params.hiveConfs);
135138
this.hiveVars.putAll(params.hiveVars);
@@ -152,6 +155,10 @@ public String getJdbcUriString() {
152155
return jdbcUriString;
153156
}
154157

158+
public String getCatalogName() {
159+
return catalogName;
160+
}
161+
155162
public String getDbName() {
156163
return dbName;
157164
}
@@ -196,6 +203,10 @@ public void setJdbcUriString(String jdbcUriString) {
196203
this.jdbcUriString = jdbcUriString;
197204
}
198205

206+
public void setCatalogName(String catalogName) {
207+
this.catalogName = catalogName;
208+
}
209+
199210
public void setDbName(String dbName) {
200211
this.dbName = dbName;
201212
}

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,10 @@ private void openSession() throws SQLException {
702702
for (Entry<String, String> hiveVar : connParams.getHiveVars().entrySet()) {
703703
openConf.put("set:hivevar:" + hiveVar.getKey(), hiveVar.getValue());
704704
}
705+
// switch the catalog
706+
if (connParams.getCatalogName() != null) {
707+
openConf.put("use:catalog", connParams.getCatalogName());
708+
}
705709
// switch the database
706710
openConf.put("use:database", connParams.getDbName());
707711
// set the fetchSize

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,27 @@ public static JdbcConnectionParams extractURLComponents(String uri, Properties i
152152
String sessVars = jdbcURI.getPath();
153153
if ((sessVars != null) && !sessVars.isEmpty()) {
154154
String dbName = "";
155+
String catalogName = "";
155156
// removing leading '/' returned by getPath()
156157
sessVars = sessVars.substring(1);
157158
if (!sessVars.contains(";")) {
158-
// only dbname is provided
159-
dbName = sessVars;
159+
if (sessVars.contains("/")) {
160+
catalogName = sessVars.substring(0, sessVars.indexOf('/'));
161+
dbName = sessVars.substring(sessVars.indexOf('/') + 1);
162+
} else {
163+
// only dbname is provided
164+
dbName = sessVars;
165+
}
160166
} else {
161167
// we have dbname followed by session parameters
162-
dbName = sessVars.substring(0, sessVars.indexOf(';'));
168+
String catalogAndDb = sessVars.substring(0, sessVars.indexOf(';'));
169+
if (catalogAndDb.contains("/")) {
170+
catalogName = catalogAndDb.substring(0, catalogAndDb.indexOf('/'));
171+
dbName = catalogAndDb.substring(catalogAndDb.indexOf('/') + 1);
172+
} else {
173+
// only dbname is provided
174+
dbName = catalogAndDb;
175+
}
163176
sessVars = sessVars.substring(sessVars.indexOf(';') + 1);
164177
Matcher sessMatcher = pattern.matcher(sessVars);
165178
while (sessMatcher.find()) {
@@ -169,6 +182,9 @@ public static JdbcConnectionParams extractURLComponents(String uri, Properties i
169182
}
170183
}
171184
}
185+
if (!catalogName.isEmpty()) {
186+
connParams.setCatalogName(catalogName);
187+
}
172188
if (!dbName.isEmpty()) {
173189
connParams.setDbName(dbName);
174190
}

kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/UtilsTest.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,39 @@ public class UtilsTest {
3333

3434
private String expectedHost;
3535
private String expectedPort;
36+
private String expectedCatalog;
37+
private String expectedDb;
3638
private String uri;
3739

3840
@Parameterized.Parameters
3941
public static Collection<String[]> data() {
4042
return Arrays.asList(
4143
new String[][] {
42-
{"localhost", "10009", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
43-
{"localhost", "10009", "jdbc:hive2:///"},
44-
{"localhost", "10009", "jdbc:kyuubi://"},
45-
{"localhost", "10009", "jdbc:hive2://"},
46-
{"hostname", "10018", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"}
44+
{"localhost", "10009", null, "db", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
45+
{"localhost", "10009", null, "default", "jdbc:hive2:///"},
46+
{"localhost", "10009", null, "default", "jdbc:kyuubi://"},
47+
{"localhost", "10009", null, "default", "jdbc:hive2://"},
48+
{"hostname", "10018", null, "db", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"},
49+
{
50+
"hostname",
51+
"10018",
52+
"catalog",
53+
"db",
54+
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?k2=v2#k3=v3"
55+
}
4756
});
4857
}
4958

50-
public UtilsTest(String expectedHost, String expectedPort, String uri) {
59+
public UtilsTest(
60+
String expectedHost,
61+
String expectedPort,
62+
String expectedCatalog,
63+
String expectedDb,
64+
String uri) {
5165
this.expectedHost = expectedHost;
5266
this.expectedPort = expectedPort;
67+
this.expectedCatalog = expectedCatalog;
68+
this.expectedDb = expectedDb;
5369
this.uri = uri;
5470
}
5571

@@ -58,5 +74,7 @@ public void testExtractURLComponents() throws JdbcUriParseException {
5874
JdbcConnectionParams jdbcConnectionParams1 = extractURLComponents(uri, new Properties());
5975
assertEquals(expectedHost, jdbcConnectionParams1.getHost());
6076
assertEquals(Integer.parseInt(expectedPort), jdbcConnectionParams1.getPort());
77+
assertEquals(expectedCatalog, jdbcConnectionParams1.getCatalogName());
78+
assertEquals(expectedDb, jdbcConnectionParams1.getDbName());
6179
}
6280
}

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class KyuubiSessionImpl(
6363

6464
// TODO: needs improve the hardcode
6565
optimizedConf.foreach {
66+
case ("use:catalog", _) =>
6667
case ("use:database", _) =>
6768
case ("kyuubi.engine.pool.size.threshold", _) =>
6869
case (key, value) => sessionConf.set(key, value)

0 commit comments

Comments
 (0)