From 31ac8f9713be658e9617afdbbe7f552ab1733d57 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 20 Oct 2016 15:53:03 +0200 Subject: [PATCH] [FLINK-4875] [metrics] Use correct operator name --- .../apache/flink/streaming/api/graph/StreamConfig.java | 9 +++++++++ .../streaming/api/graph/StreamingJobGraphGenerator.java | 2 ++ .../streaming/api/operators/AbstractStreamOperator.java | 3 +-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 0dd1b378b6a9e..ffe84561fae10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -68,6 +68,7 @@ public class StreamConfig implements Serializable { private static final String EDGES_IN_ORDER = "edgesInOrder"; private static final String OUT_STREAM_EDGES = "outStreamEdges"; private static final String IN_STREAM_EDGES = "inStreamEdges"; + private static final String OPERATOR_NAME = "operatorName"; private static final String CHECKPOINTING_ENABLED = "checkpointing"; private static final String CHECKPOINT_MODE = "checkpointMode"; @@ -390,6 +391,14 @@ public Map getTransitiveChainedTaskConfigs(ClassLoader cl throw new StreamTaskException("Could not instantiate configuration.", e); } } + + public void setOperatorName(String name) { + this.config.setString(OPERATOR_NAME,name); + } + + public String getOperatorName() { + return this.config.getString(OPERATOR_NAME, null); + } public void setChainIndex(int index) { this.config.setInteger(CHAIN_INDEX, index); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 0b7dc2aaf141d..4be1f5f70e42b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -211,6 +211,7 @@ private List createChain( config.setChainStart(); config.setChainIndex(0); + config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); config.setOutEdgesInOrder(transitiveOutEdges); config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); @@ -228,6 +229,7 @@ private List createChain( chainedConfigs.put(startNodeId, new HashMap()); } config.setChainIndex(chainIndex); + config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 167dfb04ad2d6..eae451f94258f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -127,9 +127,8 @@ public abstract class AbstractStreamOperator public void setup(StreamTask containingTask, StreamConfig config, Output> output) { this.container = containingTask; this.config = config; - String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim(); - this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); + this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName()); this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);