-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22644][ML][TEST] Make ML testsuite support StructuredStreaming test #19843
Conversation
Test build #84287 has finished for PR 19843 at commit
|
Test build #84286 has finished for PR 19843 at commit
|
Jenkins retest this please. |
Test build #84290 has finished for PR 19843 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this overall approach. Just a few small comments.
assert(checkFunction != null) | ||
sparkAnswer.foreach { row => | ||
try { | ||
checkFunction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be checkFunction(row)
?
@@ -133,6 +133,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be | |||
} | |||
|
|||
def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false) | |||
|
|||
def apply(checkFunction: Row => Unit): CheckAnswerRows = | |||
CheckAnswerRows(null, false, false, checkFunction) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This construction feels very forced. I wonder if we should define a new case class for CheckAnswer(function)
. Maybe we could have the CheckAnswer classes share a common trait to avoid needing to duplicate the setup code for checking the answers.
I don't feel strongly about this, just a thought.
@@ -233,7 +232,8 @@ class LinearRegressionSuite | |||
assert(model2.intercept ~== interceptR relTol 1E-3) | |||
assert(model2.coefficients ~= coefficientsR relTol 1E-3) | |||
|
|||
model1.transform(datasetWithDenseFeature).select("features", "prediction").collect().foreach { | |||
testTransformer[(Double, Vector)](datasetWithDenseFeature, Seq(model1), | |||
"features", "prediction") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this, I'm impressed by how little we need to change to leverage existing tests and have them run in both streaming and in batch mode!!
@MrBago Thanks! |
Test build #84328 has finished for PR 19843 at commit
|
sc.setCheckpointDir(checkpointDir) | ||
} | ||
|
||
override def afterAll() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In https://github.com/apache/spark/pull/9677/files#diff-2d609a0839a51280e0f10cc73ef42d21R35 we added a call to clear the active SQLContext after each MLlib test suite, and MLlibTestSparkContext still does that:
spark/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
Line 49 in 8ae004b
SparkSession.clearActiveSession() |
At the time, that was necessary to avoid flakiness in Jenkins tests. Do you know if that's no longer necessary? CCing @nkronenfeld and @gatorsmile who seem to have worked on the SQL test traits recently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not needed if you are not hitting any flaky test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, we'll find out in a few weeks : )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking back in here, I haven't seen flakiness, but I have seen cascading failures, which I believe are a new phenomenon: A failure in one test suite A seems to cause subsequent tests suites B, C, ... to fail. (But when A is fixed, then B, C,... run correctly.) Do you not see this in SQL tests? Do you think this might be related to not cleaning up the active context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's worse than this. I see a bunch of failures when I run multiple test suites at once, even when doing sbt clean package
beforehand and without any tests which fail by themselves. Will test on master and complain on the dev list if it's an issue. (No need to respond here)
} | ||
} | ||
|
||
def testTransformer[A : Encoder](dataframe: DataFrame, transformers: Seq[Transformer], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a Seq of Transformers? Should we not just use a PipelineModel as the Transformer when needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometime only one transformer? I am not sure which is better.
@@ -162,6 +168,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be | |||
private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer" | |||
} | |||
|
|||
case class CheckAnswerRowsByFunc(checkFunction: Row => Unit, lastOnly: Boolean) | |||
extends StreamAction with StreamMustBeRunning { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: style: indent +2 spaces
This looks awesome; I just had a couple of comments. Btw, this is fancy test code. It might be nice to add a little unit test to MLTest.scala to make sure that testTransformer does indeed fail when the per-row check fails. |
add UT for MLTest and change to use PipelineModel. |
Test build #84600 has finished for PR 19843 at commit
|
} | ||
|
||
def testPipelineModelOnStreamData[A : Encoder](dataframe: DataFrame, | ||
pipelineModel: PipelineModel, firstResultCol: String, otherResultCols: String*) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I should have been clearer: I was suggesting taking a Transformer here, rather than a Seq of Transformers. A PipelineModel is a type of Transformer, so users of this trait could use a PipelineModel as the Transformer in order to string together multiple Transformers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, style nit: For multi-line method headers, please put 1 argument per line.
I'll make a call: Given that the SQL tests do not use clearActive, let's not bother with it. If we see flakiness, then we can try adding clearActive as a fix. |
Test build #84637 has finished for PR 19843 at commit
|
Also, can you please remove "WIP" from the PR title and update the Testing part of the PR description? |
LGTM, but I'll wait for the PR title & description updates to merge this. Thanks! |
Test build #84675 has finished for PR 19843 at commit
|
LGTM |
Test build #4008 has finished for PR 19843 at commit
|
retest this please |
That failure was caused by a bad change elsewhere which has been reverted. Testing again... |
Test build #84796 has finished for PR 19843 at commit
|
Test build #4009 has finished for PR 19843 at commit
|
Merging with master |
import org.apache.spark.sql.test.TestSparkSession | ||
import org.apache.spark.util.Utils | ||
|
||
trait MLTest extends StreamTest with TempDirectory { self: Suite => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MLStreamTest
seems more proper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But my intention here, is let every ml test suite inherit MLTest
.
What changes were proposed in this pull request?
We need to add some helper code to make testing ML transformers & models easier with streaming data. These tests might help us catch any remaining issues and we could encourage future PRs to use these tests to prevent new Models & Transformers from having issues.
I add a
MLTest
trait which extendsStreamTest
trait, and overridecreateSparkSession
. So ML testsuite can only extendMLTest
, to use both ML & Stream test util functions.I only modify one testcase in
LinearRegressionSuite
, for first pass review.Link to #19746
How was this patch tested?
MLTestSuite
added.