Skip to content

Commit

Permalink
[FLINK-12203] Refactor ResultPartitionManager to break tie with Task
Browse files Browse the repository at this point in the history
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnviroment.releasePartitions(Collection<ResultPartitionID>). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID.
  • Loading branch information
azagrebin committed Apr 18, 2019
1 parent 1cf78b6 commit 806b67c
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ else if (current == RUNNING || current == DEPLOYING) {
else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already
sendFailIntermediateResultPartitionsRpcCall();
sendReleaseIntermediateResultPartitionsRpcCall();

return;
}
Expand Down Expand Up @@ -752,7 +752,7 @@ public CompletableFuture<?> suspend() {
break;
case FINISHED:
case FAILED:
sendFailIntermediateResultPartitionsRpcCall();
sendReleaseIntermediateResultPartitionsRpcCall();
break;
case CANCELED:
break;
Expand Down Expand Up @@ -1294,14 +1294,23 @@ private void sendCancelRpcCall(int numberRetries) {
}
}

private void sendFailIntermediateResultPartitionsRpcCall() {
private void sendReleaseIntermediateResultPartitionsRpcCall() {
final LogicalSlot slot = assignedResource;
LOG.info("Discarding the results produced by task execution {}.", attemptId);

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.failPartition(attemptId);
Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
Collection<ResultPartitionID> partitionIds = new ArrayList<>(partitions.size());
for (IntermediateResultPartition partition : partitions) {
partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId));
}

if (!partitionIds.isEmpty()) {
// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.releasePartitions(partitionIds);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
Expand All @@ -36,6 +37,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -185,6 +187,17 @@ public void setupInputGate(SingleInputGate gate) throws IOException {
}
}

/**
* Batch release intermediate result partitions.
*
* @param partitionIds partition ids to release
*/
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
for (ResultPartitionID partitionId : partitionIds) {
resultPartitionManager.releasePartitionsProducedBy(partitionId, null);
}
}

