
Commit 302bb56

Andrew Or authored and rxin committed on Jan 18, 2016
[SPARK-12884] Move classes to their own files for readability
This is a small step in implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just moves classes to their own files to avoid having single monolithic ones that contain 10 different classes.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10810 from andrewor14/move-things.
1 parent: 5e492e9 · commit: 302bb56

8 files changed: +493 -360 lines
 

core/src/main/scala/org/apache/spark/Accumulators.scala → core/src/main/scala/org/apache/spark/Accumulable.scala (renamed)

+3 -176
@@ -20,14 +20,12 @@ package org.apache.spark
 import java.io.{ObjectInputStream, Serializable}
 
 import scala.collection.generic.Growable
-import scala.collection.Map
-import scala.collection.mutable
-import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.Utils
 
+
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
@@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
   override def toString: String = if (value_ == null) "null" else value_.toString
 }
 
+
 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
  * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
@@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
   def zero(initialValue: R): R
 }
 
+
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
@@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
     copy
   }
 }
-
-/**
- * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
- *
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
- * However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
- *
- * The interpreter session below shows an accumulator being used to add up the elements of an array:
- *
- * {{{
- * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
- *
- * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
- * ...
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
- *
- * scala> accum.value
- * res2: Int = 10
- * }}}
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T] private[spark] (
-    @transient private[spark] val initialValue: T,
-    param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean)
-  extends Accumulable[T, T](initialValue, param, name, internal) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false)
-  }
-}
-
-/**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
- * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
- * available when you create Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T): T = {
-    addInPlace(t1, t2)
-  }
-}
-
-object AccumulatorParam {
-
-  // The following implicit objects were in SparkContext before 1.2 and users had to
-  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
-  // them automatically. However, as there are duplicate codes in SparkContext for backward
-  // compatibility, please update them accordingly if you modify the following implicit objects.
-
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double): Double = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int): Int = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
-    def zero(initialValue: Long): Long = 0L
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
-    def zero(initialValue: Float): Float = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
-  /**
-   * This global map holds the original accumulator objects that are created on the driver.
-   * It keeps weak references to these objects so that accumulators can be garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   */
-  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  private var lastId: Long = 0
-
-  def newId(): Long = synchronized {
-    lastId += 1
-    lastId
-  }
-
-  def register(a: Accumulable[_, _]): Unit = synchronized {
-    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-  }
-
-  def remove(accId: Long) {
-    synchronized {
-      originals.remove(accId)
-    }
-  }
-
-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        // Since we are now storing weak references, we must check whether the underlying data
-        // is valid.
-        originals(id).get match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
-          case None =>
-            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
-        }
-      } else {
-        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
-      }
-    }
-  }
-
-}
-
-private[spark] object InternalAccumulator {
-  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
-  val TEST_ACCUMULATOR = "testAccumulator"
-
-  // For testing only.
-  // This needs to be a def since we don't want to reuse the same accumulator across stages.
-  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
-    if (sys.props.contains("spark.testing")) {
-      Some(new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   *
-   * These accumulators are created with the stage such that all tasks in the stage will
-   * add to the same set of accumulators. We do this to report the distribution of accumulator
-   * values across all tasks within each stage.
-   */
-  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
-    val internalAccumulators = Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
-    internalAccumulators.foreach { accumulator =>
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
-    }
-    internalAccumulators
-  }
-}
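
The generic Accumulable/AccumulableParam machinery stays behind in Accumulable.scala. As a hedged illustration of that API (not part of this diff), here is a sketch of a user-defined AccumulableParam that collects distinct strings into a Set through SparkContext#accumulable; the object and method names are invented for the example, and `sc` is assumed to be an existing SparkContext.

    import org.apache.spark.{AccumulableParam, SparkContext}

    object DistinctWordsExample {
      // How to fold one element into a partial result, how to merge two partial
      // results from different partitions, and what the "zero" value looks like.
      object StringSetParam extends AccumulableParam[Set[String], String] {
        def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem
        def addInPlace(a: Set[String], b: Set[String]): Set[String] = a ++ b
        def zero(initial: Set[String]): Set[String] = Set.empty[String]
      }

      def distinctWords(sc: SparkContext, lines: Seq[String]): Set[String] = {
        // Pass the param explicitly rather than relying on implicit resolution.
        val seen = sc.accumulable(Set.empty[String])(StringSetParam)
        sc.parallelize(lines).foreach(line => line.split("\\s+").foreach(seen += _))
        seen.value  // only the driver may read the accumulated value
      }
    }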
core/src/main/scala/org/apache/spark/Accumulator.scala (new file, +160 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import scala.collection.{mutable, Map}
import scala.ref.WeakReference


/**
 * A simpler value of [[Accumulable]] where the result type being accumulated is the same
 * as the types of elements being merged, i.e. variables that are only "added" to through an
 * associative operation and can therefore be efficiently supported in parallel. They can be used
 * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
 * value types, and programmers can add support for new types.
 *
 * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
 * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
 * However, they cannot read its value. Only the driver program can read the accumulator's value,
 * using its value method.
 *
 * The interpreter session below shows an accumulator being used to add up the elements of an array:
 *
 * {{{
 * scala> val accum = sc.accumulator(0)
 * accum: spark.Accumulator[Int] = 0
 *
 * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
 * ...
 * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
 *
 * scala> accum.value
 * res2: Int = 10
 * }}}
 *
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `T`
 * @tparam T result type
 */
class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }

  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}


// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators extends Logging {
  /**
   * This global map holds the original accumulator objects that are created on the driver.
   * It keeps weak references to these objects so that accumulators can be garbage-collected
   * once the RDDs and user-code that reference them are cleaned up.
   */
  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

  private var lastId: Long = 0

  def newId(): Long = synchronized {
    lastId += 1
    lastId
  }

  def register(a: Accumulable[_, _]): Unit = synchronized {
    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
  }

  def remove(accId: Long) {
    synchronized {
      originals.remove(accId)
    }
  }

  // Add values to the original accumulators with some given IDs
  def add(values: Map[Long, Any]): Unit = synchronized {
    for ((id, value) <- values) {
      if (originals.contains(id)) {
        // Since we are now storing weak references, we must check whether the underlying data
        // is valid.
        originals(id).get match {
          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
          case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
        }
      } else {
        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }

}


/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}


object AccumulatorParam {

  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }

  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
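
The TODO above invites AccumulatorParams for more types. As a hedged sketch (not part of this diff), a custom AccumulatorParam can also change the merge operation itself, as long as it stays commutative and associative; the names below are invented for the example and `sc` is assumed to be an existing SparkContext.

    import org.apache.spark.{AccumulatorParam, SparkContext}

    object MaxAccumulatorExample {
      // "Add" here is max, which is commutative and associative as required,
      // and Long.MinValue acts as its zero element.
      object MaxLongParam extends AccumulatorParam[Long] {
        def addInPlace(t1: Long, t2: Long): Long = math.max(t1, t2)
        def zero(initialValue: Long): Long = Long.MinValue
      }

      def maxOf(sc: SparkContext, xs: Seq[Long]): Long = {
        // Pass the param explicitly so it cannot clash with the built-in LongAccumulatorParam.
        val maxSeen = sc.accumulator(Long.MinValue)(MaxLongParam)
        sc.parallelize(xs).foreach(x => maxSeen += x)
        maxSeen.value
      }
    }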
core/src/main/scala/org/apache/spark/InternalAccumulator.scala (new file, +58 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark


// This is moved to its own file because many more things will be added to it in SPARK-10620.
private[spark] object InternalAccumulator {
  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
  val TEST_ACCUMULATOR = "testAccumulator"

  // For testing only.
  // This needs to be a def since we don't want to reuse the same accumulator across stages.
  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
    if (sys.props.contains("spark.testing")) {
      Some(new Accumulator(
        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
    } else {
      None
    }
  }

  /**
   * Accumulators for tracking internal metrics.
   *
   * These accumulators are created with the stage such that all tasks in the stage will
   * add to the same set of accumulators. We do this to report the distribution of accumulator
   * values across all tasks within each stage.
   */
  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
    val internalAccumulators = Seq(
      // Execution memory refers to the memory used by internal data structures created
      // during shuffles, aggregations and joins. The value of this accumulator should be
      // approximately the sum of the peak sizes across all such data structures created
      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
      new Accumulator(
        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
    ) ++ maybeTestAccumulator.toSeq
    internalAccumulators.foreach { accumulator =>
      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
    }
    internalAccumulators
  }
}
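
A hedged sketch of how driver-side code might consume `create` (the helper object below is hypothetical and must sit under org.apache.spark because InternalAccumulator is private[spark]); it relies only on the ordering visible in `create` above.

    package org.apache.spark

    private[spark] object InternalAccumulatorUsageSketch {
      def newStageAccumulators(sc: SparkContext): (Accumulator[Long], Seq[Accumulator[Long]]) = {
        // One fresh set per stage; create() also registers each accumulator with the
        // ContextCleaner so it can be garbage-collected along with the stage.
        val accums = InternalAccumulator.create(sc)
        // Per create() above, the peak-execution-memory accumulator is the first element;
        // the optional test accumulator is only appended when spark.testing is set.
        (accums.head, accums)
      }
    }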
core/src/main/scala/org/apache/spark/executor/InputMetrics.scala (new file, +77 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi


/**
 * :: DeveloperApi ::
 * Method by which input data was read. Network means that the data was read over the network
 * from a remote block manager (which may have stored the data on-disk or in-memory).
 */
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
  type DataReadMethod = Value
  val Memory, Disk, Hadoop, Network = Value
}


/**
 * :: DeveloperApi ::
 * Metrics about reading input data.
 */
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

  /**
   * This is volatile so that it is visible to the updater thread.
   */
  @volatile @transient var bytesReadCallback: Option[() => Long] = None

  /**
   * Total bytes read.
   */
  private var _bytesRead: Long = _
  def bytesRead: Long = _bytesRead
  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes

  /**
   * Total records read.
   */
  private var _recordsRead: Long = _
  def recordsRead: Long = _recordsRead
  def incRecordsRead(records: Long): Unit = _recordsRead += records

  /**
   * Invoke the bytesReadCallback and mutate bytesRead.
   */
  def updateBytesRead() {
    bytesReadCallback.foreach { c =>
      _bytesRead = c()
    }
  }

  /**
   * Register a function that can be called to get up-to-date information on how many bytes the task
   * has read from an input source.
   */
  def setBytesReadCallback(f: Option[() => Long]) {
    bytesReadCallback = f
  }
}
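
A hedged sketch of the callback mechanism above (not part of this diff): an external byte counter, standing in for something like Hadoop FileSystem statistics, is registered once, and updateBytesRead() pulls its latest value. Every name ending in Example, plus currentBytesRead, is invented for the illustration.

    import org.apache.spark.executor.{DataReadMethod, InputMetrics}

    object InputMetricsCallbackExample {
      def main(args: Array[String]): Unit = {
        var currentBytesRead = 0L  // stand-in for an underlying I/O statistics counter
        val metrics = InputMetrics(DataReadMethod.Hadoop)
        metrics.setBytesReadCallback(Some(() => currentBytesRead))

        currentBytesRead += 1024L  // simulate reading some input
        metrics.incRecordsRead(10L)
        metrics.updateBytesRead()  // copies the callback's current value into bytesRead

        assert(metrics.bytesRead == 1024L && metrics.recordsRead == 10L)
      }
    }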
core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala (new file, +53 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi


/**
 * :: DeveloperApi ::
 * Method by which output data was written.
 */
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
  type DataWriteMethod = Value
  val Hadoop = Value
}


/**
 * :: DeveloperApi ::
 * Metrics about writing output data.
 */
@DeveloperApi
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
  /**
   * Total bytes written
   */
  private var _bytesWritten: Long = _
  def bytesWritten: Long = _bytesWritten
  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value

  /**
   * Total records written
   */
  private var _recordsWritten: Long = 0L
  def recordsWritten: Long = _recordsWritten
  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
}
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala (new file, +87 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi


/**
 * :: DeveloperApi ::
 * Metrics pertaining to shuffle data read in a given task.
 */
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
  /**
   * Number of remote blocks fetched in this shuffle by this task
   */
  private var _remoteBlocksFetched: Int = _
  def remoteBlocksFetched: Int = _remoteBlocksFetched
  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

  /**
   * Number of local blocks fetched in this shuffle by this task
   */
  private var _localBlocksFetched: Int = _
  def localBlocksFetched: Int = _localBlocksFetched
  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value

  /**
   * Time the task spent waiting for remote shuffle blocks. This only includes the time
   * blocking on shuffle input data. For instance if block B is being fetched while the task is
   * still not finished processing block A, it is not considered to be blocking on block B.
   */
  private var _fetchWaitTime: Long = _
  def fetchWaitTime: Long = _fetchWaitTime
  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value

  /**
   * Total number of remote bytes read from the shuffle by this task
   */
  private var _remoteBytesRead: Long = _
  def remoteBytesRead: Long = _remoteBytesRead
  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value

  /**
   * Shuffle data that was read from the local disk (as opposed to from a remote executor).
   */
  private var _localBytesRead: Long = _
  def localBytesRead: Long = _localBytesRead
  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value

  /**
   * Total bytes fetched in the shuffle by this task (both remote and local).
   */
  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead

  /**
   * Number of blocks fetched in this shuffle by this task (remote or local)
   */
  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched

  /**
   * Total number of records read from the shuffle by this task
   */
  private var _recordsRead: Long = _
  def recordsRead: Long = _recordsRead
  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}
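
A hedged sketch (not part of this diff) showing that totalBytesRead and totalBlocksFetched are derived from the remote and local counters rather than stored; the helper object is hypothetical and sits in the executor package because the inc* mutators are private[spark].

    package org.apache.spark.executor

    private[spark] object ShuffleReadMetricsSketch {
      def example(): Unit = {
        val m = new ShuffleReadMetrics
        m.incRemoteBlocksFetched(2)
        m.incLocalBlocksFetched(3)
        m.incRemoteBytesRead(2048L)
        m.incLocalBytesRead(1024L)
        assert(m.totalBlocksFetched == 5)   // remote + local blocks
        assert(m.totalBytesRead == 3072L)   // remote + local bytes
      }
    }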
core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala (new file, +53 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi


/**
 * :: DeveloperApi ::
 * Metrics pertaining to shuffle data written in a given task.
 */
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
  /**
   * Number of bytes written for the shuffle by this task
   */
  @volatile private var _shuffleBytesWritten: Long = _
  def shuffleBytesWritten: Long = _shuffleBytesWritten
  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value

  /**
   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
   */
  @volatile private var _shuffleWriteTime: Long = _
  def shuffleWriteTime: Long = _shuffleWriteTime
  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value

  /**
   * Total number of records written to the shuffle by this task
   */
  @volatile private var _shuffleRecordsWritten: Long = _
  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
}
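
A hedged sketch (not part of this diff) of how a writer might feed these counters, mirroring the nanosecond unit documented for shuffleWriteTime; the helper is hypothetical and lives in the executor package because the mutators are private[spark].

    package org.apache.spark.executor

    private[spark] object ShuffleWriteMetricsSketch {
      // Times a write, then records the elapsed nanoseconds, the bytes it reports, and one record.
      def recordTimedWrite(metrics: ShuffleWriteMetrics)(write: => Long): Unit = {
        val start = System.nanoTime()
        val bytesWritten = write
        metrics.incShuffleWriteTime(System.nanoTime() - start)
        metrics.incShuffleBytesWritten(bytesWritten)
        metrics.incShuffleRecordsWritten(1L)
      }
    }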

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

+2 -184
@@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util.Utils
 
+
 /**
  * :: DeveloperApi ::
  * Metrics tracked during the execution of a task.
@@ -241,6 +242,7 @@ class TaskMetrics extends Serializable {
   }
 }
 
+
 private[spark] object TaskMetrics {
   private val hostNameCache = new ConcurrentHashMap[String, String]()
 
@@ -251,187 +253,3 @@ private[spark] object TaskMetrics {
     if (canonicalHost != null) canonicalHost else host
   }
 }
-
-/**
- * :: DeveloperApi ::
- * Method by which input data was read. Network means that the data was read over the network
- * from a remote block manager (which may have stored the data on-disk or in-memory).
- */
-@DeveloperApi
-object DataReadMethod extends Enumeration with Serializable {
-  type DataReadMethod = Value
-  val Memory, Disk, Hadoop, Network = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Method by which output data was written.
- */
-@DeveloperApi
-object DataWriteMethod extends Enumeration with Serializable {
-  type DataWriteMethod = Value
-  val Hadoop = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about reading input data.
- */
-@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
-
-  /**
-   * This is volatile so that it is visible to the updater thread.
-   */
-  @volatile @transient var bytesReadCallback: Option[() => Long] = None
-
-  /**
-   * Total bytes read.
-   */
-  private var _bytesRead: Long = _
-  def bytesRead: Long = _bytesRead
-  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
-
-  /**
-   * Total records read.
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  def incRecordsRead(records: Long): Unit = _recordsRead += records
-
-  /**
-   * Invoke the bytesReadCallback and mutate bytesRead.
-   */
-  def updateBytesRead() {
-    bytesReadCallback.foreach { c =>
-      _bytesRead = c()
-    }
-  }
-
-  /**
-   * Register a function that can be called to get up-to-date information on how many bytes the task
-   * has read from an input source.
-   */
-  def setBytesReadCallback(f: Option[() => Long]) {
-    bytesReadCallback = f
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about writing output data.
- */
-@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
-  /**
-   * Total bytes written
-   */
-  private var _bytesWritten: Long = _
-  def bytesWritten: Long = _bytesWritten
-  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
-
-  /**
-   * Total records written
-   */
-  private var _recordsWritten: Long = 0L
-  def recordsWritten: Long = _recordsWritten
-  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
- */
-@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
-  /**
-   * Number of remote blocks fetched in this shuffle by this task
-   */
-  private var _remoteBlocksFetched: Int = _
-  def remoteBlocksFetched: Int = _remoteBlocksFetched
-  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
-  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
-
-  /**
-   * Number of local blocks fetched in this shuffle by this task
-   */
-  private var _localBlocksFetched: Int = _
-  def localBlocksFetched: Int = _localBlocksFetched
-  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
-  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
-  /**
-   * Time the task spent waiting for remote shuffle blocks. This only includes the time
-   * blocking on shuffle input data. For instance if block B is being fetched while the task is
-   * still not finished processing block A, it is not considered to be blocking on block B.
-   */
-  private var _fetchWaitTime: Long = _
-  def fetchWaitTime: Long = _fetchWaitTime
-  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
-  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
-
-  /**
-   * Total number of remote bytes read from the shuffle by this task
-   */
-  private var _remoteBytesRead: Long = _
-  def remoteBytesRead: Long = _remoteBytesRead
-  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
-  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
-
-  /**
-   * Shuffle data that was read from the local disk (as opposed to from a remote executor).
-   */
-  private var _localBytesRead: Long = _
-  def localBytesRead: Long = _localBytesRead
-  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
-
-  /**
-   * Total bytes fetched in the shuffle by this task (both remote and local).
-   */
-  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
-
-  /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
-   */
-  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
-
-  /**
-   * Total number of records read from the shuffle by this task
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
-  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
- */
-@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
-  /**
-   * Number of bytes written for the shuffle by this task
-   */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def shuffleBytesWritten: Long = _shuffleBytesWritten
-  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
-  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
-
-  /**
-   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
-   */
-  @volatile private var _shuffleWriteTime: Long = _
-  def shuffleWriteTime: Long = _shuffleWriteTime
-  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
-  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
-
-  /**
-   * Total number of records written to the shuffle by this task
-   */
-  @volatile private var _shuffleRecordsWritten: Long = _
-  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
-  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
-  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
-  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
-}
