
[SPARK-25674][SQL] If the records are incremented by more than 1 at a time, the number of bytes might rarely ever get updated

## What changes were proposed in this pull request?
If the records are incremented by more than 1 at a time, the number of bytes read might rarely ever get updated, because the running record count can skip over the counts that are exact multiples of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.
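
For instance, here is a minimal sketch (not Spark code; batch size is hypothetical, and the interval of 1000 records corresponds to UPDATE_INPUT_METRICS_INTERVAL_RECORDS) showing how the old modulo check can be skipped when records are counted in columnar batches:

```scala
// A minimal sketch, not the Spark implementation. Assumes columnar batches of
// 1024 rows and an update interval of 1000 records.
val interval = 1000L
val batchSize = 1024L

// Old check: update bytes read only when the running count is an exact multiple
// of the interval.
val updatesWithOldCheck =
  (1L to 100L).map(_ * batchSize).count(total => total % interval == 0)

println(updatesWithOldCheck)  // 0 -- none of the first 100 batch totals lands on an exact multiple
```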

This PR instead checks whether the increment has caused the count to cross into a higher multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS, so an update is triggered even when the exact multiple is skipped.
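
A rough sketch of the new condition (names here are illustrative, not the exact code in FileScanRDD): an update fires whenever the increment crosses into a higher multiple of the interval, even if no single count lands exactly on it.

```scala
// A minimal sketch of the crossing check, using the same hypothetical values as above.
def crossedInterval(before: Long, after: Long, interval: Long): Boolean =
  before / interval != after / interval

val interval = 1000L
var recordsRead = 0L
val updatesWithNewCheck = (1 to 100).count { _ =>
  val before = recordsRead
  recordsRead += 1024L                      // one columnar batch of 1024 rows
  crossedInterval(before, recordsRead, interval)
}

println(updatesWithNewCheck)  // 100 -- every batch crosses at least one multiple of 1000
```

Comparing integer quotients of the before and after counts also handles a single batch that spans several intervals, since the quotients still differ in that case.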

## How was this patch tested?
Existing unit tests.

Closes #22594 from 10110346/inputMetrics.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 69f5e9c)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
10110346 authored and srowen committed Oct 11, 2018
1 parent 7102aee commit 5324a85a2fb0d632dff028c7d1d2ea8a4ead94f4
@@ -112,12 +112,15 @@ class FileScanRDD(
           val nextElement = currentIterator.next()
           // TODO: we should have a better separation of row based and batch based scan, so that we
           // don't need to run this `if` for every record.
+          val preNumRecordsRead = inputMetrics.recordsRead
           if (nextElement.isInstanceOf[ColumnarBatch]) {
             inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
           } else {
             inputMetrics.incRecordsRead(1)
           }
-          if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+          // The records may be incremented by more than 1 at a time.
+          if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+            inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
             updateBytesRead()
           }
           nextElement
