[SPARK-22839] [K8s] Refactor to unify driver and executor pod builder APIs #20910
Conversation
It's now subsumed by the BasicDriverFeatureStep.
For all reviewers - this change is very large. Github's interpretation of the diff also doesn't present the changes in the most easily consumed manner. To account for this, the pull request is best reviewed and understood commit by commit. Each commit roughly translates one component from the old architecture to the new architecture. The changes are incrementally built as follows:
We could alternatively merge this change incrementally across multiple pull requests, but each intermediate pull request would likely leave the K8s functionality broken. To ensure that master is never in a broken, unusable state for K8s, we unfortunately need to merge the entire change at once.
Requesting review from @vanzin, @foxish, @ifilonenko, @liyinan926, @Eje. Any other feedback is welcome!
Kubernetes integration test starting
Test build #88607 has finished for PR 20910 at commit
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs)
val orchestrator = new KubernetesDriverBuilder |
Should be builder
driverContainer = container
)
}
private[k8s] object KubernetesSpec { |
Probably should just be private[spark]
Kubernetes integration test status failure
Did a brief first pass. Overall the new APIs and abstraction look good to me. One thing I would suggest in general is to try to shorten the names of arguments/parameters.
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])
def driverCustomEnvs(): Seq[(String, String)] = |
This is driver specific and probably should not be here. What about making custom envs an argument of the class, similar to labels and annotations? Then createDriverConf below gets the driver custom envs and passes them in. This also works for executor environment variables specified by spark.executorEnv.
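The prefix-based parsing suggested here (labels, annotations, and spark.executorEnv.* all follow the same pattern) can be sketched as follows. PrefixedConf is a hypothetical stand-in for illustration, not the actual KubernetesUtils helper:

```scala
// Hypothetical stand-in for KubernetesUtils.parsePrefixedKeyValuePairs:
// collect all conf entries under a prefix (e.g. "spark.executorEnv.")
// and strip the prefix, yielding the map to pass into the conf object.
object PrefixedConf {
  def parsePrefixedKeyValuePairs(
      conf: Map[String, String],
      prefix: String): Map[String, String] = {
    conf.collect {
      case (key, value) if key.startsWith(prefix) =>
        key.stripPrefix(prefix) -> value
    }
  }
}
```

With this shape, the caller that builds the driver conf can parse the custom envs once and pass the resulting map in as a constructor argument, the same way labels and annotations are handled.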
def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)

def getOption(key: String): Option[String] = sparkConf.getOption(key)
Extra newline.
Sorry, do you mean we should remove this newline or that one should be added here?
Oh, I meant removing the extra newline.
Ok, will address in the next patch after others review.
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
new KubernetesConf( |
Can you add a new line before new KubernetesConf?
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
}
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( |
Can you add a new line before this line?
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorSecrets =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
new KubernetesConf( |
Ditto, it improves readability with new lines separating the code a bit.
private[k8s] case class KubernetesSpec(
pod: SparkPod,
additionalDriverKubernetesResources: Seq[HasMetadata],
podJavaSystemProperties: Map[String, String]) |
Can we shorten the name to just systemProperties? One of the most frequent types of comments I got while working on the upstreaming was to use short names.
private[spark] object SparkPod {
def initialPod(): SparkPod = {
SparkPod(
new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), |
Do you need .withNewMetadata().endMetadata().withNewSpec().endSpec() here?
Sort of. It allows everything that consumes one of these to use .editMetadata() or editOrNewMetadata() when creating features. If you don't initialize the metadata and spec, a downstream caller that tries to invoke editMetadata() will throw an NPE.
Got it.
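The NPE hazard discussed above can be illustrated with a simplified stand-in for the fabric8 builder types. Metadata, Pod, and InitialPod here are hypothetical illustration classes, not the real client API:

```scala
// Simplified stand-in for the fabric8 builder pattern: if the initial pod's
// metadata is left null, a downstream feature step that edits the metadata
// hits a NullPointerException. Pre-initializing empty sections (the analogue
// of withNewMetadata().endMetadata().withNewSpec().endSpec()) avoids that.
final case class Metadata(labels: Map[String, String] = Map.empty)

final case class Pod(metadata: Metadata) {
  // Mirrors editMetadata(): applies an edit to the existing metadata.
  // If metadata is null, the edit function dereferences null and throws.
  def editMetadata(edit: Metadata => Metadata): Pod =
    copy(metadata = edit(metadata))
}

object InitialPod {
  // Empty but non-null metadata, like the pre-initialized builder sections.
  def initialPod(): Pod = Pod(Metadata())
}
```

A feature step can then safely call editMetadata on the initial pod, while a pod constructed with null metadata fails at edit time rather than build time.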
import org.apache.spark.launcher.SparkLauncher

private[spark] class BasicDriverFeatureStep(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) |
Should we rename this to driverConf?
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures
var spec = KubernetesSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) |
Can you add a new line before this line?
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.withNamespace(namespace) |
Why was this removed?
It's not necessary to set namespaces on these objects because the Kubernetes client itself is namespaced.
This reverts commit 4c944c4.
Test build #88608 has finished for PR 20910 at commit
Kubernetes integration test starting
Test build #88609 has finished for PR 20910 at commit
Kubernetes integration test status failure
executorId: String, driverPod: Pod)
extends KubernetesRoleSpecificConf
private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( |
Maybe this should be a case class? It seems like a struct-like object, which inclines me to think a case class would be more idiomatic here.
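A rough sketch of the case-class shape being suggested. The field names loosely follow the PR but are simplified for illustration, not the actual Spark classes:

```scala
// Modeling the struct-like config as a case class gives structural
// equality, copy(), and pattern matching for free, which is what makes
// it more idiomatic for a pure data holder like this.
sealed trait KubernetesRoleSpecificConf
final case class KubernetesDriverSpecificConf(mainClass: String)
  extends KubernetesRoleSpecificConf
final case class KubernetesExecutorSpecificConf(executorId: String)
  extends KubernetesRoleSpecificConf

final case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    roleSpecificConf: T,
    roleLabels: Map[String, String],
    roleAnnotations: Map[String, String])
```

Tests in particular benefit: expected and actual conf objects compare by value rather than by reference.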
retest this please
Kubernetes integration test starting
Test build #88912 has finished for PR 20910 at commit
} else {
executorCores.toString
}
private val executorLimitCores = kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) |
Looks like this can also be simplified as kubernetesConf.get.
LGTM with only one comment.
Test build #88913 has finished for PR 20910 at commit
Kubernetes integration test status success
Kubernetes integration test starting
LGTM. Thanks!
Kubernetes integration test status success
Kubernetes integration test starting
Test build #88915 has finished for PR 20910 at commit
Kubernetes integration test status success
@mccheah just one comment.
* Represents a step in configuring the Spark driver pod.
*/
private[spark] trait DriverConfigurationStep {
private[spark] case class KubernetesSpec( |
This class is named as though it applies to both driver and executor construction. Maybe KubernetesDriverSpec? It's also a bit unclear to me what purpose this abstraction serves as opposed to the way KubernetesExecutorBuilder goes about building the pod.
If you check KubernetesClientApplication, it needs the extra fields here (the additional driver resources and the driver system properties) to construct the pod. In other words, the driver builder has to return a structure with more than just the pod.
I think we could move some of the logic from KubernetesClientApplication into KubernetesDriverBuilder. Do you have any suggestions on whether we should move the abstraction boundaries around a bit?
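The point above about the builder returning more than just a pod can be sketched like this. All types here are simplified stand-ins for the actual Spark and fabric8 classes, with the names chosen only for illustration:

```scala
// The submission client needs three things from the driver builder:
// the pod itself, the extra Kubernetes resources created alongside it,
// and the system properties to inject into the driver. Bundling them in
// one result type is what the spec abstraction buys over returning a pod.
final case class SparkPod(name: String)

final case class KubernetesDriverSpec(
    pod: SparkPod,
    driverKubernetesResources: Seq[String], // stand-in for Seq[HasMetadata]
    systemProperties: Map[String, String])

object DriverBuilderSketch {
  def buildFromFeatures(appName: String): KubernetesDriverSpec =
    KubernetesDriverSpec(
      pod = SparkPod(s"$appName-driver"),
      driverKubernetesResources = Seq(s"$appName-configmap"),
      systemProperties = Map("spark.app.name" -> appName))
}
```

The executor side has no submission client, so its builder can return only a pod, which is why the two builders end up with different result shapes.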
Test build #89202 has finished for PR 20910 at commit
@mccheah I think it's fine as is now. We can take care of moving abstractions between the submission client and the driver in a future PR if necessary. Just the Scala style issue needs taking care of, and then this LGTM.
Kubernetes integration test starting
Test build #89300 has finished for PR 20910 at commit
Kubernetes integration test status success
Merging to master
…pod builder APIs
What changes were proposed in this pull request?
Breaks down the construction of driver pods and executor pods using a common abstraction for both spark-submit creating the driver and KubernetesClusterSchedulerBackend creating the executors. This encourages more code reuse and is more legible than the older approach.
The high-level design is discussed in more detail on the JIRA ticket. This pull request is the implementation of that design with some minor changes in the implementation details.
No user-facing behavior should break as a result of this change.
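The common abstraction described above can be sketched as follows. This is a simplified illustration of the feature-step pattern, not the actual Spark API; the type and method names are assumptions for the sake of the example:

```scala
// Sketch of the unified pod-builder design: each feature step is a small
// transformation on a pod, and both the driver and executor builders fold
// their sequence of steps over an initial pod. Driver- and executor-specific
// behavior lives in which steps each builder chooses, not in separate APIs.
final case class SparkPod(labels: Map[String, String])

trait KubernetesFeatureConfigStep {
  def configurePod(pod: SparkPod): SparkPod
}

final class LabelStep(key: String, value: String) extends KubernetesFeatureConfigStep {
  def configurePod(pod: SparkPod): SparkPod =
    pod.copy(labels = pod.labels + (key -> value))
}

object PodBuilderSketch {
  // Fold the feature steps over the initial pod, each step seeing the
  // result of the previous one.
  def build(initial: SparkPod, steps: Seq[KubernetesFeatureConfigStep]): SparkPod =
    steps.foldLeft(initial)((pod, step) => step.configurePod(pod))
}
```

Because each step is independent and pure, a step can be unit tested in isolation by asserting on the pod it returns, which is what enabled migrating the old submission-steps tests one component at a time.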
How was this patch tested?
Migrated all unit tests from the old submission-steps architecture to the new architecture. Integration tests should not have to change, and they pass, given that this shouldn't change any outward behavior.