Skip to content

Commit

Permalink
[FLINK-21766][metrics] Remove OperatorMetricGroup#parent()
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Mar 13, 2021
1 parent 767dd71 commit cd4c5c9
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
Expand Up @@ -65,13 +65,13 @@ public Meter getNumRecordsOutRate() {

/** Causes the containing task to use this operators input record counter. */
public void reuseInputMetricsForTask() {
TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
taskIO.reuseRecordsInputCounter(this.numRecordsIn);
}

/** Causes the containing task to use this operators output record counter. */
public void reuseOutputMetricsForTask() {
TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
}
}
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
Expand Down Expand Up @@ -57,8 +58,12 @@ public OperatorMetricGroup(

// ------------------------------------------------------------------------

public final TaskMetricGroup parent() {
return parent;
public final TaskIOMetricGroup getTaskIOMetricGroup() {
return parent.getIOMetricGroup();
}

public final MetricGroup getJobMetricGroup() {
return parent.parent;
}

@Override
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
Expand Down Expand Up @@ -209,7 +208,7 @@ public void setup(
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup();
this.latencyStats =
new LatencyStats(
jobMetricGroup.addGroup("latency"),
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
Expand Down Expand Up @@ -185,7 +184,7 @@ private LatencyStats createLatencyStats(
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup();
return new LatencyStats(
jobMetricGroup.addGroup("latency"),
historySize,
Expand Down

0 comments on commit cd4c5c9

Please sign in to comment.