
Commit bc2c4f1: Address comments
xuanyuanking committed Dec 7, 2018
1 parent 6378a3d

Showing 3 changed files with 6 additions and 7 deletions.
ShuffleWriteProcessor.scala
@@ -29,11 +29,10 @@ import org.apache.spark.scheduler.MapStatus
 private[spark] class ShuffleWriteProcessor extends Serializable with Logging {

   /**
-   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
-   * reporter for both local accumulator and original reporter updating. As the reporter is a
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context. As the reporter is a
    * per-row operator, here need a careful consideration on performance.
    */
-  def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = {
+  protected def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = {
     context.taskMetrics().shuffleWriteMetrics
   }
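Note: the sentence dropped from the scaladoc described returning a proxy reporter that updates both a local accumulator and the original reporter; that behavior now lives in subclasses such as SQLShuffleWriteMetricsReporter (next file). A minimal sketch of such a proxy follows. It is an illustration, not Spark's implementation; it assumes the trait's per-record methods are private[spark], so the sketch has to sit under the org.apache.spark package tree, and ProxyingWriteMetricsReporter is a hypothetical name.

package org.apache.spark.examples

import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
import org.apache.spark.util.LongAccumulator

// Hypothetical proxy in the spirit of the removed scaladoc sentence: each
// per-record update goes to the task's original reporter and is mirrored
// into a local accumulator. These hooks fire once per row written, so the
// forwarding must stay allocation-free and cheap.
private[spark] class ProxyingWriteMetricsReporter(
    original: ShuffleWriteMetricsReporter,
    localBytes: LongAccumulator) extends ShuffleWriteMetricsReporter {

  override private[spark] def incBytesWritten(v: Long): Unit = {
    original.incBytesWritten(v)
    localBytes.add(v)
  }
  override private[spark] def incRecordsWritten(v: Long): Unit =
    original.incRecordsWritten(v)
  override private[spark] def incWriteTime(v: Long): Unit =
    original.incWriteTime(v)
  override private[spark] def decBytesWritten(v: Long): Unit =
    original.decBytesWritten(v)
  override private[spark] def decRecordsWritten(v: Long): Unit =
    original.decRecordsWritten(v)
}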
ShuffleExchangeExec.scala
@@ -355,7 +355,7 @@ object ShuffleExchangeExec {
    */
   def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
     new ShuffleWriteProcessor {
-      override def createMetricsReporter(
+      override protected def createMetricsReporter(
           context: TaskContext): ShuffleWriteMetricsReporter = {
         new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
       }
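This pairing compiles because a Scala override may keep or widen, but never narrow, the visibility of the member it overrides; keeping the hook protected means external code can only reach it through the processor's public write path. A self-contained illustration of the pattern, with hypothetical names:

// Template-method pattern: the hook is protected, so callers can only go
// through the public entry point; subclasses customize the hook freely.
abstract class Processor {
  protected def makeReporter(): String
  final def write(): String = s"writing via ${makeReporter()}"
}

object Demo extends App {
  val sqlProcessor = new Processor {
    override protected def makeReporter(): String = "sql-metrics-reporter"
  }
  // sqlProcessor.makeReporter()  // does not compile: member is protected
  println(sqlProcessor.write())   // prints: writing via sql-metrics-reporter
}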
SQLMetrics.scala
@@ -80,7 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
-  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
+  private val NS_TIMING_METRIC = "nsTiming"
   private val AVERAGE_METRIC = "average"

   private val baseForAvgMetric: Int = 10
@@ -126,7 +126,7 @@ object SQLMetrics {

   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
-    val acc = new SQLMetric(NORMALIZE_TIMING_METRIC, -1)
+    val acc = new SQLMetric(NS_TIMING_METRIC, -1)
     acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
     acc
   }
@@ -173,7 +173,7 @@ object SQLMetrics {
       Utils.bytesToString
     } else if (metricsType == TIMING_METRIC) {
       Utils.msDurationToString
-    } else if (metricsType == NORMALIZE_TIMING_METRIC) {
+    } else if (metricsType == NS_TIMING_METRIC) {
       duration => Utils.msDurationToString(duration.nanos.toMillis)
     } else {
       throw new IllegalStateException("unexpected metrics type: " + metricsType)
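For reference, the nsTiming branch above formats a nanosecond total by first truncating it to milliseconds through the scala.concurrent.duration implicits. A small standalone check of that conversion:

import scala.concurrent.duration._

// An nsTiming metric accumulates nanoseconds but is rendered in
// milliseconds: 123456789L.nanos builds a FiniteDuration through the
// DurationLong implicit, and toMillis truncates to whole milliseconds.
val rawNanos = 123456789L
val asMillis = rawNanos.nanos.toMillis
assert(asMillis == 123L)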
