[SPARK-26365][K8S] In kuberentes cluster mode, spark submit should pass driver exit code #40118
Conversation
Hello @dongjoon-hyun @holdenk, please help review this PR, given that there is a lot of user feedback on the JIRA ticket about this issue.
Overall I like it, but I would want to see a unit test (it does not need to go all the way to exit, but something to show that the code would match).
Thanks for your review. Added a unit test for this.
      .filter(driverContainerName == _.getName)
      .head.getState.getTerminated.getExitCode)
  } catch {
    case _: NullPointerException =>
Sorry, but this could hide other important underlying exceptions. Do you think we can use a safer approach?
We can wrap this exception and rethrow it, because this is not the expected behavior (the driver container completed without an exit code).
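The safer, null-free lookup discussed above could be sketched like this. Note that `ContainerStatus`, `ContainerState`, and `Terminated` here are minimal stand-ins for the fabric8 Kubernetes client types, so the snippet is self-contained; it is not the PR's actual code.

```scala
// A null-safe sketch of extracting the driver exit code without catching
// NullPointerException: each nullable step is lifted into Option.
case class Terminated(exitCode: Integer)
case class ContainerState(terminated: Terminated)
case class ContainerStatus(name: String, state: ContainerState)

def driverExitCode(
    statuses: Seq[ContainerStatus],
    driverContainerName: String): Option[Int] = {
  statuses
    .find(s => driverContainerName == s.name) // driver container may be absent
    .flatMap(s => Option(s.state))            // state may be null
    .flatMap(st => Option(st.terminated))     // container may not be terminated
    .flatMap(t => Option(t.exitCode))         // exit code may be null
    .map(_.intValue())
}
```

With this shape, a genuinely unexpected exception from the client library is no longer swallowed by the `NullPointerException` catch.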
import org.apache.spark.deploy.k8s.KubernetesDriverConf
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging

private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  def watchOrStop(submissionId: String): Boolean
  def reset(): Unit
  def getDriverExitCode(): Option[Int]
Could you add some function documentation describing when SparkException is thrown?
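One way the requested documentation could look (the wording is an assumption, not the final text, and the `private[k8s]` modifier is dropped so the snippet compiles standalone):

```scala
// Sketch of the trait with the Scaladoc the reviewer asked for.
trait LoggingPodStatusWatcher {
  def watchOrStop(submissionId: String): Boolean
  def reset(): Unit

  /**
   * Returns the exit code of the driver container once the application has
   * completed.
   *
   * @return Some(exitCode) when the driver container terminated with an exit
   *         code, or None when the application completed but the driver
   *         container reported no exit code.
   * @throws SparkException if the application has not completed yet, or if
   *         the driver container cannot be found after completion.
   */
  def getDriverExitCode(): Option[Int]
}
```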
    }
    throw new SparkException("Fail to get driver exit code, when the application completed")
  } else {
    throw new SparkException("Call getDriverExitCode() when the application has not completed")
It seems that this method is designed to cover four cases, but why is NullPointerException converted to None instead of to case (1)?
- Case 1: SparkException, in case of hasCompleted == false
- Case 2: Some(getExitCode), the successful case
- Case 3: None, in case of hasCompleted == true but a NullPointerException
- Case 4: SparkException: "Fail to get driver exit code, when the application completed"?
Thank you for looking at the code in detail.
Four scenarios are considered here:
- Case 1: won't be triggered under normal use; it is placed here to prevent the function from being misused.
- Case 2: the successful case.
- Case 3: only triggered if the app has completed but the pod has no exit code.
- Case 4: triggered when we cannot find the specified driver container (neither KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME nor DEFAULT_DRIVER_CONTAINER_NAME).

PS. In the KubernetesUtils.loadPodFromTemplate logic, we read the first container from the user's template file; it is not hardwired to KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME or DEFAULT_DRIVER_CONTAINER_NAME.
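The four cases above could be sketched as follows. The container types are minimal stand-ins for the fabric8 client classes, and `SparkException` is redefined locally so the snippet is self-contained; this is an illustration of the case analysis, not the PR's actual implementation.

```scala
// Local stand-in for org.apache.spark.SparkException.
class SparkException(message: String) extends Exception(message)

case class Terminated(exitCode: Integer)
case class ContainerState(terminated: Terminated)
case class ContainerStatus(name: String, state: ContainerState)

def getDriverExitCode(
    hasCompleted: Boolean,
    statuses: Seq[ContainerStatus],
    driverContainerName: String): Option[Int] = {
  if (!hasCompleted) {
    // Case 1: guard against misuse before the application completes.
    throw new SparkException(
      "Call getDriverExitCode() when the application has not completed")
  }
  statuses.find(_.name == driverContainerName) match {
    case Some(driver) =>
      // Case 2: Some(exitCode). Case 3: None, when the app completed but the
      // driver container reported no exit code.
      Option(driver.state)
        .flatMap(s => Option(s.terminated))
        .flatMap(t => Option(t.exitCode))
        .map(_.intValue())
    case None =>
      // Case 4: the driver container was not found in the pod status.
      throw new SparkException(
        "Fail to get driver exit code, when the application completed")
  }
}
```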
val runningPod = new PodBuilder(pod).withStatus(runningState).build()
watcher.eventReceived(Action.MODIFIED, runningPod)

// non-completed case
Please split this negative case into a new test case.
// non-completed case
assertThrows[SparkException] {
  watcher.getDriverExitCode()
}
We need to check the exception message, because the function throws two different SparkExceptions.
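A sketch of asserting on the message rather than only the type. `SparkException` and `getDriverExitCode` here are simplified local stand-ins; in the real suite this would be ScalaTest's `intercept[SparkException]` followed by an assertion on `e.getMessage`.

```scala
// Local stand-ins so the snippet is self-contained.
class SparkException(message: String) extends Exception(message)

def getDriverExitCode(hasCompleted: Boolean): Option[Int] = {
  if (!hasCompleted) {
    throw new SparkException(
      "Call getDriverExitCode() when the application has not completed")
  }
  Some(0)
}

// Capture the message of a thrown SparkException, if any.
def messageOf(body: => Any): String = {
  try { body; "no exception thrown" }
  catch { case e: SparkException => e.getMessage }
}

// Checking the message distinguishes the not-completed case from the
// missing-exit-code case, which assertThrows alone cannot do.
val msg = messageOf(getDriverExitCode(hasCompleted = false))
assert(msg.contains("has not completed"))
```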
  s"and submission ID $sId into Kubernetes")
// no-test case: exit with the driver's exit code
if (!sys.props.get(IS_TESTING.key).getOrElse("false").toBoolean) {
  watcher.getDriverExitCode().foreach { exitCode =>
This PR seems to propose introducing a new SparkException throw point into this class. In general, that's not good, as you know.
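The propagation decision in the snippet above could be factored into a pure helper, which also sidesteps calling `sys.exit` in tests. This is a sketch: `isTesting` stands in for the `IS_TESTING.key` lookup, and filtering out zero (so a successful driver keeps spark-submit's normal exit path) is an assumption about the truncated snippet.

```scala
// Returns the exit code spark-submit should exit with, or None to keep the
// default exit path.
def submitExitCode(driverExitCode: Option[Int], isTesting: Boolean): Option[Int] = {
  if (isTesting) None                  // unit tests must not trigger sys.exit
  else driverExitCode.filter(_ != 0)   // propagate only failure codes
}
```

A caller would then do something like `submitExitCode(watcherCode, testing).foreach(sys.exit)`, keeping the side effect in one place.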
Thank you, @zwangsheng and @holdenk .
I left a few comments about the design, @zwangsheng . Could you take a look at those?
Just following back up, @zwangsheng, do you have cycles?
Thanks for watching this, sorry for the late reply. I added a unit test for this.
What changes were proposed in this pull request?
Find the Spark driver container's exit code and use it as the spark-submit exit code.
Why are the changes needed?
Many users face this problem; see SPARK-26365.
As a comparison, in YARN cluster mode, spark-submit throws an exception when the application fails or is killed.
Does this PR introduce any user-facing change?
Yes.
Before, when an application (driver container) on Kubernetes failed, spark-submit still returned 0 as its exit code.
After this change, spark-submit uses the driver's exit code, or throws an exception when the driver's exit code cannot be found.
How was this patch tested?
Tested locally; willing to add unit tests if needed.