Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-13764][task, metrics] Pass the counter of numRecordsIn into the constructor of StreamInputProcessor #9476

Merged
merged 2 commits into from Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -85,7 +83,7 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
// ---------------- Metrics ------------------

private final WatermarkGauge watermarkGauge;
private Counter numRecordsIn;
private final Counter numRecordsIn;

@SuppressWarnings("unchecked")
public StreamOneInputProcessor(
Expand All @@ -101,7 +99,8 @@ public StreamOneInputProcessor(
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge,
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {
OperatorChain<?, ?> operatorChain,
Counter numRecordsIn) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

Expand All @@ -127,6 +126,7 @@ public StreamOneInputProcessor(
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.operatorChain = checkNotNull(operatorChain);
this.numRecordsIn = checkNotNull(numRecordsIn);
}

@Override
Expand All @@ -141,8 +141,6 @@ public CompletableFuture<?> isAvailable() {

@Override
public boolean processInput() throws Exception {
initializeNumRecordsIn();

StreamElement recordOrMark = input.pollNextNullable();
if (recordOrMark != null) {
int channel = input.getLastChannel();
Expand Down Expand Up @@ -189,17 +187,6 @@ private void checkFinished() throws Exception {
}
}

private void initializeNumRecordsIn() {
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
}

@Override
public void close() throws IOException {
input.close();
Expand Down
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
Expand Down Expand Up @@ -94,7 +92,7 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream

private InputSelection inputSelection;

private Counter numRecordsIn;
private final Counter numRecordsIn;

private boolean isPrepared;

Expand All @@ -113,7 +111,8 @@ public StreamTwoInputSelectableProcessor(
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge,
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {
OperatorChain<?, ?> operatorChain,
Counter numRecordsIn) throws IOException {

checkState(streamOperator instanceof InputSelectable);

Expand Down Expand Up @@ -146,6 +145,7 @@ public StreamTwoInputSelectableProcessor(
new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));

this.operatorChain = checkNotNull(operatorChain);
this.numRecordsIn = checkNotNull(numRecordsIn);

this.firstStatus = StreamStatus.ACTIVE;
this.secondStatus = StreamStatus.ACTIVE;
Expand Down Expand Up @@ -329,14 +329,6 @@ private void prepareForProcessing() {
// method take effect.
inputSelection = inputSelector.nextSelection();

try {
numRecordsIn = ((OperatorMetricGroup) streamOperator
.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}

isPrepared = true;
}

Expand Down
Expand Up @@ -76,19 +76,20 @@ public void init() throws Exception {
InputGate[] inputGates = getEnvironment().getAllInputGates();

inputProcessor = new StreamOneInputProcessor<>(
inputGates,
inSerializer,
this,
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge,
getTaskNameWithSubtaskAndId(),
operatorChain);
inputGates,
inSerializer,
this,
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge,
getTaskNameWithSubtaskAndId(),
operatorChain,
setupNumRecordsInCounter(headOperator));
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -36,6 +38,7 @@
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
Expand Down Expand Up @@ -319,6 +322,15 @@ public StreamTaskStateInitializer createStreamTaskStateInitializer() {
timerService);
}

protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
try {
return ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
return new SimpleCounter();
}
}

@VisibleForTesting
SynchronousSavepointLatch getSynchronousSavepointLatch() {
return syncSavepointLatch;
Expand Down
Expand Up @@ -58,6 +58,7 @@ protected void createInputProcessor(
input1WatermarkGauge,
input2WatermarkGauge,
getTaskNameWithSubtaskAndId(),
operatorChain);
operatorChain,
setupNumRecordsInCounter(headOperator));
}
}