
[SPARK-33084][CORE][SQL] Add jar support ivy path #29966

Closed

AngersZhuuuu wants to merge 57 commits into master from support-add-jar-ivy
Commits (57)
afaf7bd
[SPARK-33084][CORE][SQL] Add jar support ivy path
AngersZhuuuu Oct 7, 2020
51daf9a
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Oct 7, 2020
3579de0
Update SparkContextSuite.scala
AngersZhuuuu Oct 7, 2020
d6e8caf
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Oct 7, 2020
169e1f8
Update Utils.scala
AngersZhuuuu Oct 8, 2020
0e589ec
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Nov 21, 2020
b3e3211
resolve conflict
AngersZhuuuu Nov 23, 2020
9161340
Update SessionState.scala
AngersZhuuuu Nov 23, 2020
0e3c1ec
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Nov 24, 2020
300ca56
follow comment
AngersZhuuuu Nov 24, 2020
63e877b
https://github.com/apache/spark/pull/29966#discussion_r529242208
AngersZhuuuu Nov 24, 2020
733e62c
follow comment
AngersZhuuuu Nov 24, 2020
b60ba1e
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Nov 24, 2020
883b9d3
fix uri re-use
AngersZhuuuu Nov 28, 2020
208afc2
follow comment
AngersZhuuuu Nov 28, 2020
ba9ea29
add warn message when multiple transitive
AngersZhuuuu Nov 28, 2020
10b3737
move DependencyUtils
AngersZhuuuu Nov 28, 2020
7f878c2
add ut
AngersZhuuuu Nov 28, 2020
d2c1950
Update SessionState.scala
AngersZhuuuu Nov 28, 2020
2200076
Update SessionState.scala
AngersZhuuuu Nov 28, 2020
5a9cc30
Update DependencyUtils.scala
AngersZhuuuu Nov 29, 2020
875d8a7
Add end to end test
AngersZhuuuu Nov 29, 2020
e921245
Update SQLQuerySuite.scala
AngersZhuuuu Nov 29, 2020
614a865
follow comment
AngersZhuuuu Nov 30, 2020
8c5cb7c
Update SparkContext.scala
AngersZhuuuu Nov 30, 2020
f460974
fix local path with comma
AngersZhuuuu Nov 30, 2020
1f7dc01
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Dec 1, 2020
050c410
follow comment
AngersZhuuuu Dec 1, 2020
ff611a6
Update SessionState.scala
AngersZhuuuu Dec 1, 2020
03aca3b
split UT and use simple dependency ivy path
AngersZhuuuu Dec 1, 2020
653b919
Update DependencyUtils.scala
AngersZhuuuu Dec 1, 2020
6e48275
Update SparkContext.scala
AngersZhuuuu Dec 1, 2020
bdc5035
follow comment
AngersZhuuuu Dec 2, 2020
9c22882
follow comment
AngersZhuuuu Dec 2, 2020
9c88f8d
follow comment
AngersZhuuuu Dec 2, 2020
8220e5a
Update SparkContextSuite.scala
AngersZhuuuu Dec 2, 2020
49ac62c
follow comment
AngersZhuuuu Dec 2, 2020
b69a62e
Update DependencyUtils.scala
AngersZhuuuu Dec 2, 2020
273a5ac
Follow comment
AngersZhuuuu Dec 3, 2020
ebe1c9c
Update DependencyUtils.scala
AngersZhuuuu Dec 4, 2020
6034fb2
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Dec 5, 2020
e22e398
Update SparkContext.scala
AngersZhuuuu Dec 7, 2020
afea73f
Update SparkContext.scala
AngersZhuuuu Dec 7, 2020
13000f2
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Dec 14, 2020
bce3d40
Update SparkContext.scala
AngersZhuuuu Dec 14, 2020
d53f302
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Dec 14, 2020
57c351d
Update HiveQuerySuite.scala
AngersZhuuuu Dec 15, 2020
8c53b83
follow comment
AngersZhuuuu Dec 22, 2020
4048c5b
https://github.com/apache/spark/pull/29966#discussion_r547040115
AngersZhuuuu Dec 22, 2020
aa53482
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Dec 22, 2020
2ffb431
Update SQLQuerySuite.scala
AngersZhuuuu Dec 22, 2020
8c18cdf
Update SparkContext.scala
AngersZhuuuu Dec 23, 2020
6bd41cd
Update SparkSubmit.scala
AngersZhuuuu Dec 23, 2020
fbc236c
follow comment
AngersZhuuuu Dec 23, 2020
90491d5
Update DependencyUtils.scala
AngersZhuuuu Dec 23, 2020
75ff3ce
Update SparkContextSuite.scala
AngersZhuuuu Dec 23, 2020
4c44dae
follow comment remove default value
AngersZhuuuu Dec 24, 2020
45 changes: 28 additions & 17 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
def addLocalJarFile(file: File): String = {
def addLocalJarFile(file: File): Seq[String] = {
try {
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
env.rpcEnv.fileServer.addJar(file)
Seq(env.rpcEnv.fileServer.addJar(file))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
Nil
}
}

def checkRemoteJarFile(path: String): String = {
def checkRemoteJarFile(path: String): Seq[String] = {
val hadoopPath = new Path(path)
val scheme = hadoopPath.toUri.getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
@@ -1959,47 +1959,58 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${path} is not allowed for addJar")
}
path
Seq(path)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
Nil
}
} else {
path
Seq(path)
}
}

if (path == null || path.isEmpty) {
logWarning("null or empty path specified as parameter to addJar")
} else {
val key = if (path.contains("\\") && Utils.isWindows) {
val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) {
// For local paths with backslashes on Windows, URI throws an exception
addLocalJarFile(new File(path))
(addLocalJarFile(new File(path)), "local")
} else {
val uri = new Path(path).toUri
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
uri.getScheme match {
val uriScheme = uri.getScheme
val jarPaths = uriScheme match {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addLocalJarFile(new File(uri.getPath))
// A JAR file which exists only on the driver node
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
case "local" => Seq("file:" + uri.getPath)
case "ivy" =>
Member: @AngersZhuuuu, out of curiosity, is the Ivy URI the standard form documented somewhere? Or is it something specific to Spark that you came up with?

Contributor (author): It comes from Hive (https://issues.apache.org/jira/browse/HIVE-9664): since Hive downloads the jar using Ivy, it uses `ivy` as the scheme. I think this is useful for the many companies that have standardized package management, so I implemented it in Spark.
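
For illustration, a minimal usage sketch of the resulting ivy:// scheme (the coordinates and query parameters below are hypothetical examples, not taken from this PR):

// Add a single artifact; `transitive` defaults to false.
sc.addJar("ivy://org.apache.logging.log4j:log4j-core:2.13.3")

// Add an artifact plus its transitive dependencies, excluding one group:module pair.
sc.addJar("ivy://org.apache.logging.log4j:log4j-core:2.13.3" +
  "?transitive=true&exclude=org.apache.logging.log4j:log4j-api")

// The same URI form is accepted by SQL's ADD JAR, whose docs this PR also updates.
spark.sql("ADD JAR ivy://org.apache.logging.log4j:log4j-core:2.13.3?transitive=true")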

// Since `new Path(path).toUri` loses the query information,
// we use `URI.create(path)` here.
DependencyUtils.resolveMavenDependencies(URI.create(path))
Member: What if two added jars have the same dependency with different versions? e.g.,

sc.addJar("ivy://lib1:1.0?transitive=true") // --> it depends on `libX v1.0`
sc.addJar("ivy://lib2:1.0?transitive=true") // --> it depends on `libX v2.0`

Contributor (author): Done

.flatMap(jar => addLocalJarFile(new File(jar)))
case _ => checkRemoteJarFile(path)
}
(jarPaths, uriScheme)
}
if (key != null) {
if (keys.nonEmpty) {
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
if (added.nonEmpty) {
val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
postEnvironmentUpdate()
} else {
logWarning(s"The jar $path has been added already. Overwriting of added jars " +
"is not supported in the current version.")
}
if (existed.nonEmpty) {
Member: Could you add tests to check if this warning message is shown only once by using LogAppender?

Contributor (author): Sure

val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
" Overwriting of added jar is not supported in the current version.")
}
}
}
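A minimal sketch of the duplicate-add test requested in the review thread above, assuming a suite that extends SparkFunSuite with LocalSparkContext and Spark's LogAppender / withLogAppender test helpers (the coordinate is a hypothetical example):

test("SPARK-33084: adding the same Ivy URI twice should warn once, not re-add") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
  val logAppender = new LogAppender("duplicate Ivy URI add")
  // Capture INFO-level events, since the duplicate branch above logs at INFO.
  withLogAppender(logAppender, level = Some(Level.INFO)) {
    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=false")
    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=false")
  }
  // Only the second addJar should take the `existed` branch.
  assert(logAppender.loggingEvents.count(
    _.getRenderedMessage.contains("has been added already")) === 1)
}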
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -304,8 +304,8 @@ private[spark] class SparkSubmit extends Logging {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
args.ivySettingsPath)
packagesTransitive = true, args.packagesExclusions, args.packages,
args.repositories, args.ivyRepoPath, args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
// In K8s client mode, when in the driver, add resolved jars early as we might need
@@ -1360,13 +1360,15 @@ private[spark] object SparkSubmitUtils {
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param ivySettings An IvySettings containing resolvers to use
* @param transitive Whether to resolve transitive dependencies; default is true
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
ivySettings: IvySettings,
transitive: Boolean,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
Expand Down Expand Up @@ -1396,7 +1398,7 @@ private[spark] object SparkSubmitUtils {
val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
resolveOptions.setTransitive(transitive)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
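The new `transitive` flag can then be driven per call. A short sketch, assuming an IvySettings already built via SparkSubmitUtils.buildIvySettings and a hypothetical coordinate:

// Resolve only the named artifact.
val jarsOnly = SparkSubmitUtils.resolveMavenCoordinates(
  "org.apache.logging.log4j:log4j-core:2.13.3", ivySettings, transitive = false)

// Resolve the artifact and its transitive dependencies, minus one exclusion.
val withDeps = SparkSubmitUtils.resolveMavenCoordinates(
  "org.apache.logging.log4j:log4j-core:2.13.3", ivySettings,
  transitive = true, exclusions = Seq("org.apache.logging.log4j:log4j-api"))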
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.commons.lang3.StringUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util._
@@ -79,17 +79,11 @@ object DriverWrapper extends Logging {
val secMgr = new SecurityManager(sparkConf)
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)

val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
Seq(
"spark.jars.excludes",
"spark.jars.packages",
"spark.jars.repositories",
"spark.jars.ivy",
"spark.jars.ivySettings"
).map(sys.props.get(_).orNull)
val ivyProperties = DependencyUtils.getIvyProperties()

val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
packages, repositories, ivyRepoPath, Option(ivySettingsPath))
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true,
ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories,
ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
val jars = {
val jarsProp = sys.props.get(config.JARS.key).orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
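The refactor above replaces DriverWrapper's hand-rolled sys.props lookups with DependencyUtils.getIvyProperties (defined in the next file). A small usage sketch:

// Reads spark.jars.excludes / packages / repositories / ivy / ivySettings from
// system properties into one value object; absent keys come back as null.
val ivyProperties = DependencyUtils.getIvyProperties()
println(ivyProperties.packages)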
core/src/main/scala/org/apache/spark/util/DependencyUtils.scala (moved from org/apache/spark/deploy/)
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy
package org.apache.spark.util

import java.io.File
import java.net.URI
@@ -25,12 +25,140 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.util.{MutableURLClassLoader, Utils}

private[deploy] object DependencyUtils extends Logging {
case class IvyProperties(
packagesExclusions: String,
packages: String,
repositories: String,
ivyRepoPath: String,
ivySettingsPath: String)

private[spark] object DependencyUtils extends Logging {

def getIvyProperties(): IvyProperties = {
val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
"spark.jars.excludes",
"spark.jars.packages",
"spark.jars.repositories",
"spark.jars.ivy",
"spark.jars.ivySettings"
).map(sys.props.get(_).orNull)
IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
}

private def isInvalidQueryString(tokens: Array[String]): Boolean = {
tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
}

/**
* Parse the `transitive` and `exclude` parameter values from a URI query string.
* Other, invalid parameters will be ignored.
*
* @param uri Ivy URI to be downloaded.
* @return Tuple of the `transitive` and `exclude` parameter values.
*
* 1. transitive: whether to download the dependency jars of the Ivy URI; the default value
* is false and the parameter value is case-sensitive. Invalid values are treated as false.
* Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
* Output: true
*
* 2. exclude: comma-separated exclusions to apply when resolving transitive dependencies,
* consisting of `group:module` pairs separated by commas.
* Example: Input: exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
* Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
*/
private def parseQueryParams(uri: URI): (Boolean, String) = {
val uriQuery = uri.getQuery
if (uriQuery == null) {
(false, "")
} else {
val mapTokens = uriQuery.split("&").map(_.split("="))
if (mapTokens.exists(isInvalidQueryString)) {
throw new IllegalArgumentException(
s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
}
val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)

// Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is false
val transitiveParams = groupedParams.get("transitive")
if (transitiveParams.map(_.size).getOrElse(0) > 1) {
logWarning("It's best to specify `transitive` parameter in ivy URI query only once." +
" If there are multiple `transitive` parameter, we will select the last one")
Member: Does Hive have the same behaviour as this?

Contributor (author): No, but we can have this warning.

Member: If so, what's the Hive behaviour? Does it throw an exception instead of selecting the last one?

Contributor (author): It selects the last one:
https://github.com/apache/hive/blob/aed7c86cdd59f0b2a4979633fbd191d451f2fd75/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L121-L127

Member: Ah, ok.

}
val transitive =
transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)

// Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
// in an Ivy URI. When downloading the Ivy URI jar, Spark won't download the transitive
// jars listed in the exclusion list.
val exclusionList = groupedParams.get("exclude").map { params =>
params.map(_._2).flatMap { excludeString =>
val excludes = excludeString.split(",")
if (excludes.map(_.split(":")).exists(isInvalidQueryString)) {
throw new IllegalArgumentException(
s"Invalid exclude string in Ivy URI ${uri.toString}:" +
" expected 'org:module,org:module,..', found " + excludeString)
}
excludes
}.mkString(",")
}.getOrElse("")

val validParams = Set("transitive", "exclude")
val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
if (invalidParams.nonEmpty) {
logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
s"in Ivy URI query `$uriQuery`.")
}

(transitive, exclusionList)
Member: What if an invalid param is given in Hive, e.g., invalidParam=xxxx? Is it just ignored? Could you add tests?

Contributor (author): Done

Contributor: Seems like we should at least warn on an invalid param?

}
}
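
To make the parsing rules above concrete, here is a standalone restatement (not Spark's code) of the documented behaviour: the repeated `transitive` key takes its last value, and unknown keys such as `foo` are merely warned about:

val query = "transitive=false&transitive=true&exclude=org.slf4j:slf4j-api&foo=bar"
val grouped = query.split("&").map(_.split("=")).map(kv => (kv(0), kv(1))).groupBy(_._1)
val transitive = grouped.get("transitive").flatMap(_.lastOption).exists(_._2 == "true") // true
val exclusions = grouped.get("exclude").map(_.map(_._2).mkString(",")).getOrElse("") // "org.slf4j:slf4j-api"
assert(transitive && exclusions == "org.slf4j:slf4j-api")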

/**
* Download Ivy URI's dependency jars.
*
* @param uri Ivy URI to be downloaded. The URI format should be:
* `ivy://group:module:version[?query]`
* The Ivy URI query part format should be:
* `parameter=value&parameter=value...`
* Note that currently the Ivy URI query part supports two parameters:
* 1. transitive: whether to download the jars the Ivy URI depends on.
* `transitive=false` or `transitive=true`; if not set, the default value is false.
* 2. exclude: exclusion list applied when downloading the Ivy URI jar and its dependency jars.
* The `exclude` parameter content is a ','-separated string of `group:module` pairs:
* `exclude=group:module,group:module...`
* @return List of the downloaded jar paths.
*/
def resolveMavenDependencies(uri: URI): Seq[String] = {
val ivyProperties = DependencyUtils.getIvyProperties()
val authority = uri.getAuthority
if (authority == null) {
throw new IllegalArgumentException(
s"Invalid Ivy URI authority in uri ${uri.toString}:" +
" Expected 'org:module:version', found null.")
}
if (authority.split(":").length != 3) {
throw new IllegalArgumentException(
s"Invalid Ivy URI authority in uri ${uri.toString}:" +
s" Expected 'org:module:version', found $authority.")
}

val (transitive, exclusionList) = parseQueryParams(uri)

resolveMavenDependencies(
transitive,
exclusionList,
authority,
ivyProperties.repositories,
ivyProperties.ivyRepoPath,
Option(ivyProperties.ivySettingsPath)
).split(",")
}
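
Putting the pieces together, the Ivy URI entry point used by SparkContext.addJar can be exercised directly. A sketch with a hypothetical coordinate (it resolves through Ivy, so it needs network access or a populated local cache):

import java.net.URI
import org.apache.spark.util.DependencyUtils

// Parses the authority as org:module:version, reads `transitive`/`exclude` from the
// query string, resolves via Ivy, and returns the local paths of the downloaded jars.
val jars: Seq[String] = DependencyUtils.resolveMavenDependencies(
  URI.create("ivy://org.apache.hive:hive-storage-api:2.7.0" +
    "?transitive=true&exclude=commons-lang:commons-lang"))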

def resolveMavenDependencies(
packagesTransitive: Boolean,
packagesExclusions: String,
packages: String,
repositories: String,
@@ -51,7 +179,8 @@ private[deploy] object DependencyUtils extends Logging {
SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
}

SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
transitive = packagesTransitive, exclusions = exclusions)
}

def resolveAndDownloadJars(