Skip to content
Permalink
Browse files
[FLINK-17295][runtime] Rework MetricGroup methods to ensure param consistency
  • Loading branch information
zhuzhurk committed May 20, 2022
1 parent 1f101ed commit 13cb93b838603c27440afe4dfedf52d547195961
Showing 14 changed files with 71 additions and 102 deletions.
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;

import javax.annotation.Nullable;
@@ -71,12 +70,7 @@ public final TaskManagerMetricGroup parent() {
// ------------------------------------------------------------------------

public TaskMetricGroup addTask(
final JobVertexID jobVertexId,
final ExecutionAttemptID executionAttemptID,
final String taskName,
final int subtaskIndex,
final int attemptNumber) {
checkNotNull(jobVertexId);
final ExecutionAttemptID executionAttemptID, final String taskName) {
checkNotNull(executionAttemptID);
checkNotNull(taskName);

@@ -87,14 +81,7 @@ public TaskMetricGroup addTask(
return prior;
} else {
TaskMetricGroup task =
new TaskMetricGroup(
registry,
this,
jobVertexId,
executionAttemptID,
taskName,
subtaskIndex,
attemptNumber);
new TaskMetricGroup(registry, this, executionAttemptID, taskName);
tasks.put(executionAttemptID, task);
return task;
}
@@ -67,29 +67,26 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
TaskMetricGroup(
MetricRegistry registry,
TaskManagerJobMetricGroup parent,
JobVertexID vertexId,
ExecutionAttemptID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {
String taskName) {
super(
registry,
registry.getScopeFormats()
.getTaskFormat()
.formatScope(
checkNotNull(parent),
vertexId,
checkNotNull(executionId).getJobVertexId(),
checkNotNull(executionId),
taskName,
subtaskIndex,
attemptNumber),
checkNotNull(executionId).getSubtaskIndex(),
checkNotNull(executionId).getAttemptNumber()),
parent);

this.executionId = checkNotNull(executionId);
this.vertexId = checkNotNull(vertexId);
this.vertexId = executionId.getJobVertexId();
this.taskName = checkNotNull(taskName);
this.subtaskIndex = subtaskIndex;
this.attemptNumber = attemptNumber;
this.subtaskIndex = executionId.getSubtaskIndex();
this.attemptNumber = executionId.getAttemptNumber();

this.ioMetrics = new TaskIOMetricGroup(this);
}
@@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;

@@ -153,30 +152,22 @@ public UnregisteredTaskManagerJobMetricGroup() {

@Override
public TaskMetricGroup addTask(
final JobVertexID jobVertexId,
final ExecutionAttemptID executionAttemptID,
final String taskName,
final int subtaskIndex,
final int attemptNumber) {
final ExecutionAttemptID executionAttemptID, final String taskName) {
return createUnregisteredTaskMetricGroup();
}
}

/** A safe drop-in replacement for {@link TaskMetricGroup}s. */
public static class UnregisteredTaskMetricGroup extends TaskMetricGroup {
private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0);
private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID();
private static final String DEFAULT_TASK_NAME = "UnregisteredTask";

protected UnregisteredTaskMetricGroup() {
super(
NoOpMetricRegistry.INSTANCE,
new UnregisteredTaskManagerJobMetricGroup(),
DEFAULT_VERTEX_ID,
DEFAULT_ATTEMPT_ID,
DEFAULT_TASK_NAME,
0,
0);
DEFAULT_TASK_NAME);
}

@Override
@@ -655,12 +655,7 @@ public CompletableFuture<Acknowledge> submitTask(
// note that a pre-existing job group can NOT be closed concurrently - this is done by
// the same TM thread in removeJobMetricsGroup
TaskMetricGroup taskMetricGroup =
jobGroup.addTask(
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskInformation.getTaskName(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber());
jobGroup.addTask(tdd.getExecutionAttemptId(), taskInformation.getTaskName());

InputSplitProvider inputSplitProvider =
new RpcInputSplitProvider(
@@ -428,6 +428,18 @@ public static Execution getExecution(
return ejv.getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt();
}

/**
 * Creates an {@link ExecutionAttemptID} with default coordinates: a {@code JobVertexID(0, 0)},
 * subtask index 0 and attempt number 0.
 *
 * <p>Delegates to the three-argument overload of this method.
 *
 * @return the execution attempt id built from the default values
 */
public static ExecutionAttemptID createExecutionAttemptId() {
    final JobVertexID defaultVertexId = new JobVertexID(0, 0);
    return createExecutionAttemptId(defaultVertexId, 0, 0);
}

/**
 * Creates an {@link ExecutionAttemptID} for the given vertex, subtask and attempt.
 *
 * <p>The id is assembled from a newly constructed {@link ExecutionGraphID}, an {@link
 * ExecutionVertexID} built from {@code jobVertexId} and {@code subtaskIndex}, and the given
 * {@code attemptNumber}.
 *
 * @param jobVertexId id of the job vertex the attempt belongs to
 * @param subtaskIndex index of the subtask within the job vertex
 * @param attemptNumber number of the execution attempt
 * @return the execution attempt id built from the given values
 */
public static ExecutionAttemptID createExecutionAttemptId(
        JobVertexID jobVertexId, int subtaskIndex, int attemptNumber) {
    final ExecutionGraphID graphId = new ExecutionGraphID();
    final ExecutionVertexID vertexId = new ExecutionVertexID(jobVertexId, subtaskIndex);
    return new ExecutionAttemptID(graphId, vertexId, attemptNumber);
}

// ------------------------------------------------------------------------
// graph vertex verifications
// ------------------------------------------------------------------------
@@ -43,7 +43,6 @@
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -69,6 +68,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.junit.Assert.assertEquals;
@@ -347,7 +347,7 @@ private static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics
return TaskManagerMetricGroup.createTaskManagerMetricGroup(
new TestMetricRegistry(metrics), "localhost", ResourceID.generate())
.addJob(new JobID(), "jobName")
.addTask(new JobVertexID(0, 0), new ExecutionAttemptID(), "test", 0, 0);
.addTask(createExecutionAttemptId(), "test");
}

/** The metric registry for storing the registered metrics to verify in tests. */
@@ -38,6 +38,7 @@

import java.util.Map;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -69,7 +70,7 @@ public void testGenerateScopeDefault() throws Exception {

TaskMetricGroup taskGroup =
tmGroup.addJob(new JobID(), "myJobName")
.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
.addTask(createExecutionAttemptId(new JobVertexID(), 11, 0), "aTaskName");
InternalOperatorMetricGroup opGroup =
taskGroup.getOrAddOperator(new OperatorID(), "myOpName");

@@ -103,7 +104,7 @@ public void testGenerateScopeCustom() throws Exception {
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID(tmID))
.addJob(jid, "myJobName")
.addTask(vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
.addTask(createExecutionAttemptId(vertexId, 13, 2), "aTaskname")
.getOrAddOperator(operatorID, operatorName);

assertArrayEquals(
@@ -133,7 +134,7 @@ public void testIOMetricGroupInstantiation() throws Exception {

TaskMetricGroup taskGroup =
tmGroup.addJob(new JobID(), "myJobName")
.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
.addTask(createExecutionAttemptId(new JobVertexID(), 11, 0), "aTaskName");
InternalOperatorMetricGroup opGroup =
taskGroup.getOrAddOperator(new OperatorID(), "myOpName");

@@ -146,15 +147,14 @@ public void testIOMetricGroupInstantiation() throws Exception {
public void testVariables() {
JobID jid = new JobID();
JobVertexID tid = new JobVertexID();
ExecutionAttemptID eid = new ExecutionAttemptID();
ExecutionAttemptID eid = createExecutionAttemptId(tid, 11, 0);
OperatorID oid = new OperatorID();

TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));

TaskMetricGroup taskGroup =
tmGroup.addJob(jid, "myJobName").addTask(tid, eid, "aTaskName", 11, 0);
TaskMetricGroup taskGroup = tmGroup.addJob(jid, "myJobName").addTask(eid, "aTaskName");
InternalOperatorMetricGroup opGroup = taskGroup.getOrAddOperator(oid, "myOpName");

Map<String, String> variables = opGroup.getAllVariables();
@@ -183,13 +183,13 @@ private static void testVariable(
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
JobVertexID vid = new JobVertexID();
ExecutionAttemptID eid = new ExecutionAttemptID();
ExecutionAttemptID eid = createExecutionAttemptId(vid, 4, 5);
OperatorID oid = new OperatorID();
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));

TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(vid, eid, "taskName", 4, 5);
TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(eid, "taskName");
InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, "operator");

QueryScopeInfo.OperatorQueryScopeInfo info =
@@ -45,6 +45,7 @@

import java.util.Arrays;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
@@ -335,13 +336,13 @@ public void tolerateMetricAndGroupNameCollisions() {
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
JobVertexID vid = new JobVertexID();
ExecutionAttemptID eid = new ExecutionAttemptID();
ExecutionAttemptID eid = createExecutionAttemptId(vid, 4, 5);
MetricRegistryImpl registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration);
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));

TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(vid, eid, "taskName", 4, 5);
TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(eid, "taskName");
GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello");
GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world");

@@ -37,6 +37,7 @@

import java.io.IOException;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -83,19 +84,16 @@ public void addAndRemoveJobs() throws IOException {
final JobVertexID vertex13 = new JobVertexID();
final JobVertexID vertex21 = new JobVertexID();

final ExecutionAttemptID execution11 = new ExecutionAttemptID();
final ExecutionAttemptID execution12 = new ExecutionAttemptID();
final ExecutionAttemptID execution13 = new ExecutionAttemptID();
final ExecutionAttemptID execution21 = new ExecutionAttemptID();
final ExecutionAttemptID execution11 = createExecutionAttemptId(vertex11, 17, 0);
final ExecutionAttemptID execution12 = createExecutionAttemptId(vertex12, 13, 1);
final ExecutionAttemptID execution13 = createExecutionAttemptId(vertex13, 0, 0);
final ExecutionAttemptID execution21 = createExecutionAttemptId(vertex21, 7, 2);

TaskMetricGroup tmGroup11 =
group.addJob(jid1, jobName1).addTask(vertex11, execution11, "test", 17, 0);
TaskMetricGroup tmGroup11 = group.addJob(jid1, jobName1).addTask(execution11, "test");

TaskMetricGroup tmGroup12 =
group.addJob(jid1, jobName1).addTask(vertex12, execution12, "test", 13, 1);
TaskMetricGroup tmGroup12 = group.addJob(jid1, jobName1).addTask(execution12, "test");

TaskMetricGroup tmGroup21 =
group.addJob(jid2, jobName2).addTask(vertex21, execution21, "test", 7, 2);
TaskMetricGroup tmGroup21 = group.addJob(jid2, jobName2).addTask(execution21, "test");

assertEquals(2, group.numRegisteredJobMetricGroups());
assertFalse(tmGroup11.parent().isClosed());
@@ -118,8 +116,7 @@ public void addAndRemoveJobs() throws IOException {

// add one more to job one

TaskMetricGroup tmGroup13 =
group.addJob(jid1, jobName1).addTask(vertex13, execution13, "test", 0, 0);
TaskMetricGroup tmGroup13 = group.addJob(jid1, jobName1).addTask(execution13, "test");
assertSame(
tmGroup11.parent(),
tmGroup13.parent()); // should use the same TaskManagerJobMetricGroup
@@ -143,18 +140,15 @@ public void testCloseClosesAll() throws IOException {
final JobVertexID vertex12 = new JobVertexID();
final JobVertexID vertex21 = new JobVertexID();

final ExecutionAttemptID execution11 = new ExecutionAttemptID();
final ExecutionAttemptID execution12 = new ExecutionAttemptID();
final ExecutionAttemptID execution21 = new ExecutionAttemptID();
final ExecutionAttemptID execution11 = createExecutionAttemptId(vertex11, 17, 0);
final ExecutionAttemptID execution12 = createExecutionAttemptId(vertex12, 13, 1);
final ExecutionAttemptID execution21 = createExecutionAttemptId(vertex21, 7, 1);

TaskMetricGroup tmGroup11 =
group.addJob(jid1, jobName1).addTask(vertex11, execution11, "test", 17, 0);
TaskMetricGroup tmGroup11 = group.addJob(jid1, jobName1).addTask(execution11, "test");

TaskMetricGroup tmGroup12 =
group.addJob(jid1, jobName1).addTask(vertex12, execution12, "test", 13, 1);
TaskMetricGroup tmGroup12 = group.addJob(jid1, jobName1).addTask(execution12, "test");

TaskMetricGroup tmGroup21 =
group.addJob(jid2, jobName2).addTask(vertex21, execution21, "test", 7, 1);
TaskMetricGroup tmGroup21 = group.addJob(jid2, jobName2).addTask(execution21, "test");

group.close();

0 comments on commit 13cb93b

Please sign in to comment.