From c31a41010afd7c5871166a80396ea47901bf5e80 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 28 Jan 2016 18:39:24 -0800 Subject: [PATCH] Do not overwrite existing values of InputMetrics#bytesRead --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 7 ++++++- .../src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++++- .../spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 7 ++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 3204e6adceca2..e2ebd7f00d0d5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -215,6 +215,7 @@ class HadoopRDD[K, V]( // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name split.inputSplit.value match { @@ -230,9 +231,13 @@ class HadoopRDD[K, V]( case _ => None } + // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). def updateBytesRead(): Unit = { getBytesReadCallback.foreach { getBytesRead => - inputMetrics.setBytesRead(getBytesRead()) + inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 4d2816e335fe3..e71d3405c0ead 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -130,6 +130,7 @@ class NewHadoopRDD[K, V]( val conf = getConf val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val existingBytesRead = inputMetrics.bytesRead // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes @@ -139,9 +140,13 @@ class NewHadoopRDD[K, V]( case _ => None } + // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). def updateBytesRead(): Unit = { getBytesReadCallback.foreach { getBytesRead => - inputMetrics.setBytesRead(getBytesRead()) + inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index edd87c2d8ed07..9703b16c86f90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( val conf = getConf(isDriverSide = false) val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name split.serializableHadoopSplit.value match { @@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( case _ => None } + // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). def updateBytesRead(): Unit = { getBytesReadCallback.foreach { getBytesRead => - inputMetrics.setBytesRead(getBytesRead()) + inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) } }