From a773d2872b8d730a4e82138962fc776f533f522b Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 7 Oct 2016 13:02:10 +0200 Subject: [PATCH 1/2] [FLINK-4773] [metrics] [refactor] Rename IOMetricGroup to TaskIOMetricGroup --- .../api/serialization/RecordSerializer.java | 4 ++-- .../serialization/SpanningRecordSerializer.java | 6 +++--- .../io/network/api/writer/RecordWriter.java | 4 ++-- .../partition/consumer/LocalInputChannel.java | 6 +++--- .../partition/consumer/RemoteInputChannel.java | 6 +++--- .../partition/consumer/SingleInputGate.java | 6 +++--- .../partition/consumer/UnknownInputChannel.java | 6 +++--- ...IOMetricGroup.java => TaskIOMetricGroup.java} | 6 +++--- .../runtime/metrics/groups/TaskMetricGroup.java | 10 +++++----- .../consumer/LocalInputChannelTest.java | 6 +++--- .../consumer/RemoteInputChannelTest.java | 6 +++--- .../partition/consumer/SingleInputGateTest.java | 16 ++++++++-------- .../partition/consumer/TestSingleInputGate.java | 2 +- .../partition/consumer/UnionInputGateTest.java | 4 ++-- .../testutils/UnregisteredTaskMetricsGroup.java | 6 +++--- .../runtime/io/StreamInputProcessor.java | 4 ++-- .../runtime/io/StreamTwoInputProcessor.java | 4 ++-- 17 files changed, 51 insertions(+), 51 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/{IOMetricGroup.java => TaskIOMetricGroup.java} (90%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index a560bb65e87cb..c76dd0099deb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -76,5 +76,5 @@ public boolean isFullBuffer() { * * @param metrics metric group */ - void instantiateMetrics(IOMetricGroup metrics); + void instantiateMetrics(TaskIOMetricGroup metrics); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 9d4f765d81480..7c4d937b58a39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -25,7 +25,7 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataOutputSerializer; @@ -197,7 +197,7 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { } @Override - public void instantiateMetrics(IOMetricGroup metrics) { - numBytesOut = metrics.getBytesOutCounter(); + public void instantiateMetrics(TaskIOMetricGroup metrics) { + numBytesOut = metrics.getNumBytesOutCounter(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 422aa652e9c0c..96eea23ed02e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; @@ -213,7 +213,7 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { * Sets the metric group for this RecordWriter. * @param metrics */ - public void setMetricGroup(IOMetricGroup metrics) { + public void setMetricGroup(TaskIOMetricGroup metrics) { for(RecordSerializer serializer : serializers) { serializer.instantiateMetrics(metrics); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index a8aae7edb534d..55ff5394191cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -68,7 +68,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, 0, 0, metrics); @@ -82,7 +82,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe TaskEventDispatcher taskEventDispatcher, int initialBackoff, int maxBackoff, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index a12d2a8f3deca..13a71a98be6a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -81,7 +81,7 @@ public RemoteInputChannel( ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics); @@ -95,7 +95,7 @@ public RemoteInputChannel( ConnectionManager connectionManager, int initialBackoff, int maxBackoff, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 0db30eefc1069..f4e4325cc630d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,8 +20,8 @@ import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -175,7 +175,7 @@ public SingleInputGate( int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); @@ -506,7 +506,7 @@ public static SingleInputGate create( InputGateDeploymentDescriptor igdd, NetworkEnvironment networkEnvironment, TaskActions taskActions, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 27ecc7031c2fb..08b50447f5592 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -49,7 +49,7 @@ public class UnknownInputChannel extends InputChannel { private final int maxBackoff; - private final IOMetricGroup metrics; + private final TaskIOMetricGroup metrics; public UnknownInputChannel( SingleInputGate gate, @@ -60,7 +60,7 @@ public UnknownInputChannel( ConnectionManager connectionManager, int initialBackoff, int maxBackoff, - IOMetricGroup metrics) { + TaskIOMetricGroup metrics) { super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java similarity index 90% rename from flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 8fa61115d3016..a726c2683f146 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -24,13 +24,13 @@ * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is * forwarded to the parent task metric group. */ -public class IOMetricGroup extends ProxyMetricGroup { +public class TaskIOMetricGroup extends ProxyMetricGroup { private final Counter numBytesOut; private final Counter numBytesInLocal; private final Counter numBytesInRemote; - public IOMetricGroup(TaskMetricGroup parent) { + public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); this.numBytesOut = counter("numBytesOut"); @@ -38,7 +38,7 @@ public IOMetricGroup(TaskMetricGroup parent) { this.numBytesInRemote = counter("numBytesInRemote"); } - public Counter getBytesOutCounter() { + public Counter getNumBytesOutCounter() { return numBytesOut; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 5082fd81d913f..75b8bd86cc5b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -41,7 +41,7 @@ public class TaskMetricGroup extends ComponentMetricGroup operators = new HashMap<>(); - private final IOMetricGroup ioMetrics; + private final TaskIOMetricGroup ioMetrics; /** The execution Id uniquely identifying the executed task represented by this metrics group */ private final AbstractID executionId; @@ -75,7 +75,7 @@ public TaskMetricGroup( this.subtaskIndex = subtaskIndex; this.attemptNumber = attemptNumber; - this.ioMetrics = new IOMetricGroup(this); + this.ioMetrics = new TaskIOMetricGroup(this); } // ------------------------------------------------------------------------ @@ -109,11 +109,11 @@ public int attemptNumber() { } /** - * Returns the IOMetricGroup for this task. + * Returns the TaskIOMetricGroup for this task. * - * @return IOMetricGroup for this task. + * @return TaskIOMetricGroup for this task. */ - public IOMetricGroup getIOMetricGroup() { + public TaskIOMetricGroup getIOMetricGroup() { return ioMetrics; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 75f2bcca08c17..19bb67e94c829 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -277,7 +277,7 @@ private LocalInputChannel createLocalInputChannel( mock(TaskEventDispatcher.class), initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } /** @@ -353,7 +353,7 @@ public TestLocalInputChannelConsumer( subpartitionIndex, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); // Set buffer pool inputGate.setBufferPool(bufferPool); @@ -368,7 +368,7 @@ public TestLocalInputChannelConsumer( consumedPartitionIds[i], partitionManager, taskEventDispatcher, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup())); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup())); } this.numberOfInputChannels = numberOfInputChannels; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 9a79ff89d7652..2c2f9669a26c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -249,7 +249,7 @@ public void testOnFailedPartitionRequest() throws Exception { partitionId, mock(ConnectionID.class), connectionManager, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); ch.onFailedPartitionRequest(); @@ -269,7 +269,7 @@ public void testProducerFailedException() throws Exception { new ResultPartitionID(), mock(ConnectionID.class), connManager, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception."))); @@ -306,6 +306,6 @@ private RemoteInputChannel createRemoteInputChannel( connectionManager, initialAndMaxRequestBackoff._1(), initialAndMaxRequestBackoff._2(), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 9e4ab86b26b22..8f9ea9efd3763 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -66,7 +66,7 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -113,7 +113,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -122,12 +122,12 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { // Local ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -169,7 +169,7 @@ public void testUpdateChannelBeforeRequest() throws Exception { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -182,7 +182,7 @@ public void testUpdateChannelBeforeRequest() throws Exception { new LocalConnectionManager(), 0, 0, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -212,7 +212,7 @@ public void testReleaseWhilePollingChannel() throws Exception { 0, 1, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -223,7 +223,7 @@ public void testReleaseWhilePollingChannel() throws Exception { new LocalConnectionManager(), 0, 0, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 867c2732ce223..3972917b7b917 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -67,7 +67,7 @@ public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { 0, numberOfInputChannels, mock(TaskActions.class), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); this.inputGate = spy(realGate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 466879e74d2ec..cba3199b31121 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -44,8 +44,8 @@ public class UnionInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final String testTaskName = "Test Task"; - final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); - final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index b4a740009439a..ae87085bc976c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -23,7 +23,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -67,8 +67,8 @@ public DummyJobMetricGroup() { } } - public static class DummyIOMetricGroup extends IOMetricGroup { - public DummyIOMetricGroup() { + public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup { + public DummyTaskIOMetricGroup() { super(new UnregisteredTaskMetricsGroup()); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 92b155639ed69..aee0c701a483a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -25,7 +25,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -215,7 +215,7 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { * * @param metrics metric group */ - public void setMetricGroup(IOMetricGroup metrics) { + public void setMetricGroup(TaskIOMetricGroup metrics) { metrics.gauge("currentLowWatermark", new Gauge() { @Override public Long getValue() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 660f07e80c6db..075d9e0dcee68 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -281,7 +281,7 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { * * @param metrics metric group */ - public void setMetricGroup(IOMetricGroup metrics) { + public void setMetricGroup(TaskIOMetricGroup metrics) { metrics.gauge("currentLowWatermark", new Gauge() { @Override public Long getValue() { From 9364476dd476389d902fc46fee8dbe13db74127b Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 5 Oct 2016 15:04:03 +0200 Subject: [PATCH 2/2] [FLINK-4773] [metrics] [refactor] Introduce OperatorIOMetricGroup --- .../metrics/groups/OperatorIOMetricGroup.java | 44 +++++++++++++++++++ .../metrics/groups/OperatorMetricGroup.java | 13 ++++++ .../AbstractCachedBuildSideJoinDriver.java | 4 +- .../operators/AbstractOuterJoinDriver.java | 4 +- .../operators/AllGroupCombineDriver.java | 4 +- .../runtime/operators/AllReduceDriver.java | 4 +- .../flink/runtime/operators/BatchTask.java | 5 ++- .../runtime/operators/CoGroupDriver.java | 4 +- .../flink/runtime/operators/CrossDriver.java | 16 +++---- .../flink/runtime/operators/DataSinkTask.java | 3 +- .../runtime/operators/DataSourceTask.java | 3 +- .../runtime/operators/FlatMapDriver.java | 4 +- .../runtime/operators/GroupReduceDriver.java | 4 +- .../flink/runtime/operators/JoinDriver.java | 4 +- .../flink/runtime/operators/MapDriver.java | 4 +- .../runtime/operators/MapPartitionDriver.java | 4 +- .../flink/runtime/operators/NoOpDriver.java | 4 +- .../operators/ReduceCombineDriver.java | 4 +- .../flink/runtime/operators/ReduceDriver.java | 4 +- .../flink/runtime/operators/TaskContext.java | 4 +- .../operators/UnionWithTempOperator.java | 4 +- .../operators/chaining/ChainedDriver.java | 8 ++-- .../metrics/groups/OperatorGroupTest.java | 19 +++++++- .../operators/drivers/TestTaskContext.java | 8 ++-- .../testutils/BinaryOperatorTestBase.java | 5 ++- .../operators/testutils/DriverTestBase.java | 7 ++- .../testutils/UnaryOperatorTestBase.java | 6 +-- .../UnregisteredTaskMetricsGroup.java | 7 +++ .../api/operators/AbstractStreamOperator.java | 3 +- .../runtime/io/StreamInputProcessor.java | 3 +- .../runtime/tasks/OperatorChain.java | 3 +- 31 files changed, 149 insertions(+), 64 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java new file mode 100644 index 0000000000000..8a69029ac7711 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.Counter; + +/** + * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is + * forwarded to the parent operator metric group. + */ +public class OperatorIOMetricGroup extends ProxyMetricGroup { + + private final Counter numRecordsIn; + private final Counter numRecordsOut; + + public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) { + super(parentMetricGroup); + numRecordsIn = parentMetricGroup.counter("numRecordsIn"); + numRecordsOut = parentMetricGroup.counter("numRecordsOut"); + } + + public Counter getNumRecordsInCounter() { + return numRecordsIn; + } + + public Counter getNumRecordsOutCounter() { + return numRecordsOut; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index f1354b577d217..0c823ea8ab546 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -36,9 +36,13 @@ public class OperatorMetricGroup extends ComponentMetricGroup { private final String operatorName; + private final OperatorIOMetricGroup ioMetrics; + public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) { super(registry, registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent), operatorName), parent); this.operatorName = operatorName; + + ioMetrics = new OperatorIOMetricGroup(this); } // ------------------------------------------------------------------------ @@ -55,6 +59,15 @@ protected QueryScopeInfo.OperatorQueryScopeInfo createQueryServiceMetricInfo(Cha this.parent.subtaskIndex, filter.filterCharacters(this.operatorName)); } + + /** + * Returns the OperatorIOMetricGroup for this operator. + * + * @return OperatorIOMetricGroup for this operator. + */ + public OperatorIOMetricGroup getIOMetricGroup() { + return ioMetrics; + } // ------------------------------------------------------------------------ // Component Metric Group Specifics diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index 8c66cc7e95444..3a69642d1ebf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -67,7 +67,7 @@ public boolean isInputResettable(int inputNum) { public void initialize() throws Exception { TaskConfig config = this.taskContext.getTaskConfig(); - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); @@ -169,7 +169,7 @@ public void prepare() throws Exception { @Override public void run() throws Exception { - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final FlatJoinFunction matchStub = this.taskContext.getStub(); final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java index a28e27ec21938..3987ba084b7cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java @@ -88,7 +88,7 @@ public void prepare() throws Exception { final DriverStrategy ls = config.getDriverStrategy(); - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); @@ -151,7 +151,7 @@ public void prepare() throws Exception { @Override public void run() throws Exception { - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final FlatJoinFunction joinStub = this.taskContext.getStub(); final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index f0673c6a9175c..ceab46d6a15d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -94,8 +94,8 @@ public void run() throws Exception { LOG.debug("AllGroupCombine starting."); } - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final TypeSerializerFactory serializerFactory = this.taskContext.getInputSerializer(0); TypeSerializer serializer = serializerFactory.getSerializer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index 13d7222a6f35d..0d79f092338c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -107,8 +107,8 @@ public void run() throws Exception { LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code.")); } - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final ReduceFunction stub = this.taskContext.getStub(); final MutableObjectIterator input = this.input; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 29f1c204ccaa2..354dbac17d845 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator; @@ -213,7 +214,7 @@ public class BatchTask extends AbstractInvokable impleme * The accumulator map used in the RuntimeContext. */ protected Map> accumulatorMap; - private MetricGroup metrics; + private OperatorMetricGroup metrics; // -------------------------------------------------------------------------------------------- // Task Interface @@ -1071,7 +1072,7 @@ public String formatLogString(String message) { } @Override - public MetricGroup getMetricGroup() { + public OperatorMetricGroup getMetricGroup() { return metrics; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java index 43a913d13121e..c3bd492d8a660 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java @@ -97,7 +97,7 @@ public void prepare() throws Exception throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name()); } - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); @@ -149,7 +149,7 @@ public void prepare() throws Exception @Override public void run() throws Exception { - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final CoGroupFunction coGroupStub = this.taskContext.getStub(); final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index fee0874fc8005..c3f395815f9fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -198,8 +198,8 @@ private void runBlockedOuterFirst() throws Exception { "First input is outer (blocking) side, second input is inner (spilling) side.")); } - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); @@ -266,8 +266,8 @@ private void runBlockedOuterSecond() throws Exception { "First input is inner (spilling) side, second input is outer (blocking) side.")); } - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); @@ -332,8 +332,8 @@ private void runStreamedOuterFirst() throws Exception { "First input is outer side, second input is inner (spilling) side.")); } - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); @@ -385,8 +385,8 @@ private void runStreamedOuterSecond() throws Exception { "First input is inner (spilling) side, second input is outer side.")); } - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index c77a9ae9593be..4626b69a0bb1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.CloseableInputProvider; @@ -107,7 +108,7 @@ public void invoke() throws Exception { LOG.debug(getLogString("Starting data sink operator")); RuntimeContext ctx = createRuntimeContext(); - final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ ((RichOutputFormat) this.format).setRuntimeContext(ctx); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index c062bf8ba8854..4dc3ef529dd28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; @@ -102,7 +103,7 @@ public void invoke() throws Exception { RuntimeContext ctx = createRuntimeContext(); Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); - Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut"); + Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter(); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java index 5b4a6ca396e6e..1a8f813057e2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java @@ -85,8 +85,8 @@ public void prepare() { @Override public void run() throws Exception { - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); // cache references on the stack final MutableObjectIterator input = this.taskContext.getInput(0); final FlatMapFunction function = this.taskContext.getStub(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index ccd88ecdfe314..b6067e3273191 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -92,7 +92,7 @@ public void prepare() throws Exception { if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) { throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name()); } - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); this.serializer = this.taskContext.getInputSerializer(0).getSerializer(); this.comparator = this.taskContext.getDriverComparator(0); @@ -111,7 +111,7 @@ public void run() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code.")); } - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); // cache references on the stack final GroupReduceFunction stub = this.taskContext.getStub(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index 854372391e2c8..551bbff0ed35b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -88,7 +88,7 @@ public int getNumberOfDriverComparators() { public void prepare() throws Exception{ final TaskConfig config = this.taskContext.getTaskConfig(); - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); // obtain task manager's memory manager and I/O manager final MemoryManager memoryManager = this.taskContext.getMemoryManager(); @@ -214,7 +214,7 @@ public void prepare() throws Exception{ @Override public void run() throws Exception { - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final FlatJoinFunction joinStub = this.taskContext.getStub(); final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final JoinTaskIterator joinIterator = this.joinIterator; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java index 65f9061ebbe93..8661851e90e41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java @@ -80,8 +80,8 @@ public void prepare() { @Override public void run() throws Exception { - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); // cache references on the stack final MutableObjectIterator input = this.taskContext.getInput(0); final MapFunction function = this.taskContext.getStub(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java index 3496e14f96243..0b6461e37002b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java @@ -86,8 +86,8 @@ public void prepare() { @Override public void run() throws Exception { - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); // cache references on the stack final MutableObjectIterator input = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MapPartitionFunction function = this.taskContext.getStub(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index dd64b76a84561..c483a5f40d5e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -77,8 +77,8 @@ public void prepare() { @Override public void run() throws Exception { // cache references on the stack - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final MutableObjectIterator input = this.taskContext.getInput(0); final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index aea7ae86a4ec9..e840ecc408b1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -111,7 +111,7 @@ public int getNumberOfDriverComparators() { @Override public void prepare() throws Exception { - final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsOut = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); strategy = taskContext.getTaskConfig().getDriverStrategy(); @@ -159,7 +159,7 @@ public void run() throws Exception { LOG.debug("Combiner starting."); } - final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsIn = taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); final MutableObjectIterator in = taskContext.getInput(0); final TypeSerializer serializer = this.serializer; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index eb4f2f5ebf813..8b0993915432a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -108,8 +108,8 @@ public void run() throws Exception { LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. Running Reducer code.")); } - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); // cache references on the stack final MutableObjectIterator input = this.input; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java index bc3e4c1fc2d22..73fa3cf8f93f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java @@ -22,10 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Collector; @@ -69,5 +69,5 @@ public interface TaskContext { String formatLogString(String message); - MetricGroup getMetricGroup(); + OperatorMetricGroup getMetricGroup(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java index 3d52925582c9d..8402b10c4d294 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java @@ -60,8 +60,8 @@ public void prepare() {} @Override public void run() throws Exception { - final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter(); + final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); T reuse = this.taskContext.getInputSerializer(STREAMED_INPUT).getSerializer().createInstance(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 256013599b17f..cf62dfa61073b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -23,9 +23,9 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -54,7 +54,7 @@ public abstract class ChainedDriver implements Collector { protected boolean objectReuseEnabled = false; - protected MetricGroup metrics; + protected OperatorMetricGroup metrics; protected Counter numRecordsIn; @@ -69,8 +69,8 @@ public void setup(TaskConfig config, String taskName, Collector outputCollec this.taskName = taskName; this.userCodeClassLoader = userCodeClassLoader; this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName); - this.numRecordsIn = this.metrics.counter("numRecordsIn"); - this.numRecordsOut = this.metrics.counter("numRecordsOut"); + this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter(); + this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter(); this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut); Environment env = parent.getEnvironment(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index 7f82d211340b8..af73c2725efcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -19,14 +19,12 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; - import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -59,6 +57,23 @@ public void testGenerateScopeDefault() { registry.shutdown(); } + @Test + public void testIOMetricGroupInstantiation() { + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0); + OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName"); + + assertNotNull(opGroup.getIOMetricGroup()); + assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter()); + assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter()); + + registry.shutdown(); + } + @Test public void testVariables() { MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 62110a709e897..5acc915605341 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -25,14 +25,14 @@ import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Collector; @@ -226,7 +226,7 @@ public String formatLogString(String message) { } @Override - public MetricGroup getMetricGroup() { - return new UnregisteredMetricsGroup(); + public OperatorMetricGroup getMetricGroup() { + return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 75f960e162f40..47a79531475de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -371,8 +372,8 @@ public String formatLogString(String message) { } @Override - public MetricGroup getMetricGroup() { - return new UnregisteredMetricsGroup(); + public OperatorMetricGroup getMetricGroup() { + return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 088435ab74219..c9fa66424360f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.TaskContext; @@ -368,8 +367,8 @@ public String formatLogString(String message) { } @Override - public MetricGroup getMetricGroup() { - return new UnregisteredMetricsGroup(); + public OperatorMetricGroup getMetricGroup() { + return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index a94e694dd75dc..19744e115ca2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.ResettableDriver; @@ -363,8 +363,8 @@ public String formatLogString(String message) { } @Override - public MetricGroup getMetricGroup() { - return null; + public OperatorMetricGroup getMetricGroup() { + return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index ae87085bc976c..31a3336cec8ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -72,4 +73,10 @@ public DummyTaskIOMetricGroup() { super(new UnregisteredTaskMetricsGroup()); } } + + public static class DummyOperatorMetricGroup extends OperatorMetricGroup { + public DummyOperatorMetricGroup() { + super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), "testoperator"); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index f2da9dae55461..5b66466c4cbde 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -38,6 +38,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -159,7 +160,7 @@ public void setup(StreamTask containingTask, StreamConfig config, Output streamOperator, final return false; } if (numRecordsIn == null) { - numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn"); + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); } while (true) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index d02b066e9cd2e..2a4a0659dc18d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; import org.apache.flink.streaming.api.collector.selector.DirectedOutput; @@ -335,7 +336,7 @@ private static class ChainingOutput implements Output> { public ChainingOutput(OneInputStreamOperator operator) { this.operator = operator; - this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn"); + this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); } @Override