[FLINK-17295][runtime] Refine tests to ensure the consistency of jobVertexId, subtaskIndex, attemptNumber and executionAttemptId
zhuzhurk committed May 20, 2022
1 parent 579ac0f commit 014e8e0
Showing 50 changed files with 205 additions and 148 deletions.
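At a glance: ExecutionAttemptID loses its @VisibleForTesting no-argument and copy constructors, whose hard-coded placeholder fields could silently disagree with the vertex, subtask, and attempt a test paired them with. Tests that need an arbitrary ID now call the explicit ExecutionAttemptID.randomId() factory, and tests that care about jobVertexId, subtaskIndex, or attemptNumber build a consistent ID through the ExecutionGraphTestUtils.createExecutionAttemptId(...) helpers.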
@@ -18,7 +18,6 @@

package org.apache.flink.runtime.executiongraph;

- import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

@@ -48,11 +47,6 @@ public class ExecutionAttemptID implements java.io.Serializable {

private final int attemptNumber;

- @VisibleForTesting
- public ExecutionAttemptID() {
- this(new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(0, 0), 0), 0);
- }

public ExecutionAttemptID(
ExecutionGraphID executionGraphId,
ExecutionVertexID executionVertexId,
@@ -63,21 +57,6 @@ ExecutionAttemptID(
this.attemptNumber = attemptNumber;
}

- @VisibleForTesting
- public ExecutionAttemptID(ExecutionAttemptID toCopy) {
- // deep copy
- this.executionGraphId = new ExecutionGraphID(toCopy.executionGraphId.getBytes());
-
- final ExecutionVertexID executionVertexId = toCopy.executionVertexId;
- final JobVertexID jobVertexId = executionVertexId.getJobVertexId();
- this.executionVertexId =
- new ExecutionVertexID(
- new JobVertexID(jobVertexId.getBytes()),
- executionVertexId.getSubtaskIndex());
-
- this.attemptNumber = toCopy.attemptNumber;
- }

public ExecutionVertexID getExecutionVertexId() {
return executionVertexId;
}
@@ -134,4 +113,9 @@ public int hashCode() {
public String toString() {
return String.format("%s_%s_%d", executionGraphId, executionVertexId, attemptNumber);
}

+ public static ExecutionAttemptID randomId() {
+ return new ExecutionAttemptID(
+ new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(0, 0), 0), 0);
+ }
}
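
A minimal sketch of the construction pattern this commit rolls out across the touched tests (not part of the diff; the literal values are illustrative only):

    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    class AttemptIdConstructionSketch {
        static void sketch() {
            // Removed above: `new ExecutionAttemptID()`, which filled the ID with
            // hard-coded placeholders unrelated to the execution it described.

            // Now: an ID that is explicitly arbitrary ...
            ExecutionAttemptID arbitrary = ExecutionAttemptID.randomId();

            // ... or one whose jobVertexId / subtaskIndex / attemptNumber match
            // the execution under test (helper extended further down this diff).
            ExecutionAttemptID consistent =
                    ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 0, 0);
        }
    }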
@@ -25,6 +25,7 @@

import java.io.Serializable;

+ import static org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -44,7 +45,7 @@ public final class ResultPartitionID implements Serializable {

@VisibleForTesting
public ResultPartitionID() {
- this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+ this(new IntermediateResultPartitionID(), randomId());
}

public ResultPartitionID(
@@ -23,6 +23,8 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;

+ import static org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId;

/** A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s. */
public class UnregisteredMetricGroups {

@@ -159,7 +161,7 @@ public TaskMetricGroup addTask(

/** A safe drop-in replacement for {@link TaskMetricGroup}s. */
public static class UnregisteredTaskMetricGroup extends TaskMetricGroup {
- private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID();
+ private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = randomId();
private static final String DEFAULT_TASK_NAME = "UnregisteredTask";

protected UnregisteredTaskMetricGroup() {
@@ -126,6 +126,7 @@
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.IO_EXCEPTION;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN;
import static org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1787,7 +1788,8 @@ public void testHandleMessagesForNonExistingCheckpoints() throws Exception {

// unknown ack vertex
checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(graph.getJobID(), new ExecutionAttemptID(), checkpointId),
+ new AcknowledgeCheckpoint(
+ graph.getJobID(), createExecutionAttemptId(), checkpointId),
TASK_MANAGER_LOCATION_INFO);

checkpointCoordinator.shutdown();
@@ -1871,7 +1873,7 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
graph.getJobID(),
- new ExecutionAttemptID(),
+ createExecutionAttemptId(),
checkpointId,
new CheckpointMetrics(),
unknownSubtaskState),
@@ -1886,7 +1888,7 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
new JobID(),
- new ExecutionAttemptID(),
+ createExecutionAttemptId(),
checkpointId,
new CheckpointMetrics(),
differentJobSubtaskState),
@@ -1944,7 +1946,7 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
new JobID(),
- new ExecutionAttemptID(),
+ createExecutionAttemptId(),
checkpointId,
new CheckpointMetrics(),
differentJobSubtaskState),
@@ -1959,7 +1961,7 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
graph.getJobID(),
- new ExecutionAttemptID(),
+ createExecutionAttemptId(),
checkpointId,
new CheckpointMetrics(),
unknownSubtaskState2),
@@ -3231,7 +3233,7 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
coordinator.receiveDeclineMessage(
new DeclineCheckpoint(
graph.getJobID(),
- new ExecutionAttemptID(),
+ createExecutionAttemptId(),
1L,
new CheckpointException(CHECKPOINT_DECLINED)),
"none");
@@ -23,7 +23,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
- import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -51,6 +50,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -305,7 +305,7 @@ private Execution mockExecution() {

private Execution mockExecution(ExecutionState state) {
Execution mock = mock(Execution.class);
- when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
+ when(mock.getAttemptId()).thenReturn(createExecutionAttemptId());
when(mock.getState()).thenReturn(state);
return mock;
}
@@ -64,6 +64,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -85,7 +86,7 @@ public class PendingCheckpointTest {

private static final List<Execution> ACK_TASKS = new ArrayList<>();
private static final List<ExecutionVertex> TASKS_TO_COMMIT = new ArrayList<>();
- private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
+ private static final ExecutionAttemptID ATTEMPT_ID = createExecutionAttemptId();

public static final OperatorID OPERATOR_ID = new OperatorID();

@@ -38,6 +38,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -49,7 +50,7 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {

private static final IntermediateResultPartitionID partitionId =
new IntermediateResultPartitionID();
- private static final ExecutionAttemptID producerExecutionId = new ExecutionAttemptID();
+ private static final ExecutionAttemptID producerExecutionId = createExecutionAttemptId();

private static final ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
private static final int numberOfSubpartitions = 24;
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+ import org.apache.flink.runtime.jobgraph.JobVertexID;
+ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

@@ -34,7 +36,9 @@ public class ExecutionAttemptIDTest {

@Test
public void testByteBufWriteAndRead() {
- final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID executionAttemptID =
+ new ExecutionAttemptID(
+ new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(), 123), 456);
final int byteBufLen = ExecutionAttemptID.getByteBufLength();
final ByteBuf byteBuf = ALLOCATOR.directBuffer(byteBufLen, byteBufLen);
executionAttemptID.writeTo(byteBuf);
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.function.Function;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;

/** Tests for {@link ExecutionGraphResultPartitionAvailabilityChecker}. */
@@ -63,7 +64,7 @@ public void testPartitionAvailabilityCheck() {
final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper =
intermediateResultPartitionID ->
new ResultPartitionID(
- intermediateResultPartitionID, new ExecutionAttemptID());
+ intermediateResultPartitionID, createExecutionAttemptId());

final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker =
new ExecutionGraphResultPartitionAvailabilityChecker(
@@ -438,10 +438,13 @@ public static ExecutionAttemptID createExecutionAttemptId(JobVertexID jobVertexId) {

public static ExecutionAttemptID createExecutionAttemptId(
JobVertexID jobVertexId, int subtaskIndex, int attemptNumber) {
- return new ExecutionAttemptID(
- new ExecutionGraphID(),
- new ExecutionVertexID(jobVertexId, subtaskIndex),
- attemptNumber);
+ return createExecutionAttemptId(
+ new ExecutionVertexID(jobVertexId, subtaskIndex), attemptNumber);
}

+ public static ExecutionAttemptID createExecutionAttemptId(
+ ExecutionVertexID executionVertexId, int attemptNumber) {
+ return new ExecutionAttemptID(new ExecutionGraphID(), executionVertexId, attemptNumber);
+ }

// ------------------------------------------------------------------------
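Usage note on the helpers above (a sketch, not part of the commit): the createExecutionAttemptId overloads now all funnel into the three-argument ExecutionAttemptID constructor, so the resulting ID's fields always come from the caller's arguments:

    // assumes the static import of ExecutionGraphTestUtils.createExecutionAttemptId
    ExecutionAttemptID byParts = createExecutionAttemptId(new JobVertexID(), 3, 1);
    ExecutionAttemptID byVertexId =
            createExecutionAttemptId(new ExecutionVertexID(new JobVertexID(), 3), 1);
    // each carries exactly the given jobVertexId / subtaskIndex / attemptNumber,
    // plus a freshly generated ExecutionGraphID
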
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.execution.ExecutionState;
- import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
@@ -37,6 +36,7 @@
import java.util.Iterator;
import java.util.stream.Stream;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

@@ -353,7 +353,10 @@ private VerificationContext partitionConnectionCause(
SchedulingResultPartition failedProducer) {
return cause(
new PartitionConnectionException(
- new ResultPartitionID(failedProducer.getId(), new ExecutionAttemptID()),
+ new ResultPartitionID(
+ failedProducer.getId(),
+ createExecutionAttemptId(
+ failedProducer.getProducer().getId(), 0)),
new Exception("Test failure")));
}

@@ -45,6 +45,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -156,8 +157,8 @@ public void testDirectoryDownloadedFromDFS() throws Exception {
@Test
public void testDirectoryCleanUp() throws Exception {
JobID jobID = new JobID();
- ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
- ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+ ExecutionAttemptID attemptID1 = createExecutionAttemptId();
+ ExecutionAttemptID attemptID2 = createExecutionAttemptId();

final String fileName = "test_file";
// copy / create the file
@@ -215,7 +216,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
private void testDirectoryDownloaded(DistributedCache.DistributedCacheEntry entry)
throws Exception {
JobID jobID = new JobID();
- ExecutionAttemptID attemptID = new ExecutionAttemptID();
+ ExecutionAttemptID attemptID = createExecutionAttemptId();

// copy / create the file
final String fileName = "test_file";
@@ -41,6 +41,7 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -104,7 +105,7 @@ public void shutdown() {
@Test
public void testFileDownloadedFromBlob() throws Exception {
JobID jobID = new JobID();
- ExecutionAttemptID attemptID = new ExecutionAttemptID();
+ ExecutionAttemptID attemptID = createExecutionAttemptId();

final String fileName = "test_file";
// copy / create the file
@@ -28,7 +28,6 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
- import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -166,7 +165,7 @@ public void testRegisteringDebloatingMetrics() throws IOException {
.build();
shuffleEnvironment.createInputGates(
shuffleEnvironment.createShuffleIOOwnerContext(
"test", new ExecutionAttemptID(), taskMetricGroup),
"test", createExecutionAttemptId(), taskMetricGroup),
(dsid, id, consumer) -> {},
Arrays.asList(
new InputGateDeploymentDescriptor(
@@ -88,6 +88,7 @@
import static java.util.Arrays.asList;
import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultSubpartitionView;
@@ -1182,7 +1183,7 @@ static SingleInputGate createSingleInputGate(
createRemoteWithIdAndLocation(partitionIds[1], ResourceID.generate()),
// Unknown
new UnknownShuffleDescriptor(
- new ResultPartitionID(partitionIds[2], new ExecutionAttemptID()))
+ new ResultPartitionID(partitionIds[2], createExecutionAttemptId()))
};

InputGateDeploymentDescriptor gateDesc =
@@ -1236,7 +1237,7 @@ private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(
ids[i], ResultPartitionType.PIPELINED, 0, channelDescs);
}

- ExecutionAttemptID consumerID = new ExecutionAttemptID();
+ ExecutionAttemptID consumerID = createExecutionAttemptId();
SingleInputGate[] gates =
network.createInputGates(
network.createShuffleIOOwnerContext(
