[SPARK-30362][Core] Update InputMetrics in DataSourceRDD #27021
Conversation
@@ -56,6 +58,15 @@ class DataSourceRDD(
      context.addTaskCompletionListener[Unit](_ => reader.close())
      val iter = new Iterator[Any] {
        private[this] var valuePrepared = false
        private val inputMetrics = context.taskMetrics().inputMetrics
        private val existingBytesRead = inputMetrics.bytesRead
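The hunk above snapshots `inputMetrics.bytesRead` before iteration starts, so later updates can add the filesystem-reported delta on top of whatever the task had already read. A minimal, dependency-free sketch of that pattern — the `InputMetrics` class and `wrap` helper below are illustrative stand-ins, not Spark's actual classes:

```scala
// Stand-in for Spark's executor-side InputMetrics (illustrative only).
class InputMetrics {
  var bytesRead: Long = 0L
  var recordsRead: Long = 0L
  def setBytesRead(v: Long): Unit = bytesRead = v
  def incRecordsRead(n: Long): Unit = recordsRead += n
}

object MetricsPattern {
  // Mirrors the diff: snapshot the bytes already read, then report
  // baseline + filesystem delta so the counter stays monotonic even
  // when multiple scans run in the same task.
  def wrap[T](iter: Iterator[T], metrics: InputMetrics,
              fsBytes: () => Long): Iterator[T] = {
    val existingBytesRead = metrics.bytesRead
    new Iterator[T] {
      def hasNext: Boolean = iter.hasNext
      def next(): T = {
        val item = iter.next()
        metrics.incRecordsRead(1L)
        metrics.setBytesRead(existingBytesRead + fsBytes())
        item
      }
    }
  }
}
```

Draining a wrapped three-element iterator bumps `recordsRead` by three and leaves `bytesRead` at the snapshot plus whatever the callback last reported.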
I'm not sure we can support this if the data source doesn't report size metrics. AFAIK DS v1 doesn't support it either; we only support it in the file source.
I think we can only support the `recordsRead` metric for now. We need to design a general API for data sources to report metrics. cc @rdblue @brkyvz
+1 to @cloud-fan's advice.
If we update only `recordsRead`, the "Input Size / Records" column in the UI will look like "0 / 10". Is this okay?
That's true for DS v1 too, right?
Yes, it's true for DS v1 as well. Shall we add a marker trait that says input size metrics are supported? For example, `FilePartitionReaderFactory` supports input size metrics, so we would collect input size metrics only when the factory supports it.
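As a sketch of this marker-trait idea — every name below is hypothetical, not an existing Spark API:

```scala
// Hypothetical marker trait: a reader factory mixes this in to declare
// that bytes read can be attributed to it (e.g. file-based sources).
trait SupportsInputSizeMetrics

// Stand-ins for PartitionReaderFactory implementations.
trait PartitionReaderFactoryLike
class FileReaderFactoryLike extends PartitionReaderFactoryLike
  with SupportsInputSizeMetrics
class JdbcReaderFactoryLike extends PartitionReaderFactoryLike

// The scan would collect input-size metrics only for factories
// that opt in via the marker trait.
def shouldCollectInputSize(factory: PartitionReaderFactoryLike): Boolean =
  factory.isInstanceOf[SupportsInputSizeMetrics]
```

A runtime `isInstanceOf` check keeps the trait purely a capability marker, similar in spirit to how DS v2 uses `Supports*` mix-in interfaces elsewhere.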
Yes, we should, but then it becomes an API design question and we should involve more people in the discussion.
Then shall I update the solution and push, so people can comment on the approach?
I'd suggest we send an email to the dev list with your proposal to trigger discussion first. AFAIK many people are interested in a metrics API for DS v2.
Test build #115839 has finished for PR 27021 at commit
Force-pushed 870ba7d to 1be7a33
@rdblue please review this; I have tested the changes from my end.
Test build #117431 has finished for PR 27021 at commit
Force-pushed 1be7a33 to 6c87a41
Test build #117433 has finished for PR 27021 at commit
Force-pushed 6c87a41 to df4bff9
Test build #117437 has finished for PR 27021 at commit
This is weird :( I am able to build successfully using mvn, but sbt is failing.
Force-pushed df4bff9 to 5bd4b7b
Test build #117445 has finished for PR 27021 at commit
  }
}

class MetricsHandler extends Logging with Serializable {
Should these new classes be `private` instead? I don't see a need to expose them.
+1 for this change, with a fix to avoid exposing the new helper classes. To address @cloud-fan's objection, this solution records the amount of data read by Hadoop file systems. We can always expose an additional way for v2 sources to return a size metric if the bytes read by those sources do not go through the Hadoop FileSystem API, but there are many cases that do use the file system API, and at least those are supported by this change. Thanks @sandeep-katta!
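Spark obtains the bytes-read figure via a callback over Hadoop `FileSystem` statistics: the callback snapshots the counters when it is created and later returns only the delta. A dependency-free sketch of that closure pattern, with an `AtomicLong` standing in for Hadoop's per-scheme `FileSystem.Statistics` counter:

```scala
import java.util.concurrent.atomic.AtomicLong

// Stand-in for Hadoop's thread-aggregated FileSystem.Statistics counter.
val threadBytesRead = new AtomicLong(0L)

// Mirrors the getFSBytesReadOnThreadCallback idea: capture the counter
// value at creation time; each invocation returns bytes read since then.
def bytesReadCallback(): () => Long = {
  val baseline = threadBytesRead.get()
  () => threadBytesRead.get() - baseline
}
```

Because the baseline is captured per callback, a scan that starts mid-task only sees bytes read after it began, regardless of earlier reads on the same thread.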
Test build #117465 has finished for PR 27021 at commit
retest this please
Test build #117478 has finished for PR 27021 at commit
  }
}

private class MetricsBatchIterator(
nit: to be consistent, let's have a `MetricsRowIterator`, and the base class `MetricsIterator` doesn't need to implement `next`.
Or the base class can implement `next` in terms of an abstract

    protected def numRecords(element: T): Int

    override def next(): T = {
      val item = iter.next()
      metricsHandler.updateMetrics(numRecords(item))
      item
    }
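The refactor suggested above could look roughly like this; `ColumnarBatchLike` and `MetricsHandlerLike` are stand-ins for Spark's `ColumnarBatch` and this PR's `MetricsHandler`, and the real classes would also be `private`, per the earlier review comment:

```scala
// Stand-ins for Spark's ColumnarBatch and the PR's MetricsHandler.
class ColumnarBatchLike(val numRows: Int)
class MetricsHandlerLike {
  var recordsRead: Long = 0L
  def updateMetrics(n: Int): Unit = recordsRead += n
}

// Base class owns the metrics bookkeeping; subclasses only say
// how many records one element represents.
abstract class MetricsIterator[T](iter: Iterator[T],
                                  handler: MetricsHandlerLike) extends Iterator[T] {
  protected def numRecords(element: T): Int
  override def hasNext: Boolean = iter.hasNext
  override def next(): T = {
    val item = iter.next()
    handler.updateMetrics(numRecords(item))
    item
  }
}

// Row iterator: each element is a single record.
class MetricsRowIterator[T](iter: Iterator[T], handler: MetricsHandlerLike)
  extends MetricsIterator[T](iter, handler) {
  override protected def numRecords(element: T): Int = 1
}

// Batch iterator: each element carries numRows records.
class MetricsBatchIterator(iter: Iterator[ColumnarBatchLike],
                           handler: MetricsHandlerLike)
  extends MetricsIterator[ColumnarBatchLike](iter, handler) {
  override protected def numRecords(batch: ColumnarBatchLike): Int = batch.numRows
}
```

Keeping `next` in the base class means the row and batch variants differ only in the one-line `numRecords`, which is the consistency the reviewer asked for.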
@@ -167,4 +170,31 @@ class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
    }
  }
}

test("SPARK-30362: test input metrics for DSV2") {
To actually test DSV2, let's call `withSQLConf` in the test and set `SQLConf.USE_V1_SOURCE_LIST` to empty.
OK, let's get this workaround in, but we do need to think about a metrics API for DS v2 later. LGTM except a few minor comments.
Test build #117569 has finished for PR 27021 at commit
Thanks, merging to master!
In case of DS v2, InputMetrics are not updated.

**Before Fix**
![inputMetrics](https://user-images.githubusercontent.com/35216143/71501010-c216df00-288d-11ea-8522-fdd50b13eae1.png)

**After Fix** we can see that `Input Size / Records` is updated in the UI:
![image](https://user-images.githubusercontent.com/35216143/71501000-b88d7700-288d-11ea-92fe-a727b2b79908.png)

InputMetrics like bytesRead and recordsRead should be updated. No user-facing change. Added UT and also verified manually.

Closes apache#27021 from sandeep-katta/dsv2inputmetrics.

Authored-by: sandeep katta <sandeep.katta2007@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: sandeep katta <sandeep.katta2007@gmail.com>

Cherry-pick conflicts:
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala — only import statements, as upstream reorganized packages
- sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala — the test in the original PR sets a SQLConf to an empty string that doesn't exist yet; it will need to be reintroduced after pulling from master

Follow-up: waitUntilEmpty requires a timeout until 5e92301.
What changes were proposed in this pull request?
In case of DS v2, InputMetrics are not updated. After the fix we can see that `Input Size / Records` is updated in the UI.

Why are the changes needed?
InputMetrics like bytesRead and recordsRead should be updated.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Added UT and also verified manually.