From cd4c5c934602f7c6b04bfe4657a27919341e0eec Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Sat, 13 Mar 2021 09:08:04 +0100 Subject: [PATCH] [FLINK-21766][metrics] Remove OperatorMetricGroup#parent() --- .../runtime/metrics/groups/OperatorIOMetricGroup.java | 4 ++-- .../runtime/metrics/groups/OperatorMetricGroup.java | 9 +++++++-- .../streaming/api/operators/AbstractStreamOperator.java | 3 +-- .../api/operators/AbstractStreamOperatorV2.java | 3 +-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java index 92c097c62a0022..dba909cabb26be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java @@ -65,13 +65,13 @@ public Meter getNumRecordsOutRate() { /** Causes the containing task to use this operators input record counter. */ public void reuseInputMetricsForTask() { - TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup(); + TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup(); taskIO.reuseRecordsInputCounter(this.numRecordsIn); } /** Causes the containing task to use this operators output record counter. */ public void reuseOutputMetricsForTask() { - TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup(); + TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup(); taskIO.reuseRecordsOutputCounter(this.numRecordsOut); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index d333082c416276..0862e71ee40e11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; @@ -57,8 +58,12 @@ public OperatorMetricGroup( // ------------------------------------------------------------------------ - public final TaskMetricGroup parent() { - return parent; + public final TaskIOMetricGroup getTaskIOMetricGroup() { + return parent.getIOMetricGroup(); + } + + public final MetricGroup getJobMetricGroup() { + return parent.parent; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 25b5d20c18247f..eb4b6825e91ea4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -209,7 +208,7 @@ public void setup( MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), granularity); } - TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent(); + MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup(); this.latencyStats = new LatencyStats( jobMetricGroup.addGroup("latency"), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 3e87f0f9d3e907..83446a04d9e3a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -185,7 +184,7 @@ private LatencyStats createLatencyStats( MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), granularity); } - TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent(); + MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup(); return new LatencyStats( jobMetricGroup.addGroup("latency"), historySize,