Skip to content

Commit

Permalink
[FLINK-12667][runtime] Add JobID to TaskExecutorGateway#releasePartit…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
zentol committed Jun 13, 2019
1 parent dfa0f61 commit b86ba3b
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 9 deletions.
Expand Up @@ -1329,7 +1329,7 @@ private void sendReleaseIntermediateResultPartitionsRpcCall() {

if (!partitionIds.isEmpty()) {
// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.releasePartitions(partitionIds);
taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
}
}
}
Expand Down
Expand Up @@ -102,9 +102,10 @@ CompletableFuture<Acknowledge> updatePartitions(
/**
* Batch release intermediate result partitions.
*
* @param jobId id of the job that the partitions belong to
* @param partitionIds partition ids to release
*/
void releasePartitions(Collection<ResultPartitionID> partitionIds);
void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);

/**
* Notify the given task about a completed checkpoint.
Expand Down
Expand Up @@ -88,8 +88,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(partitionIds);
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(jobId, partitionIds);
}

@Override
Expand Down
Expand Up @@ -640,7 +640,7 @@ public CompletableFuture<Acknowledge> updatePartitions(
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
try {
shuffleEnvironment.releasePartitions(partitionIds);
} catch (Throwable t) {
Expand Down
Expand Up @@ -103,9 +103,10 @@ CompletableFuture<Acknowledge> updatePartitions(
/**
* Batch release intermediate result partitions.
*
* @param jobId id of the job that the partitions belong to
* @param partitionIds partition ids to release
*/
void releasePartitions(Collection<ResultPartitionID> partitionIds);
void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);

/**
* Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
Expand Down
Expand Up @@ -19,12 +19,16 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
Expand Down Expand Up @@ -54,6 +58,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -64,6 +69,7 @@
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -293,6 +299,82 @@ public void testSlotAllocationCancellationWhenExecutionCancelled() throws Except
assertThat(canceledSlotRequests, equalTo(slotRequests));
}

/**
* Tests that the partitions are released in case of an execution cancellation after the execution is already finished.
*/
@Test
public void testPartitionReleaseOnCancelingAfterBeingFinished() throws Exception {
testPartitionReleaseAfterFinished(Execution::cancel);
}

/**
* Tests that the partitions are released in case of an execution suspension after the execution is already finished.
*/
@Test
public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws Exception {
testPartitionReleaseAfterFinished(Execution::suspend);
}

private void testPartitionReleaseAfterFinished(Consumer<Execution> postFinishedExecutionAction) throws Exception {
final Tuple2<JobID, Collection<ResultPartitionID>> releasedPartitions = Tuple2.of(null, null);
final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields);

final JobVertex producerVertex = createNoOpJobVertex();
final JobVertex consumerVertex = createNoOpJobVertex();
consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

final SimpleSlot slot = new SimpleSlot(
new SingleSlotTestingSlotOwner(),
new LocalTaskManagerLocation(),
0,
taskManagerGateway);

final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(producerVertex.getID(), 0, CompletableFuture.completedFuture(slot));

ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
new JobID(),
slotProvider,
new NoRestartStrategy(),
producerVertex,
consumerVertex);

executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());

ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

final Execution execution = executionVertex.getCurrentExecutionAttempt();

execution.allocateResourcesForExecution(
slotProvider,
false,
LocationPreferenceConstraint.ALL,
Collections.emptySet(),
TestingUtils.infiniteTime());

execution.deploy();
execution.switchToRunning();

// simulate a case where a cancel/suspend call is too slow and the task is already finished
// in this case we have to explicitly release the finished partition
// if the task were canceled properly the TM would release the partition automatically
execution.markFinished();
postFinishedExecutionAction.accept(execution);

assertEquals(executionGraph.getJobID(), releasedPartitions.f0);
assertEquals(executionVertex.getProducedPartitions().size(), releasedPartitions.f1.size());
for (ResultPartitionID partitionId : releasedPartitions.f1) {
// ensure all IDs of released partitions are actually valid
IntermediateResultPartition intermediateResultPartition = executionVertex
.getProducedPartitions()
.get(partitionId.getPartitionId());
assertNotNull(intermediateResultPartition);
assertEquals(execution.getAttemptId(), partitionId.getProducerId());
}
}

/**
* Tests that all preferred locations are calculated.
*/
Expand Down Expand Up @@ -417,7 +499,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
slotProvider,
new NoRestartStrategy(),
jobVertex);

ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);

ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand All @@ -51,6 +52,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {

private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;

private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { };

public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> predicate) {
submitConsumer = predicate;
}
Expand All @@ -63,6 +66,10 @@ public void setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableF
this.freeSlotFunction = freeSlotFunction;
}

public void setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) {
this.releasePartitionsConsumer = releasePartitionsConsumer;
}

@Override
public String getAddress() {
return address;
Expand Down Expand Up @@ -97,7 +104,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
releasePartitionsConsumer.accept(jobId, partitionIds);
}

@Override
Expand Down
Expand Up @@ -125,7 +125,7 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
// noop
}

Expand Down

0 comments on commit b86ba3b

Please sign in to comment.