[SPARK-28927][ML] Rethrow block mismatch exception in ALS when input data is nondeterministic #25789
Conversation
R/pkg/R/mllib_recommendation.R
Outdated
@@ -82,6 +82,9 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' statsS <- summary(modelS)
#' }
#' @note spark.als since 2.1.0
#' @note the input rating dataframe to the ALS implementation must be determinate. If the training
deterministic?
Not sure whether "deterministic" or "determinate" is more correct. I used "determinate" just because the RDD deterministic level has DETERMINATE, UNORDERED and INDETERMINATE.
Let me change it to "deterministic".
Yeah despite the name of the constants, I think the word here is 'deterministic'.
@@ -920,6 +924,14 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
    require(intermediateRDDStorageLevel != StorageLevel.NONE,
      "ALS is not designed to run without persisting intermediate RDDs.")

    // An indeterminate rating RDD causes inconsistent in/out blocks in case of rerun.
    // It can cause a runtime error when matching in/out user/item blocks.
    if (ratings.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
Wow, I didn't even know this existed.
I get it, but, this is true of so many operations in Spark. Why is ALS special?
It is because of how ALS is implemented.
ALS uses the training data RDD to build user/item in/out blocks, like:
training RDD -> user in/out blocks
training RDD -> item in/out blocks
Later, it matches user in-blocks with item out-blocks, and item in-blocks with user out-blocks.
If the training RDD is indeterminate, any rerun of its tasks can produce different output in those user/item blocks. If such a rerun happens in any iteration, a mismatch occurs and a user/item index can't find its corresponding slot in the user/item factors.
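To make this mechanism concrete, here is a hedged Python simulation (not Spark code; all names below are made up for illustration) of how an indeterminate input, recomputed in a different order by a task rerun, yields user and item blocks that no longer line up:

```python
def compute_ratings(run):
    """Simulates an indeterminate RDD: the same logical ratings can come
    back in a different order when a task is rerun."""
    ratings = [(u, i, 1.0) for u in range(4) for i in range(4)]
    # Run 1 and the rerun produce the same records, ordered differently.
    return ratings if run == 1 else list(reversed(ratings))

def build_blocks(ratings, num_blocks=2):
    """Assigns each rating to a block by its position in the computed
    output, mimicking position-dependent block construction."""
    blocks = [[] for _ in range(num_blocks)]
    for pos, rating in enumerate(ratings):
        blocks[pos % num_blocks].append(rating)
    return blocks

# The user blocks are built from the first computation ...
user_blocks = build_blocks(compute_ratings(run=1))
# ... but a task rerun recomputes the input before the item blocks are built.
item_blocks = build_blocks(compute_ratings(run=2))

# Both runs saw the same logical records, yet the blocks disagree; in ALS
# this kind of mismatch later surfaces as an ArrayIndexOutOfBoundsException.
print(user_blocks[0] == item_blocks[0])  # -> False
```

The two computations cover the same logical data, so nothing fails at block-building time; the inconsistency only shows up later, when the blocks are matched against each other.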
If I understand this correctly, mismatch -> failure is only one possible outcome? It could also end up matching the wrong user/item before any index goes out of range? That seems more subtle and much harder to detect.
Yes, that is possible. It is just hard to tell whether that is the case at ALS fitting time.
if (ratings.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
  throw new IllegalArgumentException("The output of rating RDD can not be indeterminate. " +
    "If your training data has indeterminate RDD computations, like `randomSplit` or `sample`" +
    ", please checkpoint the training data before running ALS.")
Or cache, right?
Caching alone doesn't fix this.
We do use cache, but if cached blocks are lost, Spark still triggers a rerun of the training RDD's tasks.
I think this is going to break a lot of user code, although strictly speaking you are quite correct. If I do some train/test split and even cache() the results, which is the usual practice, it will fail now, right? Because the cached result is not considered deterministic. Does checkpointing change the determinism level? What if you lose the checkpoint?
It's not a wild idea, but this problem exists everywhere in theory in Spark; any test/train split basically has it. I hesitate to enforce this so strictly in one place. How about a warning?
This kind of failure only happens when you lose blocks of the training data; those can be cached blocks or map outputs. It is not guaranteed to fail in all cases, but it is more likely when fitting ALS on a large amount of data. This is hard to reproduce in a unit test or on a small dataset.
Checkpointing changes the RDD output to deterministic:
spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines 1919 to 1925 in c610de6
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
  if (isReliablyCheckpointed) {
    DeterministicLevel.DETERMINATE
  } else {
    getOutputDeterministicLevel
  }
}
I think a checkpointed RDD can't be rerun like a cached or normal RDD, because checkpointing cleans up the RDD lineage. When you lose the checkpoint, the job should fail.
Yeah, a warning sounds good. I also added notes to the documents; hopefully they can make users aware of this issue.
Yes, I meant that throwing an exception is the thing that would definitely cause failures, even though it currently happens to work fine in practice in almost all cases.
Test build #110584 has finished for PR 25789 at commit
Test build #110585 has finished for PR 25789 at commit
Test build #110595 has finished for PR 25789 at commit
Test build #110596 has finished for PR 25789 at commit
Test build #110597 has finished for PR 25789 at commit
@srowen @felixcheung I am thinking of catching ArrayIndexOutOfBoundsException and throwing a Spark exception with an explainable message. Because we don't want to break user code, we don't strictly require a deterministic RDD. In case users don't notice this issue, I think it is better for them to see an explainable error message instead of a confusing ArrayIndexOutOfBoundsException. WDYT?
R/pkg/R/mllib_recommendation.R
Outdated
@@ -82,6 +82,10 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' statsS <- summary(modelS)
#' }
#' @note spark.als since 2.1.0
#' @note the input rating dataframe to the ALS implementation should not be indeterminate.
I still think we need to say "nondeterministic" and give an example (randomSplit), but also tell people how to fix it. Define a partitioning? sort order?
A checkpoint, or a sort before sampling, can help. A sampled RDD is nondeterministic when its input RDD is unordered.
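The "sort before sampling" suggestion can be illustrated outside Spark. This is a hedged Python sketch, not Spark's actual sampler (Spark uses per-partition seeded samplers, but the order-dependence is analogous): seeded sampling over an unordered input picks different records depending on the order they arrive in, while sorting first pins the result.

```python
import random

def sample_fraction(records, seed, fraction=0.5):
    """Seeded sampling that keeps a record when the next value from the
    random stream is below the fraction; the kept *positions* are fixed
    by the seed, so the kept *records* depend on input order."""
    rng = random.Random(seed)
    return [r for r in records if rng.random() < fraction]

data = list(range(10))
arrival_1 = data                   # one possible partition order
arrival_2 = list(reversed(data))   # another order after a rerun

# Same seed, same logical records, different order -> different sample.
s1 = sample_fraction(arrival_1, seed=42)
s2 = sample_fraction(arrival_2, seed=42)
print(s1 == s2)

# Sorting first makes the sample independent of arrival order.
d1 = sample_fraction(sorted(arrival_1), seed=42)
d2 = sample_fraction(sorted(arrival_2), seed=42)
print(d1 == d2)
```

Sorting (or checkpointing) removes the order-dependence, which is why either one upgrades the deterministic level of the downstream sample.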
Is there any way to make, say, the result of randomSplit deterministic after it has been computed? I understand the usual answer of caching isn't 100% foolproof. But if there is no way to do it, hm, is it worth warning? Because this is just generally true of lots of things in Spark.
If randomSplit or sample has already been computed, the only way to make the result deterministic is to checkpoint. But obviously we can't checkpoint for users.
I think it is good to leave a clue so users can know what is going on when they hit the ArrayIndexOutOfBoundsException while fitting an ALS model.
Since we don't want to break existing user code, a warning is the least we can do?
I would also like to catch the exception and rethrow one with a meaningful message for users.
But what if you lose the checkpoint? Isn't that the same issue? At some level the answer is "you can't really fix this, anywhere", right? In practice, the fairly well understood caching/checkpointing mechanism works, everywhere. It seems inconsistent to address this just for ALS, as if it weren't the same issue everywhere. It also seems hard to warn without providing a pointer to the solution, if there is one, but I can see that a warning is better than nothing.
If you're trying to fix a specific problem, maybe indeed detect the problem in question (a specific AIOOBE) and rewrap it, sure.
I think checkpointing is relatively reliable. In case of checkpoint loss, the Spark job fails without a rerun, so you should not get inconsistent data once you checkpoint.
We have two ways to fix it: one is to checkpoint, the other is to sort the data before sample/randomSplit. I added both to the updated note.
Sounds like targeting the specific problem here is better. I did the catch-AIOOBE thing and removed the warning, as it seems not too useful.
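The catch-and-rewrap approach the patch settles on can be sketched as follows. This is an illustrative Python analogue, not the actual ALS.scala change; the function names and the message text are hypothetical, and IndexError stands in for Java's ArrayIndexOutOfBoundsException.

```python
class SparkException(Exception):
    """Stand-in for org.apache.spark.SparkException."""

def match_blocks(in_block, out_block):
    """Hypothetical stand-in for the block-matching step that fails with
    an index error when in/out blocks were built from inconsistent reruns."""
    return [out_block[i] for i in in_block]

def fit_with_explainable_error(in_block, out_block):
    try:
        return match_blocks(in_block, out_block)
    except IndexError as e:  # analogue of ArrayIndexOutOfBoundsException
        # Rewrap with an actionable message, keeping the original as cause.
        raise SparkException(
            "Index out of range while matching ALS in/out blocks. This can "
            "happen when the rating RDD is nondeterministic (e.g. produced "
            "by sample/randomSplit) and some of its tasks were rerun. "
            "Checkpoint the training data, or sort it before sampling."
        ) from e

# Consistent blocks still work; an index pointing past the out block
# now produces the wrapped, explainable exception instead of a raw one.
print(fit_with_explainable_error([0, 1], ["u0", "u1"]))  # -> ['u0', 'u1']
try:
    fit_with_explainable_error([0, 5], ["u0", "u1"])
except SparkException as e:
    print(type(e.__cause__).__name__)  # -> IndexError
```

The key design point is that the original exception is preserved as the cause, so the explainable message is added without hiding the underlying stack trace.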
OK I think this is fairly reasonable. @jkbradley @MLnick does this sound right to you too?
Test build #110638 has finished for PR 25789 at commit
retest this please.
Test build #110669 has finished for PR 25789 at commit
Merged to master
What changes were proposed in this pull request?
Fitting an ALS model can fail due to nondeterministic input data. Currently the failure surfaces as an ArrayIndexOutOfBoundsException, which does not explain to end users what went wrong during fitting.
This patch catches that exception and rethrows a more explainable one when the input data is nondeterministic.
Because we may not know exactly the output deterministic level of RDDs produced by user code, this patch also adds a note about the training data's deterministic level to the Scala/Python/R ALS documentation.
Why are the changes needed?
An ArrayIndexOutOfBoundsException was observed while fitting an ALS model. It was caused by a mismatch between in/out user/item blocks while computing ratings.
If the training RDD's output is nondeterministic, then when a fetch failure happens, rerunning part of the training RDD can produce inconsistent user/item blocks.
This patch is needed to warn users about fitting ALS on nondeterministic input.
Does this PR introduce any user-facing change?
Yes. When fitting an ALS model on nondeterministic input data, if a rerun happened, users previously saw an ArrayIndexOutOfBoundsException caused by a mismatch between in/out user/item blocks.
After this patch, a SparkException with a clearer message is thrown, and the original ArrayIndexOutOfBoundsException is wrapped as its cause.
How was this patch tested?
Tested on a development cluster.