Commit 68f70fd

SteNicholas authored and yaooqinn committed
[KYUUBI #1685] Support jar and lib start option
### _Why are the changes needed?_

Support the jar and lib start options, so that user-provided JAR files and library directories are registered as dependencies when the Flink SQL engine starts.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1818 from SteNicholas/KYUUBI-1685.

Closes #1685

dd5a704 [SteNicholas] [KYUUBI #1685] Support jar and lib start option

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
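For illustration, a minimal sketch of how the new start options flow through the engine entry point. The `-j`/`--jar` and `-l`/`--library` flags are assumed from Flink's `CliOptionsParser`, whose embedded-mode options this patch reuses; the paths and the wrapper object are placeholders, not part of the patch.

import org.apache.kyuubi.engine.flink.FlinkEngineUtils

object StartOptionsSketch {
  def main(unused: Array[String]): Unit = {
    // placeholder paths; -j/--jar names a JAR file, -l/--library a JAR directory
    val args = Array("-j", "/path/to/udf.jar", "-l", "/opt/flink/lib")
    val options = FlinkEngineUtils.parseCliOptions(args)
    // getJars / getLibraryDirs are null when the corresponding option is absent
    Option(options.getJars).foreach(jars => println(s"jars: $jars"))
    Option(options.getLibraryDirs).foreach(dirs => println(s"lib dirs: $dirs"))
  }
}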
1 parent ef8de37 commit 68f70fd

File tree

6 files changed: +215 −19 lines changed

externals/kyuubi-flink-sql-engine/pom.xml

Lines changed: 6 additions & 5 deletions

@@ -198,11 +198,12 @@
         </includes>
       </relocation>
       <relocation>
-        <pattern>org.apache.commons</pattern>
-        <shadedPattern>${kyuubi.shade.packageName}.org.apache.commons</shadedPattern>
-        <includes>
-          <include>org.apache.commons.**</include>
-        </includes>
+        <pattern>org.apache.commons.codec</pattern>
+        <shadedPattern>${kyuubi.shade.packageName}.org.apache.commons.codec</shadedPattern>
+      </relocation>
+      <relocation>
+        <pattern>org.apache.commons.lang</pattern>
+        <shadedPattern>${kyuubi.shade.packageName}.org.apache.commons.lang</shadedPattern>
       </relocation>
       <relocation>
         <pattern>io.netty</pattern>
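A note inferred from this diff rather than stated in the patch: narrowing the relocation from the whole org.apache.commons namespace to only commons-codec and commons-lang leaves org.apache.commons.cli unrelocated in the shaded engine JAR, which matches FlinkEngineUtils below importing the commons-cli parser classes directly.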

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala

Lines changed: 67 additions & 8 deletions

@@ -17,9 +17,17 @@

 package org.apache.kyuubi.engine.flink

+import java.io.File
+import java.net.URL
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.cli.{CommandLine, DefaultParser, Option, Options, ParseException}
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.table.client.SqlClientException
-import org.apache.flink.table.client.cli.{CliOptions, CliOptionsParser}
+import org.apache.flink.table.client.cli.CliOptions
+import org.apache.flink.table.client.cli.CliOptionsParser._
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.flink.table.client.gateway.local.LocalExecutor

@@ -28,6 +36,7 @@ import org.apache.kyuubi.Logging
 object FlinkEngineUtils extends Logging {

   val MODE_EMBEDDED = "embedded"
+  val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options);

   def checkFlinkVersion(): Unit = {
     val flinkVersion = EnvironmentInformation.getVersion
@@ -40,19 +49,69 @@ object FlinkEngineUtils extends Logging {
     val (mode, modeArgs) =
       if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
       else (args(0), args.drop(1))
-    // TODO remove requirement of flink-python
-    val options = CliOptionsParser.parseEmbeddedModeClient(modeArgs)
-    mode match {
-      case MODE_EMBEDDED if options.isPrintHelp => CliOptionsParser.printHelpEmbeddedModeClient()
-      case MODE_EMBEDDED =>
-      case _ => throw new SqlClientException("Other mode is not supported yet.")
+    val options = parseEmbeddedModeClient(modeArgs)
+    if (mode == MODE_EMBEDDED) {
+      if (options.isPrintHelp) {
+        printHelpEmbeddedModeClient()
+      }
+      options
+    } else {
+      throw new SqlClientException("Other mode is not supported yet.")
     }
-    options
   }

   def getSessionContext(localExecutor: LocalExecutor, sessionId: String): SessionContext = {
     val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", classOf[String])
     method.setAccessible(true)
     method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
   }
+
+  def parseEmbeddedModeClient(args: Array[String]): CliOptions =
+    try {
+      val parser = new DefaultParser
+      val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+      val jarUrls = checkUrls(line, OPTION_JAR)
+      val libraryUrls = checkUrls(line, OPTION_LIBRARY)
+      new CliOptions(
+        line.hasOption(OPTION_HELP.getOpt),
+        checkSessionId(line),
+        checkUrl(line, OPTION_INIT_FILE),
+        checkUrl(line, OPTION_FILE),
+        if (jarUrls != null && jarUrls.nonEmpty) jarUrls.asJava else null,
+        if (libraryUrls != null && libraryUrls.nonEmpty) libraryUrls.asJava else null,
+        line.getOptionValue(OPTION_UPDATE.getOpt),
+        line.getOptionValue(OPTION_HISTORY.getOpt),
+        null)
+    } catch {
+      case e: ParseException =>
+        throw new SqlClientException(e.getMessage)
+    }
+
+  def checkSessionId(line: CommandLine): String = {
+    val sessionId = line.getOptionValue(OPTION_SESSION.getOpt)
+    if (sessionId != null && !sessionId.matches("[a-zA-Z0-9_\\-.]+")) {
+      throw new SqlClientException("Session identifier must only consists of 'a-zA-Z0-9_-.'.")
+    } else sessionId
+  }
+
+  def checkUrl(line: CommandLine, option: Option): URL = {
+    val urls: List[URL] = checkUrls(line, option)
+    if (urls != null && urls.nonEmpty) urls.head
+    else null
+  }
+
+  def checkUrls(line: CommandLine, option: Option): List[URL] = {
+    if (line.hasOption(option.getOpt)) {
+      line.getOptionValues(option.getOpt).distinct.map((url: String) => {
+        checkFilePath(url)
+        try Path.fromLocalFile(new File(url).getAbsoluteFile).toUri.toURL
+        catch {
+          case e: Exception =>
+            throw new SqlClientException(
+              "Invalid path for option '" + option.getLongOpt + "': " + url,
+              e)
+        }
+      }).toList
+    } else null
+  }
 }
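A hedged usage sketch of the parser above: repeated -j values are de-duplicated (checkUrls applies .distinct) and each path is resolved to an absolute file URL, while an unparsable option surfaces as SqlClientException. The wrapper object and jar names are illustrative only.

import org.apache.kyuubi.engine.flink.FlinkEngineUtils

object ParseSketch {
  def main(unused: Array[String]): Unit = {
    // duplicate -j values are dropped before URL conversion
    val opts = FlinkEngineUtils.parseEmbeddedModeClient(
      Array("-j", "udf-a.jar", "-j", "udf-a.jar", "-j", "udf-b.jar"))
    // expect two distinct jar URLs here
    Option(opts.getJars).foreach(_.forEach(url => println(url)))
  }
}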

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala

Lines changed: 49 additions & 4 deletions

@@ -17,17 +17,24 @@

 package org.apache.kyuubi.engine.flink

+import java.io.File
+import java.net.URL
 import java.time.Instant
 import java.util.concurrent.CountDownLatch

 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer

 import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI}
-import org.apache.flink.configuration.{DeploymentOptions, GlobalConfiguration}
+import org.apache.flink.configuration.DeploymentOptions
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.table.client.SqlClientException
 import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.util.JarUtils

-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY}
+import org.apache.kyuubi.Utils.currentUser
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.service.Serverable
@@ -93,8 +100,13 @@ object FlinkSQLEngine extends Logging {
         debug(s"Skip generating app name for execution target $other")
     }

+    val cliOptions = FlinkEngineUtils.parseCliOptions(args)
+    val jars = if (cliOptions.getJars != null) cliOptions.getJars.asScala else List.empty
+    val libDirs =
+      if (cliOptions.getLibraryDirs != null) cliOptions.getLibraryDirs.asScala else List.empty
+    val dependencies = discoverDependencies(jars, libDirs)
     val engineContext = new DefaultContext(
-      List.empty.asJava,
+      dependencies.asJava,
       flinkConf,
       List[CustomCommandLine](new DefaultCLI).asJava)

@@ -124,4 +136,37 @@
       addShutdownHook(() => engine.stop(), FLINK_ENGINE_SHUTDOWN_PRIORITY + 1)
     }
   }
+
+  private def discoverDependencies(
+      jars: Seq[URL],
+      libraries: Seq[URL]): List[URL] = {
+    try {
+      var dependencies: ListBuffer[URL] = ListBuffer()
+      // find jar files
+      jars.foreach { url =>
+        JarUtils.checkJarFile(url)
+        dependencies = dependencies += url
+      }
+      // find jar files in library directories
+      libraries.foreach { libUrl =>
+        val dir: File = new File(libUrl.toURI)
+        if (!dir.isDirectory) throw new SqlClientException("Directory expected: " + dir)
+        else if (!dir.canRead) throw new SqlClientException("Directory cannot be read: " + dir)
+        val files: Array[File] = dir.listFiles
+        if (files == null) throw new SqlClientException("Directory cannot be read: " + dir)
+        files.foreach { f =>
+          // only consider jars
+          if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
+            val url: URL = f.toURI.toURL
+            JarUtils.checkJarFile(url)
+            dependencies = dependencies += url
+          }
+        }
+      }
+      dependencies.toList
+    } catch {
+      case e: Exception =>
+        throw KyuubiSQLException(s"Could not load all required JAR files.", e)
+    }
+  }
 }
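The directory handling in discoverDependencies mirrors the Flink SQL client's dependency discovery. As a standalone sketch of just the scan step (plain JDK and Scala only; the validation and JarUtils.checkJarFile calls above are omitted here):

import java.io.File
import java.net.URL

object LibScanSketch {
  // Collect every *.jar directly under `dir`, as the library-directory branch does.
  def jarsInDir(dir: File): List[URL] =
    Option(dir.listFiles).getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.toLowerCase.endsWith(".jar"))
      .map(_.toURI.toURL)
      .toList

  def main(args: Array[String]): Unit =
    jarsInDir(new File(args.headOption.getOrElse("."))).foreach(println)
}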

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala

Lines changed: 20 additions & 1 deletion

@@ -17,6 +17,8 @@

 package org.apache.kyuubi.engine.flink

+import java.nio.file.Files
+
 import scala.collection.JavaConverters._

 import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI}
@@ -26,6 +28,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext

 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar

 trait WithFlinkSQLEngine extends KyuubiFunSuite {

@@ -38,6 +41,17 @@ trait WithFlinkSQLEngine extends KyuubiFunSuite {

   protected var connectionUrl: String = _

+  protected val GENERATED_UDF_CLASS: String = "LowerUDF"
+
+  protected val GENERATED_UDF_CODE: String =
+    s"""
+      public class $GENERATED_UDF_CLASS extends org.apache.flink.table.functions.ScalarFunction {
+        public String eval(String str) {
+          return str.toLowerCase();
+        }
+      }
+    """
+
   override def beforeAll(): Unit = {
     startMiniCluster()
     startFlinkEngine()
@@ -55,8 +69,13 @@ trait WithFlinkSQLEngine extends KyuubiFunSuite {
       System.setProperty(k, v)
       kyuubiConf.set(k, v)
     }
+    val udfJar = TestUserClassLoaderJar.createJarFile(
+      Files.createTempDirectory("test-jar").toFile,
+      "test-classloader-udf.jar",
+      GENERATED_UDF_CLASS,
+      GENERATED_UDF_CODE)
     val engineContext = new DefaultContext(
-      List.empty.asJava,
+      List(udfJar.toURI.toURL).asJava,
       flinkConfig,
       List[CustomCommandLine](new DefaultCLI).asJava)
     FlinkSQLEngine.startEngine(engineContext)
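Reading these hunks together (an inference from the diff, not a claim in the patch): the test harness now boots the engine with a freshly generated UDF jar in DefaultContext instead of an empty dependency list, so pipeline.jars is no longer empty and the function registered in the suite below can actually be classloaded.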

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 11 additions & 1 deletion

@@ -717,11 +717,21 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       // thus read all rows to find the desired one
       var success = false
       while (resultSet.next()) {
-        if (resultSet.getString(1) == "pipeline.jars" && resultSet.getString(2) == "") {
+        if (resultSet.getString(1) == "pipeline.jars" &&
+          !resultSet.getString(2).contains("my.jar")) {
           success = true
         }
       }
       assert(success)
     })
   }
+
+  test("execute statement - select udf") {
+    withJdbcStatement() { statement =>
+      statement.execute(s"create function $GENERATED_UDF_CLASS AS '$GENERATED_UDF_CLASS'")
+      val resultSet = statement.executeQuery(s"select $GENERATED_UDF_CLASS('A')")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "a")
+    }
+  }
 }
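Outside the test harness, the same round trip works from any HiveServer2-compatible JDBC client. A hedged sketch, assuming a placeholder endpoint and the Hive JDBC driver on the classpath (adjust host and port to the running engine):

import java.sql.DriverManager

object UdfJdbcSketch {
  def main(unused: Array[String]): Unit = {
    // placeholder connection URL
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    try {
      val stmt = conn.createStatement()
      stmt.execute("create function LowerUDF AS 'LowerUDF'")
      val rs = stmt.executeQuery("select LowerUDF('A')")
      while (rs.next()) println(rs.getString(1)) // prints: a
    } finally conn.close()
  }
}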
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/util/TestUserClassLoaderJar.scala

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.util
+
+import java.io.{File, FileOutputStream}
+import java.nio.file.Paths
+import java.util.Collections
+import java.util.jar.{JarEntry, JarOutputStream}
+import javax.tools.{DiagnosticCollector, JavaFileObject, ToolProvider}
+
+import scala.collection.JavaConverters._
+
+import org.apache.flink.util.FileUtils
+
+object TestUserClassLoaderJar {
+
+  /**
+   * Pack the generated UDF class into a JAR and return the path of the JAR.
+   */
+  def createJarFile(tmpDir: File, jarName: String, className: String, javaCode: String): File = {
+    // write class source code to file
+    val javaFile = Paths.get(tmpDir.toString, className + ".java").toFile
+    // noinspection ResultOfMethodCallIgnored
+    javaFile.createNewFile
+    FileUtils.writeFileUtf8(javaFile, javaCode)
+    // compile class source code
+    val diagnostics = new DiagnosticCollector[JavaFileObject]
+    val compiler = ToolProvider.getSystemJavaCompiler
+    val fileManager = compiler.getStandardFileManager(diagnostics, null, null)
+    val compilationUnit =
+      fileManager.getJavaFileObjectsFromFiles(Collections.singletonList(javaFile))
+    val task =
+      compiler.getTask(null, fileManager, diagnostics, List.empty.asJava, null, compilationUnit)
+    task.call
+    // pack class file to jar
+    val classFile = Paths.get(tmpDir.toString, className + ".class").toFile
+    val jarFile = Paths.get(tmpDir.toString, jarName).toFile
+    val jos = new JarOutputStream(new FileOutputStream(jarFile))
+    val jarEntry = new JarEntry(className + ".class")
+    jos.putNextEntry(jarEntry)
+    val classBytes = FileUtils.readAllBytes(classFile.toPath)
+    jos.write(classBytes)
+    jos.closeEntry()
+    jos.close()
+    jarFile
+  }
+}
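One caveat worth noting (an observation about javax.tools, not something the patch addresses): ToolProvider.getSystemJavaCompiler returns null when the JVM is a bare JRE without the compiler, so getStandardFileManager above would fail with a NullPointerException. A defensive caller could guard first; a fragment sketch:

// guard fragment; not part of the patch
val compiler = javax.tools.ToolProvider.getSystemJavaCompiler
require(compiler != null, "a JDK (not a bare JRE) is required to compile the generated UDF")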
