Skip to content
Permalink
Browse files
[FLINK-17295][runtime] Remove unnecessary fields from Execution and ArchivedExecution
  • Loading branch information
zhuzhurk committed May 20, 2022
1 parent bccecc2 commit 579ac0f55422d7ef3215460c69d006acc48ab8b7
Showing 6 changed files with 16 additions and 36 deletions.
@@ -37,8 +37,6 @@ public class ArchivedExecution implements AccessExecution, Serializable {

private final long[] stateTimestamps;

private final int attemptNumber;

private final ExecutionState state;

@Nullable private final ErrorInfo failureInfo; // once assigned, never changes
@@ -50,44 +48,36 @@ public class ArchivedExecution implements AccessExecution, Serializable {
/* Continuously updated map of user-defined accumulators */
private final StringifiedAccumulatorResult[] userAccumulators;

private final int parallelSubtaskIndex;

private final IOMetrics ioMetrics;

public ArchivedExecution(Execution execution) {
this(
execution.getUserAccumulatorsStringified(),
execution.getIOMetrics(),
execution.getAttemptId(),
execution.getAttemptNumber(),
execution.getState(),
execution.getFailureInfo().orElse(null),
execution.getAssignedResourceLocation(),
execution.getAssignedAllocationID(),
execution.getVertex().getParallelSubtaskIndex(),
execution.getStateTimestamps());
}

public ArchivedExecution(
StringifiedAccumulatorResult[] userAccumulators,
IOMetrics ioMetrics,
ExecutionAttemptID attemptId,
int attemptNumber,
ExecutionState state,
@Nullable ErrorInfo failureCause,
TaskManagerLocation assignedResourceLocation,
AllocationID assignedAllocationID,
int parallelSubtaskIndex,
long[] stateTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureInfo = failureCause;
this.assignedResourceLocation = assignedResourceLocation;
this.attemptNumber = attemptNumber;
this.attemptId = attemptId;
this.state = state;
this.stateTimestamps = stateTimestamps;
this.parallelSubtaskIndex = parallelSubtaskIndex;
this.assignedAllocationID = assignedAllocationID;
}

@@ -102,7 +92,7 @@ public ExecutionAttemptID getAttemptId() {

@Override
public int getAttemptNumber() {
return attemptNumber;
return attemptId.getAttemptNumber();
}

@Override
@@ -141,7 +131,7 @@ public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {

@Override
public int getParallelSubtaskIndex() {
return parallelSubtaskIndex;
return attemptId.getSubtaskIndex();
}

@Override
@@ -133,8 +133,6 @@
*/
private final long[] stateTimestamps;

private final int attemptNumber;

private final Time rpcTimeout;

private final Collection<PartitionInfo> partitionInfos;
@@ -212,8 +210,6 @@ public Execution(
attemptNumber);
this.rpcTimeout = checkNotNull(rpcTimeout);

this.attemptNumber = attemptNumber;

this.stateTimestamps = new long[ExecutionState.values().length];
markTimestamp(CREATED, startTimestamp);

@@ -242,7 +238,7 @@ public ExecutionAttemptID getAttemptId() {

@Override
public int getAttemptNumber() {
return attemptNumber;
return attemptId.getAttemptNumber();
}

@Override
@@ -550,7 +546,7 @@ public void deploy() throws JobException {
LOG.info(
"Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
vertex.getTaskNameWithSubtaskIndex(),
attemptNumber,
getAttemptNumber(),
vertex.getCurrentExecutionAttempt().getAttemptId(),
vertex.getID(),
getAssignedResourceLocation(),
@@ -1467,7 +1463,7 @@ private void markTimestamp(ExecutionState state, long timestamp) {
}

public String getVertexWithAttempt() {
return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + attemptNumber;
return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + getAttemptNumber();
}

// ------------------------------------------------------------------------
@@ -1536,7 +1532,7 @@ public String toString() {

return String.format(
"Attempt #%d (%s) @ %s - [%s]",
attemptNumber,
getAttemptNumber(),
vertex.getTaskNameWithSubtaskIndex(),
(slot == null ? "(unassigned)" : slot),
state);
@@ -27,7 +27,6 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
@@ -70,6 +69,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.collection.IsEmptyCollection.empty;
@@ -345,15 +345,13 @@ private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(
new ArchivedExecution(
new StringifiedAccumulatorResult[0],
null,
new ExecutionAttemptID(),
attempt,
createExecutionAttemptId(jobVertexID, subtaskIndex, attempt),
expectedState,
new ErrorInfo(
new RuntimeException("error"),
System.currentTimeMillis()),
assignedResourceLocation,
allocationID,
subtaskIndex,
timestamps),
new EvictingBoundedList<>(0))
},
@@ -27,7 +27,6 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -52,6 +51,7 @@
import java.util.Collections;
import java.util.HashMap;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;

/** Tests of {@link SubtaskCurrentAttemptDetailsHandler}. */
@@ -90,13 +90,11 @@ public void testHandleRequest() throws Exception {
new ArchivedExecution(
new StringifiedAccumulatorResult[0],
ioMetrics,
new ExecutionAttemptID(),
attempt,
createExecutionAttemptId(jobVertexID, subtaskIndex, attempt),
expectedState,
null,
assignedResourceLocation,
allocationID,
subtaskIndex,
timestamps);

final ArchivedExecutionVertex executionVertex =
@@ -26,7 +26,7 @@
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
@@ -47,6 +47,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;

/** Tests of {@link SubtaskExecutionAttemptAccumulatorsHandler}. */
@@ -93,13 +94,11 @@ public void testHandleRequest() throws Exception {
new ArchivedExecution(
accumulatorResults,
null,
new ExecutionAttemptID(),
attemptNum,
createExecutionAttemptId(new JobVertexID(), subtaskIndex, attemptNum),
ExecutionState.FINISHED,
null,
null,
null,
subtaskIndex,
new long[ExecutionState.values().length]);

// Invoke tested method.
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -54,6 +53,7 @@
import java.util.Collections;
import java.util.HashMap;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;

/** Tests of {@link SubtaskExecutionAttemptDetailsHandler}. */
@@ -90,13 +90,12 @@ public void testHandleRequest() throws Exception {
new ArchivedExecution(
emptyAccumulators,
ioMetrics,
new ExecutionAttemptID(),
attempt,
createExecutionAttemptId(
jobVertexId, subtaskIndex, attempt),
expectedState,
null,
null,
null,
subtaskIndex,
new long[ExecutionState.values().length]),
new EvictingBoundedList<>(0))
},

0 comments on commit 579ac0f

Please sign in to comment.