From e585c47c303b0c4feee097f36315be464da19a8e Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 1 Jun 2016 11:24:40 +0200 Subject: [PATCH 01/10] task: lowWatermark --- .../flink/streaming/runtime/io/StreamInputProcessor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index ab69ab718ff60..a5153948050f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -214,6 +215,12 @@ public void setMetricGroup(IOMetricGroup metrics) { for (RecordDeserializer deserializer : recordDeserializers) { deserializer.instantiateMetrics(metrics); } + metrics.gauge("currentLowWatermark", new Gauge() { + @Override + public Long getValue() { + return lastEmittedWatermark; + } + }); } public void cleanup() throws IOException { From 2c7b18b1476d548382bf6e5a49d496508a4fc9ae Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 3 Jun 2016 11:32:28 +0200 Subject: [PATCH 02/10] task: checkpointSize --- .../flink/streaming/runtime/tasks/StreamTask.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 51904b384ad8b..8f3975e11f9fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -154,6 +155,8 @@ public abstract class StreamTask> private long recoveryTimestamp; + private long lastCheckpointSize = 0; + // ------------------------------------------------------------------------ // Life cycle methods for specific implementations // ------------------------------------------------------------------------ @@ -194,6 +197,13 @@ public final void invoke() throws Exception { // allow trigger tasks to be removed if all timers for that timestamp are removed by user timerService.setRemoveOnCancelPolicy(true); + getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge() { + @Override + public Long getValue() { + return StreamTask.this.lastCheckpointSize; + } + }); + // task specific initialization init(); @@ -525,6 +535,7 @@ protected boolean performCheckpoint(final long checkpointId, final long timestam if (allStates.isEmpty()) { getEnvironment().acknowledgeCheckpoint(checkpointId); } else if (!hasAsyncStates) { + this.lastCheckpointSize = allStates.getStateSize(); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); } else { // start a Thread that does the asynchronous materialization and @@ -559,6 +570,7 @@ public void run() { } } StreamTaskStateList allStates = new StreamTaskStateList(states); + StreamTask.this.lastCheckpointSize = allStates.getStateSize(); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName()); } From b04e1dde2ecc101e6a45bcbc83921f8a4d9c3c1b Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 8 Jun 2016 11:20:14 +0200 Subject: [PATCH 03/10] task: numBytes(In/Out)(Local/Remote) --- .../flink/metrics/groups/IOMetricGroup.java | 18 +++++++++++------ .../AdaptiveSpanningRecordDeserializer.java | 5 ----- ...ingAdaptiveSpanningRecordDeserializer.java | 5 ----- .../partition/consumer/InputChannel.java | 8 +++++++- .../partition/consumer/LocalInputChannel.java | 12 +++++++---- .../consumer/RemoteInputChannel.java | 12 +++++++---- .../partition/consumer/SingleInputGate.java | 20 +++++++++++++------ .../consumer/UnknownInputChannel.java | 13 ++++++++---- .../flink/runtime/taskmanager/Task.java | 3 ++- .../partition/consumer/InputChannelTest.java | 3 ++- .../consumer/LocalInputChannelTest.java | 10 +++++++--- .../consumer/RemoteInputChannelTest.java | 10 +++++++--- .../consumer/SingleInputGateTest.java | 19 ++++++++++-------- .../consumer/TestSingleInputGate.java | 3 ++- .../consumer/UnionInputGateTest.java | 5 +++-- .../UnregisteredTaskMetricsGroup.java | 16 +++++++++++++++ 16 files changed, 108 insertions(+), 54 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java index b34c844c7bc37..e65b3abf62ca3 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -26,22 +26,20 @@ */ public class IOMetricGroup extends AbstractMetricGroup { - private final Counter numBytesIn; private final Counter numBytesOut; private final Counter numRecordsIn; private final Counter numRecordsOut; + private final Counter numBytesInLocal; + private final Counter numBytesInRemote; public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) { super(registry, parent.getScopeComponents()); - - this.numBytesIn = parent.counter("numBytesIn"); this.numBytesOut = parent.counter("numBytesOut"); this.numRecordsIn = parent.counter("numRecordsIn"); this.numRecordsOut = parent.counter("numRecordsOut"); - } - public Counter getBytesInCounter() { - return numBytesIn; + this.numBytesInLocal = parent.counter("numBytesInLocal"); + this.numBytesInRemote = parent.counter("numBytesInRemote"); } public Counter getBytesOutCounter() { @@ -55,4 +53,12 @@ public Counter getRecordsInCounter() { public Counter getRecordsOutCounter() { return numRecordsOut; } + + public Counter getNumBytesInLocal() { + return numBytesInLocal; + } + + public Counter getNumBytesInRemote() { + return numBytesInRemote; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 1c1747601b8fc..5548150e3225b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -51,7 +51,6 @@ public class AdaptiveSpanningRecordDeserializer im private AccumulatorRegistry.Reporter reporter; private transient Counter numRecordsIn; - private transient Counter numBytesIn; public AdaptiveSpanningRecordDeserializer() { this.nonSpanningWrapper = new NonSpanningWrapper(); @@ -101,9 +100,6 @@ public DeserializationResult getNextRecord(T target) throws IOException { if (reporter != null) { reporter.reportNumBytesIn(len); } - if (numBytesIn != null) { - numBytesIn.inc(len); - } if (len <= nonSpanningRemaining - 4) { // we can get a full record from here @@ -181,7 +177,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { @Override public void instantiateMetrics(IOMetricGroup metrics) { - numBytesIn = metrics.getBytesInCounter(); numRecordsIn = metrics.getRecordsInCounter(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 7e96390c9d9a0..027d388277cac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -64,7 +64,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer initialAndMaxBackoff) { + Tuple2 initialAndMaxBackoff, + Counter numBytesIn) { checkArgument(channelIndex >= 0); @@ -84,6 +88,8 @@ protected InputChannel( this.initialBackoff = initial; this.maxBackoff = max; this.currentBackoff = initial == 0 ? -1 : 0; + + this.numBytesIn = numBytesIn; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index ff12153ec976f..075b8d790e7ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -67,10 +68,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe int channelIndex, ResultPartitionID partitionId, ResultPartitionManager partitionManager, - TaskEventDispatcher taskEventDispatcher) { + TaskEventDispatcher taskEventDispatcher, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, - new Tuple2(0, 0)); + new Tuple2(0, 0), metrics); } LocalInputChannel( @@ -79,9 +81,10 @@ public class LocalInputChannel extends InputChannel implements NotificationListe ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, - Tuple2 initialAndMaxBackoff) { + Tuple2 initialAndMaxBackoff, + IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocal()); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); @@ -165,6 +168,7 @@ Buffer getNextBuffer() throws IOException, InterruptedException { getNextLookAhead(); + numBytesIn.inc(next.getSize()); return next; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index be2509f04d606..f3e605cfe8325 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -80,10 +81,11 @@ public RemoteInputChannel( int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, - ConnectionManager connectionManager) { + ConnectionManager connectionManager, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, connectionId, connectionManager, - new Tuple2(0, 0)); + new Tuple2(0, 0), metrics); } public RemoteInputChannel( @@ -92,9 +94,10 @@ public RemoteInputChannel( ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, - Tuple2 initialAndMaxBackoff) { + Tuple2 initialAndMaxBackoff, + IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemote()); this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); @@ -148,6 +151,7 @@ Buffer getNextBuffer() throws IOException { throw new IOException("Queried input channel for data although non is available."); } + numBytesIn.inc(buffer.getSize()); return buffer; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index bf8bc73095594..c5569b8513581 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,6 +20,7 @@ import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -173,7 +174,8 @@ public SingleInputGate( IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, - PartitionStateChecker partitionStateChecker) { + PartitionStateChecker partitionStateChecker, + IOMetricGroup metrics) { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); @@ -502,7 +504,8 @@ public static SingleInputGate create( JobID jobId, ExecutionAttemptID executionId, InputGateDeploymentDescriptor igdd, - NetworkEnvironment networkEnvironment) { + NetworkEnvironment networkEnvironment, + IOMetricGroup metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); @@ -512,7 +515,8 @@ public static SingleInputGate create( final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); final SingleInputGate inputGate = new SingleInputGate( - owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker()); + owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, + icdd.length, networkEnvironment.getPartitionStateChecker(), metrics); // Create the input channels. There is one input channel for each consumed partition. final InputChannel[] inputChannels = new InputChannel[icdd.length]; @@ -526,13 +530,16 @@ public static SingleInputGate create( inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff()); + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics + ); } else if (partitionLocation.isRemote()) { inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff() + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics ); } else if (partitionLocation.isUnknown()) { @@ -540,7 +547,8 @@ else if (partitionLocation.isUnknown()) { networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff() + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics ); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index cdf28be025c46..fbf4663b323e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -47,6 +48,8 @@ public class UnknownInputChannel extends InputChannel { /** Initial and maximum backoff (in ms) after failed partition requests. */ private final Tuple2 partitionRequestInitialAndMaxBackoff; + private final IOMetricGroup metrics; + public UnknownInputChannel( SingleInputGate gate, int channelIndex, @@ -54,14 +57,16 @@ public UnknownInputChannel( ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, ConnectionManager connectionManager, - Tuple2 partitionRequestInitialAndMaxBackoff) { + Tuple2 partitionRequestInitialAndMaxBackoff, + IOMetricGroup metrics) { - super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff); + super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff, null); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); this.connectionManager = checkNotNull(connectionManager); this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff); + this.metrics = checkNotNull(metrics); } @Override @@ -112,10 +117,10 @@ public String toString() { // ------------------------------------------------------------------------ public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) { - return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff); + return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff, metrics); } public LocalInputChannel toLocalInputChannel() { - return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff); + return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff, metrics); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1f766e187e2a7..4c7d857797c00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -319,7 +319,8 @@ public Task(TaskDeploymentDescriptor tdd, for (int i = 0; i < this.inputGates.length; i++) { SingleInputGate gate = SingleInputGate.create( - taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment); + taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment, + metricGroup.getIOMetricGroup()); this.inputGates[i] = gate; inputGatesById.put(gate.getConsumedResultId(), gate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index 97175303e2616..da15f08de330a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -118,7 +119,7 @@ private MockInputChannel( ResultPartitionID partitionId, Tuple2 initialAndMaxBackoff) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index d4fcf16d5d00f..cc08c4ac14e6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -271,7 +272,8 @@ private LocalInputChannel createLocalInputChannel( new ResultPartitionID(), partitionManager, mock(TaskEventDispatcher.class), - initialAndMaxRequestBackoff); + initialAndMaxRequestBackoff, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } /** @@ -346,7 +348,8 @@ public TestLocalInputChannelConsumer( new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set buffer pool inputGate.setBufferPool(bufferPool); @@ -360,7 +363,8 @@ public TestLocalInputChannelConsumer( i, consumedPartitionIds[i], partitionManager, - taskEventDispatcher)); + taskEventDispatcher, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup())); } this.numberOfInputChannels = numberOfInputChannels; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index c484cc47bc256..9eb49efa0af6e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import scala.Tuple2; @@ -247,7 +248,8 @@ public void testOnFailedPartitionRequest() throws Exception { 0, partitionId, mock(ConnectionID.class), - connectionManager); + connectionManager, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ch.onFailedPartitionRequest(); @@ -266,7 +268,8 @@ public void testProducerFailedException() throws Exception { 0, new ResultPartitionID(), mock(ConnectionID.class), - connManager); + connManager, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception."))); @@ -301,6 +304,7 @@ private RemoteInputChannel createRemoteInputChannel( new ResultPartitionID(), mock(ConnectionID.class), connectionManager, - initialAndMaxRequestBackoff); + initialAndMaxRequestBackoff, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index c4bb785396d4c..05427a1aa36bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import scala.Tuple2; @@ -66,7 +67,7 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class)); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -113,7 +114,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class)); + final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -122,12 +123,12 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { // Local ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher); + InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2(0, 0)); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -168,7 +169,7 @@ public void testUpdateChannelBeforeRequest() throws Exception { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -179,7 +180,7 @@ public void testUpdateChannelBeforeRequest() throws Exception { partitionManager, new TaskEventDispatcher(), new LocalConnectionManager(), - new Tuple2<>(0, 0)); + new Tuple2(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -208,7 +209,8 @@ public void testReleaseWhilePollingChannel() throws Exception { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -217,7 +219,8 @@ public void testReleaseWhilePollingChannel() throws Exception { new ResultPartitionManager(), new TaskEventDispatcher(), new LocalConnectionManager(), - new Tuple2<>(0, 0)); + new Tuple2(0, 0), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 114900095247e..9bb60d8d93741 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.util.event.EventListener; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -59,7 +60,7 @@ public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { checkArgument(numberOfInputChannels >= 1); SingleInputGate realGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class)); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); this.inputGate = spy(realGate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index d8714d191279b..28f621ff9b139 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -43,8 +44,8 @@ public class UnionInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final String testTaskName = "Test Task"; - final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class)); - final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class)); + final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index a2edce2b84397..6d3f768878f09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.metrics.groups.JobMetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.groups.TaskMetricGroup; @@ -65,4 +66,19 @@ public DummyJobMetricGroup() { super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); } } + + public static class DummyIOMetricGroup extends IOMetricGroup { + public DummyIOMetricGroup() { + super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup()); + } + + @Override + protected void addMetric(String name, Metric metric) { + } + + @Override + public MetricGroup addGroup(String name) { + return new UnregisteredMetricsGroup(); + } + } } From d131bfa3a1f95d22900bba131b6c300ee881e5c7 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 7 Jun 2016 20:20:10 +0200 Subject: [PATCH 04/10] operator: numSplitsProcessed --- .../apache/flink/runtime/operators/DataSourceTask.java | 8 +++++++- .../api/functions/source/FileSourceFunction.java | 6 ++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 819b84f99dd5f..eadbf1c900c6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -97,8 +99,11 @@ public void invoke() throws Exception { // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data source operator")); + RuntimeContext ctx = createRuntimeContext(); + Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); + if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { - ((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext()); + ((RichInputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Source detected. Initializing runtime context.")); ((RichInputFormat) this.format).openInputFormat(); LOG.debug(getLogString("Rich Source detected. Opening the InputFormat.")); @@ -165,6 +170,7 @@ public void invoke() throws Exception { // close. We close here such that a regular close throwing an exception marks a task as failed. format.close(); } + splitCounter.inc(); } // end for all input splits // close the collector. if it is a chaining task collector, it will close its chained tasks diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java index 0dcb9ffc796c0..a6b26b44f1a77 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -43,6 +44,8 @@ public class FileSourceFunction extends RichParallelSourceFunction { private volatile boolean isRunning = true; + private Counter splitCounter; + @SuppressWarnings("unchecked") public FileSourceFunction(InputFormat format, TypeInformation typeInfo) { this.format = (InputFormat) format; @@ -54,6 +57,7 @@ public FileSourceFunction(InputFormat format, TypeInformation typeI public void open(Configuration parameters) throws Exception { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); this.provider = context.getInputSplitProvider(); + this.splitCounter = context.getMetricGroup().counter("numSplitsProcessed"); format.configure(parameters); serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); @@ -123,9 +127,11 @@ public void run(SourceContext ctx) throws Exception { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); if (nextElement == null && splitIterator.hasNext()) { + splitCounter.inc(); format.open(splitIterator.next()); continue; } else if (nextElement == null) { + splitCounter.inc(); break; } ctx.collect(nextElement); From ffbba601fab4542b30c299c3650a18b4b2612df2 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 10 Jun 2016 13:24:49 +0200 Subject: [PATCH 05/10] operator: numRecords(In/Out) Basics --- .../util/metrics/CountingCollector.java | 42 +++++++++++++++ .../util/metrics/CountingIterable.java | 38 ++++++++++++++ .../util/metrics/CountingIterator.java | 48 +++++++++++++++++ .../CountingMutableObjectIterator.java | 51 +++++++++++++++++++ 4 files changed, 179 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java new file mode 100644 index 0000000000000..f7a1df93598aa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.Collector; + +public class CountingCollector implements Collector { + private final Collector collector; + private final Counter numRecordsOut; + + public CountingCollector(Collector collector, Counter numRecordsOut) { + this.collector = collector; + this.numRecordsOut = numRecordsOut; + } + + @Override + public void collect(OUT record) { + this.numRecordsOut.inc(); + this.collector.collect(record); + } + + @Override + public void close() { + this.collector.close(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java new file mode 100644 index 0000000000000..7494108b755c3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; + +import java.util.Iterator; + +public class CountingIterable implements Iterable { + + private final Iterable iterable; + private final Counter numRecordsIn; + + public CountingIterable(Iterable iterable, Counter numRecordsIn) { + this.iterable = iterable; + this.numRecordsIn = numRecordsIn; + } + + @Override + public Iterator iterator() { + return new CountingIterator<>(this.iterable.iterator(), this.numRecordsIn); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java new file mode 100644 index 0000000000000..fe89358240056 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; + +import java.util.Iterator; + +public class CountingIterator implements Iterator { + private final Iterator iterator; + private final Counter numRecordsIn; + + public CountingIterator(Iterator iterator, Counter numRecordsIn) { + this.iterator = iterator; + this.numRecordsIn = numRecordsIn; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public IN next() { + numRecordsIn.inc(); + return this.iterator.next(); + } + + @Override + public void remove() { + this.iterator.remove(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java new file mode 100644 index 0000000000000..e4b436a878b21 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; + +public class CountingMutableObjectIterator implements MutableObjectIterator { + private final MutableObjectIterator iterator; + private final Counter numRecordsIn; + + public CountingMutableObjectIterator(MutableObjectIterator iterator, Counter numRecordsIn) { + this.iterator = iterator; + this.numRecordsIn = numRecordsIn; + } + + @Override + public IN next(IN reuse) throws IOException { + IN next = iterator.next(reuse); + if (next != null) { + numRecordsIn.inc(); + } + return next; + } + + @Override + public IN next() throws IOException { + IN next = iterator.next(); + if (next != null) { + numRecordsIn.inc(); + } + return next; + } +} From 9428ba9291380f56c0486c124163e3ad58ccfae8 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 8 Jun 2016 11:25:30 +0200 Subject: [PATCH 06/10] operator: streaming: numRecords(In/Out) --- .../flink/metrics/groups/IOMetricGroup.java | 12 ------ .../AdaptiveSpanningRecordDeserializer.java | 10 ----- .../SpanningRecordSerializer.java | 6 --- ...ingAdaptiveSpanningRecordDeserializer.java | 10 ----- .../api/operators/AbstractStreamOperator.java | 30 ++++++++++++- .../streaming/api/operators/StreamSource.java | 4 ++ .../runtime/io/StreamInputProcessor.java | 7 ++++ .../runtime/tasks/OperatorChain.java | 5 +++ .../runtime/tasks/SourceStreamTask.java | 2 +- .../runtime/tasks/StreamIterationTail.java | 42 ++++++++++++++----- 10 files changed, 78 insertions(+), 50 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java index e65b3abf62ca3..c6eb5605901ab 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -27,16 +27,12 @@ public class IOMetricGroup extends AbstractMetricGroup { private final Counter numBytesOut; - private final Counter numRecordsIn; - private final Counter numRecordsOut; private final Counter numBytesInLocal; private final Counter numBytesInRemote; public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) { super(registry, parent.getScopeComponents()); this.numBytesOut = parent.counter("numBytesOut"); - this.numRecordsIn = parent.counter("numRecordsIn"); - this.numRecordsOut = parent.counter("numRecordsOut"); this.numBytesInLocal = parent.counter("numBytesInLocal"); this.numBytesInRemote = parent.counter("numBytesInRemote"); @@ -46,14 +42,6 @@ public Counter getBytesOutCounter() { return numBytesOut; } - public Counter getRecordsInCounter() { - return numRecordsIn; - } - - public Counter getRecordsOutCounter() { - return numRecordsOut; - } - public Counter getNumBytesInLocal() { return numBytesInLocal; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 5548150e3225b..ac3ac04e27769 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -21,7 +21,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -49,8 +48,6 @@ public class AdaptiveSpanningRecordDeserializer im private Buffer currentBuffer; private AccumulatorRegistry.Reporter reporter; - - private transient Counter numRecordsIn; public AdaptiveSpanningRecordDeserializer() { this.nonSpanningWrapper = new NonSpanningWrapper(); @@ -108,9 +105,6 @@ public DeserializationResult getNextRecord(T target) throws IOException { if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } return (this.nonSpanningWrapper.remaining() == 0) ? DeserializationResult.LAST_RECORD_FROM_BUFFER : @@ -138,9 +132,6 @@ public DeserializationResult getNextRecord(T target) throws IOException { if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment @@ -177,7 +168,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { @Override public void instantiateMetrics(IOMetricGroup metrics) { - numRecordsIn = metrics.getRecordsInCounter(); } // ----------------------------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 64956507f7714..b218de83883db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -55,7 +55,6 @@ public class SpanningRecordSerializer implements R private AccumulatorRegistry.Reporter reporter; - private transient Counter numRecordsOut; private transient Counter numBytesOut; public SpanningRecordSerializer() { @@ -94,10 +93,6 @@ public SerializationResult addRecord(T record) throws IOException { if (numBytesOut != null) { numBytesOut.inc(len); } - - if (numRecordsOut != null) { - numRecordsOut.inc(); - } this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); @@ -204,6 +199,5 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { @Override public void instantiateMetrics(IOMetricGroup metrics) { numBytesOut = metrics.getBytesOutCounter(); - numRecordsOut = metrics.getRecordsOutCounter(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 027d388277cac..296736d2ce236 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -22,7 +22,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -63,8 +62,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer 0) { @@ -164,9 +158,6 @@ else if (remaining == 0) { if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment @@ -200,7 +191,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { @Override public void instantiateMetrics(IOMetricGroup metrics) { - numRecordsIn = metrics.getRecordsInCounter(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 326a42f619f2e..dc7bbdb8a9742 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -25,10 +25,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator public void setup(StreamTask containingTask, StreamConfig config, Output> output) { this.container = containingTask; this.config = config; - this.output = output; String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); + this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap()); stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader()); @@ -334,4 +336,30 @@ public boolean isInputCopyingDisabled() { public void disableInputCopy() { this.inputCopyDisabled = true; } + + public class CountingOutput implements Output> { + private final Output> output; + private final Counter numRecordsOut; + + public CountingOutput(Output> output, Counter counter) { + this.output = output; + this.numRecordsOut = counter; + } + + @Override + public void emitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + public void collect(StreamRecord record) { + numRecordsOut.inc(); + output.collect(record); + } + + @Override + public void close() { + output.close(); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 84f59ed0a1ec3..68c623e472b73 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -51,6 +51,10 @@ public StreamSource(SRC sourceFunction) { this.chainingStrategy = ChainingStrategy.HEAD; } + public void run(final Object lockingObject) throws Exception { + run(lockingObject, output); + } + public void run(final Object lockingObject, final Output> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index a5153948050f0..8d8a2753257c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -82,6 +83,8 @@ public class StreamInputProcessor { private final DeserializationDelegate deserializationDelegate; + private Counter numRecordsIn; + @SuppressWarnings("unchecked") public StreamInputProcessor(InputGate[] inputGates, TypeSerializer inputSerializer, EventListener checkpointListener, @@ -134,6 +137,9 @@ public boolean processInput(OneInputStreamOperator streamOperator, final if (isFinished) { return false; } + if (numRecordsIn == null) { + numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn"); + } while (true) { if (currentRecordDeserializer != null) { @@ -167,6 +173,7 @@ public boolean processInput(OneInputStreamOperator streamOperator, final // now we can do the actual processing StreamRecord record = recordOrWatermark.asRecord(); synchronized (lock) { + numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 90abea40a5600..761aa37baf9c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -26,6 +26,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -298,14 +299,17 @@ private static RecordWriterOutput createStreamOutput( private static class ChainingOutput implements Output> { protected final OneInputStreamOperator operator; + protected final Counter numRecordsIn; public ChainingOutput(OneInputStreamOperator operator) { this.operator = operator; + this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn"); } @Override public void collect(StreamRecord record) { try { + numRecordsIn.inc(); operator.setKeyContextElement1(record); operator.processElement(record); } @@ -347,6 +351,7 @@ public CopyingChainingOutput(OneInputStreamOperator operator, TypeSerializ @Override public void collect(StreamRecord record) { try { + numRecordsIn.inc(); StreamRecord copy = record.copy(serializer.copy(record.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index af9278f3b0c4a..7ae99f6c036cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -53,7 +53,7 @@ protected void cleanup() { @Override protected void run() throws Exception { - headOperator.run(getCheckpointLock(), getHeadOutput()); + headOperator.run(getCheckpointLock()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index af7d3f968e37b..016abe640b71d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -56,13 +57,27 @@ public void init() throws Exception { LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); - this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime); + this.headOperator = new RecordPusher<>(); + this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime)); } private static class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator { private static final long serialVersionUID = 1L; + @Override + public void processElement(StreamRecord record) throws Exception { + output.collect(record); + } + + @Override + public void processWatermark(Watermark mark) { + // ignore + } + } + + private static class IterationTailOutput implements Output> { + @SuppressWarnings("NonSerializableFieldInSerializableClass") private final BlockingQueue> dataChannel; @@ -70,25 +85,32 @@ private static class RecordPusher extends AbstractStreamOperator impleme private final boolean shouldWait; - RecordPusher(BlockingQueue> dataChannel, long iterationWaitTime) { + IterationTailOutput(BlockingQueue> dataChannel, long iterationWaitTime) { this.dataChannel = dataChannel; this.iterationWaitTime = iterationWaitTime; this.shouldWait = iterationWaitTime > 0; } @Override - public void processElement(StreamRecord record) throws Exception { - if (shouldWait) { - dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); - } - else { - dataChannel.put(record); + public void emitWatermark(Watermark mark) { + } + + @Override + public void collect(StreamRecord record) { + try { + if (shouldWait) { + dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); + } + else { + dataChannel.put(record); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @Override - public void processWatermark(Watermark mark) { - // ignore + public void close() { } } } From 85e81e42147930af255c1991726c290281b9d82e Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 10 Jun 2016 13:25:18 +0200 Subject: [PATCH 07/10] operator: batch: unchained: numRecords(In/Out) --- .../AbstractCachedBuildSideJoinDriver.java | 12 +++-- .../operators/AbstractOuterJoinDriver.java | 14 ++++-- .../operators/AllGroupCombineDriver.java | 10 +++- .../runtime/operators/AllReduceDriver.java | 14 +++++- .../runtime/operators/CoGroupDriver.java | 13 ++++-- .../flink/runtime/operators/CrossDriver.java | 46 +++++++++++++------ .../flink/runtime/operators/DataSinkTask.java | 9 +++- .../runtime/operators/DataSourceTask.java | 4 +- .../runtime/operators/FlatMapDriver.java | 8 +++- .../runtime/operators/GroupReduceDriver.java | 10 +++- .../flink/runtime/operators/JoinDriver.java | 12 +++-- .../flink/runtime/operators/MapDriver.java | 8 +++- .../runtime/operators/MapPartitionDriver.java | 9 +++- .../flink/runtime/operators/NoOpDriver.java | 8 +++- .../operators/ReduceCombineDriver.java | 10 +++- .../flink/runtime/operators/ReduceDriver.java | 11 ++++- .../operators/UnionWithTempOperator.java | 8 +++- 17 files changed, 162 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index e034dd6504c24..406d430312c59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -63,13 +66,15 @@ public boolean isInputResettable(int inputNum) { @Override public void initialize() throws Exception { TaskConfig config = this.taskContext.getTaskConfig(); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); TypeComparator comparator1 = this.taskContext.getDriverComparator(0); TypeComparator comparator2 = this.taskContext.getDriverComparator(1); - MutableObjectIterator input1 = this.taskContext.getInput(0); - MutableObjectIterator input2 = this.taskContext.getInput(1); + MutableObjectIterator input1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + MutableObjectIterator input2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); TypePairComparatorFactory pairComparatorFactory = this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader()); @@ -164,8 +169,9 @@ public void prepare() throws Exception { @Override public void run() throws Exception { + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); final FlatJoinFunction matchStub = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java index 2589ca5af9a40..a28e27ec21938 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java @@ -23,10 +23,13 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -84,9 +87,10 @@ public void prepare() throws Exception { final double driverMemFraction = config.getRelativeMemoryDriver(); final DriverStrategy ls = config.getDriverStrategy(); - - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); // get serializers and comparators final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); @@ -147,8 +151,10 @@ public void prepare() throws Exception { @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final FlatJoinFunction joinStub = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final JoinTaskIterator outerJoinIterator = this.outerJoinIterator; while (this.running && outerJoinIterator.callWithNextKey(joinStub, collector)) ; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index 0c8dc34c2d8cb..f0673c6a9175c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.util.Collector; @@ -91,12 +94,15 @@ public void run() throws Exception { LOG.debug("AllGroupCombine starting."); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final TypeSerializerFactory serializerFactory = this.taskContext.getInputSerializer(0); TypeSerializer serializer = serializerFactory.getSerializer(); - final MutableObjectIterator in = this.taskContext.getInput(0); + final MutableObjectIterator in = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final GroupCombineFunction reducer = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingMutableToRegularIteratorWrapper inIter = new ReusingMutableToRegularIteratorWrapper(in, serializer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index e8545e703fde1..13d7222a6f35d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -104,14 +107,19 @@ public void run() throws Exception { LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code.")); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final ReduceFunction stub = this.taskContext.getStub(); final MutableObjectIterator input = this.input; final TypeSerializer serializer = this.serializer; + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); T val1; if ((val1 = input.next()) == null) { return; } + numRecordsIn.inc(); if (objectReuseEnabled) { // We only need two objects. The first reference stores results and is @@ -121,6 +129,7 @@ public void run() throws Exception { T value = val1; while (running && (val2 = input.next(val2)) != null) { + numRecordsIn.inc(); value = stub.reduce(value, val2); // we must never read into the object returned @@ -132,14 +141,15 @@ public void run() throws Exception { } } - this.taskContext.getOutputCollector().collect(value); + collector.collect(value); } else { T val2; while (running && (val2 = input.next()) != null) { + numRecordsIn.inc(); val1 = stub.reduce(val1, val2); } - this.taskContext.getOutputCollector().collect(val1); + collector.collect(val1); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java index 665ab0e58afac..43a913d13121e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java @@ -20,7 +20,10 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -93,9 +96,11 @@ public void prepare() throws Exception if (config.getDriverStrategy() != DriverStrategy.CO_GROUP) { throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name()); } + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); // get the key positions and types final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); @@ -144,8 +149,10 @@ public void prepare() throws Exception @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final CoGroupFunction coGroupStub = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final CoGroupTaskIterator coGroupIterator = this.coGroupIterator; while (this.running && coGroupIterator.next()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index c9d84b16f61cd..3e1d01f4a6616 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CrossFunction; @@ -194,9 +197,12 @@ private void runBlockedOuterFirst() throws Exception { LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " + "First input is outer (blocking) side, second input is inner (spilling) side.")); } - - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); final TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); @@ -213,7 +219,7 @@ private void runBlockedOuterFirst() throws Exception { final CrossFunction crosser = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { @@ -259,9 +265,12 @@ private void runBlockedOuterSecond() throws Exception { LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " + "First input is inner (spilling) side, second input is outer (blocking) side.")); } - - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); final TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); @@ -277,7 +286,7 @@ private void runBlockedOuterSecond() throws Exception { this.blockIter = blockVals; final CrossFunction crosser = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); @@ -322,9 +331,12 @@ private void runStreamedOuterFirst() throws Exception { LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " + "First input is outer side, second input is inner (spilling) side.")); } - - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); final TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); @@ -335,7 +347,7 @@ private void runStreamedOuterFirst() throws Exception { this.spillIter = spillVals; final CrossFunction crosser = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); @@ -372,8 +384,12 @@ private void runStreamedOuterSecond() throws Exception { LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " + "First input is inner (spilling) side, second input is outer side.")); } - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); final TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer(); @@ -384,7 +400,7 @@ private void runStreamedOuterSecond() throws Exception { this.spillIter = spillVals; final CrossFunction crosser = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 380edd4cb9aed..b0252c143f3ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.io.RichOutputFormat; @@ -27,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -104,8 +106,11 @@ public void invoke() throws Exception { // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data sink operator")); + RuntimeContext ctx = createRuntimeContext(); + final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn"); + if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ - ((RichOutputFormat) this.format).setRuntimeContext(createRuntimeContext()); + ((RichOutputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Sink detected. Initializing runtime context.")); } @@ -174,6 +179,7 @@ public void invoke() throws Exception { // work! while (!this.taskCanceled && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); format.writeRecord(record); } } else { @@ -181,6 +187,7 @@ public void invoke() throws Exception { // work! while (!this.taskCanceled && ((record = input.next()) != null)) { + numRecordsIn.inc(); format.writeRecord(record); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index eadbf1c900c6a..c57f1335af420 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +102,7 @@ public void invoke() throws Exception { RuntimeContext ctx = createRuntimeContext(); Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); + Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut"); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); @@ -140,7 +142,7 @@ public void invoke() throws Exception { LOG.debug(getLogString("Starting to read input from split " + split.toString())); try { - final Collector output = this.output; + final Collector output = new CountingCollector<>(this.output, numRecordsOut); if (objectReuseEnabled) { OT reuse = serializer.createInstance(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java index c29923b88ab3f..5b4a6ca396e6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -83,22 +85,26 @@ public void prepare() { @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final MutableObjectIterator input = this.taskContext.getInput(0); final FlatMapFunction function = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); function.flatMap(record, output); } } else { IT record; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); function.flatMap(record, output); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index d6825acb102be..ccd88ecdfe314 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -19,6 +19,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -89,9 +92,11 @@ public void prepare() throws Exception { if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) { throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name()); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + this.serializer = this.taskContext.getInputSerializer(0).getSerializer(); this.comparator = this.taskContext.getDriverComparator(0); - this.input = this.taskContext.getInput(0); + this.input = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); ExecutionConfig executionConfig = taskContext.getExecutionConfig(); this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); @@ -106,10 +111,11 @@ public void run() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code.")); } + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final GroupReduceFunction stub = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingKeyGroupedIterator iter = new ReusingKeyGroupedIterator(this.input, this.serializer, this.comparator); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index f7ad8d1e3f9e2..efb59a79fb7b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator; @@ -34,6 +35,8 @@ import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -84,6 +87,8 @@ public int getNumberOfDriverComparators() { @Override public void prepare() throws Exception{ final TaskConfig config = this.taskContext.getTaskConfig(); + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); // obtain task manager's memory manager and I/O manager final MemoryManager memoryManager = this.taskContext.getMemoryManager(); @@ -96,8 +101,8 @@ public void prepare() throws Exception{ // test minimum memory requirements final DriverStrategy ls = config.getDriverStrategy(); - final MutableObjectIterator in1 = this.taskContext.getInput(0); - final MutableObjectIterator in2 = this.taskContext.getInput(1); + final MutableObjectIterator in1 = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); + final MutableObjectIterator in2 = new CountingMutableObjectIterator<>(this.taskContext.getInput(1), numRecordsIn); // get the key positions and types final TypeSerializer serializer1 = this.taskContext.getInputSerializer(0).getSerializer(); @@ -209,8 +214,9 @@ public void prepare() throws Exception{ @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); final FlatJoinFunction joinStub = this.taskContext.getStub(); - final Collector collector = this.taskContext.getOutputCollector(); + final Collector collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final JoinTaskIterator joinIterator = this.joinIterator; while (this.running && joinIterator.callWithNextKey(joinStub, collector)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java index eefe8e4a50596..65f9061ebbe93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java @@ -20,6 +20,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -78,15 +80,18 @@ public void prepare() { @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final MutableObjectIterator input = this.taskContext.getInput(0); final MapFunction function = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); output.collect(function.map(record)); } } @@ -94,6 +99,7 @@ public void run() throws Exception { IT record = null; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); output.collect(function.map(record)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java index 8f245f0993ac4..3496e14f96243 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java @@ -21,6 +21,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.util.Collector; @@ -83,10 +86,12 @@ public void prepare() { @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack - final MutableObjectIterator input = this.taskContext.getInput(0); + final MutableObjectIterator input = new CountingMutableObjectIterator<>(this.taskContext.getInput(0), numRecordsIn); final MapPartitionFunction function = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingMutableToRegularIteratorWrapper inIter = new ReusingMutableToRegularIteratorWrapper(input, this.taskContext.getInputSerializer(0).getSerializer()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index 073a8371e1c10..dd64b76a84561 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -75,18 +77,22 @@ public void prepare() { @Override public void run() throws Exception { // cache references on the stack + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); final MutableObjectIterator input = this.taskContext.getInput(0); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { T record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); output.collect(record); } } else { T record; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); output.collect(record); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index 1ceeaf0decbd0..e1ce39e7e465c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -104,13 +106,15 @@ public void prepare() throws Exception { if (this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_PARTIAL_REDUCE) { throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner."); } + + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // instantiate the serializer / comparator final TypeSerializerFactory serializerFactory = this.taskContext.getInputSerializer(0); this.comparator = this.taskContext.getDriverComparator(0); this.serializer = serializerFactory.getSerializer(); this.reducer = this.taskContext.getStub(); - this.output = this.taskContext.getOutputCollector(); + this.output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); MemoryManager memManager = this.taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages( @@ -140,6 +144,8 @@ public void run() throws Exception { LOG.debug("Combiner starting."); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final MutableObjectIterator in = this.taskContext.getInput(0); final TypeSerializer serializer = this.serializer; @@ -147,6 +153,7 @@ public void run() throws Exception { T value = serializer.createInstance(); while (running && (value = in.next(value)) != null) { + numRecordsIn.inc(); // try writing to the sorter first if (this.sorter.write(value)) { @@ -166,6 +173,7 @@ public void run() throws Exception { else { T value; while (running && (value = in.next()) != null) { + numRecordsIn.inc(); // try writing to the sorter first if (this.sorter.write(value)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index 3b7af6e2e9299..eb4f2f5ebf813 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -106,6 +108,9 @@ public void run() throws Exception { LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. Running Reducer code.")); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + // cache references on the stack final MutableObjectIterator input = this.input; final TypeSerializer serializer = this.serializer; @@ -113,7 +118,7 @@ public void run() throws Exception { final ReduceFunction function = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { // We only need two objects. The first reference stores results and is @@ -128,10 +133,12 @@ public void run() throws Exception { // iterate over key groups while (this.running && value != null) { + numRecordsIn.inc(); comparator.setReference(value); // iterate within a key group while ((reuse2 = input.next(reuse2)) != null) { + numRecordsIn.inc(); if (comparator.equalToReference(reuse2)) { // same group, reduce value = function.reduce(value, reuse2); @@ -163,11 +170,13 @@ public void run() throws Exception { // iterate over key groups while (this.running && value != null) { + numRecordsIn.inc(); comparator.setReference(value); T res = value; // iterate within a key group while ((value = input.next()) != null) { + numRecordsIn.inc(); if (comparator.equalToReference(value)) { // same group, reduce res = function.reduce(res, value); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java index 47917610b63c1..3d52925582c9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -58,18 +60,22 @@ public void prepare() {} @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); - final Collector output = this.taskContext.getOutputCollector(); + final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); T reuse = this.taskContext.getInputSerializer(STREAMED_INPUT).getSerializer().createInstance(); T record; final MutableObjectIterator input = this.taskContext.getInput(STREAMED_INPUT); while (this.running && ((record = input.next(reuse)) != null)) { + numRecordsIn.inc(); output.collect(record); } final MutableObjectIterator cache = this.taskContext.getInput(CACHED_INPUT); while (this.running && ((record = cache.next(reuse)) != null)) { + numRecordsIn.inc(); output.collect(record); } } From ae0ba804cc664d8044749257ab6303aadaeb853e Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 8 Jun 2016 14:03:19 +0200 Subject: [PATCH 08/10] operator: batch: chaining: numRecords(In/Out) --- .../flink/runtime/operators/NoOpChainedDriver.java | 1 + .../operators/chaining/ChainedAllReduceDriver.java | 1 + .../runtime/operators/chaining/ChainedDriver.java | 12 ++++++++++-- .../operators/chaining/ChainedFlatMapDriver.java | 1 + .../runtime/operators/chaining/ChainedMapDriver.java | 1 + .../chaining/ChainedTerminationCriterionDriver.java | 1 + .../chaining/GroupCombineChainedDriver.java | 1 + .../chaining/SynchronousChainedCombineDriver.java | 1 + 8 files changed, 17 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java index 9b08fadde2cec..802227a3c04ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java @@ -61,6 +61,7 @@ public String getTaskName() { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.outputCollector.collect(record); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java index 46ee41bae51ab..1e3482f9bc3cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java @@ -86,6 +86,7 @@ public String getTaskName() { // -------------------------------------------------------------------------------------------- @Override public void collect(IT record) { + numRecordsIn.inc(); try { if (base == null) { base = objectReuseEnabled ? record : serializer.copy(record); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 407716f92f453..256013599b17f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -22,12 +22,14 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import java.util.Map; @@ -53,6 +55,10 @@ public abstract class ChainedDriver implements Collector { protected boolean objectReuseEnabled = false; protected MetricGroup metrics; + + protected Counter numRecordsIn; + + protected Counter numRecordsOut; public void setup(TaskConfig config, String taskName, Collector outputCollector, @@ -61,9 +67,11 @@ public void setup(TaskConfig config, String taskName, Collector outputCollec { this.config = config; this.taskName = taskName; - this.outputCollector = outputCollector; this.userCodeClassLoader = userCodeClassLoader; this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName); + this.numRecordsIn = this.metrics.counter("numRecordsIn"); + this.numRecordsOut = this.metrics.counter("numRecordsOut"); + this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut); Environment env = parent.getEnvironment(); @@ -103,7 +111,7 @@ protected RuntimeContext getUdfRuntimeContext() { @SuppressWarnings("unchecked") public void setOutputCollector(Collector outputCollector) { - this.outputCollector = (Collector) outputCollector; + this.outputCollector = new CountingCollector<>((Collector) outputCollector, numRecordsOut); } public Collector getOutputCollector() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java index f51cb686dc01b..86be7b0aeb47e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java @@ -76,6 +76,7 @@ public String getTaskName() { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.mapper.flatMap(record, this.outputCollector); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java index 9b888f2d710d6..cef1b738d00da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java @@ -75,6 +75,7 @@ public String getTaskName() { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.outputCollector.collect(this.mapper.map(record)); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java index 3912b98ac5082..e3de1c48937a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java @@ -59,6 +59,7 @@ public String getTaskName() { @Override public void collect(IT record) { + numRecordsIn.inc(); agg.aggregate(1); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java index 63f2b202a0198..e6c8c2fe071b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java @@ -167,6 +167,7 @@ public String getTaskName() { @Override public void collect(IN record) { + numRecordsIn.inc(); // try writing to the sorter first try { if (this.sorter.write(record)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index 408abc2c9eb47..a003d9e83d6eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -163,6 +163,7 @@ public String getTaskName() { @Override public void collect(IN record) { + this.numRecordsIn.inc(); // try writing to the sorter first try { if (this.sorter.write(record)) { From 01301c8c0d6ea13769800da3f7bfe32cafe05760 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 15 Jun 2016 09:02:37 +0200 Subject: [PATCH 09/10] Address comments --- .../org/apache/flink/metrics/groups/IOMetricGroup.java | 4 ++-- .../io/network/api/reader/AbstractRecordReader.java | 8 -------- .../runtime/io/network/api/reader/BufferReader.java | 5 ----- .../runtime/io/network/api/reader/ReaderBase.java | 8 -------- .../AdaptiveSpanningRecordDeserializer.java | 5 ----- .../network/api/serialization/RecordDeserializer.java | 8 -------- .../SpillingAdaptiveSpanningRecordDeserializer.java | 5 ----- .../network/partition/consumer/LocalInputChannel.java | 2 +- .../network/partition/consumer/RemoteInputChannel.java | 2 +- .../org/apache/flink/runtime/operators/BatchTask.java | 1 - .../apache/flink/runtime/operators/DataSinkTask.java | 2 -- .../io/network/api/reader/AbstractReaderTest.java | 5 ----- .../streaming/runtime/io/StreamInputProcessor.java | 3 --- .../streaming/runtime/io/StreamTwoInputProcessor.java | 10 +++++++--- .../streaming/runtime/tasks/StreamIterationTail.java | 2 +- 15 files changed, 12 insertions(+), 58 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java index c6eb5605901ab..46bf2afa0d92d 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -42,11 +42,11 @@ public Counter getBytesOutCounter() { return numBytesOut; } - public Counter getNumBytesInLocal() { + public Counter getNumBytesInLocalCounter() { return numBytesInLocal; } - public Counter getNumBytesInRemote() { + public Counter getNumBytesInRemoteCounter() { return numBytesInRemote; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 48ac558daa55d..e0fe355ef4dd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; @@ -131,11 +130,4 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { deserializer.setReporter(reporter); } } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index e5f5930698785..debb352589bcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.api.reader; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; @@ -54,8 +53,4 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { public void setReporter(AccumulatorRegistry.Reporter reporter) { } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java index 192a9abed5df5..a1d705f47a151 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -20,7 +20,6 @@ import java.io.IOException; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; @@ -58,11 +57,4 @@ public interface ReaderBase { */ void setReporter(AccumulatorRegistry.Reporter reporter); - /** - * Setter for the metric group. - * - * @param metrics metric group to set - */ - void setMetricGroup(IOMetricGroup metrics); - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index ac3ac04e27769..cdd8731c3a74c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -21,7 +21,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; @@ -166,10 +165,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { this.spanningWrapper.setReporter(reporter); } - @Override - public void instantiateMetrics(IOMetricGroup metrics) { - } - // ----------------------------------------------------------------------------------------------------------------- private static final class NonSpanningWrapper implements DataInputView { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java index 2f0c1ac65486e..e4c7890bd69e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java @@ -23,7 +23,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -71,11 +70,4 @@ public boolean isBufferConsumed() { * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. */ void setReporter(AccumulatorRegistry.Reporter reporter); - - /** - * Instantiates all metrics. - * - * @param metrics metric group - */ - void instantiateMetrics(IOMetricGroup metrics); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 296736d2ce236..eab8e7ce43f79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -22,7 +22,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; @@ -189,10 +188,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { this.spanningWrapper.setReporter(reporter); } - @Override - public void instantiateMetrics(IOMetricGroup metrics) { - } - // ----------------------------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 075b8d790e7ce..c4ec9f2a24fad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -84,7 +84,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe Tuple2 initialAndMaxBackoff, IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocal()); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter()); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index f3e605cfe8325..6e416da3f0b7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -97,7 +97,7 @@ public RemoteInputChannel( Tuple2 initialAndMaxBackoff, IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemote()); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter()); this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index f38b988975db3..36965ab1bfb79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -676,7 +676,6 @@ protected void initInputReaders() throws Exception { } inputReaders[i].setReporter(reporter); - inputReaders[i].setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); currentReaderOffset += groupSize; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index b0252c143f3ca..b73c85ae60dc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -356,8 +356,6 @@ private void initInputReaders() throws Exception { inputReader.setReporter(reporter); - inputReader.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); - this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader()); @SuppressWarnings({ "rawtypes" }) final MutableObjectIterator iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 9724a809c628e..6853722330c7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -20,7 +20,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -190,9 +189,5 @@ protected MockReader(InputGate inputGate) { public void setReporter(AccumulatorRegistry.Reporter reporter) { } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 8d8a2753257c7..33a0407b8ad61 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -219,9 +219,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { * @param metrics metric group */ public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } metrics.gauge("currentLowWatermark", new Gauge() { @Override public Long getValue() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 733e7fb376554..1a66934459e4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -287,9 +288,12 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { * @param metrics metric group */ public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } + metrics.gauge("currentLowWatermark", new Gauge() { + @Override + public Long getValue() { + return Math.min(lastEmittedWatermark1, lastEmittedWatermark2); + } + }); } public void cleanup() throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 016abe640b71d..58e3cb808d9ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -99,7 +99,7 @@ public void emitWatermark(Watermark mark) { public void collect(StreamRecord record) { try { if (shouldWait) { - dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); + dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); } else { dataChannel.put(record); From 7a6391604bed58b43c96f5250fb1b043e9544b9a Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 15 Jun 2016 08:47:33 +0200 Subject: [PATCH 10/10] Remove splitCounter from FileSourceFunction --- .../streaming/api/functions/source/FileSourceFunction.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java index a6b26b44f1a77..0dcb9ffc796c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -44,8 +43,6 @@ public class FileSourceFunction extends RichParallelSourceFunction { private volatile boolean isRunning = true; - private Counter splitCounter; - @SuppressWarnings("unchecked") public FileSourceFunction(InputFormat format, TypeInformation typeInfo) { this.format = (InputFormat) format; @@ -57,7 +54,6 @@ public FileSourceFunction(InputFormat format, TypeInformation typeI public void open(Configuration parameters) throws Exception { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); this.provider = context.getInputSplitProvider(); - this.splitCounter = context.getMetricGroup().counter("numSplitsProcessed"); format.configure(parameters); serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); @@ -127,11 +123,9 @@ public void run(SourceContext ctx) throws Exception { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); if (nextElement == null && splitIterator.hasNext()) { - splitCounter.inc(); format.open(splitIterator.next()); continue; } else if (nextElement == null) { - splitCounter.inc(); break; } ctx.collect(nextElement);