[SPARK-25414][SS][TEST] make it clear that the numRows metrics should be counted for each scan of the source #22402
Conversation
Test build #95985 has finished for PR 22402 at commit
Test build #95984 has finished for PR 22402 at commit
Test build #95994 has finished for PR 22402 at commit
Test build #96016 has finished for PR 22402 at commit
retest this please
Test build #96019 has finished for PR 22402 at commit
      true
    }
  )
  withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
Turn off EXCHANGE_REUSE_ENABLED to expose the self-join numRows double-count bug.
Test build #96032 has finished for PR 22402 at commit
@@ -460,9 +460,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")

val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
assert(progress.numInputRows === 20) // data is read multiple times in self-joins
So IIUC, in this line EXCHANGE_REUSE_ENABLED == true, and it's not actually read twice?
Exchange reuse is not triggered here, because the project on one side is eliminated. In the Kafka test, we have a cast in the project, so Spark doesn't eliminate the project on either side, and that triggers exchange reuse.
Test build #96056 has finished for PR 22402 at commit
LGTM
retest this please
Test build #96160 has finished for PR 22402 at commit
retest this please
Test build #96165 has finished for PR 22402 at commit
Test build #96163 has finished for PR 22402 at commit
retest this please
Test build #96172 has finished for PR 22402 at commit
retest this please
Test build #96244 has finished for PR 22402 at commit
thanks, merging to master!
What changes were proposed in this pull request?
For a self-join/self-union, Spark will produce a physical plan which has multiple `DataSourceV2ScanExec` instances referring to the same `ReadSupport` instance. In this case, the streaming source is indeed scanned multiple times, and the `numInputRows` metric should be counted for each scan.

Actually we already have 2 test cases to verify the behavior:
- `StreamingQuerySuite`: "input row calculation with same V2 source used twice in self-join"
- `KafkaMicroBatchSourceSuiteBase`: "ensure stream-stream self-join generates only one offset in log and correct metrics"

However, these 2 tests expect different results, which is super confusing. It turns out that the first test doesn't trigger exchange reuse, so the source is scanned twice, while the second test does trigger exchange reuse, so the source is scanned only once.

This PR proposes to improve these 2 tests to cover both the with- and without-exchange-reuse cases.
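The double-count behavior above can be modeled with a small, Spark-free sketch. All names here (`Source`, `numInputRowsWithoutReuse`, `numInputRowsWithReuse`) are hypothetical and exist only to illustrate the accounting; they are not Spark APIs:

```scala
// Hypothetical model of numInputRows accounting for a streaming self-join.
// Plain Scala, no Spark dependency; names are illustrative only.
object NumInputRowsSketch {
  // A source that counts how many times it is scanned.
  final class Source(val rows: Seq[Int]) {
    var scans: Int = 0
    def scan(): Seq[Int] = { scans += 1; rows }
  }

  // Without exchange reuse: each side of the self-join scans the
  // source, so numInputRows counts the data once per scan (twice total).
  def numInputRowsWithoutReuse(src: Source): Long = {
    val left  = src.scan()
    val right = src.scan()
    (left.size + right.size).toLong
  }

  // With exchange reuse: the second side reuses the first scan's
  // output, so the source is read (and counted) only once.
  def numInputRowsWithReuse(src: Source): Long = {
    val left  = src.scan()
    val right = left // reused exchange output
    left.size.toLong
  }

  def main(args: Array[String]): Unit = {
    val a = new Source(1 to 10)
    println(numInputRowsWithoutReuse(a)) // 20, and a.scans == 2
    val b = new Source(1 to 10)
    println(numInputRowsWithReuse(b))    // 10, and b.scans == 1
  }
}
```

This mirrors why the two tests expect 20 and 10 rows respectively for the same 10-row input, depending on whether exchange reuse kicks in.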
How was this patch tested?
Test-only change.