[SPARK-24248][K8S] Use level triggering and state reconciliation in scheduling and lifecycle #21366
Conversation
…for scheduling

Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables. We can do better here by:

1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to implement an event queue which is populated by two sources: a watch connection and a periodic poller. Controllers typically use both mechanisms for redundancy; the watch connection may drop, so the periodic polling serves as a backup. Both sources write pod updates to a single event queue, and a processor periodically processes the current state of pods as reported by the two sources.

2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the event queue, which has predictable read/write patterns and is more or less just a local up-to-date cache of the cluster's status.
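The controller pattern described above - a watch connection and a periodic poller both feeding one queue that a single processor drains - can be sketched in a few lines. The names below (`PodUpdate`, `PodEventQueue`) are illustrative stand-ins, not Spark's actual classes:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified model of the pattern: both the watch connection and
// the periodic poller write into one shared queue, and a single processor
// drains it on its own schedule.
case class PodUpdate(execId: Long, phase: String)

class PodEventQueue {
  private val queue = new LinkedBlockingQueue[PodUpdate]()

  // Called by the watch connection on every pod event.
  def onWatchEvent(update: PodUpdate): Unit = queue.put(update)

  // Called by the periodic poller, which serves as a backup if the watch drops.
  def onPollResult(updates: Seq[PodUpdate]): Unit = updates.foreach(queue.put)

  // Called periodically by the processor to reconcile against current state.
  def drain(): List[PodUpdate] = {
    var out = List.empty[PodUpdate]
    var next = queue.poll()
    while (next != null) {
      out = next :: out
      next = queue.poll()
    }
    out.reverse
  }
}
```

Because both sources converge on the same queue, the processor never has to care which mechanism reported an update.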
Needs tests. @foxish @liyinan926 for initial comments on the design.
Test build #90810 has finished for PR 21366 at commit
// We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
// executors that are running. But, here we choose instead to maintain all state within this
// class from the perspective of the k8s API. Therefore whether or not this scheduler loop
// believes a scheduler is running is dictated by the K8s API rather than Spark's RPC events.
believes an executor is running*
private def findExitReason(pod: Pod, execId: Long): ExecutorExited = {
  val exitCode = findExitCode(pod)
  val (exitCausedByApp, exitMessage) = if (isDeleted(pod)) {
Not sure if this is 100% accurate - the pod may be evicted by the Kubernetes API if the pod misbehaves, so we should introspect whether Kubernetes kicked out the pod because the pod itself did something wrong, or whether the pod was just deleted by a user or by this Spark application.
import org.apache.spark.deploy.k8s.Constants._

private[spark] class ExecutorPodsPollingEventSource(
It's noteworthy that the resync polls could also be done in ExecutorPodsEventHandler#processEvents. The reason we don't is that we probably want the resync polls to occur on a different interval than the event-handling passes. You may, for example, want the event handler to trigger very frequently so that pod updates are dealt with promptly, but you don't want to be polling the API server every 5 seconds.
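The two-interval idea can be sketched minimally: the snapshot-processing passes and the API-server resync polls are scheduled independently, so one can be aggressive while the other stays infrequent. The class name, counters, and intervals below are all invented for illustration:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch: two tasks on the same scheduler, but with independent
// cadences. Processing can run every few milliseconds while polling runs on
// the order of tens of seconds, so the API server is not hammered.
class DecoupledSchedules(processEveryMillis: Long, pollEverySeconds: Long) {
  private val scheduler = Executors.newScheduledThreadPool(2)
  @volatile var processed = 0
  @volatile var polled = 0

  def start(): Unit = {
    scheduler.scheduleWithFixedDelay(
      new Runnable { def run(): Unit = processed += 1 },
      0L, processEveryMillis, TimeUnit.MILLISECONDS)
    scheduler.scheduleWithFixedDelay(
      new Runnable { def run(): Unit = polled += 1 },
      0L, pollEverySeconds, TimeUnit.SECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}
```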
def start(applicationId: String): Unit = {
  require(pollingFuture == null, "Cannot start polling more than once.")
  pollingFuture = pollingExecutor.scheduleWithFixedDelay(
    new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
Should make these and other intervals like it configurable.
Agreed.
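One hedged sketch of what making the interval configurable might look like: a lookup with a default, so the hardcoded 30 seconds becomes only a fallback. The config key and the `SimpleConf` helper below are hypothetical illustrations, not Spark's actual configuration API:

```scala
// Hypothetical stand-in for a configuration object; Spark's real SparkConf
// offers similar typed getters with defaults.
class SimpleConf(settings: Map[String, String]) {
  def getTimeAsSeconds(key: String, default: Long): Long =
    settings.get(key).map(_.toLong).getOrElse(default)
}
```

With something like this, the call site would ask the conf for the interval (e.g. a key such as `spark.kubernetes.executor.apiPollingInterval`, a made-up name here) instead of passing the literal `30L` to `scheduleWithFixedDelay`.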
Test build #90813 has finished for PR 21366 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #90818 has finished for PR 21366 at commit
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #90820 has finished for PR 21366 at commit
Kubernetes integration test status failure
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #90819 has finished for PR 21366 at commit
Test build #90815 has finished for PR 21366 at commit
Kubernetes integration test status success
Test build #91728 has finished for PR 21366 at commit
Test build #91727 has finished for PR 21366 at commit
Test build #91731 has finished for PR 21366 at commit
@mccheah could you add a design doc for future reference, and so that new contributors can better understand the rationale behind this? There is some description in the JIRA ticket, but not enough to describe the final solution.
kubernetesClient
  .pods()
  .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
  .delete()
Shouldn't removeExecutorFromSpark be called here as well? Couldn't it be the case that the executor exists at a higher level but the K8s backend missed it?
That's handled by the lifecycle manager already, because the lifecycle manager looks at what the scheduler backend believes are its executors and reconciles them with what's in the snapshot.
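The reconciliation described here boils down to a set difference between the executors the scheduler backend believes it has and the executor ids present in the latest snapshot. A sketch with illustrative names (not the PR's actual method):

```scala
// Anything the scheduler backend still tracks but the cluster snapshot no
// longer contains is a candidate for being reported as lost/removed.
def findMissingExecutors(
    knownToScheduler: Set[Long],
    inLatestSnapshot: Set[Long]): Set[Long] =
  knownToScheduler -- inLatestSnapshot
```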
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
Could you add a description of the class here.
.newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
val removedExecutorsCache = CacheBuilder.newBuilder()
  .expireAfterWrite(3, TimeUnit.MINUTES)
Why 3 minutes? Should this be configurable?
Don't think it has to be configurable. Basically we should only receive the removed executor events multiple times for a short period of time, then we should settle into steady state.
The cache is only a best-effort attempt to avoid removing the same executor from the scheduler backend multiple times; at the end of the day, even if we do accidentally remove it multiple times, the only noticeable result is noisy logs. The scheduler backend properly handles repeated removal attempts, but we'd prefer it if we didn't have to rely on that.
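The time-bounded dedup the Guava cache provides can be modeled without Guava; this sketch uses an injectable clock so the expiry is testable. Purely illustrative, not the PR's code:

```scala
import scala.collection.mutable

// Hand-rolled analogue of CacheBuilder's expireAfterWrite for a set of ids:
// an id is "new" only if it hasn't been seen within the TTL window.
class ExpiringSet(ttlMillis: Long, clock: () => Long = () => System.currentTimeMillis()) {
  private val entries = mutable.Map.empty[Long, Long]

  // Returns true only the first time an id is seen within the TTL window.
  def addIfAbsent(id: Long): Boolean = {
    val now = clock()
    // Drop entries whose TTL has elapsed before checking membership.
    val expired = entries.collect { case (id, insertedAt) if now - insertedAt >= ttlMillis => id }
    expired.foreach(entries.remove)
    if (entries.contains(id)) {
      false
    } else {
      entries(id) = now
      true
    }
  }
}
```

This mirrors the behavior discussed above: duplicate removal events within the window are suppressed, and after the window passes the state resets on its own.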
snapshots.foreach { snapshot =>
  snapshot.executorPods.foreach { case (execId, state) =>
    state match {
      case deleted@PodDeleted(pod) =>
s/succeeded@PodSucceeded(pod)/succeeded@PodSucceeded(_)
case deleted@PodDeleted(pod) =>
  removeExecutorFromSpark(schedulerBackend, deleted, execId)
  execIdsRemovedInThisRound += execId
case failed@PodFailed(pod) =>
same as above.
  execIdsRemovedInThisRound += execId
case failed@PodFailed(pod) =>
  onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
case succeeded@PodSucceeded(pod) =>
same as above.
  new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
subscribers += newSubscriber
pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
  toRunnable(() => callSubscriber(newSubscriber)),
toRunnable is not needed with lambdas in Java 8. Just pass () => callSubscriber(newSubscriber) there.
Just tried that and it doesn't work - I think that requires the scala-java8-compat module, which I don't think is worth pulling in for just this case.
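For context, a toRunnable adapter of the kind discussed here is a one-liner; it exists because Scala 2.11 does not automatically convert function literals to Java SAM interfaces like Runnable (Scala 2.12+ does). This is a sketch of what such a helper might look like, not necessarily the PR's exact code:

```scala
// Adapts a Scala function literal to java.lang.Runnable explicitly, which is
// necessary on Scala 2.11 where SAM conversion is not performed automatically.
def toRunnable(f: () => Unit): Runnable = new Runnable {
  override def run(): Unit = f()
}
```

With this adapter, `toRunnable(() => callSubscriber(newSubscriber))` can be handed to any executor API expecting a Runnable.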
private class PollRunnable(applicationId: String) extends Runnable {
  override def run(): Unit = {
    snapshotsStore.replaceSnapshot(kubernetesClient
Do you start with an empty state to trigger executor creation at the very beginning when the driver starts?
Not strictly why that's done here but a side-effect I suppose. Really the snapshots store should push an initial empty snapshot to all subscribers when it starts, and the unit tests do check for that - it's the responsibility of the snapshots store.
Yes, you need to trigger the initial creation of executors somehow, and yes, I saw that in the tests. My only concern is that this should be explicit, not implicit, to make the code more obvious.
I see - I think what we actually want is ExecutorPodsSnapshotStoreImpl to initialize the subscriber with its current snapshot. That creates the semantics where the new subscriber will first receive the most up-to-date state immediately.
And though we don't allow for this right now, the above would allow subscribers added midway through to receive the most recent snapshot immediately. But again, we don't do this right now - we set up all subscribers on startup before we start pushing snapshots.
You could add a comment saying this is where we create executors and by what means.
I mean, on Mesos you start executors when you get offers from agents, which is straightforward and makes sense. Here you want to start them ASAP, since you have no restrictions, so then you can send Spark tasks to them, right?
But polling isn't where we start to create executors - that's done on the subscriber rounds. Polling here populates the snapshots store, but processing the snapshots happens on the subscriber thread(s). Furthermore, with the scheme proposed above, you never even have to poll for snapshots once before we begin requesting executors, because the pods allocator subscriber will trigger immediately with an empty snapshot.
For example, if we changed the initialDelay here to stall before the first snapshots sync, then with the above scheme we'd still try to request executors immediately, because the subscriber thread kicks off an allocation round immediately.
Yeah I see. I guess this is done by ExecutorPodsAllocator as a subscriber when it gets the empty snapshot.
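The semantics settled on in this thread - deliver the current snapshot to a subscriber the moment it registers, so the allocator sees an (initially empty) snapshot without waiting for the first poll - can be sketched as follows. The `SnapshotStore` type here is a simplified stand-in for ExecutorPodsSnapshotsStoreImpl:

```scala
import scala.collection.mutable

// Simplified illustration: on registration, a subscriber immediately receives
// the store's current snapshot; subsequent replacements are pushed to all
// subscribers. Real code would do the pushing on an executor thread.
class SnapshotStore[S](initial: S) {
  private var current: S = initial
  private val subscribers = mutable.Buffer.empty[S => Unit]

  def addSubscriber(onSnapshot: S => Unit): Unit = {
    subscribers += onSnapshot
    onSnapshot(current) // deliver the most up-to-date state right away
  }

  def replaceSnapshot(s: S): Unit = {
    current = s
    subscribers.foreach(_(s))
  }
}
```

This is what lets the pods allocator kick off its first allocation round from the initial empty snapshot, independent of any polling delay.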
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
pollingExecutor: ScheduledExecutorService) {
Could you add some debug logging here? In general it would be good to be able to trace what is happening in case of an issue with debug mode; this applies to all classes introduced for both watching and polling.
I can do that, but would we consider that blocking the merge of this PR? I'd like to get this in soon; it's been open for a while.
Agree with @mccheah on not blocking this on a design doc. This PR strictly improves the management of executor states in k8s compared to how it was done before. So we really should get this merged soon.
If last round's comments are addressed, LGTM from me. The important behavior to check is the snapshotting, and creating replacement executors based on the captured snapshot.
Ok, addressed comments. The latest patch also makes it so that the subscribers run in a thread pool instead of just on a single thread. We have two subscribers, so now they can run concurrently, if that ever comes up. Not much else besides addressing the comments. @skonto if you're +1 then I'll merge.
@mccheah thanks, a lot better with the comments. +1
Kubernetes integration test starting
Kubernetes integration test status failure
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Ok, I'm merging to master. Thanks everyone for contributing to the review - @foxish, @liyinan926, @skonto, @dvogelbacher, @erikerlandson. As discussed earlier, I will post a design document for how this all works to the JIRA ticket.
Test build #91866 has finished for PR 21366 at commit
Test build #91870 has finished for PR 21366 at commit
What changes were proposed in this pull request?
Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables.
We can do better here by:
1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to follow a level-triggered mechanism: the controller continuously monitors the API server via watches and polling, and on periodic passes, the controller reconciles the current state of the cluster with the desired state. We implement this by introducing the concept of a pod snapshot, which is a given state of the executors in the Kubernetes cluster. We operate periodically on snapshots. To prevent overloading the API server with polling requests to get the state of the cluster (particularly for executor allocation, where we want to be checking frequently to get executors to launch without unbearably bad latency), we use watches to populate snapshots by applying observed events to a previous snapshot to get a new snapshot. Whenever we do poll the cluster, the polled state replaces any existing snapshot - this ensures eventual consistency and mirroring of the cluster, as is desired in a level-triggered architecture.

2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the snapshots.
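The snapshot semantics in the description - incremental updates applied from watch events, wholesale replacement from polls - might be sketched like this, with a simplified stand-in for the real ExecutorPodsSnapshot:

```scala
// Simplified model: a snapshot is an immutable view of executor pod states.
case class Snapshot(executorPods: Map[Long, String]) {
  // A watch event updates one executor's state on top of the prior snapshot.
  def withUpdate(execId: Long, phase: String): Snapshot =
    Snapshot(executorPods + (execId -> phase))
}

object Snapshot {
  // A poll result replaces whatever the watch-derived snapshot believed,
  // giving eventual consistency with the cluster's actual state.
  def fromFullListing(pods: Map[Long, String]): Snapshot = Snapshot(pods)
}
```

Watch events keep the snapshot fresh cheaply between polls; the poll acts as the authoritative correction if any watch events were missed.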
How was this patch tested?
Integration tests should verify that there are no end-to-end regressions. Unit tests are to be updated, in particular focusing on different orderings of events, accounting for events arriving in unexpected orders.