
[WIP][SPARK-32899][CORE] Support submit application with user-defined cluster manager #29770

Closed
wants to merge 1 commit

Conversation

ConeyLiu
Contributor

What changes were proposed in this pull request?

Add the support to submit applications with user-defined cluster manager.

Why are the changes needed?

Users can already define a custom cluster manager via the ExternalClusterManager trait. However, an application using such a cluster manager cannot be submitted with SparkSubmit, nor can a user-defined master be set with pyspark. The reason is that SparkSubmit checks whether the master is one of the natively supported ones, while custom cluster managers are only checked in SparkContext. This patch fixes the problem.
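
For context, here is a simplified sketch (not the exact Spark code) of how SparkContext discovers a user-defined cluster manager today; SparkSubmit performs no equivalent lookup, which is the gap this patch addresses. Note that ExternalClusterManager is a private[spark] trait, so a real implementation must live under the org.apache.spark package; ClusterManagerLookup and findExternalClusterManager are made-up names for illustration.

import java.util.ServiceLoader

import org.apache.spark.scheduler.ExternalClusterManager

object ClusterManagerLookup {
  // Implementations are registered via META-INF/services and each one is
  // asked whether it can handle the given master URL.
  def findExternalClusterManager(master: String): Option[ExternalClusterManager] = {
    val loader = Thread.currentThread().getContextClassLoader
    val it = ServiceLoader.load(classOf[ExternalClusterManager], loader).iterator()
    while (it.hasNext) {
      val cm = it.next()
      if (cm.canCreate(master)) {
        return Some(cm)
      }
    }
    None
  }
}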

Does this PR introduce any user-facing change?

No

How was this patch tested?

New UT.

@AmplabJenkins

Can one of the admins verify this patch?

@KevinSmile
Contributor

nit:
Maybe there is some documentation to update, e.g. add a user-defined option here in SparkSubmitArguments:

|Options:
|  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
|                              k8s://https://host:port, or local (Default: local[*]).
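
For illustration, the updated usage text might read something like this (hypothetical wording, not part of this patch):

|Options:
|  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
|                              k8s://https://host:port, any master URL handled
|                              by a registered ExternalClusterManager, or local
|                              (Default: local[*]).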

@KevinSmile
Contributor

KevinSmile commented Sep 16, 2020

Also, I can see that SparkSubmit.scala is full of if...else... branches based on:

(args.deployMode, deployMode) match {
  case (null, CLIENT) => args.deployMode = "client"
  case (null, CLUSTER) => args.deployMode = "cluster"
  case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
  sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)

So can prepareSubmitEnvironment still build its 4-tuple return value correctly if a new user-defined option is added?

private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = args.toSparkConf()
  var childMainClass = ""

e.g. childArgs is built differently in different modes:

if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}

I suspect it would be hard for a user-defined mode to produce the correct 4-tuple return value from prepareSubmitEnvironment.

@ConeyLiu
Contributor Author

@KevinSmile, thanks for the advice. Which case do you think cannot be handled correctly?

@ConeyLiu
Contributor Author

Hi @cloud-fan, could you help review this? Thanks a lot.

@KevinSmile
Contributor

KevinSmile commented Sep 16, 2020

I've only had a quick glance, so I'm not very sure about this...

Let's say we have a new user-defined cluster manager called my-Yarn, which behaves exactly like YARN, so we just copy the yarn-scheduler code to implement the new one.

But what should we do with the following if (isYarnCluster) snippet in SparkSubmit.scala? Copy-paste it and change it to if (isMyYarnCluster) to get the correct childArgs? And where would we paste it?

if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
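
For illustration only (MY_YARN and MY_YARN_CLUSTER_SUBMIT_CLASS are made-up names), with the current design such a manager would need its own hard-coded branch inside SparkSubmit itself:

// Hypothetical: a custom "my-Yarn" manager would need its own constant and
// branch in SparkSubmit, mirroring the isYarnCluster snippet above.
val isMyYarnCluster = clusterManager == MY_YARN && deployMode == CLUSTER
if (isMyYarnCluster) {
  childMainClass = MY_YARN_CLUSTER_SUBMIT_CLASS
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}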

@ConeyLiu
Contributor Author

ConeyLiu commented Sep 16, 2020

But what should we do with the following if (isYarnCluster) snippet in SparkSubmit.scala? Copy-paste it and change it to if (isMyYarnCluster)?

The childArgs are passed to start the cluster-mode driver. If users want to use a user-defined cluster manager in cluster mode, they should handle that logic in the Main class, right? The current ExternalClusterManager has no ability to handle it.

@KevinSmile
Contributor

KevinSmile commented Sep 16, 2020

But the Main class's childArgs is retrieved from SparkSubmit (the 4-tuple return value of prepareSubmitEnvironment)?

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  app.start(childArgs.toArray, sparkConf)

So it's another topic? Which means that the current design of SparkSubmit.scala and ExternalClusterManager is not so elegant: a user who wants to use a user-defined cluster manager must also modify SparkSubmit, in addition to writing the user-defined scheduler part?

@tgravescs
Contributor

This is an interesting issue. One of the problems is how spark-submit can properly know which arguments are supported by that cluster manager, and similarly which deployment modes are supported. There is a lot of cluster-manager-specific logic in here, and this may work for you for most things, but I would be surprised if it worked for everything.

Did you test this with both spark-submit and the interactive shells (spark-shell, pyspark, etc.)? I'm not sure whether your cluster manager supports full cluster mode or only running the driver locally.

I think if we officially want to support this we need something else; some parts would need to be pluggable. I think that would be a whole lot more change, though.

@ConeyLiu
Contributor Author

ConeyLiu commented Sep 16, 2020

But the Main class's childArgs is retrieved from SparkSubmit (the 4-tuple return value of prepareSubmitEnvironment)?

Looking at the code may make this clearer. The main method receives arguments as well, so in client mode the parsed args are passed straight into the main method and we do not need to append any special arguments. The extra arguments only need to be appended in cluster mode.
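
Roughly, simplified from prepareSubmitEnvironment (a sketch, not the verbatim code):

// In client mode the driver runs locally, so the user's main class and parsed
// arguments pass through unchanged; only cluster mode needs a wrapper submit
// class and the extra "--arg"-style arguments shown earlier.
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}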

So it's another topic? Which means that the current design of SparkSubmit.scala and ExternalClusterManager is not so elegant: a user who wants to use a user-defined cluster manager must also modify SparkSubmit, in addition to writing the user-defined scheduler part?

Yes

@ConeyLiu
Contributor Author

ConeyLiu commented Sep 16, 2020

Hi @tgravescs, thanks for the advice. I have not tested all arguments; however, it works for us now. I agree with you that we need to redesign ExternalClusterManager to support the full set of arguments. Ideally, the special logic for each cluster manager should be handled inside that cluster manager, not in SparkSubmit. The current change is meant to be the smallest possible one. cc @carsonwang
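
A sketch of what such a redesign might look like (a hypothetical API for discussion, not part of this patch): each cluster manager contributes its own pieces of prepareSubmitEnvironment's return value, so SparkSubmit can delegate instead of branching on isYarnCluster / isMesosCluster / isKubernetesCluster.

// Hypothetical submit-side counterpart to ExternalClusterManager; SparkSubmit
// would discover implementations (e.g. via ServiceLoader) and delegate to the
// first one whose canHandle returns true.
trait SubmitEnvironmentContributor {
  // Whether this contributor handles the given master URL and deploy mode.
  def canHandle(master: String, deployMode: String): Boolean

  // The main class SparkSubmit should launch (a driver wrapper in cluster mode).
  def childMainClass(args: SparkSubmitArguments): String

  // Extra arguments to pass to that main class.
  def childArgs(args: SparkSubmitArguments): Seq[String]
}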

@KevinSmile
Contributor

Yes, the lack of pluggability is exactly the point.

@tgravescs
Contributor

I'm possibly OK with this one, but I don't really want to keep hacking on it. I can see this going in and then someone filing another JIRA saying it doesn't work for X or Y, and then we keep hacking at it. I would rather see it done properly if we are going to say it's supported.

@ConeyLiu
Contributor Author

@tgravescs, thanks for the suggestion. I'll mark this as WIP and improve it.

@ConeyLiu ConeyLiu changed the title [SPARK-32899][CORE] Support submit application with user-defined cluster manager [WIP][SPARK-32899][CORE] Support submit application with user-defined cluster manager Sep 17, 2020
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
