-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units #33116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4ba1a30
831c196
4eb1bc0
bfbe874
d62dd57
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.network.util; | ||
|
|
||
| import java.io.OutputStream; | ||
| import java.io.PrintWriter; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import com.codahale.metrics.Clock; | ||
| import com.codahale.metrics.ExponentiallyDecayingReservoir; | ||
| import com.codahale.metrics.Snapshot; | ||
| import com.codahale.metrics.Timer; | ||
|
|
||
| /** | ||
| * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to | ||
| * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the | ||
| * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot | ||
| * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer | ||
| * should use a different unit for its snapshot. Note that internally, all values are still stored | ||
| * with nanosecond-precision; it is only before being returned to the caller that the nanosecond | ||
| * value is converted to the custom time unit. | ||
| */ | ||
| public class TimerWithCustomTimeUnit extends Timer { | ||
|
|
||
| private final TimeUnit timeUnit; | ||
| private final double nanosPerUnit; | ||
|
|
||
| public TimerWithCustomTimeUnit(TimeUnit timeUnit) { | ||
| this(timeUnit, Clock.defaultClock()); | ||
| } | ||
|
|
||
| TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) { | ||
| super(new ExponentiallyDecayingReservoir(), clock); | ||
| this.timeUnit = timeUnit; | ||
| this.nanosPerUnit = timeUnit.toNanos(1); | ||
| } | ||
|
|
||
| @Override | ||
| public Snapshot getSnapshot() { | ||
| return new SnapshotWithCustomTimeUnit(super.getSnapshot()); | ||
| } | ||
|
|
||
| private double toUnit(double nanos) { | ||
| // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead | ||
| return nanos / nanosPerUnit; | ||
| } | ||
|
|
||
| private long toUnit(long nanos) { | ||
| return timeUnit.convert(nanos, TimeUnit.NANOSECONDS); | ||
| } | ||
|
|
||
| private class SnapshotWithCustomTimeUnit extends Snapshot { | ||
|
|
||
| private final Snapshot wrappedSnapshot; | ||
|
|
||
| SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) { | ||
| this.wrappedSnapshot = wrappedSnapshot; | ||
| } | ||
|
|
||
| @Override | ||
| public double getValue(double v) { | ||
| return toUnit(wrappedSnapshot.getValue(v)); | ||
| } | ||
|
|
||
| @Override | ||
| public long[] getValues() { | ||
| long[] nanoValues = wrappedSnapshot.getValues(); | ||
| long[] customUnitValues = new long[nanoValues.length]; | ||
| for (int i = 0; i < nanoValues.length; i++) { | ||
| customUnitValues[i] = toUnit(nanoValues[i]); | ||
| } | ||
| return customUnitValues; | ||
| } | ||
|
|
||
| @Override | ||
| public int size() { | ||
| return wrappedSnapshot.size(); | ||
| } | ||
|
|
||
| @Override | ||
| public long getMax() { | ||
| return toUnit(wrappedSnapshot.getMax()); | ||
| } | ||
|
|
||
| @Override | ||
| public double getMean() { | ||
| return toUnit(wrappedSnapshot.getMean()); | ||
| } | ||
|
|
||
| @Override | ||
| public long getMin() { | ||
| return toUnit(wrappedSnapshot.getMin()); | ||
| } | ||
|
|
||
| @Override | ||
| public double getStdDev() { | ||
| return toUnit(wrappedSnapshot.getStdDev()); | ||
| } | ||
|
|
||
| @Override | ||
| public void dump(OutputStream outputStream) { | ||
| try (PrintWriter writer = new PrintWriter(outputStream)) { | ||
| for (long value : getValues()) { | ||
| writer.println(value); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.network.util; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Arrays; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import com.codahale.metrics.Clock; | ||
| import com.codahale.metrics.Snapshot; | ||
| import com.codahale.metrics.Timer; | ||
| import org.junit.Test; | ||
|
|
||
| import static org.junit.Assert.assertArrayEquals; | ||
| import static org.junit.Assert.assertEquals; | ||
|
|
||
| /** Tests for {@link TimerWithCustomTimeUnit} */ | ||
| public class TimerWithCustomUnitSuite { | ||
|
|
||
| private static final double EPSILON = 1.0 / 1_000_000_000; | ||
|
|
||
| @Test | ||
| public void testTimerWithMillisecondTimeUnit() { | ||
| testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| @Test | ||
| public void testTimerWithNanosecondTimeUnit() { | ||
| testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS); | ||
| } | ||
|
|
||
| private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) { | ||
| Timer timer = new TimerWithCustomTimeUnit(timeUnit); | ||
| Duration[] durations = { | ||
| Duration.ofNanos(1), | ||
| Duration.ofMillis(1), | ||
| Duration.ofMillis(5), | ||
| Duration.ofMillis(100), | ||
| Duration.ofSeconds(10) | ||
| }; | ||
| Arrays.stream(durations).forEach(timer::update); | ||
|
|
||
| Snapshot snapshot = timer.getSnapshot(); | ||
| assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin()); | ||
| assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON); | ||
| assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON); | ||
| assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON); | ||
| assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax()); | ||
|
|
||
| assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(), | ||
| snapshot.getValues()); | ||
| double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum(); | ||
| assertEquals(total / durations.length, snapshot.getMean(), EPSILON); | ||
| } | ||
|
|
||
| @Test | ||
| public void testTimingViaContext() { | ||
| ManualClock clock = new ManualClock(); | ||
| Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock); | ||
| Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) }; | ||
| for (Duration d : durations) { | ||
| Timer.Context context = timer.time(); | ||
| clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS)); | ||
| context.stop(); | ||
| } | ||
|
|
||
| Snapshot snapshot = timer.getSnapshot(); | ||
| assertEquals(0, snapshot.getMin()); | ||
| assertEquals(100, snapshot.getMedian(), EPSILON); | ||
| assertEquals(1000, snapshot.getMax(), EPSILON); | ||
| } | ||
|
|
||
| private static long toTimeUnit(Duration duration, TimeUnit timeUnit) { | ||
| return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS); | ||
| } | ||
|
|
||
| private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) { | ||
| return ((double) duration.toNanos()) / timeUnit.toNanos(1); | ||
| } | ||
|
|
||
| private static class ManualClock extends Clock { | ||
|
|
||
| private long currTick = 1; | ||
|
|
||
| void advance(long nanos) { | ||
| currTick += nanos; | ||
| } | ||
|
|
||
| @Override | ||
| public long getTick() { | ||
| return currTick; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import java.util.Iterator; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Function; | ||
|
|
||
| import com.codahale.metrics.Gauge; | ||
|
|
@@ -49,6 +50,7 @@ | |
| import org.apache.spark.network.server.RpcHandler; | ||
| import org.apache.spark.network.server.StreamManager; | ||
| import org.apache.spark.network.shuffle.protocol.*; | ||
| import org.apache.spark.network.util.TimerWithCustomTimeUnit; | ||
| import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; | ||
| import org.apache.spark.network.util.TransportConf; | ||
|
|
||
|
|
@@ -299,13 +301,17 @@ private void checkAuth(TransportClient client, String appId) { | |
| public class ShuffleMetrics implements MetricSet { | ||
| private final Map<String, Metric> allMetrics; | ||
| // Time latency for open block request in ms | ||
| private final Timer openBlockRequestLatencyMillis = new Timer(); | ||
| private final Timer openBlockRequestLatencyMillis = | ||
| new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); | ||
xkrogen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Time latency for executor registration latency in ms | ||
| private final Timer registerExecutorRequestLatencyMillis = new Timer(); | ||
| private final Timer registerExecutorRequestLatencyMillis = | ||
| new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); | ||
| // Time latency for processing fetch merged blocks meta request latency in ms | ||
| private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer(); | ||
| private final Timer fetchMergedBlocksMetaLatencyMillis = | ||
| new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| // Time latency for processing finalize shuffle merge request latency in ms | ||
| private final Timer finalizeShuffleMergeLatencyMillis = new Timer(); | ||
| private final Timer finalizeShuffleMergeLatencyMillis = | ||
| new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); | ||
| // Block transfer rate in blocks per second | ||
| private final Meter blockTransferRate = new Meter(); | ||
| // Block fetch message rate per second. When using non-batch fetches | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,7 +68,6 @@ public static void collectMetric( | |
| // Snapshot inside the Timer provides the information for the operation delay | ||
| Timer t = (Timer) metric; | ||
| Snapshot snapshot = t.getSnapshot(); | ||
| String timingName = name + "_nanos"; | ||
| metricsRecordBuilder | ||
| .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), | ||
| t.getCount()) | ||
|
|
@@ -84,13 +83,13 @@ public static void collectMetric( | |
| .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), | ||
| t.getMeanRate()) | ||
| .addGauge( | ||
| getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax()) | ||
| getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax()) | ||
|
||
| .addGauge( | ||
| getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin()) | ||
| getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin()) | ||
| .addGauge( | ||
| getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean()) | ||
| getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean()) | ||
| .addGauge( | ||
| getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev()); | ||
| getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev()); | ||
| for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) { | ||
| String percentileStr; | ||
| switch (percentileThousands) { | ||
|
|
@@ -105,7 +104,7 @@ public static void collectMetric( | |
| break; | ||
| } | ||
| metricsRecordBuilder.addGauge( | ||
| getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr), | ||
| getShuffleServiceMetricsInfoForGenericValue(name, percentileStr), | ||
| snapshot.getValue(percentileThousands / 1000.0)); | ||
| } | ||
| } else if (metric instanceof Meter) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.