Commit 269031f
Remove unused method
Andrew Or committed Jan 19, 2016
1 parent c04b5df commit 269031f
Showing 5 changed files with 4 additions and 32 deletions.
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -43,8 +43,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(blockResult.readMethod)
+        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
         existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
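
The same one-line migration repeats in the Hadoop RDD call sites below. As a quick orientation, here is a minimal, self-contained sketch of the new call shape. SimpleTaskMetrics and SimpleInputMetrics are hypothetical stand-ins invented for illustration, not Spark's classes; the only assumption is that registerInputMetrics(readMethod) returns an InputMetrics-like object whose incBytesRead accumulates bytes read by the task.

```scala
// Hypothetical stand-ins for TaskMetrics/InputMetrics, for illustration only.
object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop, Network = Value
}

class SimpleInputMetrics(val readMethod: DataReadMethod.Value) {
  private var _bytesRead = 0L
  def incBytesRead(v: Long): Unit = _bytesRead += v
  def bytesRead: Long = _bytesRead
}

class SimpleTaskMetrics {
  private var _inputMetrics: Option[SimpleInputMetrics] = None

  // Create the task's input metrics on first use and reuse them afterwards.
  def registerInputMetrics(readMethod: DataReadMethod.Value): SimpleInputMetrics = synchronized {
    _inputMetrics.getOrElse {
      val metrics = new SimpleInputMetrics(readMethod)
      _inputMetrics = Some(metrics)
      metrics
    }
  }
}

object NewCallShape extends App {
  val taskMetrics = new SimpleTaskMetrics
  // Call shape used by CacheManager and the Hadoop RDDs after this commit:
  val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
  inputMetrics.incBytesRead(4096L)
  println(s"bytes read: ${inputMetrics.bytesRead}") // bytes read: 4096
}
```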
25 changes: 0 additions & 25 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -257,31 +257,6 @@ class TaskMetrics extends Serializable {
     if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
   }
 
-  /**
-   * Returns the input metrics object that the task should use. Currently, if
-   * there exists an input metric with the same readMethod, we return that one
-   * so the caller can accumulate bytes read. If the readMethod is different
-   * than previously seen by this task, we return a new InputMetric but don't
-   * record it.
-   *
-   * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
-   * we can store all the different inputMetrics (one per readMethod).
-   */
-  private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics = {
-    synchronized {
-      _inputMetrics match {
-        case None =>
-          val metrics = new InputMetrics(readMethod)
-          _inputMetrics = Some(metrics)
-          metrics
-        case Some(metrics @ InputMetrics(method)) if method == readMethod =>
-          metrics
-        case Some(InputMetrics(method)) =>
-          new InputMetrics(readMethod)
-      }
-    }
-  }
-
   private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
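
For readers skimming the deletion above: the removed helper's doc comment describes a policy of reusing the recorded InputMetrics when the readMethod matches and handing back an unrecorded, throwaway InputMetrics otherwise (only one could be stored per task until SPARK-5225). A compact, self-contained restatement of that old behavior, using stub types rather than Spark's classes, is sketched below.

```scala
// Stub types that mirror just enough of the old shape to demonstrate the policy.
sealed trait ReadMethod
case object Hadoop extends ReadMethod
case object Memory extends ReadMethod

case class OldInputMetrics(readMethod: ReadMethod) {
  var bytesRead: Long = 0L
}

class OldTaskMetrics {
  private var _inputMetrics: Option[OldInputMetrics] = None

  // Behavior of the deleted getInputMetricsForReadMethod, per its doc comment:
  //   no metrics yet            -> create, record, and return
  //   same readMethod as before -> return the recorded object (bytes accumulate)
  //   different readMethod      -> return a fresh object that is NOT recorded
  def getInputMetricsForReadMethod(readMethod: ReadMethod): OldInputMetrics = synchronized {
    _inputMetrics match {
      case None =>
        val metrics = OldInputMetrics(readMethod)
        _inputMetrics = Some(metrics)
        metrics
      case Some(metrics) if metrics.readMethod == readMethod =>
        metrics
      case Some(_) =>
        OldInputMetrics(readMethod) // throwaway until SPARK-5225 allows one per readMethod
    }
  }
}

object OldBehavior extends App {
  val tm = new OldTaskMetrics
  val hadoop1 = tm.getInputMetricsForReadMethod(Hadoop)
  val hadoop2 = tm.getInputMetricsForReadMethod(Hadoop)
  val memory  = tm.getInputMetricsForReadMethod(Memory)
  println(hadoop1 eq hadoop2) // true: same read method reuses the recorded metrics
  println(hadoop1 eq memory)  // false: different read method gets an unrecorded copy
}
```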
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,7 +212,7 @@ class HadoopRDD[K, V](
     logInfo("Input split: " + split.inputSplit)
     val jobConf = getJobConf()
 
-    val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+    val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
     // Sets the thread local variable for the file's name
     split.inputSplit.value match {
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -129,8 +129,7 @@ class NewHadoopRDD[K, V](
     logInfo("Input split: " + split.serializableHadoopSplit)
     val conf = getConf
 
-    val inputMetrics = context.taskMetrics
-      .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+    val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
     // Find a function that will return the FileSystem bytes read by this thread. Do this before
     // creating RecordReader, because RecordReader's constructor might read some bytes
3 changes: 1 addition & 2 deletions SqlNewHadoopRDD.scala
@@ -126,8 +126,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     logInfo("Input split: " + split.serializableHadoopSplit)
     val conf = getConf(isDriverSide = false)
 
-    val inputMetrics = context.taskMetrics
-      .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+    val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
     // Sets the thread local variable for the file's name
     split.serializableHadoopSplit.value match {