public void start() throws IOException {
synchronized (lock) {
Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void close() {
}

public void fail(@Nullable Throwable throwable) {
partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), throwable);
partitionManager.releasePartitionsProducedBy(partitionId, throwable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava18.com.google.common.collect.Table;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -41,19 +35,15 @@ public class ResultPartitionManager implements ResultPartitionProvider {

private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);

public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition>
registeredPartitions = HashBasedTable.create();
private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>();

private boolean isShutdown;

public void registerResultPartition(ResultPartition partition) throws IOException {
public void registerResultPartition(ResultPartition partition) {
synchronized (registeredPartitions) {
checkState(!isShutdown, "Result partition manager already shut down.");

ResultPartitionID partitionId = partition.getPartitionId();

ResultPartition previous = registeredPartitions.put(
partitionId.getProducerId(), partitionId.getPartitionId(), partition);
ResultPartition previous = registeredPartitions.put(partition.getPartitionId(), partition);

if (previous != null) {
throw new IllegalStateException("Result partition already registered.");
Expand All @@ -70,8 +60,7 @@ public ResultSubpartitionView createSubpartitionView(
BufferAvailabilityListener availabilityListener) throws IOException {

synchronized (registeredPartitions) {
final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
partitionId.getPartitionId());
final ResultPartition partition = registeredPartitions.get(partitionId);

if (partition == null) {
throw new PartitionNotFoundException(partitionId);
Expand All @@ -83,26 +72,14 @@ public ResultSubpartitionView createSubpartitionView(
}
}

public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
releasePartitionsProducedBy(executionId, null);
}

public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) {
public void releasePartitionsProducedBy(ResultPartitionID partitionId, Throwable cause) {
synchronized (registeredPartitions) {
final Map<IntermediateResultPartitionID, ResultPartition> partitions =
registeredPartitions.row(executionId);

for (ResultPartition partition : partitions.values()) {
partition.release(cause);
if (registeredPartitions.containsKey(partitionId)) {
registeredPartitions.get(partitionId).release(cause);
registeredPartitions.remove(partitionId);
LOG.debug("Released partition {} produced by {}.",
partitionId.getPartitionId(), partitionId.getPartitionId());
}

for (IntermediateResultPartitionID partitionId : ImmutableList
.copyOf(partitions.keySet())) {

registeredPartitions.remove(executionId, partitionId);
}

LOG.debug("Released all partitions produced by {}.", executionId);
}
}

Expand Down Expand Up @@ -134,10 +111,7 @@ void onConsumedPartition(ResultPartition partition) {
LOG.debug("Received consume notification from {}.", partition);

synchronized (registeredPartitions) {
ResultPartitionID partitionId = partition.getPartitionId();

previous = registeredPartitions.remove(partitionId.getProducerId(),
partitionId.getPartitionId());
previous = registeredPartitions.remove(partition.getPartitionId());
}

// Release the partition if it was successfully removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -149,10 +151,10 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
Preconditions.checkNotNull(executionAttemptID);
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
Preconditions.checkNotNull(partitionIds);

actorGateway.tell(new TaskMessages.FailIntermediateResultPartitions(executionAttemptID));
actorGateway.tell(new TaskMessages.ReleaseIntermediateResultPartitions(partitionIds));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -109,11 +111,11 @@ CompletableFuture<Acknowledge> updatePartitions(
Time timeout);

/**
* Fail all intermediate result partitions of the given task.
* Batch release intermediate result partitions.
*
* @param executionAttemptID identifying the task
* @param partitionIds partition ids to release
*/
void failPartition(ExecutionAttemptID executionAttemptID);
void releasePartitions(Collection<ResultPartitionID> partitionIds);

/**
* Notify the given task about a completed checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -91,8 +93,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
taskExecutorGateway.failPartition(executionAttemptID);
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(partitionIds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
Expand Down Expand Up @@ -667,11 +668,9 @@ public CompletableFuture<Acknowledge> updatePartitions(
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
log.info("Discarding the results produced by task execution {}.", executionAttemptID);

public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
try {
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
networkEnvironment.releasePartitions(partitionIds);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
onFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.SerializableOptional;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -99,11 +101,11 @@ CompletableFuture<Acknowledge> updatePartitions(
@RpcTimeout Time timeout);

/**
* Fail all intermediate result partitions of the given task.
* Batch release intermediate result partitions.
*
* @param executionAttemptID identifying the task
* @param partitionIds partition ids to release
*/
void failPartition(ExecutionAttemptID executionAttemptID);
void releasePartitions(Collection<ResultPartitionID> partitionIds);

/**
* Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInfo}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
import org.apache.flink.runtime.taskmanager.TaskExecutionState

Expand Down Expand Up @@ -121,12 +122,11 @@ object TaskMessages {
extends UpdatePartitionInfo

/**
* Fails (and releases) all intermediate result partitions identified by
* [[executionID]] from the task manager.
*
* @param executionID The task's execution attempt ID.
*/
case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
* Batch release intermediate result partitions.
*
* @param partitionIds partition ids to release
*/
case class ReleaseIntermediateResultPartitions(partitionIds: util.Collection[ResultPartitionID])
extends TaskMessage with RequiresLeaderSessionID


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
import org.apache.flink.runtime.messages.TaskMessages.ReleaseIntermediateResultPartitions;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
Expand Down Expand Up @@ -511,7 +511,7 @@ public Object handleMessage(Object message) {
return Acknowledge.get();
} else if(message instanceof CancelTask) {
return Acknowledge.get();
} else if(message instanceof FailIntermediateResultPartitions) {
} else if(message instanceof ReleaseIntermediateResultPartitions) {
return new Object();
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -100,7 +102,9 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {}
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {

}

@Override
public void notifyCheckpointComplete(
Expand Down
Loading

0 comments on commit 806b67c

Please sign in to comment.