[SPARK-10620][WIP] Migrate TaskMetrics to accumulators #10717

Closed · wants to merge 72 commits

Commits (72)
8d57657  Remove unused methods or replace them (Dec 30, 2015)
1ad2868  Implement initial framework to migrate metrics to accums (Jan 4, 2016)
a4ca6b2  Migrate a few more easy metrics (Jan 4, 2016)
373898e  ShuffleReadMetrics + namespacing accumulators (Jan 4, 2016)
e74632c  General code cleanup (Jan 5, 2016)
7e74bf3  ShuffleWriteMetrics (Jan 5, 2016)
396088d  OutputMetrics (Jan 5, 2016)
0404e3e  InputMetrics (Jan 5, 2016)
17becb0  Fix JsonProtocol + JsonProtocolSuite (Jan 6, 2016)
809a93a  Fix tests where TaskMetrics had no accumulators (Jan 6, 2016)
32ba9e3  Rename a few shuffle write metrics for consistency (Jan 6, 2016)
78fb33e  Fix metrics in local mode (tests) (Jan 6, 2016)
8117898  Fix "harmless" exception in peak memory tests (Jan 6, 2016)
6bd9c0a  Simplify internal accumulator update mechanism (Jan 6, 2016)
ed29328  Fix tests (Jan 6, 2016)
2011912  Clean up (Jan 6, 2016)
c3de4f0  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 6, 2016)
9222f11  Do not send TaskMetrics from executors to driver (Jan 7, 2016)
2add53f  Restore accumulator serialization semantics (Jan 7, 2016)
afe957c  Fix semantics of accumulators when tasks fail (Jan 7, 2016)
c7240f3  Fix a few more tests (Jan 7, 2016)
fa086c3  Fix SQL UI (Jan 8, 2016)
361442e  Clean up: lift odd unique name requirement (Jan 8, 2016)
5aa6aa1  Move smaller metrics classes to their own files (Jan 8, 2016)
7118be5  Fix SQLQuerySuite (Jan 8, 2016)
176e91d  Remove unused hostname from TaskMetrics (Jan 8, 2016)
7939e1c  Reinitialize OutputMetrics et al during reconstruction (Jan 8, 2016)
c029f62  Fix *ShuffleSuite (Jan 8, 2016)
b3c51dd  Fix DAGSchedulerSuite (Jan 9, 2016)
d531f3f  Simplify accumulator update code a little (Jan 9, 2016)
8391ef8  Track updated blocks as an accumulator (Jan 9, 2016)
972336b  Minor clean ups + docs (Jan 9, 2016)
476394a  Replace all accumulator updates with Seq[AccumulableInfo] (Jan 11, 2016)
fbaf32f  Always send Seq[AccumulableInfo] instead of TaskMetrics (Jan 11, 2016)
08b6c86  Fix JsonProtocolSuite + cleanup (Jan 12, 2016)
c1db008  More cleanups (Jan 12, 2016)
61fc9e1  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 12, 2016)
4d681fa  Fix style (Jan 12, 2016)
b00318b  Fix MiMa (Jan 12, 2016)
67153e6  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 12, 2016)
e060ae2  Fix MiMa (Jan 12, 2016)
c94420b  Rename updatedBlocks -> updatedBlockStatuses (Jan 12, 2016)
94783d0  Fix wrong comment by removing it :) (Jan 12, 2016)
b271205  Fix StringAccumulatorParam semantics (Jan 12, 2016)
8065b25  Relax scheduler dependency order + simplify some code (Jan 12, 2016)
63a3660  Merge github.com:apache/spark into task-metrics-to-accums (Jan 14, 2016)
596348f  Fix HistoryServerSuite (andrewor14, Jan 14, 2016)
8b99071  Fix TaskMetricsSuite (Jan 14, 2016)
dd6d8ab  Fix JsonProtocolSuite (Jan 14, 2016)
0e7f660  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 14, 2016)
471429c  Fix TaskContextSuite + add some param docs (Jan 14, 2016)
28fae0f  Fix StagePageSuite (Jan 14, 2016)
1b027cb  Fix ExternalSorterSuite (Jan 14, 2016)
5d09221  Fix SQLMetricsSuite (Jan 14, 2016)
4bfbc7c  Fix PartitionBatchPruningSuite (Jan 14, 2016)
0185b72  Clean up Accumulators object a little + some docs (Jan 14, 2016)
b31862a  Fix DAGSchedulerSuite, trivial omission (Jan 14, 2016)
3dcc4e1  Fix style + default (Jan 15, 2016)
36a5ca8  Split Accumulators.scala into 3 files (Jan 15, 2016)
fe451e5  Fix MiMa (Jan 15, 2016)
f900068  Minor cleanups (Jan 15, 2016)
5ca949f  Fix style (Jan 15, 2016)
109fec3  Add InternalAccumulatorsSuite (Jan 15, 2016)
a393ade  Fix JsonProtocolSuite (Jan 15, 2016)
ce10ea1  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 15, 2016)
e59f368  Add more tests to AccumulatorSuite (Jan 15, 2016)
b17e848  Fix style (Jan 15, 2016)
06b958e  Add test for accum updates during failures (Jan 15, 2016)
b58f2e6  Merge branch 'master' of github.com:apache/spark into task-metrics-to… (Jan 15, 2016)
28346e5  Add tests for TaskMetrics (Jan 15, 2016)
00a12a4  Add fine-grained test for collecting accumulators during failures (Jan 16, 2016)
ed9de9c  Fix style (Jan 18, 2016)
Files changed
@@ -114,8 +114,7 @@ public BypassMergeSortShuffleWriter(
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
-    this.writeMetrics = new ShuffleWriteMetrics();
-    taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+    this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
     this.serializer = Serializer.getSerializer(dep.serializer());
     this.shuffleBlockResolver = shuffleBlockResolver;
   }
@@ -143,7 +142,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
     // included in the shuffle write time.
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
@@ -203,7 +202,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       threwException = false;
     } finally {
       Closeables.close(out, threwException);
-      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
     }
     partitionWriters = null;
     return lengths;
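
The change above moves ownership of ShuffleWriteMetrics from the writer (which used to construct it and push it into TaskMetrics via shuffleWriteMetrics_$eq) to TaskMetrics itself, which hands out a registered instance. A rough Scala sketch of that register-and-return idea, using simplified stand-in classes rather than the real Spark internals:

class ShuffleWriteMetricsSketch {
  // Simplified counters; in this PR the real ShuffleWriteMetrics is backed by accumulators.
  private var _bytesWritten = 0L
  private var _writeTime = 0L
  def incBytesWritten(n: Long): Unit = { _bytesWritten += n }
  def incWriteTime(ns: Long): Unit = { _writeTime += ns }
  def bytesWritten: Long = _bytesWritten
  def writeTime: Long = _writeTime
}

class TaskMetricsSketch {
  private var _shuffleWriteMetrics: Option[ShuffleWriteMetricsSketch] = None

  // Create the metrics object on first request and return the same instance afterwards,
  // so TaskMetrics (not each writer) owns it.
  def registerShuffleWriteMetrics(): ShuffleWriteMetricsSketch = synchronized {
    _shuffleWriteMetrics.getOrElse {
      val m = new ShuffleWriteMetricsSketch
      _shuffleWriteMetrics = Some(m)
      m
    }
  }
}

A caller then holds the returned instance and updates it directly, which is what the registerShuffleWriteMetrics() call sites above do.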
@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
       // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
       // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
-      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
     }
   }
 
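
The hunk above splits the accounting for a spill file: the record count still flows into the shuffle write metrics, while the bytes written are charged to diskBytesSpilled on the task (and, per the comment, the IO time is deliberately dropped). A hedged Scala sketch of that split, with simplified stand-in types rather than the real Spark classes:

object SpillAccountingSketch {
  final class Counters {
    var recordsWritten = 0L
    var bytesWritten = 0L
  }

  // writeMetricsToUse holds what was just written to the spill file; the task-level
  // shuffle write metrics receive the record count, and the bytes are reported as spilled.
  def chargeSpill(
      writeMetricsToUse: Counters,
      shuffleWriteMetrics: Counters,
      incDiskBytesSpilled: Long => Unit): Unit = {
    shuffleWriteMetrics.recordsWritten += writeMetricsToUse.recordsWritten
    incDiskBytesSpilled(writeMetricsToUse.bytesWritten)
  }
}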
@@ -25,7 +25,6 @@
 import scala.Option;
 import scala.Product2;
 import scala.collection.JavaConverters;
-import scala.collection.immutable.Map;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
 
@@ -119,8 +118,7 @@ public UnsafeShuffleWriter(
     this.shuffleId = dep.shuffleId();
     this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
     this.partitioner = dep.partitioner();
-    this.writeMetrics = new ShuffleWriteMetrics();
-    taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+    this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
@@ -298,8 +296,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
         // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
         // to be counted as shuffle write, but this will lead to double-counting of the final
         // SpillInfo's bytes.
-        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
-        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incBytesWritten(outputFile.length());
         return partitionLengths;
       }
     } catch (IOException e) {
@@ -411,7 +409,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
           spillInputChannelPositions[i] += actualBytesTransferred;
           bytesToTransfer -= actualBytesTransferred;
         }
-        writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+        writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
         bytesWrittenToMergedFile += partitionLengthInSpill;
         partitionLengths[partition] += partitionLengthInSpill;
       }
@@ -445,13 +443,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
   @Override
   public Option<MapStatus> stop(boolean success) {
     try {
-      // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
-      Map<String, Accumulator<Object>> internalAccumulators =
-        taskContext.internalMetricsToAccumulators();
-      if (internalAccumulators != null) {
-        internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
-          .add(getPeakMemoryUsedBytes());
-      }
+      taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());
 
       if (stopping) {
         return Option.apply(null);
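
This last hunk is the PR's theme in miniature: instead of fetching an internal accumulator by name through internalMetricsToAccumulators (with a null guard for suites that never set it up), the writer calls a typed incPeakExecutionMemory method and TaskMetrics keeps the backing accumulator private. A rough Scala sketch of that encapsulation, using simplified stand-ins for the real Accumulator and TaskMetrics classes:

// Minimal accumulator stand-in: just an additive counter.
class LongAccumSketch {
  private var _value = 0L
  def add(v: Long): Unit = { _value += v }
  def value: Long = _value
}

class TaskMetricsWithAccums {
  // The metric is an accumulator underneath, so its updates can be collected and
  // merged on the driver like any other accumulator update.
  private val peakExecutionMemory = new LongAccumSketch

  def incPeakExecutionMemory(bytes: Long): Unit = peakExecutionMemory.add(bytes)
  def peakExecutionMemoryValue: Long = peakExecutionMemory.value
}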
@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o
   public void write(int b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void write(byte[] b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b, off, len);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void flush() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.flush();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void close() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.close();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 }
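
The TimeTrackingOutputStream diff above only renames the metric method it calls (incShuffleWriteTime to incWriteTime); the wrapping pattern itself is simple: every delegated call is timed and the elapsed nanoseconds are added to the write-time metric. A compact Scala sketch of the same pattern, where recordWriteTime is an illustrative stand-in for ShuffleWriteMetrics.incWriteTime:

import java.io.OutputStream

class TimedOutputStreamSketch(out: OutputStream, recordWriteTime: Long => Unit)
  extends OutputStream {

  // Run the delegated call and charge the elapsed time to the write-time metric,
  // even if the underlying call throws.
  private def timed[T](body: => T): T = {
    val start = System.nanoTime()
    try body finally recordWriteTime(System.nanoTime() - start)
  }

  override def write(b: Int): Unit = timed { out.write(b) }
  override def write(b: Array[Byte]): Unit = timed { out.write(b) }
  override def write(b: Array[Byte], off: Int, len: Int): Unit = timed { out.write(b, off, len) }
  override def flush(): Unit = timed { out.flush() }
  override def close(): Unit = timed { out.close() }
}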
@@ -122,9 +122,7 @@ private UnsafeExternalSorter(
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.fileBufferSizeBytes = 32 * 1024;
-    // TODO: metrics tracking + integration with shuffle write metrics
-    // need to connect the write metrics to task metrics so we count the spill IO somewhere.
-    this.writeMetrics = new ShuffleWriteMetrics();
+    this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
 
     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
core/src/main/scala/org/apache/spark/AccumulableParam.scala (new file: 150 additions, 0 deletions)
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.io.Serializable

import scala.collection.generic.Growable
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
*
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value. Is allowed to modify and return `r`
* for efficiency (to avoid allocating objects).
*
* @param r the current value of the accumulator
* @param t the data to be added to the accumulator
* @return the new value of the accumulator
*/
def addAccumulator(r: R, t: T): R

/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R

/**
* Return the "zero" (identity) value for an accumulator type, given its initial value. For
* example, if R was a vector of N dimensions, this would return a vector of N zeroes.
*/
def zero(initialValue: R): R
}


private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {

def addAccumulator(growable: R, elem: T): R = {
growable += elem
growable
}

def addInPlace(t1: R, t2: R): R = {
t1 ++= t2
t1
}

def zero(initialValue: R): R = {
// We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
// Instead we'll serialize it to a buffer and load it back.
val ser = new JavaSerializer(new SparkConf(false)).newInstance()
val copy = ser.deserialize[R](ser.serialize(initialValue))
copy.clear() // In case it contained stuff
copy
}
}


/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}


object AccumulatorParam {

// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, as there are duplicate codes in SparkContext for backward
// compatibility, please update them accordingly if you modify the following implicit objects.

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}

// Note: when merging values, this param just adopts the newer value. This is used only
// internally for things that shouldn't really be accumulated across tasks, like input
// read method, which should be the same across all tasks in the same stage.
private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
def addInPlace(t1: String, t2: String): String = t2
def zero(initialValue: String): String = ""
}

// Note: this is expensive as it makes a copy of the list every time the caller adds an item.
// A better way to use this is to first accumulate the values yourself, then add them all at once.
private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
}

// For the internal metric that records what blocks are updated in a particular task
private[spark] object UpdatedBlockStatusesAccumulatorParam
extends ListAccumulatorParam[(BlockId, BlockStatus)]

}
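
To make the contract above concrete, the following is a hedged usage sketch that exercises zero, addAccumulator, and addInPlace directly, without a SparkContext; VectorParam is a made-up example, and the only assumption is that the AccumulableParam trait defined above is available as org.apache.spark.AccumulableParam:

import org.apache.spark.AccumulableParam

object AccumulableParamExample {
  // Accumulates element-wise sums of fixed-length vectors.
  object VectorParam extends AccumulableParam[Array[Double], Array[Double]] {
    def addAccumulator(r: Array[Double], t: Array[Double]): Array[Double] = addInPlace(r, t)
    def addInPlace(r1: Array[Double], r2: Array[Double]): Array[Double] = {
      var i = 0
      while (i < r1.length) { r1(i) += r2(i); i += 1 }
      r1
    }
    // The "zero" keeps the dimension of the initial value but none of its contents.
    def zero(initialValue: Array[Double]): Array[Double] = new Array[Double](initialValue.length)
  }

  def main(args: Array[String]): Unit = {
    val acc = VectorParam.zero(Array(1.0, 2.0, 3.0))                 // Array(0.0, 0.0, 0.0)
    val withTask1 = VectorParam.addAccumulator(acc, Array(1.0, 1.0, 1.0))
    val merged = VectorParam.addInPlace(withTask1, Array(2.0, 0.0, 2.0))
    println(merged.mkString(", "))                                   // 3.0, 1.0, 5.0
  }
}

The note on ListAccumulatorParam above follows the same logic: because addInPlace copies the whole Seq, it is cheaper to batch values locally and add them once per task than to add one element at a time.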