Skip to content

Commit

Permalink
[FLINK-4812][metrics] Expose currentLowWatermark for all operators
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Feb 2, 2018
1 parent b5a7b35 commit 92ad53e
Show file tree
Hide file tree
Showing 17 changed files with 559 additions and 91 deletions.
40 changes: 33 additions & 7 deletions docs/monitoring/metrics.md
Expand Up @@ -1190,12 +1190,7 @@ Thus, in order to infer the metric identifier:
</thead> </thead>
<tbody> <tbody>
<tr> <tr>
<th rowspan="7"><strong>Task</strong></th> <th rowspan="6"><strong>Task</strong></th>
<td>currentLowWatermark</td>
<td>The lowest watermark this task has received (in milliseconds).</td>
<td>Gauge</td>
</tr>
<tr>
<td>numBytesInLocal</td> <td>numBytesInLocal</td>
<td>The total number of bytes this task has read from a local source.</td> <td>The total number of bytes this task has read from a local source.</td>
<td>Counter</td> <td>Counter</td>
Expand Down Expand Up @@ -1252,7 +1247,38 @@ Thus, in order to infer the metric identifier:
<td>Counter</td> <td>Counter</td>
</tr> </tr>
<tr> <tr>
<th rowspan="2"><strong>Operator</strong></th> <th rowspan="6"><strong>Operator</strong></th>
<td>currentInputWatermark</td>
<td>
The last watermark this operator has received (in milliseconds).
<p><strong>Note:</strong> For operators with 2 inputs this is the minimum of the last received watermarks.</p>
</td>
<td>Gauge</td>
</tr>
<tr>
<td>currentInput1Watermark</td>
<td>
The last watermark this operator has received in its first input (in milliseconds).
<p><strong>Note:</strong> Only for operators with 2 inputs.</p>
</td>
<td>Gauge</td>
</tr>
<tr>
<td>currentInput2Watermark</td>
<td>
The last watermark this operator has received in its second input (in milliseconds).
<p><strong>Note:</strong> Only for operators with 2 inputs.</p>
</td>
<td>Gauge</td>
</tr>
<tr>
<td>currentOutputWatermark</td>
<td>
The last watermark this operator has emitted (in milliseconds).
</td>
<td>Gauge</td>
</tr>
<tr>
<td>latency</td> <td>latency</td>
<td>The latency distributions from all incoming sources (in milliseconds).</td> <td>The latency distributions from all incoming sources (in milliseconds).</td>
<td>Histogram</td> <td>Histogram</td>
Expand Down
Expand Up @@ -39,4 +39,9 @@ private MetricNames() {
public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE; public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE;
public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE; public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE; public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;

public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark";
public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
public static final String IO_CURRENT_OUTPUT_WATERMARK = "currentOutputWatermark";
} }
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.metrics.util;

import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

import java.util.HashMap;
import java.util.Map;

/**
* An {@link OperatorMetricGroup} that exposes all registered metrics.
*/
public class InterceptingOperatorMetricGroup extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {

private Map<String, Metric> intercepted;

/**
* Returns the registered metric for the given name, or null if it was never registered.
*
* @param name metric name
* @return registered metric for the given name, or null if it was never registered
*/
public Metric get(String name) {
return intercepted.get(name);
}

@Override
protected void addMetric(String name, Metric metric) {
if (intercepted == null) {
intercepted = new HashMap<>();
}
intercepted.put(name, metric);
super.addMetric(name, metric);
}
}
Expand Up @@ -36,7 +36,7 @@ public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public CopyingDirectedOutput( public CopyingDirectedOutput(
List<OutputSelector<OUT>> outputSelectors, List<OutputSelector<OUT>> outputSelectors,
List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) { List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
super(outputSelectors, outputs); super(outputSelectors, outputs);
} }


Expand Down
Expand Up @@ -18,11 +18,14 @@
package org.apache.flink.streaming.api.collector.selector; package org.apache.flink.streaming.api.collector.selector;


import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.OutputTag; import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom; import org.apache.flink.util.XORShiftRandom;


