Skip to content

Commit

Permalink
[FLINK-21915] Optimize Execution#finishPartitionsAndUpdateConsumers
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Mar 26, 2021
1 parent 9c95cc1 commit 0e09d89
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
Expand Up @@ -68,6 +68,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -694,21 +696,13 @@ public CompletableFuture<?> suspend() {
return releaseFuture;
}

private void updatePartitionConsumers(final IntermediateResultPartition partition) {
private void updatePartitionConsumers(
final Collection<IntermediateResultPartition> partitions,
final ConsumerVertexGroup allConsumers) {

final List<ConsumerVertexGroup> allConsumers = partition.getConsumers();
List<PartitionInfo> partitionInfos = null;

if (allConsumers.size() == 0) {
return;
}
if (allConsumers.size() > 1) {
fail(
new IllegalStateException(
"Currently, only a single consumer group per partition is supported."));
return;
}

for (ExecutionVertexID consumerVertexId : allConsumers.get(0)) {
for (ExecutionVertexID consumerVertexId : allConsumers) {
final ExecutionVertex consumerVertex =
vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
Expand All @@ -720,12 +714,17 @@ private void updatePartitionConsumers(final IntermediateResultPartition partitio
// sent after switching to running
// ----------------------------------------------------------------
if (consumerState == DEPLOYING || consumerState == RUNNING) {
final PartitionInfo partitionInfo = createPartitionInfo(partition);
if (partitionInfos == null) {
partitionInfos =
partitions.stream()
.map(Execution::createPartitionInfo)
.collect(Collectors.toList());
}

if (consumerState == DEPLOYING) {
consumerVertex.cachePartitionInfo(partitionInfo);
consumerVertex.cachePartitionInfo(partitionInfos);
} else {
consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
consumer.sendUpdatePartitionInfoRpcCall(partitionInfos);
}
}
}
Expand Down Expand Up @@ -943,14 +942,25 @@ private void finishPartitionsAndUpdateConsumers() {
return;
}

final Map<ConsumerVertexGroup, Set<IntermediateResultPartition>> partitionConsumers =
new HashMap<>();

for (IntermediateResultPartition finishedPartition : newlyFinishedResults) {
final IntermediateResultPartition[] allPartitionsOfNewlyFinishedResults =
finishedPartition.getIntermediateResult().getPartitions();

for (IntermediateResultPartition partition : allPartitionsOfNewlyFinishedResults) {
updatePartitionConsumers(partition);
for (ConsumerVertexGroup consumerVertexGroup : partition.getConsumers()) {
partitionConsumers
.computeIfAbsent(consumerVertexGroup, group -> new HashSet<>())
.add(partition);
}
}
}
for (Map.Entry<ConsumerVertexGroup, Set<IntermediateResultPartition>> entry :
partitionConsumers.entrySet()) {
updatePartitionConsumers(entry.getValue(), entry.getKey());
}
}

private boolean cancelAtomically() {
Expand Down Expand Up @@ -1025,8 +1035,8 @@ private void finishCancellation(boolean releasePartitions) {
handlePartitionCleanup(releasePartitions, releasePartitions);
}

void cachePartitionInfo(PartitionInfo partitionInfo) {
partitionInfos.add(partitionInfo);
void cachePartitionInfo(Collection<PartitionInfo> partitionInfos) {
this.partitionInfos.addAll(partitionInfos);
}

private void sendPartitionInfos() {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -486,8 +487,8 @@ void notifyPartitionDataAvailable(ResultPartitionID partitionId) {
partition.markDataProduced();
}

void cachePartitionInfo(PartitionInfo partitionInfo) {
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
void cachePartitionInfo(Collection<PartitionInfo> partitionInfos) {
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfos);
}

/** Returns all blocking result partitions whose receivers can be scheduled/updated. */
Expand Down

0 comments on commit 0e09d89

Please sign in to comment.