4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -388,11 +388,9 @@ abstract class RDD[T: ClassTag](
      // Block hit.
      case Left(blockResult) =>
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
Comment on lines -391 to -392

Member:

Why? Based on the TaskMetrics.inputMetrics documentation, it is:

    Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted data, defined only in tasks with input.

Even when cached, don't we need to include its size in the input size of the task?

Contributor Author:

The input size of the task is increased when the RDD is first computed, so I think it is not correct to increase the metrics again when reading from the cache. Correct me if I am wrong.

Member:

From the doc, I think inputMetrics counts all the input a task processes, not just the input read from disk. Although you don't need to read from disk when the data is cached, a cached read still adds to the amount of input the task processes.
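The suite's runAndReturnBytesRead helper is not part of this diff, but the same measurement can be approximated from the driver with the public listener API. Below is a minimal sketch, assuming an existing SparkContext named sc; the MetricsProbe object and bytesReadDuring method are illustrative names, not the suite's actual code.

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object MetricsProbe {
  // Sums inputMetrics.bytesRead over every task that completes while `body` runs.
  // Listener events are delivered asynchronously, so a real test would also wait
  // for the listener bus to drain before reading the counter.
  def bytesReadDuring(sc: SparkContext)(body: => Unit): Long = {
    val bytesRead = new AtomicLong(0L)
    val listener = new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // inputMetrics covers reads from a HadoopRDD as well as reads of persisted data.
        val metrics = taskEnd.taskMetrics
        if (metrics != null) {
          bytesRead.addAndGet(metrics.inputMetrics.bytesRead)
        }
      }
    }
    sc.addSparkListener(listener)
    body
    bytesRead.get()
  }
}

// Illustrative usage:
// val bytes = MetricsProbe.bytesReadDuring(sc) { rdd.count() }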

Contributor:

I agree with @viirya: as currently defined, it includes all reads. Note that a cached RDD read could still involve network or disk reads.
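As a small illustration of that point (the input path and storage level below are hypothetical, not taken from this PR): a block cached with a disk-backed storage level, or cached on a different executor, is still served through the block manager, so re-reading it can involve disk or network I/O. The snippet assumes an existing SparkContext named sc.

import org.apache.spark.storage.StorageLevel

// Hypothetical input path, for illustration only.
val rdd = sc.textFile("/tmp/example-input").persist(StorageLevel.MEMORY_AND_DISK)

rdd.count()  // First action: partitions are computed; blocks that don't fit in memory spill to disk.
rdd.count()  // Second action: blocks are served from the block manager, but any block that spilled
             // to disk (or lives on a remote executor) still needs disk or network I/O, even though
             // the RDD counts as cached.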

Member:

+1 on the @viirya comment.

Contributor Author:

Thanks for the information, @viirya. I'll close the PR.

          // When we read rdd from cache, we should not increase the inputMetrics
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
@@ -82,7 +82,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
test("input metrics with cache and coalesce") {
// prime the cache manager
val rdd = sc.textFile(tmpFilePath, 4).cache()
rdd.collect()
val bytesRead0 = runAndReturnBytesRead {
rdd.collect()
}
assert(bytesRead0 != 0)

val bytesRead = runAndReturnBytesRead {
rdd.count()
@@ -92,7 +95,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
    }

    // for count and coalesce, the same bytes should be read.
    assert(bytesRead != 0)
    assert(bytesRead == 0)
    assert(bytesRead2 == bytesRead)
  }

@@ -145,13 +148,16 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
test("input metrics on records read with cache") {
// prime the cache manager
val rdd = sc.textFile(tmpFilePath, 4).cache()
rdd.collect()
val records0 = runAndReturnRecordsRead {
rdd.collect()
}
assert(records0 != 0)

val records = runAndReturnRecordsRead {
rdd.count()
}

assert(records == numRecords)
assert(records == 0)
}

/**