Expand All @@ -39,7 +42,7 @@
* Wrapping {@link Output} that forwards to other {@link Output Outputs } based on a list of * Wrapping {@link Output} that forwards to other {@link Output Outputs } based on a list of
* {@link OutputSelector OutputSelectors}. * {@link OutputSelector OutputSelectors}.
*/ */
public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { public class DirectedOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {


protected final OutputSelector<OUT>[] outputSelectors; protected final OutputSelector<OUT>[] outputSelectors;


Expand All @@ -51,10 +54,12 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {


private final Random random = new XORShiftRandom(); private final Random random = new XORShiftRandom();


protected final WatermarkGauge watermarkGauge = new WatermarkGauge();

@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public DirectedOutput( public DirectedOutput(
List<OutputSelector<OUT>> outputSelectors, List<OutputSelector<OUT>> outputSelectors,
List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) { List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]); this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);


this.allOutputs = new Output[outputs.size()]; this.allOutputs = new Output[outputs.size()];
Expand All @@ -65,7 +70,7 @@ public DirectedOutput(
HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>(); HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>();
HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>(); HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();


for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) { for (Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) {
final Output<StreamRecord<OUT>> output = outputPair.f0; final Output<StreamRecord<OUT>> output = outputPair.f0;
final StreamEdge edge = outputPair.f1; final StreamEdge edge = outputPair.f1;


Expand Down Expand Up @@ -100,6 +105,7 @@ public DirectedOutput(


@Override @Override
public void emitWatermark(Watermark mark) { public void emitWatermark(Watermark mark) {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
for (Output<StreamRecord<OUT>> out : allOutputs) { for (Output<StreamRecord<OUT>> out : allOutputs) {
out.emitWatermark(mark); out.emitWatermark(mark);
} }
Expand Down Expand Up @@ -149,4 +155,9 @@ public void close() {
out.close(); out.close();
} }
} }

@Override
public Gauge<Long> getWatermarkGauge() {
return watermarkGauge;
}
} }
Expand Up @@ -19,17 +19,20 @@


import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.OutputTag; import org.apache.flink.util.OutputTag;


import java.io.IOException; import java.io.IOException;
Expand All @@ -40,7 +43,7 @@
* Implementation of {@link Output} that sends data using a {@link RecordWriter}. * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/ */
@Internal @Internal
public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {


private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter; private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;


Expand All @@ -50,6 +53,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {


private final OutputTag outputTag; private final OutputTag outputTag;


private final WatermarkGauge watermarkGauge = new WatermarkGauge();

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public RecordWriterOutput( public RecordWriterOutput(
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
Expand Down Expand Up @@ -108,6 +113,7 @@ private <X> void pushToRecordWriter(StreamRecord<X> record) {


@Override @Override
public void emitWatermark(Watermark mark) { public void emitWatermark(Watermark mark) {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
serializationDelegate.setInstance(mark); serializationDelegate.setInstance(mark);


if (streamStatusProvider.getStreamStatus().isActive()) { if (streamStatusProvider.getStreamStatus().isActive()) {
Expand Down Expand Up @@ -158,4 +164,9 @@ public void close() {
public void clearBuffers() { public void clearBuffers() {
recordWriter.clearBuffers(); recordWriter.clearBuffers();
} }

@Override
public Gauge<Long> getWatermarkGauge() {
return watermarkGauge;
}
} }
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand All @@ -42,6 +41,7 @@
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand Down Expand Up @@ -106,7 +106,7 @@ public class StreamInputProcessor<IN> {


// ---------------- Metrics ------------------ // ---------------- Metrics ------------------


private long lastEmittedWatermark; private final WatermarkGauge watermarkGauge;
private Counter numRecordsIn; private Counter numRecordsIn;


private boolean isFinished; private boolean isFinished;
Expand All @@ -121,7 +121,9 @@ public StreamInputProcessor(
IOManager ioManager, IOManager ioManager,
Configuration taskManagerConfig, Configuration taskManagerConfig,
StreamStatusMaintainer streamStatusMaintainer, StreamStatusMaintainer streamStatusMaintainer,
OneInputStreamOperator<IN, ?> streamOperator) throws IOException { OneInputStreamOperator<IN, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge) throws IOException {


InputGate inputGate = InputGateUtil.createInputGate(inputGates); InputGate inputGate = InputGateUtil.createInputGate(inputGates);


Expand Down Expand Up @@ -160,14 +162,15 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {


this.numInputChannels = inputGate.getNumberOfInputChannels(); this.numInputChannels = inputGate.getNumberOfInputChannels();


this.lastEmittedWatermark = Long.MIN_VALUE;

this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer); this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
this.streamOperator = checkNotNull(streamOperator); this.streamOperator = checkNotNull(streamOperator);


this.statusWatermarkValve = new StatusWatermarkValve( this.statusWatermarkValve = new StatusWatermarkValve(
numInputChannels, numInputChannels,
new ForwardingValveOutputHandler(streamOperator, lock)); new ForwardingValveOutputHandler(streamOperator, lock));

this.watermarkGauge = watermarkGauge;
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
} }


public boolean processInput() throws Exception { public boolean processInput() throws Exception {
Expand Down Expand Up @@ -247,27 +250,6 @@ public boolean processInput() throws Exception {
} }
} }


/**
* Sets the metric group for this StreamInputProcessor.
*
* @param metrics metric group
*/
public void setMetricGroup(TaskIOMetricGroup metrics) {
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
public Long getValue() {
return lastEmittedWatermark;
}
});

metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
@Override
public Long getValue() {
return barrierHandler.getAlignmentDurationNanos();
}
});
}

public void cleanup() throws IOException { public void cleanup() throws IOException {
// clear the buffers first. this part should not ever fail // clear the buffers first. this part should not ever fail
for (RecordDeserializer<?> deserializer : recordDeserializers) { for (RecordDeserializer<?> deserializer : recordDeserializers) {
Expand Down Expand Up @@ -295,7 +277,7 @@ private ForwardingValveOutputHandler(final OneInputStreamOperator<IN, ?> operato
public void handleWatermark(Watermark watermark) { public void handleWatermark(Watermark watermark) {
try { try {
synchronized (lock) { synchronized (lock) {
lastEmittedWatermark = watermark.getTimestamp(); watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
operator.processWatermark(watermark); operator.processWatermark(watermark);
} }
} catch (Exception e) { } catch (Exception e) {
Expand Down

0 comments on commit 92ad53e

Please sign in to comment.